那个男人竟然不会Flink的CheckPoint机制(二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 这篇来讲Flink另一个比较重要的知识,就是它的容错机制checkpoint原理。

TaskManager(非source Task接收)


在上一篇《背压原理》又或是这篇的基础铺垫上,其实我们可以看到在Flink接收数据用的是InputGate,所以我们还是回到org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput这个方法上

随后定位到处理数据的逻辑:

final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();

想点击进去,发现有两个实现类:

  • BarrierBuffer
  • BarrierTracker

24.jpg

这两个实现类其实就是对应着AT_LEAST_ONCEEXACTLY_ONCE这两种模式。

/**
 * The BarrierTracker keeps track of what checkpoint barriers have been received from
 * which input channels. Once it has observed all checkpoint barriers for a checkpoint ID,
 * it notifies its listener of a completed checkpoint.
 *
 * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input
 * channels that have sent barriers, so it cannot be used to gain "exactly-once" processing
 * guarantees. It can, however, be used to gain "at least once" processing guarantees.
 *
 * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.
 */
/**
 * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
 * all inputs have received the barrier for a given checkpoint.
 *
 * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
 * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until
 * the blocks are released.
 */

简单翻译下就是:

  • BarrierTrackerat least once模式,只要inputChannel接收到barrier,就直接通知完成处理checkpoint
  • BarrierBufferexactly-once模式,当所有的inputChannel接收到barrier才通知完成处理checkpoint,如果有的inputChannel还没接收到barrier,那已接收到barrierinputChannel会读数据到缓存中,直到所有的inputChannel都接收到barrier,这有可能会造成反压。

说白了,就是BarrierBuffer会有对齐barrier的处理。

这里又提到exactly-onceat least once了。在文章开头也说过Flink是可以实现exactly-once的,含义就是:状态只持久化一次最终的存储介质中(本地数据库/HDFS)。

在这里我还是画个图和举个例子配合BarrierBuffer/BarrierTracker来解释一下。

现在我有一个Topic,假定这个Topic有两个分区partition(又或者你可以理解我设置消费的并行度是2)。现在要拉取Kafka这两个分区的数据,由算子Map进行消费转换,期间在转化的时候可能会存储些信息到State(Flink给我们提供的存储,你就当做是会存到HDFS上就好了),最终输出到Sink

25.jpg

从上面的知识点我们应该可以知道, 在Flinkcheckpoint的时候JobManager往每个Source任务(简单对应图上的两个paritiion) 发送checkpointId,然后做快照存储。

显然,source任务存储最主要的内容就是消费分区的offset嘛。比如现在source 1offerset100,而source2offset105

26.jpg

目前看来source2的数据会比source1的数据先到达Map

假定我们用的是BarrierBufferexactly-once模式,那么source2barrier到达Map算子的后,source2之后的数据只能停下来,放到buffer上,不做处理。等source1barrier来了以后,再真正处理source2放在buffer的数据。

这就是所谓的barrier对齐

27.jpg

假定我们用的是BarrierTrackerat least once模式,那么source2barrier到达Map算子的后,source2之后的数据不会停下来等待source1,后面的数据会继续处理。

28.jpg

现在问题就来了,那对不对齐的区别是什么呢?

依照上面图的的运行状态(无论是BarrierTrackerat least once模式还是BarrierBufferexactly-once模式),现在我们的checkpoint都没做,因为source1barrier还没到sink端呢。现在Flink挂了,那显然会重新拉取source 1offerset小于100,而source2offset小于105的数据,State的最终信息也不会保存。

29.jpg

checkpoint从没做过的时候,对数据不会产生任何的影响(所以这里在Flink的内部是没啥区别的)

而假设我们现在是BarrierTrackerat least once模式,没有任何问题,程序继续执行。现在source1barrier也走到了slink,最后完成了一次checkpoint

30.jpg

由于source2barriersource1barrier要快,那么source1所处理的State的数据实际是包括offset>105的数据的,自然Flink保存的时候也会把这部分保存进去。

程序继续运行,刚好保存完checkpoint后,此时系统出了问题,挂了。因为checkpoint已经做完了,所以Flink会从source 1offerset100,而source2offset105重新消费。

但是,由于我们是BarrierTrackerat least once模式,所以State里边的保存状态实际上有过source2offset 大于105 的记录了。那source2重新从offset105开始消费,那就是会重复消费!

31.jpg

理解了上面所讲的话,那再看BarrierBufferexactly-once模式应该就不难理解了(各位大哥大嫂你也要经过这个operator处理保存吗?我们一起吧?有问题,我们一起重来,没问题我们一起保存

32.jpg

无论是BarrierTracker还是BarrierBuffer也好,在处理checkpoint的时候都需要调用notifyCheckpoint() 方法,而notifyCheckpoint()方法最终调用的是triggerCheckpointOnBarrier

33.jpg

triggerCheckpointOnBarrier()最终还是会调用performCheckpoint()方法,所以无论是source接收到checkpoint还是operator接收到checkpoint,最终还是会调用performCheckpoint()方法。

34.jpg

大家有兴趣可以进去checkpointState()方法里边详细看看,里边会对State状态信息进行写入,完成后上报给TaskManager

35.jpg


TaskManager总结


36.jpg

  • TaskExecutor接收到JobManager下发的checkpoint,由triggerCheckpoint方法进行处理
  • triggerCheckpoint方法最终会调用org.apache.flink.streaming.runtime.tasks.StreamTask#triggerCheckpoint,而最主要的就是performCheckpoint方法
  • performCheckpoint方法会对checkpoint做前置处理,barrier广播到下游,处理State状态做快照,最后回到成功消息给JobManager
  • 普通算子由org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput这个方法读取数据,具体处理逻辑在getNextNonBlocked方法上。
  • 该方法有两个实例,分别是BarrierBufferBarrierTracker,这两个实例对应着checkpoint不同的模式(至少一次和精确一次)。精确一次需要对barrier对齐,有可能导致反压的情况
  • 最后处理完,会调用notifyCheckpoint方法,实际上还是会调performCheckpoint方法

所以说,最终处理checkpoint的逻辑是一致的,只是会source会直接通过TaskExecutor处理,而普通算子会根据不同的配置在接收到后有不同的实例处理:BarrierTracker/BarrierBuffer


JobManager接收回应


前面提到了,无论是source还是普通算子,都会调用performCheckpoint方法进行处理。

performCheckpoint方法里边处理完State快照的逻辑,会调用reportCompletedSnapshotStates告诉JobManager快照已经处理完了。

reportCompletedSnapshotStates方法里边又会调用acknowledgeCheckpoint方法通过RPC去通知JobManager

37.jpg

兜兜转转,最后还是会回到checkpointCoordinator上,调用receiveAcknowledgeMessage进行处理

38.jpg

进入到receiveAcknowledgeMessage上,主要就是下面图的逻辑:处理完返回不同的状态,根据不同的状态进行处理

39.jpg

主要我们看的其实就是acknowledgeTask方法里边做了些什么。

PendingCheckpoint维护了两个Map:

//  已经接收到 Ack 的算子的状态句柄
private final Map<OperatorID, OperatorState> operatorStates;
// 需要 Ack 但还没有接收到的 Task
private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;

然后我们进去acknowledgeTask简单了解一下可以发现就是在处理operatorStatesnotYetAcknowledgedTasks

synchronized (lock) {
   if (discarded) {
    return TaskAcknowledgeResult.DISCARDED;
   }
    // 接收到Task了,从notYetAcknowledgedTasks移除
   final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);
   if (vertex == null) {
    if (acknowledgedTasks.contains(executionAttemptId)) {
     return TaskAcknowledgeResult.DUPLICATE;
    } else {
     return TaskAcknowledgeResult.UNKNOWN;
    }
   } else {
    acknowledgedTasks.add(executionAttemptId);
   }
    // ...
   if (operatorSubtaskStates != null) {
    for (OperatorID operatorID : operatorIDs) {
     // ...
     OperatorState operatorState = operatorStates.get(operatorID);
     // 新来的operatorID,添加到operatorStates
     if (operatorState == null) {
      operatorState = new OperatorState(
       operatorID,
       vertex.getTotalNumberOfParallelSubtasks(),
       vertex.getMaxParallelism());
      operatorStates.put(operatorID, operatorState);
     }
          //....
    }
   }

等到所有的Task都到齐以后,就会调用isFullyAcknowledged进行处理。

40.jpg

最后调用completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();来实现最终的存储,所有完毕以后会通知所有的Task 现在checkpoint已经完成了。

41.jpg


最后


总的来说,这篇文章带着大家走马观花撸了下Checkpoint,很多细节我也没去深入,但我认为这篇文章可以让你大概了解到Checkpoint的实现过程。

最后再来看看官网的图,看完应该大概就能看得懂啦:

42.jpg

相信我,或许你现在还没用到Flink,但等你真正去用Flink的时候,checkpoint是肯定得搞搞的

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
82 3
|
4月前
|
容灾 流计算
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
|
4月前
|
存储 数据处理 Apache
超越传统数据库:揭秘Flink状态机制,让你的数据处理效率飞升!
【8月更文挑战第26天】Apache Flink 在流处理领域以其高效实时的数据处理能力脱颖而出,其核心特色之一便是状态管理机制。不同于传统数据库依靠持久化存储及 ACID 事务确保数据一致性和可靠性,Flink 利用内存中的状态管理和分布式数据流模型实现了低延迟处理。Flink 的状态分为键控状态与非键控状态,前者依据数据键值进行状态维护,适用于键值对数据处理;后者与算子实例关联,用于所有输入数据共享的状态场景。通过 checkpointing 机制,Flink 在保障状态一致性的同时,提供了更适合流处理场景的轻量级解决方案。
64 0
|
4月前
|
关系型数据库 MySQL 数据处理
实时计算 Flink版产品使用问题之mini-cluster模式下,怎么指定checkpoint的时间间隔
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
存储 监控 Serverless
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决
|
4月前
|
缓存 流计算
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
|
4月前
|
分布式计算 流计算
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
|
4月前
|
流计算
美团 Flink 大作业部署问题之新启动作业的 Checkpoint 跨作业文件引用的问题要如何避免
美团 Flink 大作业部署问题之新启动作业的 Checkpoint 跨作业文件引用的问题要如何避免
|
4月前
|
流计算 索引
美团 Flink 大作业部署问题之RocksDBStateBackend 的增量 Checkpoint 要怎么制作
美团 Flink 大作业部署问题之RocksDBStateBackend 的增量 Checkpoint 要怎么制作
|
4月前
|
存储 Java 流计算
Flink 分布式快照,神秘机制背后究竟隐藏着怎样的惊人奥秘?快来一探究竟!
【8月更文挑战第26天】Flink是一款开源框架,支持有状态流处理与批处理任务。其核心功能之一为分布式快照,通过“检查点(Checkpoint)”机制确保系统能在故障发生时从最近的一致性状态恢复,实现可靠容错。Flink通过JobManager触发检查点,各节点暂停接收新数据并保存当前状态至稳定存储(如HDFS)。采用“异步屏障快照(Asynchronous Barrier Snapshotting)”技术,插入特殊标记“屏障(Barrier)”随数据流传播,在不影响整体流程的同时高效完成状态保存。例如可在Flink中设置每1000毫秒进行一次检查点并指定存储位置。
73 0