Flink教程(12)- Flink高级API(Time与Watermaker)

简介: Flink教程(12)- Flink高级API(Time与Watermaker)

01 引言

在前面的博客,我们已经对Flink批流一体API的使用有了一定的了解了,有兴趣的同学可以参阅下:

在前面的教程,我们已经学习了Flink的四大基石里面的Window了,如下图,本文讲解下Time

02 Time

Flink的流式处理中,会涉及到时间的不同概念,如下图所示:

可以看到Time分为如下几类:

  • 事件时间(EventTime): 事件真真正正发生产生的时间
  • 摄入时间(IngestionTime): 事件到达Flink的时间
  • 处理时间(ProcessingTime): 事件真正被处理/计算的时间

毋庸置疑,EventTime事件时间是最为重要的,因为只要事件时间一产生就不会变化,事件时间更能反映事件的本质!

为何事件时间这么重要?

举个例子:比如在地下车库点外卖,下单的时候是11:59分,但是由于地下车库没信号,程序一直在重试提交,当走出地下车库时,已经12:05分了,这个时候,如果要统计12之前的订单金额,那么这笔交易是否应被统计?当然应该统计,因为该数据的真真正正的产生时间为11点59分,这就是时间时间了。

事件时间反映了事件的时间,因为数据可能因为网络延迟等原因,所以需要一种机制来解决一定程度上的数据乱序或延迟到底的问题!也就是我们接下来要学习的Watermaker水印机制/水位线机制。

03 Watermaker水印机制/水位线机制

3.1 Watermaker定义

Watermaker:就是给数据再额外的加的一个时间列,也就是Watermaker是个时间戳!

Watermaker计算公式(这样可以保证Watermaker水位线会一直上升(变大),不会下降):

Watermaker = 数据的事件时间(当前窗口的最大的事件时间) - 最大允许的延迟时间或乱序时间

3.2 Watermaker的作用

之前的窗口都是按照系统时间来触发计算的,如: [10:00:00 ~ 10:00:10) 的窗口,

一但系统时间到了10:00:10就会触发计算,那么可能会导致延迟到达的数据丢失!

那么现在有了Watermaker,窗口就可以按照Watermaker来触发计算! 也就是说Watermaker是用来触发窗口计算的!

3.3 Watermaker如何触发窗口计算

窗口计算的触发条件为:

  • 窗口中有数据
  • Watermaker >= 窗口的结束时间

因为前面说到,Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间,也就是说只要不断有数据来,就可以保证Watermaker水位线是会一直上升/变大的,不会下降/减小的所以最终一定是会触发窗口计算的。

3.4 图解Watermaker

触发公式:

  • Watermaker >= 窗口的结束时间
  • Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间
  • 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间 >= 窗口的结束时间
  • 当前窗口的最大的事件时间 >= 窗口的结束时间 + 最大允许的延迟时间或乱序时间

如上图所示的窗口时间为:[10:00:00~10:10:00]CBDA数据依次到达窗口。

情形一:假如没有Watermaker机制:B数据迟到了(最少迟到了2分钟),那么B数据就丢失了。

情形二:有了Watermaker机制,并设置最大允许的延迟时间或乱序时间为5分钟,那么:

  • C数据到达时,Watermaker=max(10:11:00)-5=10:06:00 < 窗口结束时间10:10:00 – 不触发条件
  • B数据到达时,Watermaker=max(10:11:00,10:09:00) -5=10:06:00 < 窗口结束时间10:10:00 – 不触发条件
  • D数据到达时,Watermaker=max(10:11:00,10:09:00,10:15:00) -5=10:10:00=窗口结束时间10:10:00 – 满足触发条件,这时候窗口才触发计算,B数据不会丢失

注意:Watermaker机制可以在一定程度上解决数据乱序后延迟到达问题,但是更严重的还是无法解决如果A数据到达窗口已经计算完毕,所以A数据还是会丢失。如果要让A数据不丢失,可以将最大允许的延迟时间或乱序时间再设置大一点,或使用后续学习的Allowed Lateness 侧道输出机制

04 案例演示

4.1 Watermaker案例演示

需求:有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额),要求每隔5s,计算5秒内,每个用户的订单总金额,并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。

核心APIDataStream.assignTimestampsAndWatermarks(...)

定期生成 根据特殊记录生成
显示时间驱动 数据驱动
每隔一段时间调用生成方法 每一次分配TimeStamp都会调用生成方法
实现AssignerWithPeriodicWathermarks 实现AssignerWithPunctuatedWatermarks

注意:一般我们都是直接使用Flink提供好的BoundedOutOfOrdernessTimestampExtractor

实现方式1https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html

/**
 * @author : YangLinWei
 * @createTime: 2022/3/7 11:07 下午
 * <p>
 * 模拟实时订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间)
 * 要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
 * 并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。
 */
public class WatermakerDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        //模拟实时订单数据(数据有延迟和乱序)
        DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private boolean flag = true;
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    //模拟数据延迟和乱序!
                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }
            @Override
            public void cancel() {
                flag = false;
            }
        });
        //3.Transformation
        //-告诉Flink要基于事件时间来计算!
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime
        //-告诉Flnk数据中的哪一列是事件时间,因为Watermaker = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
        /*DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(3)) {//最大允许的延迟时间或乱序时间
                    @Override
                    public long extractTimestamp(Order element) {
                        return element.eventTime;
                        //指定事件时间是哪一列,Flink底层会自动计算:
                        //Watermaker = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
                    }
        });*/
        DataStream<Order> watermakerDS = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((event, timestamp) -> event.getEventTime())
                );
        //代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了
        //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
        DataStream<Order> result = watermakerDS
                .keyBy(Order::getUserId)
                //.timeWindow(Time.seconds(5), Time.seconds(5))
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");
        //4.Sink
        result.print();
        //5.execute
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}

运行结果:

实现方式2

/**
 * Check
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 11:15 下午
 */
public class WatermakerDemo02 {
    public static void main(String[] args) throws Exception {
        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        //模拟实时订单数据(数据有延迟和乱序)
        DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private boolean flag = true;
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    //模拟数据延迟和乱序!
                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    System.out.println("发送的数据为: " + userId + " : " + df.format(eventTime));
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }
            @Override
            public void cancel() {
                flag = false;
            }
        });
        //3.Transformation
        /*DataStream<Order> watermakerDS = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((event, timestamp) -> event.getEventTime())
                );*/
        //开发中直接使用上面的即可
        //学习测试时可以自己实现
        DataStream<Order> watermakerDS = orderDS
                .assignTimestampsAndWatermarks(
                        new WatermarkStrategy<Order>() {
                            @Override
                            public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                                return new WatermarkGenerator<Order>() {
                                    private int userId = 0;
                                    private long eventTime = 0L;
                                    private final long outOfOrdernessMillis = 3000;
                                    private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
                                    @Override
                                    public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
                                        userId = event.userId;
                                        eventTime = event.eventTime;
                                        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
                                    }
                                    @Override
                                    public void onPeriodicEmit(WatermarkOutput output) {
                                        //Watermaker = 当前最大事件时间 - 最大允许的延迟时间或乱序时间
                                        Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1);
                                        System.out.println("key:" + userId + ",系统时间:" + df.format(System.currentTimeMillis()) + ",事件时间:" + df.format(eventTime) + ",水印时间:" + df.format(watermark.getTimestamp()));
                                        output.emitWatermark(watermark);
                                    }
                                };
                            }
                        }.withTimestampAssigner((event, timestamp) -> event.getEventTime())
                );
        //代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了
        //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
       /* DataStream<Order> result = watermakerDS
                 .keyBy(Order::getUserId)
                //.timeWindow(Time.seconds(5), Time.seconds(5))
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");*/
        //开发中使用上面的代码进行业务计算即可
        //学习测试时可以使用下面的代码对数据进行更详细的输出,如输出窗口触发时各个窗口中的数据的事件时间,Watermaker时间
        DataStream<String> result = watermakerDS
                .keyBy(Order::getUserId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                //把apply中的函数应用在窗口中的数据上
                //WindowFunction<IN, OUT, KEY, W extends Window>
                .apply(new WindowFunction<Order, String, Integer, TimeWindow>() {
                    @Override
                    public void apply(Integer key, TimeWindow window, Iterable<Order> input, Collector<String> out) throws Exception {
                        //准备一个集合用来存放属于该窗口的数据的事件时间
                        List<String> eventTimeList = new ArrayList<>();
                        for (Order order : input) {
                            Long eventTime = order.eventTime;
                            eventTimeList.add(df.format(eventTime));
                        }
                        String outStr = String.format("key:%s,窗口开始结束:[%s~%s),属于该窗口的事件时间:%s",
                                key.toString(), df.format(window.getStart()), df.format(window.getEnd()), eventTimeList);
                        out.collect(outStr);
                    }
                });
        //4.Sink
        result.print();
        //5.execute
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}

运行结果:

4.2 Watermaker案例演示

需求:有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)

要求每隔5s,计算5秒内,每个用户的订单总金额并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。并使用OutputTag+allowedLateness解决数据丢失问题。

API:

示例代码:

/**
 * allowedLateness
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 11:18 下午
 */
public class WatermakerDemo03 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        //模拟实时订单数据(数据有延迟和乱序)
        DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private boolean flag = true;
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    //模拟数据延迟和乱序!
                    long eventTime = System.currentTimeMillis() - random.nextInt(10) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    //TimeUnit.SECONDS.sleep(1);
                }
            }
            @Override
            public void cancel() {
                flag = false;
            }
        });
        //3.Transformation
        DataStream<Order> watermakerDS = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((event, timestamp) -> event.getEventTime())
                );
        //代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了
        //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
        OutputTag<Order> outputTag = new OutputTag<>("Seriouslylate", TypeInformation.of(Order.class));
        SingleOutputStreamOperator<Order> result = watermakerDS
                .keyBy(Order::getUserId)
                //.timeWindow(Time.seconds(5), Time.seconds(5))
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(5))
                .sideOutputLateData(outputTag)
                .sum("money");
        DataStream<Order> result2 = result.getSideOutput(outputTag);
        //4.Sink
        result.print("正常的数据和迟到不严重的数据");
        result2.print("迟到严重的数据");
        //5.execute
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}

运行结果:

05 文末

本文主要讲解了TimeWatermaker的原理与用法,谢谢大家的阅读,本文完!

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
233 0
|
10月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
745 12
Flink CDC YAML:面向数据集成的 API 设计
|
9月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
397 5
|
11月前
|
传感器 监控 数据挖掘
Flink 四大基石之 Time (时间语义) 的使用详解
Flink 中的时间分为三类:Event Time(事件发生时间)、Ingestion Time(数据进入系统时间)和 Processing Time(数据处理时间)。Event Time 通过嵌入事件中的时间戳准确反映数据顺序,支持复杂窗口操作。Watermark 机制用于处理 Event Time,确保数据完整性并触发窗口计算。Flink 还提供了多种迟到数据处理方式,如默认丢弃、侧输出流和允许延迟处理,以应对不同场景需求。掌握这些时间语义对编写高效、准确的 Flink 应用至关重要。
673 21
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
150 0
|
数据安全/隐私保护 流计算
Flink四大基石——2.Time
Flink四大基石——2.Time
157 1
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
1122 2
|
SQL 分布式计算 测试技术
概述Flink API中的4个层次
【7月更文挑战第14天】Flink的API分为4个层次:核心底层API(如ProcessFunction)、DataStream/DataSet API、Table API和SQL。
|
2月前
|
缓存 监控 前端开发
顺企网 API 开发实战:搜索 / 详情接口从 0 到 1 落地(附 Elasticsearch 优化 + 错误速查)
企业API开发常陷参数、缓存、错误处理三大坑?本指南拆解顺企网双接口全流程,涵盖搜索优化、签名验证、限流应对,附可复用代码与错误速查表,助你2小时高效搞定开发,提升响应速度与稳定性。
|
2月前
|
JSON 算法 API
Python采集淘宝商品评论API接口及JSON数据返回全程指南
Python采集淘宝商品评论API接口及JSON数据返回全程指南