Apache Flink:自定义触发器意外行为
我有一个DataStream,它由一个事件组成,该事件具有一个表示一批生成元素的属性。该属性,我们称之为'batchNumber',在我从同一生产批次中摄取的每个事件中都是恒定的。我每批收到多个事件。
我想在'batchNumber'更改时分析批处理中的机器性能。我的方法是使用全局流并使用'batchNumber'作为密钥对其进行分区。我希望这会将全局流分区为窗口,其中每个事件都有'batchNumber'。然后我定义一个触发器,当'batchNumber'发生变化时应该触发。然后我可以分析ProcessWindowFunction中的聚合数据。
我的问题是:
当prodnr改变时,触发器并不总是触发
即使它发射,也只有一个元素被聚合。我期待接近200。
这是我正在使用的代码。
public class batchnrTrigger extends Trigger<ImaginePaperData, GlobalWindow> {
private static final long serialVersionUID = 1L;
public batchnrTrigger() {}
private final ValueStateDescriptor<Integer> prevbatchnr = new ValueStateDescriptor<>("batchnr", Integer.class);
@Override
public TriggerResult onElement(ImaginePaperData element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
ValueState<Integer> batchnrState = ctx.getPartitionedState(prevbatchnr);
if (batchnrState == null || batchnrState.value() == null || !(element.batchnr == batchnrState.value())) {
System.out.println("batchnr BEFORE: " + batchnrState.value() + " NEW batchnr: " + element.batchnr + " ==> should fire and process elements from window!");
batchnrState.update(element.batchnr);
return TriggerResult.FIRE;
}
System.out.println("batchnr BEFORE: " + batchnrState.value() + " NEW batchnr: " + element.batchnr + " ==> should not fire and continue ingesting elements!");
batchnrState.update(element.batchnr);
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
}
}
这就是我称之为触发器的方式:
DataStream imaginePaperDataStream = nifiStreamSource
.map(new ImaginePaperDataConverter())
.keyBy((ImaginePaperData event) -> event.lunum)
.window(GlobalWindows.create())
.trigger(new LunumTrigger())
.process(new ImaginePaperWindowReportFunction());
您确定要通过event.lunum键入流吗?如果你期望每个不同的lunum值大约有200个事件,这是有道理的。但是如果每个lunum值只有一个事件,那就可以解释你所看到的行为。
此外,您确定您的活动正在按顺序处理吗?如果批处理在并行处理之间的竞争条件下在处理管道中的某处交错,那么这也可能有助于解释您所看到的内容。
此外,您应该在触发器的clear方法中清除状态。并且您需要实现一个Evictor,以便在处理后从窗口中删除元素。
窗口API的这一部分非常复杂。我认为这个特定的应用程序将更直接地实现为RichFlatMap,它收集ListState中的项目,直到批号更改(您将保留在ValueState中)。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。