本文整理自阿里云高级开发工程师,Flink Committer 罗宇侠老师在 Flink Forward Asia 2024上海站分论坛流批一体(二)中的分享,内容主要分为以下四个部分:
一、湖流割裂的现状和挑战
二、Fluss 湖流一体架构
三、湖流一体架构的收益
四、未来规划
一、湖流割裂的现状和挑战
从 Lambda 架构到数据湖统一存储架构
在大数据处理领域,Lambda 架构是使用非常广泛的一种架构。Lambda 架构将数据处理分成单独的两条链路,一条是离线计算链路,通常由 Hive 作为离线计算链路的存储,另外一条是实时链路,通常由流存储,如 Kafka 作为实时链路的存储。
随着技术的演进, Apache Paimon,Apache Iceberg ,Apache Hudi 等湖存储在支持大数据量的批式计算的基础上,还可以提供分钟级别的数据新鲜度,Lamda 架构中的两套不同的存储逐渐被统一的数据湖存储替代了。数据湖统一存储极大地简化整个架构,并且可以同时满足离线和近实时(分钟级别数据新鲜度)的需求,逐渐开始变得流行。
尽管数据湖统一存储架构非常简洁和高效,但是最多只能提供分钟级别的数据新鲜度。虽然部分场景对数据新鲜度要求不高,但是数据新鲜度的重要性依然不容忽视。在对数据新鲜度具有强诉求(秒级延迟)的场景下,如实时用户圈选,异常检测,广告归因等等,不可避免地将在整个架构中又引入可以提供秒级数据新鲜度的流存储,如 Kafka。
湖流割裂面临的挑战
当引入 Kafka 后,整个架构又回到了 Lambda 架构了,依然是两套存储,只不过传统的 Lambda 架构的 Hive 换成了湖存储而已。
湖存储和流存储割裂地混在 Lambda 架构当中,湖流割裂在架构层面和数据层面都面临着不小的挑战:
架构
架构复杂:两套存储,两套代码,两条链路,数据开发周期长
运维监控繁琐:每套存储都需要单独的故障排除,监控,升级等
资源浪费:同样的逻辑计算两遍,需要更多的资源
数据
数据一致性:流存储链路和湖存储链路很难保证结果一致
数据治理:元数据管理,数据血缘,数据质量等在两套存储中治理起来难度很大
数据冗余:相同的数据重复存储在两套存储中
湖流一体的业界趋势
引入流存储本身并不是一个问题,天下没有免费的午餐,要秒级的数据新鲜度势必要引入流存储。核心问题在于不能让流存储和湖存储互相割裂,理想的架构应该是流和湖互为一体,互为补充,湖提供高效的历史数据处理能力,而流存储提供秒级数据新鲜度和Serving 能力。
业界知名的流存储厂商也在湖流一体方向上做了不少工作,Kafka 的商业化公司 Confluent 提出 TableFlow,Kafka 中的数据将被 TableFlow 转成 Iceberg 格式,分析引擎可以直接在 Iceberg 格式的数据上进行高效查询;RedPanda 公司也提出了 Iceberg Topic,如果一个 Topic 是 Iceberg Topic,数据也将同时转成 Iceberg 格式,分析引擎也可以查 Iceberg 格式的表。
可以看到,流和湖之间正在逐步靠拢,各补所长,可以预见,未来流和湖的结合只会越来越紧密。不过他们提出的湖与流的结合还有很多改进空间,比如数据依然存在冗余存储的问题,读依然是两套 API 等。并且它们其实是自己在已有的流存储的基础上与湖进行结合,很难做到完美融合,比如流存储在数据分布上其实就很难和湖存储对齐,湖存储有分区表的概念,但是Kafka 就没有。当然,还会有各种各样的其他的问题,简而言之,它们并不是湖原生的。
二、Fluss 湖流一体架构
湖流一体是未来数据湖和流存储发展的一个大趋势,Fluss 作为一款面向实时分析设计的流存储,从一开始就采用了湖流一体的架构设计,可以更好的融入用户已有的 Lakehouse 架构。关于 Fluss 更多细节介绍,可以查看 FFA2024 主论坛上的 Fluss 分享《Fluss:面向面向实时分析设计的流存储》。Fluss 的核心特性包括实时的流读流写、列式裁剪、流式的更新、CDC订阅、实时点查、还有湖流一体。本文将着重介绍湖流一体的设计和收益。Fluss 湖流一体的架构如下图所示:
Fluss 的 server 集群提供了数据的实时写入和读取,提供了毫秒级的端到端延迟。同时,Fluss 的 Compaction Service 会将 Fluss 中的数据 compact 成标准的湖格式,如 Paimon/Iceberg 等,外部的查询引擎就可以直接在湖格式上的数据进行分析。
另外,最新的数据在 Fluss 中,历史数据在 Paimon 中,Flink 可以支持 Union Read,将Fluss 和 Paimon 中的数据 Union 起来实现秒级新鲜度的分析。
统一元数据
之前流存储 Kafka 和湖存储 Paimon 割裂的存在,其都有一套元数据,对计算引擎(如 Flink)来说是两套单独的 Catalog,两张单独的表,用户需要创建两个Catalog,访问数据的时候需要手动切换 Catalog 来确定是访问流存储还是湖存储的数据,十分繁琐。
在 Fluss 构建湖流一体架构下,虽然 Fluss 和 Paimon 也都有单独的元数据,但是对计算引擎(如 Flink)暴露的是一个 Catalog,一张统一的表。用户不需要切换 Catalog 也可以直接访问湖存储的数据,以及直接访问流存储 Fluss中的数据,以及Union 访问 Fluss 和湖存储中的数据。
数据分布的对齐
Fluss 和湖存储 Paimon 中的数据分布是严格对齐的,Fluss 也有分区表,也有 bucket,并且 Fluss 的 bucketing 算法与 Paimon 的 bucketing 算法是一致的,确保了一条数据被一致地分配到同样的 bucket,即Fluss 的 bucket 和 Paimon 的 bucket是一一对应的。
这种数据分布的强一致性有两个重要的好处:
- Compact 的时候避免 Shuffle 开销
在将 Fluss 中的数据 Compact 成 Paimon 格式的时候,可以直接将Fluss 的某个 bucket,如 bucket1 的数据文件直接 compact 到 Paimon 的 bucket1,而不需要将 Fluss 的 bucket1 的数据读出来,判断每条数据属于 Paimon 中的哪个bucket,然后写到对应 bucket 中,避免了 Shuffle 的开销。
- 避免数据的不一致
bucketing 算法是指对一条数据计算其所属的 bucket,Fluss 对齐了 Paimon 使用了一致的 bucketing 算法,即 bucket_id = hash(row) % bucket_num
,并采用一样的 hash 算法。如果 Fluss 和 Paimon 采用不一样的 bucketing 算法的话,就会出现数据不一致现象。比如对于主键表来说,对于一条数据 a,可能 Fluss 将其分配到 bucket1,而 Paimon 将其分配到 bucket2,如果 Compaction Service 将这条数据同步到 Paimon 中的 bucket1 的话,用户在 Paimon 中就查不到这条数据了。而通过保证数据分布的强一致性,则不存在这个问题。
流读:更高效的数据回追
历史数据在湖存储中,实时数据在 Fluss 中,在流读场景下,Fluss 先读湖格式的历史数据进行数据回追,然后再读 Fluss 的实时数据。借助湖存储高效的过滤条件下推、列裁剪、高压缩率等优势,可以实现高效的数据回追。
批读:秒级数据新鲜度
历史数据在湖存储中,实时数据在 Fluss 中,在批读场景下,计算引擎(如 Flink)可以将 Fluss 中的数据和湖存储中的进行union读,以达到极致的秒级数据新鲜度的分析。
Flink + Fluss
Fluss 对 Flink用户暴露了统一的 API支持用户选择 union 读还是只读数据湖上的数据,通过如下的 SQL:
SELECT * FROM orders
代表读取 orders 表的完整数据,则 Flink 将 union 读 Fluss和数据湖上的数据。
如果用户只需要读取数据湖上的数据,可以在要读的表后面加上 $lake
后缀,SQL 如下所示
-- analytical queries
SELECT COUNT(*), MAX(t), SUM(amount)
FROM orders$lake
-- query system tables
SELECT * FROM orders$lake$snapshots
对于只读数据湖上的数据的场景,Fluss 继承了湖格式作为一个 Flink source 的全部优化和能力,如 runtime filter,系统表查询,time travel 等。
三、湖流一体架构的收益
接下来,以湖存储 Paimon 为例,阐述一下在 Paimon 的基础上,引入 Fluss 来构建湖流一体架构的收益。
数据湖的时效提升至秒级
对于 Paimon 来说,数据的可见性取决于 Flink checkpoint 的时间间隔,通常是分钟级别的,但是通过 Fluss + Paimon 构建湖流一体架构后,数据的可见性不再取决于 Flink checkpoint 的时间间隔,数据进入 Fluss 后就可见,数据的时效性提升至秒级。
数仓分层每层表数据新鲜度一致,不受层级影响
在数仓的建设过程中,为了更好地管理数据,通常会进行分层,如 ODS层,DWD层,ADS层等,原始数据会在数仓多层进行流转。如果只通过 Paimon 来作为每一层的存储,由于 Paimon 只有在Flink checkpoint 后数据才可见,其对应的 changelog 才会流转到下一层,那么每一层的数据新鲜度都会增加一个 checkpoint 的时间。如果 Flink checkpoint 的时间设置为 3 分钟的话,那么 ODS层,DWD层,ADS层的数据延迟将分别为3分钟,6分钟,9分钟。
而如果基于 Fluss + Paimon 作为每一层的存储,则数据新鲜度不受层级的影响,每一层的数据新鲜度都是秒级。一条数据到达 ODS 层之后,其 changelog 会立刻流转到下一层,而不用等 Flink checkpoint 的完成,以此类推,每一层的数据新鲜度都可以保证一致。如果 Fluss 的 Compaction Service 设置3分钟的 compact 周期,那么对于Paimon中的数据,每一层的数据延迟都是3分钟。
更高效更高吞吐的 changelog 生成
目前Paimon 通用的 changelog 生成方式主要有两种(Input producer 对数据源的要求较高,暂不考虑), Lookup change producer 和 Full compaction producer。
Lookup change producer 的生成方式时效性好,但是需要更多的资源。Full Compaction Producer不需要多余的资源消耗,会在 Full Compaction 的时候生成 changelog,但是时效性差,因为需要等 Full Compaction 的触发,通常是若干个 checkpoint。
而在 Fluss + Paimon 的架构下,changelog 的生成则可以兼顾时效性和性能。对于 Fluss 来说,changelog 的生成是秒级的,同时 Fluss compaction service 则可以将 Fluss 的 changelog 直接写成 Paimon 的 changelog 格式,转换成 Paimon changelog 这个过程是很高效的,因为并不涉及到 lookup 等开销,只是一次直读直写。
解决 Paimon 部分更新不支持多 writer 的问题
Paimon 的部分更新是使用非常多的一个功能,特别适用于大宽表的场景。但是在 Paimon 中,如果要对一个宽表进行部分更新,则需要将所有对这个表的部分更新都放到一个 SQL作业里面,然后又要通过一个 UNION 语句将所有对这个表部分更新的 SQL 语句 union 到一起,保证只有一个 writer 来写这个宽表。这也导致了作业不好管理和单独调优。
而在 Fluss + Paimon 的架构下,则没有这个问题了。因为所有的更新会先走 Fluss,由Fluss再将更新同步到 Paimon,而 Fluss 的部分更新可以支持多作业同时并发更新。所以在新架构下,你可以有任意多个 SQL 作业来对这个宽表进行任意多列的部分更新,可以进行 per-job 级别的调优的管理。
总结一下,通过 Fluss 来构建湖流一体架构将带来如下收益:
湖存储强实时化,迈向秒级数据新鲜度
统一湖流,write once,run batch & stream
降低维护,降低重复数据存储的成本,降低重复加工链路的成本
四、未来规划
目前,Fluss 社区在湖流一体方向上的规划主要有以下三点:
Union Read 生态:目前 Union Read 的能力只对接了 Flink, 未来将对接更多的查询引擎,如 Spark/StarRocks等。
湖生态:目前 Fluss 只支持 Paimon 作为湖存储,未来将支持更多的湖格式,如 Iceberg/Hudi等。
Arrow -> Paruqet 的高效转换:Fluss 使用 Arrow 作为存储格式,湖格式使用 Parquet 作为存储格式,而 Arrow 到 Parquet 的转换在 Arrow 社区有非常成熟和高效的解决方案,未来将支持 Arrow 到 Parquet 的高效转换,降低 compaction service 成本。
更多内容
活动推荐
阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
新用户复制点击下方链接或者扫描二维码即可0元免费试用 Flink + Paimon
实时计算 Flink 版(3000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?utm_content=g_1000395379&productCode=sc