关于Flink框架窗口(window)函数最全解析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 在真实的场景中数据流往往都是没有界限的,无休止的,就像是一个通道中水流持续不断地通过管道流向别处,这样显然是无法进行处理、计算的,那如何可以将没有界限的数据进行处理呢?我们可以将这些无界限的数据流进行切割、拆分,将其得到一个有界限的数据集合然后进行处理、计算就方便多了。Flink中窗口(Window)就是来处理无界限的数据流的,将无线的数据流切割成为有限流,然后将切割后的有限流数据分发到指定有限大小的桶中进行分析计算。

概述


在真实的场景中数据流往往都是没有界限的,无休止的,就像是一个通道中水流持续不断地通过管道流向别处,这样显然是无法进行处理、计算的,那如何可以将没有界限的数据进行处理呢?我们可以将这些无界限的数据流进行切割、拆分,将其得到一个有界限的数据集合然后进行处理、计算就方便多了。


Flink中窗口(Window)就是来处理无界限的数据流的,将无线的数据流切割成为有限流,然后将切割后的有限流数据分发到指定有限大小的桶中进行分析计算。

1.png


窗口类型


Flink中的窗口类型有两种:时间窗口(Time Window)、计数窗口(Count Window)。时间窗口中又包含了:滚动时间窗口(Tumbling Window)、滑动时间窗口(Sliding Window)、会话窗口(Session Window)。计数窗口包含了:滚动计数窗口和滑动计数窗口。


滚动窗口(Tumbling Windows)


以时间窗口为例(计数窗口类似),滚动窗口就是按照固定的时间间隔将数据进行切分。

特点就是时间比较对齐、窗口的长度都是固定的且没有重叠。

滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。


换句话说:如果制定了一个30分钟时间间隔的滚动窗口,然后就会将无界限的数据以30分钟为一个窗口期进行切割成有限的数据集合。

适用场景:做统计计算。做每个时间段的聚合计算。

1.png


滑动窗口(Sliding Windows)


以时间窗口为例(计数窗口类似),滑动窗口是固定窗口的另一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。


窗口长度是固定的,窗口之间是可以重叠的。


说明:滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。


适用场景:(求某接口最近 5min 的失败率来决定是否要报警)对最近一个时间段内的统计。

2.png


会话窗口(Session Windows)


会话敞口只存在于时间窗口,计数窗口无会话窗口。

特点是时间无对齐

由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。


当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去

3.png



Window API使用


窗口分配器window()


在flink中可以用 .window() 来定义一个窗口,然后基于这个 window 去做一些聚合或者其它处理操作。注意 window () 方法必须在 keyBy 之后才能用。window() 方法接收的输入参数是一个 WindowAssigner WindowAssigner 负责将每条输入的数据分发到正确的 window 中。Flink 提供了通用的 WindowAssigner:滚动窗口(tumbling window)、滑动窗口(sliding window)、 会话窗口(session window)、全局窗口(global window)


代码如下:


public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        //构建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设施并行度为1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        //开窗测试 指定窗口分配器
        DataStream<Integer> resultStream = dataStream.keyBy("id")
                //设置一个15秒的一个滚动窗口
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)));
                //会话窗口
                //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
                //滑动窗口
                //.window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(10)))
        env.execute();
    }
}

Flink 提供了更加简单的 .timeWindow 和 .countWindow 方法,用于定义时间窗口和计数窗口.


TimeWindow


TimeWindow 是将指定时间范围内的所有数据组成一个 window,一次对一个window 里面的所有数据进行计算。

时间间隔可以通过 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定。


CountWindow


CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。


CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。


创建不同类型的窗口


滚动时间窗口(tumbling time window)


.timeWindow(Time.seconds(15))

滑动时间窗口(sliding time window)


下面代码中的 sliding_size 设置为了 5s,也就是说,每 5s 就计算输出结果一次,每一次计算的 window 范围是 15s 内的所有元素。

.timeWindow(Time.seconds(15),Time.seconds(5))


会话窗口(session window)


.window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))

滚动计数窗口(tumbling count window)


默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。

.countWindow(10)


滑动计数窗口(sliding count window)


下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据就计算一次,每一次计算的 window 范围是 10 个元素。



.countWindow(10,2)

窗口函数


Flink中定义了要对窗口中收集的数据做的计算操作,主要可以分为两类:增量聚合函数、全窗口函数。


增量聚合函数:每条数据到来就进行计算,先保持着一个状态,聚合函数有ReduceFunction AggregateFunction。

1.png

案例代码:


public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        //构建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设施并行度为1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        //开窗测试 指定窗口分配器
        DataStream<Integer> resultStream = dataStream.keyBy("id")
                //对窗口进行聚合操作 增量窗口操作
                .timeWindow(Time.seconds(15))
                .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
                    @Override
                    //创建累加器
                    public Integer createAccumulator() {
                        return 0;
                    }
                    @Override
                    public Integer add(SensorReading sensorReading, Integer accumulator) {
                        return accumulator+1;
                    }
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }
                    @Override
                    public Integer merge(Integer integer, Integer acc1) {
                        return null;
                    }
                });
        resultStream.print();
        env.execute();
    }
}

本地测试:结果输出成功。


1.png

全窗口函数:先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。对应的函数:ProcessWindowFunction,WindowFunction

1.png


案例代码:


public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        //构建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设施并行度为1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
          //全量窗口函数
        DataStream<Tuple3<String,Long,Integer>> resultStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .apply(new WindowFunction<SensorReading, Tuple3<String,Long,Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String,Long,Integer>> out) throws Exception {
                        String id =tuple.getField(0);
                        Long windowEnd =window.getEnd();
                        Integer count = IteratorUtils.toList(input.iterator()).size();
                        out.collect(new Tuple3<>(id,windowEnd,count));
                    }
                });
        resultStream.print();
        env.execute();
    }
}

本地运行测试:结果输出成果

1.png


如何在winodws操作系统下使用nc命令进行代码测试:在Windows操作系统中怎样使用nc命令


其他API


触发器:.trigger() 定义 window 什么时候关闭,触发计算并输出结果

移除器:.evitor() 定义移除某些数据的逻辑

.allowedLateness() 允许处理迟到的数据

.sideOutputLateData() 将迟到的数据放入侧输出流

.getSideOutput()获取侧输出流


案例代码:


public class WindowTest2_CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] field = line.split(",");
            return new SensorReading(field[0], new Long(field[1]), new Double(field[2]));
        });
        // 开计数窗口测试
        DataStream<Double> resultStream = dataStream.keyBy("id")
                .countWindow(10, 2)
                .aggregate(new MyAvgTemp());
        //其他可选API 对迟到数据的处理方式
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("iate") {};
        SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                //.trigger()
                //.evictor()
                .allowedLateness(Time.minutes(1))
                //输出到测流
                .sideOutputLateData(outputTag)
                .sum("temperature");
       sumStream.getSideOutput(outputTag).print("late");
        resultStream.print();
        env.execute();
    }
    public static class MyAvgTemp implements AggregateFunction<SensorReading, Tuple2<Double,Integer>,Double>{
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0,0);
        }
        @Override
        public Tuple2<Double, Integer> add(SensorReading sensorReading, Tuple2<Double, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0+sensorReading.getTemperature(),accumulator.f1+1);
        }
        @Override
        public Double getResult(Tuple2<Double, Integer> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }
        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            return new Tuple2<>(a.f0+b.f0,a.f1+b.f1);
        }
    }
}

1.png

此图来源于网络,window API 总览



注意点:


在 flink中我们定义一个window,必须在 keyBy操作之后。

窗口函数之后一定要有聚合操作。


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 数据挖掘 测试技术
南大通用GBase8s数据库:LISTAGG函数的解析
南大通用GBase8s数据库:LISTAGG函数的解析
|
3月前
|
存储 Java
深入探讨了Java集合框架中的HashSet和TreeSet,解析了两者在元素存储上的无序与有序特性。
【10月更文挑战第16天】本文深入探讨了Java集合框架中的HashSet和TreeSet,解析了两者在元素存储上的无序与有序特性。HashSet基于哈希表实现,添加元素时根据哈希值分布,遍历时顺序不可预测;而TreeSet利用红黑树结构,按自然顺序或自定义顺序存储元素,确保遍历时有序输出。文章还提供了示例代码,帮助读者更好地理解这两种集合类型的使用场景和内部机制。
56 3
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
143 3
|
3月前
|
XML JSON API
ServiceStack:不仅仅是一个高性能Web API和微服务框架,更是一站式解决方案——深入解析其多协议支持及简便开发流程,带您体验前所未有的.NET开发效率革命
【10月更文挑战第9天】ServiceStack 是一个高性能的 Web API 和微服务框架,支持 JSON、XML、CSV 等多种数据格式。它简化了 .NET 应用的开发流程,提供了直观的 RESTful 服务构建方式。ServiceStack 支持高并发请求和复杂业务逻辑,安装简单,通过 NuGet 包管理器即可快速集成。示例代码展示了如何创建一个返回当前日期的简单服务,包括定义请求和响应 DTO、实现服务逻辑、配置路由和宿主。ServiceStack 还支持 WebSocket、SignalR 等实时通信协议,具备自动验证、自动过滤器等丰富功能,适合快速搭建高性能、可扩展的服务端应用。
212 3
|
4天前
|
缓存 监控 数据处理
Flink 四大基石之窗口(Window)使用详解
在流处理场景中,窗口(Window)用于将无限数据流切分成有限大小的“块”,以便进行计算。Flink 提供了多种窗口类型,如时间窗口(滚动、滑动、会话)和计数窗口,通过窗口大小、滑动步长和偏移量等属性控制数据切分。窗口函数包括增量聚合函数、全窗口函数和ProcessWindowFunction,支持灵活的数据处理。应用案例展示了如何使用窗口进行实时流量统计和电商销售分析。
58 28
|
26天前
|
设计模式 XML Java
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
本文详细介绍了Spring框架的核心功能,并通过手写自定义Spring框架的方式,深入理解了Spring的IOC(控制反转)和DI(依赖注入)功能,并且学会实际运用设计模式到真实开发中。
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
|
19天前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
75 14
|
1月前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
179 27
|
1月前
|
C语言 开发者
【C语言】断言函数 -《深入解析C语言调试利器 !》
断言(assert)是一种调试工具,用于在程序运行时检查某些条件是否成立。如果条件不成立,断言会触发错误,并通常会终止程序的执行。断言有助于在开发和测试阶段捕捉逻辑错误。
52 5
|
2月前
|
机器学习/深度学习 自然语言处理 语音技术
揭秘深度学习中的注意力机制:兼容性函数的深度解析
揭秘深度学习中的注意力机制:兼容性函数的深度解析

推荐镜像

更多