全量聚合函数
全量聚合函数(Full Window Functions)是指在整个窗口中的所有数据都准备好后才进行计算。Flink中的全窗口函数有两种:WindowFunction和ProcessWindowFunction。
与增量聚合函数不同,全窗口函数可以访问窗口中的所有数据,因此可以执行更复杂的计算。例如,可以计算窗口中数据的中位数,或者对窗口中的数据进行排序。
WindowFunction接收一个Iterable类型的输入,其中包含了窗口中所有的数据。ProcessWindowFunction则更加强大,它不仅可以访问窗口中的所有数据, 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。WindowFunction作用可以被 ProcessWindowFunction 全覆盖。一般在实际应用,用 ProcessWindowFunction比较多,直接使用 ProcessWindowFunction 就可以了。
下面是使用WindowFunction计算窗口内数据总和的代码示例:
public class SumWindowFunction extends WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> { @Override public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception { int sum = 0; for (Tuple2<String, Integer> value : input) { sum += value.f1; } out.collect(new Tuple2<>(key, sum)); } } DataStream<Tuple2<String, Integer>> input = ... input.keyBy(new KeySelector<Tuple2<String, Integer>, String>() { @Override public String getKey(Tuple2<String, Integer> value) throws Exception { return value.f0; } }) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .apply(new SumWindowFunction());
下面是一个使用ProcessWindowFunction统计网站1天UV的代码示例。在这个例子中,我们使用了状态来存储每个窗口中访问过网站的用户ID,以便在窗口结束时计算UV。此外,我们还使用了定时器,在窗口结束时触发计算UV的操作。我们还使用了context对象来获取窗口的开始时间和结束时间,并将它们输出到结果中:
public class UVProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, String>, Tuple3<String, Long, Integer>, String, TimeWindow> { private ValueState<Set<String>> userIdState; // 状态,用来存储每个窗口中访问过网站的用户ID @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // 初始化状态 ValueStateDescriptor<Set<String>> stateDescriptor = new ValueStateDescriptor<>("userIdState", new SetTypeInfo<>(Types.STRING)); userIdState = getRuntimeContext().getState(stateDescriptor); } @Override public void process(String key, Context context, Iterable<Tuple2<String, String>> input, Collector<Tuple3<String, Long, Integer>> out) throws Exception { Set<String> userIds = userIdState.value(); if (userIds == null) { userIds = new HashSet<>(); } for (Tuple2<String, String> value : input) { userIds.add(value.f0); // 将用户ID添加到状态中 } userIdState.update(userIds); context.timerService().registerEventTimeTimer(context.window().getEnd()); // 注册定时器,在窗口结束时触发计算UV的操作 } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Integer>> out) throws Exception { super.onTimer(timestamp, ctx, out); Set<String> userIds = userIdState.value(); if (userIds != null) { long windowStart = ctx.window().getStart(); out.collect(new Tuple3<>(ctx.getCurrentKey(), windowStart, userIds.size())); // 计算UV并输出结果,包括窗口的开始时间和结束时间 userIdState.clear(); // 清空状态 } } } DataStream<Tuple2<String, String>> input = ... // 输入数据流,其中第一个字段为用户ID,第二个字段为网站URL input.keyBy(new KeySelector<Tuple2<String, String>, String>() { @Override public String getKey(Tuple2<String, String> value) throws Exception { return value.f1; // 按照网站URL分组 } }) .window(TumblingEventTimeWindows.of(Time.days(1))) // 设置窗口大小为1天 .process(new UVProcessWindowFunction());
增量聚合函数和全量聚合函数结合使用
全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。所以运行效率较低,很少直接单独使用,往往会和增量聚合函数结合在一起,共同实现窗口的处理计算。
增量聚合的优点:高效,输出更加实时。增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。
全窗口的优点:提供更多的信息,可以认为是更加“通用”的窗口操作。 它只负责收集数据、提供上下文相关信息,把所有的原材料都准备好,至于拿来做什么我们完全可以任意发挥。这就使得窗口计算更加灵活,功能更加强大。
在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。
之前在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者ProcessWindowFunction。
// ReduceFunction 与 WindowFunction 结合 public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) // ReduceFunction 与 ProcessWindowFunction 结合 public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function) // AggregateFunction 与 WindowFunction 结合 public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction) // AggregateFunction 与 ProcessWindowFunction 结合 public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)
这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。
下面我们举一个具体的实例来说明。在网站的各种统计指标中,一个很重要的统计指标就是热门的链接,想要得到热门的 url,前提是得到每个链接的“热门度”。一般情况下,可以用url 的浏览量(点击量)表示热门度。我们这里统计 10 秒钟的 url 浏览量,每 5 秒钟更新一次;另外为了更加清晰地展示,还应该把窗口的起始结束时间一起输出。我们可以定义滑动窗口,并结合增量聚合函数和全窗口函数来得到统计结果:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; public class UrlCountViewExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.getConfig().setAutoWatermarkInterval(100); SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource()) //乱序流的watermark生成 .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0)) .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } })); stream.print("input"); //统计每个url的访问量 stream.keyBy(data -> data.url) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .aggregate(new UrlViewCountAgg(),new UrlViewCountResult()) .print(); env.execute(); } //增量聚合,来一条数据 + 1 public static class UrlViewCountAgg implements AggregateFunction<Event,Long,Long>{ @Override public Long createAccumulator() { return 0L; } @Override public Long add(Event value, Long accumulator) { return accumulator + 1; } @Override public Long getResult(Long accumulator) { return accumulator; } @Override public Long merge(Long a, Long b) { return null; } } //包装窗口信息,输出UrlViewCount public static class UrlViewCountResult extends ProcessWindowFunction<Long,UrlViewCount,String, TimeWindow>{ @Override public void process(String s, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception { Long start = context.window().getStart(); Long end = context.window().getEnd(); Long count = elements.iterator().next(); out.collect(new UrlViewCount(s,count,start,end)); } } }
为了方便处理,单独定义了一个POJO类,来表示输出结果的数据类型
public class UrlViewCount { public String url; public Long count; public Long windowStart; public Long windowEnd; public UrlViewCount() { } public UrlViewCount(String url, Long count, Long windowStart, Long windowEnd) { this.url = url; this.count = count; this.windowStart = windowStart; this.windowEnd = windowEnd; } @Override public String toString() { return "UrlViewCount{" + "url='" + url + '\'' + ", count=" + count + ", windowStart=" + new Timestamp(windowStart) + ", windowEnd=" + new Timestamp(windowEnd) + '}'; } }
代码中用一个 AggregateFunction 来实现增量聚合,每来一个数据就计数加一,得到的结果交给 ProcessWindowFunction,结合窗口信息包装成我们想要的 UrlViewCount,最终输出统计结果。
窗口处理的主体还是增量聚合,而引入全窗口函数又可以获取到更多的信息包装输出,这样的结合兼具了两种窗口函数的优势,在保证处理性能和实时性的同时支持了更加丰富的应用场景。
Window重叠优化
窗口重叠是指在使用滑动窗口时,多个窗口之间存在重叠部分。这意味着同一批数据可能会被多个窗口同时处理。
例如,假设我们有一个数据流,它包含了0到9的整数。我们定义了一个大小为5的滑动窗口,滑动距离为2。那么,我们将会得到以下三个窗口:
- 窗口1:包含0, 1, 2, 3, 4
- 窗口2:包含2, 3, 4, 5, 6
- 窗口3:包含4, 5, 6, 7, 8
在这个例子中,窗口1和窗口2之间存在重叠部分,即2, 3, 4。同样,窗口2和窗口3之间也存在重叠部分,即4, 5, 6。
enableOptimizeWindowOverlap
方法是用来启用Flink的窗口重叠优化功能的。它可以减少计算重叠窗口时的计算量。
在我之前给出的代码示例中,我没有使用enableOptimizeWindowOverlap
方法来启用窗口重叠优化功能。这意味着Flink不会尝试优化计算重叠窗口时的计算量。
如果你想使用窗口重叠优化功能,你可以在你的代码中添加以下行:
env.getConfig().enableOptimizeWindowOverlap();
这将启用窗口重叠优化功能,Flink将尝试优化计算重叠窗口时的计算量。
触发器(Trigger)
触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。
基于 WindowedStream 调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。
stream.keyBy(...) .window(...) .trigger(new MyTrigger())
Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间窗口,默认的触发器都是EventTimeTrigger;类似还有 ProcessingTimeTrigger 和 CountTrigger。所以一般情况下是不需要自定义触发器的,这块了解一下即可。
移除器(Evictor)
在 Apache Flink 中,移除器(Evictor)是用于在滚动窗口或会话窗口中控制数据保留和清理的组件。它可以根据特定的策略从窗口中删除一些数据,以确保窗口中保留的数据量不超过指定的限制。移除器通常与窗口分配器一起使用,窗口分配器负责确定数据属于哪个窗口,而移除器则负责清理窗口中的数据。
以下是一个使用 Flink 移除器的代码示例,演示如何在滚动窗口中使用基于计数的移除器。
javaCopy codeimport org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; public class FlinkEvictorExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 创建一个包含整数和时间戳的流 DataStream<Tuple2<Integer, Long>> dataStream = env.fromElements( Tuple2.of(1, System.currentTimeMillis()), Tuple2.of(2, System.currentTimeMillis() + 1000), Tuple2.of(3, System.currentTimeMillis() + 2000), Tuple2.of(4, System.currentTimeMillis() + 3000), Tuple2.of(5, System.currentTimeMillis() + 4000), Tuple2.of(6, System.currentTimeMillis() + 5000) ); // 在滚动窗口中使用基于计数的移除器,保留最近3个元素 dataStream .keyBy(value -> value.f0) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .trigger(CountTrigger.of(3)) .evictor(CountEvictor.of(3)) .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction()) .print(); env.execute("Flink Evictor Example"); } // 自定义聚合函数 private static class MyAggregateFunction implements AggregateFunction<Tuple2<Integer, Long>, Integer, Integer> { @Override public Integer createAccumulator() { return 0; } @Override public Integer add(Tuple2<Integer, Long> value, Integer accumulator) { return accumulator + 1; } @Override public Integer getResult(Integer accumulator) { return accumulator; } @Override public Integer merge(Integer a, Integer b) { return a + b; } } // 自定义处理窗口函数 private static class MyProcessWindowFunction extends ProcessWindowFunction<Integer, String, Integer, TimeWindow> { private transient ListState<Integer> countState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); ListStateDescriptor<Integer> descriptor = new ListStateDescriptor<>("countState", Integer.class); countState = getRuntimeContext().getListState(descriptor); } @Override public void process(Integer key, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception { int count = elements.iterator().next(); countState.add(count); long windowStart = context.window().getStart(); long windowEnd = context.window().getEnd(); String result = "Window: " + windowStart + " to " + windowEnd + ", Count: " + countState.get().iterator().next(); out.collect(result); } } }
在上述示例中,创建了一个包含整数和时间戳的数据流,并使用基于计数的移除器将滚动窗口的大小限制为最近的3个元素。在聚合函数中,我们简单地将元素的数量累加起来,并在处理窗口函数中收集结果。最后,我们打印窗口的开始时间、结束时间和元素数量。
Flink Time时间语义
Flink定义了三类时间
- 事件时间(Event Time)数据在数据源产生的时间,一般由事件中的时间戳描述,比如用户日志中的TimeStamp。
- 处理时间(Process Time)数据进入Flink被处理的系统时间(Operator处理数据的系统时间)。
- 摄取时间(Ingestion Time)数据进入Flink的时间,记录被Source节点观察到的系统时间。
Flink流式计算的时候需要显示定义时间语义,根据不同的时间语义来处理数据,比如指定的时间语义是事件时间,那么我们就要切换到事件时间的世界观中,窗口的起始与终止时间都是以事件时间为依据
在Flink中默认使用的是Process Time,如果要使用其他的时间语义,在执行环境中可以设置
//设置时间语义为Ingestion Time env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) //设置时间语义为Event Time 我们还需要指定一下数据中哪个字段是事件时间(下文会讲) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- 基于事件时间的Window操作
import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time object EventTimeWindow { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val stream = env.socketTextStream("node01", 8888).assignAscendingTimestamps(data => { val splits = data.split(" ") splits(0).toLong }) stream .flatMap(x=>x.split(" ").tail) .map((_, 1)) .keyBy(_._1) // .timeWindow(Time.seconds(10)) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .reduce((v1: (String, Int), v2: (String, Int)) => { (v1._1, v1._2 + v2._2) }) .print() env.execute() } }
Watermark(水印)
Watermark本质就是时间戳,说白了Watermark就是来处理迟到数据的。
在使用Flink处理数据的时候,数据通常都是按照事件产生的时间(事件时间)的顺序进入到Flink,但是在遇到特殊情况下,比如遇到网络延迟或者使用Kafka(多分区) 很难保证数据都是按照事件时间的顺序进入Flink,很有可能是乱序进入。
如果使用的是事件时间这个语义,数据一旦是乱序进入,那么在使用Window处理数据的时候,就会出现延迟数据不会被计算的问题
- 举例: Window窗口长度10s,滚动窗口
001 zs 2020-04-25 10:00:01
001 zs 2020-04-25 10:00:02
001 zs 2020-04-25 10:00:03
001 zs 2020-04-25 10:00:11 窗口触发执行
001 zs 2020-04-25 10:00:05 延迟数据,不会被上一个窗口所计算导致计算结果不正确
Watermark+Window可以很好的解决延迟数据的问题。
Flink窗口计算的过程中,如果数据全部到达就会到窗口中的数据做处理,如果过有延迟数据,那么窗口需要等待全部的数据到来之后,再触发窗口执行,需要等待多久?不可能无限期等待,我们用户可以自己来设置延迟时间,这样就可以尽可能保证延迟数据被处理。
根据用户指定的延迟时间生成水印(Watermak = 最大事件时间-指定延迟时间),当Watermak 大于等于窗口的停止时间,这个窗口就会被触发执行。
- 举例:Window窗口长度10s(01~10),滚动窗口,指定延迟时间3s
001 ls 2020-04-25 10:00:01 wm:2020-04-25 09:59:58
001 ls 2020-04-25 10:00:02 wm:2020-04-25 09:59:59
001 ls 2020-04-25 10:00:03 wm:2020-04-25 10:00:00
001 ls 2020-04-25 10:00:09 wm:2020-04-25 10:00:06
001 ls 2020-04-25 10:00:12 wm:2020-04-25 10:00:09
001 ls 2020-04-25 10:00:08 wm:2020-04-25 10:00:05 延迟数据
001 ls 2020-04-25 10:00:13 wm:2020-04-25 10:00:10
如果没有Watermark在倒数第三条数据来的时候,就会触发执行,那么倒数第二条的延迟数据就不会被计算,那么有了水印可以处理延迟3s内的数据。
注意:如果数据不会乱序进入Flink,没必要使用Watermark
DataStream API提供了自定义水印生成器和内置水印生成器。
生成水印策略
- 周期性水印(Periodic Watermark)根据事件或者处理时间周期性的触发水印生成器(Assigner),默认100ms,每隔100毫秒自动向流里注入一个Watermark
周期性水印API 1:
val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.getConfig.setAutoWatermarkInterval(100) val stream = env.socketTextStream("node01", 8888).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(3)) { override def extractTimestamp(element: String): Long = { element.split(" ")(0).toLong } })
- 周期性水印API 2:
import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector object EventTimeDelayWindow { class MyTimestampAndWatermarks(delayTime:Long) extends AssignerWithPeriodicWatermarks[String] { var maxCurrentWatermark: Long = _ //水印=最大事件时间-延迟时间 后被调用 水印是递增,小于上一个水印不会被发射出去 override def getCurrentWatermark: Watermark = { //产生水印 new Watermark(maxCurrentWatermark - delayTime) } //获取当前的时间戳 先被调用 override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = { val currentTimeStamp = element.split(" ")(0).toLong maxCurrentWatermark = math.max(currentTimeStamp,maxCurrentWatermark) currentTimeStamp } } def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.getConfig.setAutoWatermarkInterval(100) val stream = env.socketTextStream("node01", 8888).assignTimestampsAndWatermarks(new MyTimestampAndWatermarks(3000L)) stream .flatMap(x => x.split(" ").tail) .map((_, 1)) .keyBy(_._1) // .timeWindow(Time.seconds(10)) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .process(new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] { override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = { val start = context.window.getStart val end = context.window.getEnd var count = 0 for (elem <- elements) { count += elem._2 } println("start:" + start + " end:" + end + " word:" + key + " count:" + count) } }) .print() env.execute() } }
- 间歇性水印生成器
间歇性水印(Punctuated Watermark)在观察到事件后,会依据用户指定的条件来决定是否发射水印。
比如,在车流量的数据中,001卡口通信经常异常,传回到服务器的数据会有延迟问题,其他的卡口都是正常的,那么这个卡口的数据需要打上水印。
import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.time.Time object PunctuatedWatermarkTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //卡口号、时间戳 env.socketTextStream("node01", 8888) .map(data => { val splits = data.split(" ") (splits(0), splits(1).toLong) }) .assignTimestampsAndWatermarks(new myWatermark(3000)) .keyBy(_._1) .timeWindow(Time.seconds(5)) .reduce((v1: (String, Long), v2: (String, Long)) => { (v1._1 + "," + v2._1, v1._2 + v2._2) }).print() env.execute() } class myWatermark(delay: Long) extends AssignerWithPunctuatedWatermarks[(String, Long)] { var maxTimeStamp:Long = _ override def checkAndGetNextWatermark(elem: (String, Long), extractedTimestamp: Long): Watermark = { maxTimeStamp = extractedTimestamp.max(maxTimeStamp) if ("001".equals(elem._1)) { new Watermark(maxTimeStamp - delay) } else { new Watermark(maxTimeStamp) } } override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = { element._2 } } }