汽车之家基于 Apache Flink 的跨数据库实时物化视图探索

简介: 汽车之家在基于 Flink 的实时物化视图的一些实践经验与探索

本文转载自「之家技术」,作者刘首维。介绍了汽车之家在基于 Flink 的实时物化视图的一些实践经验与探索,并尝试让用户直接以批处理 SQL 的思路开发 Flink Streaming SQL 任务。主要内容为:

  1. 系统分析与问题拆解
  2. 问题解决与系统实现
  3. 实时物化视图实践
  4. 限制与不足
  5. 总结与展望

GitHub 地址
https://github.com/apache/flink
欢迎关注 Flink~

前言

物化视图这一功能想必大家都不陌生,我们可以通过使用物化视图,将预先设定好的复杂 SQL 逻辑,以增量迭代的形式实时 (按照事务地) 更新结果集,从而通过查询结果集来避免每次查询复杂的开销,从而节省时间与计算资源。事实上,很多数据库系统和 OLAP 引擎都不同程度地支持了物化视图。另一方面,Streaming SQL 本身就和物化视图有着很深的联系,那么基于 Apche Flink (下称 Flink) SQL 去做一套实时物化视图系统是一件十分自然而然的事情了。

本文介绍了汽车之家 (下称之家) 在基于 Flink 的实时物化视图的一些实践经验与探索,并尝试让用户直接以批处理 SQL 的思路开发 Flink Streaming SQL 任务。希望能给大家带来一些启发,共同探索这一领域。

一、系统分析与问题拆解

Flink 在 Table & SQL 模块做了大量的工作,Flink SQL 已经实现了一套成熟与相对完备的 SQL 系统,同时,我们也在 Flink SQL 上有着比较多的技术和产品积累,直接基于 Flink SQL 本身就已经解决了构建实时物化系统的大部分问题,而唯一一个需要我们解决的问题是如何不重不漏地生成数据源表对应的语义完备的 Changelog DataStream,包括增量和全量历史两部分。

虽然规约到只剩一个问题,但是这个问题解决起来还是比较困难的,那我们将这个问题继续拆解为以下几个子问题:

1.  加载全量数据;
2.  加载增量数据;
3.  增量数据与全量数据整合。

二、问题解决与系统实现

问题一:基于数据传输平台的增量数据读取

增量数据加载还是相对比较好解决的,我们直接复用实时数据传输平台的基础建设。数据传输平台[1] 已经将 Mysql / SqlServer / TiDB 等增量数据以统一的数据格式写入到特定的 Kafka Topic 中,我们只要获取到对应的 Kafka Topic 就可以进行读取即可。

问题二:支持 checkpoint 的全量数据加载

对于全量数据载入,我们先后写了两个版本。

第一版我们用 Legacy Source 写了一套 BulkLoadSourceFunction,这一版的思路比较朴素,就是全量从数据源表进行查询。这个版本确实能完成全量数据的加载,但是问题也是比较明显的。如果在 bulk load 阶段作业发生了重启,我们就不得不重新进行全量数据加载。对于数据量大的表,这个问题带来的后果还是比较严重的。

对于第一版的固有问题,我们一直都没有特别好的对策,直到 Flink-CDC[2] 2.0 的发布。我们参考了 Flink-CDC 的全量数据加载阶段支持 Checkpoint 的思路,基于 FLIP-27 开发了新的 BulkLoadSource。第二版不论在性能上还是可用性上,对比第一版都有了大幅提升。

问题三:基于全局版本的轻量 CDC 数据整合算法

这三个子问题中,问题三的难度是远大于前面两个子问题的。这个问题的朴素思路或许很简单,我们只要按照 Key 缓存全部数据,然后根据增量数据流来触发 Changelog DataStream 更新即可。

事实上我们也曾按照这个思路开发了一版整合逻辑的算子。这版算子对于小表还是比较 work 的,但是对于大表,这种思路固有的 overhead 开始变得不可接受。我们曾用一张数据量在 12 亿,大小约 120G 的 SqlServer 表进行测试,本身就巨大的数据再加上 JVM 上不可避免的膨胀,状态大小变得比较夸张。经过这次测试,我们一致认为这样粗放的策略似乎不适合作为生产版本发布,于是我们不得不开始重新思考数据整合的算法与策略。

在谈论我们的算法设计思路之前,我不得不提到 DBLog[3] 的算法设计, 这个算法的核心思路利用 watermark 对历史数据进行标识,并和对应的增量数据进行合并,达到不使用锁即可完成整个增量数据和历史数据的整合,Flink-CDC 也是基于这个思路进行的实现与改进。在相关资料搜集和分析的过程中,我们发现我们的算法思路与 DBLog 的算法的核心思路非常相似, 但是是基于我们的场景和情况进行了设计与特化。

首先分析我们的情况:

  • 增量数据需要来自于数据传输平台的 Kafka Topic;
  • 增量数据的是 at least once 的;
  • 增量数据是存在全序版本号的。

结合上述情况进行分析,我们来规约一下这个算法必须要达成的目标:

  • 保证数据的 Changelog Stream,数据完整,Event (RowKind) 语义完备
  • 保证该算法的 overhead 是可控的;
  • 保证算法实现的处理性能是足够高效;
  • 保证算法实现不依赖任何来自于 Flink 外部的系统或者功能。

经过大家的分析与讨论后,我们设计出了一套数据整合的算法,命名为 Global Version Based Pause-free Change-Data-Capture Algorithm

3.1 算法原理

我们同时读入 BulkLoadSource 的全量数据与 RealtimeChangelogSource 增量数据,并根据主键进行 KeyBy 与 Connect,而算法的核心逻辑主要由之后的 KeyedCoProcess 阶段完成。下面交待几个关键的字段值:

  • SearchTs:全量数据从数据源查询出来的时间戳;
  • Watermark:基于增量数据在数据库里产生的时间戳生成;
  • Version:全序版本号,全量数据是 0,即一定最小版本。

KeyedCoProcess 收到全量数据后,不会直接发送,而是先缓存起来,等到 Watermark 的值大于该 SearchTs 后发送并清除对应 version0 版本数据的缓存。在等待的期间,如果有对应的 Changlog Data,就将被缓存的 Version0 全量数据丢弃,然后处理 Changelog Data 并发送。在整个数据处理的流程中,全量数据和增量数据都是同时进行消费与处理的,完全不需要引入暂停阶段来进行数据的整合。

image

             增量数据在全量数据发送 watermark 之前到来,只发送增量数据即可,全量数据直接丢弃        

image

             全量数据发送 watermark 到达后,仍未有对应的增量数据,直接发送全量数据

3.2 算法实现

我们决定以 Flink Connector 的形式开展算法的实现,我们以接入 SDK 的名字 Estuary 为该 Connector 命名。通过使用 DataStreamScanProvider,来完成 Source 内部算子间的串联,Source 的算子组织如下图 (chain 到一起的算子已拆开展示)。

image

  • BulkLoadSource / ChangelogSource 主要负责数据的读入和统一格式处理;
  • BulkNormalize / ChangelogNormalize 主要是负责处理数据运行时信息的添加与覆盖,主键语义处理等工作;
  • WatermarkGenerator 是针对算法工作需求定制的 Watermark 生成逻辑的算子;
  • VersionBasedKeyedCoProcess 就是核心的处理合并逻辑和 RowKind 语义完备性的算子。

算法实现的过程中还是有很多需要优化或者进行权衡的点。全量数据进入 CoProcess 数据后,会首先检查当前是否处理过更大版本的数据,如果没有的话才进行处理,数据首先会被存入 State 中并根据 SearchTs + T (T 是我们设置的固有时延) 注册 EventTimeTimer。如果没有高版本的数据到来,定时器触发发送 Version 0 的数据,否则直接抛弃改为发送 RowKind 语义处理好的高版本增量数据。

另一方面,避免状态的无限增长,当系统判定 BulkLoad 阶段结束后,会结束对相关 Flink State 的使用,存在的 State 只要等待 TTL 过期即可。

另外,我们针对在数据同步且下游 Sink 支持 Upsert 能力的场景下,开发了特别优化的超轻量模式,可以以超低的 overhead 完成全量+增量的数据同步

开发完成后,我们的反复测试修改与验证,完成 MVP 版本的开发。

三、实时物化视图实践

MVP 版本发布后,我们与用户同学一起,进行了基于 Flink 的物化视图试点。

1. 基于多数据源复杂逻辑的 Data Pipeline 实时化

下面是用户的一个真实生产需求:有三张表,分别来自于 TiDB /。SqlServer / Mysql,数据行数分别为千万级 / 亿级 / 千万级,计算逻辑相对复杂,涉及到去重,多表 Join。原有通过离线批处理产生 T+1 的结果表。而用户希望尽可能降低该 Pipeline 的延迟。

由于我们使用的 TiCDC Update 数据尚不包含 -U 部分,故 TiDB 表的整合算法还是采取 Legacy Mode 进行加载。

我们与用户沟通,建议他们以批处理的思路去编写 Flink SQL,把结果的明细数据的数据输出到 StarRocks 中。用户也在我们的协助下,较为快速地完成了 SQL 的开发,任务的计算拓补图如下:

image

结果是相当让人惊喜的!我们成功地在保证了数据准确性的情况下,将原来天级延迟的 Pipeline 降低至 10s 左右的延迟。数据也从原来查询 Hive 变为查询 StarRocks,不论从数据接入,数据预计算,还是数据计算与查询,实现了全面的实时化。另一方面,三张表每秒的增量最大不超过 300 条,且该任务不存在更新放大的问题,所以资源使用相当的少。根据监控反馈的信息,初始化阶段完成后,整个任务 TM 部分只需要使用 1 个 Cpu (on YARN),且 Cpu 使用常态不超过 20%。对比原来批处理的资源使用,无疑也是巨大提升。

2. 数据湖场景优化

正如上文提到的,对于数据同步,我们做了专门的优化。只需要使用专用的 Source 表,就可以一键开启历史数据 + 增量数据数据同步,大大简化了数据同步的流程。我们目前尝试使用该功能将数据同步至基于 Iceberg 的数据湖中,从数据同步层面大幅提升数据新鲜度。

image

四、限制与不足

虽然我们在这个方向的探索取得了一定成果,但是仍有一定的限制和不足。

1. 服务器时钟的隐式依赖

仔细阅读上面算法原理,我们会发现,不论是 SearchTs 的生成还是 Watermark 的生成,实际上最后都依赖了服务器系统的时钟,而非依赖类似 Time Oracle 机制。我们虽然算法实现上引入固有延迟去规避这个问题,但是如果服务器出现非常严重时钟不一致,超过固有延迟的话,此时 watermark 是不可靠的,有可能会造成处理逻辑的错误。

经确认,之家服务器时钟会进行校准操作。

2. 一致性与事务

事实上我们目前这套实现没有任何事务相关的保证机制,仅能承诺结果的最终一致性,最终一致性其实是一种相当弱的保证。就拿上文提到的例子来说,如果其中一张表存在 2 个小时的消费延迟,另一张表基本不存在延迟,这个时候两表 Join 产生的结果其实是一种中间状态,或者说对于外部系统应该是不可见的。

为了完成更高的一致性保证,避免上面问题的产生,我们自然会想到引入事务提交机制。然而目前我们暂时没有找到比较好的实现思路,但是可以探讨下我们目前的思考。

2.1 如何定义事务

事务这个概念想必大家或多或少都有认识,在此不多赘述。如何数据库系统内部定义事务是一件特别自然且必要的事情,但是如何在这种跨数据源场景下定义事务,其实是一件非常困难的事情。还是以上文的例子来展开,我们能看到数据源来自各种不同数据库,我们其实对于单表记录了对应的事务信息,但是确实没有办法定义来自不同数据源的统一事务。我们目前的朴素思路是根据数据产生的时间为基准,结合 checkpoint 统一划定 Epoch,实现类似 Epoch-based Commit 的提交机制。但是这样做又回到前面提到的问题,需要对服务器时间产生依赖,无法从根源保证正确性。

2.2 跨表事务

对于 Flink 物化视图一致性提交这个问题,TiFlink[4] 已经做了很多相关工作。但是我们的 Source 来自不同数据源,且读取自 Kafka,所以问题变得更为复杂,还是上面提到的例子,两张表 Join 过后,如果想保证一致性,不只是 Source 和 Sink 算子,整个关系代数算子体系都需要考虑引入事务提交的概念和机制,从而避免中间状态的对外部系统的发布。

3. 更新放大

这个问题其实比较好理解。现在有两张表 join,对于左表的每一行数据,对应右表都有 n (n > 100) 条数据与之对应。那么现在更新左表的任意一行,都会有 2n 的更新放大。

4. 状态大小

目前整套算法在全量同步阶段的 Overhead 虽然可控,但是仍有优化空间。我们目前实测,对于一张数据量在 1 亿左右的表,在全量数据阶段,需要峰值最大为 1.5G 左右的 State。我们打算在下个版本继续优化状态大小,最直接的思路就是 BulkSource 通知 KeyedCoProcess 哪些主键集合是已经处理完毕的,这样可以使对应的 Key 提早进入全量阶段完成模式,从而进一步优化状态大小。

五、总结与展望

本文分析了基于 Flink 物化视图实现的问题与挑战,着重介绍了处理生成完整的 Changelog DataStream 的算法与实现和在业务上的收益,也充分阐述了目前的限制与不足。

虽然这次实践的结果称不上完备,存在一些问题亟待解决,但是我们仍看到了巨大的突破与进步,不论是从技术还是业务使用上。我们充分相信未来这项技术会越来越成熟,越来越被更多人认可和使用,也通过此次探索充分验证了流处理和批处理的统一性。

我们目前的实现还处在早期版本,仍有着工程优化和 bug fix 的空间与工作 (比如前文提到的两表的推进的 skew 太大问题,可以尝试引入 Coordinator 进行调节与对齐),但是相信随着不断的迭代与发展,这项工作会变得越来越稳固,从而支撑更多业务场景,充分提升数据处理的质量与效率!

特别鸣谢张茄子和云邪老师的帮助与勘误。

引用

[1] http://mp.weixin.qq.com/s/KQH-relbrZ2GUqdmaTWx6Q

[2] http://github.com/ververica/flink-cdc-connectors

[3] http://arxiv.org/pdf/2010.12597.pdf

[4] http://zhuanlan.zhihu.com/p/422931694


近期热点

img


更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

image.png

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
2月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
442 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
343 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
|
4月前
|
SQL 人工智能 数据挖掘
Apache Flink:从实时数据分析到实时AI
Apache Flink 是实时数据处理领域的核心技术,历经十年发展,已从学术项目成长为实时计算的事实标准。它在现代数据架构中发挥着关键作用,支持实时数据分析、湖仓集成及实时 AI 应用。随着 Flink 2.0 的发布,其在流式湖仓、AI 驱动决策等方面展现出强大潜力,正推动企业迈向智能化、实时化的新阶段。
597 9
Apache Flink:从实时数据分析到实时AI
|
4月前
|
SQL 人工智能 API
Apache Flink 2.1.0: 面向实时 Data + AI 全面升级,开启智能流处理新纪元
Apache Flink 2.1.0 正式发布,标志着实时数据处理引擎向统一 Data + AI 平台迈进。新版本强化了实时 AI 能力,支持通过 Flink SQL 和 Table API 创建及调用 AI 模型,新增 Model DDL、ML_PREDICT 表值函数等功能,实现端到端的实时 AI 工作流。同时增强了 Flink SQL 的流处理能力,引入 Process Table Functions(PTFs)、Variant 数据类型,优化流式 Join 及状态管理,显著提升作业稳定性与资源利用率。
539 0
|
3月前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
1315 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
4月前
|
存储 人工智能 数据处理
对话王峰:Apache Flink 在 AI 时代的“剑锋”所向
Flink 2.0 架构升级实现存算分离,迈向彻底云原生化,支持更大规模状态管理、提升资源效率、增强容灾能力。通过流批一体与 AI 场景融合,推动实时计算向智能化演进。生态项目如 Paimon、Fluss 和 Flink CDC 构建湖流一体架构,实现分钟级时效性与低成本平衡。未来,Flink 将深化 AI Agents 框架,引领事件驱动的智能数据处理新方向。
500 6
|
4月前
|
消息中间件 存储 Kafka
Apache Flink错误处理实战手册:2年生产环境调试经验总结
本文由 Ververica 客户成功经理 Naci Simsek 撰写,基于其在多个行业 Flink 项目中的实战经验,总结了 Apache Flink 生产环境中常见的三大典型问题及其解决方案。内容涵盖 Kafka 连接器迁移导致的状态管理问题、任务槽负载不均问题以及 Kryo 序列化引发的性能陷阱,旨在帮助企业开发者避免常见误区,提升实时流处理系统的稳定性与性能。
435 0
Apache Flink错误处理实战手册:2年生产环境调试经验总结
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
795 33
The Past, Present and Future of Apache Flink
|
7月前
|
安全 Apache 数据库
【倒计时3天】NineData x Apache Doris x 阿里云联合举办数据库技术Meetup,5月24日深圳见!
5月24日,NineData联合Apache Doris与阿里云在深圳举办数据库技术Meetup。活动聚焦「数据实时分析」与「数据同步迁移」两大领域,邀请行业专家分享技术趋势、产品实践及解决方案,助力企业构建高效安全的数据管理体系。时间:14:00-17:30;地点:深圳新一代产业园2栋20楼会议室。线下名额有限(80人),速报名参与深度交流!
204 1

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多