Flink CDC元数据我加上去了,但是在sink端写入的时候 还是无法作为字段写入?
Flink CDC元数据是指从数据库中捕获的变更数据的一些额外信息,例如操作类型、时间戳、表名等。 这些元数据可以更好地理解和处理变更数据,例如根据操作类型进行过滤或者转换,或者根据时间戳进行排序或者窗口等。
如果想在sink端写入的时候使用Flink CDC元数据作为字段写入,需要注意以下几点:
CREATE TABLE mysql_binlog (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10, 3),
op_type STRING METADATA FROM 'value.source.op' VIRTUAL, -- 操作类型
op_ts TIMESTAMP(3) METADATA FROM 'value.source.ts_ms' VIRTUAL -- 时间戳
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'inventory',
'table-name' = 'products'
);
CREATE TABLE es_sink (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10, 3),
op_type STRING, -- 操作类型
op_ts TIMESTAMP(3) -- 时间戳
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'products'
);
INSERT INTO es_sink
SELECT id, name, description, weight, op_type, op_ts
FROM mysql_binlog;
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。