开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

用flink cdc同步rds上的mysql数据到kafka里,请问这是什么情况?

用flink cdc同步rds上的mysql数据到kafka里,但是每次update的时候会被拆成一个create和一个delete事件。请问这是什么情况,我如果需要update时输出update事件该怎么处理?

展开
收起
真的很搞笑 2023-11-16 08:07:44 119 0
3 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    这是因为在MySQL中,更新操作实际上是由两个操作组成的:先删除旧记录,然后再插入新记录。因此,在Flink CDC中,每次更新操作都会被拆分成一个delete事件和一个create事件。
    如果你想要在更新时输出update事件,可以使用以下两种方法之一:
    方法一:在connector的配置文件中,将"debezium.snapshot.incremental.include-ddl"属性设置为true。这样,在进行全量同步时,会包含DDL语句,而不会生成delete和create事件。然后,在流处理程序中,可以通过检查每个事件的操作类型来区分不同的事件。
    方法二:在流处理程序中,可以使用Flink SQL中的窗口函数来合并连续的create和delete事件,从而生成update事件。例如,可以使用如下SQL语句:

    SELECT 
        user_id, 
        name, 
        phone_num,
        ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) AS row_num,
        LAG(name) OVER (PARTITION BY user_id ORDER BY ts DESC) AS prev_name,
        LAG(phone_num) OVER (PARTITION BY user_id ORDER BY ts DESC) AS prev_phone_num
    FROM 
        MyTable
    WHERE 
        operation = 'c' OR operation = 'd'
    

    这里使用了ROW_NUMBER()函数来为每个用户的所有事件分配一个行号,并使用LAG()函数来获取前一个事件的信息。然后,可以在应用程序中比较当前事件和前一个事件的信息,如果发生了更新,则生成一个update事件。

    2023-11-16 13:21:04
    赞同 展开评论 打赏
  • 这种情况可能是因为Flink CDC在处理MySQL的update操作时,会将其解析为一个先delete后create的操作序列。因此,在同步到Kafka时,就会生成两个事件:一个delete事件和一个create事件。

    如果您需要将update操作输出为单个update事件,可以考虑使用Debezium的value.converter参数来自定义数据转换逻辑。具体来说,您可以编写一个自定义的Converter类,用于将update操作转换为单个update事件。以下是一个示例代码:

    public class MySqlUpdateConverter implements DebeziumValueConverter<String, String> {
        @Override
        public String convert(String value) {
            // 在这里实现您的转换逻辑,将update操作转换为单个update事件
            return "UPDATE: " + value;
        }
    }
    

    然后,在Flink CDC的配置中,将该Converter类添加到value.converter参数中即可:

    ```json
    {
    "name": "mydb",
    "databases": [
    {
    "name": "mydb",
    "tables": [
    {
    "name": "mytable",
    "schema": "...(省略)...",
    "debezium": {
    "sql.include.schema.changes": "true",
    "value.converter": "com.example.MySqlUpdateConverter", // 添加自定义Converter类
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "offset.storage": "org.apache.flink.connector.filesystem.bucketing.BucketingOffsetStorage",
    "offset.flush.interval": "60000",
    "offset.storage.file.filename": "/path/to/offset/storage/file",
    "offset.flush.timeout": "10000",
    "offset.retention.checker": "org.apache.flink.connector.mysql.cdc.config.OffsetRetentionChecker",
    "offset.retention.duration": "7200000",
    "max.queued.messages": "1000",
    "max.batch.size": "2048",
    "max.allowed.latency": "30000",
    "table.whitelist": "mydb.mytable",
    "username": "root",
    "password": "",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.server.id": "85794",
    "database.server.name": "localhost",
    "database.whitelist": "mydb",
    "database.history": "io.debezium.relational.history.FileDatabaseHistory",
    "database.history.file.filename": "/path/to/dbhistory/file",
    "database.history.poll.interval": "60000",
    "database.history."

    2023-11-16 10:10:29
    赞同 展开评论 打赏
  • 自建的喔,不是一个before一个after嘛,此回答整理自钉群“Flink CDC 社区”

    2023-11-16 08:43:21
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    搭建电商项目架构连接MySQL 立即下载
    搭建4层电商项目架构,实战连接MySQL 立即下载
    PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

    相关镜像