开发者社区> 问答> 正文

pyflink 如何使用session window对相同pv数据聚合?

hi,all: 一账号一段时间内连续操作为一个pv,间隔时间超过阈值后会记为新的pv。系统需要获取流式日志,使用日志统计实时数据的各项指标。但是我们在使用session window的时候无法使用udaf(自定义聚合函数)对相同pv日志进行聚合统计。 希望知道的大佬能给点建议。感谢!!!

session_window = Session.with_gap("60.second").on("pv_time").alias("w") t_env.from_path('source')
.window(session_window)
.group_by("w,pv_id")
.select("pv_id,get_act(act)").insert_into("sink")

http://apache-flink.147419.n8.nabble.com/file/t1355/infoflow_2021-03-08_19-02-16.png*来自志愿者整理的flink邮件归档

展开
收起
EXCEED 2021-12-01 15:35:32 680 0
1 条回答
写回答
取消 提交回答
  • 1.12 还不支持session window的udaf,在1.13上将提供这部分的支持,具体可以关注JIRA[1]。

    然后,1.12是支持ProcessFunction和KeyedProcessFunction的,具体可以参考代码[2]

    [1] https://issues.apache.org/jira/browse/FLINK-21630

    [2]

    https://github.com/apache/flink/blob/release-1.12/flink-python/pyflink/datastream/functions.py*来自志愿者整理的flink邮件归档

    2021-12-01 15:57:58
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Get rid of traditional ETL, Move to Spark! 立即下载
Show Me The Money! Cost & Resource Tracking for Hadoop & Storm 立即下载
From Zero to Data Flow In Hours with Apache Nifi 立即下载