原sql
select 0 as id
, HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime
,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') then memberid else NULL end) as paynum_h
,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') then real_product else 0 end)) as paymoney_h
from dwd_XXX
where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
group by TUMBLE(proctime ,interval '1' HOUR);
报错: org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan 发现把kafka建表语句改成 json格式就可以
数据源不是flink-mysql-cdc得来的
是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),
'connector' = 'kafka', 'properties.group.id' = 'XX', 'properties.bootstrap.servers' = 'XX', 'topic' = 'ODS_XXX', 'scan.startup.mode' = 'group-offsets', 'format' = 'canal-json');
上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的, 建kafka表的格式,使用的changelog-json:
WITH ( 'connector' = 'kafka', 'properties.group.id' = 'XX', 'properties.bootstrap.servers' = 'XXX', 'topic' = 'DWD_XXX', 'scan.startup.mode' = 'group-offsets', 'format' = 'changelog-json'); *来自志愿者整理的flink邮件归档
看下你的 dwd_XXX 这张表的类型,是 append 数据流,还是 retract 数据流。
如果是 retract ,应该就不能再上面进行窗口计算了。
*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。