Spark 原理 | 青训营笔记
这是我参与「第四届青训营 」笔记创作活动的的第4天
参考链接:
1.第四届字节跳动青训营
2.RDD介绍
大数据处理引擎Spark介绍
Spark生态组件:
- Spark Core:Spark核心组件,它实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。
- Spark SQL:用来操作结构化数据的核心组件,通过Spark SQL可以直接查询Hive、HBase等多种外部数据源中的数据。
- Spark Structured Streaming:Spark提供的流式计算框架,支持高吞吐量、可容错处理的实时流式数据处理。
- MLlib:Spark提供的关于机器学习功能的算法程序库,包括分类、回归、聚类、协同过滤算法等,还提供了模型评估、数据导入等额外的功能。
- GraphX:Spark提供的分布式图处理框架,拥有对图计算和图挖掘算法的API接口以及丰富的功能和运算符。
- 独立调度器、Yarn、Mesos、Kubernetes:Spark框架可以高效地在一个到数千个节点之间伸缩计算,集群管理器则主要负责各个节点的资源管理工作,为了实现这样的要求,同时获得最大灵活性,Spark支持在各种集群管理器(Cluster Manager)上运行。
Spark 运行架构和工作原理:
- Application(应用):Spark上运行的应用。Application中包含了一个驱动器(Driver)进程和集群上的多个执行器(Executor)进程。
- Driver Program(驱动器):运行main()方法并创建SparkContext的进程。
- Cluster Manager(集群管理器):用于在集群上申请资源的外部服务(如:独立部署的集群管理器、Mesos或者Yarn)。
- Worker Node(工作节点):集群上运行应用程序代码的任意一个节点。
- Executor(执行器):在集群工作节点上为某个应用启动的工作进程,该进程负责运行计算任务,并为应用程序存储数据。
- Task(任务):执行器的工作单元。
- Job(作业):一个并行计算作业,由一组任务(Task)组成,并由Spark的行动(Action)算子(如:save、collect)触发启动。
- Stage(阶段):每个Job可以划分为更小的Task集合,每组任务被称为Stage。
\
Spark目前支持几个集群管理器:
- Standalone :Spark 附带的简单集群管理器,可以轻松设置集群。
- Apache Mesos:通用集群管理器,也可以运行 Hadoop MapReduce 和服务应用程序。(已弃用)
- Hadoop YARN: Hadoop 2 和 3 中的资源管理器。
- Kubernetes(K8s):用于自动部署、扩展和管理容器化应用程序的开源系统。
Spark 生态特点:
- 统一引擎,支持多种分布式场景
- 多语言支持
- 可读写丰富数据源(内置DataSource)
- 丰富灵活的API算子 (SparkCore -> RDD SparkSQL -> DataFrame)
- 支持K8S、YARN、Mesos 资源调度
\
Spark运行架构:
Spark应用在集群上运行时,包括了多个独立的进程,这些进程之间通过驱动程序(Driver Program)中的SparkContext对象进行协调,SparkContext对象能够与多种集群资源管理器(Cluster Manager)通信,一旦与集群资源管理器连接,Spark会为该应用在各个集群节点上申请执行器(Executor),用于执行计算任务和存储数据。Spark将应用程序代码发送给所申请到的执行器,SparkContext对象将分割出的任务(Task)发送给各个执行器去运行。
需要注意的是
- 每个Spark application都有其对应的多个executor进程。Executor进程在整个应用程序生命周期内,都保持运行状态,并以多线程方式执行任务。这样做的好处是,Executor进程可以隔离每个Spark应用。从调度角度来看,每个driver可以独立调度本应用程序的内部任务。从executor角度来看,不同Spark应用对应的任务将会在不同的JVM中运行。然而这样的架构也有缺点,多个Spark应用程序之间无法共享数据,除非把数据写到外部存储结构中。
- Spark对底层的集群管理器一无所知,只要Spark能够申请到executor进程,能与之通信即可。这种实现方式可以使Spark比较容易的在多种集群管理器上运行,例如Mesos、Yarn、Kubernetes。
- Driver Program在整个生命周期内必须监听并接受其对应的各个executor的连接请求,因此driver program必须能够被所有worker节点访问到。
- 因为集群上的任务是由driver来调度的,driver应该和worker节点距离近一些,最好在同一个本地局域网中,如果需要远程对集群发起请求,最好还是在driver节点上启动RPC服务响应这些远程请求,同时把driver本身放在离集群Worker节点比较近的机器上。
Spark下载编译:
编译:
git clone -b master https://github.com/apache/spark.git cd spark ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr -Phive -Phive-thriftserver -Pmesos -Pyarn
编译参数可选,详见官网building-spark https://spark apache.org/dos/latest/building-spark.html)
编译完后会在目录下生成一个tgz包
下载:
官网download --> https://spark apacheorgldownloads.html
环境变量:
提交命令:
spark-shell:
spark-sql:
pyspark:
SparkCore:
RDD简介
RDD(Resilient Distributed Dataset) - (弹性分布式数据集)
表示可以并行操作的不可变的、可分区、里面的元素可以并行计算的集合。
在Spark 中,对数据的所有操作不外乎创建RDD、转化已有RDD 以及调用RDD 操作进行求值。每个RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD 可以包含Python、Java、Scala 中任意类型的对象, 甚至可以包含用户自定义的对象。
RDD 允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度
* Internally, each RDD is characterized by five main properties: * * Partitions - A list of partitions * Compute - A function for computing each split * Dependencies - A list of dependencies on other RDDs * Partitioner - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) * PreferredLocations - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
1) 一组分片(Partition),即数据集的基本组成单位。
对于RDD 来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD 时指定RDD 的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
protected def getPartitions: Array[Partition]
2) 一个计算每个分区的函数。
Spark 中RDD 的计算是以分片为单位的,每个RDD都会实现compute 函数以达到这个目的。compute 函数会对迭代器进行复合,不需要保存每次计算的结果。
def compute(split: Partition, context: TaskContext): Iterator[T]
3) RDD 之间的依赖关系。
RDD 的每次转换都会生成一个新的RDD,所以RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD 的所有分区进行重新计算。
protected def getDependencies: Seq[Dependency[_]] = deps
4) 一个Partitioner, 即RDD 的分片函数(分区器)。
当前Spark 中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value 的RDD , 才会有Partitioner, 非key-value 的RDD 的Parititioner 的值是None。Partitioner 函数不但决定了RDD 本身的分片数量, 也决定了parent RDD Shuffle 输出时的分片数量。
@transient val partitioner: Option[Partitioner] = None
5) 一个列表, 存储存取每个Partition 的优先位置( preferred location)。
对于一个HDFS 文件来说,这个列表保存的就是每个Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
RDD 是一个应用层面的逻辑概念。一个RDD 多个分片。RDD 就是一个元数据记录集,记录了RDD 内存所有的关系数据。
创建RDD
- 内置RDD
- 自定义RDD
class CustomRDD(...) extends RDD {}
实现五要素对应的函数
RDD算子:
- Transformer 算子:生成一个新的RDD
map/filter/flatMap/groupByKey/...
- Action 算子:触发job提交(向驱动器程序返回结果或把结果写入外部系统的操作)
collect/count/take/saveAsTextFile
\
Spark 采用惰性计算模式,RDD 只有第一次在一个行动操作中用到时,才会真正计算。Spark 可以优化整个计算过程。默认情况下,Spark 的RDD 会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个RDD , 可以使用RDD.persist() 让Spark 把这个RDD 缓存下来。
\
\
RDD依赖:
RDDs 通过操作算子进行转换,转换得到的新RDD 包含了从其他RDDs 衍生所必需的信息,RDDs 之间维护着这种血缘关系,也称之为依赖。
RDD依赖:描述父子RDD之间的依赖关系(lineage)。
➢窄依赖:父RDD的每个partition至多对应一个子RDD分区。(一 一对应)
- NarrowDependency
- 1
- One ToOneDependency
override def getParents(partitionld: Int): List[Int] = List(partitionld)
- RangeDependency
override def getParents(partitionld: Int): List[Int] = if (artitionld >= outStart && partitionld < outStart + length) List(partitionld - outStart + inStart)
- PruneDependency
➢宽依赖:父RDD的每个partition都可能对应多个子RDD分区。(多对多)
ShuffleDependency
\
通过RDDs 之间的这种依赖关系,一个任务流可以描述为DAG(有向无环图),如下图所示,在实际执行过程中宽依赖对应于Shuffle(图中的reduceByKey 和join),窄依赖中的所有转换操作可以通过类似于管道的方式一气呵成执行(图中map 和union 可以一起执行)。
RDD执行过程
Job:RDD算子触发 Stage:依据宽依赖划分 Task:Stage内执行单个partition
划分Stage的整体思路:
从后往前推,遇到宽依赖就断开,划分为一个Stage。
遇到窄依赖,就将这个RDD加入该Stage中,DAG最后一个阶段会为每个结果的Partition生成一个ResultTask。每个Stage里面的Task数量由最后一个RDD的Partition数量决定,其余的阶段会生成ShuffleMapTask。
当RDD对象创建后,SparkContext会根据RDD对象构建DAG有向无环图,然后将Task提交给DAGScheduler。DAGScheduler根据ShuffleDependency将DAG划分为不同的Stage,为每个Stage生成TaskSet任务集合,并以TaskSet为单位提交给TaskScheduler。
TaskScheduler根据调度算法(FIFO/FAIR)对多个TaskSet进行调度,并通过集群中的资源管理器(Standalone模式下是Master,Yarn模式下是ResourceManager)把Task调度(locality)到集群中Worker的Executor,Executor由SchedulerBackend提供。
内存管理
Spark 作为一个基于内存的分布式计算引擎,Spark采用统一内存管理机制。重点在于动态占用机制。
- 设定基本的存储内存(Storage)和执行内存(Execution)区域,该设定确定了双方各自拥有的空间的范围,UnifiedMemoryManager统一管理多个并发Task内存分配 (每个task获取的内存区间为 ~为当前Executor中正在并发运行的task数量)
- 双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间
- 当Storage空闲,Execution可以借用Storage的内存使用,可以减少spill等操作, Execution内存不能被Storage驱逐。
- Execution内存的空间被Storage内存占用后,可让对方将占用的部分转存到硬盘,然后"归还"借用的空间
- 当Execution空闲,Storage可以借用Execution内存使用,当Execution需要内存时,可以驱逐被Storage借用的内存,可让对方将占用的部分转存到硬盘,然后"归还"借用的空间
user memory存储用户自定义的数据结构或者spark内部元数据
Reserverd memory:预留内存,防止OOM,
堆内(On-Heap)内存/堆外(Off-Heap)内存:Executor 内运行的并发任务共享 JVM 堆内内存。为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 可以直接操作系统堆外内存,存储经过序列化的二进制数据。减少不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。
shuffle
SortShuffleManager
每个Map Task生成一个 Shuffle数据文件和一个index文件,dataFile中的数据按照artitionld进行排序,同一个partitionld的数据聚集在一起,indexFile保存了所有paritionld在dataFlle中的位置信息,方便后续ReduceTask能Fetch到对应partitionld 的数据。
shuffle write的文件被NodeManage r中的Shuffle Service托管,供后续Reduce Task进行shuffle fetch,如果Executor空闲,DRA可以进行回收
SparkSQL:
SparkSQL执行过程
- SQL Parse: 将SparkSQL字符串或DataFrame解析为一个抽象语法树/AST,即Unresolved Logical Plan
- Analysis:遍历整个AST,并对AST上的每个节点进行数据类型的绑定以及函数绑定,然后根据元数据信息Catalog对数据表中的字段进行解析。 利用Catalog信息将Unresolved Logical Plan解析成Analyzed Logical plan
- Logical Optimization:该模块是Catalyst的核心,主要分为RBO和CBO两种优化策略,其中RBO是基于规则优化,CBO是基于代价优化。 利用一些规则将Analyzed Logical plan解析成Optimized Logic plan
- Physical Planning: Logical plan是不能被spark执行的,这个过程是把Logic plan转换为多个Physical plans
- CostModel: 主要根据过去的性能统计数据,选择最佳的物理执行计划(Selected Physical Plan)。
- Code Generation: sql逻辑生成Java字节码
影响SparkSQL性能两大技术:
- Optimizer:执行计划的优化,目标是找出最优的执行计划
- Runtime:运行时优化,目标是在既定的执行计划下尽可能快的执行完毕。
Catalyst优化
- Rule Based Optimizer(RBO): 基于规则优化,对语法树进行一次遍历,模式匹配能够满足特定规则的节点,再进行相应的等价转换。
Batch执行策略:
Once ->只执行一次
FixedPoint ->重复执行,直到plan不再改变, 或者执行达到固定次数(默认100次)
- Cost Based Optimizer(CBO): 基于代价优化,根据优化规则对关系表达式进行转换,生成多个执行计划,然后CBO会通过根据统计信息(Statistics)和代价模型(Cost Model)计算各种可能执行计划的代价,从中选用COST最低的执行方案,作为实际运行方案。CBO依赖数据库对象的统计信息,统计信息的准确与否会影响CBO做出最优的选择。
AQE
AQE对于整体的Spark SQL的执行过程做了相应的调整和优化,它最大的亮点是可以根据已经完成的计划结点真实且精确的执行统计结果来不停的反馈并重新优化剩下的执行计划。
\
AQE框架三种优化场景:
1.动态合并shuffle分区(Dynamically coalescing shuffle partitions)
Partition:合并
spark. sql shuil) partition作业粒度参数,一个作业中所有Stage都一样,但是每个Stage实际处理的数据不一样,可能某些Stage的性能比较差:
- parttion参数对某个Stage过大,则可能单个partition的大小比较小,而且Task个数会比较多,shuffle fetch阶段产生大量的小块的随机读,影响性能
- parition参数对某个Stage过小, 则可能单个partition的大小比较大,会产生更多的pill或者OOM
\
作业运行过程中,根据前面运行完的Stage的MapStatus中实际的partiton大小信息,可以将多个相邻的较小的partiton进行动态合并,由一个Task读取进行处理。
2.动态调整Join策略(Dynamically switching join strategies)
problem: Catalyst Optimizer优化阶段,算子的statistics估算不准确,生成的执行计划并不是最优
solution: AQE运行过程中动态获取准确Join的leftChild/rightChild的实际大小,将SortMergeJoin (SMJ) 转化为BroadcastHashJoin (BHJ)
3.动态优化数据倾斜Join(Dynamically optimizing skew joins)
AQE根据MapStatus信息自动检测是否有倾斜,将大的partition拆分成多个Task进行Join。
↓↓↓↓↓↓
RuntimeFilter
实现在Catalyst中。动态获取Filter内容做相关优化,当我们将一张大表和一张小表等值连接时,我们可以从小表侧收集一些统计信息,并在执行join前将其用于大表的扫描,进行分区修剪或数据过滤。可以大大提高性能。
Runtime优化分两类:
- 全局优化:从提升全局资源利用率、消除数据倾斜、降低IO等角度做优化。包括AQE。
- 局部优化:提高某个task的执行效率,主要从提高CPU与内存利用率的角度进行优化。依赖Codegen技术。
Codegen
从提高cpu的利用率的角度来进行runtime优化。
1.Expression级别
表达式常规递归求值语法树。需要做很多类型匹配、虚函数调用、对象创建等额外逻辑,这些overhead远超对表达式求值本身,为了消除这些overhead,Spark Codegen直接拼成求值表达式的java代码并进行即时编译
也就是将 (a+1)更新成为一个新值,然后将这个值乘以a
2.WholeStage级别
传统的火山模型:SQL经过解析会生成一颗查询树,查询树的每个节点为Operator,火山模型把operator看成迭代器,每个迭代器提供一个next()接口。通过自顶向下的调用 next 接口,数据则自底向上的被拉取处理,火山模型的这种处理方式也称为拉取执行模型,每个Operator 只要关心自己的处理逻辑即可,耦合性低。
火山模型问题:数据以行为单位进行处理,不利于CPU cache 发挥作用;每处理一行需要调用多次next() 函数,而next()为虚函数调用。会有大量类型转换和虚函数调用。虚函数调用会导致CPU分支预测失败,从而导致严重的性能回退
\
Spark WholestageCodegen:为了消除这些overhead,会为物理计划生成类型确定的java代码。并进行即时编译和执行。
\
Codegen打破了Stage内部算子间的界限,拼出来跟原来的逻辑保持一致的裸的代码(通常是一个大循环)然后把拼成的代码编译成可执行文件。
业界挑战与实践
1. Shuffle稳定性问题
在大规模作业下,开源ExternalShuffleService(ESS)的实现机制容易带来大量随机读导致的磁盘IOPS瓶颈、Fetch请求积压等问题,进而导致运算过程中经常会出现Stage重算甚至作业失败,继而引起资源使用的恶性循环,严重影响SLA.
稳定性解决方案:
\
2. SQL执行性能问题
压榨CPU资源
解决方向:
1.Photon: C+ +实现的向量化执行引擎
2.Intel: OAP/gazelle plugin
github.com/oap-project…
3. 参数推荐/作业诊断
Spark参数很多,资源类/Shuffle/Join/Agg...;参数不合理的作业,对资源利用率/Shufle稳定性/性能有非常大影响;同时,线上作业失败运行慢,用户排查难度大