开发者社区> 问答> 正文

关于使用Flink计算TopN的问题

最近需要使用Flink计算TopN碰到一些问题 不知道大家有没有遇到过 计算TopN的所使用的SQL语句是如下形式

create stream input table raw_log ( country STRING, domain STRING, flux LONG, request LONG, rowtime AS ROWTIME(request, "2 SECOND") ) USING kafka ( kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}", startingOffsets = earliest, subscribe = "input" ) ROW FORMAT JSON; create stream output table top_n_result USING kafka ( kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}", topic = "output" ) ROW FORMAT JSON TBLPROPERTIES("update-mode" = upsert); create view window_log as select TUMBLE_START(rowtime, INTERVAL '2' SECOND) as wStart, country, domain, sum(flux) as flux from raw_log group by TUMBLE(rowtime, INTERVAL '2' SECOND), country, domain; insert into top_n_result SELECT * FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY wStart ORDER BY flux desc ) AS row_num FROM window_log ) WHERE row_num <= 10;

就是前面是一个基于事件时间的窗口计算逻辑后面跟着一个TopN的计算逻辑 跑在Flink 1.9的blink上的 在TopN计算上先按窗口开始时间做分区然后排序输出Top结果 这里就产生了

一个状态管理的问题 因为窗口计算是不断向前的 也就是将窗口开始时间作为分区键会导致状态不断增大 后续在测试过程中发现其底层是实现为RetractableTopNFunction 然后在这个实现中没有发现状态清理的逻辑 而在

AppendOnlyTopNFunction和UpdatableTopNFunction中存在状态清理的逻辑 为什么要这么实现? 能否在RetractableTopNFunction中实现状态清理? 并且保证状态安全被删除?*来自志愿者整理的flink邮件归档

展开
收起
毛毛虫雨 2021-12-08 11:11:19 538 0
1 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载