深入 rdd-初始案例-代码编写 | 学习笔记

简介: 快速学习 深入 rdd-初始案例-代码编写

开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段深入 rdd-初始案例-代码编写】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/689/detail/11958


深入 rdd-初始案例-代码编写


内容介绍:

一、明确步骤

二、创建 spark context

三、读取文件,生成数据集。

四、取出 IP,赋予出现次数为1

五、简单清洗

六、根据 IP 出现次数进行聚合

七、根据 IP 出现次数进行排序

八、取出结果,打印结果

九、运行结果

十、回顾

 

一、明确步骤

在编写代码之前,先明确一下具体步骤:

第一步:需要创建一个 spark context

第二步:读取文件,生成数据集。

第三步:取出 IP,赋予出现次数为1

第四步:简单清洗。

第五步:根据 IP 和出现次数进行聚合。

第六步:根据 IP 出现次数进行排序。

第七步:取出结果,打印结果。

 

二、创建 spark context

设置 spark 接受参数,使用 sparkconf 来进行包装,应该在里面设置俩个参数,第一是设置 appnamemaster。命名为 conf,传入 spark context 中。到此完成创建。

第一步:创建 sparkcontext

Val conf = new SparkConf().setMaster("local[6]").setAppName("ip_agg")

Val Sc = new Spark Context(conf)

 

三、读取文件,生成数据集

使用 sc textc 的方法进行读取,目录应该是 detaset 下的 access log sample.txt,复制过来之后,删除无用的目录,留下 sample.xt,生成 rdd 对应的名字,生成 source rdd 来表示数据源。

第二步:读取文件,生成数据集

ValsourceRDD= sc.textFile( path="dataset/access_log_sample.txt")

 

四、取出 IP,赋予出现次数为1

数据源对应 access sample 的文件,在 source rdd 中进行一个转换,取出 IP,生成元祖。access sample 中每一个数据对应了一个 item。只是以字符串的形式出现。Item 是一个字符串,要取出 IP 的话,item 按照空格+split。取出第0项即为 IP,最终元祖赋予其出现次数为1IP 只出现了一次,出现次数即为1

第三步:取出 IP,赋予出现次数为1

val ip RDD =sourceRDD.map(item=> (item.split(regex="")(0), 1))

 

五、简单清洗

元组最终会全部聚合起来,在聚合之前,应该进行简单清洗。清洗有两件事要做,第一步是去除空数据,第二步是去掉非法数据。根据业务再规整一些数据。

在此实例中,为了简单操作,仅去除一些空数据。

去除一部分数据,拿到 ip rdd 后,用 filter 算子去掉一部分数据,string int 的数据,第一部分是出现的 IP 地址,第二部分是1,去除数据,是使 IP 为空,IP是字符串类型的数据。使用 al cleanRDD = ipRDD.,意思是传入一个字符串,如果不为空,返回触,那么这个 filter 就将 item 收录进来。因此 isnotempty 中需要接受一个字符串。判断 IP 不为空,就放在数据集中,否则清除。

al cleanRDD = ipRDD.filter(item => StringUtils.isNotEmpty(item._1))

 

六、根据 IP 出现次数进行聚合

输入 cleanrdd 后,使用 reducebyke,里面接受了一个函数,函数有俩个参数。第一个叫做 curr,第二个叫做 agg。使用之后,可以直接使用 curr agg 进行统计。Curr 1agg 应为当前 IP 地址的局部内部结构.聚合完成后,为其生成一个新的 rdd

val ip AggRDD = cleanRDD.reduceByKey( (curr, agg) => curr + agg)

 

七、根据 IP 出现次数进行排序

按照 IP rdd agg 来进行排序,应按照后面的频率来进行排序。一般默认为升序,因此需在代码中说明此排序应该要降序。此过程中涉及到俩个算子,一个是 filter,为过滤算子,第二个 sortby 是排序算子。

val sortedRDD = ipAggRDD.sortBy(item => item._2, ascending = false)

 

八、取出结果,打印结果

调用 sortedRDD.take(),使用 take 调用其中前十项,取出前十个结果后,打印前十项结果:printinitem)。

sortedRDD.take( num= 10).foreach(item => println(item))


九、运行结果

运行结果如图,拉到最底部如下图:

image.png

图中91.145.130.78出现了17次,下面的数据同理,82开头的数据出现了9次。下面的出现次数依次递减,说明运行结果无误。

 

十、回顾

第一步读取数据集,第二步是取出 IP,赋予初始频率为1,利用 item,利用算子,使用 filter 算子进行过滤。进行聚合之后,可以得到一个元组的结果。

接下来根据 IP 出现次数来进行排序。利用频率来进行排序,需要通过 accending=false 来说明是降序,最终通过 take 来获取前十项。并且输出前十项结果。

defipAgg():Unit=

// 1. 创建 SparkContext

val conf = new SparkConf().setMaster("local[6]").setAppName("ip_agg")val sc = new SparkContext(conf)

// 2.读取文件,生成数据集

valsourceRDD= sc.textFile( path="dataset/access_log_sample.txt")

// 3. 取出 IP,赋予出现次数为1

val ipRDD = sourceRDD.map(item=> (item.split( regex="")(0), 1))

// 4. 简单清洗

// 4.1. 去掉空的数据

// 4.2.去掉非法的数据

// 4.3. 根据业务再规整一下数据

val cleanRDD = ipRDD.filter(item => StringUtils.isNotEmpty(item._1))

// 5. 根据 IP 出现的次数进行聚合

val ipAggRDD = cleanRDD.reduceByKey( (curr, agg) => curr + agg)

// 6. 根据 IP 出现的次数进行排序

val sortedRDD = ipAggRDD.sortBy(item => item._2, ascending = false)

//7.取出结果,打印结果

sortedRDD.take( num= 10).foreach(item => println(item))I

相关文章
|
JavaScript
VUE3中watch与watchEffect —— 全网最详细系列
VUE3中watch与watchEffect —— 全网最详细系列
|
8月前
|
监控 测试技术 数据库连接
RunnerGo API 性能测试实战:从问题到解决的全链路剖析
API性能测试是保障软件系统稳定性与用户体验的关键环节。本文详细探讨了使用RunnerGo全栈测试平台进行API性能测试的全流程,涵盖测试计划创建、场景设计、执行分析及优化改进。通过电商平台促销活动的实际案例,展示了如何设置测试目标、选择压测模式并分析结果。针对发现的性能瓶颈,提出了代码优化、数据库调优、服务器资源配置和缓存策略等解决方案。最终,系统性能显著提升,满足高并发需求。持续关注与优化API性能,对系统稳定运行至关重要。
|
缓存 网络协议 NoSQL
基于UDP的可靠性传输协议-KCP简介
基于UDP的可靠性传输协议-KCP简介
614 0
|
存储 运维 容灾
带你读《云上自动化运维宝典》——一文详解云上跨可用区容灾解决方案和异地多活能力建设最佳案例(3)
带你读《云上自动化运维宝典》——一文详解云上跨可用区容灾解决方案和异地多活能力建设最佳案例(3)
436 0
|
SQL Java 数据库连接
SQL DISTINCT关键字详解
SQL DISTINCT关键字详解
|
存储 数据库连接 Nacos
在Nacos中,db怎么加密?
在Nacos中,db怎么加密?
522 1
|
Shell C# 调度
WPF PRISM开发入门一( 初始化PRISM WPF程序)
原文:WPF PRISM开发入门一( 初始化PRISM WPF程序) 这篇博客将介绍在WPF项目中引入PRISM框架进行开发的一些基础知识。目前最新的PRISM的版本是Prism 6.1.0,可以在Github上获取PRISM的源码。
4010 0
|
SQL 测试技术 索引
【解决方案 二十六】DateGrip一键生成DML语句用于上线
【解决方案 二十六】DateGrip一键生成DML语句用于上线
411 0