大数据技术之 Flink-CDC3

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 大数据技术之 Flink-CDC3

3.3 Flink CDC 2.0 设计 ( 以 MySQL 为例)

通过上面的分析,可以知道 2.0 的设计方案,核心要解决上述的三个问题,即支持无锁、水平扩展、checkpoint。



在对于有主键的表做初始化模式,整体的流程主要分为 5 个阶段:

1.Chunk 切分;

2.Chunk 分配;(实现并行读取数据&CheckPoint)

3.Chunk 读取;(实现无锁读取)

4.Chunk 汇报;

5.Chunk 分配。


DBlog 这篇论文里描述的无锁算法如下图所示:

左边是 Chunk 的切分算法描述,Chunk 的切分算法其实和很多数据库的分库分表原理类似,通过表的主键对表中的数据进行分片。假设每个 Chunk 的步长为 10,按照这个规则进行切分,只需要把这些 Chunk 的区间做成左开右闭或者左闭右开的区间,保证衔接后的区间能够等于表的主键区间即可。


右边是每个 Chunk 的无锁读算法描述,该算法的核心思想是在划分了 Chunk 后,对于每个 Chunk 的全量读取和增量读取,在不用锁的条件下完成一致性的合并。Chunk 的切分如下图所示:



因为每个 chunk 只负责自己主键范围内的数据,不难推导,只要能够保证每个 Chunk 读取的一致性,就能保证整张表读取的一致性,这便是无锁算法的基本原理。


根据 Netflix DBlog 的论文中的无锁算法原理,对于目标表按照主键进行数据分片,设置每个切片的区间为左闭右开或者左开右闭来保证数据的连续性。


Netflix 的 DBLog 论文中 Chunk 读取算法是通过在 DB 维护一张信号表,再通过信号表在 binlog 文件中打点,记录每个 chunk 读取前的 Low Position (低位点) 和读取结束之后 High Position (高位点) ,在低位点和高位点之间去查询该 Chunk 的全量数据。在读取出这一部分 Chunk 的数据之后,再将这 2 个位点之间的 binlog 增量数据合并到 chunk 所属的全量数据,从而得到高位点时刻,该 chunk 对应的全量数据。


Flink CDC 结合自身的情况,在 Chunk 读取算法上做了去信号表的改进,不需要额外维护信号表,通过直接读取 binlog 位点替代在 binlog 中做标记的功能,整体的 chunk 读算法描述如下图所示:


比如正在读取 Chunk-1,Chunk 的区间是 [K1, K10],首先直接将该区间内的数据 select 出来并把它存在 buffer 中,在 select 之前记录 binlog 的一个位点 (低位点),select 完成后记录 binlog 的一个位点 (高位点)。然后开始增量部分,消费从低位点到高位点的 binlog。


图中的 - ( k2,100 ) + ( k2,108 ) 记录表示这条数据的值从 100 更新到 108;


第二条记录是删除 k3;


第三条记录是更新 k2 为 119;


第四条记录是 k5 的数据由原来的 77 变更为 100。


观察图片中右下角最终的输出,会发现在消费该 chunk 的 binlog 时,出现的 key 是k2、k3、k5,我们前往 buffer 将这些 key 做标记。


对于 k1、k4、k6、k7 来说,在高位点读取完毕之后,这些记录没有变化过,所以这些数据是可以直接输出的;


对于改变过的数据,则需要将增量的数据合并到全量的数据中,只保留合并后的最终数据。例如,k2 最终的结果是 119 ,那么只需要输出 +(k2,119),而不需要中间发生过改变的数据。


通过这种方式,Chunk 最终的输出就是在高位点是 chunk 中最新的数据。


读取可以分为 5 个阶段

1)SourceReader 读取表数据之前先记录当前的 Binlog 位置信息记为低位点;

2)SourceReader 将自身区间内的数据查询出来并放置在 buffer 中;

3)查询完成之后记录当前的 Binlog 位置信息记为高位点;

4)在增量部分消费从低位点到高位点的 Binlog;

5)根据主键,对 buffer 中的数据进行修正并输出。

通过以上5个阶段可以保证每个Chunk最终的输出就是在高位点时该Chunk中最新的数据,

但是目前只是做到了保证单个 Chunk 中的数据一致性。


上图描述的是单个 Chunk 的一致性读,但是如果有多个表分了很多不同的 Chunk,且这些 Chunk 分发到了不同的 task 中,那么如何分发 Chunk 并保证全局一致性读呢?这个其实就是通过下面的汇总来实现的


通过下图可以看到有 SourceEnumerator 的组件,这个组件主要用于 Chunk 的划分,划分好的 Chunk 会提供给下游的 SourceReader 去读取,通过把 chunk 分发给不同的 SourceReader 便实现了并发读取 Snapshot Chunk 的过程,同时基于 FLIP-27 我们能较为方便地做到 chunk 粒度的 checkpoint。



将划分好的 Chunk 分发给多个 SourceReader,每个 SourceReader 读取表中的一部分数据,实现了并行读取的目标。同时在每个 Chunk 读取的时候可以单独做 CheckPoint,某个 Chunk 读取失败只需要单独执行该 Chunk 的任务,而不需要像 1.x 中失败了只能从头读取。若每个 SourceReader 保证了数据一致性,则全表就保证了数据一致性。



当 Snapshot Chunk 读取完成之后,需要有一个汇报的流程,如下图中橘色的汇报信息,将 Snapshot Chunk 完成信息汇报给 SourceEnumerator。汇报的主要目的是为了后续分发 binlog chunk (如下图)。因为 Flink CDC 支持全量 + 增量同步,所以当所有 Snapshot Chunk 读取完成之后,还需要消费增量的 binlog,这是通过下发一个 binlog chunk 给任意一个 Source Reader 进行单并发读取实现的。



整体流程可以概括为,首先通过主键对表进行 Snapshot Chunk 划分,再将 Snapshot Chunk 分发给多个 SourceReader,每个 Snapshot Chunk 读取时通过算法实现无锁条件下的一致性读,SourceReader 读取时支持 chunk 粒度的 checkpoint,在所有 Snapshot Chunk 读取完成后,下发一个 binlog chunk 进行增量部分的 binlog 读取,这便是 Flink CDC 2.0 的整体流程,如下图所示:



Flink CDC 是一个完全开源的项目,项目所有设计和源码目前都已贡献到开源社区,Flink CDC 2.0 也已经正式发布,此次的核心改进和提升包括:


提供 MySQL CDC 2.0,核心feature 包括


并发读取,全量数据的读取性能可以水平扩展;


全程无锁,不对线上业务产生锁的风险;


断点续传,支持全量阶段的 checkpoint。


搭建文档网站,提供多版本文档支持,文档支持关键词搜索


笔者用 TPC-DS 数据集中的 customer 表进行了测试,Flink 版本是 1.13.1,customer 表的数据量是 6500 万条,Source 并发为 8,全量读取阶段:


MySQL CDC 2.0 用时 13 分钟;


MySQL CDC 1.4 用时 89 分钟;


读取性能提升 6.8 倍。


3.4 核心原理分析

3.4.1 Binlog Chunk 中开始读取位置源码

MySqlHybridSplitAssigner类的createBinlogSplit方法

private MySqlBinlogSplit createBinlogSplit() {
    final List<MySqlSnapshotSplit> assignedSnapshotSplit =
            snapshotSplitAssigner.getAssignedSplits().values().stream()
                    .sorted(Comparator.comparing(MySqlSplit::splitId))
                    .collect(Collectors.toList());
    Map<String, BinlogOffset> splitFinishedOffsets =
            snapshotSplitAssigner.getSplitFinishedOffsets();
    final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
    final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
    BinlogOffset minBinlogOffset = BinlogOffset.INITIAL_OFFSET;
    for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
        // find the min binlog offset
        BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
        if (binlogOffset.compareTo(minBinlogOffset) < 0) {
            minBinlogOffset = binlogOffset;
        }
        finishedSnapshotSplitInfos.add(
                new FinishedSnapshotSplitInfo(
                        split.getTableId(),
                        split.splitId(),
                        split.getSplitStart(),
                        split.getSplitEnd(),
                        binlogOffset));
        tableSchemas.putAll(split.getTableSchemas());
    }
    final MySqlSnapshotSplit lastSnapshotSplit =
            assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1).asSnapshotSplit();
    return new MySqlBinlogSplit(
            BINLOG_SPLIT_ID,
            lastSnapshotSplit.getSplitKeyType(),
            minBinlogOffset,
            BinlogOffset.NO_STOPPING_OFFSET,
            finishedSnapshotSplitInfos,
            tableSchemas);
}

3.4.2 读取低位点到高位点之间的 Binlog

BinlogSplitReader类的shouldEmit方法

    /**
     * Returns the record should emit or not.
     *
     * <p>The watermark signal algorithm is the binlog split reader only sends the binlog event
     that
     * belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid
     * since the offset is after its high watermark.
     *
     * <pre> E.g: the data input is :
     * snapshot-split-0 info : [0, 1024) highWatermark0
     * snapshot-split-1 info : [1024, 2048) highWatermark1
     * the data output is:
     * only the binlog event belong to [0, 1024) and offset is after highWatermark0
     should send,
     * only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should
     send.
     * </pre>
     */
    private boolean shouldEmit(SourceRecord sourceRecord) {
        if (isDataChangeRecord(sourceRecord)) {
            TableId tableId = getTableId(sourceRecord);
            BinlogOffset position = getBinlogPosition(sourceRecord);
            // aligned, all snapshot splits of the table has reached max highWatermark
            if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
                return true;
            }
            Object[] key =
                    getSplitKey(
                            currentBinlogSplit.getSplitKeyType(),
                            sourceRecord,
                            statefulTaskContext.getSchemaNameAdjuster());
            for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
                if (RecordUtils.splitKeyRangeContains(
                        key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
                        && position.isAtOrBefore(splitInfo.getHighWatermark())) {
                    return true;
                }
            }
            // not in the monitored splits scope, do not emit
            return false;
        }
        // always send the schema change event and signal event
        // we need record them to state of Flink
        return true;
     }
    }

3.5 未来规划

关于 CDC 项目的未来规划,我们希望围绕稳定性,进阶 feature 和生态集成三个方面展开。


稳定性


通过社区的方式吸引更多的开发者,公司的开源力量提升 Flink CDC 的成熟度;


支持 Lazy Assigning。Lazy Assigning 的思路是将 chunk 先划分一批,而不是一次性进行全部划分。当前 Source Reader 对数据读取进行分片是一次性全部划分好所有 chunk,例如有 1 万个 chunk,可以先划分 1 千个 chunk,而不是一次性全部划分,在 SourceReader 读取完 1 千 chunk 后再继续划分,节约划分 chunk 的时间。


进阶 Feature


支持 Schema Evolution。这个场景是:当同步数据库的过程中,突然在表中添加了一个字段,并且希望后续同步下游系统的时候能够自动加入这个字段;


支持 Watermark Pushdown 通过 CDC 的 binlog 获取到一些心跳信息,这些心跳的信息可以作为一个 Watermark,通过这个心跳信息可以知道到这个流当前消费的一些进度;


支持 META 数据,分库分表的场景下,有可能需要元数据知道这条数据来源哪个库哪个表,在下游系统入湖入仓可以有更多的灵活操作;


整库同步:用户要同步整个数据库只需一行 SQL 语法即可完成,而不用每张表定义一个 DDL 和 query。


生态集成


集成更多上游数据库,如 Oracle,MS SqlServer。Cloudera 目前正在积极贡献 oracle-cdc connector;


在入湖层面,Hudi 和 Iceberg 写入上有一定的优化空间,例如在高 QPS 入湖的时候,数据分布有比较大的性能影响,这一点可以通过与生态打通和集成继续优化。


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
108 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
26天前
|
存储 机器学习/深度学习 SQL
大数据处理与分析技术
大数据处理与分析技术
88 2
|
28天前
|
存储 分布式计算 NoSQL
【赵渝强老师】大数据技术的理论基础
本文介绍了大数据平台的核心思想,包括Google的三篇重要论文:Google文件系统(GFS)、MapReduce分布式计算模型和BigTable大表。这些论文奠定了大数据生态圈的技术基础,进而发展出了Hadoop、Spark和Flink等生态系统。文章详细解释了GFS的架构、MapReduce的计算过程以及BigTable的思想和HBase的实现。
zdl
|
24天前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
142 56
|
7天前
|
SQL 运维 大数据
轻量级的大数据处理技术
现代大数据应用架构中,数据中心作为核心,连接数据源与应用,承担着数据处理与服务的重要角色。然而,随着数据量的激增,数据中心面临运维复杂、体系封闭及应用间耦合性高等挑战。为缓解这些问题,一种轻量级的解决方案——esProc SPL应运而生。esProc SPL通过集成性、开放性、高性能、数据路由和敏捷性等特性,有效解决了现有架构的不足,实现了灵活高效的数据处理,特别适用于应用端的前置计算,降低了整体成本和复杂度。
|
15天前
|
机器学习/深度学习 存储 大数据
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系,保留最大方差信息,实现数据压缩、去噪及可视化。本文详解PCA原理、步骤及其Python实现,探讨其在图像压缩、特征提取等领域的应用,并指出使用时的注意事项,旨在帮助读者掌握这一强大工具。
32 4
|
22天前
|
机器学习/深度学习 存储 大数据
云计算与大数据技术的融合应用
云计算与大数据技术的融合应用
|
22天前
|
SQL 存储 大数据
单机顶集群的大数据技术来了
大数据时代,分布式数仓如MPP成为热门技术,但其高昂的成本让人望而却步。对于多数任务,数据量并未达到PB级,单体数据库即可胜任。然而,由于SQL语法的局限性和计算任务的复杂性,分布式解决方案显得更为必要。esProc SPL作为一种开源轻量级计算引擎,通过高效的算法和存储机制,实现了单机性能超越集群的效果,为低成本、高效能的数据处理提供了新选择。
|
28天前
|
SQL 存储 算法
比 SQL 快出数量级的大数据计算技术
SQL 是大数据计算中最常用的工具,但在实际应用中,SQL 经常跑得很慢,浪费大量硬件资源。例如,某银行的反洗钱计算在 11 节点的 Vertica 集群上跑了 1.5 小时,而用 SPL 重写后,单机只需 26 秒。类似地,电商漏斗运算和时空碰撞任务在使用 SPL 后,性能也大幅提升。这是因为 SQL 无法写出低复杂度的算法,而 SPL 提供了更强大的数据类型和基础运算,能够实现高效计算。
|
1月前
|
存储 大数据 定位技术
大数据 数据索引技术
【10月更文挑战第26天】
55 3