👉引言💎
学习的最大理由是想摆脱平庸,早一天就多一份人生的精彩;迟一天就多一天平庸的困扰。 热爱写作,愿意让自己成为更好的人............
铭记于心 | ||
🎉✨🎉我唯一知道的,便是我一无所知🎉✨🎉 |
三、shuffle过程
0 发展历程
- Spark 0.8 及以前 Hash Based Shuffle
- Spark 0.8.1为 Hash Based Shuffle引入File Consolidation机制
- Spark 0.9 引入 ExternalAppendOnlyMap
- Spark 1.1 引入 Sort Based Shuffle, 但默认仍 Hash Based Shuffle
- Spark 1.2 默认的 Shuffle方式改为 Sort Based Shuffle
- Spark 1.4 引入 Tungsten-Sort Based Shuffle
- Spark 1.6 Tungsten-Sort Based Shuffle 并入 Sort Based Shuffle
- Spark 2.0 Hash Based Shuffle退出历史舞台
1 spark中的shuffle变迁过程
- 1.1 HashShuffle
- 优点:不需要排序
- 缺点:打开,创建的文件过多
- Writer Data:
每个partition映射到一个独立的文件:
优化:
每个partition映射到一个文件片段
- 1.2 SortShuffle
- 优点:打开的文件少、支持map-side combine
- 缺点:需要排序
- Writer Data:
每个task生成一个包含所有partiton数据的文件
- 1.3 TungstenSortShuffle
- 优点:更快的排序效率,更高的内存利用效率
- 缺点:不支持map-side combine
2 Shuffle - Read Data
每个reduce task分别获取所有map task生成的属于自己的片段
Shuffle 过程的触发流程
3 Register Shuffle
Register Shuffle 时做的最重要的事情是根据不同条件创建不同的shuffle Handle
- 由action算子触发DAG Scheduler进行shuffle register
- Shuffle Register会根据不同的条件决定注册不同的ShuffleHandle
4 Shuffle Handle
- 4.1 三种ShuffleHandle对应了三种不同的ShuffleWriter的实现
- 4.2 Writer实现 - BypassMergeShuffleWriter
- 不需要排序,节省时间
- 写操作的时候会打开大量文件
- 类似于Hash Shuffle
- 4.3 Writer实现 - UnsafeShuffleWriter
使用类似内存页储存序列化数据
数据写入后不再反序列化
- 4.4 Writer实现 - UnsafeShuffleWriter
只根据 partition 排序 Long Array
数据不移动
网络异常,图片无法展示
|
- 4.5 Writer实现 - SortShuffleWriter
网络异常,图片无法展示|
- 支持combine
- 需要combine时,使用PartitionedAppendOnlyMap,本质是个HashTable
- 不需要combine时PartitionedPairBuffer本质是个array
5 Reader实现 - 网络时序图
- ShuffleReader网络请求流程
使用netty作为网络框架提供网络服务,并接受reducetask的fetch请求
首先发起openBlocks请求获得streamId,然后再处理stream或者chunk请求 - ShuffleBlockFetchIterator
- 区分local和remote节省网络消耗
- 防止OOM
- maxBytesInFlight
- maxReqsInFlight
- maxBlocksInFlightPerAddress
- maxReqSizeShuffleToMem
- maxAttemptsOnNettyOOM
- External Shuffle Service
为了解决Executor为了服务数据的fetch请求导致无法退出问题,我们在每个节点上部署一个External Shuffle Service,这样产生数据的Executor在不需要继续处理任务时,可以随意退出
6 shuffle优化技术
- 6.1 Zero Copy
- 不使用zero copy
- 使用sendfile
- 使用sendfile + DMA gather copy
- 6.2 Netty Zero Copy
- 可堆外内存,避免JVM堆内存到堆外内存的数据拷贝
- CompositeByteBuf, Unpooled.wrappedBuffer, ByteBuf.slice ,可以合并、包装、切分数组,避免发生内存拷贝
- Netty 使用FileRegion 实现文件传输, FileRegion底层封装了FileChannel#transferTo()方法,可以将文件缓冲区的数据直接传输到目标Channel,避免内核缓冲区和用户态缓冲区之间的数据拷贝
- 6.3常见问题
数据存储在本地磁盘,没有备份
IO并发:大量RPC请求(M*R)
IO吞吐:随机读、写放大(3X)
GC频繁,影响 NodeManager
7 shuffle 优化
- 避免shuffle ——使用broadcast替代join
//传统的join操作会导致shuffle操作。 //因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。 val rdd3 = rdd1.join(rdd2) //Broadcast+map的join操作,不会导致shuffle操作。 //使用Broadcast将一个数据量较小的RDD作为广播变量。 val rdd2Data = rdd2.collect() val rdd2DataBroadcast = sc.broadcast(rdd2Data) //在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。 //然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。 //此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。 val rdd3 = rdd1.map(rdd2DataBroadcast...) //注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。 //因为每个Executor的内存中,都会驻留一份rdd2的全量数据。
- 使用可以map-side预聚合的算子
- Shuffle 参数优化
- spark.default.parallelism && spark.sql.shuffle.partitions
- spark.hadoopRDD.ignoreEmptySplits
- spark.hadoop.mapreduce.input.fileinputformat.split.minsize
- spark.sql.file.maxPartitionBytes
- spark.sql.adaptive.enabled && spark.sql.adaptive.shuffle.targetPostShuffleInputSize
- spark.reducer.maxSizeInFlight
- spark.reducer.maxReqsInFlight spark.reducer.maxBlocksInFlightPerAddress
8 Shuffle 倾斜优化
- 解决倾斜方法举例
- 增大并发度
实现简单,但无法从本质上解决问题
- AQE
AQE 根据shuffle文件统计数据自动检测倾斜数据,将那些倾斜的分区打散成小的子分区,然后各自进行join
- 零拷贝
- sendfile+DMA gather copy
- Netty 零拷贝
- 可堆外内存,避免 JVM 堆内存到堆外内存的数据拷贝。
- CompositeByteBuf 、 Unpooled.wrappedBuffer、 ByteBuf.slice ,可以合并、包装、切分数组,避免发生内存拷贝
- Netty 使用 FileRegion 实现文件传输,FileRegion 底层封装了 FileChannel#transferTo() 方法,可以将文件缓冲区的数据直接传输到目标 Channel,避免内核缓冲区和用户态缓冲区之间的数据拷贝
🌹写在最后💖: 路漫漫其修远兮,吾将上下而求索!伙伴们,再见!🌹🌹🌹