INSERT INTO kafka_dws_artemis_out_order select warehouse_id, count(*)
from kafka_ods_artemis_out_order group by warehouse_id;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Table sink
'myhive.wanglei.kafka_dws_artemis_out_order' doesn't support consuming
update changes which is produced by node
GroupAggregate(groupBy=[warehouse_id], select=[warehouse_id, COUNT(*) AS
EXPR$1])
在 Flink-1.10 中可以更改 KafkaTableSinkBase 让它 implements RetractStream 实现。
我看现在 Flink-1.11 中是用了 KafkaDynamicSource, KafkaDynamicSink,这样怎样改动才能让
GroupBy 的结果也发送到 Kafka 呢?
*来自志愿者整理的flink邮件归档
DynamicTableSink有一个方法是getChangelogMode,可以通过这个方法来指定这个sink接收什么种类的数据*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。