开发者社区> 问答> 正文

如何获取一分钟处理窗口的最新记录

"我有一个利用Windows的flink流媒体工作。

我的目标是id在一分钟内按内部收到的记录进行分组,然后仅按最新记录流式传输记录id。

我想出了两种可能的方法:

运用 reduce()

stream.keyBy(Record::getId)

.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.reduce((rec1, rec2) -> rec2);

这工作得很好,但似乎浪费,因为它要求每个和每一个记录。

运用 process()

stream.keyBy(Record::getId)

.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.process(new ProcessWindowFunction<Record, Object, Long, TimeWindow>() {
    @Override
    public void process(Long aLong, Context context, Iterable<Record> iterable, Collector<Object> collector) throws Exception {
        Record last = null;
        for (Record rec : iterable) {
            if (last == null || last.getTimestamp() < rec.getTimestamp()) {
                last = rec;
            }
        }
        collector.collect(last);
    }
});

这也可以正常工作。我曾预料到它会更快,但事实并非如此(它与解决方案1大致相同)。

你能推荐一个更好的方法吗?"

展开
收起
flink小助手 2018-11-28 16:31:54 1646 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    "你的解决方案1.似乎是最好的方法。

    关于你评论:

    这工作正常,但似乎浪费,因为它被称为每个记录。

    问题是你不知道哪个记录是最后一个。因此,您始终需要存储上次查看的记录。由于a的结果ReduceFunction存储在状态中(无论是对于方法的下一次评估还是作为结果返回),这正是这里发生的事情。

    你的解决方案2.实际上效率较低(从存储/内存的角度来看)。它会记住在一分钟内到达的所有记录,并在评估窗口时迭代所有记录。相反,解决方案1.仅存储单个值(最后一次功能评估的结果)。

    你可以使用常规ProcessFunction和计时器实现解决方案,但是,我认为这不会比窗口+快得多ReduceFunction。而且,它需要更多的代码。"

    2019-07-17 23:16:52
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载