Flink mysql-cdc connector 源码解析

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
简介: 在 Flink 1.11 引入了 CDC 机制,CDC 的全称是 Change Data Capture,用于捕捉数据库表的增删改查操作,是目前非常成熟的同步数据库变更方案。Flink CDC Connectors 是 Apache Flink 的一组源连接器,是可以从 MySQL、PostgreSQL 数据直接读取全量数据和增量数据的 Source Connectors.

在 Flink 1.11 引入了 CDC 机制,CDC 的全称是 Change Data Capture,用于捕捉数据库表的增删改查操作,是目前非常成熟的同步数据库变更方案。Flink CDC Connectors 是 Apache Flink 的一组源连接器,是可以从 MySQL、PostgreSQL 数据直接读取全量数据和增量数据的 Source Connectors.


Flink SQL CDC 内置了 Debezium 引擎,利用其抽取日志获取变更的能力,将 changelog 转换为 Flink SQL 认识的 RowData 数据。RowData 代表了一行的数据,在 RowData 上面会有一个元数据的信息 RowKind,RowKind 里面包括了插入(+I)、更新前(-U)、更新后(+U)、删除(-D),这样和数据库里面的 binlog 概念十分类似。通过 Debezium 采集的数据,包含了旧数据(before)和新数据行(after)以及原数据信息(source),op 的 u表示是 update 更新操作标识符(op 字段的值c,u,d,r 分别对应 create,update,delete,reade),ts_ms 表示同步的时间戳。


下面就来深入 Flink 的源码分析一下 CDC 的实现原理


首先 mysql-cdc 作为 Flink SQL 的一个 connector,那就肯定会对应一个 TableFactory 类,我们就从这个工厂类入手分析一下源码的实现过程,先找到源码里面的 MySQLTableSourceFactory 这个类,然后来看一下它的 UML 类图.



从上图中可以看到 MySQLTableSourceFactory 只实现了 DynamicTableSourceFactory 这个接口,并没有实现 DynamicTableSinkFactory 的接口,所以 mysql-cdc 是只支持作为 source 不支持作为 sink 的,如果想要写入 mysql 的话,可以使用JDBC 的 connector.

然后直接来看 MySQLTableSourceFactory#createDynamicTableSource 方法实现,源码如下所示:


@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    final FactoryUtil.TableFactoryHelper helper =
            FactoryUtil.createTableFactoryHelper(this, context);
    helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX);
    final ReadableConfig config = helper.getOptions();
    String hostname = config.get(HOSTNAME);
    String username = config.get(USERNAME);
    String password = config.get(PASSWORD);
    String databaseName = config.get(DATABASE_NAME);
    String tableName = config.get(TABLE_NAME);
    int port = config.get(PORT);
    Integer serverId = config.getOptional(SERVER_ID).orElse(null);
    ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
    StartupOptions startupOptions = getStartupOptions(config);
    TableSchema physicalSchema =
            TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
    return new MySQLTableSource(
            physicalSchema,
            port,
            hostname,
            databaseName,
            tableName,
            username,
            password,
            serverTimeZone,
            getDebeziumProperties(context.getCatalogTable().getOptions()),
            serverId,
            startupOptions);
}


这个方法的主要作用就构造 MySQLTableSource 对象,先会从 DDL 中获取 hostname,username,password 等数据库和表的信息,然后去构建 MySQLTableSource 对象.


接着来看一下 MySQLTableSource#getScanRuntimeProvider 这个方法,它会返回一个用于读取数据的运行实例对象


@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
    RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
    TypeInformation<RowData> typeInfo =
            scanContext.createTypeInformation(physicalSchema.toRowDataType());
    DebeziumDeserializationSchema<RowData> deserializer =
            new RowDataDebeziumDeserializeSchema(
                    rowType, typeInfo, ((rowData, rowKind) -> {}), serverTimeZone);
    MySQLSource.Builder<RowData> builder =
            MySQLSource.<RowData>builder()
                    .hostname(hostname)
                    .port(port)
                    .databaseList(database)
                    .tableList(database + "." + tableName)
                    .username(username)
                    .password(password)
                    .serverTimeZone(serverTimeZone.toString())
                    .debeziumProperties(dbzProperties)
                    .startupOptions(startupOptions)
                    .deserializer(deserializer);
    Optional.ofNullable(serverId).ifPresent(builder::serverId);
    DebeziumSourceFunction<RowData> sourceFunction = builder.build();
    return SourceFunctionProvider.of(sourceFunction, false);
}


这个方法里面先获取了 rowType 和 typeInfo 信息,然后构建了一个 DebeziumDeserializationSchema 反序列对象,这个对象的作用是把读取到的 SourceRecord 数据类型转换成 Flink 认识的 RowData 类型.接着来看一下 deserialize 方法.


public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
    // 获取 op 类型
    Operation op = Envelope.operationFor(record);
    // 获取数据
    Struct value = (Struct)record.value();
    // 获取 schema 信息
    Schema valueSchema = record.valueSchema();
    GenericRowData delete;
    // 根据 op 的不同类型走不同的操作
    if (op != Operation.CREATE && op != Operation.READ) {
        // delete
        if (op == Operation.DELETE) {
            delete = this.extractBeforeRow(value, valueSchema);
            this.validator.validate(delete, RowKind.DELETE);
            delete.setRowKind(RowKind.DELETE);
            out.collect(delete);
        } else {
            // update
            delete = this.extractBeforeRow(value, valueSchema);
            this.validator.validate(delete, RowKind.UPDATE_BEFORE);
            delete.setRowKind(RowKind.UPDATE_BEFORE);
            out.collect(delete);
            GenericRowData after = this.extractAfterRow(value, valueSchema);
            this.validator.validate(after, RowKind.UPDATE_AFTER);
            after.setRowKind(RowKind.UPDATE_AFTER);
            out.collect(after);
        }
    } else {
        // insert
        delete = this.extractAfterRow(value, valueSchema);
        this.validator.validate(delete, RowKind.INSERT);
        delete.setRowKind(RowKind.INSERT);
        out.collect(delete);
    }
}


这里主要会判断进来的数据类型,然后根据不同的类型走不同的操作逻辑,如果是 update 操作的话,会输出两条数据.最终都是会转换成 RowData 类型输出.


接着往下面看是 builder.build() 该方法构造了 DebeziumSourceFunction 对象,也就是说 Flink 的底层是采用 Debezium 来读取 mysql,PostgreSQL 数据库的变更日志的.为什么没有用 canal 感兴趣的同学自己可以对比一下这两个框架


然后来看一下 DebeziumSourceFunction 的 UML 类图



可以发现 DebeziumSourceF unction 不仅继承了 RichSourceFunction 这个抽象类,而且还实现了 checkpoint 相关的接口,所以 mysql-cdc 是支持 Exactly Once 语义的.这几个接口大家都非常熟悉了,这里不再过多介绍.

直接来看一下核心方法 open 和 run 方法的逻辑如下


public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ThreadFactory threadFactory = (new ThreadFactoryBuilder()).setNameFormat("debezium-engine").build();
        this.executor = Executors.newSingleThreadExecutor(threadFactory);
    }
public void run(SourceContext<T> sourceContext) throws Exception {
    this.properties.setProperty("name", "engine");
    this.properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
    if (this.restoredOffsetState != null) {
        this.properties.setProperty("offset.storage.flink.state.value", this.restoredOffsetState);
    }
    this.properties.setProperty("key.converter.schemas.enable", "false");
    this.properties.setProperty("value.converter.schemas.enable", "false");
    this.properties.setProperty("include.schema.changes", "false");
    this.properties.setProperty("offset.flush.interval.ms", String.valueOf(9223372036854775807L));
    this.properties.setProperty("tombstones.on.delete", "false");
    this.properties.setProperty("database.history", FlinkDatabaseHistory.class.getCanonicalName());
    if (this.engineInstanceName == null) {
        this.engineInstanceName = UUID.randomUUID().toString();
        FlinkDatabaseHistory.registerEmptyHistoryRecord(this.engineInstanceName);
    }
    this.properties.setProperty("database.history.instance.name", this.engineInstanceName);
    String dbzHeartbeatPrefix = this.properties.getProperty(Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
    this.debeziumConsumer = new DebeziumChangeConsumer(sourceContext, this.deserializer, this.restoredOffsetState == null, this::reportError, dbzHeartbeatPrefix);
    this.engine = DebeziumEngine.create(Connect.class).using(this.properties).notifying(this.debeziumConsumer).using(OffsetCommitPolicy.always()).using((success, message, error) -> {
        if (!success && error != null) {
            this.reportError(error);
        }
    }).build();
    if (this.running) {
        this.executor.execute(this.engine);
        this.debeziumStarted = true;
        MetricGroup metricGroup = this.getRuntimeContext().getMetricGroup();
        metricGroup.gauge("currentFetchEventTimeLag", () -> {
            return this.debeziumConsumer.getFetchDelay();
        });
        metricGroup.gauge("currentEmitEventTimeLag", () -> {
            return this.debeziumConsumer.getEmitDelay();
        });
        metricGroup.gauge("sourceIdleTime", () -> {
            return this.debeziumConsumer.getIdleTime();
        });
        try {
            while(this.running && !this.executor.awaitTermination(5L, TimeUnit.SECONDS)) {
                if (this.error != null) {
                    this.running = false;
                    this.shutdownEngine();
                    ExceptionUtils.rethrow(this.error);
                }
            }
        } catch (InterruptedException var5) {
            Thread.currentThread().interrupt();
        }
    }
}


open 方法里面主要是创建了一个单线程化的线程池(它只会用唯一的工作线程来执行任务).所以 mysql-cdc 源是单线程读取的.


run 方法里先是设置了一大堆 debenium 的属性,比如 include.schema.changes 默认是 false ,也就是说不会捕获表结构的变更,所以如果有新增字段的话,目前 Flink 任务是不能动态感知 schema 变化的.主要关心的是 insert update delete 操作.


最核心的地方是构建 DebeziumEngine 对象,然后通过上面的线程池来执行 engine,后面还有一些 metric 相关的逻辑,最后是一个循环的判断任务的状态,如果程序有异常或者被打断就抛出异常中断线程关闭 DebeziumEngine 对象.

相关文章
|
2月前
|
SQL 关系型数据库 MySQL
深入解析MySQL的EXPLAIN:指标详解与索引优化
MySQL 中的 `EXPLAIN` 语句用于分析和优化 SQL 查询,帮助你了解查询优化器的执行计划。本文详细介绍了 `EXPLAIN` 输出的各项指标,如 `id`、`select_type`、`table`、`type`、`key` 等,并提供了如何利用这些指标优化索引结构和 SQL 语句的具体方法。通过实战案例,展示了如何通过创建合适索引和调整查询语句来提升查询性能。
243 9
|
4月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
160 3
|
4月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
320 0
|
2月前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
103 16
|
2月前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
148 16
|
2月前
|
存储 关系型数据库 MySQL
double ,FLOAT还是double(m,n)--深入解析MySQL数据库中双精度浮点数的使用
本文探讨了在MySQL中使用`float`和`double`时指定精度和刻度的影响。对于`float`,指定精度会影响存储大小:0-23位使用4字节单精度存储,24-53位使用8字节双精度存储。而对于`double`,指定精度和刻度对存储空间没有影响,但可以限制数值的输入范围,提高数据的规范性和业务意义。从性能角度看,`float`和`double`的区别不大,但在存储空间和数据输入方面,指定精度和刻度有助于优化和约束。
242 5
|
3月前
|
监控 关系型数据库 MySQL
MySQL自增ID耗尽应对策略:技术解决方案全解析
在数据库管理中,MySQL的自增ID(AUTO_INCREMENT)属性为表中的每一行提供了一个唯一的标识符。然而,当自增ID达到其最大值时,如何处理这一情况成为了数据库管理员和开发者必须面对的问题。本文将探讨MySQL自增ID耗尽的原因、影响以及有效的应对策略。
213 3
|
3月前
|
存储 关系型数据库 MySQL
MySQL 字段类型深度解析:VARCHAR(50) 与 VARCHAR(500) 的差异
在MySQL数据库中,`VARCHAR`类型是一种非常灵活的字符串存储类型,它允许存储可变长度的字符串。然而,`VARCHAR(50)`和`VARCHAR(500)`之间的差异不仅仅是长度的不同,它们在存储效率、性能和使用场景上也有所不同。本文将深入探讨这两种字段类型的区别及其对数据库设计的影响。
127 2
|
3月前
|
存储 关系型数据库 MySQL
PHP与MySQL动态网站开发深度解析####
本文作为技术性文章,深入探讨了PHP与MySQL结合在动态网站开发中的应用实践,从环境搭建到具体案例实现,旨在为开发者提供一套详尽的实战指南。不同于常规摘要仅概述内容,本文将以“手把手”的教学方式,引导读者逐步构建一个功能完备的动态网站,涵盖前端用户界面设计、后端逻辑处理及数据库高效管理等关键环节,确保读者能够全面掌握PHP与MySQL在动态网站开发中的精髓。 ####
|
3月前
|
存储 关系型数据库 MySQL
MySQL MVCC深度解析:掌握并发控制的艺术
【10月更文挑战第23天】 在数据库领域,MVCC(Multi-Version Concurrency Control,多版本并发控制)是一种重要的并发控制机制,它允许多个事务并发执行而不产生冲突。MySQL作为广泛使用的数据库系统,其InnoDB存储引擎就采用了MVCC来处理事务。本文将深入探讨MySQL中的MVCC机制,帮助你在面试中自信应对相关问题。
233 3

推荐镜像

更多