开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 : Flink SQL 编程(一)】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/632/detail/10032
Flink SQL 编程(一)
内容介绍
一, 什么是 Flink SQL
二, 执行 window aggregate 和 non-window aggregate,并理解其区别
三, 如何用 SQL 消费 Kafka 数据
四, 用 window 聚合和非 windo 聚合知识解决实际问题
五, 答疑环节
一. 什么是 Flink SQL
1. 声明式 API
Flink Table API 和Flink SQL都属于结构式的,关系式的 API。
Flink Table 是一种编程式的 API,而 Flink SQL 是一种纯文本的 API,但它们都是声明式的 API,都是属于 Flink 里面最高层的 API。
因为它是声明式的,用户只需要表达”我需要什么东西”而不用表达”我该怎么做”。
对于”怎么做” 这一问题的解答是:Flink 系统自动会帮用户做。
所以在使用上是较为简单的。
2. 自动优化
Flink SQL 也能够被自动优化,因为 Flink 能了解“你想要干什么”,知道”你想要做的事情”。
如果自己直接去写一个 job,是需要操作 State 的。但操作 State 是一件比较高门槛的事情,需要做比较多的一些 State 的结构设计以及 State 的优化。
但这些 Flink SQL 都会自动的帮忙去做,屏蔽了 State 的复杂性,自动做到最优处理。
3. 流批统一
同样一个 query,既可以用流的模式入仓,也可以用批的模式入仓。
一样的 SQL 能产生一样的结果。
4. 应用广泛
阿里巴巴大部分应用都是用的 Flink SQL 写的,例如 ETL,统计分析,实时报表以及实时的风控。所以 Flink SQL 的应用是非常广泛的。
二. 执行 window aggregate 和 non-window aggregate,并理解其区别
1.window 的聚合
已知的是我们需要在无限的数据流上做一个画窗的动作。
(1) Flink SQL 已经内置了三种常用的画窗的方法:
①TUMBLE WINDOW (固定的窗口)
TUMBLE (t , INTERVAL ’2’HOUR )
代表的是在时间线上固定的画好两小时的窗口。
比如说0点到2点是一个窗口,2点到4点是一个窗口,4点到6点是一个窗口。
窗口里面的数据再去做聚合。
② HOP WINDOW (滑动窗口)
HOP WINDOW 的第一个参数是 window 的大小,第二个参数是滑动的步长。
HOP(t, INTERVAL ’2 ’HOUR , INTERVAL ’1 ’HOUR)
定义的是两小时的窗口大小,每一小时滑动一下的一个滑动窗口,也就是0点到2点是一个窗口,1点到3点它也是一个窗口,2点到4点又是一个窗口,所以它是一种重叠式的 window。
最后再在画好的窗口上做聚合。
③ SESSION WINDOW (会话窗口)
SESSION(t, INTERVAL ’30 ‘ MINUTE)
的参数指的是会话窗口最大的间隔时间,如果超过这个间隔时间,就会认为是会话的结束,比如下图的两个窗口之间的间隔已经超过30分钟,所以它们是两个独立的会话窗口。
我们会发现这三种窗口的函数都有“t”的字段,这在 Flink SQL 里面叫做一种时间属性的字段,只有寄予时间属性,才能去做 window。在 Flink 里面,时间属性的字段分为两种,一种是 Processing Time(系统时间),一种是 Row Time(事件时间),只能作用在系统时间上。
(2)Window 聚合具体事例:
如果要统计每小时每个用户的点击次数,用 window aggregation 来做的话,只需要用里面的 Group By。首先按照 user 进行一下聚合,其次加上一个窗口的划分,这里用的是 TUMBLE,如下图所示,根据用户的点击时间来做1小时的划分,select 闪出:用户、每个窗口的结束时间、每个用户点击窗口的次数。
原表是一个click(点击)表,有三个字段,分别是用户、点击时间、url。
例如:有一批12:00到13:00的数据,经过 query 再经过聚合,可以统计出来Mary 一共点击了3次,Bob 点击了1次。
它们的窗口结束时间都是13:00。
又比如这样一组数据,是从13:00到14:00的。经过 window 的聚合得出:Bob 点击了1次,Liz点击了2次。
它们的窗口结束时间都是14:00。
Window 聚合的一个特点是会等到 window 的结束时间才会输出结果,而且只输出一次。
意思就是12:00到13:00的数据要等到13:00才会输出,并且只输出一次。
2.group 的聚合
(1)什么是 group 聚合
group 聚合是统计从历史到现在每一个用户的点击次数,所以没有 window 聚合里面画窗的步骤。
group 聚合属于无限流上的聚合。
(2) group聚合具体事例
query 直接按照用户做分组,使用 count 作为统计次数,原表是一个 click(点击)表,有三个字段,分别是user(用户)、cTime(点击时间)、url。
有一条 Mary 的点击记录,经过 query,得出 Mary 点击了一次,输出 Mary1。
又来了一条 Bob 的点击记录,同样的经过 query,统计出 Bob 点击了一次。
如果又来了一条 Mary 的记录,经过 query 得出 Mary 又点击了一次,之前的结果会发生更新,变成 Mary2。
所以可以得出一个结论:group 聚合,也就是非 window 的聚合的特点是,group聚合的计算模式是每来一条记录就会计算一次,输出的是最新的计算结果,因为它的计算结果是更新的结果,一般来说要选择一个可更新的数据库来存储。
3.总结
|
Window aggregation |
Group aggregation |
输出模式 |
按时输出 |
提前输出 |
输出量 |
只输出一次结果 |
Per key输出N个结果(Sink压力) |
输出流 |
Append Stream |
Update Stream |
状态清理 |
及时清理过期数据 |
状态无限增长 |
Sink |
均可 |
可更新的结果表(Hbase,MySQL) |
注:
(1) 在输出量上,Window 聚合对于一个 key 加 window 来说只输出一次它认为正确的结果,后面不需要对结果进行更新。
Group 聚合是在全局数据流无限流上的一个聚合,是一种永远都不会结束的过程。所以它任何时候发出的数据都是不正确的结果,是一种提前输出的结果。
当来了新的数据之后,它要对之前发出的数据进行不断地修正,不断地 update。
输出量是“每个 key 会输出N个结果”,N取决于上游的数据同key的数据量有多少。
window 聚合和 group 聚合在输出量上相比,window 的输出量要少很多,因为 window 聚合把数据聚合成了一条。而 group 聚合输出的数据量是很多的,如果没有任何调优的操作,它的输出量基本是和输入量差不多的。
所以同样一个 query 如果用 group 聚合来表达, sink 的压力是比较大的。
(2)在状态清理上,group 聚合是状态无限增长的。
主要看 group 聚合的 key 是不是无限增长,比如统计每个用户的点击次数,一般来说一个系统的用户是无限增长的,所以 group 聚合的状态也会随着 key 不断增长。
在生产上不建议用 group 聚合,因为到后面作业的稳定性、性能都没法达到一定的状态。如果要上线一个 group 聚合的作业,建议添加一个 State TTL 的配置。
配置的方式是
.withIdLeStateRetentionTime
(Time.day(1),Time.day(2))
这一配置的时间是最小到最大的一个等待时间,超过这个时间系统会自动把状态清除掉。
如果清除掉的数据以后再也用不到,那它对精确性则没有影响。但如果清除掉的数据后面还会再用到,后面访问到了之后输出的结果就是一个不正确的结果。所以 State TTL 这个配置是在精确性和状态大小之间的一个权衡。
一般按照经验来说是一天半左右。最好和 job 去分析一下业务可以容忍多久的数据,来决定要不要配置 State TTL。
三、如何用 SQL 消费 Kafka 数据
SQL Client 是 Flink SQL 提供的一个更便于学生学习和 demo 的一个工具,已事先注册好了五张表:
Flink SQL>show tables
>;
DriverChanges
Fares
Rides
Sink_AreaCnts
Sink_TenMinPsgCnts
只需要学习 Rides 这张表:
Flink SQL> describe Rides
;
root
I-- rideId:Long
I-- taxiId: Long
I-- isStart: Boolean
I-- Lon: FLoat
I-- Lat: FLoat
I-- rideTime: TimeIndicatorTypeInfo(rowtime)
I-- psgCnt: Intege
这些字段代表的含义是:
rideld: BIGINT
//行为 ID(包含两条记录,一条入一条出)
taxiId: BIGINT
//出租车 ID
isStart:BOOLEAN
//开始 or 结束
lon:FLOAT
//经度
lat:FLOAT
//纬度
rideTime:TIMESTAMP
//时间
psgCnt: INT
//乘客数
进入 Flink SQL>sel
ect*from Rides;
出现以下表格
可视化界面显示出实时从原表读取的数据,分为 ride Id、taxi Id、isStart、Lon。
SQL client 提供了一个配置文件的方式,下面大致的介绍配置的方式。
配置文件里有一个叫 tables 的属性,可以在里面定义N个表,如下图就是定义了一个“Rides”表。还可以定义其他的表,这里详细介绍 Rides 表。
Type 代表它是 source 还是 sink。
Update-mode 代表了这个表里面的数据是什么样的行为,有 append 模式,也有 update 模式。Rides 表每一个数据都是独立的数据,没有更新的数据,所以填的是 append。
Schema 中以下部分上文讲解过:
tables:
-name:Rides
type:isource
update-mode: append
schema:
-name: rideId
type:LONG
-name: taxiId
type:LONG
-name: isStart
type: BOOLEAN
-name: Lon
type:FLOAT
-name: Lat
type: FLOAT
Ridetime 是事件发生的时间,要用这个时间来进行例如窗口的操作,所以要把它变成 rowtime 字段。这里需要额外声明 Watermark,因为 rowtime 和 watermark 是有紧密联系的。这里的 delay 的意思是间隔60秒:
-name: rideTime
type:TIMESTAMP
rowtime:
imestamps:
type:"from-field"
from:"eventTime"
watermarks:
type:"periodic-bounded"
delay:"60000”
Connector 部分。Type 是 kafka 代表这是一张在 kafka 里面的原表,version 是0.11表示 kafka 使用的版本是0.11,kafka 的 topic 是 rides。Startup-mode 是 earliest-offset 代表最早从 offset 读起,也就是从表的一开始读起:
connector:
property-version: 1
type: kafka
version:0.11
topic: Rides
startup-mode: earliest-offset
Properties 是 kafka 的一个物理的位置,包括 zookeeper 的地址、bootstrap 的地址和 group id:
properties:
-key: zookeeper.connect
value:s{ZOOKEEPER}:2181
-key: bootstrap.servers
value: s{KAFKA}:9092
-key:group.id
value: testGroup
Format 部分。
怎么解析 kafka 里面的数据?
数据在存进去的时候就存成了一种 json 的格式,解析的时候也用 json 这种 format 来解析。Flink 现在内置的解析的format 有 json 等几种。