您好,请教您一个问题
flink1.11 sql ddl连接kafka,使用事件事件,无法触发窗口,使用process_time系统时间就可以正常触发 create table kafka_table ( log_id
string, event_date timestamp(3), process_time as PROCTIME(), ts as event_date, watermark for ts as ts - interval '1' second ) with ( 'connector' = 'kafka', 'topic' = 'kafka_table', 'properties.bootstrap.servers' = '10.2.12.3:9092', 'properties.group.id' = 'tmp-log-consumer003', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' ) 执行的sql是 select TUMBLE_START(kafka_table.event_date, INTERVAL '10' SECOND),TUMBLE_END(kafka_table.event_date, INTERVAL '10' SECOND),src_ip,count(dest_ip) from kafka_table group by TUMBLE(kafka_table.event_date, INTERVAL '10' SECOND),kafka_table.src_ip
select log_id,process_time,ts from kafka_table查询的表结构如下 表结构为 root |-- log_id: STRING |-- process_time: TIMESTAMP(3) NOT NULL PROCTIME |-- ts: TIMESTAMP(3) ROWTIME
输入数据为 log_id,process_time,ts 13547876357,2020-11-14T08:22:08.699,2020-11-07T08:23:09.806 13547876358,2020-11-14T08:22:08.857,2020-11-07T08:23:09.806 13547876359,2020-11-14T08:22:09.061,2020-11-07T08:23:09.806 13547876360,2020-11-14T08:22:09.310,2020-11-07T08:23:09.806 13547876361,2020-11-14T08:22:09.526,2020-11-07T08:23:09.806 13552070656,2020-11-14T08:22:09.772,2020-11-07T08:23:09.806*来自志愿者整理的flink邮件归档
重复的问题。我将刚刚的回答也贴在这里。
如果要测试事件时间窗口,请保证以下几点,否则窗口不会触发:
保证所有 partition 都有数据。
且每个 partition 数据的 event time 都在前进
且 event time 前进的距离要超过 window size + watermark offset, 即你的例子中的 10s+1s =
11s
以上如果不满足,则系统不会认为窗口结束,所以窗口就不会触发。
*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。