各位大神,你们好:
最近有一个问题一直困扰着我:我设置的会话窗口,会在非活动状态10s后结束 窗口,发现它会在下次窗口生成时才发送本窗口处理完的数据,而我想在本次窗口结束 时发送这个数据,应该如何处理?万分感激
// 这里配置了kafka的信息,并进行数据流的输入
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer010 kafkaSource = new FlinkKafkaConsumer010<>("rfid-input-topic",
new RfidRawDataSchema(), props);
kafkaSource.assignTimestampsAndWatermarks(new CarTimestamp());
DataStream dataStream = env.addSource(kafkaSource);
// 会话窗口:如果用户处于非活动状态长达10s,则认为会话结束。Reduce中写 的是窗口融合的方法
DataStream outputStream = dataStream.keyBy("uniqueId")
.window(EventTimeSessionWindows.withGap(Time.seconds(10))).reduce(new
RfidReduceFunction());
//通过kafka数据流的输出
outputStream.addSink(new FlinkKafkaProducer010<>("rfid-output-topic", new RfidRawDataSchema(), props));
try {
env.execute("Flink add data source");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}*来自志愿者整理的flink邮件归档
您好,下面是个人理解:
首先,这个问题不是Session窗口的问题,滚动和滑动是一样的。
时间窗口的计算输出是由时间特性确定的,目前 1. 只有processing-time即处理时间(没有水位线处理乱序)能够满足及时输出窗口的结果。 2. 把eventtime的水位线时间戳设置为System.currentTimeMillis()也是可以及时输出窗口的结果,但是只会消费Flink程序启动后接收到的新消息,之前的消息是处理不到的,即便是新的消费者组group.id和earliest也无效【意思就是容错和重播失效,当然还可以再反复验证】。
目前EventTime-事件时间做到实时正确性的前提:数据的事件时间间隔小,或者小于窗口时间间隔就可以了,保证数据流不中断,这样就把不及时输出窗口的时间点无限推到无穷大的未来,即程序最终崩溃或者下线那一刻。
水位线是用来处理事件乱序的,水位线的增长依赖数据的输入,这个是很明显的咯,assignTimestampsAndWatermarks的时候根据事件时间推算的嘛,而且还会减掉一点时间,就是多掳一点数据,所以数据中断了,就是水位线停止增长了。
然后再来看,事件时间窗口默认使用的窗口触发源码: onElement和onEventTime时才有机会TriggerResult.FIRE; onElement时会判断水位线。
onEventTime时会根据水位线设置的时间戳定时器进行时间比较。 onEventTime往上找会找到InternalTimerServiceImpl#advanceWatermark 再往上找会到AbstractStreamOperator#processWatermark, 也就是和新的数据进来有关。
结论就是,如果当前事件时间窗口的end时间还没到,然而水位线是小于这个end时间的,如果处理乱序的间隔比较大,甚至会有多个窗口的end时间都大于最近的水位线时间戳,那不就是把窗口往后退了嘛...只有更后面的数据到来,新的水位线增长上去,前面滞留的窗口数据才有机会输出。
所以我的想法是,在每一个时间窗口上面加上一个判断,只要当前窗口未关闭未触发,窗口的end时间大于或等于自然时间点就触发【保证只触发一次就好】,不需要等到下一次水位线增长。
另外,目前的事件时间是符合自然的实时流数据语义的,可是,业务数据有时候间隔还是蛮大的,毕竟有一些阶段数据比较密集,有一些阶段数据比较稀疏。
以上为个人理解,也遇到同样的问题,甚至认为事件时间在Flink这里毫无意义,如有哪里不对的地方,做梦都想肯定是哪里不对,欢迎讨论,如果真的不对,希望能给出正确的demo,这样就可以完美的用于生产了。
还有就是我默认为,窗口是根据事件已经确定好了的: 时间窗口的生成:
模板方法-处理水位线:AbstractStreamOperator#processWatermark
InternalTimerServiceImpl#advanceWatermark
默认的事件时间触发器:
在 2019-04-29 18:06:30,by1507118@buaa.edu.cn 写道:
各位大神,你们好:
最近有一个问题一直困扰着我:我设置的会话窗口,会在非活动状态10s后结束窗口,发现它会在下次窗口生成时才发送本窗口处理完的数据,而我想在本次窗口结束时发送这个数据,应该如何处理?万分感激
// 这里配置了kafka的信息,并进行数据流的输入
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer010 kafkaSource = new FlinkKafkaConsumer010<>("rfid-input-topic",
new RfidRawDataSchema(), props);
kafkaSource.assignTimestampsAndWatermarks(new CarTimestamp());
DataStream dataStream = env.addSource(kafkaSource);
// 会话窗口:如果用户处于非活动状态长达10s,则认为会话结束。Reduce中写的是窗口融合的方法
DataStream outputStream = dataStream.keyBy("uniqueId")
.window(EventTimeSessionWindows.withGap(Time.seconds(10))).reduce(new
RfidReduceFunction());
//通过kafka数据流的输出
outputStream.addSink(new FlinkKafkaProducer010<>("rfid-output-topic", new RfidRawDataSchema(), props));
try {
env.execute("Flink add data source");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}*来自志愿者整理的flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。