Flink Window 、Time(一)| 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 快速学习 Flink Window 、Time 。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 Flink Window 、Time(一)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10045


Flink Window 、Time (一)

 

目录

一、Window&time 介绍

1.为什么要有 Wndow

2 .Window 的类型

二、 Wndow API 的使用

1. Window 的三大组件

2 .TimesWatermark

3.时间语义

4.乱序问题解决 Watermark

5 .AllowLateness 正确设置与理船

6 .Sideoutput 在 Window中的使用

三、 Window 的内部实现原理

1 .Window 的处理流程

2 .Window 中的状态管理

四、 生产环境中的 Wndow 使用遇到的一些问题


一、 Window&time 介绍

ApacheFLink《nFL(以下简称 Flink)是一个天然支持无限流数据处理的分布式计算框架在 Flink中 Window 可以将无限流切分成有限流·是处理有限流的核心组件现在 Flink 中 Window 可以是时间驱动的(Time Window)也可以是数据驱动的(Count Window),下面的代码是在  Flink 中使用 Window 的两个代码级。

处理数据有一个特点,数据是一天的,比如说今天的零点到二十四点,这是有限的。对于 Flink 来说,处理的数据大多是无限的。我们需要通过某种方式使 Flink 处理这种有限的数据。

eyed windows

strea

.keyBy(...)  keyed versus non-keyed windows

.vindowl...) - required:assigner

[.trigger(...)] - optional:trigger(else default trigge

.evictor(...)] < optional: "evictor" (else no evictor)

. allowedLateness(... ) < optional: "lateness"(else zero)

. sideOutputLateData-)<- optional:"output tag"else no side output for late data)

.reduce/aggregate/fold/apply()  < required: "function"

L.getsideOutput(...)]  < optional: "output tag

能看到右边这些是必须的,Window 的算子。一般场景,只需要实现 Window。对于这个 Window,简单来说,变成 keyby 1和之前的都是一样的了。在生产环境下,很少使用。

 

二、 Wndow API 的使用

(1). Window 的三大组件

从上一节我们已经知道 window 的一些基本概念以及相关 API下面我们以一个实际例子来看看怎么使用 Window 相关的 APl

代码来 fuink-examples-

Datastrean

assignTinestampsAndwaternarksnew CarTimestamp())

.keyBy()

vindow(GlobalWindows./create/())

.evictor(TimeEvictor, /of/(Tine. /of/(evictionSec, TimeUnit./SECONDS/)))

trigger(DeltaTrigger./of/(triggerMeters,

new DeltaFunction()

private static final long /serialVersionUID/= 1L:

eoverride

public double getDelta(

Tuple4

Tuple4

return newDataPoint. f2-oldDataPoint.:

) carData. getType() createSerializer(env.getConfig())))

.maxBy(1):

这里的算子是 even timesession Window。

window 方法接收的输入是一个 WindowAssigner,

WindowAssigner 负责将每条输入的数据分发到正确的 window 中(一条数据可能同时分发到多个 Window 中) Flink 提供了几种通用的  windowAssigner: tumbling window (窗口间的元素无重复)· sliding window (口间的元素可能重复),  session window 以及  global window,如果需要自己定制数据分发策略,则可以实现一个 class ,继承自 windowAssigner 即可。

有三种不同的 Window,横轴表示时间,对于 Flink 来说这里是一个 key,两条虚线之间是 Window 1 等,生成一个一个的 Window,然后两条虚线之间表示 Window size,所有的 Window 事不重叠的。

可以和下面的 Window 进行区分。

主要是对于这个 Window 来说,相邻两个元素之间的 Window 是不会重叠的。某个元素属于一个 Window,仅属于一个 Window。

同样的横轴表示时间,长的带箭头的线表示 Window size,表示 Window 有多长,短的线表示 Window size 每次滑多少。能看到 Window size 表示两条虚线的大小。每次会划过一条虚线。

这样的话得到,相邻的窗口是不重叠的。红色和蓝色的中间这一部分元素,既属于红色的 Window,也属于蓝色的 Window。每个元素可能会属于多个 Window,当这个数值等于窗口大小的时候,变成了上面的 Window。

横轴同样表示时间,每次打开网页的时候,有些网页活跃的时候,一段时间不活跃会让你重新登录。最小不登录的时间就是 gap。如果超过了这个时间的话,你就和之前不属于同一个 session  里了。中间带箭头这条线表示 session 的区别。

代码来说:

(rupleJstring, Long, integer> value Anput)(

ctx, collectWithTimestamp (value, value.)

etx, emitwatermark (new Watermark timestamp: value. -1))

ctx. emitNatermark (new WatermarkLong. MAX VALUE)

BOverride

public void cancel(

we create sessions for each with max timeout of 3 time units

Datastream> aggregated source

keyBy( fields: 0)

window (EventTimesessionWindows. withdap(. milliseconds(3L)))

positionToSum: 2)

if (fileOutput) 

ggregated. writeAsText(params. get ("output"))

Ielse

System. out. printIn("Printing result to stdout. Use --output to specify output path. "

aggregated. print()

env.execute():

这条线对应的数值就是3毫秒。

evactor 主要用于做一些数据的自定义转换·可以在执行用户代码之前也可以在执行用户

代码之后更详细的描述可以参考

org. apache. ftink. streaming. api. windowing.evictors. Evictor evicBefore

evicArterFlink 两个方法·提供了如下三种通用的 evictor

CountEvictor 保留指定数量的元素

DeltaEvictor 通过执行用户给定的 DeltaFunction 以及预设的 theshold, 判断是否删除一个元素

TimeEvictor 设定一个阅值 interval ,删除所有不再 max ts- interval 范围内的元素

其中 max ts 是窗口内时间戳的最大值

evictor 是可选的方法·如果用户不选择则默认没有

triggerwin 用来判断一个窗口是否需要被触发每个都自带一个默认的

trigger, 如果默认的 trigger 不能满足你的需求,则可以进行自定义一个类继承自 Trigger 即可

我们详细描述下 Trigger 的接口以及含义:

onEtement() 每次往 window 增加一个元素的时候都会触发

onEventTine() 当 event-time- timer 被触发的时候会调用。

onPrecessingtine() 当 processing-time- timer 被触发的时候调用

orfterge 对两个 trigger 的 sate 进行 merge 操作

elear wndow 的时候被调用

上面的接口中三个合返回一个 TriggerResult, Inggerkesutt 有如下几种可能的选择

CONTTNU 不做任何事情

FI 触发 window

P 清空整个 window 的元素并销致窗口

IRE PRG 触发窗口·然后销口

后面会更详细描述,Window的生命周期穿起来。

Window 结束时候触发 function,触发之前,做一些预处理。做一些过滤等,会使用 event。能看到自带的 Time evictor, Delta evictor, Count evictor。对于具体需要的 evictor,可以根据是否提供通用的一些实现。如果没有的话,只需要继承interface。实现 evictbefore 和 evictafter。

能看到代码里面,trigger 是表示一个 Window 要怎么被处理。有不同的子类,尝试加一些日志等,看一下 Window 整个的生命周期,数据流程。首先 trigger 是一个 option,一个 Window 怎么去处理。对于 evenTimesessionWindow,会提供一个默认的 trigger。

怎么在得到数据是晚到的,可能做一些特殊的处理。

(2)Time $Watermark

了解完上面的内容后·对于时间驱动的面口我们还有两个需要满: Time 和  Watermark.

会包括不同类型的 Time,经常用的 Time 和 even Time。

我们知道分布式环境中 Time 是一个很重要的概念,在 Flink 中 Time 可分为三种event-Time, Processing--tme 以及 Ingestion--tme三者的关系我们可以从下图中得知

数据发送到 message queue 里,在 source 里得到的时间 ingestion Time,会有处理的当前时间,也有墙上时间。有三种不同的时间。

Event-time 表示事件发生的时间Processing-Time 则表示处理消息的时间(墙上时间)Ingestion--time 表示进入到系统的时间 

在 Flink 中我们可以通过下面的方式进行Tme类型的设置

env setStreamTimeCharacteristic(TimeCharacteri ProcessingTime 设置使用

ProcessingTime

了解了 Time 之后我们  Watermark 还需要知道相关的概念

了解了 Time 之后·我们 Watermark 还需要知道相关的概念。

设置 event Time,所有时间基于 event Time。

我们可以考虑一个这样的例子:

某App会记录用户的所有点击行为,并回传日志(在网络不好的情况下先保存在本地,延后回传)·A用户在11:02对 App 进行操作用户都在11:03操作了 App,但是A用户的网络不太稳定,导致回传日志延迟了,在服务端我们先接受到B用户11:03的消息,然后接受到A用户11:02的消息,导致了消息的乱序。 

那我们怎么保证基于 event time 的密口在销的时候以及处理完了所有的数据呢?这就是 watermark 的功能所在,watermark 会携带一个单调递增的时间戳t,,watermark 表所有时间不大与t的数据我们都已经处理完成了因此可以放心的触发和销毁窗口了,下图中一个乱序数据流中的 watermark 例子。

用户的手机 crash 后,需要分析 crash 之后的日志是什么样子,去完善 app。对于App 来说,它的用户可能是上千万,甚至更多,有用户在 wife 的场景下,可能比较好。在网络不太好的时候,这些日志一般先保存在手机本地,延迟上传给服务器。

Flink 先读到b 用户操作的数据,然后得到 a 用户的操作数据。实例来说,有一个乱序。

那么怎么知道 Window 有一个结束?

窗口有一个结束标志,是否应该结束,应该有一套机制来辨别。Window 可能翻译成窗口。对于 Window 来说,我们应该知道这个 Window 什么时候被触发。

触发的时候不会太早,可能有来的数据。接受用户12:03的数据,Window 刘被触发,a用户12:02的数据过一会才能到。一天的数据不需要再等一天。因为再等一天,时效性不好,等的越久,需要把之前的数据存储,各种操作。对数据进行操作,成本很大。这里会引入到 watermark 概念。

可以理解为标记现在数据已经到哪里,时间戳是一个单调递增函数。当 watermark是t的时候就,表示 t 之前的数据不会再来了,这只表示理论上。实际上,会收到一些。

越往右,时间戳越小。但是,这个是可能会有乱序的。虚线表示w(11)表示当前的watermark 为11,后面不会再收到小于等于11的时间戳对应的数据。w(17)同理表示不会再收到小于等于17的数据,这都是理想状态。在实际过程中,可能会晚到。只是希望在实际处理中,权衡一下,近似精确的值,代价不能太大。在真实世界,可能比较难得到一个真实的 watermark。处理 watermark,还需要上有的一些处理。

本来顺序已经乱了,应该怎样处理这个顺序。

迟到的数据

在理论中,watermark 为10的时候,不会受到小于10的数据,watermark 提供一个机制,会处理晚到的数据。

上面的 watermark 让我们能够应对乱序的数据但是真实世界中我们没法得到一个完美的 watermark-- 生成 watermark(t) 之后,还有较小的概车接受到时间戳t之前的数据,在 Flink中将这些数据定义为 late elements,同样我们可以在 window 中指定是允许延迟的最大时间(默认为0)可以使用下面的代码进行设置:

ingput

keyBylckey selector)

.vindow( assigners)

allowedateness

.orindowed transforration> function)

设置 allowedLateness 之后,迟来的数据同样可以触发窗口,进行输出,

利用 Flnk的side output 机制·我们可以获取到这些迟到的数据,使用方式如下:

final outputtagsts lateoutputTag new OutputTag("late-date")():

CatastreancT> inout ...

Simgleoutputstreanoperator result input

.keyeyldey selector)

.MindoweEndov assigserl

.llowecateness(ctines)

.sideoutputtateDataflateoutputTag)

indowed transforrat fonslerindow function;

DatastreancT> latestrean fesult getsideoutput (lateout putTanl;

需要注意的是设置了 allowedLateness 之后迟到的数据也可能触发窗口,对于 Session window 来说,可能会对窗口进行合并产生预期外的行为。

可以看到前面3个 Window,allow Window 可能会影响。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6月前
|
流计算
Flink窗口——window
Flink窗口——window
39 0
|
6月前
|
数据安全/隐私保护 流计算
Flink的Interval Join是基于水印(Watermark)和时间窗口(Time Window)实现的
Flink的Interval Join是基于水印(Watermark)和时间窗口(Time Window)实现的
315 2
|
API 流计算 Windows
Flink之窗口 (Window) 下篇2
Flink之窗口 (Window) 下篇
131 0
|
缓存 API 流计算
Flink之窗口 (Window) 下篇1
Flink之窗口 (Window) 下篇
141 0
|
存储 程序员 BI
Flink之窗口 (Window) 上篇
Flink之窗口 (Window) 上篇
348 0
|
BI API 数据处理
带你理解并使用flink中的Time、Window(窗口)、Windows Function(窗口函数)
flink中,streaming流式计算被设计为用于处理无限数据集的数据处理引擎,其中无限数据集是指一种源源不断有数据过来的数据集,window (窗口)将无界数据流切割成为有界数据流进行处理的方式。实现方式是将流分发到有限大小的桶(bucket)中进行分析。flink 中的streaming定义了多种流式处理的时间,Event Time(事件时间)、Ingestion Time(接收时间)、Processing Time(处理时间)。
666 0
带你理解并使用flink中的Time、Window(窗口)、Windows Function(窗口函数)
|
流计算
Flink Window 排序
## 概述 - 对增量Window进行输出排序 - WordCount增量(按单词名称排序) - WordCount增量(按单词个数,再单词名称排序)
5322 0
|
存储 流计算
Flink - CountAndProcessingTimeTrigger 基于 Count 和 Time 触发窗口
​上一篇文章提到了CountTrigger && ProcessingTimeTriger,前者 CountTrigger 指定 count 数,当窗口内元素满足逻辑时进行一次触发,后者通过 TimeServer 注册窗口过期时间,到期后进行一次触发,本文自定义 Trigger 实现二者的合并即 Count 和 ProcessingTime 满足任意条件窗口都进行一次触发。...
410 0
Flink - CountAndProcessingTimeTrigger 基于 Count 和 Time 触发窗口
|
存储 Java Apache
Flink Window 、Time(二)| 学习笔记
快速学习 Flink Window 、Time 。
|
API 流计算 Windows
关于Flink框架窗口(window)函数最全解析
在真实的场景中数据流往往都是没有界限的,无休止的,就像是一个通道中水流持续不断地通过管道流向别处,这样显然是无法进行处理、计算的,那如何可以将没有界限的数据进行处理呢?我们可以将这些无界限的数据流进行切割、拆分,将其得到一个有界限的数据集合然后进行处理、计算就方便多了。Flink中窗口(Window)就是来处理无界限的数据流的,将无线的数据流切割成为有限流,然后将切割后的有限流数据分发到指定有限大小的桶中进行分析计算。
关于Flink框架窗口(window)函数最全解析