前言
Exactly-once投递语义是分布式系统中最常遇到的话题,Exactly-once保证了哪怕系统发生故障,每一个 ETL 操作也仅会被执行一次,不会产生数据的丢失或者重复。这是数据投递的最强保证,很难实现这个等级的流处理数据投递。接下来是对于一些业界主流的Exactly-once投递语义的调研。本文主要关注的是方案的实现和技术点,重点在于方案的failover机制和高可用,对于分布式系统可能会出现的僵尸线程等问题不予深究。
本文的内容将会从数据源端、消费端、输出端进行分析,其中数据源端和输出端对于流处理引擎本身来说都是外部存储,流处理引擎不能仅依靠自身实现Exactly-once投递方案,还需要数据源端和输出端的协同配合。
数据源端
为了便于理解,数据传输的基本单元我们统称为event,event是一个广义的概念,可以是mysql中的一个事务,也可以是自定义的一批数据。要配合工具实现Exactly-once需要满足两个条件:支持重算、记录event的offset
支持重算(必需)
任何节点出现fail重启,由于计算结果还没有完成,我们都需要上游节点重算,否则event丢失。
记录event的offset(非必需)
要求记录event处理的进度,并保证存储结果不出现重复,能够在fail over时不重发offset之前的消息。
消费端为了配合数据源端的Exactly-once语义会产生两种策略:
- 假设数据源无法主动维护自身event的offset,需要下游的消费端内部维护event offset,并且需要保证记录event offset和计算结果高可用是一个原子操作,一条数据对应一个offset,既保证该条数据计算结果存储下来后就不会重算了。
- 存储结果是一个幂等操作,这样就可以先存储计算结果,然后再更新event offset。即使在存储计算结果的过程中节点fail重启,重算上游数据也不会出现问题。
消费端
消费端主要探讨一下Kafka/Spark/Flink三种流计算工具的Exactly-once实现
Kafka的内部Exactly-once
Kafka数据流中的每个partition的数据传递都能够保证Exactly-once,producer保证不重复,consumer幂等,结果高可用,这就是为什么Kafka Streams API提供的Exactly-once保证是迄今为止任何流处理系统中的最强实现的原因。
consumer幂等
Kafka Stream的计算节点的上游信息都来自分布式partition中commit之后的数据, 在queue里的数据都有确定的sequenceId, 所以只要计算节点记录好自己当前处理的sequenceId, 处理完一个信息就更新自己的sequenceId, 并且commit到可靠dataStore里, 就绝对不会重复处理上游event, 而只要没有commit这个位置则可以无数次replay当前的record(当前节点只会处理sequenceId+1的消息)。
结果高可用
就是保证自身节点state状态的更新,其实是写一个内部隐藏的state的change log的topic,和一个本地key value表(也就是本计算节点的state)。failover的时候, 之前的"本地"表丢失没关系, 可以从change log里恢复出失败前确定commit的所有state;
producer保证不重复
Kafka内部有一套完整的Transactional Messaging机制来保证事务的原子性和隔离性,会启用一个TransactionCoordinator负责管理broker和producer之间的事务消息,主要包括管理一个epochId保证事务消息的原子性,以及协助实现2pc两阶段提交。
计算结束后,commit本次的tx,由Kafka Transactional Messaging来保证(1)2pc两阶段提交往下游发的消息。(2)记录event stream的消费进度。(3)所有的state的所有更新是一个原子操作, 由于结果都成功写入Kafka topic,所以达到计算结果的高可用性
小结
实际上,上文内容只是从producer和consumer的角度保证了Kafka stream的Exactly-once语义,Kafka内部Transactional Messaging实现还会涉及到zombie fencing等复杂的场景,这里主要还是用到其原子性。那么简单小结一下Kafka的Exactly-once语义实现,就是保证了系统内部每个节点的端到端Exactly-once投递,sequenceId保证了下游去重,change log保证了高可用,2pc和offset保证不重发。
Spark的内部Exactly-once
Spark的基本数据单元是一种被称作是RDD(分布式弹性数据集)的数据结构,Spark内部程序通过对RDD的进行一系列的transform和action操作,完成数据的分析处理。那么Spark内部的Exactly-once就是通过checkpoint机制保证RDD的容错恢复,如果中间节点出现故障,恢复之后,只需要接着上次 checkpoint 的记录做恢复即可,对于失败前那个未完成的事务执行回滚操作(abort)就可以了。
Spark的checkpoint机制主要包含两种策略:血缘机制和micro-batch checkpoint
血缘机制
首先说一下RDD的血缘机制。当我们计算一个RDD时,会依赖一个或多个父RDD的数据,而这些父RDD又会依赖它自身的父RDD,这样RDD之间的依赖关系就形成了一个有向无环图(也叫DAG图),这些依赖关系被记录在一个图中,这就是RDD的血缘(也叫RDD Lineage)。基于这个DAG图,在fail over时就能够根据上游的数据重算出RDD中丢失的数据。
micro-batch checkpoint
但是每个RDD的process都会在关系图上增加一个新的节点,这些数据都记录下来的话,关系图会出现爆炸式的增长。因此,需要micro-batch和checkpoint机制来减少维护关系图带来的负担。通过异步的checkpoint来截断lineage也就是各个节点状态和计算结果复杂的关系。比如一个数据如果已经checkpoint了, 那么它所依赖的所有状态和计算结果都可以在关系图里删去, 因为replay如果依赖于这个数据, 那么使用它的checkpoint即可, 而不需要知道这个数据是怎么算出来的, 如果还没checkpoint成功, 则需要根据数据依赖图来重算这个数据。 像这样利用checkpoint, 就可以防止lineage无限增长。在 Spark Streaming 中,JobGenerator 用于生成每个 batch 对应的 jobs,定时器一到,就会启动这个job去重算关系图中的数据,然后将得到的RDD数据持久化到外部的可靠存储中,例如HDFS。
如图所示,每一层数据中的micro-batch第一条信息都要等待最后一条信息处理完成之后,才能传给下游。并且这个等待是会叠加的,当stream的层数不断的增加,每一层的micro-batch都需要等待最后一条数据,这样的造成的latency会叠加式的增高,如同蝴蝶效应一般不断的发展,最终会造成Spark很高的端到端处理的latency。为了解决这个问题,Flink给出了不需要使用micro-batch的方案,可以不记录所有中间的计算结果。
小结
checkpoint机制只是能够保证系统内部的Exactly-once投递,不对系统外的投递语义做出保证,因此需要在投递时做出策略的调整,具体看第三部分。
Flink的内部Exactly-once
Checkpoint机制
Flink的Checkpoint机制是基于Chandy-Lamport算法的思想改进而来,引入了Checkpoint Barrier的概念,可以在不停止整个流处理系统的前提下,让每个节点独立建立检查点保存自身快照,并最终达到整个作业全局快照的状态。有了全局快照,当我们遇到故障或者重启的时候就可以直接从快照中恢复,这就是Flink容错的核心。
如图所示,在流处理的过程中,例如在持续处理A1,A2,A3三条数据过程中,这个时候系统崩溃了,那么我们只要回到系统没有见过A1之前的状态就可以了,重启恢复之后再重放A1,A2,A3,那么这些消息就能保证Exactly-once投递了。关键的问题就在于我们如何保存没有见过A1之前的状态,我们看一下Flink如何使用checkpoint机制来实现Exactly-once投递。
Flink内部维护了一个高可用的coordinator,不断地在数据源发出的数据流中,插入不同的stage barrier,比如先给所有的数据源发barrier-a,然后1分钟后发barrier-b,如此类推。所有的节点都必须忠实的转发这些stage barrier,为了对节点的不同状态进行划分:
- 每个节点都分为接收到某barrier(设为barrier-a)之前的信息和收到barrier-a之后的信息,
- 所有的发给下游的计算结果也分为自己发出barrier-a之前的信息和发出barrier-a之后的信息;
- 所有的状态变迁也分为,用所有接收到barrier-a之前的信息建立的状态, 和收到barrier-a之后被新的信息影响了的状态;
全局一致
理解了checkpoint机制的运行机制后,我们来看一下如何进行failover的。首先需要明确两个概念,全局一致点和全局一致状态集。全局一致点是相对于计算节点的,全局一致状态集是相对于整个系统的。如上图所示,每个分割点都表示该计算节点达到了全局一致点。对于barrier-a而言,随着event stream的不断推进,达到了barrier-a的全局一致点也在不断的增加,当作业的所有节点都确认了之后,就得到了一个全局一致状态集。
两个细节:
- 如图,一个节点可能会存在多个input channel,每个input channel都会携带一个barrier-a的消息。收到任意input channel 的barrier-a之后,block此channel。当前阶段的所有input channel都接受到barrier-a的消息后,把当前状态checkpoint。 并且处理完所有此前收到的信息并向下游发送计算结果完毕后, 向所有和自己相连的下游转发barrier-a。
- 当每个阶段的所有节点都备份完成,我们就得到了一个全局一致状态集,,既整个系统的snapshot。系统的稳定点就进步到了barrier-a。但是对于整个流计算过程的所有节点来说,不可能所有节点都在等待系统进步到barrier-a状态,他可能会同时存在barrier-b、barrier-c,甚至更多的状态集, 那么在得到其他barrier的全局一致状态集之前, 如果系统出现failure, 我们就可以通过重启所有计算节点的方式, 让所有节点reload回到barrier-a所记录的状态集, 从而实现把所有节点的状态rollback到上一个全局一致的状态, 使得流系统可以重置到好像根本没有看到过任何barrier-b或者barrier-c之间的信息的一样, 然后重跑这段信息。
小结
Flink和Spark一样都是运用了checkpoint机制来保证了内部的Exactly-once。不同的是,Flink基于Chandy-Lamport算法通过barrier来触发快照时间点,将数据流处理和快照操作解耦开,最大程度降低了快照对系统性能的影响。当然,也不是完全不存在latency,如上述第一点细节中提到的,多个input channel也会存在阻塞的操作,当时相比于Spark的micro-batch已经是极大的减少了系统阻塞的时间。
输出端
这里着重聊一下输出端。 Sink主要有两种手段来配合流系统中间件的精确一次投递, 幂等和2阶段提交(2PC),以Flink的sink端为例:
幂等
幂等性的简单理解,就是指可以执行多次,而不会产生与仅执行一次不同结果的操作,因此 At-least-once 自然等同于 Exactly-once。如此一来,在从快照恢复的时候幂等 sink 便不需要对外部系统撤回已发消息,相当于回避了外部系统的状态回滚问题。比如写入 KV 数据库的 sink,由于插入一行的操作是幂等的,因此 sink 可以无状态的,在错误恢复时也不需要关心外部系统的状态。
然而幂等 sink 的适用场景依赖于外部存储,如果下游的外部存储本来就无法保证幂等性,这时就需要应用事务性 sink。
事务性sink(2PC)
由于 sink 依赖于目标输出系统的事务保证,而分布式系统对于事务的支持并不一定很完整,比如 HBase 就不支持跨行事务,再比如 HDFS 等文件系统是不提供事务的,这种情况下 Flink提供了事务性sink连接器尽最大努力地提供事务保证。
Flink 结合在系统内部抽象出 Exactly-once sink 的通用逻辑TwoPhaseCommitSinkFunction 接口,从命名即可看出这是基于两阶段提交协议。实际上由于 Flink 的流计算特性,当前事务的未 commit 数据是一直在积累的,根据缓存未 commit 数据的地方的不同,可以将TwoPhaseCommitSinkFunction 接口分为两种实现方式。
- 在 sink 端模拟事务的提交,BucketingSink的原子命名保证了提交的原子性。这种方式可以提供 read-committed 的事务隔离级别,但同时由于未 commit 的数据不会发往下游(与 checkpoint 同步),sink 端缓存会带来一定的延迟,相当于退化为与 checkpoint 同步的 micro-batch 模式。
- 适用于下游系统支持事务的场景,在下游外部存储系统缓存未 commit 数据,等 checkpoint 完成后通知下游 commit。这样的好处是数据是流式发往下游的,不会在每次 checkpoint 完成后出现网络 IO 的高峰,并且事务隔离级别可以由下游设置。
小结
无论是幂等,HDFS的原子命名,还是下游系统支持事务,都是需要输出端的配合,如果仅依靠Flink内部的Exactly-once实现,是无法做到exactly-once投递语义的。
总结
分布式系统的演化就是就是不断trade off的过程,在分布式共识问题上都很难出现一个完美的算法,我们能做的只是在针对现实场景做出合适的选择,实现当前场景下的最优解。
参考
端到端一致性,流系统Spark/Flink/Kafka/DataFlow对比总结(压箱宝具呕血之作)https://zhuanlan.zhihu.com/p/77677075
Flink 必知必会经典课程4:Fault-tolerance in Flink https://developer.aliyun.com/article/782826
Spark计算RDD介绍 https://cloud.tencent.com/developer/article/1159602
Kafka的Exactly-once语义与事务机制 https://www.cnblogs.com/luxiaoxun/p/13048474.html