开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 : Flink SQL 编程(二)】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/632/detail/10032
Flink SQL 编程(二)
四. 用 window 聚合和非 windo 聚合知识解决实际问题
需求1(filter)
统计出现在纽约的车。
那么如何计算出现在纽约的行车记录呢?
刚刚在 rides 表里讲到有这样两个字段:
lon:FLOAT
//经度
lat:FLOAT
//纬度
通过经度和纬度可以计算出在地图上的点,通过这个点可以得出行车记录是不是发生在纽约,所以要开发一个是否在纽约的 udf。
(开发 udf 时,运行时代码最好用 java 写。)
以下是一份已经做好的的代码:
package com.dataartisans.udfs;
import
org.apache.flink.table.functions.ScalarFunction;
import static
com.dataartisans.udfs.util.GeoUtils.isInnyc:
/**
* Table API / SQL Scalar UDF to check if a coordinate is in
NYC.
*/
public class
IsInNYC
extends
ScaLarFunction {
public
booLean eval(
float
Lon,
float
Lat){
return
isInNYC(Lon, Lat);
解释一下这段 udf 的代码:
要实现一个 udf,首先要继承自 scalarfunction 这个类,位置在 package 的下面。
然后要声明一个 eval 方法。eval 的参数可以任意定义,这里的定义是两个 float 的类型的经度和纬度,返回的类型是一个 Boolean:是不是在纽约。
纽约的经度纬度如果在这个经纬度范围里,那么就可以确定哪条是发生在纽约的行车记录。
如果想让这个 udf 在 SQL 里面运转的话,首先需要打包:
输入
SQL-udfs git:(master) mvn clean package
打包完成之后, target 上会有一个 jar 包,包含了开发的这些类。然后把这个 jar 包拖到 SQL client 的 lab 上面。
在 yaml 的配置文件里定义刚刚开发的函数,定义的这个函数在 function 下面可以定义 N 个,我们只需要定义一个isInNYC 的函数,它的名字就是 isInNYC。这样就注册好了。
注册好了用 SQL 试一下:
输入
Flink SQL>show function
可以看到 isInNYC 已经在表里了:
Flink SQL> show functions;
timeDiff
toCoords
isInNYC
toAreaId
Drivers
然后用 udf 进行一些操作
输入
Flink SQL> select * from Rides where isInNYC(lon, lat);
出现以下界面。
可以得出:出现在纽约的行车记录已经被选出。
接下来的需求是把它放到 view 里,因为后面要经常用到纽约这个需求。这样直接用 view 即可。
这就涉及到 CREATE VIEW 的语法。
需求2:
做一个无限流的聚合,计算搭载每种乘客数量的行车事件数。搭载一个乘客的行车数有多少个?大概两个乘客的行车数有多少个?
分析:Key 是每种乘客数量。它的 group by 里是没有开创这个函数的。一般来说,我们要用 count 去计算。
它的 query 写出来是比较简单的:
1
SELECT
2
psgCnt,
3
COUNT(*)as cnt
4
FROM Rides
5
GROUP BY psgCnt;
需求3(Window Agg)
为了持续地监测城市的交通流量,计算每个区域每5分钟的进入的车辆数。
我们只关心纽约的区域交通情况,并且只关心至少有5辆车子进入的区域
toAreaId 里面的 udf 已经计算好了,通过经度和纬度我们能够定位到地球上一个区域Id。
写出它的 Query:
Select
toAreald(lon,lat),AS area,
isStart,
TUMBLE_END(rideTime,INTERVAL '5'MINUTE), AS window_end,
COUNT(*)as AS cnt
FROM Rides
WHERE isInNYC(lon, lat)
GROUP BY
toAreald(lon,lat),
isStart,
TUMBLE(rideTime,INTERVAL ‘5’MINUTE)
HAVING COUNT(*)>=5;
根据它的区域 ID 和五分钟的窗口来做聚合。
由图可见窗口的结束时间是不断在变化的,而且区域 ID 和 window
的 END 都没有重复的结果。印证了 window 聚合的特点:只输出了一
次,不会再重复输出。
需求4(write to Kafka)
将每10分钟的搭乘的乘客数写入 Kafka
结果表: Sink_TenMinPsgCnts
Yaml表:
-name:Sink_ TenMinPsgCnts
type:sink
update-mode: append
schema:-name: cntStart
type:TIMESTAMP
-name:cntEnd
type:TIMESTAMP
-name:cnt
type:LONG connector:
property-version:1
type: kafka
version: 0.11
topic:TenMinPsgCnts
startup-mode: earliest-offset
properties:
- key: zookeeper.connect
value: zookeeper:2181
-key:bootstrap.servers
value:kafka:9092
- key: group.id
value: trainingGroup
format:
property-version:1
type: json
输出模式是 append,因为 window 的输出没有更新。
Schema 有三个字段,起始时间、结束时间以及个数。
存在的类型是 kafka。
Properties 是位置的信息。
由 json 的方式储存。
Sink 表的定义和 source 表的定义,模式是一样的,只是 type 不一样。
Flink SQL > show tables;
DriverChanges
Fares
Rides
Sink_AreaCnts
Sink_TenMinPsgCnts
Flink SQL > d
escribe Sink_TenMinPsgCnts
Root
I--cntStart:Timestamp
I--cntEnd:Timestamp
I-- cnt :long
针对需求,执行以下 query:
Flink SQL >
SELECT
TUMBLE_START(rideTime,INTERVAL'10'MINUTE),
TUMBLE_END(rideTime, INTERVAL '10' MINUTE),
SUM(psgCnt) as cnt
FROM Rides
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);
用 query 做出的结果,我们需要把它写到 kafka 里面
只需要在刚刚写出的 query 上面一行加上 INSERT INTO 这一语法
写到 Sink_TenMinPsgCnts 表里:
INSERT INTO Sink_TenMinPsgCnts
SELECT
TUMBLE_START(rideTime,INTERVAL'10'MINUTE), TUMBLE_END(rideTime,INTERVAL '10' MINUTE),
SUM(psgCnt)as cnt
FROM Rides
GROUP BY TUMBLE(rideTime, INTERVAL '10'MINUTE);
然后用 kafka 的命令监控 kafka 这一 topic 执行输入的情况:
输入
docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh --boot
得出此表:
sql-training git:(master)
✘docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh--bootstrap-server kafka:9092--topic TenMinPsgCnts --from-beginning
[2019-04-29 12:55:47,317] WARN Error while fetching metadata with correlation id 2: {TenMinPsgCnts=LEADER_NOT_AVAILABLE}(org.apache.kafka.clients.NetworkClient)
然后执行刚刚的 query
Flink SQL >INSERT INTO Sink_TenMinPsgCnts
SELECT
TUMBLE_START(rideTime,INTERVAL'10'MINUTE), TUMBLE_END(rideTime,INTERVAL '10' MINUTE),
SUM(psgCnt)as cnt
FROM Rides
GROUP BY TUMBLE(rideTime, INTERVAL '10'MINUTE);
显示错误
因为 schema 不匹配,现在 Flink 在 INSERT INTO 的时候是强类型匹配,要输入进去一定要输入相同的类型,所以query 里还要加入 cast:
输入
Flink SQL >INSERT INTO Sink_TenMinPsgCnts
SELECT
TUMBLE_START(rideTime,INTERVAL'10'MINUTE), TUMBLE_END(rideTime,INTERVAL '10' MINUTE),
CAST(SUM(psgCnt)as BIGINT) as cnt
FROM Rides
GROUP BY TUMBLE(rideTime, INTERVAL '10'MINUTE);
这样就提交成功了。
打开 docker 可以看到这边结果已经输出了:
cntStart":1357008600000,"cntEnd":1357009200000,"cnt":180721} cntStart":1357009200000,"cntEnd":1357009800000,"cnt":18024} cntStart":1357009800000,"cntEnd":1357010400000,"cnt":172017} cntStart":1357010400000,"cntEnd":1357011000000,"cnt":16870} cntStart":1357011000000,"cntEnd":1357011600000,"cnt":15888}
cntStart":1357011600000,"cntEnd":1357012200000,"cnt":15221} cntStart":1357012200000,"cntEnd":1357012800000,"cnt":14427}
说明已经成功地把 query 的结果插入到 kafka 里面了。
然后看 job 的运行情况
最上面一行是刚刚提交的 query。
下面的框里面,第一个是从 source 里面读,然后做了一个 window,后面接了一个 sink 连到 kafka 里面。
需求5(write to ES)
从每个区域出发的行车数,写入到ES。
结果表:Sink AreaCnts
要根据行车区域(区域 id)来做聚合,也就是统计行车数
写一下 query:
SELECT
toAreaId(lon, lat) as area,
COUNT(*)as cnt
FROM Rides
GROUP BY toAreaId(lon, lat)
可以看出跟之前的 group by query 不太一样,之前是 window 的聚合,每次只产生一个结果 ,不会更新,输出的是append 的模式。
group by 没有窗口,是在无限流上的聚合,输出的结果是 update 的结果,是不断的更新之前的结果。所以后面如果要接 sink,也是 update 的 sink。
看 yaml 文件的配置:
-name: Sink_AreaCnts
type: sink
update-mode: upsert
schema:
-name: areaId
type: INT
-name: cnt
type:LONG
connector:
type: elasticsearch
version: 6
hosts
-hostname: "elasticsearch"
port: 9200
protocol:"http"
index:"area-cnts"
document-type:"areacnt"
key-delimiter: "$"
format:
property-version:1
type: json
update-mode 是 upsert 的mode,所以它能够接受非 window 聚合的结果。
Schema 里有两个字段,一个是 areaId,一个是 Cnt。
定义 elasticsearch 的索引的名字为 area-cnts。
Format 类型是 json。
由此可以看出跟之前的主要区别点是 update-mode 是 upsert。目前 elasticsearch 支持 upsert 模式,但是像 kafka是不支持的。所以如果要把 group 的聚合写到结果表里,需要选一个可存储的结果表。
如果要把 query 写入结果表的话,要在前面加上 INSERT INTO Sink_AreaCnts
因为原 query 数据量过大,所以要加一个限制:where isInNYC (lon,lat)
Flink SQL >IN
SERT INTO Sink_AreaCnts
SELECT
toAreaId(lon, lat) as areaId,
COUNT(*)as cnt
FROM Rides
Where isInNYC(lon, lat)
GROUP BY toAreaId(lon, lat)
就可以得出需求5的答案了。
五. 答疑环节
1,问:请问统计用户当前点击事件发生时间倒推过去一小时内的点击数,用什么窗口?比如用户在13:24发生的一个点击事件要统计12:24到13:24的点击数。
答:目前 Flink SQL 的窗口还不支持这种功能。
2,问:一个表既是 source 表又充当 sink 表该怎么定义?
答:目前在 SQL client 里,如果一个表又是 source 又是 sink,在老版本里需要定义两遍,在新的版本里面 Type 等于both 可以定义这种类型。
3,问:无限流聚合没有窗口函数,SQL client 中如何设置 State TTL?如果不能设置,如何解决?
答:SQL client 中的配置 execution的item,里面可以配置 min 和 ma x 的 idle-state 的时间,用这两个参数和用代码写是一样的效果。
4,问:Flink SQL 批处理数据采用什么方式定时调度
答:目前不支持定时调度。
5,问:多表进行关联是需要依次生成中间表关联还是直接多表关联
答:可以直接多表进行关联。