Learning Reflections
(1) Setting up the environment is the most troublesome part.
(2) Write the code yourself, write it yourself, write it yourself.
WordCount
package wordcount

import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author CBeann
  * @create 2019-08-10 18:02
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    // Create SparkConf and set the application name
    val conf = new SparkConf().setMaster("local[8]").setAppName("cbeann")
    // Create SparkContext
    val sc = new SparkContext(conf)

    // Use sc to build an RDD and run the transformations and actions
    val data = sc.textFile("C:\\Users\\Lenovo\\Desktop\\leetcode.txt")

    // Word count: split into words, map to (word, 1), reduce by key into one partition
    val result = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 1)

    // Print to the console
    result.collect().foreach(println)

    // Save the result
    result.saveAsTextFile("F:\\temp\\aa")

    sc.stop()
    println("-----over-----")
  }
}
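The input and output paths above are specific to the author's machine. As a quick local check, the same pipeline can be fed from an in-memory RDD instead of a text file; a minimal sketch (the sample lines are made-up placeholders, not data from this post):

// Same word-count pipeline, but fed from sc.parallelize so no local file is needed
val sample = sc.parallelize(Seq("hello spark", "hello scala", "spark streaming"))
val counts = sample.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.collect().foreach(println) // e.g. (spark,2), (hello,2), (scala,1), (streaming,1)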
Sorting
Approach 1, sort by a single field: val result = data.sortBy(_._2, false)
Approach 2, sort with a case class that extends Ordered: val result = data.sortBy(x => Boy(x._1, x._2, x._3), false)
package mysort

import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author CBeann
  * @create 2019-08-10 18:26
  */
object MysortDemo {
  def main(args: Array[String]): Unit = {
    // Create SparkConf and set the application name
    val conf = new SparkConf().setMaster("local[8]").setAppName("cbeann")
    // Create SparkContext
    val sc = new SparkContext(conf)

    // Use sc to build an RDD of (name, faceValue, age) tuples
    val data = sc.makeRDD(List(("张三", 10, 14), ("张三", 9, 9), ("张三", 13, 15)))

    // Approach 1: sort by a single field
    // val result = data.sortBy(_._2, false)

    // Approach 2: sort with a case class that extends Ordered
    val result = data.sortBy(x => Boy(x._1, x._2, x._3), false)

    // Print to the console
    result.collect().foreach(println)
  }
}

// Ordered by faceValue first, then by age (ascending compare; the sortBy above reverses it)
case class Boy(name: String, faceValue: Int, age: Int) extends Ordered[Boy] {
  override def compare(that: Boy): Int = {
    if (this.faceValue != that.faceValue) {
      this.faceValue - that.faceValue
    } else {
      this.age - that.age
    }
  }
}
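An equivalent way to get the same ordering, not used in the original code but worth noting, is to rely on the built-in Ordering for tuples instead of defining an Ordered class:

// Sort descending by the second field, then the third, via the implicit tuple Ordering
val byTuple = data.sortBy(x => (x._2, x._3), ascending = false)
byTuple.collect().foreach(println)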
Custom Partitioning
Custom partitioner
package mypartition

import org.apache.spark.Partitioner

import scala.collection.mutable

/**
  * @author CBeann
  * @create 2019-08-10 18:36
  * Custom partitioner: extend Partitioner
  */
class MyPartitioner extends Partitioner {

  // Key -> partition index mapping
  val map = new mutable.HashMap[String, Int]()
  map.put("Java", 0)
  map.put("Scala", 1)
  map.put("Go", 2)

  // Total number of partitions
  override def numPartitions: Int = map.size

  // Partitioning logic: look the key up, default to partition 0
  override def getPartition(key: Any): Int = {
    map.getOrElse(key.toString, 0)
  }
}
Test class
package mypartition

import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author CBeann
  * @create 2019-08-10 18:59
  */
object PartitionDemo {
  def main(args: Array[String]): Unit = {
    // Create SparkConf and set the application name
    val conf = new SparkConf().setMaster("local[8]").setAppName("cbeann")
    // Create SparkContext
    val sc = new SparkContext(conf)

    val data = sc.makeRDD(List(("Java", 11), ("Java", 9), ("Scala", 13), ("Go", 11)))

    // Repartition with the custom partitioner and write one output file per partition
    val result = data.partitionBy(new MyPartitioner)

    result.saveAsTextFile("F:\\temp\\aaa")

    println("--------------OVER------------")
  }
}
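To check which partition each record ends up in without inspecting the output files, a small verification sketch (not in the original post) using mapPartitionsWithIndex:

// Pair every record with its partition index and print the result
result
  .mapPartitionsWithIndex((idx, iter) => iter.map(record => (idx, record)))
  .collect()
  .foreach(println)
// Expected: the Java pairs in partition 0, Scala in partition 1, Go in partition 2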
SparkSQL
person.json
{ "name": "王小二", "age": 15} { "name": "王小三", "age": 25} { "name": "王小四", "age": 35}
Test class
package sparksql

import org.apache.spark.sql.SparkSession

/**
  * @author CBeann
  * @create 2019-08-10 18:20
  */
object SparkSqlDemo {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession and set the application name
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .master("local[8]")
      //.config("spark.some.config.option", "some-value")
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    val df = spark.read.json("E:\\IntelliJ IDEA 2019.1.3Workspace\\ScalaSpark\\SparkDemo\\src\\main\\resources\\json\\person.json")

    // Display the content of the DataFrame on stdout
    df.show()

    // DataFrame API: filter by age
    df.filter($"age" > 21).show()

    // SQL API: register a temporary view and query it
    df.createOrReplaceTempView("persons")
    spark.sql("SELECT * FROM persons where age > 21").show()

    spark.stop()
    printf("-----over---------")
  }
}
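The same data can also be handled as a typed Dataset; a minimal sketch (the Person case class is an illustration added here, defined at the top level next to SparkSqlDemo so an encoder can be derived):

// Typed view of the DataFrame; field names must match the JSON keys, and age is read as Long
case class Person(name: String, age: Long)

// Inside main, after the DataFrame has been loaded:
val personDS = df.as[Person]
personDS.filter(_.age > 21).show()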
Spark Streaming
Stateless WordCount
package stream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @author CBeann
  * @create 2019-08-10 18:38
  */
object StreamDemo {
  def main(args: Array[String]): Unit = {
    // Create a SparkConf
    val conf = new SparkConf().setAppName("SparkStreamDemo").setMaster("local[8]")
    // Create the StreamingContext entry point with 5-second batches
    val ssc = new StreamingContext(conf, Seconds(5))

    // Continuously receive data from port 9999 on the given host
    val lines = ssc.socketTextStream("iZm5ea99qngm2v98asii1aZ", 9999)
    //val lines = ssc.receiverStream(new MyRecever("iZm5ea99qngm2v98asii1aZ", 9999))

    // Process the data (word count per batch)
    val result = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
    result.print()

    // Start the streaming job
    ssc.start()
    // Wait for the stop signal
    ssc.awaitTermination()

    printf("--------OVER-------------")
  }
}
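A usage note (not from the original post): for local testing, the socket can be fed with a simple netcat server on the target host, for example by running nc -lk 9999 there; every line typed into that session arrives as one record and is counted in the next 5-second batch.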
Stateful WordCount
The key is the updateStateByKey method: you pass it a function with a fixed parameter signature.
package stream.withstatus

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @author CBeann
  * @create 2019-08-10 19:24
  */
object UpdateStateByKeyTest {
  def main(args: Array[String]): Unit = {
    // Create a SparkConf
    val conf = new SparkConf().setAppName("SparkStreamDemo").setMaster("local[8]")
    // Create the StreamingContext entry point with 5-second batches
    val ssc = new StreamingContext(conf, Seconds(5))
    // updateStateByKey needs a checkpoint directory to persist the state
    ssc.checkpoint("F:\\temp\\aaa")

    // Continuously receive data from port 9999 on the given host
    val lines = ssc.socketTextStream("iZm5ea99qngm2v98asii1aZ", 9999)

    // Per-batch word count
    val result = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)

    // The key step: fold each batch into the running state per key
    val allresult = result.updateStateByKey(updateFunction)
    allresult.print()

    // Start the streaming job
    ssc.start()
    // Wait for the stop signal
    ssc.awaitTermination()

    printf("--------OVER-------------")
  }

  // The parameter types are fixed; the parameter names are not.
  // currValues: the values of the same key in the current batch
  // preValue: the previous state supplied by the framework
  def updateFunction(currValues: Seq[Int], preValue: Option[Int]): Option[Int] = {
    // Sum for the current batch
    val currValueSum = currValues.sum
    // Accumulated value from earlier batches
    val oldValueSum = preValue.getOrElse(0)
    // New state = current batch sum + previous total
    Some(currValueSum + oldValueSum)
  }
}
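As a concrete check of the update function on its own (an illustration, not in the original post): if the previous total for a word is 3 and the current batch contains it twice, the new state is 5.

// Worked example, runnable outside the streaming job
assert(UpdateStateByKeyTest.updateFunction(Seq(1, 1), Some(3)) == Some(5))
// A key seen for the first time has no previous state
assert(UpdateStateByKeyTest.updateFunction(Seq(1), None) == Some(1))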
Custom receiver
package stream

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

/**
  * @author CBeann
  * @create 2019-08-10 18:39
  */
class MyRecever(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Called when the receiver starts: read the socket on a separate thread
  override def onStart(): Unit = {
    new Thread() {
      override def run(): Unit = {
        receive()
      }
    }.start()
  }

  /** Create a socket connection and receive data until the receiver is stopped */
  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Keep reading until stopped or the connection breaks
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
      while (!isStopped && userInput != null) {
        // Hand the line over to Spark
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when the server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // Restart if we could not connect to the server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // Restart on any other error
        restart("Error receiving data", t)
    }
  }

  // Nothing to clean up here: the receive thread exits once isStopped returns true
  override def onStop(): Unit = {}
}
To use the custom receiver, replace socketTextStream with:

val lines = ssc.receiverStream(new MyRecever("iZm5ea99qngm2v98asii1aZ", 9999))
pom.xml
<properties> <mysql.version>6.0.5</mysql.version> <spring.version>4.3.6.RELEASE</spring.version> <spring.data.jpa.version>1.11.0.RELEASE</spring.data.jpa.version> <log4j.version>1.2.17</log4j.version> <quartz.version>2.2.3</quartz.version> <slf4j.version>1.7.22</slf4j.version> <hibernate.version>5.2.6.Final</hibernate.version> <camel.version>2.18.2</camel.version> <config.version>1.10</config.version> <jackson.version>2.8.6</jackson.version> <servlet.version>3.0.1</servlet.version> <net.sf.json.version>2.4</net.sf.json.version> <activemq.version>5.14.3</activemq.version> <spark.version>2.1.1</spark.version> <scala.version>2.11.11</scala.version> <hadoop.version>2.7.3</hadoop.version> </properties> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> </dependency> <!-- Logging --> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>${log4j.version}</version> </dependency>