开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC中pgsql 有遇到这种 更新c列 发现获取不到before值的么?

Flink CDC中pgsql 有遇到这种 更新c列 发现获取不到before值的么?image.png

展开
收起
真的很搞笑 2023-12-04 07:33:11 113 0
3 条回答
写回答
取消 提交回答
  • 没,我这是正常的 ,此回答整理自钉群“Flink CDC 社区”

    2023-12-05 08:27:40
    赞同 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    在Flink CDC中,当更新PostgreSQL数据库中的某一列时,如果该列在Flink CDC中,当更新PostgreSQL数据库中的某一列时,如果该列没有定义NOT NULL约束,那么在获取更新前的值时可能会出现问题。这是因为在更新操作中,如果没有提供新值,则该列的值将保持不变。因此,如果该列没有定义NOT NULL约束,则无法确定更新前的值是什么。

    为了解决这个问题,可以在创建源表时使用debezium-sql-connectorupdate.mode参数来指定更新模式。有两种更新模式可供选择:insertupdate。如果选择insert模式,则在插入新行时不会更新已存在的行;如果选择update模式,则会更新已存在的行。

    例如,以下是一个使用update模式的源表创建示例:

    CREATE TABLE my_source (
      id INT,
      name STRING,
      age INT,
      c STRING
    ) WITH (
      'connector' = 'pgsql-cdc',
      'hostname' = 'localhost',
      'port' = '5432',
      'database-name' = 'mydb',
      'table-name' = 'mytable',
      'username' = 'root',
      'password' = 'password',
      'debezium-sql-connector.history.kafka.bootstrap.servers' = 'localhost:9092',
      'debezium-sql-connector.history.kafka.topic' = 'dbhistory.mydb',
      'debezium-sql-connector.history.kafka.groupId' = 'mydb-group',
      'debezium-sql-connector.snapshot.mode' = 'initial',
      'debezium-sql-connector.max.queued.messages' = '10000',
      'debezium-sql-connector.max.retry.attempts' = '16',
      'debezium-sql-connector.heartbeat.interval' = '10000',
      'debezium-sql-connector.max.allowed.packet.size' = '5242880',
      'checkpointing.interval' = '60000',
      'update.mode' = 'update' -- 设置更新模式为update
    );
    

    通过这种方式,Flink CDC可以正确地处理PostgreSQL数据库中的更新操作,并能够获取到更新前的值。

    2023-12-04 14:28:26
    赞同 展开评论 打赏
  • 在Flink CDC中,当更新PostgreSQL数据库中的某一列时,如果该列在Flink CDC中,当更新PostgreSQL数据库中的某一列时,如果该列没有设置默认值或者不允许为空,那么在获取before值时可能会出现问题。

    这是因为在Flink CDC中,当一个事务提交时,它会将该事务中所有发生变化的行的状态都设置为DELETED,然后再将这些行的状态重新设置为INSERTED或UPDATED。在这个过程中,如果某个列的值发生了变化,那么Flink CDC会认为这个列是一个新插入的列,因此无法获取到before值。

    为了解决这个问题,你可以尝试以下方法:

    1. 在PostgreSQL数据库中为需要更新的列设置默认值或者允许为空。这样,在更新该列时就可以获取到before值了。

    2. 使用Flink CDC的upsert功能。通过upsert功能,你可以将更新操作转换为插入操作,这样就可以获取到before值了。但是需要注意的是,upsert功能可能会增加写入数据的成本和延迟。

    2023-12-04 11:59:28
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载