Flink 批作业如何在 Master 节点出错重启后恢复执行进度?

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文由阿里云研发工程师李俊睿撰写,介绍了Flink 1.20版中新引入的批作业进度恢复功能。文章涵盖背景、解决思路、使用效果及启用方法。此前,若JobMaster故障,批作业需重头开始,造成进度丢失。新功能通过将JM状态持久化至外部存储并在故障后利用这些状态恢复作业进度,避免了这一问题。使用该功能需启用集群高可用并配置相关参数。

摘要:本文撰写自阿里云研发工程师李俊睿(昕程),主要介绍 Flink 1.20 版本中引入了批作业在 JM failover 后的进度恢复功能。主要分为以下四个内容:

  1. 背景
  2. 解决思路
  3. 使用效果
  4. 如何启用

一、背景

在 Flink 1.20 版本之前,如果 Flink 的 JobMaster(JM)发生故障导致被终止,将会发生如下两种情况:

  • 如果作业未启用高可用性(HA),作业将失败。

  • 如果作业启用了 HA,JM 会被自动重新拉起 (JM failover)。在这种情况下,流作业将从最后一个成功的检查点恢复。然而,批作业由于缺乏检查点机制,将不得不从头开始运行,导 致之前的所有进度丢失。这对于需要长时间运行的批作业来说,意味着巨大的回退。

为了解决这一问题,我们在 Flink 1.20 版本中引入了批作业在 JM failover 后的进度恢复功能。这一功能的目的是使批作业在 JM failover 后能够尽可能地恢复到出错前的进度,避免重新运行已完成

的任务。

二、解决思路

为了实现这一目标,我们需要能够将 JM 的状态持久化到外部存储,从而在 JM 发生 failover 后,Flink 能够利用这些状态信息恢复作业到之前的运行进度。

我们设计了一种基于事件的 JM 状态恢复机制,在作业正常运行时,JM 会将状态变更事件写入外部持久化存储,以确保在 JM failover 后仍能获得作业的执行进度。此外,我们还需要解决 JM failover 后实际作业状态与状态变更事件可能不一致的问题。例如,某些 TaskManager (TM)在运行过程中意外丢失,可能导致中间数据结果无法访问。因此,Flink 必须从 TM 和 Remote Shuffle Service (RSS)获取中间结果数据的信息,来对作业运行进度的恢复结果进行校准。

该功能的整体流程分为如下几个阶段:

  1. 作业执行时 我们引入了 JobEventStore 组件,该组件负责在作业正常运行期间将 JM 的状态变更事件写入到外部文件系统中。其中需要被写入的状态变更事件分为如下以下几类:

    (1)自适应执行计划优化:Flink 会自适应地优化批作业的执行计划,这些优化结果是基于上游的执行结果来确定的。如果每次都依赖上游的执行结果进行重建,将会产生较大的开销。因此,记录这些优化结果对于任务调度和容错非常重要。

    (2)已经结束的 Task 信息:保存已完成任务的执行进度,以便在恢复作业时能够准确地继续从上次执行的位置开始。

    (3)OperatorCoordinator 状态:OperatorCoordinator 负责协调算子,实行算子之间的通信,其状态与数据一致性密切相关。例如,SourceCoordinator 中包含记录哪些数据分片已经分发的状态信息。重建该组件的状态有助于保证数据的一致性。

    (4)ShuffleMaster 状态:Flink 目前支持 RSS,而 RSS 的 Shuffle Master 可能会保存一些状态信息,如 Shuffle 数据的元数据。为了使新的 JobManager 能够复用这些中间结果,恢复 Shuffle Master 的状态是必不可少的。

  1. JM failover 期间 Flink 批作业在运行过程中,其中间结果数据会保存在 TM 上和 RSS 上。当 JM 发生故障时,TM 和 RSS 将保留与作业相关的中间结果数据,并不断地尝试重新连接到 JM。一旦新的 JM 重新被拉起来后,TM 和 RSS 将重新与 JM 建立连接,然后 TM 和 RSS 会主动上报它们持有的中间结果数据。

  1. JM failover 后的作业进度恢复

一旦 JM 重启,它会与 TM 和 RSS 重新建立连接,利用 JobEventStore 中记录的事件以及 TM 和 RSS 保留的中间结果数据,来重建作业的执行进度。

JM 首先会利用 JobEventStore 中记录的事件,恢复作业各个节点的执行状态。

然后根据 OperatorCoordinator 的状态,JM 会恢复尚未处理的 Source 数据分片,以避免数据丢失或重复。

随后,JM 将根据汇报上来的可用中间数据进一步校正执行进度。如果某个 task 产生的中间数据丢失,但这些数据仍被下游 task 所需要,那么该 task 将被重置并重新执行。

最后作业将从恢复出来的进度继续执行。

三、使用效果

以下是一个 JM 出错重启后进度恢复的效果示例。

该批作业的拓扑结构为 Source -> Map -> Sink ,当作业运行到 Map 节点时,因为外部服务的原因导致 JM 所在机器下线,从而造成了 JM failover。

随后,高可用服务将会自动拉起新的 JM 进程,作业将进入 RECONCILING 状态,表示作业进入了恢复运行进展的阶段。

当作业恢复完成后,将进入 RUNNING 状态。

点进作业详情页后,可以观察到作业已经恢复到 JM failover 前到进展了。

四、如何启用

要使用 Flink 批作业的状态恢复功能,用户需要:

  1. 确保已启用集群高可用:目前 Flink 提供了基于 Zookeeper 和 Kubernetes 的两种高可用服务,更多细节详见官方文档
  2. 配置 execution.batch.job-recovery.enabled: true

所有 new source 都支持批处理作业在 JM 出错后进行进度恢复。然而,为了实现细粒度的进度恢复,new sourceSplitEnumerator 需要实现 SupportsBatchSnapshot 接口,否则只有在该 source 的所有并发任务完成后,才能在 JM 出错恢复后避免重新执行这个 source 的 task。当前,FileSource 和 HiveSource 已经实现了该接口。详细信息请参见官方文档

考虑到不同集群和作业的差异,为了让批作业在 job master failover 后能够尽可能的恢复出错前的进度,避免重新运行已完成的任务,用户可以参考此文档进行配置项调优。


更多内容

img


活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
新用户复制点击下方链接或者扫描二维码即可0元免费试用 Flink + Paimon
实时计算 Flink 版(3000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?utm_content=g_1000395379&productCode=sc

retouch_2024070417440476.jpg

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
SQL Prometheus 监控
实时计算 Flink版产品使用问题之作业频繁重启该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之集群重启后,所有的Jobs任务丢失,如何快速恢复
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
分布式计算 Hadoop 关系型数据库
实时计算 Flink版操作报错合集之Hadoop在将文件写入HDFS时,无法在所有指定的数据节点上进行复制,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之从检查点重启任务,怎么在YAML配置文件中添加检查点的路径
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
实时计算 Flink版产品使用问题之从检查点重启任务,怎么在YAML配置文件中添加检查点的路径
|
6月前
|
SQL 监控 关系型数据库
实时计算 Flink版产品使用问题之使用master分支后,如何过滤掉DML
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之修改ddl能通过savepoint进行重启吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
存储 SQL 关系型数据库
实时计算 Flink版产品使用问题之要配置MySQL集群存储节点,该如何配置
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL API 数据处理
实时计算 Flink版产品使用问题之如何避免集群重启后job信息和运行状态丢失
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL Java 持续交付
实时计算 Flink版产品使用问题之源数据库一直在新增表或修改表结构,需要进行相应的修改和重启,该如何简化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 缓存 Oracle
实时计算 Flink版产品使用问题之如何实现重启后直接跑最新的任务而不是根据checkpoint跑历史数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

相关产品

  • 实时计算 Flink版