大数据-85 Spark 集群 RDD创建 RDD-Action Key-Value RDD详解 RDD的文件输入输出

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-85 Spark 集群 RDD创建 RDD-Action Key-Value RDD详解 RDD的文件输入输出

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节我们完成了如下的内容:


RDD 的创建

从集合创建RDD、从文件创建RDD、从RDD创建RDD

RDD操作算子:Transformation 详细解释

Transformation

RDD的操作算子分为两类:


Transformation,用来对RDD进行转换,这个操作时延迟执行的(或者是Lazy),Transformation,返回一个新的RDD

Action,用来触发RDD的计算,得到相关计算结果或者将结果保存到外部系统中,Action:返回int、double、集合(不会返回新的RDD)

上节完成了Transformation

Action

Action 用来触发RDD的计算,得到相关的计算结果


collect()/collectAsMap()

stats/count/mean/stdev/max/min

reduce(func)/fold(func)/aggregate(func)

first() 返回第一个RDD

take(n) 从开头拿一定数量的RDD

top(n) 按照将需或指定排序规则 返回前num个元素

takeSample 返回采样数据

froeach / foreachPartition 与 map、mapPartition类似

saveAsTextFile/saveAsSequenceFile/saveAsObjectFile

Key-Value RDD

RDD整体上分为 Value类型 和 Key-Value类型

前面介绍是 Value类型的RDD操作,实际上使用更多的是Key-Value类型的RDD,也称为 PairRDD


Value类型的RDD操作基本集中在RDD.scala中

Key-Value类型的RDD操作集中在PairRDDFunctions.scala中

创建PairRDD

val arr = (1 to 10).toArray
val arr1 = arr.map(x => (x, x*10, x*100))

# rdd1 不是 Pair RDD
val rdd1 = sc.makeRDD(arr1)

# rdd2 是 Pair RDD
val arr2 = arr.map(x => (x, (x*10, x*100)))
val rdd2 = sc.makeRDD(arr2)

运行查看如下的结果:

Transformation操作

mapValues

# mapValues代码更简洁
val a = sc.parallelize(List((1,2),(3,4),(5,6)))
val b = a.mapValues(x => 1 to x)
b.collect

运行结果如下图:

flatMapValues

将values压平、拍平

val c = a.flatMapValues(x => 1 to x)
c.collect

执行结果如下图所示:

groupByKey

键值对的key表示图书名称,value表示某天图书销量。计算每个键对应的平均值,也就是计算每种图书的每天平均销量。

val rdd = sc.makeRDD(Array(("spark", 12), ("hadoop", 26),("hadoop", 23), ("spark", 15), ("scala", 26), ("spark", 25),("spark", 23), ("hadoop", 16), ("scala", 24), ("spark", 16)))

# 三种写法
rdd.groupByKey().map(x => (x._1, x._2.sum.toDouble/x._2.size)).collect

rdd.groupByKey().map{case (k, v) => (k,v.sum.toDouble/v.size)}.collect

rdd.groupByKey.mapValues(v => v.sum.toDouble/v.size).collect

执行结果如下图所示:

reduceByKey

这种方式也可以

rdd.mapValues((_, 1)).reduceByKey((x, y)=> (x._1+y._1, x._2+y._2)).mapValues(x => (x._1.toDouble / x._2)).collect()

foldByKey

rdd.mapValues((_, 1)).foldByKey((0, 0))((x, y) => {
(x._1+y._1, x._2+y._2)}).mapValues(x=>x._1.toDouble/x._2).collect

执行结果如下图所示:

sortByKey

根据key来进行排序

val a = sc.parallelize(List("wyp", "iteblog", "com","397090770", "test"))
val b = sc.parallelize(1 to a.count.toInt)
val c = a.zip(b)
c.sortByKey().collect
c.sortByKey(false).collect

执行如下图所示:

cogroup

val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"),(3,"Kylin"), (4,"Flink")))
val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"赵六"),(6,"冯七")))

# join
val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect.foreach(println)

rdd3.filter{case (_, (v1, v2)) => v1.nonEmpty & v2.nonEmpty}.collect

执行结果如下图所示:

outerjoin

# 不同的JOIN操作
rdd1.join(rdd2).collect
rdd1.leftOuterJoin(rdd2).collect
rdd1.rightOuterJoin(rdd2).collect
rdd1.fullOuterJoin(rdd2).collect

执行结果如下图所示:

lookup

rdd1.lookup("1")
rdd1.lookup(3)
• 1
• 2

执行结果如下图所示:

文件输入输出

文本文件

数据读取:textFile(String),可指定单个文件,支持通配符

返回值RDD[(String, Sting)],其中Key是文件的名称,Value是文件的内容。

数据保存:saveAsTextFile(String) 指定输出目录

csv文件

读取CSV(Comma-Separated Values)/TSV(Tab-Separaed Values)数据和读取JSON数据相似,都需要先把文件当做普通文件来读取数据,然后通过将每一行进行解析实现对CSV的提取。

CSV/TSV 数据的输出也是需要将结构化RDD通过相关库转换成字符串的RDD,然后使用Spark的文本文件API写出去


JSON文件

如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析

JSON数据的输出主要通过在输出之前将由结构化数据组成的RDD转换为字符串RDD,然后使用Spark的文本文件API写出去

JSON文件的处理使用 SparkSQL 最为简洁。


SequenceFile

SequenceFile文件是Hadoop用存储二进制形式的Key-Value而设计的一种平面文件(FlatFile)。

Spark有专门用来读取SequenceFile的接口,在SparkContext中,可以调用:SequenceFile[keyClass, valueClass];

调用 saveAsSequenceFile(path)保存PairRDD,系统将键和值能够自动转换为Writable类型。


对象文件

对象文件是序列化后保存的文件,采用Java的序列化机制。

通过 objectFile 接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile() 实现对对象文件的输出,因为序列化所以要指定类型。


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
3月前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
6天前
|
分布式计算 Spark
【赵渝强老师】Spark RDD的依赖关系和任务阶段
Spark RDD之间的依赖关系分为窄依赖和宽依赖。窄依赖指父RDD的每个分区最多被一个子RDD分区使用,如map、filter操作;宽依赖则指父RDD的每个分区被多个子RDD分区使用,如分组和某些join操作。窄依赖任务可在同一阶段完成,而宽依赖因Shuffle的存在需划分不同阶段执行。借助Spark Web Console可查看任务的DAG图及阶段划分。
45 15
|
11天前
|
存储 分布式计算 调度
Spark Master HA 主从切换过程不会影响到集群已有作业的运行, 为什么?
Spark Master 的高可用性(HA)机制确保主节点故障时,备用主节点能无缝接管集群管理,保障稳定运行。关键在于: 1. **Driver 和 Executor 独立**:任务执行不依赖 Master。 2. **应用状态保持**:备用 Master 通过 ZooKeeper 恢复集群状态。 3. **ZooKeeper 协调**:快速选举新 Master 并同步状态。 4. **容错机制**:任务可在其他 Executor 上重新调度。 这些特性保证了集群在 Master 故障时仍能正常运行。
zdl
|
3月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
204 56
|
2月前
|
存储 负载均衡 监控
揭秘 Elasticsearch 集群架构,解锁大数据处理神器
Elasticsearch 是一个强大的分布式搜索和分析引擎,广泛应用于大数据处理、实时搜索和分析。本文深入探讨了 Elasticsearch 集群的架构和特性,包括高可用性和负载均衡,以及主节点、数据节点、协调节点和 Ingest 节点的角色和功能。
66 0
|
3月前
|
SQL 存储 大数据
单机顶集群的大数据技术来了
大数据时代,分布式数仓如MPP成为热门技术,但其高昂的成本让人望而却步。对于多数任务,数据量并未达到PB级,单体数据库即可胜任。然而,由于SQL语法的局限性和计算任务的复杂性,分布式解决方案显得更为必要。esProc SPL作为一种开源轻量级计算引擎,通过高效的算法和存储机制,实现了单机性能超越集群的效果,为低成本、高效能的数据处理提供了新选择。
|
4月前
|
分布式计算 大数据 分布式数据库
大数据-158 Apache Kylin 安装配置详解 集群模式启动(一)
大数据-158 Apache Kylin 安装配置详解 集群模式启动(一)
82 5
|
4月前
|
SQL 分布式计算 NoSQL
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
78 4
|
分布式计算 大数据 调度
Spark 集群搭建_高可用配置|学习笔记
快速学习 Spark 集群搭建_高可用配置
Spark 集群搭建_高可用配置|学习笔记
|
分布式计算 Hadoop Linux
Spark集群搭建记录 | 云计算[CentOS7] | Spark配置
写在前面 step1 Spark下载 step2 修改环境变量 ~/.bashrc /etc/profile step3 配置Master-文件修改 slaves spark-env.sh step4 配置slave节点 step5 集群启动 step6 web浏览器状态查看 step7 配置开机启动(可选)
284 0
Spark集群搭建记录 | 云计算[CentOS7] | Spark配置

热门文章

最新文章