在使用 Flink SQL 进行实时数据处理的过程中,双流 Join 是非常常见的操作之一。典型的场景包括分析广告效果(曝光流订单流实时关联)、实时推荐(点击流和商品信息)等等。然而,双流 Join 需要在状态中维护两侧全量的历史数据,以确保计算结果的准确性。随着作业的持续运行,双流 Join 会逐渐带来一些问题:
- 运维层面
- 状态过大,开发者需要不断加大作业的资源才能维持较高的吞吐。
- Checkpoint 易超时,导致作业不稳定、持续 Failover。
- 状态是 Flink 内部产物,排查问题时,其内部数据难以探查。
- 开发层面
- Query 迭代修改后,状态难以复用,且重启回追代价高。
为了解决这些问题,Flink 社区在 2.1 引入了新的 Delta Join 算子,并在 2.2 对其进行了进一步的扩展。Delta Join 的核心思想是舍弃算子内本地状态冗余的数据存储,利用双向 Lookup Join 直接查询源表中的数据,而非查询状态中的数据,从而复用源表数据。Delta Join 结合流存储 Apache Fluss,在阿里巴巴淘宝天猫团队成功落地,并且对比双流 Join,拥有如下几个优势:
- 消除了将近 50 TB 的 双流 Join 状态
- 计算 CU 降低 10 倍
- 作业恢复速度提升 87 %
- Checkpoint 秒级完成
Flink Delta Join 介绍请参考:《Delta Join:为超大规模流处理实现计算与历史数据解耦》https://developer.aliyun.com/article/1690558
01双流 Join 实现原理
让我们先简单描述 Flink 双流 Join 的工作原理。
我们以处理左侧表来的 changelog 数据为例,流入的数据主要经过以下三个阶段。
1. 通过 Join Key 查询对侧(即右侧)的状态,获取右侧历史上曾经流入该算子的全量数据。
2. 使用 Join 条件过滤查询得到的数据,并输出。
3. 将输入的本条数据,存入本侧(即左侧)的状态中,以供后续右侧的数据来临时,能正确的匹配数据。
之所以要把所有的数据用状态记录下来,是因为流计算是没有边界的,左侧数据和右侧数据匹配的时间点会存在时间差,即使一侧的数据延迟到达,也需要保证可以关联上另一侧的数据,最终输出。
双流 Join 的算法确保了数据的正确性,但是其状态会随着时间的推移而无限制增大,成为影响作业资源消耗和稳定性的关键因素。虽然目前已有 Interval Join[1]、Lookup Join[2]、State TTL Hint[3] 等手段来缓解或解决该问题,但是均面向了特定的业务场景,牺牲了一定的功能(如 Lookup Join 舍弃了维表侧数据的更新追踪,State TTL Hint 放弃匹配超过 TTL 期限的数据)。
02Delta Join 技术原理
从双流 Join 的原理上,我们可以观察到,状态里记录的全量数据,与源表中的数据基本相同,那么一个直观的想法是,可以复用源表的数据来取代原有的状态。Delta Join 正是基于这个思路,它利用了外部存储系统提供的索引能力,并不从状态中查找数据,而是直接对外部存储发出高效的、基于索引的数据查询,以获取匹配的记录。通过这种方式,Delta Join 消除了双流 Join 状态与外部系统之间冗余的数据存储。
理论推导
我们以两路输入为例,增量更新 Join 结果的公式为:
其中,A 代表了左表的全量历史数据, 代表了左表中的增量数据。B和 的定义与此类似。每当我们需要计算 Join 结果的增量部分时,我们只需要获取源表中从上次计算到当前时间之间新生成的数据,并查询对侧源表中的历史快照数据。因此我们需要:
1. 感知源表的增量数据
2. 访问源表历史快照数据
这对源表的物理存储引擎提出了很高的要求,存储引擎需要支持快照隔离,以确保强一致性语义。然而,目前存在以下几个问题:
1. 目前只有有限的存储支持了快照的概念,例如 Paimon、Iceberg、Hudi 等等
2. 快照生成的时间间隔为分钟级别,无法满足实时处理的要求
3. 当指定快照查询数据时,快照可能会在存储系统中过期
考虑到上述这些问题,Flink 2.1 提出了一种满足实时性要求的、最终一致性的 Delta Join 方案。
最终一致性语义的 Delta Join
最终一致性语义的 Delta Join 并不要求源表的存储引擎支持快照。它总是去查询源表当前最新的数据。其对应的变种公式如下:
和强一致性 Delta Join 相比,最终一致性 Delta Join 多出了一部分额外的中间结果 \Delta A \Join \Delta B,因此,这种方法只能确保最终的结果是一致的。
以下是双流 Join 和两种语义的 Delta Join 的对比。
双流 Join |
强一致性 Delta Join |
最终一致性 Delta Join |
|
延迟 |
低 |
高 |
低 |
状态大小 |
大 |
小 |
小 |
状态内数据详情 |
两侧输入全量明细数据 |
上一次触发计算的源表快照id |
等待触发计算的异步队列 |
数据一致性 |
强一致性 |
强一致性 |
最终一致性 |
03Delta Join 算子实现
为了提高算子的吞吐,在 Delta Join 算子中,分别引入了一个 TableAsyncExecutionController 组件和两个双侧完全相同的 DeltaJoinRunner 组件。
TableAsyncExecutionController 原理
该组件由 FLIP-519 Introduce async lookup key ordered mode[4] 引入,其严格限制相同 key 之间的数据必须串型执行,而允许不同 key 之间的数据并行处理,同时结合异步处理机制,大大提高了算子的吞吐能力。
该组件的运行原理如下:
TableAsyncExecutionController 在接收到数据后,按照 key 放入 BlockingBuffer 内不同 key 的队列里,然后通过 KeyAccountingUnit 检查该 key 是否被抢占、有对应的数据正在执行。如果 key 被抢占,直接返回;如果 key 未被抢占,则抢占该 key ,同时 poll 队列数据,放入 ActiveBuffer,交给后续计算逻辑处理,同时注册回调函数,在数据处理结束、输出后,在 KeyAccountingUnit 内释放该 key,去 BlockingBuffer 内拿下一条数据。
这套机制保证了相同 key 之间的数据是串行执行的,以避免出现分布式乱序问题。该机制在某种程度上是 FLIP-425 Asynchronous Execution Model[5] 的简化版本,感兴趣的可以另行研究。
在实际场景下,Delta Join 算子的吞吐会受到 BlockingBuffer 能允许的最大容量(各个 key 的队列大小之和)影响,当 BlockingBuffer 最大容量过小时,即使收到的每个 key 都不一样,也会由于无法充分利用异步并行的能力而导致吞吐较小。此时,可以适当调整下面的参数,来增大 BlockingBuffer 的最大容量。但如果设置的过大,BlockingBuffer 会占用比较高的内存,同时也可能会给外部存储带来较大的查询压力。
// 默认 100table.exec.async-lookup.buffer-capacity: 1000
我们可以通过监测 Delta Join 算子内以下几个 metric,来判断是否需要调整该参数。
aec_blocking_size:当前 BlockingBuffer 内被阻塞的所有 key 的队列大小之和。
该值越大,代表 join key 较为密集,考虑开启或增大 delta join cache;该值越小,但吞吐不佳的情况下,考虑增大 table.exec.async-lookup.buffer-capacity 的值。
aec_inflight_size:当前 ActiveBuffer 内正在执行计算的数据数量。
该值越大,代表当前同时请求外部存储集群的数据较多,存在请求堆积的情况,需要进一步查看外部存储系统是否存在异常,或查看是否有相关参数可以提高查询效率;该值越小,代表 join key 较为密集,考虑开启或增大 delta join cache。
注:当 Fluss 流存储的表作为 Delta Join 的源表时,你可以通过 Flink Table Hint[6],在 Fluss 表上配置以下这些关键参数,来提高查询效率。
client.lookup.queue-sizeclient.lookup.max-batch-sizeclient.lookup.max-inflight-requestsclient.lookup.batch-timeout
具体请参考 Fluss Connector Options[7]
04DeltaJoinRunner 原理
DeltaJoinRunner 是负责执行 Lookup 的组件。由于 Delta Join 算子会处理两侧的数据,因此对于不同侧的数据,各有一个完全相同的 DeltaJoinRunner 负责 Lookup 对应表的数据。
想象一下,如果我们对每条数据都要去外部存储进行查询,对外部吞吐的压力会非常大,算子的吞吐性能完全取决于请求外部系统的吞吐。但如果用普通的 cache 来对 Lookup 的数据进行缓存,Lookup 目标表的数据更新消息将无法订阅。为此,我们引入了驱动侧仅构建、Lookup 侧仅更新的特殊 cache。
DeltaJoinRunner 组件的运行原理如下(图例是用于左侧输入流查询右侧源表的 DeltaJoinRunner),分别由 LocalCache 和 LookupFetcher 组成。
当左侧数据到达时,先去 LocalCache 查询是否有 cache。当有 cache 时,直接输出;当没有 cache 时,借助 LookupFetcher 通过右表的 index 查询右表的数据,然后将查询回来的数据在 LocalCache 中构建 cache,最后输出。
同时,右表的数据到达时,将会查看此 DeltaJoinRunner 中的 LocalCache 是否有 cache。如果没有cache,忽略更新;如果有 cache,更新 cache。
该 cache 机制一方面确保了在 join key 较为密集的场景,算子的吞吐能够得到巨大的提升,同时对外部存储也不会构成很大的查询压力;另一方面,确保了对侧最新的数据能够更新 cache,从而在后续的流程中能被正确地匹配上。
该 cache 是一个 LRU 的 cache,合理的设置该 cache 的大小是非常必要的。过小的 cache 大小将导致 cache 的命中率受到影响,过大的 cache 会占用较多的内存。我们可以通过下面的参数来分别调节左右两侧 cache 的大小,甚至是在每条数据 join key 都不相同、cache 基本无用时关闭 cache。
// 是否启用cache,默认为 true table.exec.delta-join.cache-enabled: true // 设置用于缓存左表数据的cache大小,默认为 10000 // 推荐在左表较小、或右流 join key 较为密集时,设置较大值 table.exec.delta-join.left.cache-size: 10000 // 设置用于缓存右表数据的cache大小,默认为 10000 // 推荐在右表较小、或左流 join key 较为密集时,设置较大值 table.exec.delta-join.right.cache-size: 10000
我们可以通过监测 Delta Join 算子上的 metric,来判断是否需要适当增加 cache 的大小。
deltaJoin_leftCache_hitRate: 在右流查询左表的场景下,缓存左表数据的 cache 的命中率百分比。该值越高越好。deltaJoin_rightCache_hitRate:在左流查询右表的场景下,缓存右表数据的 cache 的命中率百分比。该值越高越好。
注:该图来自于“实战”章节 Nexmark q20 变种 query。右表 Auction 表每次都产生不同的id,故而 deltaJoin_leftCache_hitRate 的命中率始终为 0。
05实战
我们借用 nexmark 数据集[8] 中 q20 的 query,略微修改后,作为本次实战的样例代码。
-- 获取包含相应拍卖信息的出价表 INSERT INTO nexmark_q20 SELECT auction, bidder, price, channel, url, B.`dateTime`, B.extra, itemName, description, initialBid, reserve, A.`dateTime`, expires, seller, category, A.extra FROM bid AS B INNER JOIN auction AS A on B.auction = A.id; -- WHERE A.category = 10;
方式一:使用 Docker 环境测试
1. 环境准备
(1)类 Unix 操作系统,如 Linux、Mac OS X
(2)内存建议至少 4 GB,磁盘建议至少 4 GB
2. 下载 Docker 镜像
在命令行中,运行如下命令安装 Docker 测试镜像。
docker pull xuyangzzz/delta_join_example:1.0
运行如下命令运行该测试镜像,进入测试 docker container 的命令行。
docker run -p 8081:8081 -p 9123:9123 --name delta_join_example -it xuyangzzz/delta_join_example:1.0 bash
3. 运行任务 SQL
# 运行 flink 和 fluss 集群 ./start-flink-fluss.sh # 创建相关表和 delta join 作业 ./create-tables-and-run-delta-join.sh
此时,在宿主机 localhost:8081(或其他绑定的端口)即可查看 Flink UI 界面,可以看到此时 Delta Join 作业正在运行。
4. 插入数据到源表
在测试 docker container 中执行下面的命令,为源表插入数据。
# 在源表插入数据./insert-data.sh
5. 观察 Delta Join 作业
在宿主机 localhost:8081(或其他绑定的端口)的 flink-ui 界面,就可以看到 Delta Join 作业在正常消费数据了。
方式二:手工搭建环境测试
1. 环境准备
(1)运行环境
a. 类 Unix 操作系统,如 Linux、Mac OS X
b. 内存建议至少 4 GB,磁盘建议至少 4 GB
c. Java 11 及以上版本,且将环境变量 JAVA_HOME设置为 Java 的安装目录
(2)准备 Apache Flink 计算引擎
a. 下载
在 Apache Flink 官方下载网站[9] 下载最新的 Flink 2.2.0 版本,并解压。
b. 修改相关配置
修改 ./conf/config.yaml 文件,将 TaskManager numberOfTaskSlots 设置成 4 (默认为1)
(3)准备 Apache Fluss 流存储引擎
在 Apache Fluss 官方下载网站[9] 分别下载 Fluss 0.8 版本(并解压)和适配 Apahce Flink 2.1 的连接器。
(4)准备 Nexmark 源数据生成器
下载 Nexmark 项目[10] master 分支,在该项目根目录下,用 maven-3.8.6 版本执行以下的 maven 命令
mvn clean install -DskipTests=true
在"./nexmark-flink/target/" 文件夹下,将会生成 nexmark-flink-0.3-SNAPSHOT.jar 文件
2. 服务启动
(1)启动 Flink
将 Fluss 适配 Flink 2.1 的连接器,以及 Nexmark 项目生成的 nexmark-flink-0.3-SNAPSHOT.jar 文件,放入 Flink 目录的 ./lib 目录下。
参考 Flink 本地模式安装文档[11],在 Flink 目录中,执行下面的语句,启动本地 Standalone 集群。
## 请确保在 Flink 目录下执行该语句./bin/start-cluster.sh
检查 http://localhost:8081/#/overview 界面是否可正常访问。
(2)启动 Fluss
参考 Fluss 部署 Local Cluster 文档[12],在 Fluss 目录下,执行下面的语句,启动本地集群。
## 请确保在 Fluss 目录下执行该语句./bin/local-cluster.sh start
3. 运行任务 SQL
(1)创建 Fluss 表
将下面的 SQL 代码保存为“prepare_table.sql”文件,其中定义了 2 张源表和 1 张结果表。
CREATE CATALOG fluss_catalog WITH ( 'type'='fluss' ,'bootstrap.servers'='localhost:9123' ); USE CATALOG fluss_catalog; CREATE DATABASE IF NOT EXISTS my_db; USE my_db; -- 创建左侧源表 CREATE TABLE IF NOT EXISTS fluss_catalog.my_db.bid ( auction BIGINT ,bidder BIGINT ,price BIGINT ,channel VARCHAR ,url VARCHAR ,`dateTime` TIMESTAMP(3) ,extra VARCHAR ,PRIMARY KEY (auction, bidder) NOT ENFORCED ) WITH ( -- fluss prefix lookup key,可用于 index 'bucket.key'='auction' -- Flink 2.2 中,delta join 仅支持消费不带 delete 操作的 cdc 源表 ,'table.delete.behavior'='IGNORE' ); -- 创建右侧源表 CREATE TABLE IF NOT EXISTS fluss_catalog.my_db.auction ( id BIGINT ,itemName VARCHAR ,description VARCHAR ,initialBid BIGINT ,reserve BIGINT ,`dateTime` TIMESTAMP(3) ,expires TIMESTAMP(3) ,seller BIGINT ,category BIGINT ,extra VARCHAR ,PRIMARY KEY (id) NOT ENFORCED ) WITH ( -- Flink 2.2 中,delta join 仅支持消费不带 delete 操作的 cdc 源表 'table.delete.behavior'='IGNORE' ); -- 创建 delta join 写入的结果表 CREATE TABLE IF NOT EXISTS fluss_catalog.my_db.delta_join_sink ( auction BIGINT ,bidder BIGINT ,price BIGINT ,channel VARCHAR ,url VARCHAR ,bid_dateTime TIMESTAMP(3) ,bid_extra VARCHAR ,itemName VARCHAR ,description VARCHAR ,initialBid BIGINT ,reserve BIGINT ,auction_dateTime TIMESTAMP(3) ,expires TIMESTAMP(3) ,seller BIGINT ,category BIGINT ,auction_extra VARCHAR ,PRIMARY KEY (auction, bidder) NOT ENFORCED );
在 Flink 目录下,执行下面的语句,创建持久化的表。
## 请确保在 Flink 目录下执行该语句 ## 注意:请将 ${your_path} 替换为 prepare_table.sql 实际所在的目录 ./bin/sql-client.sh -f ${your_path}/prepare_table.sql
(2)启动 Delta Join 作业
将下面的 SQL 代码保存为“run_delta_join.sql”文件,其中包含了可转化为 delta join 的 q20 变体查询。
CREATE CATALOG fluss_catalog WITH ( 'type'='fluss' ,'bootstrap.servers'='localhost:9123' ); USE CATALOG fluss_catalog; USE my_db; INSERT INTO delta_join_sink SELECT auction ,bidder ,price ,channel ,url ,B.`dateTime` ,B.extra ,itemName ,description ,initialBid ,reserve ,A.`dateTime` ,expires ,seller ,category ,A.extra FROM bid AS B INNER JOIN auction AS A ON B.auction = A.id;
在 Flink 目录下,执行下面的语句,启动 delta join 作业。
## 请确保在 Flink 目录下执行该语句 ## 注意:请将 ${your_path} 替换为 run_delta_join.sql 实际所在的目录 ./bin/sql-client.sh -f ${your_path}/run_delta_join.sql
在 Flink UI 上,我们可以看到 Delta Join 作业正常跑起来了。
4. 插入数据到源表
将下面的 SQL 代码保存为“insert_data.sql”文件,其中包含了向两张源表灌入 Nexmark 数据源产生模拟数据的作业。
CREATE CATALOG fluss_catalog WITH ( 'type' = 'fluss' ,'bootstrap.servers' = 'localhost:9123' ); USE CATALOG fluss_catalog; USE my_db; -- nexmark 模拟数据源 CREATE TEMPORARY TABLE datagen ( event_type int ,person ROW< id BIGINT ,name VARCHAR ,emailAddress VARCHAR ,creditCard VARCHAR ,city VARCHAR ,state VARCHAR ,`dateTime` TIMESTAMP(3) ,extra VARCHAR > ,auction ROW< id BIGINT ,itemName VARCHAR ,description VARCHAR ,initialBid BIGINT ,reserve BIGINT ,`dateTime` TIMESTAMP(3) ,expires TIMESTAMP(3) ,seller BIGINT ,category BIGINT ,extra VARCHAR > ,bid ROW< auction BIGINT ,bidder BIGINT ,price BIGINT ,channel VARCHAR ,url VARCHAR ,`dateTime` TIMESTAMP(3) ,extra VARCHAR > ,`dateTime` AS CASE WHEN event_type = 0 THEN person.`dateTime` WHEN event_type = 1 THEN auction.`dateTime` ELSE bid.`dateTime` END ,WATERMARK FOR `dateTime` AS `dateTime` - INTERVAL '4' SECOND ) WITH ( 'connector' = 'nexmark' -- 下面两个参数为每秒数据生成速度 ,'first-event.rate' = '1000' ,'next-event.rate' = '1000' -- 生成的数据总条数,过大可能导致 OOM ,'events.num' = '100000' -- 下面三个参数为 Bid/Auction/Persion 三个数据的生成占比 ,'person.proportion' = '2' ,'auction.proportion' = '24' ,'bid.proportion' = '24' ) ; CREATE TEMPORARY VIEW auction_view AS SELECT auction.id ,auction.itemName ,auction.description ,auction.initialBid ,auction.reserve ,`dateTime` ,auction.expires ,auction.seller ,auction.category ,auction.extra FROM datagen WHERE event_type = 1 ; CREATE TEMPORARY VIEW bid_view AS SELECT bid.auction ,bid.bidder ,bid.price ,bid.channel ,bid.url ,`dateTime` ,bid.extra FROM datagen WHERE event_type = 2 ; INSERT INTO bid SELECT * FROM bid_view ; INSERT INTO auction SELECT * FROM auction_view ;
在 Flink 目录下,执行下面的语句,启动两个将 nexmark 模拟数据写入源表的作业。
## 请确保在 Flink 目录下执行该语句 ## 注意:请将 ${your_path} 替换为 insert_data.sql 实际所在的目录 ./bin/sql-client.sh -f ${your_path}/insert_data.sql
5. 观察 Delta Join 作业
重新点击 Flink UI 上的 Delta Join 作业,可以看到 Delta Join 作业正常在消费数据了。
06现状和未来工作
目前 Delta Join 仍然在持续演进中,Flink 2.2 已经支持了一些常用的 SQL pattern,具体可以参考文档[13]。
在未来,我们将会持续推进以下几个方向:
1. 持续完善最终一致性 Delta Join
(1)支持 Left / Right Join
(2)支持消费 Delete
(3)支持级联 Delta Join
2. 结合 Paimon/Iceberg/Hudi 等支持快照的存储,支持分钟级的强一致性 Delta Join
参考文档
1. Apache Flink 社区 Delta Join 用户文档 https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/tuning/
2. Apache Flink 社区 Delta Join FLIP https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin?src=contextnavpagetreemode
3. Apache Fluss (Incubating) 社区 Delta Join 用户文档 https://fluss.apache.org/docs/engine-flink/delta-joins/
参考链接:
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#interval-joins
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
[3] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#state-ttl-hints
[4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-519:++Introduce+async+lookup+key+ordered+mode
[5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model
[6] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#dynamic-table-options
[7] https://fluss.apache.org/docs/engine-flink/options/#lookup-options
[8] https://github.com/nexmark/nexmark/
[9] https://flink.apache.org/downloads/
[10] https://github.com/nexmark/nexmark/tree/master
[12] https://fluss.apache.org/docs/install-deploy/deploying-local-cluster/