开发者社区> 问答> 正文

TUMBLE函数不支持 回撤流

原sql

select 0 as id

, HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime

,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') then memberid else NULL end) as paynum_h

,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') then real_product else 0 end)) as paymoney_h

from dwd_XXX

where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')

group by TUMBLE(proctime ,interval '1' HOUR);

报错: org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan 发现把kafka建表语句改成 json格式就可以

数据源不是flink-mysql-cdc得来的

是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),

'connector' = 'kafka', 'properties.group.id' = 'XX', 'properties.bootstrap.servers' = 'XX', 'topic' = 'ODS_XXX', 'scan.startup.mode' = 'group-offsets', 'format' = 'canal-json');

上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的, 建kafka表的格式,使用的changelog-json:

WITH ( 'connector' = 'kafka', 'properties.group.id' = 'XX', 'properties.bootstrap.servers' = 'XXX', 'topic' = 'DWD_XXX', 'scan.startup.mode' = 'group-offsets', 'format' = 'changelog-json'); *来自志愿者整理的flink邮件归档

展开
收起
小阿矿 2021-12-03 16:45:26 1304 0
1 条回答
写回答
取消 提交回答
  • 看下你的 dwd_XXX 这张表的类型,是 append 数据流,还是 retract 数据流。

    如果是 retract ,应该就不能再上面进行窗口计算了。

    *来自志愿者整理的flink邮件归档

    2021-12-06 11:24:26
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载