Code Repository
The code is synced to GitHub:
https://github.com/turbo-duck/flink-demo
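Progress from the Previous Section
The previous section implemented a time-driven tumbling window (TumblingWindow) with a TimeWindow-based window function (TimeWindowFunction).
This section is similar. The difference is that the previous section used a TimeWindow (time-driven), while this one uses a GlobalWindow (event-driven: a window fires based on how many elements have arrived, not on time).
The previous section pushed data through a socket. To make testing easier, this section switches to Kafka.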
Kafka In Docker
Modifying the Code
Before:
DataStreamSource<String> dataStreamSource = env.socketTextStream("0.0.0.0", 9999);
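After:
// Imports needed (requires the flink-connector-kafka dependency)
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// ==== Constants (class fields)
private static final String KAFKA_SERVER = "0.0.0.0";
private static final Integer KAFKA_PORT = 9092;
private static final String KAFKA_TOPIC = "test";
// ====

// ==== Kafka DataStreamSource (inside main, replacing the socket source)
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", String.format("%s:%d", KAFKA_SERVER, KAFKA_PORT));
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(KAFKA_TOPIC, new SimpleStringSchema(), properties);
DataStreamSource<String> dataStreamSource = env.addSource(consumer);
// ====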
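TimeWindow: Time-Driven
Tumbling Window
A tumbling window is one of the most common windowing mechanisms in Flink. It splits a data stream into chunks that are processed separately. In a time-based tumbling window, time decides when each window fires and closes.
What Is a Tumbling Window?
A tumbling window splits the stream into intervals of fixed length, and each interval forms its own window. Tumbling windows never overlap, so every element belongs to exactly one window.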
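The following plain-Java sketch (no Flink dependency) makes "every element belongs to exactly one window" concrete by mapping a timestamp to its tumbling window. The start-alignment arithmetic is modelled on Flink's `TimeWindow.getWindowStartWithOffset`, and `end - 1` corresponds to `TimeWindow.maxTimestamp()`, the point at which a time trigger fires. The class and method names are hypothetical. The 5-second window size and the sample timestamps are made up for illustration.

```java
public class TumblingWindowMath {

    // Start of the tumbling window that contains `timestamp`,
    // for windows of `size` ms aligned to the epoch shifted by `offset` ms.
    static long windowStart(long timestamp, long offset, long size) {
        long remainder = (timestamp - offset) % size;
        // Java's % keeps the sign of the dividend, so shift negative remainders into [0, size)
        return remainder < 0 ? timestamp - (remainder + size) : timestamp - remainder;
    }

    // Exclusive end of the window [start, start + size)
    static long windowEnd(long start, long size) {
        return start + size;
    }

    // Last timestamp that still belongs to the window [start, end)
    static long maxTimestamp(long end) {
        return end - 1;
    }

    public static void main(String[] args) {
        long size = 5_000L; // hypothetical 5-second window
        long[] timestamps = {10_000L, 12_345L, 14_999L, 15_000L};
        for (long ts : timestamps) {
            long start = windowStart(ts, 0L, size);
            long end = windowEnd(start, size);
            System.out.printf("ts=%d -> [%d, %d), last timestamp %d%n", ts, start, end, maxTimestamp(end));
        }
    }
}
```

In this sketch, 10000, 12345 and 14999 all land in `[10000, 15000)`, while 15000 opens the next window `[15000, 20000)`. Because the windows are aligned and do not overlap, each timestamp maps to exactly one start value.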
GlobalWindow: Event-Driven
Core Code
// Each key gets a window that fires once 3 elements with that key have arrived
WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindow = keyedStream.countWindow(3);
countWindow.apply(new MyCountWindowFunction()).print();
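In Flink's `KeyedStream` source, `countWindow(size)` is shorthand for a `GlobalWindows` assigner combined with a purging `CountTrigger`. Every key has a single global window, which fires and is cleared each time that key has collected `size` elements. The plain-Java simulation below sketches that behaviour without Flink. `CountWindowSimulation` and `add` are hypothetical names, not Flink API, and the keys and values are invented sample data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountWindowSimulation {

    private final int size;
    // One buffer per key, playing the role of that key's GlobalWindow contents
    private final Map<String, List<Integer>> buffers = new HashMap<>();

    CountWindowSimulation(int size) {
        this.size = size;
    }

    // Adds one element; returns the fired window's sum,
    // or null if this key's window has not reached `size` elements yet.
    Integer add(String key, int value) {
        List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < size) {
            return null;
        }
        int sum = 0;
        for (int v : buffer) {
            sum += v;
        }
        buffer.clear(); // purge, so the next window for this key starts empty
        return sum;
    }

    public static void main(String[] args) {
        CountWindowSimulation window = new CountWindowSimulation(3);
        String[] keys = {"a", "a", "b", "a", "b", "b", "a"};
        int[] values = {1, 2, 5, 3, 4, 6, 7};
        for (int i = 0; i < keys.length; i++) {
            Integer result = window.add(keys[i], values[i]);
            if (result != null) {
                System.out.println("key:" + keys[i] + " value: " + result);
            }
        }
    }
}
```

Here key `a` fires with 1 + 2 + 3 = 6 and key `b` with 5 + 4 + 6 = 15, regardless of when the elements arrived. The last `a` element (7) stays buffered in this sketch until two more `a` elements show up.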
StartApp
package icu.wzk.demo06;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

/**
 * Count window (event-driven) on a GlobalWindow:
 * a window fires once a key has received a fixed number of elements, regardless of time
 * @author wzk
 * @date 10:48 2024/6/22
 **/
public class StartApp {

    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws Exception {
        // Set up the execution environment, similar to initializing a SparkContext in Spark
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Socket source; to read from Kafka instead, replace this line with the Kafka source from "Modifying the Code"
        DataStreamSource<String> dataStreamSource = env.socketTextStream("0.0.0.0", 9999);

        // Pair each incoming line with a random integer in [0, 10)
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                long timeMillis = System.currentTimeMillis();
                int random = RANDOM.nextInt(10);
                System.out.println("value: " + value + " random: " + random + " timestamp: " + timeMillis + "|" + format.format(timeMillis));
                return new Tuple2<>(value, random);
            }
        });

        // Key the stream by the line content (f0)
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                });

        // ================ Event-driven ============================
        // Every 3 events with the same key form one window, which is then evaluated
        WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindow = keyedStream.countWindow(3);
        countWindow.apply(new MyCountWindowFunction()).print();

        env.execute();
    }
}
MyCountWindowFunction
package icu.wzk.demo06;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

/**
 * Event-driven window function over a GlobalWindow
 * @author wzk
 * @date 10:27 2024/6/22
 **/
public class MyCountWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, String, GlobalWindow> {

    @Override
    public void apply(String s, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        // Sum the random values of all elements in this window
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
        // Meaningless timestamp: for a GlobalWindow it is always Long.MAX_VALUE,
        // because count-based windows do not depend on time
        long maxTimestamp = window.maxTimestamp();
        out.collect("key:" + s + " value: " + sum + "| maxTimeStamp :" + maxTimestamp + "," + format.format(maxTimestamp));
    }
}
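The comment in `MyCountWindowFunction` calls the timestamp meaningless. The sketch below reproduces the body of `apply` with plain Java types to show why. It uses a `List<Integer>` instead of `Iterable<Tuple2<...>>`, and the constant `Long.MAX_VALUE` stands in for `window.maxTimestamp()`. The formatter is pinned to UTC so the output is reproducible. The class name `GlobalWindowTimestampDemo` is hypothetical.

```java
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.TimeZone;

public class GlobalWindowTimestampDemo {

    // The value GlobalWindow.maxTimestamp() returns for every global window
    static final long GLOBAL_MAX_TIMESTAMP = Long.MAX_VALUE;

    // Mirrors MyCountWindowFunction.apply: sum the window's values, then append the timestamp
    static String describe(String key, List<Integer> values) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        format.setTimeZone(TimeZone.getTimeZone("UTC")); // fixed zone for reproducible output
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return "key:" + key + " value: " + sum + "| maxTimeStamp :" + GLOBAL_MAX_TIMESTAMP
                + "," + format.format(GLOBAL_MAX_TIMESTAMP);
    }

    public static void main(String[] args) {
        System.out.println(describe("a", List.of(1, 2, 3)));
    }
}
```

Interpreted as milliseconds, `Long.MAX_VALUE` formats to a date in the year 292278994. The field therefore carries no timing information for a count window.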