Spark技术内幕: Shuffle详解(三)

简介:

前两篇文章写了Shuffle Read的一些实现细节。但是要想彻底理清楚这里边的实现逻辑,还是需要更多篇幅的;本篇开始,将按照Job的执行顺序,来讲解Shuffle。即,结果数据(ShuffleMapTask的结果和ResultTask的结果)是如何产生的;结果是如何处理的;结果是如何读取的。

在Worker上接收Task执行命令的是org.apache.spark.executor.CoarseGrainedExecutorBackend。它在接收到LaunchTask的命令后,通过在Driver创建SparkContext时已经创建的org.apache.spark.executor.Executor的实例的launchTask,启动Task:

  deflaunchTask(
     context: ExecutorBackend, taskId: Long, taskName: String,serializedTask: ByteBuffer) {
   val tr = new TaskRunner(context, taskId, taskName, serializedTask)
   runningTasks.put(taskId, tr)
   threadPool.execute(tr) // 开始在executor中运行
  }

最终Task的执行是在org.apache.spark.executor.Executor.TaskRunner#run。org.apache.spark.executor.ExecutorBackend是Executor与Driver通信的接口,它实际上是一个trait:

private[spark] trait ExecutorBackend {
  defstatusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
}
 TaskRunner会将Task执行的状态汇报给Driver(org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor)。 而Driver会转给org.apache.spark.scheduler.TaskSchedulerImpl#statusUpdate。

在Executor运行Task时,得到计算结果会存入org.apache.spark.scheduler.DirectTaskResult。在将结果回传到Driver时,会根据结果的大小有不同的策略:对于“较大”的结果,将其以taskid为key存入org.apache.spark.storage.BlockManager;如果结果不大,那么直接回传给Driver。那么如何判定这个阈值呢?

这里的回传是直接通过akka的消息传递机制。因此这个大小首先不能超过这个机制设置的消息的最大值。这个最大值是通过spark.akka.frameSize设置的,单位是Bytes,默认值是10MB。除此之外,还有200KB的预留空间。因此这个阈值就是conf.getInt("spark.akka.frameSize", 10) * 1024 *1024 – 200KB。

       // directSend = sending directly back to the driver
       val (serializedResult, directSend) = {
         if (resultSize >=akkaFrameSize - AkkaUtils.reservedSizeBytes) { //如果结果太大,那么存入BlockManager
           val blockId = TaskResultBlockId(taskId)
           env.blockManager.putBytes(
              blockId, serializedDirectResult,StorageLevel.MEMORY_AND_DISK_SER)
           (ser.serialize(new IndirectTaskResult[Any](blockId)), false)
         } else { // 如果大小合适,则直接发送结果给Driver
           (serializedDirectResult, true)
         }
       }
       execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)


TaskRunner将Task的执行状态汇报给Driver后,Driver会转给org.apache.spark.scheduler.TaskSchedulerImpl#statusUpdate。而在这里不同的状态有不同的处理:

1.    如果类型是TaskState.FINISHED,那么调用org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask进行处理。

2.    如果类型是TaskState.FAILED或者TaskState.KILLED或者TaskState.LOST,调用org.apache.spark.scheduler.TaskResultGetter#enqueueFailedTask进行处理。对于TaskState.LOST,还需要将其所在的Executor标记为failed, 并且根据更新后的Executor重新调度。

 enqueueSuccessfulTask的逻辑也比较简单,就是如果是IndirectTaskResult,那么需要通过blockid来获取结果:sparkEnv.blockManager.getRemoteBytes(blockId);如果是DirectTaskResult,那么结果就无需远程获取了。然后调用

1.    org.apache.spark.scheduler.TaskSchedulerImpl#handleSuccessfulTask

2.    org.apache.spark.scheduler.TaskSetManager#handleSuccessfulTask

3.    org.apache.spark.scheduler.DAGScheduler#taskEnded

4.    org.apache.spark.scheduler.DAGScheduler#eventProcessActor

5.    org.apache.spark.scheduler.DAGScheduler#handleTaskCompletion

进行处理。核心逻辑都在第5个调用栈。如果task是ResultTask,处理逻辑比较简单,停止job,更新一些状态,发送一些event即可。

    if (!job.finished(rt.outputId)){
        job.finished(rt.outputId) =true
        job.numFinished += 1
        // If the whole job hasfinished, remove it
        if (job.numFinished ==job.numPartitions) {
          markStageAsFinished(stage)
         cleanupStateForJobAndIndependentStages(job)
          listenerBus.post(SparkListenerJobEnd(job.jobId,JobSucceeded))
        }
 
        // taskSucceeded runs someuser code that might throw an exception.
        // Make sure we areresilient against that.
        try {
         job.listener.taskSucceeded(rt.outputId, event.result)
        } catch {
          case e: Exception =>
            // TODO: Perhaps we wantto mark the stage as failed?
           job.listener.jobFailed(new SparkDriverExecutionException(e))
        }
    }

如果task是ShuffleMapTask,那么它需要将结果通过某种机制告诉下游的Stage,以便于其可以作为下游Stage的输入。这个机制是怎么实现的?

实际上,对于ShuffleMapTask来说,其结果实际上是org.apache.spark.scheduler.MapStatus;其序列化后存入了DirectTaskResult或者IndirectTaskResult中。而DAGScheduler#handleTaskCompletion通过下面的方式来获取这个结果:

val status =event.result.asInstanceOf[MapStatus]

通过将这个status注册到org.apache.spark.MapOutputTrackerMaster,就实现了

    mapOutputTracker.registerMapOutputs(
                 stage.shuffleDep.get.shuffleId,
                  stage.outputLocs.map(list=> if (list.isEmpty) null else list.head).toArray,
                  changeEpoch = true)


目录
相关文章
|
4月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
681 1
|
8天前
|
分布式计算 Java Apache
Apache Spark Streaming技术深度解析
【9月更文挑战第4天】Apache Spark Streaming是Apache Spark生态系统中用于处理实时数据流的一个重要组件。它将输入数据分成小批次(micro-batch),然后利用Spark的批处理引擎进行处理,从而结合了批处理和流处理的优点。这种处理方式使得Spark Streaming既能够保持高吞吐量,又能够处理实时数据流。
27 0
|
2月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
93 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
3月前
|
分布式计算 Hadoop 大数据
大数据技术:Hadoop与Spark的对比
【6月更文挑战第15天】**Hadoop与Spark对比摘要** Hadoop是分布式系统基础架构,擅长处理大规模批处理任务,依赖HDFS和MapReduce,具有高可靠性和生态多样性。Spark是快速数据处理引擎,侧重内存计算,提供多语言接口,支持机器学习和流处理,处理速度远超Hadoop,适合实时分析和交互式查询。两者在资源占用和生态系统上有差异,适用于不同应用场景。选择时需依据具体需求。
|
3月前
|
机器学习/深度学习 分布式计算 API
技术好文:Spark机器学习笔记一
技术好文:Spark机器学习笔记一
29 0
|
4月前
|
分布式计算 Hadoop 大数据
探索大数据技术:Hadoop与Spark的奥秘之旅
【5月更文挑战第28天】本文探讨了大数据技术中的Hadoop和Spark,Hadoop作为分布式系统基础架构,通过HDFS和MapReduce处理大规模数据,适用于搜索引擎等场景。Spark是快速数据处理引擎,采用内存计算和DAG模型,适用于实时推荐和机器学习。两者各有优势,未来将继续发展和完善,助力大数据时代的发展。
|
4月前
|
分布式计算 Spark 索引
Spark学习---day07、Spark内核(Shuffle、任务执行)
Spark学习---day07、Spark内核(源码提交流程、任务执行)
|
4月前
|
分布式计算 Java 调度
Spark中的Shuffle过程是什么?为什么它在性能上很关键?
Spark中的Shuffle过程是什么?为什么它在性能上很关键?
167 0
|
4月前
|
SQL 分布式计算 大数据
Hudi数据湖技术引领大数据新风口(三)解决spark模块依赖冲突
Hudi数据湖技术引领大数据新风口(三)解决spark模块依赖冲突
207 0
|
4月前
|
机器学习/深度学习 分布式计算 搜索推荐
【大数据技术】Spark MLlib机器学习协同过滤电影推荐实战(附源码和数据集)
【大数据技术】Spark MLlib机器学习协同过滤电影推荐实战(附源码和数据集)
150 0