六、【计算】大数据Shuffle原理与实践(中) | 青训营笔记

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 六、【计算】大数据Shuffle原理与实践(中) | 青训营笔记

 👉引言💎


学习的最大理由是想摆脱平庸,早一天就多一份人生的精彩;迟一天就多一天平庸的困扰。 热爱写作,愿意让自己成为更好的人............

铭记于心
🎉✨🎉我唯一知道的,便是我一无所知🎉✨🎉


三、shuffle过程


0 发展历程


  1. Spark 0.8 及以前 Hash Based Shuffle
  2. Spark 0.8.1为 Hash Based Shuffle引入File Consolidation机制
  3. Spark 0.9 引入 ExternalAppendOnlyMap
  4. Spark 1.1 引入 Sort Based Shuffle, 但默认仍 Hash Based Shuffle
  5. Spark 1.2 默认的 Shuffle方式改为 Sort Based Shuffle
  6. Spark 1.4 引入 Tungsten-Sort Based Shuffle
  7. Spark 1.6 Tungsten-Sort Based Shuffle 并入 Sort Based Shuffle
  8. Spark 2.0 Hash Based Shuffle退出历史舞台


1 spark中的shuffle变迁过程


  • 1.1 HashShuffle


  • 优点:不需要排序
  • 缺点:打开,创建的文件过多
  • Writer Data:
    每个partition映射到一个独立的文件:
    image.png


优化:


每个partition映射到一个文件片段


image.png


  • 1.2 SortShuffle


  • 优点:打开的文件少、支持map-side combine
  • 缺点:需要排序
  • Writer Data:
    每个task生成一个包含所有partiton数据的文件image.png


  • 1.3 TungstenSortShuffle


  • 优点:更快的排序效率,更高的内存利用效率
  • 缺点:不支持map-side combine


2 Shuffle - Read Data


每个reduce task分别获取所有map task生成的属于自己的片段

image.png


Shuffle 过程的触发流程


image.png

image.png


3 Register Shuffle


Register Shuffle 时做的最重要的事情是根据不同条件创建不同的shuffle Handle

  • 由action算子触发DAG Scheduler进行shuffle register
  • Shuffle Register会根据不同的条件决定注册不同的ShuffleHandle

image.png


4 Shuffle Handle


  • 4.1 三种ShuffleHandle对应了三种不同的ShuffleWriter的实现image.png


  • 4.2 Writer实现 - BypassMergeShuffleWriter


  • 不需要排序,节省时间
  • 写操作的时候会打开大量文件
  • 类似于Hash Shuffle

image.png


  • 4.3 Writer实现 - UnsafeShuffleWriter


使用类似内存页储存序列化数据
数据写入后不再反序列化


  • 4.4 Writer实现 - UnsafeShuffleWriter
    只根据 partition 排序 Long Array
    数据不移动

网络异常,图片无法展示
|

  • 4.5 Writer实现 - SortShuffleWriter
    网络异常,图片无法展示
    |
  • 支持combine
  • 需要combine时,使用PartitionedAppendOnlyMap,本质是个HashTable
  • 不需要combine时PartitionedPairBuffer本质是个array

5 Reader实现 - 网络时序图

  • ShuffleReader网络请求流程
    image.png
    使用netty作为网络框架提供网络服务,并接受reducetask的fetch请求
    首先发起openBlocks请求获得streamId,然后再处理stream或者chunk请求
  • ShuffleBlockFetchIterator

image.png

  • 区分local和remote节省网络消耗
  • 防止OOM
  • maxBytesInFlight
  • maxReqsInFlight
  • maxBlocksInFlightPerAddress
  • maxReqSizeShuffleToMem
  • maxAttemptsOnNettyOOM
  • External Shuffle Serviceimage.png

为了解决Executor为了服务数据的fetch请求导致无法退出问题,我们在每个节点上部署一个External Shuffle Service,这样产生数据的Executor在不需要继续处理任务时,可以随意退出


6 shuffle优化技术


  • 6.1 Zero Copy


  • 不使用zero copy

image.png

  • 使用sendfileimage.png
  • 使用sendfile + DMA gather copyimage.png

  • 6.2 Netty Zero Copy


  • 可堆外内存,避免JVM堆内存到堆外内存的数据拷贝
  • CompositeByteBuf, Unpooled.wrappedBuffer, ByteBuf.slice ,可以合并、包装、切分数组,避免发生内存拷贝
  • Netty 使用FileRegion 实现文件传输, FileRegion底层封装了FileChannel#transferTo()方法,可以将文件缓冲区的数据直接传输到目标Channel,避免内核缓冲区和用户态缓冲区之间的数据拷贝


  • 6.3常见问题


数据存储在本地磁盘,没有备份
IO并发:大量RPC请求(M*R)
IO吞吐:随机读、写放大(3X)
GC频繁,影响 NodeManager


7 shuffle 优化


  • 避免shuffle ——使用broadcast替代join
//传统的join操作会导致shuffle操作。
//因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
val rdd3 = rdd1.join(rdd2)
//Broadcast+map的join操作,不会导致shuffle操作。
//使用Broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
//在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。
//然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。
//此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)
//注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。
//因为每个Executor的内存中,都会驻留一份rdd2的全量数据。
  • 使用可以map-side预聚合的算子
  • Shuffle 参数优化
  • spark.default.parallelism && spark.sql.shuffle.partitions
  • spark.hadoopRDD.ignoreEmptySplits
  • spark.hadoop.mapreduce.input.fileinputformat.split.minsize
  • spark.sql.file.maxPartitionBytes
  • spark.sql.adaptive.enabled && spark.sql.adaptive.shuffle.targetPostShuffleInputSize
  • spark.reducer.maxSizeInFlight
  • spark.reducer.maxReqsInFlight spark.reducer.maxBlocksInFlightPerAddress


8 Shuffle 倾斜优化


image.png


  • 解决倾斜方法举例


  • 增大并发度


实现简单,但无法从本质上解决问题

image.png


  • AQE


AQE 根据shuffle文件统计数据自动检测倾斜数据,将那些倾斜的分区打散成小的子分区,然后各自进行join

image.png

  • 零拷贝
  • sendfile+DMA gather copy
  • Netty 零拷贝
  • 可堆外内存,避免 JVM 堆内存到堆外内存的数据拷贝。
  • CompositeByteBuf 、 Unpooled.wrappedBuffer、 ByteBuf.slice ,可以合并、包装、切分数组,避免发生内存拷贝
  • Netty 使用 FileRegion 实现文件传输,FileRegion 底层封装了 FileChannel#transferTo() 方法,可以将文件缓冲区的数据直接传输到目标 Channel,避免内核缓冲区和用户态缓冲区之间的数据拷贝

🌹写在最后💖: 路漫漫其修远兮,吾将上下而求索!伙伴们,再见!🌹🌹🌹

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
存储 人工智能 分布式计算
Hadoop基础学习---1、大数据概论
Hadoop基础学习---1、大数据概论
|
分布式计算 大数据 测试技术
六、【计算】大数据Shuffle原理与实践(上) | 青训营笔记
六、【计算】大数据Shuffle原理与实践(上) | 青训营笔记
六、【计算】大数据Shuffle原理与实践(上) | 青训营笔记
|
存储 分布式计算 大数据
六、【计算】大数据Shuffle原理与实践(下) | 青训营笔记
六、【计算】大数据Shuffle原理与实践(下) | 青训营笔记
六、【计算】大数据Shuffle原理与实践(下) | 青训营笔记
|
存储 分布式计算 Java
大数据 Shuffle 原理与实践|青训营笔记
本文包括:1.shuffle概述;2.spark中的shuffle算子的基本特性;3.spark中的shuffle的过程;4.push shuffle的原理与实现
223 0
大数据 Shuffle 原理与实践|青训营笔记
|
机器学习/深度学习 SQL 数据采集
数据分析理论与实践 | 青训营笔记
埋点:埋点数据是指上报的记录着触发原因和状态信息的日志数据。按照上报方来看,可以划分为"服务端埋点”和"客户端埋点”,按照上报形式,可以划分为"代码埋点”、“可视化全埋点” 。
163 0
数据分析理论与实践 | 青训营笔记
|
存储 大数据 OLAP
【读书笔记】《大数据之路》——维度设计总结(1)
【读书笔记】《大数据之路》——维度设计总结(1)
【读书笔记】《大数据之路》——维度设计总结(1)
|
存储 分布式计算 关系型数据库
大数据基础-MapReduce原理及核心编程思想
MapReduce原理及核心编程思想
200 0
|
存储 分布式计算 大数据
大数据Shuffle原理与实践
大数据Shuffle原理与实践
736 0
大数据Shuffle原理与实践
|
SQL 存储 关系型数据库
23篇大数据系列(三)sql基础知识(上)(史上最全,建议收藏)
23篇大数据系列(三)sql基础知识(上)(史上最全,建议收藏)
23篇大数据系列(三)sql基础知识(上)(史上最全,建议收藏)
|
存储 大数据 数据库
【读书笔记】《大数据之路》——维度设计总结(2)
【读书笔记】《大数据之路》——维度设计总结(2)