开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 :Flink Table API 编程】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/632/detail/10033
Flink Table API 编程
目录
一、什么是 Table API
二、Table API 编程
三、Table API 动态
一、 什么是 Table API
(1) Flink API 总览
由上到下,表达能力增强,可以用来操作 Timer 等复杂的操作。
Date Stream 提供一些 Window。SQL 和 Table API 使用起来是最便捷的,它自身有很多特点。
1. 声明式-用户只关心做什么,不用关心怎么做
2.高性能-支持查询优化,可以获取更好的执行性能
只需要描述做一些聚合的操作,不需要关心底层怎么实现。Table 和底层有一个优化器,例如:写了一个聚合操作,写了两个 Count,多写了一个 Count,查询优化器在优化的时候就会发现重复的聚合操作,在最终执行的时候,只会用到一个聚合操作。输出的时候把相同的一个值输出两次即可,程序的性能会更好。
流批统-相同的统计逻辑,既可以流模式运行,也可以批模式运行,方便开发
4.标准稳定-语义遵循 SL 标准,不易变动。不用考虑 API 兼容问题。
5.易理解一语义明确,所见即所得。很清晰知道背后做 Count 操作。
Table API:
tab.groupBy("word")
.select("word, count(1)as count")
SQL:
SELECT word, COUNT(")AS cnt
FROM MyTable
GROUP BY word
(2) Table API 的特性
1.Table API 使得多声明的数据处理写起来比较容易。
比如做一些过滤的操作,把 a<10 的数据筛选出来,然后写到一个外部表里,a>10 进行过滤,插入到另外一张表。这种数据处理很简单,用了两行就可以把逻辑写出来。
2.Table 使得扩展标准 SQL 更为容易(当直仅当需要的时候)
扩展 SQL 并不是随意扩展,需要考虑标准 SQL 的语义,不能乱写,慎重采取,当且仅当需要的时候。
总的来说,Table API 和 SQL 比较,Table API 是 SQL 的一个抄写。SQL 里面的操作 Table API 里面也有,他是 Flink 自身的一套 API,所以可以有一些易用性和功能性的提升。
二、 Table API 编程
(1) WordCount示例
https: //github.com/hequn8128/TableApiDemo
impart org. apache. flink. api. commontypeinfo. Types;
import org. apache. flink. api. Java. ExecutionEnvironnent;
inport oro. apache. funk. table. ap1. Table:
import org. apache. flink. table. api. java.
BatchTablcEnvironnent;
import org. apache. flink. tabledescriptors. FileSystem;
inport org. apache. flink, table. descriptor.oldcsv;
Amport oro. apache. flink, table. descriptors. Schema;
import org. apache. flink. types. Row;
publit class JavaBatchwordCount
public static veid main(String [ args) throws Exception
ExecutionEnvironment env= Executio. getExecutionEnviromm
BatchTableenvironment tEnv= BatchTableEn. createenv
String path= JavaBatchkordCount. class. getClassLoader(). getResource(
tEny. connect(new Filesystem() path(path))
1thFormat( Oldcsv() field( fieldName "word", Types. STRING).
withschema(new Schema() ficld( fieldName: "word", Types. STRING))
registerTableSource( name: "fileSource"):
Table result tEnv. scan( strings: "fileSource")
groupBy( $ "word")
select("word, count(1) as count");
tEnv. toDataSet(result, Row. class).print()
第十五,十六行是对 environment 的初始化,一开始会返回一个执行的环节,拿到执行环节后,再用 Batch Tab environment,都是 java 版本。
一开始初学者可能会调入坑中,目前的 environment 很多。
batch
a. java
org.apache flink.api.java. ExecutionEnvironment
b. scala
org.apache. flink.table.api.java. Batch TableEnvironment
stream
a.java
org.apache.flink.api.scala. ExecutionEnvironment
orgapache. flink.table.api.scala. BatchTableE
b.scala
org. apache. flink. streaming. api.environment. StreamExecutionEnvironment
org.apache. flink.table.api.java. Stream
org. apache. flink. streaming. api.scala .StreamExecutionEnvironment
org.apache.flink.table.api.scala.StreamTable Environment
大致概括了一下,进行分类,每一种都有自己特有的 Environment。使用过程中,注意不要写错。而且对于流和批来说,一定区分开。
可以用一个 Environment,用户操作多个容易搞错。
Table Environment 使用优化
https: //mal. google. com/mail/u/o/?tab searchAabel%3Aflink-dev+table+ environment/ FMfcgxvz MBlhTWVjxlzCnVZLvvDkmph
https: //cwiki apache org/confluence/d
.32%3A+Restructure+flink-table+ for+ future+ contributions.
拿到 environment 后,注册 keep source。首先指定一个文件的路径,指定格式,指定文件对应的 schema。简单的例子,只有单词一列,类型是 STRING 类型。定义后,把 source 注册到 environment 里面去。
18~22行把 source 注册好了,可以通过 environment 的 schema 的 file source 来拿到 Table,拿到 Table 对象之后,可以执行 Table 的一些操作。
把 Table 输出,输出成 Dateset。到这里就是完整的例子。
(2)Table API 操作
1. how to get a Table?
可以认为 Table 是从 Table environment 里面 scan 出来的,scan 里的 Table 又是注册进去的。注册 Table 方式有三种。
Table myTable tableEnvironment.scan(MyTable")
a.Table descriptor
tEnv
.connect(
new FileSystem()
.path(path))
.with Format(
new OldCsv()
field("word", Types. STRING)
lineDelimiter("\n"))
.withSchema(
new Schema(
.field("word", Types.STRING)
register TableSource("source Table)
指定格式和 schema,注册到 environment 里,如果需要自定义的 source,可以根据 Table source 的接口写自己的自定义的 Table source,然后可以用 Table environment 的 Table source 把自己的 Table source 注册到 environment 里,然后 scan 出来。
b. User defined table source
Tablesource csvSource new Csv TableSource(
path,
new StringU("word"}
new Typelnformation [KTypes.STRING))
tEnv.registerTableSource("source Table2", csvSource
c. Register a DataStream
Datastreamsstring stream
/register the DataStream as table "myTable3"with
fields"word
tableEmv
registerDataStream("my Table3", stream, "word"):
命名 my Table3,对应的只有一列 word。
所以这三种方式都可以注册,就可以拿到 Table。也可以用这三种方式输出 Table。
2. how to emit a Table
当拿到结果表的时候,result Table 是 Table 的类型。执行 insertInto,跟 SQL 很相似,所以他的 API 都是根据标准的 SQL 来定义的。insertInto 到一个目标表里去。以下三种方式和注册 Table 大致是一样的。
resultTable. insertInto( "TargetTable");
a.Table descriptor tEnv
.connect(
new FileSystem()
.path(path))
.with Format(
new OldCsv()
field("word", Types. STRING)
.lineDelimiter("\n"))
.withSchema(
new Schema()
field("word", Types.STRING))
.registerTableSink("targetTable");
b.User defined table sink
TableSink csvSink new CsvTableSink(
path,
new StringU("word")
new Typelnformation[](Types. STRING);
tEnv.registerTable Sink("sinkTable2", csvSink);
c. emit to a DataStream
emit the result table to a Datastream
DataStream<Tuple2<Boolean, Row>> stream
tableEnv
toRetractStream(result Table, Row.class),
Boolean可以表明是一个ant消息或者是delete消息。
3. how to query Table
Table
-select
-as
-filter
-where
-groupBy
-distinct
-join(inner, left, right, full)
- joinLateral(inner, left)
-minus
-minusAll
-union
-unionAll
-window(OverWindow)
-window(GroupWindow)
-dropColumns
-map/flatmap
Table 是有很多操作,这些操作跟标准 SQL 定义是一样的。除此之外,做 group By 的时候生成 GroupdTable,它只会有 select 的操作又会转成 Table,然后在调用这些操作。
对于 over WindowTable,只会有 select 的操作,又会转成 Table。Group Window Table 然后做 group By 得到 Window GroupTable,然后得到 select 操作,最后转成 Table。
可以大致看下 Flink 代码。
Flink 里面可以找到 Flink 的类,这个类里面定义了很多方法。比如 Group by 一些字段,返回 GroupTable,可以看到 Group Table 里面只有 select 的操作。select 之后返回的是一个 Table。
一个 Table 之外为什么还需要很多T able?
拥有隐身 Table,保证 API 操作的便利性,如果只有一个 Table,Groupby 返回的还是一个 Table,再继续 groupby,再定义一个 groupBy,就没办法回去了。更方便用户写出正确的 API。
API 操作可以分类。
a. 和 SQL 对齐的操作
SQL 操作比较熟悉,下次课会讲。
b. 提升 Table API 易用性的操作
Columns Operation-易用性
假设有一张100列的表,我们需要去掉一列,需要怎么操作?
Operators |
exemples |
AddColumns |
Table orders= tableEnv.scan("Orders"); Table result =orders. addColumns ("concat(c, 'sunny') as desc"); |
AddOrReplaceColumns |
Table orders =tableEnv.scan("Orders"); Table result= orders. addOrReplaceColumns("concat(c, 'sunny'") as desc"); |
DropColumns |
Table orders =tableEnv.scan("Orders"); Table result= orders. dropColumns("b, c"); |
RenameColumns |
Table orders =tableEnv.scan("Orders"); Table result =orders. renameColumns("b as b2, c as c2"); |
只用一个新的 API,比如 dropcolumns。scan 出 order 一个表,把里面 b, c 两列去掉,就不需要把其他的列选出来了。
对于 AddColumns 来说,可以添加一个新的列,比如叫 desc,它有一个要求,不能跟之前原有的列重合。原先order里面也有一个 desc 的列,就会错误。
如果业务上先加的列覆盖原有的列,用 AddOrReplaceColumns 操作,如果 orders 表里面也有这个列的话,会自动覆盖。
RenameColumns 希望给某些列重命名。把 b 叫做 b2,c 叫做 c2,所以 Table 上 Columns Operation 提升易用性。
假设有一张100列的表,我们需要选择第20第80列,需要怎么操作?
SYNTAX |
DESC |
withColumns(...) |
select the specified columns
|
withoutColumns(...) |
deselect the columns specified |
一个是选择,另一个是反选,就比如刚才选择20-80列可以用 withColumns。
假设一个圆表里面又 abcde,selectwithColumns(2 to4),可以得到bcd三列。如果用 withoutColumns,得到 ae。
Syntax
The proposed column operation syntax is as follows:
columnOperation:
withColumns(columnExprs)/ withoutColumns(columnExprs)
columnExprs
columnExpr.[ columnExpr]"
columnExpr:
columnRef |columnindex to columnIndex |columnName to columnName
columnRef:
columnName(The field name that exists in the table) columnindex(a positive
integer starting at 1)
Example: withColumns(a, b, 2 to 10, w to 2)
可以是 name 也可以是 name 范围。
总结
API |
Example |
Columns Operation |
table.dropColumns('a,b)
|
Columns Function |
table.select(withColumns('a, 1 to 10)
|
Row-based operation
Map operation-易用性
Method signature
方法签名
del map(scalarFunction: Expression): Table
Usage
用法
val res lab
.map(fun().as(a, b, 'c)
.select('a,)
Benefit
好处
table.select((), udf20), udf3()....)
VS
table.map(udt())
class MyMap extends ScalarFunction
var param: String=
def evaluser defined inputs)row={
val result new Row(3)
//Business processing based on data and parameters
//根据数据和参数进行业务处理
result
override def get ResultType(signature: Array[Class_]):
TypelnformationL]={
Types. ROW(Types.5TRING, Types.INT, Types.LONG)
eval 接受一些参数,并输入,进行业务逻辑的处理,再把最终的结果返回出来。
getResult Type 的方法根据结果指定类型。row 里面有三列,三个类型。
拿到一个 tab,map 里面可以看方法签名,传入一个 scala Function。返回三列,名字 abc,可以做接下来的操作,比如 select。如果列很多的时候,每一列都要进行 udf,并且返回一个结果的时候,如果不用 map,用 scala,需要把每一列写出来。现在直接点 map 和 udf。
需要说明,map 和 Date map 差不多,输入一条输出一条,类似映射的关系。是不是有其他的算子,输入一条,输出多条。
Method signature
方法签名
def flatMap( tableFunction: Expression): Table
Usage
用法
val res tab
.flatMap(fun('e,' f).as(name,'age)
.select('name,'age)
Benefit
好处
table.joinLateral(udtf) VS table. flatMap(udtf())
case class User(name: String, age: Int)
class MyFlatMap extends TableFunction[User)
def eval([user defined inputs]): Unit=
for(..)
collect(User(name, age))
))
传入的是一个 Table Function,实现的例子里继承的是 Table Function,跟刚才的 scala Function 有点类似。但是这里没有 gapresulttap,因为这里返回值的类型是 u 的类型,Flink 可以自动分析出它的类型,名字也可以分析出来,这样就不用定义 gapresulrta p。
定义要 Table Function,点 flatmap 传入一个 Table Function,可以直接 select 出来。
Aggregate operation-易用性
Method signature
方法签名
def aggregate(aggregate Function): Expression: AggregatedTable
class AggregatedTable(table: Table, groupKeys Seq[Expression], aggFunction: Expression)
Usage
用法
val res= tab
.groupBy('a)
.aggregate(agg('e,' f) as ('a, 'b, 'c))
.select('a,'c)
Benefit
好处
table.select(agg1(), agg20), agg30)...)
VS
table.aggregate(agg())
class CountAccumulator extends JTuple1(Long)
f0 OL //count
class CountAgg extends AggregateFunction[JLong, CountAccumulator]
def accumulate(acc: CountAccumulator): Unit={
acc.f0+=1L
override def getValue(acc: CountAccumulator): JLong={
acc.fo
INPUT(Row) OUTPUT(Row)
. retract()/merge()
N(>=0)
}
方法签名接受一个 aggregate Function,举了一个 Count 例子,首先需要定义 Accumulator,累加器。一开始初始化为0,定义好之后定义 Count 聚合函数。
有两个方法,一个是 accumulator 怎么计算,对 Count 来说加一,最终 getvalue 把结果返回。
用法上tab点group by,aggregate把Function传进来,重命名结果,最后进行select操作。
aggregate Function可以接受多行输出一行。
有没有操作输入多行同时输出多行呢?
通常 groupby t 之后,求 topn,就是这种聚合操作。这个操作提供了新的 operation,类似于 flat aggregate。
c.增强 Table API 功能性的操作
FlatAggregate operation–功能性
Method signature
方法签名
def flatAggregate(tableAggregateFounction Expression): FlatAggregateTable
class FlatAggregateTable(table: Table, groupKey: SeqlExpression], tableAggFun: Expression)
Usage
用法
val res =tab
groupBy('a)
flatAggregate( flatAggFunc('e,' f) as ('a, 'b, c))
.select(a, c)
Benefit
好处
功能性提升。如果不增加这种 Operation,其他没有这种语义,没办法输入多行聚合输出多行,所以是一种功能性填充。
新增了一种 agg,能输出多行
class TopNAcc{
var data: MapView[JInt, JLong] =_/ (rank-> value),
class TopN(n: Int) extends TableAggregateFunction[(, Long),
TopNAccum]
def accumulate(acc: TopNAcc, [user defined inputs])
def emitValue(acc: TopNAcc, out: Collector[(Int, Long)): Unit={
INPUT(Row) OUTPUT(Row)
..retract/merge
它的参数需要 tableAggregateFounction,与前面有很大的相似之处,先定义一个 topNacc,然后里面有一个accumulator 处理方法,不断地来,不断进行操作。
区别 emitvalue 的方法,可以拿到 collector,之后多次输出结果。
Aggregate VS TableAggregate
用 Aggregate 做一个 max 的操作,然后用 TableAggregate 做 Top2 的操作。这里有个输入表,对 price 求最大值,或者求 top2。
看一下 max 操作,第一步 creatAccumulator,创建累加器,第二步在累加器上进行 Accumulate 操作,比如数据依次输入到 agg 里。
第三步 getValue,得到最后结果
总的区别就是getValue和emitvalue。
总结
|
Single Row Input 单行输入 |
Multiple Row Input 多行输入 |
Single Row Output 单行输出 |
ScalarFunction (select/map) |
AggregateFunction (select/aggregate) |
Multiple Row output 多行输出 |
TableFunction oinLateral/flatmap) |
TableAggregateFunction (flatAggregate) |
三、 Table API 动态
3.1 Flip29
https: //issues. apache. org/iira/browse/flink-10972
3.2 Python Table API
https: //issues apache.org/iira/browse/flink-12308
3.3 Interactive Programming(交互式编程)
https: //issues. apache org/iira/browse/flink-11199
交互式编程提供一个 catch 的算子,Table 点 catch 的操作,把之前操作保存。可以便捷操作 API,提高程序性能。
3.4 Iterative processing(迭代计算)
https://issues.apache.org/iira/browse/FLINK-11199
在机器学习上用的较多。