开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建):CreateDirectStream 消费数据补充】学习笔记与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/670/detail/11628
CreateDirectStream 消费数据补充
内容介绍:
一、KafkaUtils.createStream 的特点
二、KafkaUtils.createStream 的代码实现
一、KafkaUtils.createStream 的特点
构造函数为
KafkaUtils.createStream(ssc,[zk],[consumergroup id],[per-topic,partitions])使用了receivers 来接收数据,利用的是Kafka 高层次的消费者api,对于所有的receivers 接收到的数据将会保存在spark executors 中,然后通过Spark Streaming 启动job 来处理这些数据,默认会丢失,可启用WAL 日志,该日志存储在HDFS 上。
创建一个 receiver 来对 kafka 进行定时拉取数据,ssc 的 rdd
分区和 kafka 的 topic 分区不是一个概念,故如果增加特定主体分区数仅仅是增加一个receiver 中消费 topic 的线程数,并不增加 spark 的并行处理数据数量。
1、对于不同的 group 和 topic 可以使用多个 receivers 创建不同
的DStream。
2、如果启用了WAL,需要设置存储级别,即
KafkaUtils.createStream(….„,StorageLevel.MEMORY_AND_DISK_SER)
二、KafkaUtils.createStream 的代码实现
package org.apache.spark.stream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf}
object KafkaWordCount{
val updateFunction = (iter: Iterator[(String, Seq[Int], Option[Int])]) =>{
iter.flatMap{case(x,y,z)=>Some(y.sum+z.getOrElse(0)).map(v => (x, v))}
def main(args: Array[String]){
val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
/∥回滚点设置在本地
//ssc.checkpoint("./")
//将回滚点写到 hdfs
ssc.checkpoint("hdfs://node1:9000/streamcheckpoint")
groupld,
val Array(zkQuorum,groupld,topics,num Threads)=Array[String]("node2:2181,node3:2181,node4:2181", "g1", "wangsf-test", "2")
val topicMap =topics.split(",").map((_,numThreads.tolnt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, groupld, topicMap).map(_._2)
val results = lines.flatMap(_.split(" ")).map((, 1)).updateStateBykey(updateFunction, newHashPartitioner(ssc.sparkContext.defaultParallelism), true)
results.foreachRDD(x => x.foreach(printIn))
ssc.start()
ssc.awaitTermination()
}
}
其中“回滚点”设置与否都可