开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 :Flink Window 、Time(一)】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/632/detail/10045
Flink Window 、Time (一)
目录
一、Window&time 介绍
1.为什么要有 Wndow
2 .Window 的类型
二、 Wndow API 的使用
1. Window 的三大组件
2 .TimesWatermark
3.时间语义
4.乱序问题解决 Watermark
5 .AllowLateness 正确设置与理船
6 .Sideoutput 在 Window中的使用
三、 Window 的内部实现原理
1 .Window 的处理流程
2 .Window 中的状态管理
四、 生产环境中的 Wndow 使用遇到的一些问题
一、 Window&time 介绍
ApacheFLink《nFL(以下简称 Flink)是一个天然支持无限流数据处理的分布式计算框架在 Flink中 Window 可以将无限流切分成有限流·是处理有限流的核心组件现在 Flink 中 Window 可以是时间驱动的(Time Window),也可以是数据驱动的(Count Window),下面的代码是在 Flink 中使用 Window 的两个代码级。
处理数据有一个特点,数据是一天的,比如说今天的零点到二十四点,这是有限的。对于 Flink 来说,处理的数据大多是无限的。我们需要通过某种方式使 Flink 处理这种有限的数据。
eyed windows
strea
.keyBy(...) keyed versus non-keyed windows
.vindowl...) - required:assigner
[.trigger(...)] - optional:trigger(else default trigge
.evictor(...)] < optional: "evictor" (else no evictor)
. allowedLateness(... ) < optional: "lateness"(else zero)
. sideOutputLateData-)<- optional:"output tag"else no side output for late data)
.reduce/aggregate/fold/apply() < required: "function"
L.getsideOutput(...)] < optional: "output tag
能看到右边这些是必须的,Window 的算子。一般场景,只需要实现 Window。对于这个 Window,简单来说,变成 keyby 1和之前的都是一样的了。在生产环境下,很少使用。
二、 Wndow API 的使用
(1). Window 的三大组件
从上一节我们已经知道 window 的一些基本概念以及相关 API,下面我们以一个实际例子来看看怎么使用 Window 相关的 APl。
代码来自 fuink-examples-
Datastrean
assignTinestampsAndwaternarksnew CarTimestamp())
.keyBy()
vindow(GlobalWindows./create/())
.evictor(TimeEvictor, /of/(Tine. /of/(evictionSec, TimeUnit./SECONDS/)))
trigger(DeltaTrigger./of/(triggerMeters,
new DeltaFunction()
private static final long /serialVersionUID/= 1L:
eoverride
public double getDelta(
Tuple4
Tuple4
return newDataPoint. f2-oldDataPoint.:
》
) carData. getType() createSerializer(env.getConfig())))
.maxBy(1):
这里的算子是 even timesession Window。
window 方法接收的输入是一个 WindowAssigner,
WindowAssigner 负责将每条输入的数据分发到正确的 window 中(一条数据可能同时分发到多个 Window 中) Flink 提供了几种通用的 windowAssigner: tumbling window (窗口间的元素无重复)· sliding window (口间的元素可能重复), session window 以及 global window,如果需要自己定制数据分发策略,则可以实现一个 class ,继承自 windowAssigner 即可。
有三种不同的 Window,横轴表示时间,对于 Flink 来说这里是一个 key,两条虚线之间是 Window 1 等,生成一个一个的 Window,然后两条虚线之间表示 Window size,所有的 Window 事不重叠的。
可以和下面的 Window 进行区分。
主要是对于这个 Window 来说,相邻两个元素之间的 Window 是不会重叠的。某个元素属于一个 Window,仅属于一个 Window。
同样的横轴表示时间,长的带箭头的线表示 Window size,表示 Window 有多长,短的线表示 Window size 每次滑多少。能看到 Window size 表示两条虚线的大小。每次会划过一条虚线。
这样的话得到,相邻的窗口是不重叠的。红色和蓝色的中间这一部分元素,既属于红色的 Window,也属于蓝色的 Window。每个元素可能会属于多个 Window,当这个数值等于窗口大小的时候,变成了上面的 Window。
横轴同样表示时间,每次打开网页的时候,有些网页活跃的时候,一段时间不活跃会让你重新登录。最小不登录的时间就是 gap。如果超过了这个时间的话,你就和之前不属于同一个 session 里了。中间带箭头这条线表示 session 的区别。
代码来说:
(rupleJstring, Long, integer> value Anput)(
ctx, collectWithTimestamp (value, value.)
etx, emitwatermark (new Watermark timestamp: value. -1))
ctx. emitNatermark (new WatermarkLong. MAX VALUE)
BOverride
public void cancel(
we create sessions for each with max timeout of 3 time units
Datastream> aggregated source
keyBy( fields: 0)
window (EventTimesessionWindows. withdap(. milliseconds(3L)))
positionToSum: 2)
if (fileOutput)
ggregated. writeAsText(params. get ("output"))
Ielse
System. out. printIn("Printing result to stdout. Use --output to specify output path. "
aggregated. print()
env.execute():
这条线对应的数值就是3毫秒。
evactor 主要用于做一些数据的自定义转换·可以在执行用户代码之前也可以在执行用户
代码之后更详细的描述可以参考
org. apache. ftink. streaming. api. windowing.evictors. Evictor evicBefore
evicArterFlink 两个方法·提供了如下三种通用的 evictor
CountEvictor 保留指定数量的元素
DeltaEvictor 通过执行用户给定的 DeltaFunction 以及预设的 theshold, 判断是否删除一个元素
TimeEvictor 设定一个阅值 interval ,删除所有不再 max ts- interval 范围内的元素
其中 max ts 是窗口内时间戳的最大值
evictor 是可选的方法·如果用户不选择则默认没有
triggerwin 用来判断一个窗口是否需要被触发每个都自带一个默认的
trigger, 如果默认的 trigger 不能满足你的需求,则可以进行自定义一个类继承自 Trigger 即可
我们详细描述下 Trigger 的接口以及含义:
onEtement() 每次往 window 增加一个元素的时候都会触发
onEventTine() 当 event-time- timer 被触发的时候会调用。
onPrecessingtine() 当 processing-time- timer 被触发的时候调用
orfterge 对两个 trigger 的 sate 进行 merge 操作
elear wndow 的时候被调用
上面的接口中三个合返回一个 TriggerResult, Inggerkesutt 有如下几种可能的选择
CONTTNU 不做任何事情
FI 触发 window
P 清空整个 window 的元素并销致窗口
IRE PRG 触发窗口·然后销口
后面会更详细描述,Window的生命周期穿起来。
Window 结束时候触发 function,触发之前,做一些预处理。做一些过滤等,会使用 event。能看到自带的 Time evictor, Delta evictor, Count evictor。对于具体需要的 evictor,可以根据是否提供通用的一些实现。如果没有的话,只需要继承interface。实现 evictbefore 和 evictafter。
能看到代码里面,trigger 是表示一个 Window 要怎么被处理。有不同的子类,尝试加一些日志等,看一下 Window 整个的生命周期,数据流程。首先 trigger 是一个 option,一个 Window 怎么去处理。对于 evenTimesessionWindow,会提供一个默认的 trigger。
怎么在得到数据是晚到的,可能做一些特殊的处理。
(2)Time $Watermark
了解完上面的内容后·对于时间驱动的面口,我们还有两个需要满: Time 和 Watermark.
会包括不同类型的 Time,经常用的 Time 和 even Time。
我们知道分布式环境中 Time 是一个很重要的概念,在 Flink 中 Time 可分为三种event-Time, Processing--tme 以及 Ingestion--tme,三者的关系我们可以从下图中得知
数据发送到 message queue 里,在 source 里得到的时间 ingestion Time,会有处理的当前时间,也有墙上时间。有三种不同的时间。
Event-time 表示事件发生的时间,Processing-Time 则表示处理消息的时间(墙上时间),Ingestion--time 表示进入到系统的时间。
在 Flink 中我们可以通过下面的方式进行Tme类型的设置
env
setStreamTimeCharacteristic(TimeCharacteri ProcessingTime
设置使用
ProcessingTime
了解了 Time 之后,我们 Watermark 还需要知道相关的概念。
了解了 Time 之后·我们 Watermark 还需要知道相关的概念。
设置 event Time,所有时间基于 event Time。
我们可以考虑一个这样的例子:
某App会记录用户的所有点击行为,并回传日志(在网络不好的情况下先保存在本地,延后回传)·A用户在11:02对 App 进行操作用户都在11:03操作了 App,但是A用户的网络不太稳定,导致回传日志延迟了,在服务端我们先接受到B用户11:03的消息,然后接受到A用户11:02的消息,导致了消息的乱序。
那我们怎么保证基于 event time 的密口在销的时候以及处理完了所有的数据呢?这就是 watermark 的功能所在,watermark 会携带一个单调递增的时间戳t,,watermark 表所有时间不大与t的数据我们都已经处理完成了因此可以放心的触发和销毁窗口了,下图中一个乱序数据流中的 watermark 例子。
用户的手机 crash 后,需要分析 crash 之后的日志是什么样子,去完善 app。对于App 来说,它的用户可能是上千万,甚至更多,有用户在 wife 的场景下,可能比较好。在网络不太好的时候,这些日志一般先保存在手机本地,延迟上传给服务器。
Flink 先读到b 用户操作的数据,然后得到 a 用户的操作数据。实例来说,有一个乱序。
那么怎么知道 Window 有一个结束?
窗口有一个结束标志,是否应该结束,应该有一套机制来辨别。Window 可能翻译成窗口。对于 Window 来说,我们应该知道这个 Window 什么时候被触发。
触发的时候不会太早,可能有来的数据。接受用户12:03的数据,Window 刘被触发,a用户12:02的数据过一会才能到。一天的数据不需要再等一天。因为再等一天,时效性不好,等的越久,需要把之前的数据存储,各种操作。对数据进行操作,成本很大。这里会引入到 watermark 概念。
可以理解为标记现在数据已经到哪里,时间戳是一个单调递增函数。当 watermark是t的时候就,表示 t 之前的数据不会再来了,这只表示理论上。实际上,会收到一些。
越往右,时间戳越小。但是,这个是可能会有乱序的。虚线表示w(11)表示当前的watermark 为11,后面不会再收到小于等于11的时间戳对应的数据。w(17)同理表示不会再收到小于等于17的数据,这都是理想状态。在实际过程中,可能会晚到。只是希望在实际处理中,权衡一下,近似精确的值,代价不能太大。在真实世界,可能比较难得到一个真实的 watermark。处理 watermark,还需要上有的一些处理。
本来顺序已经乱了,应该怎样处理这个顺序。
迟到的数据
在理论中,watermark 为10的时候,不会受到小于10的数据,watermark 提供一个机制,会处理晚到的数据。
上面的 watermark 让我们能够应对乱序的数据但是真实世界中我们没法得到一个完美的 watermark-- 生成 watermark(t) 之后,还有较小的概车接受到时间戳t之前的数据,在 Flink中将这些数据定义为 late elements,同样我们可以在 window 中指定是允许延迟的最大时间(默认为0)可以使用下面的代码进行设置:
ingput
keyBylckey selector)
.vindow( assigners)
allowedateness
.orindowed transforration> function)
设置 allowedLateness 之后,迟来的数据同样可以触发窗口,进行输出,
利用 Flnk的side output 机制·我们可以获取到这些迟到的数据,使用方式如下:
final outputtagsts lateoutputTag new OutputTag("late-date")():
CatastreancT> inout ...
Simgleoutputstreanoperator result input
.keyeyldey selector)
.MindoweEndov assigserl
.llowecateness(ctines)
.sideoutputtateDataflateoutputTag)
indowed transforrat fonslerindow function;
DatastreancT> latestrean fesult getsideoutput (lateout putTanl;
需要注意的是设置了 allowedLateness 之后迟到的数据也可能触发窗口,对于 Session window 来说,可能会对窗口进行合并产生预期外的行为。
可以看到前面3个 Window,allow Window 可能会影响。