flink cdc addsouce PG库 update操作 拿不到before数据 为null 这个有人碰到吗?
在 Flink CDC 中使用 PostgreSQL 作为源并执行 UPDATE 操作时,无法获取到 BEFORE 数据的原因可能是由于 PostgreSQL 数据库对 UPDATE 操作的执行顺序不同于其他数据库,导致 Flink CDC 无法正确获取 BEFORE 数据。
具体地说,PostgreSQL 在执行 UPDATE 操作时,会先执行 NEW 数据的插入,再执行 OLD 数据的删除,而 Flink CDC 在获取 BEFORE 数据时是在 DELETE 操作之前获取的,因此无法获取到 BEFORE 数据。这种情况下,可以考虑使用 PostgreSQL 的触发器来在 UPDATE 操作执行前获取 BEFORE 数据,并将其保存到一个临时表中,然后在 Flink CDC 中通过查询该临时表来获取 BEFORE 数据。
以下是一个使用 PostgreSQL 触发器来获取 BEFORE 数据的示例:
创建一个临时表来存储 BEFORE 数据:
sql
Copy
CREATE TABLE before_data (
id INT PRIMARY KEY,
column1 VARCHAR(50),
column2 INT,
...
);
创建一个触发器,在 UPDATE 操作执行前将 BEFORE 数据保存到临时表中:
sql
Copy
CREATE OR REPLACE FUNCTION save_before_data() RETURNS trigger AS $$
BEGIN
INSERT INTO before_data(id, column1, column2, ...)
VALUES(OLD.id, OLD.column1, OLD.column2, ...);
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER before_update_trigger
BEFORE UPDATE ON your_table_name
FOR EACH ROW
EXECUTE FUNCTION save_before_data();
在上述示例中,我们创建了一个名为 before_update_trigger 的触发器,在 UPDATE 操作执行前将 BEFORE 数据保存到临时表 before_data 中。
在 Flink CDC 中查询临时表获取 BEFORE 数据:
java
Copy
FlinkJdbcLookupFunction jdbcLookupFunction = JdbcLookupOptions.builder()
.setTableName("your_table_name")
.setDriverName("org.postgresql.Driver")
.setDBUrl("jdbc:postgresql://localhost:5432/your_database_name")
.setUsername("your_username")
.setPassword("your_password")
.setQuery("SELECT FROM before_data WHERE id = ?")
.setRowTypeInfo(rowTypeInfo)
.setCacheMaxSize(1000)
.setCacheExpireMs(10 60 * 1000)
.build()
.createLookupFunction();
DataStream> result = stream
.keyBy(...)
.process(new KeyedProcessFunction<..., ..., Tuple2>() {
@Override
public void processElement(... value, Context ctx, Collector> out) throws Exception {
// 获取 BEFORE 数据
Tuple2 beforeData = jdbcLookupFunction.asyncInvoke(value.getField(0));
// 处理数据
// ...
// 发送结果
out.collect(...);
}
});
在上述示例中,我们创建了一个 FlinkJdbcLookupFunction 实例,用于查询临时表 before_data 中的 BEFORE 数据。然后在 KeyedProcess
在 Flink CDC 的 PostgreSQL (PG) 库中,通过 addSource
方法进行 update 操作时,默认情况下是无法获取到 before 数据的。这是由于 PostgreSQL 的逻辑复制机制,导致 Flink CDC 无法直接获取到 before 数据。
要解决这个问题,你可以尝试以下两种方法:
1. 使用逻辑复制插件: 在 PostgreSQL 中启用逻辑复制插件,例如 pgoutput
插件。通过启用逻辑复制插件,可以使 Flink CDC 能够获取到完整的 before 和 after 数据。请参考 PostgreSQL 文档了解如何启用和配置逻辑复制插件。
2. 自定义 CDC Connector: 如果你对 Flink 有一定的开发经验,可以自己实现一个自定义的 CDC Connector。通过监听 PostgreSQL 的数据变更事件,并手动捕获 before 数据并合并到 Flink CDC 的输出中。
需要注意的是,以上两种方法都需要进行额外的配置和开发工作。选择哪种方法取决于你的具体需求和可行性。
另外,如果你只是需要 before 数据来进行变更比较或处理特定的业务逻辑,而不需要在输出中包含所有 before 数据,你可以考虑使用 Temporal Table Join
或 Table Function
来实现维表关联,从而获取到 before 数据进行处理。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。