开发者社区> 问答> 正文

flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从sav

源表test: CREATE TABLE test ( id INT, name VARCHAR(255), time TIMESTAMP(3), status INT, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '1', 'database-name' = 'ai_audio_lyric_task', 'table-name' = 'test' ) 源表status CREATE TABLE status ( id INT, name VARCHAR(255), PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '1', 'database-name' = 'ai_audio_lyric_task', 'table-name' = 'status' );

输出表 CREATE TABLE test_status ( id INT, name VARCHAR(255), time TIMESTAMP(3), status INT, status_name VARCHAR(255) PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'xxx', 'index' = 'xxx', 'username' = 'xxx', 'password' = 'xxx', 'sink.bulk-flush.backoff.max-retries' = '100000', 'sink.bulk-flush.backoff.strategy' = 'CONSTANT', 'sink.bulk-flush.max-actions' = '5000', 'sink.bulk-flush.max-size' = '10mb', 'sink.bulk-flush.interval' = '1s' );

输出语句: INSERT into test_status SELECT t.*, s.name FROM test AS t LEFT JOIN status AS s ON t.status = s.id;

mysql表中已经有数据 test: 0, name0, 2020-07-06 00:00:00 , 0 1, name1, 2020-07-06 00:00:00 , 1 2, name2, 2020-07-06 00:00:00 , 1 .....

status 0, status0 1, status1 2, status2 .....

操作顺序与复现: 1、启动任务,设置并行度为40, 表中数据算完后。/data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink savepoint保存,然后web ui上取消任务。 ==> test_status中的数据正常: 0, name0, 2020-07-06 00:00:00 , 0, status0 1, name1, 2020-07-06 00:00:00 , 1, status1 2, name2, 2020-07-06 00:00:00 , 1, status1

2、操作mysql, 将status中id=1数据变更为 status1_modify

3、接下来的重启上面的任务不同并行度下,1和大于1的情况下,在并行度大于1的情况下,结果跟预期不相符。 /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink -s savepoint -p 1 job 下, ==> test_status中的数据正常: 0, name0, 2020-07-06 00:00:00 , 0, status0 1, name1, 2020-07-06 00:00:00 , 1, status1_modify 2, name2, 2020-07-06 00:00:00 , 1, status1_modify /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink -s savepoint -p 40 job 下 ==> test_status中的数据不正常, id = 1,2的两条数据缺失: 0, name0, 2020-07-06 00:00:00 , 0, status0

怀疑与cdc的变化流在多个并行度下sink输出流的时候,先执行了 insert id=1再执行delete id=1的动作,导致数据有问题!!!

这里是不是bug?还是从save point里恢复的时候,算子的状态有问题? 如果是,能不能在sink的时候,只把sink这里的并行度设置为1??

*来自志愿者整理的flink邮件归档

展开
收起
说了是一只鲳鱼 2021-12-06 11:46:21 1395 0
1 条回答
写回答
取消 提交回答
  • 看起来是这个 jdbc sink bug 导致的 https://issues.apache.org/jira/browse/FLINK-19423

    这个 bug 会导致删的时候,取的 pk 索引不对,所以可能导致 index 异常,或是删错数据。

    这个bug 会在即将发布的 1.11.3 中修复。

    *来自志愿者整理的flink邮件归档

    2021-12-06 14:12:54
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server 2017 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载