(2)滑动窗口(HOP)
滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在SQL中通过调用HOP()来声明滑动窗口;除了也要传入表名、时间属性外,还需要传入窗口大小(size)和滑动步长(slide)两个参数。
HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));
需要注意的是,紧跟在时间属性字段后面的第三个参数是步长(slide),第四个参数才是窗口大小(size)。
(3)累积窗口(CUMULATE)
累积窗口是窗口TVF中新增的窗口功能,它会在一定的统计周期内进行累积计算。累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)。所谓的最大窗口长度其实就是我们所说的“统计周期”,最终目的就是统计这段时间内的数据 在SQL中可以用CUMULATE()函数来定义,具体如下:
CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))
注意第三个参数为步长step,第四个参数则是最大窗口长度。上面所有的语句只是定义了窗口,类似于DataStream API中的窗口分配器;在SQL中窗口的完整调用,还需要配合聚合操作和其它操作。
五、聚合(Aggregation)查询
Flink 中的SQL是流处理与标准SQL结合的产物,所以聚合查询也可以分成两种:流处理中特有的聚合(主要指窗口聚合),以及SQL原生的聚合查询方式。
5.1 分组聚合
SQL中一般所说的聚合我们都很熟悉,主要是通过内置的一些聚合函数来实现的,比如SUM()、MAX()、MIN()、AVG()以及COUNT()。它们的特点是对多条输入数据进行计算,得到一个唯一的值,属于“多对一”的转换。比如我们可以通过下面的代码计算输入数据的个数:
Table eventCountTable = tableEnv.sqlQuery("select COUNT(*) from EventTable");
而更多的情况下,我们可以通过GROUP BY子句来指定分组的键(key),从而对数据按照某个字段做一个分组统计。例如之前我们举的例子,可以按照用户名进行分组,统计每个用户点击url的次数:
SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user
这种聚合方式,就叫作“分组聚合”(group aggregation)。想要将结果表转换成流或输出到外部系统,必须采用撤回流(retract stream)或更新插入流(upsert stream)的编码方式;如果在代码中直接转换成DataStream打印输出,需要调用toChangelogStream()。
分组聚合既是SQL原生的聚合查询,也是流处理中的聚合操作,这是实际应用中最常见的聚合方式。当然,使用的聚合函数一般都是系统内置的,如果希望实现特殊需求也可以进行自定义。
5.2 窗口聚合
在Flink的Table API和SQL中,窗口的计算是通过“窗口聚合”(window aggregation)来实现的。与分组聚合类似,窗口聚合也需要调用SUM()、MAX()、MIN()、COUNT()一类的聚合函数,通过GROUP BY子句来指定分组的字段。只不过窗口聚合时,需要将窗口信息作为分组key的一部分定义出来。
在Flink 1.12版本之前,是直接把窗口自身作为分组key放在GROUP BY之后的,所以也叫“分组窗口聚合”;而1.13版本开始使用了“窗口表值函数”(Windowing TVF),窗口本身返回的是就是一个表,所以窗口会出现在FROM后面,GROUP BY后面的则是窗口新增的字段window_start和window_end。例如:
Table result = tableEnv.sqlQuery( "SELECT " + "user, " + "window_end AS endT, " + "COUNT(url) AS cnt " + "FROM TABLE( " + "TUMBLE( TABLE EventTable, " + "DESCRIPTOR(ts), " + "INTERVAL '1' HOUR)) " + "GROUP BY user, window_start, window_end " );
Flink SQL目前提供了滚动窗口TUMBLE()、滑动窗口HOP()和累积窗口(CUMULATE)三种表值函数(TVF)。在具体应用中,我们还需要提前定义好时间属性。下面是一段窗口聚合的完整代码,以累积窗口为例:
public class CumulateWindowExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 读取数据源,并分配时间戳、生成水位线 SingleOutputStreamOperator<Event> eventStream = env .fromElements( new Event("Alice", "./home", 1000L), new Event("Bob", "./cart", 1000L), new Event("Alice", "./prod?id=1", 25 * 60 * 1000L), new Event("Alice", "./prod?id=4", 55 * 60 * 1000L), new Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L), new Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L), new Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L) ) .assignTimestampsAndWatermarks( WatermarkStrategy.<Event>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } }) ); // 创建表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 将数据流转换成表,并指定时间属性 Table eventTable = tableEnv.fromDataStream( eventStream, $("user"), $("url"), $("timestamp").rowtime().as("ts") ); // 为方便在SQL中引用,在环境中注册表EventTable tableEnv.createTemporaryView("EventTable", eventTable); // 设置累积窗口,执行SQL统计查询 Table result = tableEnv .sqlQuery( "SELECT " + "user, " + "window_end AS endT, " + "COUNT(url) AS cnt " + "FROM TABLE( " + "CUMULATE( TABLE EventTable, " + // 定义累积窗口 "DESCRIPTOR(ts), " + "INTERVAL '30' MINUTE, " + "INTERVAL '1' HOUR)) " + "GROUP BY user, window_start, window_end " ); tableEnv.toDataStream(result).print(); env.execute(); } }
基于窗口的聚合,是流处理中聚合统计的一个特色,也是与标准SQL最大的不同之处。在实际项目中,很多统计指标其实都是基于时间窗口来进行计算的,所以窗口聚合是Flink SQL中非常重要的功能;基于窗口TVF的聚合未来也会有更多功能的扩展支持,比如窗口TOP-N、会话窗口、窗口联结等等。
5.3 开窗(Over)聚合
在标准SQL中还有另外一类比较特殊的聚合方式,可以针对每一行计算一个聚合值,这就是所谓的“开窗函数”。开窗函数的聚合与之前两种聚合有本质的不同:分组聚合、窗口TVF聚合都是“多对一”的关系,将数据分组之后每组只会得到一个聚合结果;而开窗函数是对每行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个“多对多”的关系。
与标准SQL中一致,Flink SQL中的开窗函数也是通过OVER子句来实现的,所以有时开窗聚合也叫作“OVER聚合”(Over Aggregation)。基本语法如下:
SELECT <聚合函数> OVER ( [PARTITION BY <字段1>[, <字段2>, ...]] ORDER BY <时间属性字段> <开窗范围>), ... FROM ...
这里OVER关键字前面是一个聚合函数,它会应用在后面OVER定义的窗口上。在OVER子句中主要有以下几个部分:
PARTITION BY(可选)
用来指定分区的键(key),类似于GROUP BY的分组,这部分是可选的;
ORDER BY
在OVER子句中必须用ORDER BY明确地指出数据基于那个字段排序。在Flink的流处理中,目前只支持按照时间属性的升序排列,所以这里ORDER BY后面的字段必须是定义好的时间属性。
开窗范围
由BETWEEN <下界> AND <上界> 定义,也就是“从下界到上界”的范围。目前支持的上界只能是CURRENT ROW,也就是定义一个“从之前某一行到当前行”的范围,所以一般的形式为:
BETWEEN ... PRECEDING AND CURRENT ROW
开窗选择的范围可以基于时间,也可以基于数据的数量。所以开窗范围还应该在两种模式之间做出选择:范围间隔(RANGE intervals)和行间隔(ROW intervals)。
范围间隔
范围间隔以RANGE为前缀,就是基于ORDER BY指定的时间字段去选取一个范围,一般就是当前行时间戳之前的一段时间。例如开窗范围选择当前行之前1小时的数据:
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
行间隔
行间隔以ROWS为前缀,就是直接确定要选多少行,由当前行出发向前选取就可以了。例如开窗范围选择当前行之前的5行数据(最终聚合会包括当前行,所以一共6条数据):
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
下面是一个具体示例:
SELECT user, ts, COUNT(url) OVER ( PARTITION BY user ORDER BY ts RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW ) AS cnt FROM EventTable
开窗聚合与窗口聚合(窗口TVF聚合)本质上不同,不过也还是有一些相似之处的:它们都是在无界的数据流上划定了一个范围,截取出有限数据集进行聚合统计;这其实都是“窗口”的思路。事实上,在Table API中确实就定义了两类窗口:分组窗口(GroupWindow)和开窗窗口(OverWindow);而在SQL中,也可以用WINDOW子句来在SELECT外部单独定义一个OVER窗口:
SELECT user, ts, COUNT(url) OVER w AS cnt, MAX(CHAR_LENGTH(url)) OVER w AS max_url FROM EventTable WINDOW w AS ( PARTITION BY user ORDER BY ts ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
上面的SQL中定义了一个选取之前2行数据的OVER窗口,并重命名为w;接下来就可以基于它调用多个聚合函数,扩展出更多的列提取出来。
5.4 应用实例 —— TOP-N
目前在Flink SQL中没有能够直接调用的TOP-N函数,而是提供了稍微复杂些的变通实现方法。下面是一个具体案例的代码实现。由于用户访问事件Event中没有商品相关信息,因此我们统计每小时内有最多访问行为的用户,取前两名,相当于是一个每小时活跃用户的查询。
public class WindowTopNExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 读取数据源,并分配时间戳、生成水位线 SingleOutputStreamOperator<Event> eventStream = env .fromElements( new Event("Alice", "./home", 1000L), new Event("Bob", "./cart", 1000L), new Event("Alice", "./prod?id=1", 25 * 60 * 1000L), new Event("Alice", "./prod?id=4", 55 * 60 * 1000L), new Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L), new Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L), new Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L) ) .assignTimestampsAndWatermarks( WatermarkStrategy.<Event>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } }) ); // 创建表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 将数据流转换成表,并指定时间属性 Table eventTable = tableEnv.fromDataStream( eventStream, $("user"), $("url"), $("timestamp").rowtime().as("ts") // 将timestamp指定为事件时间,并命名为ts ); // 为方便在SQL中引用,在环境中注册表EventTable tableEnv.createTemporaryView("EventTable", eventTable); // 定义子查询,进行窗口聚合,得到包含窗口信息、用户以及访问次数的结果表 String subQuery = "SELECT window_start, window_end, user, COUNT(url) as cnt " + "FROM TABLE ( " + "TUMBLE( TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR )) " + "GROUP BY window_start, window_end, user "; // 定义TOP-N的外层查询 String topNQuery = "SELECT * " + "FROM (" + "SELECT *, " + "ROW_NUMBER() OVER ( " + "PARTITION BY window_start, window_end " + "ORDER BY cnt desc " + ") AS row_num " + "FROM (" + subQuery + ")) " + "WHERE row_num <= 2"; // 执行SQL得到结果表 Table result = tableEnv.sqlQuery(topNQuery); tableEnv.toDataStream(result).print(); env.execute(); } }
六、联结(Join)查询
在标准SQL中,可以将多个表连接合并起来,从中查询出想要的信息;这种操作就是表的联结(Join)。在Flink SQL中,同样支持各种灵活的联结(Join)查询,操作的对象是动态表。在流处理中,动态表的Join对应着两条数据流的Join操作。Flink SQL中的联结查询大体上也可以分为两类:SQL原生的联结查询方式,和流处理中特有的联结查询。
6.1 常规联结查询
常规联结(Regular Join)是SQL中原生定义的Join方式,是最通用的一类联结操作。它的具体语法与标准SQL的联结完全相同,通过关键字JOIN来联结两个表,后面用关键字ON来指明联结条件。与标准SQL一致,Flink SQL的常规联结也可以分为内联结(INNER JOIN)和外联结(OUTER JOIN),区别在于结果中是否包含不符合联结条件的行。目前仅支持“等值条件”作为联结条件,也就是关键字ON后面必须是判断两表中字段相等的逻辑表达式。
1. 等值内联结(INNER Equi-JOIN)
内联结用INNER JOIN来定义,会返回两表中符合联接条件的所有行的组合,也就是所谓的笛卡尔积(Cartesian product)。目前仅支持等值联结条件。例如:
SELECT * FROM Order INNER JOIN Product ON Order.product_id = Product.id
2. 等值外联结(OUTER Equi-JOIN)
与内联结类似,外联结也会返回符合联结条件的所有行的笛卡尔积;另外,还可以将某一侧表中找不到任何匹配的行也单独返回。Flink SQL支持左外(LEFT JOIN)、右外(RIGHT JOIN)和全外(FULL OUTER JOIN),分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行返回。具体用法如下:
SELECT * FROM Order LEFT JOIN Product ON Order.product_id = Product.id SELECT * FROM Order RIGHT JOIN Product ON Order.product_id = Product.id SELECT * FROM Order FULL OUTER JOIN Product ON Order.product_id = Product.id
这部分知识与标准SQL中是完全一样的。
6.2 间隔联结查询
我们曾经学习过DataStream API中的双流Join,包括窗口联结(window join)和间隔联结(interval join)。两条流的Join就对应着SQL中两个表的Join,这是流处理中特有的联结方式。目前Flink SQL还不支持窗口联结,而间隔联结则已经实现。间隔联结(Interval Join)返回的,同样是符合约束条件的两条中数据的笛卡尔积。只不过这里的“约束条件”除了常规的联结条件外,还多了一个时间间隔的限制。具体语法有以下要点:
两表的联结
间隔联结不需要用JOIN关键字,直接在FROM后将要联结的两表列出来就可以,用逗号分隔。这与标准SQL中的语法一致,表示一个“交叉联结”(Cross Join),会返回两表中所有行的笛卡尔积。
联结条件
联结条件用WHERE子句来定义,用一个等值表达式描述。交叉联结之后再用WHERE进行条件筛选,效果跟内联结INNER JOIN ... ON ...非常类似。
时间间隔限制
我们可以在WHERE子句中,联结条件后用AND追加一个时间间隔的限制条件;做法是提取左右两侧表中的时间字段,然后用一个表达式来指明两者需要满足的间隔限制。具体定义方式有下面三种,这里分别用ltime和rtime表示左右表中的时间字段:
(1)ltime = rtime (2)ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE (3)ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
例如,我们现在除了订单表Order外,还有一个“发货表”Shipment,要求在收到订单后四个小时内发货。那么我们就可以用一个间隔联结查询,把所有订单与它对应的发货信息连接合并在一起返回。
SELECT * FROM Order o, Shipment s WHERE o.id = s.order_id AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time