开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段:深入 rdd-初始案例-代码编写】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/689/detail/11958
深入 rdd-初始案例-代码编写
内容介绍:
一、明确步骤
二、创建 spark context
三、读取文件,生成数据集。
四、取出 IP,赋予出现次数为1。
五、简单清洗
六、根据 IP 出现次数进行聚合
七、根据 IP 出现次数进行排序
八、取出结果,打印结果
九、运行结果
十、回顾
一、明确步骤
在编写代码之前,先明确一下具体步骤:
第一步:需要创建一个 spark context。
第二步:读取文件,生成数据集。
第三步:取出 IP,赋予出现次数为1。
第四步:简单清洗。
第五步:根据 IP 和出现次数进行聚合。
第六步:根据 IP 出现次数进行排序。
第七步:取出结果,打印结果。
二、创建 spark context
设置 spark 接受参数,使用 sparkconf 来进行包装,应该在里面设置俩个参数,第一是设置 appname,master。命名为 conf,传入 spark context 中。到此完成创建。
第一步:创建 sparkcontext
Val conf = new SparkConf().setMaster("local[6]").setAppName("ip_agg")
Val Sc = new Spark Context(conf)
三、读取文件,生成数据集
使用 sc 的 textc 的方法进行读取,目录应该是 detaset 下的 access log sample.txt,复制过来之后,删除无用的目录,留下 sample.xt,生成 rdd 对应的名字,生成 source rdd 来表示数据源。
第二步:读取文件,生成数据集
ValsourceRDD= sc.textFile( path="dataset/access_log_sample.txt")
四、取出 IP,赋予出现次数为1
数据源对应 access sample 的文件,在 source rdd 中进行一个转换,取出 IP,生成元祖。access sample 中每一个数据对应了一个 item。只是以字符串的形式出现。Item 是一个字符串,要取出 IP 的话,item 按照空格+split。取出第0项即为 IP,最终元祖赋予其出现次数为1,IP 只出现了一次,出现次数即为1。
第三步:取出 IP,赋予出现次数为1
val ip RDD =sourceRDD.map(item=> (item.split(regex="")(0), 1))
五、简单清洗
元组最终会全部聚合起来,在聚合之前,应该进行简单清洗。清洗有两件事要做,第一步是去除空数据,第二步是去掉非法数据。根据业务再规整一些数据。
在此实例中,为了简单操作,仅去除一些空数据。
去除一部分数据,拿到 ip rdd 后,用 filter 算子去掉一部分数据,string int 的数据,第一部分是出现的 IP 地址,第二部分是1,去除数据,是使 IP 为空,IP是字符串类型的数据。使用 al cleanRDD = ipRDD.,意思是传入一个字符串,如果不为空,返回触,那么这个 filter 就将 item 收录进来。因此 isnotempty 中需要接受一个字符串。判断 IP 不为空,就放在数据集中,否则清除。
al cleanRDD = ipRDD.filter(item => StringUtils.isNotEmpty(item._1))
六、根据 IP 出现次数进行聚合
输入 cleanrdd 后,使用 reducebyke,里面接受了一个函数,函数有俩个参数。第一个叫做 curr,第二个叫做 agg。使用之后,可以直接使用 curr 和 agg 进行统计。Curr 是1,agg 应为当前 IP 地址的局部内部结构.聚合完成后,为其生成一个新的 rdd。
val ip AggRDD = cleanRDD.reduceByKey( (curr, agg) => curr + agg)
七、根据 IP 出现次数进行排序
按照 IP rdd agg 来进行排序,应按照后面的频率来进行排序。一般默认为升序,因此需在代码中说明此排序应该要降序。此过程中涉及到俩个算子,一个是 filter,为过滤算子,第二个 sortby 是排序算子。
val sortedRDD = ipAggRDD.sortBy(item => item._2, ascending = false)
八、取出结果,打印结果
调用 sortedRDD.take(),使用 take 调用其中前十项,取出前十个结果后,打印前十项结果:printin(item)。
sortedRDD.take( num= 10).foreach(item => println(item))
九、运行结果
运行结果如图,拉到最底部如下图:
图中91.145.130.78出现了17次,下面的数据同理,82开头的数据出现了9次。下面的出现次数依次递减,说明运行结果无误。
十、回顾
第一步读取数据集,第二步是取出 IP,赋予初始频率为1,利用 item,利用算子,使用 filter 算子进行过滤。进行聚合之后,可以得到一个元组的结果。
接下来根据 IP 出现次数来进行排序。利用频率来进行排序,需要通过 accending=false 来说明是降序,最终通过 take 来获取前十项。并且输出前十项结果。
defipAgg():Unit=
// 1. 创建 SparkContext
val conf = new SparkConf().setMaster("local[6]").setAppName("ip_agg")val sc = new SparkContext(conf)
// 2.读取文件,生成数据集
valsourceRDD= sc.textFile( path="dataset/access_log_sample.txt")
// 3. 取出 IP,赋予出现次数为1
val ipRDD = sourceRDD.map(item=> (item.split( regex="")(0), 1))
// 4. 简单清洗
// 4.1. 去掉空的数据
// 4.2.去掉非法的数据
// 4.3. 根据业务再规整一下数据
val cleanRDD = ipRDD.filter(item => StringUtils.isNotEmpty(item._1))
// 5. 根据 IP 出现的次数进行聚合
val ipAggRDD = cleanRDD.reduceByKey( (curr, agg) => curr + agg)
// 6. 根据 IP 出现的次数进行排序
val sortedRDD = ipAggRDD.sortBy(item => item._2, ascending = false)
//7.取出结果,打印结果
sortedRDD.take( num= 10).foreach(item => println(item))I
