基于 Apache Flink Table Store 的全增量一体实时入湖

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文简要回顾了数据入湖(仓)的发展阶段,针对在数据库数据入湖中面临的问题,提出了使用 Flink Table Store 作为全增量一体入湖的解决方案,并辅以开源 Demo 的测试结果作为展示。

作者|陈婧敏
本文简要回顾了数据入湖(仓)的发展阶段,针对在数据库数据入湖中面临的问题,提出了使用 Flink Table Store 作为全增量一体入湖的解决方案,并辅以开源 Demo 的测试结果作为展示。文章主要内容包括:

  1. 数据库数据集成入湖(仓)的发展阶段及面临痛点
  2. 基于 Apache Flink Table Store 解决全增量一体入湖
  3. 总结与展望

1. 数据入湖(仓)发展阶段及面临痛点

基于数据库的数据集成过程,简要来说经历了如下几个阶段。

1.1 全量 + 定期增量的数据入仓

1

Fig.1 数据入仓: 一次全量+周期增量

全量数据通过 bulk load 一次性导入,定时调度增量同步任务从数据库同步增量到临时表,再与全量数据进行合并。这种方式虽然能满足一定的业务需求,但是也存在以下问题。

:::
⚠️ 链路复杂,时效性差
:::

全量与增量需要定期的合并以获取最新的数据快照,由于不支持记录级别的更新,用户需要额外的 SQL 任务去计算去重;数据新鲜度依赖于调度,若数据存在晚到,则还需要处理数据漂移情况,一种常见处理方式是在 T +N 时刻(processing time)调度产生 T (event time)的合并任务,同时取 T ~ T + N 个分区(processing time),再从中过滤出业务时间小于等于 T (event time)的数据进行合并,这会导致数据新鲜度进一步降低。

:::
⚠️ 明细查询慢,排查问题难
:::

虽然下游会使用各种交互式分析引擎来加速查询,但基于成本考虑,底表明细数据一般没有这种待遇,这就导致在数据正确性排查时需要直接查询明细,特别是需要查询合并前后全量和增量的明细变化来定位问题。如果业务变更,导致一批订单数据需要订正并要求生成订正之后的各类指标,则需要手工对原始表及其下游依赖表进行级联订正。

1.2 全量 + 实时增量的数据入湖

相对于传统数据仓库,数据湖的出现使得数据在以低成本存储的同时,数据新鲜度有了极大的提升。以 Apache Hudi 为例,它支持先做一次全量 bootstrap 构建基础表,然后基于新接入的 CDC 数据进行实时构建 基于 Hudi 的湖仓一体技术在 Shopee 的实践[1],如 Fig.2 所示。

2

Fig.2 数据入湖: 一次全量+实时增量

由于支持记录级别的更新及删除,在存储侧就可以完成主键的去重,不再需要额外的合并任务。在数据新鲜度方面,由于流式作业会定期的触发 checkpoint 来产生全量与增量合并后的快照,故而数据新鲜度对比第一种方式(以天或小时调度产生合并快照)有了很大提升。

但从另外一方面,我们也发现这种方式有以下这些问题。

:::
🤔 Bootstrap Index 超时及 state 膨胀
:::

以流模式启动 Flink 增量同步作业后,系统会先将全量表导入到 Flink state 来构建 Hoodie key(即主键 + 分区路径)到写入文件的 file group 的索引,此过程会阻塞 checkpoint 完成。而只有在 checkpoint 成功后,写入的数据才可以变为可读状态,故而当全量数据很大时,有可能会出现 checkpoint 一直超时的情况,导致下游读不到数据。另外,由于索引一直保存在 state 内,在增量同步阶段遇到了 insert 类型的记录也会更新索引,需要合理评估 state TTL,配置太小可能会丢失数据,配置过大可能导致 state 膨胀。

:::
🤔 链路依然复杂,难以对齐增量点位,自动化运维成本高
:::

全量 + 实时增量的方式并没有简化链路的复杂度,因为它额外引入了 Kafka 的运维,需要手工对齐增量消费的点位以防止数据丢失 Change Data Capture with Debezium and Apache Hudi[2]。在启动增量 CDC 作业后,用户需要等待和观察作业的运行状态,在第一次 checkpoint 成功后,暂停作业(stop-with-savepoint)修改配置禁用 Bootstrap Index,然后从 savepoint 重启作业( restore-from-savepoint)。整个过程操作复杂,实现自动化运维成本比较高。

此外,我们回顾了一些使用 Hudi 的行业实践,发现用户需要格外注意各项配置来实现不同需求,这对易用性有一定的伤害。比如 基于 Hudi 的湖仓一体技术在 Shopee 的实践[1] 中提到的需要在平台层面监控用户的建表语句,防止在大规模写入场景配置为 COW(Copy on Write) 模式;全增量切换时用户必须格外注意 Kafka 消费点位来保证数据准确性,参数配置极大影响了作业的数据准确性及性能。

2. 基于 Apache Flink Table Store 的全增量一体入湖

随着基于日志的 CDC 逐步取代基于查询的 CDC,特别是 Flink SQL CDC 在 source 端已支持全增量一体同步后,全增量一体入湖(使用一个流作业完成全量同步、并持续监听增量 changelog)也成为一个新的探索方向。这种方式降低了链路复杂度,同时将全增量切换时需要手工对齐 offset 的繁琐托管给了 Flink CDC 和 checkpoint 机制,让框架层面去保障数据的最终一致性。但经过调研我们发现,在使用 Hudi 做这种尝试时遇到了以下挑战。

:::
🤔 全量同步阶段数据乱序严重,写入性能和稳定性难以保障 
:::

在全量同步阶段面临的一个问题是多并发同时读取 chunk 会遇到严重的数据乱序,出现同时写多个分区的情况,大量的随机写入会导致性能回退,出现吞吐毛刺,每个分区对应的 writer 都要维护各自缓存,很容易发生 OOM 导致作业不稳定。虽然 Hudi 支持通过 Rate Limit Apache Hudi DeltaStreamer#Rate Limit[3] 限制每分钟的数据写入来起到一定的平滑效果,但在作业稳定性和性能吞吐之间取得平衡的调优过程对于一般用户来说门槛也较高。

2.1 为什么选择 Flink Table Store

Apache Table Store https://github.com/apache/flink-table-store[4] 作为 2022 年初开源的 Apache Flink 子项目,目标是打造一个支持更新的据湖存储,用于实时流式 Changelog 摄取和高性能查询。

:::
🚀 大吞吐量的更新数据摄取,支持全增量一体入湖,一个 Flink 作业搞定所有
:::

3

Fig.3 Flink Table Store: 全量 + 增量一体化同步

回顾前文,我们知道全增量一体化同步入湖的主要挑战在于全量同步阶段产生了大量数据乱序引起的随机写入,导致性能回退、吞吐毛刺及不稳定。而 Table Store 的存储格式采用先分区(Partition)再分桶(Bucket),每个桶内各自维护一棵 LSM(Log-structured Merge Tree)的方式(参见 Fig.4、Fig.5),每条记录通过主键哈希落入桶内时就确定了写入路径(Directory),以 KV 方式写入 MemTable 中(类似于 HashMap,Key 就是主键,Value 是记录)。在 flush 到磁盘的过程中,以主键排序合并(去重),以追加方式写入磁盘。Sort Merge 在 buffer 内进行,避免了需要点查索引来判断一条记录是 insert 还是 update 来获取写入文件的 file group 的 tagging Apache Hudi Technical Specification#Writer Expectations[5] 。另外,触发 MemTable flush 发生在 buffer 充满时,不需要额外通过 Auto-File Sizing Apache Hudi Write Operations#Writing path[6](Auto-File Sizing 会影响写入速度 Apache Hudi File Sizing#Auto-Size During ingestion[7])来避免小文件产生,整个写入过程都是局部且顺序的 On Disk IO, Part 3: LSM Trees [8],避免了随机 IO 产生。

使用 Table Store 作为湖存储时,只需要一条 INSERT INTO 语句就可以完成全增量同步。以如下 SQL 为例,它展示了使用一个 Flink 流作业将 MySQL 数据库中的订单表通过 Streaming 方式写入 Table Store 表,并持续消费增量数据。

    -- 创建并使用 table store catalog
    CREATE CATALOG `table_store` WITH (
        'type' = 'table-store',
        'warehouse' = 'hdfs://foo/bar'
    );
    
    USE CATALOG `table_store`;
    
    -- 定义 mysql-cdc source 表
    CREATE TEMPORARY TABLE `orders_cdc` (
      order_id BIGINT NOT NULL,
      gmt_modified TIMESTAMP(3) NOT NULL,
      ...
      PRIMARY KEY (`order_id`) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
       ...
    );
    
    -- 定义 table store ods 表,按日期作分区
    CREATE TABLE IF NOT EXISTS `orders` (
      ...
      PRIMARY KEY (`dt`, `order_id`) NOT ENFORCED
    ) PARTITIONED BY (`dt`);
    
    -- streaming 模式下提交作业
    SET 'execution.runtime-mode' = 'streaming';
    
    -- 设置 1min 的 cp interval,对应 1min 的数据新鲜度
    SET 'execution.checkpointing.interval' = '1min';
    
    -- 一条 SQL 同步全量 + 增量,动态分区写入
    INSERT INTO `orders`
    SELECT ..., DATE_FORMAT(gmt_modified, 'yyyy-MM-dd') AS dt
    FROM `orders_cdc`;

Fig.4 展示了以 dt 作为分区 orders 表的存储结构,在用户指定总 bucket 数 N 后,每个分区下会生成相应的 bucket-${n} 目录,每个目录下以列存格式(orc 或 parquet)存放 hash_func(pk) % N == ${n} 的记录文件。

4

 Fig.4 Flink Table Store 表的文件目录

元数据与数据存储在表的同一级目录下,包括 manifest 目录和 snapshot 目录。

  • manifest 目录下中记录了每次经 checkpoint 触发而提交的数据文件变更,包含新增和删除的数据文件
  • snapshot 目录下记录了每次提交产生的 snapshot 文件,内容包括为上一次提交产生的 manifest,加上本次提交产生的 manifest 作为增量

Fig.5 展示了 Fig.4 中每个 bucket 内 LSM 实现过程。我们以 Flink 流作业为例,在每次 checkpoint 时,Flink Table Store 会产生一次提交(commit),包含以下信息

  • 生成当前 table 的一个快照(snapshot)。系统会通过 snapshot pointer file(类似于指针)追踪最早产生和当前最新的 snapshot 文件
  • snapshot 文件中包含了本次 commit 新增了哪些 manifest 文件、删除了哪些 manifest 文件
  • 每个 manifest 文件中记录了产生了哪些 sst 文件、删除了哪些 sst 文件,以及每个 sst 文件所包含记录的主键范围、每个字段的 min/max/null count 等统计信息
  • 每个 sst 文件则包含了按主键排好序的、列存格式的记录。对于 Level 0 的文件,Table Store 会异步地触发 compact 合并线程来消除主键范围重叠带来的读端 merge 开销。

5

Fig.5 Flink Table Store 表的 LSM 实现

在 Flink Table Store 0.2.0 发布时,我们测试了五亿条数据在一亿主键下的实时更新场景写入(包含插入与更新)并与 Apache Hudi MOR 及 COW 进行对比 Apache Flink Table Store 0.2.0 发布[9], Table Store 在大规模实时更新写入场景拥有良好的写入性能。

:::
🚀 高效 Data Skipping 支持过滤,提供高性能的点查和范围查询
:::

虽然没有额外的索引,但是得益于 meta 的管理和列存格式,manifest 中保存了

  • 文件的主键的 min/max 及每个字段的统计信息,这可以在不访问文件的情况下,进行一些 predicate 的过滤
  • orc/parquet 格式中,文件的尾部记录了稀疏索引,每个 chunk 的统计信息和 offset,这可以通过文件的尾部信息,进行一些 predicate 的过滤

数据在有 filter 读取时,可以根据上述信息做如下过滤

1. 读取 manifest:根据文件的 min/max、分区,执行分区和字段的 predicate,淘汰多余的文件

2. 读取文件 footer:根据 chunk 的 min/max,过滤不需要读取的 chunk

3. 读取剩下与文件以及其中的 chunks

以上述订单表 orders 为例,当用户想要查询 dt = 2022-01-01 分区下所有 order\_id 在 100 ~ 200 之间的订单时

    SELECT * FROM orders WHERE dt = '2022-01-01' AND order_id >= 100 AND order_id <= 200;

Flink Table Store 会先根据 LATEST-SNAPSHOT 文件读到最近一次提交的 snapshot 文件(read committed),然后从 snapshot 中读取到对应 manifest meta 文件, 根据分区条件 dt='2022-01-01',过滤出包含这些分区的统计信息,由于统计信息里包含了每个 sst 文件 key 的范围,所以继续执行 order\_id 在 [100, 200] 区间这个过滤条件,就能在 2022-01-01 这个目录下只读取对应的 sst 文件。

我们同样基于上述数据集测试了 Flink Table Store 的查询性能 Apache Flink Table Store 0.2.0 发布[9],在点查和范围查询的场景下,Flink Table Store 表现出众。从实现原理来说,MOR 的查询性能低于 COW、COW 的写入性能低于 MOR 是难以避免的。而在实践层面,在大规模写入场景下建立的 MOR 表也很难一键转换为 COW 来读取,所以在查询写入较多的表(MOR 表)这个前提下,Flink Table Store 的查询表现还是不俗的。

:::
🚀 文件格式支持流读
:::

Flink Table Store 实现了 Incremental Scan,在流模式下,可以持续监听文件更新,数据新鲜度保持在分钟级别,如下所示。

    -- 进入 SQL CLI,创建 catalog 和 table
    CREATE CATALOG table_store WITH (
      'type' = 'table-store',
      'warehouse' = 'file://foo/bar/' --或 'hdfs://foo/bar'
    );
    
    CREATE TABLE IF NOT EXIST my_table (
      f0 INT,
      f1 STRING,
      PRIMARY KEY(f0) NOT ENFORCED
    );
    
    -- 切换到 batch 模式,写入数据
    SET 'execution.runtime-mode' = 'batch';
    INSERT INTO my_table VALUES(1, 'Hello');
    
    -- 新打开一个 SQL CLI 中,切换到 streaming 模式,提交流式查询
    SET 'execution.runtime-mode' = 'streaming';
    SET 'sql-client.execution.result-mode' = 'tableau';
    SELECT * FROM my_table; 
    -- 可以读到结果如下
    +----+-------------+--------------------------------+
    | op |          f0 |                             f1 |
    +----+-------------+--------------------------------+
    | +I |           1 |                          Hello |
    
    -- 在第一个 SQL CLI 中,继续写入数据
    INSERT INTO my_table VALUES(1, 'Bye'), (2, '你好');
    
    -- 可以在第二个 SQL CLI 中,观察到新增输出 (-U, 1, Hello),(+U, 1, Bye)  和 (+I, 2, 你好) 
    +----+-------------+--------------------------------+
    | op |          f0 |                             f1 |
    +----+-------------+--------------------------------+
    | +I |           1 |                          Hello |
    | -U |           1 |                          Hello |
    | +U |           1 |                            Bye |
    | +I |           2 |                           你好 |

2.2 基于 TPC-H 数据集的全增量一体入湖 Demo

前文对 Flink Table Store 解决全增量一体入湖进行了简要分析,下面一个实例演示了如何在本地单机环境下,将近六千万条订单记录作为全量数据从 MySQL 同步到 Flink Table Store,并持续消费增量更新(由 TPC-H RF1 和 RF2 产生),下游接实时聚合及查询的过程。

6

数据源由 TPC-H 自动生成并导入 MySQL ,运行在 Docker 容器内,本地只需要下载 Flink release 包和 Flink Table Store 依赖即可完成。

Demo 使用 lineitem 表中发货日期 l\_shipdate 作为业务字段定义了二级分区 l\_year 和 l\_month,时间跨度从 1992.1 ~ 1998.12,即动态写入 84 个分区。 经测试,在本地单机并发为 2,checkpoint interval 为 1 min 的配置下(每分钟更新可见)46 min 内写入 59.9 million 全量数据,每 10 min 的写入性能如下表所示,平均写入性能在 1.3 million/min。

表格1

详细配置如下所示

表格2

详细步骤可查看 Apache Flink Table Store 全增量一体 CDC 实时入湖

3.  总结与展望

本文简要回顾了数据入湖(仓)的发展阶段,针对在数据库数据入湖中面临的问题,我们提出了使用 Flink Table Store 作为全增量一体入湖的解决方案,并辅以开源 Demo 的测试结果作为展示。

我们期待用户的一线反馈及同行深入交流,欢迎大家扫码加入钉钉群一起探索。

二维码

4. Reference

[1] 基于 Hudi 的湖仓一体技术在 Shopee 的实践

[2] Change Data Capture with Debezium and Apache Hudi

[3] Apache Hudi DeltaStreamer#Rate Limit

[4] https://github.com/apache/flink-table-store

[5] Apache Hudi Technical Specification#Writer Expectations

[6] Apache Hudi Write Operations#Writing path

[7] Apache Hudi File Sizing#Auto-Size During ingestion

[8] On Disk IO, Part 3: LSM Trees

[9] Apache Flink Table Store 0.2.0 发布


Flink Forward Asia 2022

img

时间:11月26日-27日

PC端直播观看:https://flink-forward.org.cn/点击议题,即可查看议题详情以及讲师介绍

移动端建议观看 ApacheFlink 视频号预约观看:
视频号

点击预约直播~

img

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
48 1
|
2月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
761 13
Apache Flink 2.0-preview released
|
2月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
82 3
|
3月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
4月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
268 2
|
4月前
|
消息中间件 分布式计算 Hadoop
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
60 3
|
4月前
|
消息中间件 运维 Kafka
Apache Flink 实践问题之达到网卡的最大速度如何解决
Apache Flink 实践问题之达到网卡的最大速度如何解决
53 2
|
4月前
|
消息中间件 前端开发 Kafka
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
|
3月前
|
消息中间件 资源调度 API
Apache Flink 流批融合技术介绍
本文源自阿里云高级研发工程师周云峰在Apache Asia Community OverCode 2024的分享,内容涵盖从“流批一体”到“流批融合”的演进、技术解决方案及社区进展。流批一体已在API、算子和引擎层面实现统一,但用户仍需手动配置作业模式。流批融合旨在通过动态调整优化策略,自动适应不同场景需求。文章详细介绍了如何通过量化指标(如isProcessingBacklog和isInsertOnly)实现这一目标,并展示了针对不同场景的具体优化措施。此外,还概述了社区当前进展及未来规划,包括将优化方案推向Flink社区、动态调整算子流程结构等。
420 31
Apache Flink 流批融合技术介绍
|
2月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
67 1

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多