1:当DataStream是由 一个table 经过 group by rowtime 转换过来的就无法触发窗口
例如: tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime from test group by msg,rowtime"));
// 获得 DataStream,并定义wtm生成 SingleOutputStreamOperator r = tableEnv.toRetractStream(tableEnv.from("test3"), Row.class) .filter(x -> x.f0) // map ........ .returns(Types.TUPLE(Types.STRING, Types.LONG)) .assignTimestampsAndWatermarks( WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)) .withTimestampAssigner(((element, recordTimestamp) -> element.f1)) );
参考 官方文档: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
// stream - 转 Table,指定Rowtime tableEnv.createTemporaryView("test5", r, $("msg"), $("rowtime").rowtime());
String sql5 = "select " + "msg," + "count(1) cnt" + " from test5 " + " group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg " + ""; tableEnv.executeSql("insert into printlnRetractSink " + sql5);
结果: 无法触发窗口操作。 查调试源码: org.apache.flink.table.runtime.operators.window.WindowOperator // 返回的wtm永远都是 -9223372036854775808 public long getCurrentWatermark() { return internalTimerService.currentWatermark(); }
// 查看任务,watermark是正常在生成的。InternalTimerServiceImpl.advanceWatermark是正常为currentWatermark赋值。但是 internalTimerService.currentWatermark() 却拿的是-9223372036854775808
// 当 tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime from test group by msg,rowtime")); 语句改为 tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime from test"));
结果就是正确的。 所以这是一个bug吗??*来自志愿者整理的flink邮件归档
你好, 你的flink版本是多少? 之前有个bug是Table转datastream 会丢rowtime问题,看起来是这个问题。
我在[1]里修复了,你可以升级对应的版本试下。
祝好, Leonard [1]https://issues.apache.org/jira/browse/FLINK-21013 https://issues.apache.org/jira/browse/FLINK-21013*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。