Flink大规模作业调度性能优化

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 作者:洪志龙(柏星)& 朱翥(长耕)随着 Flink 流批一体架构不断演进和升级,越来越多的用户开始选择用 Flink 来同时承载实时和离线的业务。离线业务和实时业务有一定差异性,其中比较关键的一点是——离线作业的规模通常都远远大于实时作业。超大规模的流批作业对Flink的调度性能提出了新的挑战。在基于Flink 1.12版本部署大规模流批作业时,用户可能会遇到以下瓶颈: 需要很长时间才能

作者:洪志龙(柏星)& 朱翥(长耕)

随着 Flink 流批一体架构不断演进和升级,越来越多的用户开始选择用 Flink 来同时承载实时和离线的业务。离线业务和实时业务有一定差异性,其中比较关键的一点是——离线作业的规模通常都远远大于实时作业。超大规模的流批作业对Flink的调度性能提出了新的挑战。在基于Flink 1.12版本部署大规模流批作业时,用户可能会遇到以下瓶颈: 

  1. 需要很长时间才能完成作业的调度和部署; 
  2. 需要大量内存来存储作业的执行拓扑图以及部署时所需的临时变量,并且在运行过程中会出现频繁的长时间GC,影响集群稳定性; 

经测试,对于一个并发度为10k的word count作业,在其部署时JobManager需要30 GiB内存,并且从提交作业到所有任务节点部署完毕所需的总时间长达4分钟。 

此外,对于大规模作业,任务部署的过程可能会长时间阻塞JobManager的主线程。当主线程阻塞时,JobManager无法响应任何来自TaskManager的请求。这会使得TaskManager心跳超时进而导致作业出错失败。在最坏的情况下,作业从故障恢复(Failover)并进行新一轮部署时又会出现心跳超时,从而导致作业一直卡在部署阶段无法正常运行。 

为了优化Flink调度大规模作业的性能,我们在Flink 1.13版本和1.14版本进行了以下优化: 

  1. 针对拓扑结构引入分组概念,优化与拓扑相关的计算逻辑,主要包括作业初始化、任务调度以及故障恢复时计算需要重启的任务节点等等。与此同时,该优化降低了执行拓扑占用的内存空间; 
  2. 引入缓存机制优化任务部署,优化后部署速度更快且所需内存更少; 
  3. 基于逻辑拓扑和执行拓扑的特性进行优化以加快Pipelined Region的构建速度,从而降低作业初始化所需的时间。

性能评测结果 

为了评估优化的效果,我们对Flink 1.12(优化前)和Flink 1.14(优化后)进行了对比测试。测试作业包含两个节点,由全连接边相连,并发度均为10k。为了通过blob服务器分发ShuffleDescriptor,我们将配置项 blob.offload.minsize 的值修改为100 KiB。该配置项指定了通过blob服务器传输数据的最小阈值,大小超过该阈值的数据将会通过Blob服务器进行传输。该配置项的默认值为1 MiB,而测试作业中节点的ShuffleDescriptor大小约为270 KiB。测试结果如表1所示: 

表1 Flink 1.12和1.14各流程时间对比 

1.12 

1.14 

时间降低百分比(%) 

作业初始化 

11,431ms 

627ms 

94.51%

任务部署 

63,118ms 

17,183ms 

72.78%

故障恢复时计算重启节点 

37,195ms 

170ms 

99.55%

除了时间大幅缩短以外,内存占用也明显降低。在Flink 1.12版本上运行测试作业时,JobManager需要30 GiB内存才能保证作业稳定运行,而在Flink 1.14版本上只需要2 GiB即可。与此同时,GC情况也得以改善。在1.12版本上,测试作业在初始化和Task部署的过程中都会出现超过10秒的长GC,而在1.14版本上均未出现,这意味着心跳超时等问题出现的概率更低,作业运行更为稳定。

在1.12版本上,除去申请资源的时间,测试作业需要至少4分钟才能部署完成。而作为对比,在1.14版本上,除去申请资源的时间,测试作业在30秒内即可完成部署并开始运行。整体所需时间降低了87%。鉴于此,对于需要部署运行大规模作业的社区用户,建议将Flink版本升级至1.14以提升作业调度和部署性能。 此外,VVR 4.0的最新版本已经涵盖了上述优化,建议所有集团内用户升级至VVR 4.0的最新版本。

在接下来的部分中我们将进一步介绍各项优化的细节。

基于拓扑结构的优化 

在Flink中,分发模式(Distribution Pattern)描述了上游节点与下游节点连接的方式,上游节点计算的结果会按照连边分发到下游节点。目前Flink中有两种分发模式:点对点(Pointwise)和全连接(All-to-all)。如图1所示,当分发模式为点对点时,遍历所有边的计算复杂度为O(N);当分发模式为全连接时,所有下游节点与上游节点都有连边,遍历所有边的计算复杂度为O(N2),所需时间会随着规模增大而迅速增长。 

图1 目前Flink的两种分发模式 

Flink 1.12版本使用执行拓扑边(ExecutionEdge)存储任务节点间连接的信息。当分发模式为全连接模式时,节点间一共会有O(N2)条边相连,当作业规模较大时会占用大量内存。对于两个全连接边相连且并发度为10k的节点,其连边数量为1亿,总共需要超过4 GiB内存来存储这些连边。在生产作业中可能会有多个全连接边相连的节点,这也就意味着随着作业规模的增长,所需内存也会大幅增长。 

从图1可以看到,对于全连接边相连的任务节点,所有上游节点所产生的结果分区(Result Partition)都是同构的,也就是说这些结果分区所连接的下游任务节点都是完全相同的。全连接边相连的所有下游节点也都是同构的,因为其所消费的上游分区都是相同的。鉴于节点间的JobEdge只有一种分发模式,我们可以按照分发模式对上游分区以及下游节点进行分组。 

对于全连接边,由于其所有下游节点都是同构的,我们可以将这些下游节点划分为一组,称为节点组(ConsumerVertexGroup),全连接边相连的所有上游分区都与这个组连接。同样,所有同构的上游分区也被划分为同一组,称为分区组(ConsumedPartitionGroup),全连接边相连的所有下游节点都与这个组相连。优化方案的基本思路为:将所有消费相同结果分区的下游节点放入同一个节点组中,同时将所有与相同下游节点相连的结果分区放入同一个分区组中,如图2所示。 

图2 两种分发模式下如何对结果分区和任务节点进行分组 

在调度任务节点时,Flink需要遍历每一个上游分区和下游节点间的所有连边。在优化前,由于连边的总数量为O(N2),因此将所有边遍历一遍的总时间复杂度为O(N2)。优化后,执行拓扑边被分组的概念所替代。鉴于所有同构的分区都连接到同一个下游节点组,当Flink需要遍历所有连边时,只需要将该节点组遍历一遍即可,不需要重复遍历所有节点,这样就使得计算复杂度从O(N2)降到O(N)。 

对于点对点的分发模式,上游结果分区与下游节点逐一相连,因此分区组和节点组之间也是点对点相连,分组的数量级和执行拓扑边的数量级是一样的,也就是说,遍历所有连边的计算复杂度依旧是O(N)。 

对于上文我们提到的word count作业,采用上述的分组方式取代执行拓扑边可以将执行拓扑的内存占用从4 GiB降至12 MiB左右。基于分组的概念,我们对作业初始化、任务调度以及故障恢复时计算需要重启的节点等耗时较长的计算逻辑进行了优化。这些计算逻辑都涉及到对上下游之间所有连边进行遍历的操作。在优化后,其计算复杂度都从O(N2)降为O(N)。 

优化任务部署 

对于Flink 1.12版本,当大规模作业内包含全连接边时,部署所有节点需要花费很长时间。此外,在部署过程中容易出现TaskManager心跳超时的情况,进而导致集群不稳定。 

目前任务部署包含以下几个阶段: 

  1. JobManager在主线程内为每一个Task创建任务部署描述符( TaskDeploymentDescriptor ,以下简称TDD); 
  2. JobManager在异步线程内将这些TDD进行序列化; 
  3. JobManager通过RPC通信将序列化后的TDD发送至TaskManager; 
  4. TaskManager基于TDD创建任务并执行。

TDD包含了TaskManager创建任务(Task)时所需的所有信息。当任务部署开始时,JobManager会在主线程内为所有任务节点创建TDD。在创建过程中JobManager无法响应任何其他请求。对于大规模作业,这一过程可能会导致JobManager主线程长时间被阻塞,进一步导致心跳超时,从而触发作业故障。

鉴于任务部署时所有TDD都是由JobManager负责发送至各TaskManager,这导致JobManager可能会成为性能瓶颈。尤其是对于大规模作业,部署时产生的TDD会占用大量内存空间,导致频繁的长时间GC,进一步加重JobManager的负担。

因此,我们需要缩短创建TDD所需的时间,避免心跳超时的发生。此外,如果能够缩减TDD的大小,网络传输所需的时间也会缩短,这样可以进一步加快任务部署的速度。 

为ShuffleDescriptor添加缓存机制 

ShuffleDescriptor用于描述任务在运行时需要消费的上游结果分区的所有信息。当作业规模较大时,ShuffleDescriptor可能是TDD中所占空间最大的一部分。对于全连接边相连的节点,当上游节点和下游节点的并发度都是N时,每一个下游节点需要消费N个上游结果分区,此时ShuffleDescriptor的总数量是N2。也就是说,计算所有节点的ShuffleDescriptor的时间复杂度为O(N2)。

然而,对于同构的下游节点来说,他们所消费的上游结果分区是完全一样的,因此部署时所需要的ShuffleDescriptor内容也是一样的。鉴于此,在部署时不需要为每一个下游节点重复计算ShuffleDescriptor,只需要将计算好的ShuffleDescriptor放入缓存以供复用即可。这样计算TDD的时间复杂度就可以从O(N2)降至O(N)。 

为了缩减RPC消息的大小,进而降低网络传输的开销,我们可以对ShuffleDescriptor进行压缩。对于上文我们提到的word count作业,当节点并发度为10k时,每一个下游节点都会有10k个ShuffleDescriptor,在压缩后其序列化值的总大小降低了72%。 

通过Blob服务器分发ShuffleDescriptor 

Blob(Binary Large Object)以二进制数据的形式存储大型文件。Flink通过blob服务器在JobManager和TaskManager之间传输体积较大的文件。当JobManager需要将大文件传输至TaskManager时,它可以将文件传输至blob服务器(同时会将文件传输至分布式文件系统),并且获得访问文件所需的token。当TaskManager获取到token时,它们会从分布式文件系统(Distributed File System,DFS)下载文件。TaskManager会同时将文件存储到本地blob缓存中方便之后重复读取。 

在任务部署的过程中,JobManager负责将ShuffleDescriptor通过RPC消息分发到对应的TaskManager中。在发送完成后,RPC消息会被垃圾回收器回收处理。但当JobManager创建RPC消息的速度大于发送的速度时,RPC消息会逐渐堆积在内存中并且对GC造成影响,频繁触发长时间的GC。这些GC会导致JobManager停摆,进一步拖慢任务部署的速度。 

为了解决这个问题,Flink可以通过blob服务器来分发大体积的ShuffleDescriptor。首先JobManager将ShuffleDescriptor发送至blob服务器,而blob服务器会将ShuffleDescriptor存储至DFS中,TaskManager在开始处理TDD时会从DFS下载数据。这样JobManager不需要将所有ShuffleDescriptor始终存储在内存中直至对应的RPC消息发出。经过优化后,在部署大规模作业时长时间GC的频率会明显降低。且鉴于DFS为TaskManager提供了多个分布式节点下载数据,JobManager网络传输的压力也得以缓解,不再成为瓶颈,这样可以加快任务部署的速度。

图3 JobManager将ShuffleDescriptor分发至TaskManager

为了避免缓存过多导致本地磁盘空间不足,当ShuffleDescriptor所对应的结果分区被释放时,在blob服务器上存储的对应缓存会被清理。此外我们为TaskManager上ShuffleDescriptor的缓存添加了总大小的限制。当缓存超过一定大小时,缓存会按照最近最少使用(LRU)的顺序移除。这样可以保证本地磁盘不会被缓存占满,特别是对于session模式运行的集群。 

针对Pipelined Region构建的优化 

目前Flink中节点间有两种数据交换类型:pipelined和blocking。对于blocking的数据交换方式,结果分区会在上游全部计算完成后再交由下游进行消费,数据会持久化到本地,支持多次消费。对于pipelined数据交换,上游结果分区的产出和下游任务节点的消费是同时进行的,所有数据不会被持久化且只能读取一次。 

鉴于pipelined的数据流产出和消费同时发生,Flink需要保证pipelined边相连的上下游节点同时运行。由pipelined边相连的节点构成了一个region,被称为Pipelined Region(以下简称region)。在Flink中,region是任务调度和Failover的基本单位。在调度的过程中,同一region内的所有Task节点都会被同时调度,而整个拓扑中所有region会按照拓扑顺序逐一进行调度。 

目前在Flink的调度层面有两种region:逻辑层面的Logical Pipelined Region以及执行调度层面的Scheduling Pipelined Region。逻辑region由逻辑拓扑(JobGraph)中的节点JobVertex构成,而执行region则由执行拓扑(ExecutionGraph)中的节点ExecutionVertex构成。类似于ExecutionVertex基于JobVertex计算产生,执行region是由逻辑region计算得到的,如图4所示。

图4 逻辑region以及执行region

在构建region的过程中会遇到一个问题:region之间可能存在环形依赖。对于当前region,当且仅当其所消费的上游region都产出全部数据后才能进行调度。如果两个region之间存在环形依赖,那么就会出现调度死锁:两个region都需要等对方完成才能调度,最终两个region都无法被调度起来。因此,Flink通过Tarjan强连通分量算法来发现环形依赖,并将具有环形依赖的region合并成一个region,这样就能解决调度死锁的问题。Tarjan强连通分量算法需要遍历拓扑内的所有边,而对于全连接的分发模式来说,其边的数量为O(N2),因此算法整体的计算复杂度为O(N2),随着规模变大会显著增长,从而影响大规模作业初始化的时间。 

图5 具有调度死锁的拓扑 

为了加快region的构建速度,我们可以基于逻辑拓扑和执行拓扑之间的关联进行优化。鉴于一个执行region只能由一个逻辑region中的节点派生,不会出现跨region的情况,Flink在初始化作业时只需要遍历所有逻辑region并逐一转换成执行region即可。转换的方式跟分发模式相关。如果逻辑region内的节点间有任何全连接边,则整个逻辑region可以直接转换成一个执行region。

图6 如何将逻辑region转换成执行region

如果全连接边采用的是pipelined数据交换,所有与之相连的上下游节点都必须同时运行,也就是说全连接边所连接的所有region都要合并成一个region。如果全连接边采用的是blocking数据交换,则会引入环形依赖,如图5所示。在这种情况下所有与之相连的region都必须合并以避免调度死锁,如图6所示。鉴于只要有全连接边就直接生成一整个执行region,在这种情况下不需要用Tarjan算法,整体计算复杂度只需要O(N)即可。 

如果在逻辑region内,所有节点间都只有点对点的分发模式,那么Flink依旧直接用Tarjan算法来检测环形依赖,鉴于点对点的分发模式其边数为O(N),算法的时间复杂度也只有O(N)。 

在优化后,将逻辑region转换成执行region的整体计算复杂度从O(N2)降为O(N)。经测试,对于上文提到的word count作业,当两个节点间的连边为全连接边且数据交换方式为blocking时,构建region的总时间降低了99%,从8,257ms降至120ms。 

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
23天前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何从savepoint重新启动作业
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
26天前
|
机器学习/深度学习 人工智能 运维
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
|
26天前
|
监控 Serverless Apache
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力
|
26天前
|
监控 Serverless 数据库
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
|
26天前
|
监控 Java Serverless
美团 Flink 大作业部署问题之想在Serverless平台上实时查看Spring Boot应用的日志要怎么操作
美团 Flink 大作业部署问题之想在Serverless平台上实时查看Spring Boot应用的日志要怎么操作
|
26天前
|
Java 流计算
美团 Flink 大作业部署问题之files-to-delete 的执行为什么能够异步进行呢
美团 Flink 大作业部署问题之files-to-delete 的执行为什么能够异步进行呢
|
26天前
|
缓存 流计算
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
|
26天前
|
分布式计算 流计算
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
|
26天前
|
流计算
美团 Flink 大作业部署问题之新启动作业的 Checkpoint 跨作业文件引用的问题要如何避免
美团 Flink 大作业部署问题之新启动作业的 Checkpoint 跨作业文件引用的问题要如何避免
|
26天前
|
流计算 索引
美团 Flink 大作业部署问题之RocksDBStateBackend 的增量 Checkpoint 要怎么制作
美团 Flink 大作业部署问题之RocksDBStateBackend 的增量 Checkpoint 要怎么制作