开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建):数据预处理-预处理程序入口】学习笔记与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/670/detail/11629
数据预处理-预处理程序入口
消费kafka 数据
目标:
编写数据预处理程序的主程序入口代码
1、需求:
使用KafkaUtils.createDirectStream 消费lua 生产到kafka 的数据。
2、代码实践:
进入main 后,点击进入scala,找到项目进行数据的预处理dataprocess,之后点击launch 创建一个scala 的Object,输入DataProcessLauncher。以上是进入数据预处理程序的主程序步骤。
//数据预处理程序的主程序
object DataProcessLauncher {
//程序主入口
def main(args: Array[String]): Unit ={
//添加日志级别设置:
在提供的材料中,common 中有一个util,util 中有一个log4j,log4j 中有一个LoggerLevels,在这里已经做好了日志级别的设置,直接用LoggerLevels 设置setStreamingLogLevels。
就可以看到
”spark”,”kafka",”spark.sql",”spark.streaming"已经被设置好了。所以直接运用 LoggerLevels 就可以。
LoggerLevels.setStreamingLogLevels()
//当应用被停止的时候,进行如下设置可以保证当前批次执行完之后再停止应用。
System.setProperty("spark.streaming.stopGracefullyOnShutdown","true")
//1、创建Spark conf
val conf=new SparkConf ().setAppName ("DataProcess").setMaster(“local[2]”)
//开启日志监控功能
.set("spark.metrics.conf.executor.source.jvm.class"
org.apache.spark.metrics.source.JvmSource")//开启集群监控功能
//在后续的数据处理过程,系统监控必须要求先开启监控功能,以上代码就是为了开启监控功能,为了后续监控做准备。
//2、创建SparkContext
val sc=new SparkContext(conf)
//3、创建streaming Context
val ssc=new StreamingContext(sc,Seconds(2))
//4、读取kafka 内的数据ssc,kafkaParams,topics)
//jssc: JavaStreamingContext,
//kafkaParams: JMap[String, String],
//topics: JSet[String]
valkafkaParams=Map("bootstrap.servers"->PropertiesUtil.getstringBykey(key = "default.brokers”,propName = "kafkaConfig.properties"))
valtopics=Set(PropertiesUtil.getStringByKey(key=”source.nginx.topic”,propName=”kafkaConfig.properties”))
//set 值:
在数据预处理时,kafka 的配置文件里有一个来自采集服务的原始数据,这里面source.nginx.topic 指的是前段lua 采集,再进行推送。
//接收kafka 的数据(key,value)
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
//真正的数据
val kafkaValue=kafkaData.map(_._2)
//读取配置文件的参数:
在common 里面有一个util ,util 里面有一个jedis,jedis 里面有一个PropertiesUtil,其中有一点 getStringByKey 需要给两个参数,一个是 key ,这个key 指的是 default.Brokers,而 value 指的是名称,也就是
kafkaConfig.properties。所以用这个方法来使用即可(两个参数)。
//5、消费数据
kafkaValue.foreachRDD(rdd=>rdd.foreach(println))
//6、开启 streaming 任务+开启循环
ssc.start()
ssc.awaitTermination()
}
}
