开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 :Flink checkpoint(二)】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/632/detail/10044
Flink checkpoint(二)
四、Checkpoint 的执行机制
(1)State 的存储
在讲 Checkpoint 执行机制之前,先对 State 的存储进行一个认知,checkpoint 的主要数据就是 State ,那么 State 存储在哪里?就存储在每个Operator state-backend 里面,
如下图有三个state-backend 类型,每种 state- backend 会分别创建出自己的KeyedStateBackend 和 Operator 、 State 、 Backend ,
这三种state- backend 均会创建同一种 OperatorState Backend ,我们称之为 DefaultOperatorStateBackend ,就是一个存内存的;
这三类会创建 keyed state- backend 对 Memory 和 Fs 来说它都会创建一个 HeapKeyedStateBackend ,也就是储存在内存中的,
RocksDBStateBackend 会创建一个
RocksDBKeyedStateBackend;keyed state- backend 顾名思义会使用 RocksDBKeyedStateBackend存储,框内是需要使用 state- backend 的一个声明,
比如这里写了要使用 FsStateBackend 需要创建一个 node 进行一个配置,如果不配置,配置默认会创建一个
MemoryStateBackend ,但需要 keyed state-backend 时会有 MemoryStateBackend 专门创建相关的
HeapKeyedStateBackend 和 RocksDBKeyedStateBackend 。
从这张图上看 MemoryStateBackend 和 FsStateBackend 完全没有任何区别,创建 keyed state-backend 一般都是创建 HeapKeyedBackend ,它们的区别在于他们在做 Checkpoint 的时候,机制不一样,即对Memory 来说数据一般是直接返回给 master 节点,而FsStateBackend 它将文件路径返回给 master,而RocksDBStateBackend 是将数据写入文件中,将文件路径传递给master ;
下图中绿颜色的数据都存储在内存中,黄颜色的数据存储在 RocksDB 中,数据是混合的。
(2)HeapKeyedStateBackend 存储格式
①支持异步 checkpoint (默认):
CopyOnWriteStateTable[],整体相当于一个 map
②仅支持同步 checkpoint : Map>[],由嵌套 map 的数组构成
③在 MemoryStateBackend 内使用时, checkpoint序列化数据阶段默认有最大 5MB 数据的限制。
(3)RocksDBKeyedStateBackend 存储格式
每个 column family 互相之间是独立的,如下图可以看到 State1、State2 ,它们是不同的 column family , column family 可以区分开不同的文件。可以看一下整个文件是怎么存储的,可以看到RocksDB 的 key 有三部分,分别是 keyGroup 、 Key 和 Namespace keyGroup , keyGroup Key 是表示说这个 key 属于哪个 group ,是个 int 值; key 就是用户所定义的 process-key ;
Namespace其实默认情况下是 void-Namespace ,它主要是使用在 Window 里面,表示 key 属于哪个 Window 。
(4)Checkpoint 执行流程
①接下来看 Checkpoint 执行流程, checkpoint 是存储在 JM 中Checkpoint 、 Coordinator 中的,首先它会向所有的 source 去切割 checkpoint ,
可以看到如下图所示:
②当 task 收到所有的 barrier 之后,它会执行一次快照,它在执行快照的时候会把 barrier 往下游进行广播,然后会将自己的状态异步的写入到持久化存储中,也就是下图中红色的三角形,
如下图:
③当 source 节点异步的写下来之后,它会产生一个 state handle ,返回给 checkpoint coordinator ,这个 handle 的表针告诉 checkpoint 说”我应该做完了我自己这部分 checkpoint ,然后我通过源数据告诉你”同时可以看到 barrier 在整个拐弯里面聚集往下流,
如下图:
④最后一个节点是 sink 节点,涉及到 barrier 之后它同样会执行checkpoint ,这里假设 sink 节点是 RocksDBStateBackend ,那么它有个增量 checkpoint ,首先它会在收集齐 barrier 之后,它会执行一次 checkpoint ,也就是红色的大三角,所谓这样的 checkpoint 其实对于 Flink 而言数据是增量的, Flink 在 task 里面记录了一些之前上传成功的 checkpoint 的信息,那么它可以做一次过滤,就是出来之前还没有上传过得文件,也就是图示中紫色的小三角,将这些部分文件上传到持久化存储中,
如下图:
⑤同样的当存储完之后,同样将需要的 state handle 告诉coordinator ,这时已经收集齐了所有三个 task 的 handle ,在整个Flink 中这次的 checkpoint 是完成的,
如下图:
⑥它会将这些 handle 再转换成 checkpoint 对象,这个对象是包含了整个所有 map 的文化,再将这些数据传入目录下面去,这样整个checkpoint 就完成了,
如下图:
(5)EXACTLY ONCE 和 AT LEAST ONCE
之前也提到了说会有一个 barrier 在,对于若干 check 来说,即EXACT ONCE 是说当它收集齐 barrier 之前,所有数据是先会 buffer ,不往下流传,当 barrier 收集齐之后再往下传,那就意味着说数据是不会重复处理的,而对于 AT LAEAST ONCE 来说,它没有一个 buffer 的过程,只要收集到 barrier 就直接往下传,最后再强调一下所谓的真正的 EXACTLY ONCE Flink ,所说的 EXACTLY ONCE 是它的计算工程,可以做到EXACTLY ONCE ,而 end-to-end 的EXACTLY ONCE 是需要 source 和 sink 的支持,也就是说 source可以 replayed ,比如可以 replay 回一分钟之前的状态,
同样 sink也是需要支持的,目前例如卡不卡是两阶段提交了,那么它就可以实现 sink 的 EXACTLY ONCE ,所以说对于用户来说,比如把数据做完,会发现数据有可能是重复写入,那其实是因为 sink 并没有支持EXACTLY ONCE ,所以要想真正做到 end-to-end 需要将 sink 升级成支持 EXACTLY ONCE 的,否则对普通的文件效果数据可能会被重复写出去,会发现你的输出结果就冗余了。
可参考下图:
(6) 增量 checkpoint
之前提到了所谓的增量 checkpoint ,其实是增量之前没有上传过的数据,可以看看下图的实例, RocksDB 本身做了三四个 checkpoint ,
第一次的时候它的文件是 123sst 和 MANIFEST , sst 文件这里涉及到数据库的概念, sst 文件生产之后是不可变得,不管执行多少次 checkpoint ,哪怕一万次,只要 checkpoint 中有sst 文件,那么文件其实都是一模一样的,正是利用了这点才实现了所谓的增量的 checkpoint ;然后可以在下图看到在第二个checkpoint 的时候是 124sst 和 MANIFEST ,如果第一次成功了只需要上传4点 sst 和一个可变文件 MANIFEST ,因为 MANIFEST 是源数据文件,是可变的,
所以不管之前有没有上传成功它都要上传;在 checkpoint 三的时候,可以在下图中看到有 45 sst 和MANIFEST ,那么这时候 5.sst 也是个新文件,需要将它上传, MANIFEST 文件同样也需要上传。
如果整个 checkpoint 失败了,不是说 task 完成的 checkpoint ,而是因为其他 task 导致整个checkpoint 失败,被认为是不可用的,那么 Flink 机制会保证说checkpoint 数据是不可信的,当为 checkpoint 三的时候说明4. sst 文件并没有上传上去,那么这时候会把4和5 sst 文件都上传一遍,
如下图:
(7)如何从已停止的作业进行状态恢复
在 Flink 里面有两个概念,分别是 Savepoint 和 Externalized ,Checkpoint Savepoint 就是由用户管理触发的数据,它的格式是标准化的,允许作业升级或者配置变更,比较慢,用户也可以从Externalized Checkpoint 机制继续 Flink 的恢复,之后可能Savepoint 对于非必要场景大部分可以被 Externalized Checkpoint 所替代。
(8)已停止的作业进行状态恢复
① Keyed State 的改并发
可以看到下图上面的并发路是3,下面的并发路是4,改并发有个签订是说 KeyGroup 总数是不变的,可以看到对于 Subtask0 来说,它之前的 KeyGroup 是0-3 ,在它改并发后减少了,变成了0-2,也就是需要将之前自己的0-3抽出0-8的部分给自己使用,对于新的Subtask 来说,它需要继承老 Subtask 一部分,并对它进行改并发,
如下图:
② Operator State 改并发
一共有三种划分,分别是如果:是使用 ListState 就是均匀划分;如果是使用 UnionListState 就是 Union 划分, Union 划分是说每个并发上面的数据都会拿到之前所有的数据的总和,每个地方都可以拿到之前所有 state 的数据,然后都进行过滤筛选,而不是用之前所有的 state 进行下一次的 checkpoint ,所以要正确的使用unionstate ;
Boradcast 就比较明确了,因为每个数据都一样,还是获得之前那个备份。
五、回答部分问题
(1) state 大小的推荐,什么时候用 stateBackend ?其实在生产环境中可能只能用 RocksDBStateBackend 了,虽然用FsStateBackend 也不是不可以,但是需要对 State 有个非常深刻的了解,也就是说 checkpoint 数据不会突然间的增大或者减少,使用FsStateBackend 可能会导致作业挂掉。
(2)做 checkpoint 的时候, sink 如果挂了, Flink 会有什么处理?
之前在 Checkpoint 执行流程图中讲到过,比如 sink 挂了,它没有把 state handle 返回回去,这个时候会有两种情况:一种是直接告诉 JM ,告诉你这种情况废掉了,没有用,那么这时 JM 会把它所收到的所有的 handle ,逐一异步的将 handle 路径的文件全部给清理掉;
第二种情况是做失败了,那么 checkpoint 这边有一个超时的时间限制, 默认为10分钟,也就是从出发到10分钟这个时间若仍然没有收集齐所有的 handle ,会认为 checkpoint 超时了,也是不可用的,也会将所有的 checkpoint handle 对应的文件异步的进行删除。
(3)RocksDB 文件对应一个或几个 Operator State 结构吗?如果Operator State 内容没有改变,它们使用的文件应该是一样的吗?可能很多人觉得一开始不太理解什么是增量 checkpoint ,其实增量checkpoint 它所依赖的基础是 LSM ,当文件生成完备之后,这个文件就不会再更新了,它是基于这样一种概念。
(4) RocksDBState 开销很大吗?
对,因为 RocksDB 写的时候要进行序列化,读的时候要进行一些反序列化,那么当KB比较大时,开销确实很大。