例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无
那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为
12:01 (0), 12:03:(1), 12:14 (2), 12:16(3), 12:30 (4), 12:35 (4), 12:41 (5), 12:46 (4), 13:16 (0)
即每个元素进来就会设一个30分钟过期时间,窗口状态是维护还当前未过期元素集合。
如果用sliding window的话,步长需要设置成1秒,那么窗口个数会膨胀很多,而实际上我只需要统计其中一个窗口,多余的窗口浪费了。我也考虑过用
over window,但是不知道它是否支持处理时间,因为我的场景是需要根据处理时间推移而改变统计值的。我尝试用stream
api来实现,利用了timerService设置元素过期时间,但我测下来发现元素过期速度赶不上进入元素的速度,导致state大小一直增长.
所以想问一下:
针对这种case有没有标准做法?sql支持吗?
要怎么解决timerService的性能问题?timerService底层实现是不是单线程处理priority queue?*来自志愿者整理的flink邮件归档
可以用30分钟的range over窗口来处理,但是你提到两个0值得输出恐怕做不到,没有数据,没有产出。
假设你得输入包含ts和userid两个字段,分别为时间戳和用户id,那么SQL应该这样:
INSERT INTO mysink
SELECT
ts, userid,
COUNT(userid)
OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW) AS cnt
FROM mysrc
以如下输入为例:
"2019-12-05 12:02:00,user1",
"2019-12-05 12:13:00,user1",
"2019-12-05 12:15:00,user1",
"2019-12-05 12:31:00,user1",
"2019-12-05 12:40:00,user1",
"2019-12-05 12:45:00,user1"
产出如下结果:
{"cnt":1,"ts":1575547320000,"userid":"user1"}
{"cnt":2,"ts":1575547980000,"userid":"user1"}
{"cnt":3,"ts":1575548100000,"userid":"user1"}
{"cnt":4,"ts":1575549060000,"userid":"user1"}
{"cnt":4,"ts":1575549600000,"userid":"user1"}
{"cnt":4,"ts":1575549900000,"userid":"user1"}
为了验证上述SQL,你可以将如下作业粘贴到http://creek.baidubce.com/ 的作业定义输入框中,点击生成可执行文件,运行下载到的可执行文件,就能看到结果:
{
"sources": [{
"schema": {
"format": "CSV",
"fields": [{
"name": "ts",
"type": "SQL_TIMESTAMP"
},
{
"name": "userid",
"type": "STRING"
}]
},
"watermark": 0,
"name": "mysrc",
"eventTime": "ts",
"type": "COLLECTION",
"attr": {
"input":[
"2019-12-05 12:02:00,user1",
"2019-12-05 12:13:00,user1",
"2019-12-05 12:15:00,user1",
"2019-12-05 12:31:00,user1",
"2019-12-05 12:40:00,user1",
"2019-12-05 12:45:00,user1"
]
}
}],
"sink": {
"schema": {
"format": "JSON"
},
"name": "mysink",
"type": "STDOUT"
},
"name": "demojob",
"timeType": "EVENTTIME",
"sql": "INSERT INTO mysink SELECT rowtime, userid, COUNT(userid) OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW) AS cnt FROM mysrc"
}
当然上面的例子是以事件时间,用处理时间也是可以的。为了验证,你可以把source.type从COLLECTION改成STDIN,把timeType从EVENTTIME改成PROCESSTIME,重新生成、运行,从命令行下输入数据。
*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。