这就是我的流数据:
time | id | group |
---|---|---|
1 | a1 | b1 |
2 | a1 | b2 |
3 | a1 | b3 |
4 | a2 | b3 |
在我们的窗口中考虑上面的所有示例 我的用例获取最新的独特ID。
我需要输出如下:
time | id | group |
---|---|---|
3 | a1 | b3 |
4 | a2 | b3 |
我怎样才能在Flink中实现这一目标?
我知道窗口功能WindowFunction。但是,我无法绕过这样做。
我试过这只是为了获得不同的ID。如何将此功能扩展到我的用例?
class DistinctGrid extends WindowFunction[UserMessage, String, Tuple, TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, input: Iterable[UserMessage], out: Collector[String]): Unit = {
val distinctGeo = input.map(_.id).toSet
for (i <- distinctGeo) {
out.collect(i)
}
}
}
如果您通过id字段键入流,则无需考虑不同的ID - 您将为每个不同的键设置单独的窗口。您的窗口函数只需迭代窗口内容以查找具有最大时间戳的UserMessage,并将其作为窗口的结果输出(对于该键)。但是,有一个内置函数可以做到这一点 - 查看maxBy()的文档 - 因此在这种情况下不需要窗口函数。
粗略地说,这看起来像
stream.keyBy("id")
.timeWindow(Time.minutes(10))
.maxBy("time")
.print()
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。