开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建):预处理及识别代码架构介绍】学习笔记与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/670/detail/11630
数据预处理-预处理程序入口优化
数据预处理-预处理程序入口优化
1、数据预处理程序的主程序
object DataProcessLauncher {
//程序主入口
def main(args: Array[String]): Unit ={
//添加日志级别设置
LoggerLevels.setStreamingLogLevels()
//当应用被停止的时候,进行如下设置可以保证当前批次执行完之后再停止应用。
System.setProperty("spark.streaming.stopGracefullyOnShutdown","true")
//1、创建 Spark conf
valconf=newSparkConf().setAppName
("DataProcess").setMaster(“local[2]”)
//开启日志监控功能
.set("spark.metrics.conf.executor.source.jvm.class",
“org.apache.spark.metrics.source.JvmSource")//开启集群监控功能
//2、创建SparkContext
val sc=new SparkContext(conf)
//3、创建streaming Context
val ssc=new StreamingContext(sc,Seconds(2))
//4、读取kafka 内的数据ssc,kafkaParams,topics)
//jssc: JavaStreamingContext,
//kafkaParams: JMap[String, String],
//topics: JSet[String]
valkafkaParams=Map("bootstrap.servers
"->PropertiesUtil.getstringBykey(key="default.brokers”,propName = "kafkaConfig.properties"))
valtopics=Set(PropertiesUtil.getStringByKey(key=”source.nginx.topic”,propName=”kafkaConfig.properties”))
//接收kafka 的数据(key,value)
KafkaUtils.createDirectStream[String,String,
StringDecoder,StringDecoder](ssc,kafkaParams,topics)
//真正的数据
val kafkaValue=kafkaData.map(_._2)
//5、消费数据
kafkaValue.foreachRDD(rdd=>rdd.foreach(println))
//数据预处理的程序
Val ssc=setupSsc(sc,kafkaParams,topics)
//6、开启 streaming 任务+开启循环
ssc.start()
ssc.awaitTermination()
}
}
2、数据预处理的程序
defsetupSsc(ssc:SparkContext,kafkaParams:Map[String, String], topics: set[String]):StreamingContext = {
//3、创建 streaming Context
val ssc=new StreamingContext(sc,Seconds(2))
//4、读取kafka 内的数据ssc,kafkaParams,topics)
//jssc: JavaStreamingContext,
//kafkaParams: JMap[String, String],
//topics: JSet[String]
valkafkaParams=Map("bootstrap.servers
"->PropertiesUtil.getstringBykey(key="default.brokers”,propName = "kafkaConfig.properties"))
valtopics=Set(PropertiesUtil.getStringByKey(key
=”source.nginx.topic”,propName=”kafkaConfig.properties”))
//接收kafka 的数据(key,value)
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
//真正的数据
val kafkaValue=kafkaData.map(_._2)
//5、消费数据
kafkaValue.foreachRDD(rdd=>rdd.foreach(println))
Ssc
}
将以上代码进行运行。