全网最详细4W字Flink入门笔记(下) 2

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 全网最详细4W字Flink入门笔记(下)

集群级配置StateBackend

全局配置需要需改集群中的配置文件,修改flink-conf.yaml

  • 配置FsStateBackend
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
  • 配置MemoryStateBackend
state.backend: jobmanager
  • 配置RocksDBStateBackend
state.backend.rocksdb.checkpoint.transfer.thread.num: 1 同时操作RocksDB的线程数
  state.backend.rocksdb.localdir: 本地path   RocksDB存储状态数据的本地文件路径

Window

在流处理中,我们往往需要面对的是连续不断、无休无止的无界流,不可能等到所有数据都到齐了才开始处理。所以聚合计算其实在实际应用中,我们往往更关心一段时间内数据的统计结果,比如在过去的 1 分钟内有多少用户点击了网页。在这种情况下,我们就可以定义一个窗口,收集最近一分钟内的所有用户点击数据,然后进行聚合统计,最终输出一个结果就可以了。

说白了窗口就是将无界流通过窗口切割成一个个的有界流,窗口是左开右闭的

Flink中的窗口分为两类:基于时间的窗口(Time-based Window)和基于数量的窗口(Count-based Window)

  • 时间窗口(Time Window):按照时间段去截取数据,这在实际应用中最常见。
  • 计数窗口(Count Window):由数据驱动,也就是说按照固定的个数,来截取一段数据集。

时间窗口中又包含了:滚动时间窗口(Tumbling Window)、滑动时间窗口(Sliding Window)、会话窗口(Session Window)

计数窗口包含了:滚动计数窗口和滑动计数窗口

时间窗口、计数窗口只是对窗口的一个大致划分。在具体应用时,还需要定义更加精细的规则,来控制数据应该划分到哪个窗口中去。不同的分配数据的方式,就可以由不同的功能应用。

根据分配数据的规则,窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。

滚动窗口(Tumbling Windows)

滚动窗口每个窗口的大小固定,且相邻两个窗口之间没有重叠。滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有窗口大小,我们可以定义一个长度为1小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为10的滚动计数窗口,就会每10个数进行一次统计。

基于时间的滚动窗口:

DataStream<T> input = ...
// tumbling event-time windows
input
  .keyBy(...)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .<window function> (...)
// tumbling processing-time windows
input
  .keyBy(...)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  .<window function> (...)

在上面的代码中,我们使用了TumblingEventTimeWindowsTumblingProcessingTimeWindows来创建基于Event Time或Processing Time的滚动时间窗口。窗口的长度可以用org.apache.flink.streaming.api.windowing.time.Time中的secondsminuteshoursdays来设置。

基于计数的滚动窗口:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TumblingCountWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
        input
            .keyBy(value -> 1)
            .countWindow(3)
            .reduce(new ReduceFunction<Long>() {
                @Override
                public Long reduce(Long value1, Long value2) throws Exception {
                    return value1 + value2;
                }
            })
            .print();
        env.execute();
    }
}

在上面的代码中,我们使用了countWindow方法来创建一个基于数量的滚动窗口,窗口大小为3个元素。当窗口中的元素数量达到3时,窗口就会触发计算。在这个例子中,我们使用了reduce函数来对窗口中的元素进行求和。

滑动窗口(Sliding Windows)

滑动窗口的大小固定,但窗口之间不是首尾相接,而有部分重合。同样,滑动窗口也可以基于时间和计算定义。

滑动窗口的参数有两个:窗口大小和滑动步长。滑动步长是固定的

img

基于时间的滑动窗口:

DataStream<T> input = ...
// sliding event-time windows
input
  .keyBy(...)
  .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  .<window function> (...)

基于计数的滑动窗口:

DataStream<T> input = ...
input
  .keyBy(...)
  .countWindow(10, 5)
  .<window function> (...)

countWindow方法来创建一个基于计数的滑动窗口,窗口大小为10个元素,滑动步长为5个元素。当窗口中的元素数量达到10时,窗口就会触发计算。

会话窗口(Session Windows)

会话窗口是Flink中一种基于时间的窗口类型,每个窗口的大小不固定,且相邻两个窗口之间没有重叠。“会话”终止的标志就是隔一段时间没有数据来

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
DataStream<T> input = ...
input
  .keyBy(...)
  .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
  .<window function> (...)

在上面的代码中,使用了EventTimeSessionWindows来创建基于Event Time的会话窗口。withGap方法用来设置会话窗口之间的间隔时间,当两个元素之间的时间差超过这个值时,它们就会被分配到不同的会话窗口中。

按键分区窗口和非按键分区窗口

在Flink中,数据流可以按键分区(keyed)或非按键分区(non-keyed)。按键分区是指将数据流根据特定的键值进行分区,使得相同键值的元素被分配到同一个分区中。这样可以保证相同键值的元素由同一个worker实例处理。只有按键分区的数据流才能使用键分区状态和计时器。

非按键分区是指数据流没有根据特定的键值进行分区。这种情况下,数据流中的元素可以被任意分配到不同的分区中。

在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)来开窗,还是直接在没有按键分区的DataStream上开窗。也就是在调用窗口算子之前是否有keyBy操作。

按键分区窗口:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class KeyedWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
        input
            .keyBy(value -> 1)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .reduce(new ReduceFunction<Long>() {
                @Override
                public Long reduce(Long value1, Long value2) throws Exception {
                    return value1 + value2;
                }
            })
            .print();
        env.execute();
    }
}

在上面的代码中,使用了keyBy方法来对数据流进行按键分区,然后使用window方法来创建一个基于Event Time的滚动时间窗口。在这个例子中,我们使用了reduce函数来对窗口中的元素进行求和。

非按键分区窗口:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class NonKeyedWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
        AllWindowedStream<Long, ?> windowedStream = input.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
        windowedStream.reduce(new ReduceFunction<Long>() {
            @Override
            public Long reduce(Long value1, Long value2) throws Exception {
                return value1 + value2;
            }
        }).print();
        env.execute();
    }
}

在上面的代码中,使用了windowAll方法来对非按键分区的数据流进行窗口操作。windowAll方法接受一个WindowAssigner参数,用来指定窗口类型。然后使用了reduce函数来对窗口中的元素进行求和。

按键分区窗口(Keyed Windows)经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。

非按键分区(Non-Keyed Windows)如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。所以在实际应用中一般不推荐使用这种方式

窗口函数(WindowFunction)

所谓的“窗口函数”(window functions),就是定义窗口如何进行计算的操作。

窗口函数根据处理的方式可以分为两类:增量聚合函数和全量聚合函数。

增量聚合函数

增量聚合函数每来一条数据就立即进行计算,中间保持着聚合状态;但是不立即输出结果。等到窗口到了结束时间需要输出计算结果的时候,取出之前聚合的状态直接输出。

常见的增量聚合的函数有:reduce(reduceFunction)、aggregate(aggregateFunction)、sum()、min()、max()。

下面是一个使用增量聚合函数的Java代码示例:

DataStream<Tuple2<String, Integer>> input = ...
input.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return value.f0;
        }
    })
    .timeWindow(Time.seconds(5))
    .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t0, Tuple2<String, Integer> t1) throws Exception {
            return new Tuple2<>(t0.f0, t0.f1 + t1.f1);
        }
    });

这段代码首先使用keyBy方法按照Tuple2中的第一个元素(f0)进行分组。然后,它定义了一个5秒的时间窗口,并使用reduce方法对每个窗口内的数据进行聚合操作。在这个例子中,聚合操作是将具有相同key(即f0相同)的元素的第二个元素(f1)相加。最终,这段代码将输出一个包含每个key在每个5秒窗口内f1值之和的数据流。

另外还有一个常用的函数是聚合函数(AggregateFunction),ReduceFunction和AggregateFunction都是增量聚合函数,但它们之间有一些区别。AggregateFunction则更加灵活,ReduceFunction的输入类型、输出类型和中间状态类型必须相同,而AggregateFunction则允许这三种类型不同。

例如,如果我们希望计算一组数据的平均值,应该怎样做聚合呢?这时我们需要计算两个状态量:数据的总和(sum),以及数据的个数(count),而最终输出结果是两者的商(sum/count)。如果用ReduceFunction,那么我们应该先把数据转换成二元组 (sum, count)的形式,然后进行归约聚合,最后再将元组的两个元素相除转换得到最后的平均值。本来应该只是一个任务,可我们却需要 map-reduce-map 三步操作,这显然不够高效。而使用AggregateFunction则可以更加简单地实现这个需求

下面是使用AggregateFunction计算平均值的代码示例:

DataStream<Tuple2<String, Double>> input = ...
input
    .keyBy(new KeySelector<Tuple2<String, Double>, String>() {
        @Override
        public String getKey(Tuple2<String, Double> value) throws Exception {
            return value.f0;
        }
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .aggregate(new AggregateFunction<Tuple2<String, Double>, Tuple2<Double, Integer>, Double>() {
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0, 0);
        }
        @Override
        public Tuple2<Double, Integer> add(Tuple2<String, Double> value, Tuple2<Double, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1);
        }
        @Override
        public Double getResult(Tuple2<Double, Integer> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }
        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
        }
    });

这段代码首先使用keyBy方法按照Tuple2中的第一个元素(f0)进行分组。然后,它定义了一个5秒的翻滚事件时间窗口,并使用aggregate方法对每个窗口内的数据进行聚合操作。在这个例子中,聚合操作是计算具有相同key(即f0相同)的元素的第二个元素(f1)的平均值。最终,这段代码将输出一个包含每个key在每个5秒窗口内f1值平均值的数据流。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
8月前
|
Java 流计算
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
276 3
|
Java Linux API
flink入门-流处理
flink入门-流处理
167 0
|
存储 Java Linux
10分钟入门Flink--安装
本文介绍Flink的安装步骤,主要是Flink的独立部署模式,它不依赖其他平台。文中内容分为4块:前置准备、Flink本地模式搭建、Flink Standalone搭建、Flink Standalong HA搭建。
10分钟入门Flink--安装
|
分布式计算 Java API
Flink教程(04)- Flink入门案例
Flink教程(04)- Flink入门案例
183 0
|
8月前
|
存储 缓存 算法
[尚硅谷flink] 检查点笔记
[尚硅谷flink] 检查点笔记
210 3
|
8月前
|
分布式计算 监控 API
flink 入门编程day02
flink 入门编程day02
|
8月前
|
存储 传感器 消息中间件
[尚硅谷 flink] 状态管理 笔记
[尚硅谷 flink] 状态管理 笔记
|
8月前
|
SQL 关系型数据库 Apache
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
1439 3
|
存储 缓存 分布式计算
Flink教程(02)- Flink入门(下)
Flink教程(02)- Flink入门(下)
126 0
|
SQL 消息中间件 API
Flink教程(02)- Flink入门(上)
Flink教程(02)- Flink入门(上)
223 0