Kafka/Spark/Flink Exactly-once语义调研

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Kafka/Spark/Flink Exactly-once语义调研

前言

Exactly-once投递语义是分布式系统中最常遇到的话题,Exactly-once保证了哪怕系统发生故障,每一个 ETL 操作也仅会被执行一次,不会产生数据的丢失或者重复。这是数据投递的最强保证,很难实现这个等级的流处理数据投递。接下来是对于一些业界主流的Exactly-once投递语义的调研。本文主要关注的是方案的实现和技术点,重点在于方案的failover机制和高可用,对于分布式系统可能会出现的僵尸线程等问题不予深究。

本文的内容将会从数据源端、消费端、输出端进行分析,其中数据源端和输出端对于流处理引擎本身来说都是外部存储,流处理引擎不能仅依靠自身实现Exactly-once投递方案,还需要数据源端和输出端的协同配合。

数据源端

为了便于理解,数据传输的基本单元我们统称为event,event是一个广义的概念,可以是mysql中的一个事务,也可以是自定义的一批数据。要配合工具实现Exactly-once需要满足两个条件:支持重算、记录event的offset

支持重算(必需)

任何节点出现fail重启,由于计算结果还没有完成,我们都需要上游节点重算,否则event丢失。

记录event的offset(非必需)

要求记录event处理的进度,并保证存储结果不出现重复,能够在fail over时不重发offset之前的消息。

消费端为了配合数据源端的Exactly-once语义会产生两种策略:

  1. 假设数据源无法主动维护自身event的offset,需要下游的消费端内部维护event offset,并且需要保证记录event offset和计算结果高可用是一个原子操作,一条数据对应一个offset,既保证该条数据计算结果存储下来后就不会重算了。
  2. 存储结果是一个幂等操作,这样就可以先存储计算结果,然后再更新event offset。即使在存储计算结果的过程中节点fail重启,重算上游数据也不会出现问题。

消费端

消费端主要探讨一下Kafka/Spark/Flink三种流计算工具的Exactly-once实现

Kafka的内部Exactly-once

Kafka数据流中的每个partition的数据传递都能够保证Exactly-once,producer保证不重复,consumer幂等,结果高可用,这就是为什么Kafka Streams API提供的Exactly-once保证是迄今为止任何流处理系统中的最强实现的原因。

consumer幂等

Kafka Stream的计算节点的上游信息都来自分布式partition中commit之后的数据, 在queue里的数据都有确定的sequenceId, 所以只要计算节点记录好自己当前处理的sequenceId, 处理完一个信息就更新自己的sequenceId, 并且commit到可靠dataStore里, 就绝对不会重复处理上游event, 而只要没有commit这个位置则可以无数次replay当前的record(当前节点只会处理sequenceId+1的消息)。

结果高可用

就是保证自身节点state状态的更新,其实是写一个内部隐藏的state的change log的topic,和一个本地key value表(也就是本计算节点的state)。failover的时候, 之前的"本地"表丢失没关系, 可以从change log里恢复出失败前确定commit的所有state;

producer保证不重复

Kafka内部有一套完整的Transactional Messaging机制来保证事务的原子性和隔离性,会启用一个TransactionCoordinator负责管理broker和producer之间的事务消息,主要包括管理一个epochId保证事务消息的原子性,以及协助实现2pc两阶段提交。

计算结束后,commit本次的tx,由Kafka Transactional Messaging来保证(1)2pc两阶段提交往下游发的消息。(2)记录event stream的消费进度。(3)所有的state的所有更新是一个原子操作, 由于结果都成功写入Kafka topic,所以达到计算结果的高可用性

小结

实际上,上文内容只是从producer和consumer的角度保证了Kafka stream的Exactly-once语义,Kafka内部Transactional Messaging实现还会涉及到zombie fencing等复杂的场景,这里主要还是用到其原子性。那么简单小结一下Kafka的Exactly-once语义实现,就是保证了系统内部每个节点的端到端Exactly-once投递,sequenceId保证了下游去重,change log保证了高可用,2pc和offset保证不重发。

Spark的内部Exactly-once

Spark的基本数据单元是一种被称作是RDD(分布式弹性数据集)的数据结构,Spark内部程序通过对RDD的进行一系列的transform和action操作,完成数据的分析处理。那么Spark内部的Exactly-once就是通过checkpoint机制保证RDD的容错恢复,如果中间节点出现故障,恢复之后,只需要接着上次 checkpoint 的记录做恢复即可,对于失败前那个未完成的事务执行回滚操作(abort)就可以了。

Spark的checkpoint机制主要包含两种策略:血缘机制和micro-batch checkpoint

血缘机制

首先说一下RDD的血缘机制。当我们计算一个RDD时,会依赖一个或多个父RDD的数据,而这些父RDD又会依赖它自身的父RDD,这样RDD之间的依赖关系就形成了一个有向无环图(也叫DAG图),这些依赖关系被记录在一个图中,这就是RDD的血缘(也叫RDD Lineage)。基于这个DAG图,在fail over时就能够根据上游的数据重算出RDD中丢失的数据。

micro-batch checkpoint

但是每个RDD的process都会在关系图上增加一个新的节点,这些数据都记录下来的话,关系图会出现爆炸式的增长。因此,需要micro-batch和checkpoint机制来减少维护关系图带来的负担。通过异步的checkpoint来截断lineage也就是各个节点状态和计算结果复杂的关系。比如一个数据如果已经checkpoint了, 那么它所依赖的所有状态和计算结果都可以在关系图里删去, 因为replay如果依赖于这个数据, 那么使用它的checkpoint即可, 而不需要知道这个数据是怎么算出来的, 如果还没checkpoint成功, 则需要根据数据依赖图来重算这个数据。 像这样利用checkpoint, 就可以防止lineage无限增长。在 Spark Streaming 中,JobGenerator 用于生成每个 batch 对应的 jobs,定时器一到,就会启动这个job去重算关系图中的数据,然后将得到的RDD数据持久化到外部的可靠存储中,例如HDFS。

如图所示,每一层数据中的micro-batch第一条信息都要等待最后一条信息处理完成之后,才能传给下游。并且这个等待是会叠加的,当stream的层数不断的增加,每一层的micro-batch都需要等待最后一条数据,这样的造成的latency会叠加式的增高,如同蝴蝶效应一般不断的发展,最终会造成Spark很高的端到端处理的latency。为了解决这个问题,Flink给出了不需要使用micro-batch的方案,可以不记录所有中间的计算结果。

小结

checkpoint机制只是能够保证系统内部的Exactly-once投递,不对系统外的投递语义做出保证,因此需要在投递时做出策略的调整,具体看第三部分。

Flink的内部Exactly-once

Checkpoint机制

Flink的Checkpoint机制是基于Chandy-Lamport算法的思想改进而来,引入了Checkpoint Barrier的概念,可以在不停止整个流处理系统的前提下,让每个节点独立建立检查点保存自身快照,并最终达到整个作业全局快照的状态。有了全局快照,当我们遇到故障或者重启的时候就可以直接从快照中恢复,这就是Flink容错的核心。

如图所示,在流处理的过程中,例如在持续处理A1,A2,A3三条数据过程中,这个时候系统崩溃了,那么我们只要回到系统没有见过A1之前的状态就可以了,重启恢复之后再重放A1,A2,A3,那么这些消息就能保证Exactly-once投递了。关键的问题就在于我们如何保存没有见过A1之前的状态,我们看一下Flink如何使用checkpoint机制来实现Exactly-once投递。

Flink内部维护了一个高可用的coordinator,不断地在数据源发出的数据流中,插入不同的stage barrier,比如先给所有的数据源发barrier-a,然后1分钟后发barrier-b,如此类推。所有的节点都必须忠实的转发这些stage barrier,为了对节点的不同状态进行划分:

  1. 每个节点都分为接收到某barrier(设为barrier-a)之前的信息和收到barrier-a之后的信息,
  2. 所有的发给下游的计算结果也分为自己发出barrier-a之前的信息和发出barrier-a之后的信息;
  3. 所有的状态变迁也分为,用所有接收到barrier-a之前的信息建立的状态, 和收到barrier-a之后被新的信息影响了的状态;

全局一致

理解了checkpoint机制的运行机制后,我们来看一下如何进行failover的。首先需要明确两个概念,全局一致点和全局一致状态集。全局一致点是相对于计算节点的,全局一致状态集是相对于整个系统的。如上图所示,每个分割点都表示该计算节点达到了全局一致点。对于barrier-a而言,随着event stream的不断推进,达到了barrier-a的全局一致点也在不断的增加,当作业的所有节点都确认了之后,就得到了一个全局一致状态集。

两个细节:

  1. 如图,一个节点可能会存在多个input channel,每个input channel都会携带一个barrier-a的消息。收到任意input channel 的barrier-a之后,block此channel。当前阶段的所有input channel都接受到barrier-a的消息后,把当前状态checkpoint。 并且处理完所有此前收到的信息并向下游发送计算结果完毕后, 向所有和自己相连的下游转发barrier-a。
  2. 当每个阶段的所有节点都备份完成,我们就得到了一个全局一致状态集,,既整个系统的snapshot。系统的稳定点就进步到了barrier-a。但是对于整个流计算过程的所有节点来说,不可能所有节点都在等待系统进步到barrier-a状态,他可能会同时存在barrier-b、barrier-c,甚至更多的状态集, 那么在得到其他barrier的全局一致状态集之前, 如果系统出现failure, 我们就可以通过重启所有计算节点的方式, 让所有节点reload回到barrier-a所记录的状态集, 从而实现把所有节点的状态rollback到上一个全局一致的状态, 使得流系统可以重置到好像根本没有看到过任何barrier-b或者barrier-c之间的信息的一样, 然后重跑这段信息。

小结

Flink和Spark一样都是运用了checkpoint机制来保证了内部的Exactly-once。不同的是,Flink基于Chandy-Lamport算法通过barrier来触发快照时间点,将数据流处理和快照操作解耦开,最大程度降低了快照对系统性能的影响。当然,也不是完全不存在latency,如上述第一点细节中提到的,多个input channel也会存在阻塞的操作,当时相比于Spark的micro-batch已经是极大的减少了系统阻塞的时间。

输出端

这里着重聊一下输出端。 Sink主要有两种手段来配合流系统中间件的精确一次投递, 幂等和2阶段提交(2PC),以Flink的sink端为例:

幂等

幂等性的简单理解,就是指可以执行多次,而不会产生与仅执行一次不同结果的操作,因此 At-least-once 自然等同于 Exactly-once。如此一来,在从快照恢复的时候幂等 sink 便不需要对外部系统撤回已发消息,相当于回避了外部系统的状态回滚问题。比如写入 KV 数据库的 sink,由于插入一行的操作是幂等的,因此 sink 可以无状态的,在错误恢复时也不需要关心外部系统的状态。

然而幂等 sink 的适用场景依赖于外部存储,如果下游的外部存储本来就无法保证幂等性,这时就需要应用事务性 sink。

事务性sink(2PC)

由于 sink 依赖于目标输出系统的事务保证,而分布式系统对于事务的支持并不一定很完整,比如 HBase 就不支持跨行事务,再比如 HDFS 等文件系统是不提供事务的,这种情况下 Flink提供了事务性sink连接器尽最大努力地提供事务保证。

Flink 结合在系统内部抽象出 Exactly-once sink 的通用逻辑TwoPhaseCommitSinkFunction 接口,从命名即可看出这是基于两阶段提交协议。实际上由于 Flink 的流计算特性,当前事务的未 commit 数据是一直在积累的,根据缓存未 commit 数据的地方的不同,可以将TwoPhaseCommitSinkFunction 接口分为两种实现方式。

  • 在 sink 端模拟事务的提交,BucketingSink的原子命名保证了提交的原子性。这种方式可以提供 read-committed 的事务隔离级别,但同时由于未 commit 的数据不会发往下游(与 checkpoint 同步),sink 端缓存会带来一定的延迟,相当于退化为与 checkpoint 同步的 micro-batch 模式。
  • 适用于下游系统支持事务的场景,在下游外部存储系统缓存未 commit 数据,等 checkpoint 完成后通知下游 commit。这样的好处是数据是流式发往下游的,不会在每次 checkpoint 完成后出现网络 IO 的高峰,并且事务隔离级别可以由下游设置。

小结

无论是幂等,HDFS的原子命名,还是下游系统支持事务,都是需要输出端的配合,如果仅依靠Flink内部的Exactly-once实现,是无法做到exactly-once投递语义的。

总结

分布式系统的演化就是就是不断trade off的过程,在分布式共识问题上都很难出现一个完美的算法,我们能做的只是在针对现实场景做出合适的选择,实现当前场景下的最优解。

参考

端到端一致性,流系统Spark/Flink/Kafka/DataFlow对比总结(压箱宝具呕血之作)https://zhuanlan.zhihu.com/p/77677075

Flink 必知必会经典课程4:Fault-tolerance in Flink https://developer.aliyun.com/article/782826

Spark计算RDD介绍 https://cloud.tencent.com/developer/article/1159602

Kafka的Exactly-once语义与事务机制  https://www.cnblogs.com/luxiaoxun/p/13048474.html

目录
相关文章
|
2月前
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
233 1
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
109 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
175 0
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
44 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
98 0
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
62 1
|
2月前
|
消息中间件 NoSQL Kafka
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
57 4
|
2月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
178 0
|
2月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
45 0
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
58 0