目前我司通过flinkcdc将mysql表同步至es过程中出现数据丢失,但是丢失数据更新任意字段又能同步上去,sql如下:
CREATE TABLE prod_erp_goods_cdc (
id INT,
...
created_at TIMESTAMP(3),
updated_at TIMESTAMP(3),
deleted_at TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '',
'port' = '',
'username' = '',
'password' = '',
'database-name' = '',
'table-name' = '',
'scan.incremental.snapshot.enabled'='true'
);
CREATE TABLE esprod_erp_goods_sink (
id INT,
...
created_at timestamp_ltz(3),
updated_at timestamp_ltz(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = '',
'index' = '',
'sink.bulk-flush.max-actions' = '1000', -- 每条数据直接flush到ES
-- 'sink.bulk-flush.max-size' = '1mb', -- 数据大小达到1mb时flush到ES
'sink.bulk-flush.interval' = '1s', -- 每1秒钟进行一次flush
-- 'connection.timeout' = '10000',
-- 'socket.timeout' = '30000',
'format' = 'json', -- 非常重要的配置
'json.timestamp-format.standard' = 'ISO-8601' -- 非常重要的配置
);
insert into esprod_erp_goods_sink
select id ,
...
JSON_VALUE(img_url_list, '$[0]'),
...
cast(created_at as timestamp_ltz(3)),
cast(updated_at as timestamp_ltz(3))
from prod_erp_goods_cdc
where deleted_at is null;
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。