Flink教程(11)- Flink高级API(Window)

简介: Flink教程(11)- Flink高级API(Window)

01 引言

在前面的博客,我们已经对Flink批流一体API的使用有了一定的了解了,有兴趣的同学可以参阅下:

在前面的教程,我们知道Flink的四大基石十分重要,如下图,本文先讲解下Window

02 Window

流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算,Flink提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常灵活的自定义的窗口。

2.1 为什么需要Window?

在流处理应用中,数据是连续不断的,有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。

在这种情况下,我们必须定义一个窗口(window),用来收集最近1分钟内的数据,并对这个窗口内的数据进行计算

2.2 Window分类

2.2.1 按照time和count分类

  • 时间窗口(time-window) :根据时间划分窗口,如:每xx分钟统计最近xx分钟的数据
  • 数量窗口(count-window):根据数量划分窗口,如:每xx个数据统计最近xx个数据

2.2.2 按照slide和size分类

窗口有两个重要的属性,窗口大小size和滑动间隔slide,根据它们的大小关系可分为:

  • 滚动窗口(tumbling-window): size=slide,比如: 每隔10s统计最近10s的数据
  • 滑动窗口(sliding-window):size>slide,比如:每隔5s统计最近10s的数据

注意:当size<slide的时候,如每隔15s统计最近10s的数据,那么中间5s的数据会丢失,所有开发中不用。

2.2.3 总结

按照上面窗口的分类方式进行组合,可以得出如下的窗口:

分类 使用频率
基于时间的滚动窗口:tumbling-time-window 用的较多
基于时间的滑动窗口:sliding-time-window 用的较多
基于数量的滚动窗口:tumbling-count-window 用的较少
基于数量的滑动窗口:sliding-count-window 用的较少

注意:Flink还支持一个特殊的窗口,即 Session会话窗口,需要设置一个会话超时时间,如30s:则表示30s内没有数据到来,则触发上个窗口的计算。

2.3 Window API

2.3.1 window和windowAll

何时使用:

  • 使用keyby的流,应该使用window方法
  • 未使用keyby的流,应该调用windowAll方法

2.3.2 WindowAssigner

window/windowAll 方法接收的输入是一个 WindowAssignerWindowAssigner负责将每条输入的数据分发到正确的 window中,Flink提供了很多各种场景用的WindowAssigner

如果需要自己定制数据分发策略,则可以实现一个 class,继承自WindowAssigner

2.3.3 evictor

evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行用户代码之后,更详细的描述可以参考org.apache.flink.streaming.api.windowing.evictors.EvictorevicBeforeevicAfter两个方法。

Flink 提供了如下三种通用的 evictor:

  • CountEvictor 保留指定数量的元素
  • TimeEvictor设定一个阈值 interval,删除所有不再 max_ts - interval范围内的元
    素,其中 max_ts 是窗口内时间戳的最大值。
  • DeltaEvictor通过执行用户给定的 DeltaFunction 以及预设的theshold,判断是否删除一个元素。

2.3.4 trigger

trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的trigger,如果默认的trigger不能满足你的需求,则可以自定义一个类,继承自Trigger 即可,我们详细描述下 Trigger 的接口以及含义:

  • onElement() :每次往 window 增加一个元素的时候都会触发
  • onEventTime() :当 event-time timer 被触发的时候会调用
  • onProcessingTime() :当 processing-time timer被触发的时候会调用
  • onMerge() :对两个 riggerstate 进行merge 操作
  • clear()window销毁的时候被调用

上面的接口中前三个会返回一个 TriggerResultTriggerResult有如下几种可能的选

择:

  • CONTINUE 不做任何事情;
  • FIRE 触发window
  • PURGE 清空整个 window 的元素并销毁窗口;
  • FIRE_AND_PURGE 触发窗口,然后销毁窗口。

2.3.5 API调用示例

source.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
source.keyBy(0).
timeWindow(Time.seconds(5))

03 Window案例演示

3.1 基于时间的滚动和滑动窗口

需求1:基于时间的滚动窗口 – 每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量

需求2:基于时间的滑动窗口 --每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量

模拟数据如下(信号灯编号和通过该信号灯的车的数量):

9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4

代码实现:

/**
 * TimeWindow
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 6:35 下午
 */
public class TimeWindow {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);
        //3.Transformation
        //将9,3转为CartInfo(9,3)
        SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });
        //分组
        //KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId");
        // * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滚动窗口
        //timeWindow(Time size窗口大小, Time slide滑动间隔)
        SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS
                .keyBy(CartInfo::getSensorId)
                //.timeWindow(Time.seconds(5))//当size==slide,可以只写一个
                //.timeWindow(Time.seconds(5), Time.seconds(5))
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum("count");
        // * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滑动窗口
        SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS
                .keyBy(CartInfo::getSensorId)
                //.timeWindow(Time.seconds(10), Time.seconds(5))
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .sum("count");
        //4.Sink
        /*
        1,5
        2,5
        3,5
        4,5
        */
        //result1.print();
        result2.print();
        //5.execute
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}

3.2 基于数量的滚动和滑动窗口

需求1:基于数量的滚动窗口:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计

需求2:基于数量的滑动窗口:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计

示例代码如下:

/**
 * CountWindow
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 6:40 下午
 */
public class CountWindow {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);
        //3.Transformation
        //将9,3转为CartInfo(9,3)
        SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });
        //分组
        //KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId");
        // * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
        //countWindow(long size, long slide)
        SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS
                .keyBy(CartInfo::getSensorId)
                //.countWindow(5L, 5L)
                .countWindow(5L)
                .sum("count");
        // * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
        //countWindow(long size, long slide)
        SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS
                .keyBy(CartInfo::getSensorId)
                .countWindow(5L, 3L)
                .sum("count");
        //4.Sink
        //result1.print();
        /*
        1,1
        1,1
        1,1
        1,1
        2,1
        1,1
         */
        result2.print();
        /*
        1,1
        1,1
        2,1
        1,1
        2,1
        3,1
        4,1
         */
        //5.execute
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}

3.3 会话窗口

需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算

示例代码如下:

/**
 * SessionWindow
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 6:42 下午
 */
public class SessionWindow {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);
        //3.Transformation
        //将9,3转为CartInfo(9,3)
        SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });
        //需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算(前提是上一个窗口得有数据!)
        SingleOutputStreamOperator<CartInfo> result = cartInfoDS.keyBy(CartInfo::getSensorId)
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .sum("count");
        //4.Sink
        result.print();
        //5.execute
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}

04 文末

本文主要讲解Flink高级APIWindow,谢谢大家的阅读,本文完!

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
3月前
|
人工智能 数据可视化 测试技术
Postman 性能测试教程:快速上手 API 压测
本文介绍API上线后因高频调用导致服务器告警,通过Postman与Apifox进行压力测试排查性能瓶颈。对比两款工具在批量请求、断言验证、可视化报告等方面的优劣,探讨API性能优化策略及行业未来发展方向。
Postman 性能测试教程:快速上手 API 压测
|
5月前
|
JSON 监控 API
在线网络PING接口检测服务器连通状态免费API教程
接口盒子提供免费PING检测API,可测试域名或IP的连通性与响应速度,支持指定地域节点,适用于服务器运维和网络监控。
549 0
|
5月前
|
JSON API PHP
通用图片搜索API:百度源免费接口教程
本文介绍一款基于百度图片搜索的免费API接口,由接口盒子提供。支持关键词搜索,具备详细请求与返回参数说明,并提供PHP及Python调用示例。开发者可快速集成实现图片搜索功能,适用于内容聚合、素材库建设等场景。
728 0
|
5月前
|
JSON 机器人 API
随机昵称网名API接口教程:轻松获取百万创意昵称库
接口盒子提供随机昵称网名API,拥有百万级中文昵称库,支持聊天机器人、游戏角色等场景的昵称生成。提供详细调用指南及多语言示例代码,助力开发者高效集成。
326 0
|
3月前
|
人工智能 API 开发者
图文教程:阿里云百炼API-KEY获取方法,亲测全流程
本文详细介绍了如何获取阿里云百炼API-KEY,包含完整流程与截图指引。需先开通百炼平台及大模型服务,再通过控制台创建并复制API-KEY。目前平台提供千万tokens免费额度,适合开发者快速上手使用。
2603 5
|
5月前
|
JSON API PHP
天气预报免费API接口【地址查询版】使用教程
本文介绍了如何使用中国气象局官方数据提供的免费天气预报API接口,通过省份和地点查询指定地区当日天气信息。该接口由接口盒子支持,提供JSON格式数据、GET/POST请求方式,并需注册获取用户ID和KEY进行身份验证。
2816 2
|
5月前
|
JSON API PHP
ICP备案查询免费API接口使用教程
本文介绍如何通过接口盒子提供的免费API接口查询域名ICP备案信息,包含请求地址、参数说明及PHP和Python调用示例,适用于开发者快速集成备案查询功能。
483 1
|
5月前
|
存储 JSON API
文本存储免费API接口教程
接口盒子提供免费文本存储服务,支持1000条记录,每条最多5000字符,适用于公告、日志、配置等场景,支持修改与读取。
178 0
|
5月前
|
数据采集 JSON 监控
获取网页状态码(可指定地域)免费API接口教程
本文介绍如何使用接口盒子的免费API获取网页状态码,支持国内、香港、美国等不同地域访问节点。内容包括接口参数、调用方法及示例,适用于网站监控、链接检查等场景。
408 0
|
5月前
|
JSON 物联网 API
天气预报免费API接口【IP查询版】使用教程
IP查询天气API是一款免费实用的接口,可根据IP地址自动获取所在地天气预报,支持自定义IP查询。核心功能包括自动识别请求IP、全国IP天气查询,数据源自中国气象局,无日调用上限。提供详细的返回参数及多语言示例代码,适用于网站、APP、物联网设备等应用场景。
986 0