开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC里我一执行滚动开窗,就报下面的错误怎么解决?

Flink CDC里就是消费kafka中的debezium-json格式数据,使用滚动窗口统计1分钟新增的数据量,这个简单的需求,被卡住了。之前一直写的datastream,现在想用flink sql来实现我本来就是消费的kafka数据,为什么还要推到kafka里面?我现在数据源是kafka,kafka中的数据是flink cdc同步到kafka中的,数据有更新,有插入。
// 源表
tableEnv.executeSql("CREATE TABLE KafkaTable (\n" +
" UNSEND_ID STRING,\n" +
" SEND_TIME BIGINT,\n" +
" SENDER_CODE STRING,\n" +
" STATUS STRING,\n" +
" CREATE_TIME BIGINT,\n" +
" ts AS TO_TIMESTAMP_LTZ(CREATE_TIME, 3),\n" +
" WATERMARK FOR ts AS ts - INTERVAL '2' SECOND\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'T_E10_MESSAGE_UNSEND_INFO',\n" +
" 'properties.bootstrap.servers' = 'hadoop01:9092',\n" +
" 'properties.group.id' = 'test',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'debezium-json'\n" +
")\n");// 开窗聚合
tableEnv.executeSql("select\n" +
" TUMBLE_START(ts, interval '1' MINUTE),\n" +
" TUMBLE_END(ts, interval '1' MINUTE),\n" +
" count(1) as cnt\n" +
"from KafkaTable\n" +
"group by TUMBLE(ts, interval '1' MINUTE)").print();
就是上面的数据,我一执行滚动开窗,就报下面的错误怎么解决?Exception in thread "main" org.apache.flink.table.api.TableException: StreamPhysicalGroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, KafkaTable, watermark=[-(TO_TIMESTAMP_LTZ($4, 3), 2000:INTERVAL SECOND)]]], fields=[UNSEND_ID, SEND_TIME, SENDER_CODE, STATUS, CREATE_TIME])有点理解了,但是为什么我指定'format' = 'debezium-json'就不行呢?

展开
收起
小小鹿鹿鹿 2024-02-01 15:26:19 101 0
3 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    根据你提供的信息,问题出在使用了debezium-json格式的数据源。滚动窗口聚合操作不支持消费由TableSourceScan节点产生的更新和删除更改。这是因为debezium-json格式的数据源会将数据变更(如插入、更新和删除)作为单独的记录发送到Kafka,而不是将它们作为数据流的一部分。

    要解决这个问题,你可以尝试以下方法:

    1. 使用Flink SQL的ROW_NUMBER()函数为每个数据变更分配一个唯一的行号。然后,你可以根据这个行号进行分组和聚合操作。这样,你可以避免使用滚动窗口聚合操作,而是使用分组和聚合操作来处理数据变更。

    2. 如果你仍然希望使用滚动窗口聚合操作,你可以考虑将数据变更转换为数据流。这可以通过使用Flink的CEP(Complex Event Processing)库来实现。CEP库允许你处理事件流中的模式匹配和状态管理。你可以使用CEP库来检测数据变更,并将它们转换为数据流,然后再应用滚动窗口聚合操作。

    关于为什么指定'format' = 'debezium-json'就不行的问题,这是因为debezium-json格式的数据源与Flink SQL的滚动窗口聚合操作不兼容。如上所述,debezium-json格式的数据源会将数据变更作为单独的记录发送到Kafka,而不是将它们作为数据流的一部分。因此,你需要使用其他方法来处理这些数据变更,例如使用Flink SQL的ROW_NUMBER()函数或使用Flink的CEP库。

    2024-02-02 13:59:56
    赞同 展开评论 打赏
  • cdc数据不支持,实现需求需要转换下处理下,但是不能接入update和delete语句。一个简单的办法,直接接kafka是json格式,然后消费json来处理,因为你只需要消费debeziuum-json里面after里面的数据。参考:
    https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/formats/debezium/
    此回答来自钉群Flink CDC 社区。

    2024-02-01 18:38:21
    赞同 展开评论 打赏
  • 报错提示是因为StreamPhysicalGroupWindowAggregate操作目前不支持直接处理由Debezium生成的包含更新和删除事件的变更日志。Debezium JSON格式会携带数据库更改事件的类型(insert、update、delete),而滚动窗口聚合通常用于计算基于插入事件的统计数据。

    解决方案可以是:

    • 使用Flink CDC connector提供的内置表函数如upsert-kafka,它能处理这些变更流并将它们以upsert的形式输出。
    • 或者在SQL查询之前,通过Flink DataStream API进行预处理,过滤掉更新和删除事件,只保留插入事件再进行窗口聚合。
    2024-02-01 16:55:08
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载