开发者社区> 问答> 正文

Session Window使用event time延迟特别高是怎么回事?

> 大家好, > > 我遇到一个问题一直想不明白原因,想请教大家 > > 我的代码keyby userid 然后使用session window 实现一个按userid聚合 并执行了一个 topN方法。 > > 代码大致如下 > // Topn聚合 > DataStream itemList = resultDataStream >                 .assignTimestampsAndWatermarks( >                         new > > BoundedOutOfOrdernessTimestampExtractor<PredictResult>(Time.milliseconds(100)) > { >                             @Override >                              public long extractTimestamp(PredictResult > predictResult) { >                              return predictResult.getDate_timestamp(); >                            } >                         } >                 ) >                 .keyBy("userId") > > .window(EventTimeSessionWindows.withGap(Time.milliseconds(100))) >                 .process(new TopNService(11)); >         itemList.print("IRS_RESULT: "); > > > 作业的延迟特别的高,高达30秒,完全无法接受。 起初我以为是自己的 topN方法有问题,但我采用 > ProcessTimeSessionWindow后,延迟降低为一秒以内。 > 使用processtime 的弊端是gap是不好估计,高了影响作业延迟,低了 无法完成预期的聚合,导致报错(且运行不稳定)。 > 我不太理解为什么会出现这样的情况~还烦请大家给与一点解决思路~~ > > > 谢谢 > > // top n方法 > >     public static class TopNService extends > ProcessWindowFunction<PredictResult, Object, Tuple, TimeWindow> { > >         private final int topSize; > >         public TopNService(int topSize) { >             this.topSize = topSize; >         } >         @Override >         public void process(Tuple tuple, Context context, > Iterable<PredictResult> iterable, Collector<Object> collector) throws > Exception { >             List<PredictResult> allItems = new ArrayList<>(); >             for (PredictResult predictResult:iterable){ >                 allItems.add(predictResult); >             } >             allItems.sort(new Comparator<PredictResult>() { >                 @Override >                 public int compare(PredictResult o1, PredictResult o2) { >                     return o2.probability.compareTo(o1.probability); >                 } >             }); >             int userId = allItems.get(0).userId ; >             String logonType=allItems.get(0).getLogonType(); >             StringBuilder result = new StringBuilder(); >             for (int i=0;i<topSize;i++) { >                 PredictResult currentItem = allItems.get(i); >                 result.append(currentItem.serviceId).append(","); >             } >             LocalDate localDate = LocalDate.now(); >             LocalTime localTime = LocalTime.now(); >             //NXZW_ZNTJ_TOPIC_IRS_RESULT  的数据格式 start >             JSONObject resultJson = new JSONObject(); >             resultJson.put("user_id", userId); >             resultJson.put("logon_type", logonType); >             resultJson.put("date", localDate + " " + localTime); >             JSONArray jsonArray = new JSONArray(); >             jsonArray.add(resultJson); >             resultJson.put("service_id", result.toString()); >             //NXZW_ZNTJ_TOPIC_IRS_RESULT  的数据格式 end >             collector.collect(jsonArray.toString()); >         } >     } >

*来自志愿者整理的flink邮件归档

展开
收起
游客nnqbtnagn7h6s 2021-12-06 20:50:14 1147 0
1 条回答
写回答
取消 提交回答
  • 个人理解可以从以下几个角度分析下 1.你说的30s和1s的延迟分别都是怎样对比出来的,参照物是否一致 2.如果上述参照物都是事件数据的原始时间戳的话,可以按照Congxian说的方法查看下 3.如果不是原始时间戳,并且MaxOutOfOrderness = 100ms的情况下,可以先查看是否是根源问题,查看是否是Flink消费作业时,上游事件时间戳是否已经有比较大的延迟

    *来自志愿者整理的flink邮件归档

    2021-12-07 20:16:10
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Window_Time_Watermark 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载