Flink SQL 编程(二)| 学习笔记

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
实时计算 Flink 版,5000CU*H 3个月
简介: 快速学习 Flink SQL 编程。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品  Flink SQL 编程(二)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10032


Flink SQL 编程(二)

 

四. 用 window 聚合和非 windo 聚合知识解决实际问题

需求1(filter)

统计出现在纽约的车。

那么如何计算出现在纽约的行车记录呢? 

刚刚在 rides 表里讲到有这样两个字段: 

lon:FLOAT      //经度

lat:FLOAT      //纬度

通过经度和纬度可以计算出在地图上的点,通过这个点可以得出行车记录是不是发生在纽约,所以要开发一个是否在纽约的 udf。

(开发 udf 时,运行时代码最好用 java 写。)

以下是一份已经做好的的代码:

package  com.dataartisans.udfs;

import org.apache.flink.table.functions.ScalarFunction;

import static com.dataartisans.udfs.util.GeoUtils.isInnyc:

/**

* Table API / SQL Scalar UDF to check if a coordinate is in NYC.

*/

public class IsInNYC extends ScaLarFunction {

public booLean eval(float Lon, float Lat){

return isInNYC(Lon, Lat);

解释一下这段 udf 的代码:

要实现一个 udf,首先要继承自 scalarfunction 这个类,位置在 package 的下面。

然后要声明一个 eval 方法。eval 的参数可以任意定义,这里的定义是两个 float 的类型的经度和纬度,返回的类型是一个 Boolean:是不是在纽约。

纽约的经度纬度如果在这个经纬度范围里,那么就可以确定哪条是发生在纽约的行车记录。

如果想让这个 udf 在 SQL 里面运转的话,首先需要打包:

输入

SQL-udfs git:(master) mvn clean package

打包完成之后, target 上会有一个 jar 包,包含了开发的这些类。然后把这个 jar 包拖到 SQL client  的 lab 上面。

在 yaml 的配置文件里定义刚刚开发的函数,定义的这个函数在 function 下面可以定义 N 个,我们只需要定义一个isInNYC 的函数,它的名字就是 isInNYC。这样就注册好了。

注册好了用 SQL 试一下:

输入

Flink SQL>show function

可以看到 isInNYC 已经在表里了:

Flink SQL> show functions;

timeDiff

toCoords

isInNYC

toAreaId

Drivers

然后用 udf 进行一些操作

输入

Flink SQL> select * from Rides where isInNYC(lon, lat); 

出现以下界面。

可以得出:出现在纽约的行车记录已经被选出。

接下来的需求是把它放到 view 里,因为后面要经常用到纽约这个需求。这样直接用 view 即可。

这就涉及到 CREATE VIEW 的语法。

需求2:

做一个无限流的聚合,计算搭载每种乘客数量的行车事件数。搭载一个乘客的行车数有多少个?大概两个乘客的行车数有多少个?

分析:Key 是每种乘客数量。它的 group by 里是没有开创这个函数的。一般来说,我们要用 count 去计算。

它的 query 写出来是比较简单的: 

1 SELECT

2 psgCnt,

3 COUNT(*)as cnt

4 FROM Rides

5 GROUP BY psgCnt;

需求3(Window Agg)

为了持续地监测城市的交通流量,计算每个区域每5分钟的进入的车辆数。

我们只关心纽约的区域交通情况,并且只关心至少有5辆车子进入的区域

toAreaId 里面的 udf 已经计算好了,通过经度和纬度我们能够定位到地球上一个区域Id。

写出它的 Query:

Select

toAreald(lon,lat),AS area,

isStart,

TUMBLE_END(rideTime,INTERVAL '5'MINUTE), AS window_end,

COUNT(*)as AS cnt

FROM Rides

WHERE isInNYC(lon, lat)

GROUP BY

toAreald(lon,lat),

isStart,

TUMBLE(rideTime,INTERVAL ‘5’MINUTE)

HAVING COUNT(*)>=5;

根据它的区域 ID 和五分钟的窗口来做聚合。

image.png

由图可见窗口的结束时间是不断在变化的,而且区域 ID 和 window

的 END 都没有重复的结果。印证了 window 聚合的特点:只输出了一

次,不会再重复输出。

需求4(write to Kafka)

将每10分钟的搭乘的乘客数写入 Kafka

结果表: Sink_TenMinPsgCnts 

Yaml表:

-name:Sink_ TenMinPsgCnts

type:sink

update-mode: append

schema:-name: cntStart

type:TIMESTAMP

-name:cntEnd

type:TIMESTAMP

-name:cnt

type:LONG  connector:

property-version:1

type: kafka

version: 0.11

topic:TenMinPsgCnts

startup-mode: earliest-offset

properties:

- key: zookeeper.connect

value: zookeeper:2181

-key:bootstrap.servers

value:kafka:9092

- key: group.id

value: trainingGroup

format:

property-version:1

type: json

输出模式是 append,因为 window 的输出没有更新。

Schema 有三个字段,起始时间、结束时间以及个数。

存在的类型是 kafka。

Properties 是位置的信息。

由 json 的方式储存。

Sink 表的定义和 source 表的定义,模式是一样的,只是 type 不一样。

Flink SQL > show tables;

DriverChanges

Fares

Rides

Sink_AreaCnts

Sink_TenMinPsgCnts

Flink SQL > describe Sink_TenMinPsgCnts

Root

I--cntStart:Timestamp

I--cntEnd:Timestamp

I-- cnt :long

针对需求,执行以下 query:

Flink SQL > SELECT

TUMBLE_START(rideTime,INTERVAL'10'MINUTE),

TUMBLE_END(rideTime, INTERVAL '10' MINUTE),

SUM(psgCnt) as cnt

FROM Rides

GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);

用 query 做出的结果,我们需要把它写到 kafka 里面

只需要在刚刚写出的 query 上面一行加上 INSERT INTO 这一语法

写到 Sink_TenMinPsgCnts 表里:

INSERT INTO Sink_TenMinPsgCnts

SELECT

TUMBLE_START(rideTime,INTERVAL'10'MINUTE), TUMBLE_END(rideTime,INTERVAL '10' MINUTE),

SUM(psgCnt)as  cnt

FROM Rides

GROUP BY TUMBLE(rideTime, INTERVAL '10'MINUTE);

然后用 kafka 的命令监控 kafka 这一 topic 执行输入的情况:

输入

docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh   --boot

得出此表:

sql-training git:(master) ✘docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh--bootstrap-server kafka:9092--topic TenMinPsgCnts --from-beginning

[2019-04-29 12:55:47,317] WARN Error while fetching metadata with correlation id 2: {TenMinPsgCnts=LEADER_NOT_AVAILABLE}(org.apache.kafka.clients.NetworkClient)

然后执行刚刚的 query

Flink SQL >INSERT INTO Sink_TenMinPsgCnts

SELECT

TUMBLE_START(rideTime,INTERVAL'10'MINUTE), TUMBLE_END(rideTime,INTERVAL '10' MINUTE),

SUM(psgCnt)as  cnt

FROM Rides

GROUP BY TUMBLE(rideTime, INTERVAL '10'MINUTE);

显示错误

因为 schema 不匹配,现在 Flink 在 INSERT INTO 的时候是强类型匹配,要输入进去一定要输入相同的类型,所以query 里还要加入 cast:

输入

Flink SQL >INSERT INTO Sink_TenMinPsgCnts

SELECT

TUMBLE_START(rideTime,INTERVAL'10'MINUTE), TUMBLE_END(rideTime,INTERVAL '10' MINUTE),

CAST(SUM(psgCnt)as BIGINT) as cnt

FROM Rides

GROUP BY TUMBLE(rideTime, INTERVAL '10'MINUTE);  

这样就提交成功了。 

打开 docker 可以看到这边结果已经输出了:

cntStart":1357008600000,"cntEnd":1357009200000,"cnt":180721} cntStart":1357009200000,"cntEnd":1357009800000,"cnt":18024} cntStart":1357009800000,"cntEnd":1357010400000,"cnt":172017} cntStart":1357010400000,"cntEnd":1357011000000,"cnt":16870} cntStart":1357011000000,"cntEnd":1357011600000,"cnt":15888}

cntStart":1357011600000,"cntEnd":1357012200000,"cnt":15221} cntStart":1357012200000,"cntEnd":1357012800000,"cnt":14427}

说明已经成功地把 query 的结果插入到 kafka 里面了。

然后看 job 的运行情况

最上面一行是刚刚提交的 query。

下面的框里面,第一个是从 source 里面读,然后做了一个 window,后面接了一个 sink 连到 kafka 里面。

需求5(write to ES)

从每个区域出发的行车数,写入到ES。

结果表:Sink AreaCnts

要根据行车区域(区域 id)来做聚合,也就是统计行车数

写一下 query: 

SELECT

toAreaId(lon, lat) as area,

COUNT(*)as cnt

FROM Rides

GROUP BY toAreaId(lon, lat)

可以看出跟之前的 group by query 不太一样,之前是 window 的聚合,每次只产生一个结果 ,不会更新,输出的是append 的模式。

group by 没有窗口,是在无限流上的聚合,输出的结果是 update 的结果,是不断的更新之前的结果。所以后面如果要接 sink,也是 update 的 sink。

看 yaml 文件的配置: 

-name: Sink_AreaCnts

type: sink

update-mode: upsert

schema:

-name: areaId

type: INT

-name: cnt

type:LONG

connector:

type: elasticsearch

version: 6

hosts

-hostname: "elasticsearch"

port: 9200

protocol:"http"

index:"area-cnts"

document-type:"areacnt"

key-delimiter: "$"

format:

property-version:1

type: json

update-mode 是 upsert 的mode,所以它能够接受非 window 聚合的结果。

Schema 里有两个字段,一个是 areaId,一个是 Cnt。

定义 elasticsearch 的索引的名字为 area-cnts。

Format 类型是 json。

由此可以看出跟之前的主要区别点是 update-mode 是 upsert。目前 elasticsearch 支持 upsert 模式,但是像 kafka是不支持的。所以如果要把 group 的聚合写到结果表里,需要选一个可存储的结果表。

如果要把 query 写入结果表的话,要在前面加上 INSERT INTO Sink_AreaCnts

因为原 query 数据量过大,所以要加一个限制:where isInNYC (lon,lat)

Flink SQL >INSERT INTO Sink_AreaCnts

SELECT

toAreaId(lon, lat) as areaId,

COUNT(*)as cnt

FROM Rides

Where isInNYC(lon, lat)

GROUP BY toAreaId(lon, lat)

就可以得出需求5的答案了。

 

五. 答疑环节

1,问:请问统计用户当前点击事件发生时间倒推过去一小时内的点击数,用什么窗口?比如用户在13:24发生的一个点击事件要统计12:24到13:24的点击数。

答:目前 Flink SQL 的窗口还不支持这种功能。 

2,问:一个表既是 source 表又充当 sink 表该怎么定义?

答:目前在 SQL client 里,如果一个表又是 source 又是 sink,在老版本里需要定义两遍,在新的版本里面 Type 等于both 可以定义这种类型。 

3,问:无限流聚合没有窗口函数,SQL client 中如何设置 State TTL?如果不能设置,如何解决?

答:SQL client 中的配置 execution的item,里面可以配置 min 和 ma x 的 idle-state 的时间,用这两个参数和用代码写是一样的效果。

4,问:Flink SQL 批处理数据采用什么方式定时调度

答:目前不支持定时调度。

5,问:多表进行关联是需要依次生成中间表关联还是直接多表关联

答:可以直接多表进行关联。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
157 15
|
2天前
|
SQL 数据挖掘 Python
数据分析编程:SQL,Python or SPL?
数据分析编程用什么,SQL、python or SPL?话不多说,直接上代码,对比明显,明眼人一看就明了:本案例涵盖五个数据分析任务:1) 计算用户会话次数;2) 球员连续得分分析;3) 连续三天活跃用户数统计;4) 新用户次日留存率计算;5) 股价涨跌幅分析。每个任务基于相应数据表进行处理和计算。
|
1月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
43 0
|
2月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
71 2
|
2月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
46 1
|
6月前
|
SQL NoSQL Java
Flink SQL 问题之执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
583 2
|
6月前
|
SQL Java 关系型数据库
Flink SQL 问题之用代码执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
681 6
|
6月前
|
SQL 消息中间件 Oracle
Flink SQL 问题之写入ES报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
103 4
|
6月前
|
SQL JSON Java
Flink SQL 问题之重启报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
130 3

热门文章

最新文章

下一篇
无影云桌面