阶段练习_代码编写 | 学习笔记

简介: 快速学习 阶段练习_代码编写

开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段阶段练习_代码编写】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/689/detail/11988


阶段练习_代码编写


内容介绍

一、学前须知

二、代码编写


一、学前须知

接下来进行代码编写,在代码编写步骤:首先,第一步,拷贝数据集到工程,然后创建类。创建类以后,代码就有容器,就可以编写代码,编写代码以后去运行测试。这是整个的四个步骤。

 

二、代码编写

第一步,如何拷贝数据集。

数据集位置如下图所示:spsrk file 目录,点开 Dateset。并复制选中的 csv 文件。

image.png

默认情况下 CSV 文件当中有一个 header,第一行是头,这个头应该去掉。如果要使用代码来去掉稍微有一点繁琐。容易思路上开小差,所以要把第一个 header 去掉,然后生成一个新的数据。使用这个 header 的数据就可以了。

csv 文件复制到工程当中。放入 dateset 目录。

image.png

创建StagePractice类,Stage阶段的类。

image.png

编写代码如下:

package cn.itcast.spark.rdd

import org.apache.spark.{SparkConf,SparkContext}

import org.junit.Test

class StagePractice{

@Test  //导入 notation

def pmProcess(():Unit={//创建方法 pmProcess

// 1.创建 sc 对象

val conf =new SparkConf().setMaster("local[6]").setAppName("stage practice"

//new 一个 SparkConf(),传递2个参数:设置Master,本地位置为6,另外设置 AppName ,名称为:stage practice

val sc=newSparkContext(conf)  //创建 Spark 对象,把 conf 放到 SparkContext 中。

// 2.读取文件

val source =sc.textFile( path="dataset/BeijingPM20100101_20151231_noheader.csv")   // textfile 的形式读取文件。

CSV 文件和普通的文件最大的区别是,它也是分行的,这一点和普通文件一样。最大的区别是 CSV 文件以逗号作为分隔符,每一行当中都有固定数目的列数,列之间使用逗号分隔。文件在 dataset 下,复制地址。同时注意命名规范。

image.png

//3.通过算子处理数据

NA 表示没有值,需要过滤清洗。然后按照年月进行 Reduce by key,再去 sort 排序一下总和,出来结果。

步骤为:

image.png

// 1.抽取数据,年,月,PM,返回结果:((年,月)PM)

// 2.清洗 ,过滤掉空的字符串,过滤掉NA

// 3.聚合 ,

// 4.排序

source.map( item => ((item.split( regex = ",")(1), //调用 map 方法 ,并且接收字符串。

item.split( regex = ",")(2)),  //使用逗号 split 出来。

item.split( regex = ",")(6))   //把年,月同时当做 key ,下标为一的项是年,下标为2的项是月,在 value 部分选下标为6的项为 PM 。元组统一当做 key 处理。.filter(item=>StringUtils.isNotEmpty(item._2)&&!item._2.equalsIgnoreCase(anotherString="NA"))

//先进行判空,判断 isNotEmpty,空就收录结果,item._2前边的数据整个作为 item 的第二项数据传输 。

IgnoreCase 忽略两个大小写的判断两个字符串大小相等。相等就收录的逻辑不正确,故前面加 !。

.map(item =>(item._1,item._2.toInt))

//第二项 NA 是字符串类型,故需要转换,则第一项没变,第二项转为 int .

.reduceByKey((curr,agg)=> curr+agg)

//因为是 ByKey 。故先按照 key 来进行分组,然后把每一组里面的  value  聚合起来。直接 curr+agg  聚合起来。

.sortBy(item=> item._2)

//将元组的第二项返回,就会按照第二项 pm 进行排序

// 4.获取结果

resultRDD.take(num=10).foreach(item=> println(item))

//foreach 打印所有的数据,因为太对,故只取10项。拿到里面每一项数据后 ,println 进行打印。

// 5.运行测试

sc.stop()

//在运行结束后关掉 sc

}

运行结果如下:

image.png

第一项是 pm 值最高的2015年,36451,第二项是37612,注意到 topten 有一点点反,故在.sortBy(item=> item._2) 改为

.sortBy( item => item. 2, ascending = false),

再次运行后结果没有问题。

相关文章
|
存储 JSON druid
使用阿里云相关组件总结
总结我个人使用过的阿里组件
|
分布式计算 DataWorks 关系型数据库
DataWorks操作报错合集之在DataWorks同步数据时,遇到乱码问题,该怎么解决(rest api数据源)
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
240 0
|
敏捷开发 测试技术
探索自动化测试在软件开发中的关键作用
【7月更文挑战第26天】本文深入探讨了自动化测试在软件开发生命周期中的不可或缺的角色,分析了它如何优化开发流程、提升软件质量和加快产品上市时间。通过比较手动测试与自动化测试的差异,文章揭示了自动化测试的显著优势和面临的挑战,并提供了实施自动化测试策略的实用建议。
|
存储 JavaScript 前端开发
在?聊聊浏览器事件循环机制
在?聊聊浏览器事件循环机制
170 0
|
DataWorks 关系型数据库 Java
DataWorks操作报错合集之实时同步能启动,但是不能同数据,错误提示"Thereplicaidentityoftablesyouselectedisnotfull,pleasealtertablefirst.need alter table"表示什么意思
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
机器学习/深度学习 数据采集 弹性计算
阿里云服务器多种登录方法
介绍了如何使用网页、putty客户端、jupyter远程来登录服务器。还有不到2元租到RTX2070+155G内存+40核CPU的服务器
阿里云服务器多种登录方法
|
编译器
【单链表增删查改接口的实现】
【单链表增删查改接口的实现】
199 0
|
设计模式 算法 安全
并发设计模式 之 CAS算法
并发设计模式 之 CAS算法
280 0
HTML知识积累及实践(三) - 列表标签
HTML知识积累及实践(三) - 列表标签
157 0
HTML知识积累及实践(三) - 列表标签
|
关系型数据库 测试技术 数据库
PostgreSQL使用函数实现merge功能
PostgreSQL使用函数实现merge功能 实验环境 操作系统:windows 10 家庭中文版 数据库系统: PostgreSQL 9.6.2 说明 oracle数据库中有merge函数,可在插入数据前判断:如果指定列数据不存在,则正常插入数据;如果指定列数据存在,则将此条数据更新为插入的数据。
2813 0