开发者社区> 问答> 正文

flink1.11 sql ddl 连接kafka读取数据,使用事件时间窗口,无法触发,相同的sql

您好,请教您一个问题

flink1.11 sql ddl连接kafka,使用事件事件,无法触发窗口,使用process_time系统时间就可以正常触发 create table kafka_table ( log_id  string, event_date timestamp(3), process_time as PROCTIME(), ts as event_date, watermark for ts as ts - interval '1' second ) with (  'connector' = 'kafka',  'topic' = 'kafka_table',  'properties.bootstrap.servers' = '10.2.12.3:9092',  'properties.group.id' = 'tmp-log-consumer003',  'format' = 'json',  'scan.startup.mode' = 'latest-offset' ) 执行的sql是 select TUMBLE_START(kafka_table.event_date, INTERVAL '10' SECOND),TUMBLE_END(kafka_table.event_date, INTERVAL '10' SECOND),src_ip,count(dest_ip) from kafka_table group by TUMBLE(kafka_table.event_date, INTERVAL '10' SECOND),kafka_table.src_ip

select log_id,process_time,ts from kafka_table查询的表结构如下 表结构为 root  |-- log_id: STRING  |-- process_time: TIMESTAMP(3) NOT NULL PROCTIME  |-- ts: TIMESTAMP(3) ROWTIME

 输入数据为 log_id,process_time,ts 13547876357,2020-11-14T08:22:08.699,2020-11-07T08:23:09.806 13547876358,2020-11-14T08:22:08.857,2020-11-07T08:23:09.806 13547876359,2020-11-14T08:22:09.061,2020-11-07T08:23:09.806 13547876360,2020-11-14T08:22:09.310,2020-11-07T08:23:09.806 13547876361,2020-11-14T08:22:09.526,2020-11-07T08:23:09.806 13552070656,2020-11-14T08:22:09.772,2020-11-07T08:23:09.806*来自志愿者整理的flink邮件归档

展开
收起
小阿矿 2021-12-03 16:21:47 1201 0
1 条回答
写回答
取消 提交回答
  • 重复的问题。我将刚刚的回答也贴在这里。

    如果要测试事件时间窗口,请保证以下几点,否则窗口不会触发:

    1. 保证所有 partition 都有数据。

    2. 且每个 partition 数据的 event time 都在前进

    3. 且 event time 前进的距离要超过 window size + watermark offset, 即你的例子中的 10s+1s =

    11s

    以上如果不满足,则系统不会认为窗口结束,所以窗口就不会触发。

    *来自志愿者整理的flink邮件归档

    2021-12-06 11:00:48
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载