开发者社区> 问答> 正文

Apache Flink:自定义触发器意外行为

Apache Flink:自定义触发器意外行为
我有一个DataStream,它由一个事件组成,该事件具有一个表示一批生成元素的属性。该属性,我们称之为'batchNumber',在我从同一生产批次中摄取的每个事件中都是恒定的。我每批收到多个事件。

我想在'batchNumber'更改时分析批处理中的机器性能。我的方法是使用全局流并使用'batchNumber'作为密钥对其进行分区。我希望这会将全局流分区为窗口,其中每个事件都有'batchNumber'。然后我定义一个触发器,当'batchNumber'发生变化时应该触发。然后我可以分析ProcessWindowFunction中的聚合数据。

我的问题是:

当prodnr改变时,触发器并不总是触发
即使它发射,也只有一个元素被聚合。我期待接近200。
这是我正在使用的代码。

public class batchnrTrigger extends Trigger<ImaginePaperData, GlobalWindow> {

private static final long serialVersionUID = 1L;

public batchnrTrigger() {}

private final ValueStateDescriptor<Integer> prevbatchnr = new ValueStateDescriptor<>("batchnr", Integer.class);

@Override
public TriggerResult onElement(ImaginePaperData element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

    ValueState<Integer> batchnrState = ctx.getPartitionedState(prevbatchnr);

    if (batchnrState == null || batchnrState.value() == null || !(element.batchnr == batchnrState.value())) {

        System.out.println("batchnr BEFORE: " + batchnrState.value() + "   NEW batchnr: " + element.batchnr + " ==> should fire and process elements from window!");
        batchnrState.update(element.batchnr);
        return TriggerResult.FIRE;

    }

    System.out.println("batchnr BEFORE: " + batchnrState.value() + "   NEW batchnr: " + element.batchnr + " ==> should not fire and continue ingesting elements!");
    batchnrState.update(element.batchnr);
    return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
}

@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {

}

}
这就是我称之为触发器的方式:

DataStream imaginePaperDataStream = nifiStreamSource

    .map(new ImaginePaperDataConverter())
    .keyBy((ImaginePaperData event) -> event.lunum)
    .window(GlobalWindows.create())
    .trigger(new LunumTrigger())
    .process(new ImaginePaperWindowReportFunction());

展开
收起
flink小助手 2018-12-14 14:04:29 4677 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    您确定要通过event.lunum键入流吗?如果你期望每个不同的lunum值大约有200个事件,这是有道理的。但是如果每个lunum值只有一个事件,那就可以解释你所看到的行为。

    此外,您确定您的活动正在按顺序处理吗?如果批处理在并行处理之间的竞争条件下在处理管道中的某处交错,那么这也可能有助于解释您所看到的内容。

    此外,您应该在触发器的clear方法中清除状态。并且您需要实现一个Evictor,以便在处理后从窗口中删除元素。

    窗口API的这一部分非常复杂。我认为这个特定的应用程序将更直接地实现为RichFlatMap,它收集ListState中的项目,直到批号更改(您将保留在ValueState中)。

    2019-07-17 23:20:55
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载

相关镜像