Flink 四大基石之 Checkpoint 使用详解

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink 的 Checkpoint 机制通过定期插入 Barrier 将数据流切分并进行快照,确保故障时能从最近的 Checkpoint 恢复,保障数据一致性。Checkpoint 分为精确一次和至少一次两种语义,前者确保每个数据仅处理一次,后者允许重复处理但不会丢失数据。此外,Flink 提供多种重启策略,如固定延迟、失败率和无重启策略,以应对不同场景。SavePoint 是手动触发的 Checkpoint,用于作业升级和迁移。Checkpoint 执行流程包括 Barrier 注入、算子状态快照、Barrier 对齐和完成 Checkpoint。

一、Checkpoint 剖析

1. 什么是 Checkpoint

Checkpoint 是 Flink 为了实现流计算的容错性而引入的一种机制。它通过定期在数据流中插入 Barrier(屏障),将数据流切分成段,每个 Barrier 会触发 Flink 对当前状态进行一次快照,记录下算子的状态信息,包括正在处理的数据以及已经处理过的数据的位置等。当作业发生故障时,Flink 可以从最近的一个 Checkpoint 恢复,重新处理未完成的部分,从而保证数据不丢失且不重复处理。

2. Checkpoint 的作用

  • 数据一致性保障:在分布式环境下,流计算作业可能会因为各种故障(如节点宕机、网络故障等)而中断。Checkpoint 机制确保了即使发生故障,作业也能从故障前的状态恢复,保证数据处理的一致性,避免数据丢失或重复处理的问题。
  • 作业恢复:当作业出现故障时,Flink 能够快速定位到最近的 Checkpoint,并从该 Checkpoint 恢复作业的状态,使得作业能够继续运行,减少因故障导致的停机时间。

3. Checkpoint 的类型

  • 精确一次(Exactly - Once):这是 Flink 默认的 Checkpoint 语义,确保每个数据在整个流处理过程中仅被处理一次,即使发生故障恢复也不会出现数据重复或丢失的情况。Flink 通过 Barrier 对齐机制来实现精确一次语义,保证每个算子在处理数据时,只有当所有输入流都收到相同编号的 Barrier 时,才会触发 Checkpoint。
  • 至少一次(At - Least - Once):在这种语义下,数据可能会被处理多次,但不会丢失。相比于精确一次,至少一次语义的实现相对简单,不需要严格的 Barrier 对齐,性能相对较高,但可能会因为故障恢复导致部分数据重复处理。

二、重启策略解读

1. 固定延迟重启策略(Fixed Delay Restart Strategy)

这种策略在作业失败后,会按照固定的时间间隔进行重启,直到达到最大重启次数。例如,设置最大重启次数为 5 次,每次重启间隔为 10 秒。如果作业在 5 次重启内成功恢复,则继续运行;如果 5 次重启后仍未成功,作业将终止。

2. 失败率重启策略(Failure Rate Restart Strategy)

该策略根据作业的失败率来决定是否重启。在指定的时间窗口内,如果作业的失败次数超过设定的阈值,则停止重启并终止作业。例如,设定在 1 小时内,作业失败次数超过 3 次,则不再重启。这种策略适用于那些不稳定的作业,避免因为频繁重启而消耗过多资源。

3. 无重启策略(No Restart Strategy)

当作业失败时,不会进行自动重启。这种策略适用于那些不希望自动重启,而是需要人工介入处理故障的作业场景,例如涉及到敏感数据处理或需要复杂故障排查的作业。

三、SavePoint

1. 什么是 SavePoint

SavePoint 是一种手动触发的 Checkpoint,它与自动 Checkpoint 不同,通常由用户或管理员手动创建。SavePoint 包含了作业的完整状态,包括所有算子的状态以及作业的元数据信息。

2. SavePoint 的用途

  • 作业升级:当需要对作业进行升级,例如修改代码逻辑、更新依赖库等,可以先创建一个 SavePoint,然后停止当前作业,基于 SavePoint 启动新的作业版本。这样可以保证作业在升级过程中不会丢失数据,并且能够从原作业的状态继续运行。
  • 迁移作业:在不同的 Flink 集群之间迁移作业时,可以使用 SavePoint。先在原集群上创建 SavePoint,然后将 SavePoint 数据迁移到目标集群,并基于该 SavePoint 在目标集群上启动作业,实现作业的无缝迁移。

3. 如何创建和使用 SavePoint

  • 创建 SavePoint:可以通过 Flink 命令行工具flink savepoint <jobId>来创建 SavePoint,jobId为要创建 SavePoint 的作业 ID。
  • 使用 SavePoint:使用flink run -s <savepointPath> <jobJar>命令来基于 SavePoint 启动作业,其中<savepointPath>为 SavePoint 的存储路径,<jobJar>为作业的 Jar 包路径。

四、Checkpoint 执行流程

  1. Barrier 注入:Flink 的 JobManager 会周期性地向数据源发送 Checkpoint Barrier,这些 Barrier 会随着数据流在算子之间传递。
  2. 算子状态快照:当算子接收到 Barrier 时,会暂停处理新的数据,对当前的状态进行快照。例如,对于有状态的算子(如窗口算子、聚合算子等),会将其内部状态(如窗口中的数据、聚合结果等)保存到持久化存储(如 HDFS)中。
  3. Barrier 对齐:在多输入流的算子中,需要等待所有输入流都收到相同编号的 Barrier,才会继续处理新的数据,并将 Barrier 向下游传递。这一步确保了所有算子在相同的时间点进行状态快照,从而保证 Checkpoint 的一致性。
  4. 完成 Checkpoint:当所有算子都完成状态快照,并将 Barrier 传递到 Sink 算子时,整个 Checkpoint 过程完成。JobManager 会记录下这次 Checkpoint 的元数据信息,包括 Checkpoint 的编号、各个算子状态的存储位置等。

五、检查的算法

1. Chandy - Lamport 算法

Flink 的 Checkpoint 机制基于 Chandy - Lamport 算法进行改进。该算法的核心思想是通过在分布式系统中广播标记消息(即 Flink 中的 Barrier),使得各个节点能够在同一时间点对自身状态进行快照。在 Flink 中,每个算子在接收到 Barrier 后,会将其本地状态保存到持久化存储中,并继续处理已在缓冲区中的数据,直到所有输入流都收到相同编号的 Barrier,从而实现了分布式环境下的一致性状态快照。

六、应用案例

实时电商订单处理

假设我们有一个实时处理电商订单的 Flink 作业,该作业需要对订单进行实时统计分析,如计算订单总额、订单数量等。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置Checkpoint间隔为5分钟
env.enableCheckpointing(5 * 60 * 1000);
// 设置精确一次语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
DataStream<Order> orderStream = env.addSource(new OrderSource());
orderStream
 .keyBy(Order::getShopId)
 .window(TumblingProcessingTimeWindows.of(Time.minutes(10)))
 .aggregate(new OrderAggregateFunction())
 .print();
env.execute("E - commerce Order Processing");

在上述代码中,我们启用了 Checkpoint 机制,设置 Checkpoint 间隔为 5 分钟,并采用精确一次语义。这样,即使在作业处理过程中出现故障,也能保证订单数据的准确处理,不会出现重复或丢失订单的情况。同时,通过窗口机制对订单进行按店铺 ID 分组,每 10 分钟统计一次店铺的订单信息。

实时日志监控

假设有一个实时监控系统日志的 Flink 作业,需要统计不同类型日志的出现次数,并及时发现异常日志。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置Checkpoint间隔为3分钟
env.enableCheckpointing(3 * 60 * 1000);
// 设置重启策略为固定延迟重启,最大重启次数为3次,每次间隔1分钟
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1 * 60 * 1000));
DataStream<LogEntry> logStream = env.addSource(new LogSource());
logStream
 .keyBy(LogEntry::getLogType)
 .window(TumblingEventTimeWindows.of(Time.minutes(5)))
 .process(new LogCountProcessFunction())
 .print();
env.execute("Real - time Log Monitoring");

在这个案例中,我们启用了 Checkpoint 机制,设置 Checkpoint 间隔为 3 分钟,以确保在故障时能够快速恢复作业状态。同时,设置了固定延迟重启策略,作业失败后最多重启 3 次,每次间隔 1 分钟。通过窗口机制对不同类型的日志进行按时间窗口统计,及时发现日志中的异常情况。

七、总结

Checkpoint 机制是 Flink 实现容错性和数据一致性的核心功能之一。通过深入理解 Checkpoint 的原理、重启策略、SavePoint 的使用以及 Checkpoint 的执行流程和相关算法,我们能够更好地在实际项目中应用 Flink 进行可靠的流数据处理。在设计和开发 Flink 作业时,合理配置 Checkpoint 和重启策略,能够有效地提高作业的稳定性和可靠性,确保在面对各种故障时,作业能够快速恢复并继续准确地处理数据。同时,SavePoint 为作业的升级和迁移提供了有力的支持,使得我们能够在不丢失数据的情况下,灵活地对作业进行管理和维护。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3天前
|
缓存 监控 数据处理
Flink 四大基石之窗口(Window)使用详解
在流处理场景中,窗口(Window)用于将无限数据流切分成有限大小的“块”,以便进行计算。Flink 提供了多种窗口类型,如时间窗口(滚动、滑动、会话)和计数窗口,通过窗口大小、滑动步长和偏移量等属性控制数据切分。窗口函数包括增量聚合函数、全窗口函数和ProcessWindowFunction,支持灵活的数据处理。应用案例展示了如何使用窗口进行实时流量统计和电商销售分析。
56 28
|
4天前
|
传感器 监控 数据挖掘
Flink 四大基石之 Time (时间语义) 的使用详解
Flink 中的时间分为三类:Event Time(事件发生时间)、Ingestion Time(数据进入系统时间)和 Processing Time(数据处理时间)。Event Time 通过嵌入事件中的时间戳准确反映数据顺序,支持复杂窗口操作。Watermark 机制用于处理 Event Time,确保数据完整性并触发窗口计算。Flink 还提供了多种迟到数据处理方式,如默认丢弃、侧输出流和允许延迟处理,以应对不同场景需求。掌握这些时间语义对编写高效、准确的 Flink 应用至关重要。
58 21
|
5月前
|
容灾 流计算
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
|
5月前
|
容灾 流计算
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
|
5月前
|
存储 调度 流计算
Flink 新一代流计算和容错问题之如何实现 Generalized Log-Based Incremental Checkpoint
Flink 新一代流计算和容错问题之如何实现 Generalized Log-Based Incremental Checkpoint
|
5月前
|
存储 缓存 数据处理
Flink 新一代流计算和容错问题之中间数据流动缓慢导致 Checkpoint 慢的问题要如何解决
Flink 新一代流计算和容错问题之中间数据流动缓慢导致 Checkpoint 慢的问题要如何解决
|
5月前
|
存储 分布式计算 算法
Flink四大基石——4.Checkpoint容错机制
Flink四大基石——4.Checkpoint容错机制
118 1
|
5月前
|
关系型数据库 MySQL 数据处理
实时计算 Flink版产品使用问题之mini-cluster模式下,怎么指定checkpoint的时间间隔
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
存储 监控 Serverless
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决
|
5月前
|
缓存 流计算
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现

相关产品

  • 实时计算 Flink版