全网最详细4W字Flink入门笔记(下) 3

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 全网最详细4W字Flink入门笔记(下)

全量聚合函数

全量聚合函数(Full Window Functions)是指在整个窗口中的所有数据都准备好后才进行计算。Flink中的全窗口函数有两种:WindowFunction和ProcessWindowFunction

与增量聚合函数不同,全窗口函数可以访问窗口中的所有数据,因此可以执行更复杂的计算。例如,可以计算窗口中数据的中位数,或者对窗口中的数据进行排序。

WindowFunction接收一个Iterable类型的输入,其中包含了窗口中所有的数据。ProcessWindowFunction则更加强大,它不仅可以访问窗口中的所有数据, 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。WindowFunction作用可以被 ProcessWindowFunction 全覆盖。一般在实际应用,用 ProcessWindowFunction比较多,直接使用 ProcessWindowFunction 就可以了

下面是使用WindowFunction计算窗口内数据总和的代码示例:

public class SumWindowFunction extends WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
    @Override
    public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> value : input) {
            sum += value.f1;
        }
        out.collect(new Tuple2<>(key, sum));
    }
}
DataStream<Tuple2<String, Integer>> input = ...
input.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return value.f0;
        }
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .apply(new SumWindowFunction());

下面是一个使用ProcessWindowFunction统计网站1天UV的代码示例。在这个例子中,我们使用了状态来存储每个窗口中访问过网站的用户ID,以便在窗口结束时计算UV。此外,我们还使用了定时器,在窗口结束时触发计算UV的操作。我们还使用了context对象来获取窗口的开始时间和结束时间,并将它们输出到结果中:

public class UVProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, String>, Tuple3<String, Long, Integer>, String, TimeWindow> {
    private ValueState<Set<String>> userIdState; // 状态,用来存储每个窗口中访问过网站的用户ID
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 初始化状态
        ValueStateDescriptor<Set<String>> stateDescriptor = new ValueStateDescriptor<>("userIdState", new SetTypeInfo<>(Types.STRING));
        userIdState = getRuntimeContext().getState(stateDescriptor);
    }
    @Override
    public void process(String key, Context context, Iterable<Tuple2<String, String>> input, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
        Set<String> userIds = userIdState.value();
        if (userIds == null) {
            userIds = new HashSet<>();
        }
        for (Tuple2<String, String> value : input) {
            userIds.add(value.f0); // 将用户ID添加到状态中
        }
        userIdState.update(userIds);
        context.timerService().registerEventTimeTimer(context.window().getEnd()); // 注册定时器,在窗口结束时触发计算UV的操作
    }
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
        super.onTimer(timestamp, ctx, out);
        Set<String> userIds = userIdState.value();
        if (userIds != null) {
            long windowStart = ctx.window().getStart();
            out.collect(new Tuple3<>(ctx.getCurrentKey(), windowStart, userIds.size())); // 计算UV并输出结果,包括窗口的开始时间和结束时间
            userIdState.clear(); // 清空状态
        }
    }
}
DataStream<Tuple2<String, String>> input = ... // 输入数据流,其中第一个字段为用户ID,第二个字段为网站URL
input.keyBy(new KeySelector<Tuple2<String, String>, String>() {
        @Override
        public String getKey(Tuple2<String, String> value) throws Exception {
            return value.f1; // 按照网站URL分组
        }
    })
    .window(TumblingEventTimeWindows.of(Time.days(1))) // 设置窗口大小为1天
    .process(new UVProcessWindowFunction());

增量聚合函数和全量聚合函数结合使用

全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。所以运行效率较低,很少直接单独使用,往往会和增量聚合函数结合在一起,共同实现窗口的处理计算。

增量聚合的优点:高效,输出更加实时。增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。

全窗口的优点:提供更多的信息,可以认为是更加“通用”的窗口操作。 它只负责收集数据、提供上下文相关信息,把所有的原材料都准备好,至于拿来做什么我们完全可以任意发挥。这就使得窗口计算更加灵活,功能更加强大。

在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。

之前在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者ProcessWindowFunction。

// ReduceFunction 与 WindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function)
// ReduceFunction 与 ProcessWindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function)
// AggregateFunction 与 WindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction)
// AggregateFunction 与 ProcessWindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)

这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了

下面我们举一个具体的实例来说明。在网站的各种统计指标中,一个很重要的统计指标就是热门的链接,想要得到热门的 url,前提是得到每个链接的“热门度”。一般情况下,可以用url 的浏览量(点击量)表示热门度。我们这里统计 10 秒钟的 url 浏览量,每 5 秒钟更新一次;另外为了更加清晰地展示,还应该把窗口的起始结束时间一起输出。我们可以定义滑动窗口,并结合增量聚合函数和全窗口函数来得到统计结果:

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class UrlCountViewExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getConfig().setAutoWatermarkInterval(100);
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                //乱序流的watermark生成
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        stream.print("input");
        //统计每个url的访问量
        stream.keyBy(data -> data.url)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .aggregate(new UrlViewCountAgg(),new UrlViewCountResult())
                .print();
        env.execute();
    }
    //增量聚合,来一条数据 + 1
    public static class UrlViewCountAgg implements AggregateFunction<Event,Long,Long>{
        @Override
        public Long createAccumulator() {
            return 0L;
        }
        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }
        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }
        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }
    //包装窗口信息,输出UrlViewCount
    public static class UrlViewCountResult extends ProcessWindowFunction<Long,UrlViewCount,String, TimeWindow>{
        @Override
        public void process(String s, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            Long count = elements.iterator().next();
            out.collect(new UrlViewCount(s,count,start,end));
        }
    }
}

为了方便处理,单独定义了一个POJO类,来表示输出结果的数据类型

public class UrlViewCount {
    public String url;
    public Long count;
    public Long windowStart;
    public Long windowEnd;
    public UrlViewCount() {
    }
    public UrlViewCount(String url, Long count, Long windowStart, Long windowEnd) {
        this.url = url;
        this.count = count;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }
    @Override
    public String toString() {
        return "UrlViewCount{" +
                "url='" + url + '\'' +
                ", count=" + count +
                ", windowStart=" + new Timestamp(windowStart) +
                ", windowEnd=" + new Timestamp(windowEnd) +
                '}';
    }
}

代码中用一个 AggregateFunction 来实现增量聚合,每来一个数据就计数加一,得到的结果交给 ProcessWindowFunction,结合窗口信息包装成我们想要的 UrlViewCount,最终输出统计结果。

窗口处理的主体还是增量聚合,而引入全窗口函数又可以获取到更多的信息包装输出,这样的结合兼具了两种窗口函数的优势,在保证处理性能和实时性的同时支持了更加丰富的应用场景。

Window重叠优化

窗口重叠是指在使用滑动窗口时,多个窗口之间存在重叠部分。这意味着同一批数据可能会被多个窗口同时处理。

例如,假设我们有一个数据流,它包含了0到9的整数。我们定义了一个大小为5的滑动窗口,滑动距离为2。那么,我们将会得到以下三个窗口:

  • 窗口1:包含0, 1, 2, 3, 4
  • 窗口2:包含2, 3, 4, 5, 6
  • 窗口3:包含4, 5, 6, 7, 8

在这个例子中,窗口1和窗口2之间存在重叠部分,即2, 3, 4。同样,窗口2和窗口3之间也存在重叠部分,即4, 5, 6。

enableOptimizeWindowOverlap方法是用来启用Flink的窗口重叠优化功能的。它可以减少计算重叠窗口时的计算量。

在我之前给出的代码示例中,我没有使用enableOptimizeWindowOverlap方法来启用窗口重叠优化功能。这意味着Flink不会尝试优化计算重叠窗口时的计算量。

如果你想使用窗口重叠优化功能,你可以在你的代码中添加以下行:

env.getConfig().enableOptimizeWindowOverlap();

这将启用窗口重叠优化功能,Flink将尝试优化计算重叠窗口时的计算量。

触发器(Trigger)

触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。

基于 WindowedStream 调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。

stream.keyBy(...)
    .window(...)
    .trigger(new MyTrigger())

Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间窗口,默认的触发器都是EventTimeTrigger;类似还有 ProcessingTimeTrigger 和 CountTrigger。所以一般情况下是不需要自定义触发器的,这块了解一下即可。

移除器(Evictor)

在 Apache Flink 中,移除器(Evictor)是用于在滚动窗口或会话窗口中控制数据保留和清理的组件。它可以根据特定的策略从窗口中删除一些数据,以确保窗口中保留的数据量不超过指定的限制。移除器通常与窗口分配器一起使用,窗口分配器负责确定数据属于哪个窗口,而移除器则负责清理窗口中的数据。

以下是一个使用 Flink 移除器的代码示例,演示如何在滚动窗口中使用基于计数的移除器。

javaCopy codeimport org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class FlinkEvictorExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个包含整数和时间戳的流
        DataStream<Tuple2<Integer, Long>> dataStream = env.fromElements(
                Tuple2.of(1, System.currentTimeMillis()),
                Tuple2.of(2, System.currentTimeMillis() + 1000),
                Tuple2.of(3, System.currentTimeMillis() + 2000),
                Tuple2.of(4, System.currentTimeMillis() + 3000),
                Tuple2.of(5, System.currentTimeMillis() + 4000),
                Tuple2.of(6, System.currentTimeMillis() + 5000)
        );
        // 在滚动窗口中使用基于计数的移除器,保留最近3个元素
        dataStream
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .trigger(CountTrigger.of(3))
                .evictor(CountEvictor.of(3))
                .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())
                .print();
        env.execute("Flink Evictor Example");
    }
    // 自定义聚合函数
    private static class MyAggregateFunction implements AggregateFunction<Tuple2<Integer, Long>, Integer, Integer> {
        @Override
        public Integer createAccumulator() {
            return 0;
        }
        @Override
        public Integer add(Tuple2<Integer, Long> value, Integer accumulator) {
            return accumulator + 1;
        }
        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }
        @Override
        public Integer merge(Integer a, Integer b) {
            return a + b;
        }
    }
    // 自定义处理窗口函数
    private static class MyProcessWindowFunction extends ProcessWindowFunction<Integer, String, Integer, TimeWindow> {
        private transient ListState<Integer> countState;
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            ListStateDescriptor<Integer> descriptor = new ListStateDescriptor<>("countState", Integer.class);
            countState = getRuntimeContext().getListState(descriptor);
        }
        @Override
        public void process(Integer key, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
            int count = elements.iterator().next();
            countState.add(count);
            long windowStart = context.window().getStart();
            long windowEnd = context.window().getEnd();
            String result = "Window: " + windowStart + " to " + windowEnd + ", Count: " + countState.get().iterator().next();
            out.collect(result);
        }
    }
}

在上述示例中,创建了一个包含整数和时间戳的数据流,并使用基于计数的移除器将滚动窗口的大小限制为最近的3个元素。在聚合函数中,我们简单地将元素的数量累加起来,并在处理窗口函数中收集结果。最后,我们打印窗口的开始时间、结束时间和元素数量。

Flink Time时间语义

Flink定义了三类时间

  • 事件时间(Event Time)数据在数据源产生的时间,一般由事件中的时间戳描述,比如用户日志中的TimeStamp。
  • 处理时间(Process Time)数据进入Flink被处理的系统时间(Operator处理数据的系统时间)。
  • 摄取时间(Ingestion Time)数据进入Flink的时间,记录被Source节点观察到的系统时间。

Flink流式计算的时候需要显示定义时间语义,根据不同的时间语义来处理数据,比如指定的时间语义是事件时间,那么我们就要切换到事件时间的世界观中,窗口的起始与终止时间都是以事件时间为依据

在Flink中默认使用的是Process Time,如果要使用其他的时间语义,在执行环境中可以设置

//设置时间语义为Ingestion Time
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
//设置时间语义为Event Time 我们还需要指定一下数据中哪个字段是事件时间(下文会讲)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  • 基于事件时间的Window操作
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object EventTimeWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = env.socketTextStream("node01", 8888).assignAscendingTimestamps(data => {
      val splits = data.split(" ")
      splits(0).toLong
    })
    stream
      .flatMap(x=>x.split(" ").tail)
      .map((_, 1))
      .keyBy(_._1)
//      .timeWindow(Time.seconds(10))
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .reduce((v1: (String, Int), v2: (String, Int)) => {
        (v1._1, v1._2 + v2._2)
      })
      .print()
    env.execute()
  }
}

Watermark(水印)

Watermark本质就是时间戳,说白了Watermark就是来处理迟到数据的

在使用Flink处理数据的时候,数据通常都是按照事件产生的时间(事件时间)的顺序进入到Flink,但是在遇到特殊情况下,比如遇到网络延迟或者使用Kafka(多分区) 很难保证数据都是按照事件时间的顺序进入Flink,很有可能是乱序进入。

如果使用的是事件时间这个语义,数据一旦是乱序进入,那么在使用Window处理数据的时候,就会出现延迟数据不会被计算的问题

  • 举例: Window窗口长度10s,滚动窗口
    001 zs 2020-04-25 10:00:01
    001 zs 2020-04-25 10:00:02
    001 zs 2020-04-25 10:00:03
    001 zs 2020-04-25 10:00:11  窗口触发执行
    001 zs 2020-04-25 10:00:05  延迟数据,不会被上一个窗口所计算导致计算结果不正确

Watermark+Window可以很好的解决延迟数据的问题。

Flink窗口计算的过程中,如果数据全部到达就会到窗口中的数据做处理,如果过有延迟数据,那么窗口需要等待全部的数据到来之后,再触发窗口执行,需要等待多久?不可能无限期等待,我们用户可以自己来设置延迟时间,这样就可以尽可能保证延迟数据被处理。

根据用户指定的延迟时间生成水印(Watermak = 最大事件时间-指定延迟时间),当Watermak 大于等于窗口的停止时间,这个窗口就会被触发执行。

  • 举例:Window窗口长度10s(01~10),滚动窗口,指定延迟时间3s
    001 ls 2020-04-25 10:00:01 wm:2020-04-25 09:59:58
    001 ls 2020-04-25 10:00:02 wm:2020-04-25 09:59:59
    001 ls 2020-04-25 10:00:03 wm:2020-04-25 10:00:00
    001 ls 2020-04-25 10:00:09 wm:2020-04-25 10:00:06
    001 ls 2020-04-25 10:00:12 wm:2020-04-25 10:00:09
    001 ls 2020-04-25 10:00:08 wm:2020-04-25 10:00:05    延迟数据
    001 ls 2020-04-25 10:00:13 wm:2020-04-25 10:00:10

如果没有Watermark在倒数第三条数据来的时候,就会触发执行,那么倒数第二条的延迟数据就不会被计算,那么有了水印可以处理延迟3s内的数据

注意:如果数据不会乱序进入Flink,没必要使用Watermark

DataStream API提供了自定义水印生成器和内置水印生成器。

生成水印策略

  • 周期性水印(Periodic Watermark)根据事件或者处理时间周期性的触发水印生成器(Assigner),默认100ms,每隔100毫秒自动向流里注入一个Watermark
    周期性水印API 1:
val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(100)
    val stream = env.socketTextStream("node01", 8888).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(3)) {
      override def extractTimestamp(element: String): Long = {
        element.split(" ")(0).toLong
      }
    })
  • 周期性水印API 2:
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object EventTimeDelayWindow {
  class MyTimestampAndWatermarks(delayTime:Long) extends AssignerWithPeriodicWatermarks[String] {
    var maxCurrentWatermark: Long = _
    //水印=最大事件时间-延迟时间   后被调用    水印是递增,小于上一个水印不会被发射出去
    override def getCurrentWatermark: Watermark = {
      //产生水印
      new Watermark(maxCurrentWatermark - delayTime)
    }
    //获取当前的时间戳  先被调用
    override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
      val currentTimeStamp = element.split(" ")(0).toLong
      maxCurrentWatermark = math.max(currentTimeStamp,maxCurrentWatermark)
      currentTimeStamp
    }
  }
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(100)
    val stream = env.socketTextStream("node01", 8888).assignTimestampsAndWatermarks(new MyTimestampAndWatermarks(3000L))
    stream
      .flatMap(x => x.split(" ").tail)
      .map((_, 1))
      .keyBy(_._1)
      //      .timeWindow(Time.seconds(10))
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .process(new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
        override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
          val start = context.window.getStart
          val end = context.window.getEnd
          var count = 0
          for (elem <- elements) {
            count += elem._2
          }
          println("start:" + start + " end:" + end + " word:" + key + " count:" + count)
        }
      })
      .print()
    env.execute()
  }
}
  • 间歇性水印生成器
    间歇性水印(Punctuated Watermark)在观察到事件后,会依据用户指定的条件来决定是否发射水印。
    比如,在车流量的数据中,001卡口通信经常异常,传回到服务器的数据会有延迟问题,其他的卡口都是正常的,那么这个卡口的数据需要打上水印。
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
object PunctuatedWatermarkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //卡口号、时间戳
    env.socketTextStream("node01", 8888)
      .map(data => {
        val splits = data.split(" ")
        (splits(0), splits(1).toLong)
      })
      .assignTimestampsAndWatermarks(new myWatermark(3000))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .reduce((v1: (String, Long), v2: (String, Long)) => {
        (v1._1 + "," + v2._1, v1._2 + v2._2)
      }).print()
    env.execute()
  }
  class myWatermark(delay: Long) extends AssignerWithPunctuatedWatermarks[(String, Long)] {
    var maxTimeStamp:Long = _
    override def checkAndGetNextWatermark(elem: (String, Long), extractedTimestamp: Long): Watermark = {
      maxTimeStamp = extractedTimestamp.max(maxTimeStamp)
      if ("001".equals(elem._1)) {
        new Watermark(maxTimeStamp - delay)
      } else {
        new Watermark(maxTimeStamp)
      }
    }
    override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
      element._2
    }
  }
}
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
8月前
|
Java 流计算
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
276 3
|
Java Linux API
flink入门-流处理
flink入门-流处理
167 0
|
存储 Java Linux
10分钟入门Flink--安装
本文介绍Flink的安装步骤,主要是Flink的独立部署模式,它不依赖其他平台。文中内容分为4块:前置准备、Flink本地模式搭建、Flink Standalone搭建、Flink Standalong HA搭建。
10分钟入门Flink--安装
|
分布式计算 Java API
Flink教程(04)- Flink入门案例
Flink教程(04)- Flink入门案例
183 0
|
8月前
|
存储 缓存 算法
[尚硅谷flink] 检查点笔记
[尚硅谷flink] 检查点笔记
210 3
|
8月前
|
分布式计算 监控 API
flink 入门编程day02
flink 入门编程day02
|
8月前
|
存储 传感器 消息中间件
[尚硅谷 flink] 状态管理 笔记
[尚硅谷 flink] 状态管理 笔记
|
8月前
|
SQL 关系型数据库 Apache
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
1439 3
|
存储 缓存 分布式计算
Flink教程(02)- Flink入门(下)
Flink教程(02)- Flink入门(下)
126 0
|
SQL 消息中间件 API
Flink教程(02)- Flink入门(上)
Flink教程(02)- Flink入门(上)
223 0