数据预处理-数据推送-效果与总结|学习笔记

简介: 快速学习数据预处理-数据推送-效果与总结

开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第四阶段数据预处理-数据推送-效果与总结】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/672/detail/11672


数据预处理-数据推送-效果与总结


数据推送-效果与总结

代码写完后,查看实际效果

程序运行,进入代码中,过滤出纯查询的数据,找到topic,点击 target.query.topic 是 processedQuery,拿到参数,写入 topic 中,会看到 topic 中有数据查看当前目录,在kafka目录下查看数据中有哪些 topic 只有 test、test01和推送数据的 topic

1.png

没有配置文件中的 processedQuery 运行程序,数据会增加一个 topic,名字为 processedQuery 运行反爬虫项目,预处理

1.png

结果未报错,运行爬虫,右键 run 执行爬虫

1.png

数据未报错,输出 ProcessedData

1.png

新的 ProcessedData 出现

Kafka 的topic创建出来,将 topic 名称换为 processedQuery,回车有数据

1.png

查看 processedQuery 数据,出现很多数据查看数据预处理程序

1.png

数据预处理程序刷新,左侧程序也刷新推送查询数据效果已看到,代码流程都没有问题,流程已跑通以上是推送查询类数据

预定类推送数据,推送预定类数据流程与推送查询类数据流程很像,只需要改动几个参数

Datasend.sendBookDataToKafka()

数据是 DataProcessDatasend 方法还未创建,进入方法中,复制发送查询的代码,更改参数

//将预定的数据推送到预定的 topic

defsendBookDataToKafka(DataProcess:RDD[ProcessedData]):

unit={

//过滤出纯预定的数据(过滤掉查询的数据)使用"#CS#”进行拼接

ValbookDatas=DataProcess.filter(message=message.

requestType.behaviorType==BehaviorTypeEnum.Book).map(messag

e-message.toKafkastring())

//将数据推送到 kafka

//1在配置文件中读取预定类的 Topic 到程序中

(#难送预订数据

target.book.topic=processedBook)

ValbookTopic=propertiesutil.getstringByKey(key=

"target.Book.topic",propName="kafkaConfig.properties")

//实例kafka参数

valprops=newutil.HashMap[string,object]()

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFI6,Propertie

sutil.getstringByKey(key="default.brokers",propName=

"kafkaconfig.properties"))

props.put(ProducerConfig.KEY_SERIALIZER_CLASs_CONFIG,Proper

tiesutil.getstringByKey(key="default.key_serializer_cla

ss_config",propName=“kafkaconfig.properties"))

props.put(Producercojfig.vALUE_SERIALIZER_CLASs_CONFTG,Prop

ertiesUtil.getstringByKey(key="default.value_serializer

_class_config",propName="kafkaConfig.properties"))

props.put(Producerconfig.BATCH_SIZE_CONFTG,PropertiesUtil.

getstringByKey(key="default.batch_size_config",propName

="kafkaConfig.properties"))

props.put(ProducerConfig.LINGER_MS_CONFIG,Propertiesutil.ge

tstringByKey(key="default.linger_ms_config",propName

="kafkaconfig.properties"))

//遍历数据的分区

BookDatas.foreachPartition(partition=>{

//2创建kafka生产者

valkafkaProducer=newKafkaProducer[string,string](props)

//遍历partition内的数据

partition.foreach(message=>{

//3数据的载体

valrecord=new

ProducerRecord[string,string](bookTopic,message)

//4数据的发送

kafkaProducer.send(record)

})

//5关闭生成者

kafkaProducer.close()

参数不需要修改,遍历分区改为 bookData,创建生产者、遍历分区的数据没有问题,数据载体 query 改为 book,数据发送和关闭生产者都没有问题

查询类数据与预定类数据的流程是相同的没有预定的数据,所以查看不到效果运行一下,如果查询类的 topic 有数据打入,说明程序代码没有问题,发送 book 代码也没有问题,只不过没有数据运行数据预处理的代码

1.png

右键执行爬虫,开始执行

1.png

程序未报错,流程有数据流入,说明推送的代码都没有问题数据推送预定代码流程全部完成

总结:数据推送(查询类)

目标:根据业务场景,将相应的数据推送到相应的 topic 内。

查询的数据推送到查询的 Topic 内,预定的数据推送到预定的 Topic 内思路与关键代码:

(1)获取到结构化后数据数据,过虑掉预定的数据,只保留查询的数据过滤出纯查询的数据(过滤掉预定的数据)使用"#CS#”进行拼接

valqueryDatas=DataProcess.filter(message=message.

requestType.behaviorType==BehaviorTypeEnum.Query).map(messa

ge-message.toKafkastring())

(2)在配置文件中读取查询类的Topic到程序中

valqueryTopic=propertiesutil.getstringByKey(key="target.

query.topic",propName="kafkaConfig.properties")

(1)创建 kafka 生产者

(2)数据的载体

(3)数据的发送

(4)关闭生成者

//遍历数据的分区

queryDatas.foreachPartition(partition=>{

//2创建 kafka 生产者

valkafkaProducer=newKafkaProducer[string,string](props)

//遍历 partition 内的数据

partition.foreach(message=>{

//3数据的载体

valrecord=new

ProducerRecord[string,string](queryTopic,message)

//4数据的发送

kafkaProducer.send(record)

})

//5关闭生成者

kafkaProducer.close()

创建生产者、数据载体、数据发送、关闭生产者整体拷贝,有整体感,推送预定类的数据流程是相同的数据推送(预定类)

目标:根据业务场景,将相应的数据推送到相应的 topic 内。

查询的数据推送到查询的 Topic 内,预定的数据推送到预定的 Topic内思路与关键代码:

(1)获取到结构化后数据数据,过滤掉查询的数据,只保留预定的数据

ValbookDatas=DataProcess.filter(message=message.

requestType.behaviorType==BehaviorTypeEnum.Book).map(messag

e-message.toKafkastring())

(2)在配置文件中读取预定类的 Topic 到程序中

ValbookTopic=propertiesutil.getstringByKey(key=

"target.Book.topic",propName="kafkaConfig.properties")

(3)创建 kafka 生产者

(4)数据的载体

(5)数据的发送

(6)关闭生成者

//遍历数据的分区

BookDatas.foreachPartition(partition=>{

//2创建 kafka 生产者

valkafkaProducer=newKafkaProducer[string,string](props)

//遍历 partition 内的数据

partition.foreach(message=>{

//3数据的载体

valrecord=new

ProducerRecord[string,string](bookTopic,message)

//4数据的发送

kafkaProducer.send(record)

})

//5关闭生成者

kafkaProducer.close()

数据推送模块查询类与预定类的数据推送全部完成

相关文章
|
Linux 测试技术 Shell
Linux expect命令详解
在Linux系统中,expect 是一款非常有用的工具,它允许用户自动化与需要用户输入进行交互的程序。本文将深入探讨expect命令的基本语法、使用方法以及一些最佳实践。
951 5
Linux expect命令详解
|
小程序 前端开发 IDE
校园二手书交易小程序源码下载
校园二手书交易小程序有四个模块:首页、发布、消息和我的。用户可以在小程序上进行二手书交易、扫码或者输入ISBN发布二手书、用户之间可以发送聊天消息,同时小程序支持购买书籍后跑腿兼职配送,以及对订单评价等多个特色功能。
524 0
校园二手书交易小程序源码下载
|
前端开发
html 格式
【10月更文挑战第14天】html 格式
1092 4
|
存储 架构师 安全
【亲测有用】数据中台数据安全管理能力演示(更新篇)
杭州奥零数据科技有限公司成立于2023年,专注于数据中台业务,维护开源项目AllData并提供商业版解决方案。AllData提供数据集成、存储、开发、治理及BI展示等一站式服务,支持AI大模型应用,助力企业高效利用数据价值。
|
域名解析 缓存 网络协议
DNS解析过程详解
【10月更文挑战第11天】 DNS(域名系统)解析过程是将域名转换为IP地址的关键步骤。客户端输入域名后,本地DNS服务器先检查缓存,如有记录则直接返回IP地址;否则依次向根DNS服务器、顶级域名服务器和权威DNS服务器查询,最终获取并缓存IP地址,返回给客户端,实现域名解析。这一过程确保了用户通过域名方便访问互联网资源。
1218 59
|
缓存 前端开发 JavaScript
JavaScript加载优化
JavaScript加载优化
|
网络协议 Linux Shell
CentOS7系统命令学习笔记(一)
CentOS7系统命令学习笔记(一)
482 12
|
安全 jenkins 持续交付
如何在 Jenkins 中配置邮件通知?
如何在 Jenkins 中配置邮件通知?
904 11
|
机器学习/深度学习 自然语言处理 自动驾驶
CNN的魅力:探索卷积神经网络的无限可能
卷积神经网络(Convolutional Neural Networks, CNN)作为人工智能的重要分支,在图像识别、自然语言处理、医疗诊断及自动驾驶等领域展现了卓越性能。本文将介绍CNN的起源、独特优势及其广泛应用,并通过具体代码示例展示如何使用TensorFlow和Keras构建和训练CNN模型。
|
安全 虚拟化 Python
和信创天云桌面系统_命令执行_任意文件上传
和信创天云桌面系统_命令执行_任意文件上传
和信创天云桌面系统_命令执行_任意文件上传