场景描述
设备工作期间每隔固定周期(比如30s)就往kafka发送采集到的数据,当设备超过15分钟没有上报数据,则认为设备离网了。如何每隔10分钟计算出设备在该10分钟时间段内的离网时长?
按我的理解(可能有错,请多指正),stream不设置window的时候会使用全量窗口,当新数据来的时候,只是将全量窗口的该设备的最新上报时间更新,每隔10分钟获取globalwindow数据,然后filter最新上报时间到现在时间的差值大于15分钟即可,但是我没有找到flink里如何获取globalwindow的数据。
btw:如果是spark,是可以做的,filter>15min,然后output的model设置为complete即可获取globalwindow的数据。请问flink下该方案该如何做?
问题很久远了,因为做过类似的设备监控的事情,简单说下我的做法:不靠flink本身来做,而是通过redis的timeout来做的。
这个用session window应该可以解决。通过设备ID作key用keyby把同一个设备的数据放到一起,然后用session window,比如设置session超时时间15分钟,那设备如果如果超过15分钟没有汇报,则这15分钟前和后的数据会被分在两个window里面,对于单个window中的数据可以自己实现个AggregateFunction来计算,求得这个window的最大时间戳和最小时间戳(可以返回一个Tuple),两个连续的session window之前的时间差,也就是后一个的最小时间戳减去前一个的最大时间戳,就可以作为设备的离网时间了。
不知道你的需求是不是这样。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。