开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flink cdc addsouce PG库 update操作 拿不到before数据 为?

flink cdc addsouce PG库 update操作 拿不到before数据 为null 这个有人碰到吗?

展开
收起
真的很搞笑 2023-07-02 17:42:38 280 0
2 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    在 Flink CDC 中使用 PostgreSQL 作为源并执行 UPDATE 操作时,无法获取到 BEFORE 数据的原因可能是由于 PostgreSQL 数据库对 UPDATE 操作的执行顺序不同于其他数据库,导致 Flink CDC 无法正确获取 BEFORE 数据。
    具体地说,PostgreSQL 在执行 UPDATE 操作时,会先执行 NEW 数据的插入,再执行 OLD 数据的删除,而 Flink CDC 在获取 BEFORE 数据时是在 DELETE 操作之前获取的,因此无法获取到 BEFORE 数据。这种情况下,可以考虑使用 PostgreSQL 的触发器来在 UPDATE 操作执行前获取 BEFORE 数据,并将其保存到一个临时表中,然后在 Flink CDC 中通过查询该临时表来获取 BEFORE 数据。
    以下是一个使用 PostgreSQL 触发器来获取 BEFORE 数据的示例:
    创建一个临时表来存储 BEFORE 数据:
    sql
    Copy
    CREATE TABLE before_data (
    id INT PRIMARY KEY,
    column1 VARCHAR(50),
    column2 INT,
    ...
    );
    创建一个触发器,在 UPDATE 操作执行前将 BEFORE 数据保存到临时表中:
    sql
    Copy
    CREATE OR REPLACE FUNCTION save_before_data() RETURNS trigger AS $$
    BEGIN
    INSERT INTO before_data(id, column1, column2, ...)
    VALUES(OLD.id, OLD.column1, OLD.column2, ...);
    RETURN OLD;
    END;
    $$ LANGUAGE plpgsql;

    CREATE TRIGGER before_update_trigger
    BEFORE UPDATE ON your_table_name
    FOR EACH ROW
    EXECUTE FUNCTION save_before_data();
    在上述示例中,我们创建了一个名为 before_update_trigger 的触发器,在 UPDATE 操作执行前将 BEFORE 数据保存到临时表 before_data 中。
    在 Flink CDC 中查询临时表获取 BEFORE 数据:
    java
    Copy
    FlinkJdbcLookupFunction jdbcLookupFunction = JdbcLookupOptions.builder()
    .setTableName("your_table_name")
    .setDriverName("org.postgresql.Driver")
    .setDBUrl("jdbc:postgresql://localhost:5432/your_database_name")
    .setUsername("your_username")
    .setPassword("your_password")
    .setQuery("SELECT FROM before_data WHERE id = ?")
    .setRowTypeInfo(rowTypeInfo)
    .setCacheMaxSize(1000)
    .setCacheExpireMs(10
    60 * 1000)
    .build()
    .createLookupFunction();

    DataStream> result = stream
    .keyBy(...)
    .process(new KeyedProcessFunction<..., ..., Tuple2>() {
    @Override
    public void processElement(... value, Context ctx, Collector> out) throws Exception {
    // 获取 BEFORE 数据
    Tuple2 beforeData = jdbcLookupFunction.asyncInvoke(value.getField(0));

            // 处理数据
            // ...
    
            // 发送结果
            out.collect(...);
        }
    });
    

    在上述示例中,我们创建了一个 FlinkJdbcLookupFunction 实例,用于查询临时表 before_data 中的 BEFORE 数据。然后在 KeyedProcess

    2023-07-30 09:36:44
    赞同 展开评论 打赏
  • 在 Flink CDC 的 PostgreSQL (PG) 库中,通过 addSource 方法进行 update 操作时,默认情况下是无法获取到 before 数据的。这是由于 PostgreSQL 的逻辑复制机制,导致 Flink CDC 无法直接获取到 before 数据。

    要解决这个问题,你可以尝试以下两种方法:

    1. 使用逻辑复制插件: 在 PostgreSQL 中启用逻辑复制插件,例如 pgoutput 插件。通过启用逻辑复制插件,可以使 Flink CDC 能够获取到完整的 before 和 after 数据。请参考 PostgreSQL 文档了解如何启用和配置逻辑复制插件。

    2. 自定义 CDC Connector: 如果你对 Flink 有一定的开发经验,可以自己实现一个自定义的 CDC Connector。通过监听 PostgreSQL 的数据变更事件,并手动捕获 before 数据并合并到 Flink CDC 的输出中。

    需要注意的是,以上两种方法都需要进行额外的配置和开发工作。选择哪种方法取决于你的具体需求和可行性。

    另外,如果你只是需要 before 数据来进行变更比较或处理特定的业务逻辑,而不需要在输出中包含所有 before 数据,你可以考虑使用 Temporal Table Join 或 Table Function 来实现维表关联,从而获取到 before 数据进行处理。

    2023-07-30 09:39:46
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载