Spark技术内幕:Shuffle Pluggable框架详解,你怎么开发自己的Shuffle Service?

简介:

首先介绍一下需要实现的接口。框架的类图如图所示(今天CSDN抽风,竟然上传不了图片。如果需要实现新的Shuffle机制,那么需要实现这些接口。


1.1.1  org.apache.spark.shuffle.ShuffleManager

Driver和每个Executor都会持有一个ShuffleManager,这个ShuffleManager可以通过配置项spark.shuffle.manager指定,并且由SparkEnv创建。Driver中的ShuffleManager负责注册Shuffle的元数据,比如Shuffle ID,map task的数量等。Executor中的ShuffleManager 则负责读和写Shuffle的数据。

需要实现的函数及其功能说明:

1)       由Driver注册元数据信息

   defregisterShuffle[K, V, C](

     shuffleId: Int,

     numMaps: Int,

dependency:ShuffleDependency[K, V, C]): ShuffleHandle

一般如果没有特殊的需求,可以使用下面的实现,实际上Hash BasedShuffle 和Sort BasedShuffle都是这么实现的。

  override def registerShuffle[K, V, C](

 

      shuffleId: Int,

      numMaps: Int,

      dependency: ShuffleDependency[K, V, C]):ShuffleHandle = {

    new BaseShuffleHandle(shuffleId, numMaps,dependency)

  }

2)       获得Shuffle Writer, 根据Shuffle Map Task的ID为其创建Shuffle Writer。

def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context:TaskContext): ShuffleWriter[K, V]

3)       获得Shuffle Reader,根据Shuffle ID和partition的ID为其创建ShuffleReader。

  def getReader[K, C](

      handle: ShuffleHandle,

      startPartition: Int,

      endPartition: Int,

      context: TaskContext): ShuffleReader[K,C]

4)       为数据成员shuffleBlockManager赋值,以保存实际的ShuffleBlockManager

5)       defunregisterShuffle(shuffleId: Int): Boolean,删除本地的Shuffle的元数据。

6)       def stop(): Unit,停止Shuffle Manager。

每个接口的具体实现的例子,可以参照org.apache.spark.shuffle.sort.SortShuffleManager 和org.apache.spark.shuffle.hash.HashShuffleManager。

1.1.2  org.apache.spark.shuffle.ShuffleWriter

Shuffle Map Task通过ShuffleWriter将Shuffle数据写入本地。这个Writer主要通过ShuffleBlockManager来写入数据,因此它的功能是比较轻量级的。

1)         def write(records: Iterator[_ <:Product2[K, V]]): Unit, 写入所有的数据。需要注意的是如果需要在Map端做聚合。(aggregate),那么写入前需要将records做聚合。

2)         def stop(success: Boolean): Option[MapStatus],写入完成后提交本次写入。

对于Hash BasedShuffle,请查看org.apache.spark.shuffle.hash.HashShuffleWriter;对于Sort Based Shuffle,请查看org.apache.spark.shuffle.sort.SortShuffleWriter。

1.1.3  org.apache.spark.shuffle.ShuffleBlockManager

主要使用从本地读取Shuffle数据的功能。这些接口都是通过org.apache.spark.storage.BlockManager调用的。

1)       def getBytes(blockId: ShuffleBlockId):Option[ByteBuffer], 一般通过调用下一个接口实现,只不过将ManagedBuffer转换成了ByteBuffer。

2)       def getBlockData(blockId:ShuffleBlockId): ManagedBuffer,核心读取逻辑。比如Hash Based Shuffle的从本地读取文件都是通过这个接口实现的。因为不同的实现可能文件的组织方式是不一样的,比如Sort Based Shuffle需要通过先读取Index索引文件获得每个partition的起始位置后,才能读取真正的数据文件。

3)       def stop(): Unit,停止该Manager。

对于Hash Based Shuffle,请查看org.apache.spark.shuffle.FileShuffleBlockManager;对于Sort Based Shuffle,请查看org.apache.spark.shuffle.IndexShuffleBlockManager。

1.1.4  org.apache.spark.shuffle.ShuffleReader

ShuffleReader实现了下游的Task如何读取上游的ShuffleMapTask的Shuffle输出的逻辑。这个逻辑比较复杂,简单来说就是通过org.apache.spark.MapOutputTracker获得数据的位置信息,然后如果数据在本地那么调用org.apache.spark.storage.BlockManager的getBlockData读取本地数据(实际上getBlockData最终会调用org.apache.spark.shuffle.ShuffleBlockManager的getBlockData)。具体的Shuffle Read的逻辑请查看下面的章节。

1)       def read():Iterator[Product2[K, C]]


如何开发自己的Shuffle机制?到这里你应该知道怎么做了。不知道? 再看一遍吧。



如果您喜欢 本文,那么请动一下手指支持以下博客之星的评比吧。非常感谢您的投票。每天可以一票哦。


目录
相关文章
|
4月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
681 1
|
1月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
22天前
|
分布式计算 资源调度 Shell
如何开始使用Spark框架?
【8月更文挑战第31天】如何开始使用Spark框架?
33 2
|
22天前
|
SQL 机器学习/深度学习 分布式计算
Spark框架
【8月更文挑战第31天】Spark框架
29 2
|
8天前
|
分布式计算 Java Apache
Apache Spark Streaming技术深度解析
【9月更文挑战第4天】Apache Spark Streaming是Apache Spark生态系统中用于处理实时数据流的一个重要组件。它将输入数据分成小批次(micro-batch),然后利用Spark的批处理引擎进行处理,从而结合了批处理和流处理的优点。这种处理方式使得Spark Streaming既能够保持高吞吐量,又能够处理实时数据流。
27 0
|
2月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
93 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
3月前
|
分布式计算 Hadoop 大数据
大数据技术:Hadoop与Spark的对比
【6月更文挑战第15天】**Hadoop与Spark对比摘要** Hadoop是分布式系统基础架构,擅长处理大规模批处理任务,依赖HDFS和MapReduce,具有高可靠性和生态多样性。Spark是快速数据处理引擎,侧重内存计算,提供多语言接口,支持机器学习和流处理,处理速度远超Hadoop,适合实时分析和交互式查询。两者在资源占用和生态系统上有差异,适用于不同应用场景。选择时需依据具体需求。
|
3月前
|
机器学习/深度学习 分布式计算 API
技术好文:Spark机器学习笔记一
技术好文:Spark机器学习笔记一
29 0
|
4月前
|
分布式计算 Hadoop 大数据
分布式计算框架比较:Hadoop、Spark 与 Flink
【5月更文挑战第31天】Hadoop是大数据处理的开创性框架,专注于大规模批量数据处理,具有高扩展性和容错性。然而,它在实时任务上表现不足。以下是一个简单的Hadoop MapReduce的WordCount程序示例,展示如何统计文本中单词出现次数。
160 0
|
4月前
|
分布式计算 Hadoop 大数据
探索大数据技术:Hadoop与Spark的奥秘之旅
【5月更文挑战第28天】本文探讨了大数据技术中的Hadoop和Spark,Hadoop作为分布式系统基础架构,通过HDFS和MapReduce处理大规模数据,适用于搜索引擎等场景。Spark是快速数据处理引擎,采用内存计算和DAG模型,适用于实时推荐和机器学习。两者各有优势,未来将继续发展和完善,助力大数据时代的发展。