开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第四阶段:数据预处理-封装ProcessedData下】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/672/detail/11669
数据预处理-封装ProcessedData下
内容介绍:
一、封装 ProcessedData
二、总结
一、封装 ProcessedData
流程中进行结构化,接下来进行数据推送,将数据推送到 Kafka 中,推送模块做数据推送工作
推送数据到 Kafka 中,数据与数据用#CS#进行拼接
所看到的数据都是逗号的
数据结构化之后将数据推送到 Kafka 中
结果显示的数据是空格的、逗号拼接的、有空数据的
原因是有一段代码为读取
进入 ProcessedData 中,之前将下方数据传入、封装
case class ProcessedData(sourceData: string,requestwethod: string,request: string,
remoteAddr: string,httpUserAgent: string, timeIso8601: string,
serverAddr: string, highFrqIPGroup: Boolean,
requestType: RequestType,travelType: TravelTypeEnum,
requestParams: CoreRequestParams, cookievalue_JSESSIONID: string, cookievalue_USERID:string,
queryRequestData: option[ QueryRequestData], bookRequestData: option[BookRequestData],httpReferrer: string) {
ProcessedData
往下查看
toKafkaString 方法,有一个默认参数#CS#
def toKafkastring(fieldseparator: string = ""cs#"): string ={
数据采集完,将数据按照#CS#进行拼接
两个数据之间用#CS#进行拼接,拼接完成后打入 Kafka 中,消息对接 Kafka 集群中是#CS#拼接的,数据结构化后数据推送也要将数据打入 Kafka 中,也需要#CS#进行拼接
提供 toKafkaString 方法,将上方参数每两个之间用#CS#进行拼接,fieldseparator 是一个默认参数,
//_0-原始数据
sourceData.repEmptystr()+fieldSeparator+
//_1-请求类型GET/POST
requestMethod.repEmptystr()+fieldseparator+
//_2-请求http://xxxxx
request.repEmptystr()+ fieldseparator+
//_3-客户端地址(IP)
remoteAddr.repEmptystr()+fieldseparator+
//_4-客户端浏览器(UA)
httpUserAgent.repEmptystr()+fieldseparator+
//_5-服务器时间的ISO 8610格式
timeIso8601.repEmptystr()+fieldseparator+
//_6-服务器地址
serverAddr.repEmptystr()+fieldSeparator+
//_7-是否属于高频IP段
highFrqIPGroup+fieldSeparator+
//_8-航班类型-National/International/other
requestType.flightType+fieldSeparator+
//_9-消求行为-Query/Book/other
requestType.behaviorType+fieldseparator+
//_10-行程类型-OneWay/RoundTrip/Unknown
travelType+fieldseparator+
//_11-航班日期 -requestParams.flightDate.repEmptystr()
+fieldseparator
//_12-始发地 -
requestParams.depcity.repEmptystr()+fieldseparator+
//13-目的地 -
requestParams.arrcity.repEmptystr(()+fieldseparator+
//14-关键cookie-JSESSIONID
cookieValue JSESSIONID.repEmptystr()+fieldseparator+
//15-关键Cookie-用户ID
cookieValue USERID.repEmptystr()+fieldseparator+
//16-解析的查询参数对象JSON
aueryReauestDatastr.repEmptystr()+fieldseparator+
//17-解析的购票参数对象JSON
bookRequestDataStr.repEmptystr()+fieldseparator+
//18-当前请求是从哪个请求跳转过来的 httpReferrer.repEmptystr()
}
}
里面全部都是#CS#,字段、变量就是#CS#,sourceData是第一个数据,repEmptyStr是StringUtils,implicit隐式转换
implicitclass stringutils(s: string) {
def repEmptystr(replacement: string =“NULL"): string = {
if (s.isEmpty) replacement else s
}
}
调用 repEmptyStr 方法时,在 repEmptyStr 调用,调用隐式转换,传入一个参数,默认 NULL,调用隐式转换,如果数据是空的,返回 NULL,如果不是空的,是什么就返回什么,在调用sourceData时,如果是空,返回 NULL,不是空是什么就返回什么,即原始数据是空的返回 NULL,加一个#CS#
如果 requestMethod 是空的,替换成 NULL,不是空的是什么就返回什么,拼接一个#CS#,依次到最后都是这样将前面传递的参数判断是否为空,如果是空,则返回 NULL,不是空的是什么就返回什么,两个字段用#CS#拼接,方法的调用称为 toKafkaString
查看效果
之前返回的是 processedData,调用 toKafkaString
processedData.toKafkaString()
执行出现#CS#拼接
解析 processedData 调用 toKafkaString,转化成可以推动到 Kafka 中的数据类型
NULL 是因为,前面给的数据是空 str=””
将数据结构化的代码全部读取完成
二、总结
1.目标
将经过拆分的数据和与前面计算出来的各个指标(国内查询、国际预定、单程、往返、解析出的数据、是否是历史爬虫)封装成 ProcessedData
2.思路与关键代码
(1)根据 ProcessedData 所需的参数,提供参数,准备数据
(2)参数中缺少 requestParams: CoreRequestParams,此参数需要使用经过解析后的数据(出发地、目的地、起飞时间)进行封装
封装之前解析三个数据
var depcity=""
var arrcity =""
var flightDate=""
//在查询解析后的数据中抽取出出发地、目的地、起飞时间queryRequestData match {
case Some(datas)=>
depcity=datas.depCity
arrcity=datas.arrCity
flightDate=datas.flightDate
case None=>
}
解析三个数据,封装核心解析参数
实际最终代码,封装
val requestParams:CoreRequestParams=CoreRequestParams(
flightDate ,depcity,arrcity)
(3)封装好 CoreRequestParams 后所有参数就绪,封装 ProcessedData,并返回processedData("",requestMethod requestUr1,remoteAddr,httpUserAgent,timeiso8601,serverAddr,isFreIP,
requestType,travelType,requestPar ams , cookievalue_SESSIONID,cookievalue_USERID, queryRequestData,
bookRequestData,httpReferrer)
(4)ProcessedData 内的 toKafkaString 方法将每个属性之间使用"#CS#"进行拼接,最终形成一个字符串
processedData.toKafkaString()
第一步根据所需要的参数准备数据,第二部数据准备后缺少参数,分析核心解析参数,在查询解析后的数据中抽取出出发地、目的地、起飞时间,对出发地、目的地、起飞时间进行封装,分装 ProcessedData 所有参数,使用toKafkaString 方法拿到用#CS#拼接的数据,以上即为数据结构化的过程。