待待深度探索 Flink SQL(一)| 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 快速学习待待深度探索 Flink SQL。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 待待深度探索 Flink SQL(一)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10041


待待深度探索 Flink SQL(一)

 

内容介绍:

一.New TableEnvironment

二.New Catalog & DDL

三.Blink Planner

四.Q&A

 

一.New TableEnvironment

1.TableEnvironment 整体设计

FLIP-32 中提出,将 Blink 完全开源,合并到 Flink 主分支中。

合并后在 Flink 1.9 中会存在两个 Planner:Flink Planner 和 Blink Planner。

这两个 planner 中有很大一部分的代码是共享的。

Flink table 是从 flink-libraries 这一二级目录移出来的,相当于在之前的版本中,Flink Table 在整个 Flink 中是一个二等公民

Flink SQL 越来越受重视,Flink Table 模块也因此被提升为一等公民。

而 Blink 在设计之初就考虑到流和批的统一,批只是流的一种特殊形式,所以可以用同一个 TableEnvironment 来表述流和批。

image.png

可以看出,TableEnvironment 组成部分如下:

flink-table-common:

这个包中主要是包含 Flink Planner 和 Blink Planner 一些共用的代码。

flink-table-api-java:

这部分是用户编程使用的 API,包含了大部分的 API。

flink-table-api-scala:

这里只是非常薄的一层,仅和 Table API 的 Expression 和 DSL 相关。

④两个 Bridge:

flink-table-api-scala-bridge 和 flink-table-api-java-bridge,从图中可以看出,Flink Planner 和 Blink Planner 都会依赖于具体的 JAVA API,也会依赖于具体的 Bridge,通过 Bridge 可以将 API 操作相应的转化为 Scala 的 DataStream、DataSet,或者转化为 JAVA 的 DataStream 或者 Data Set。 

2.新旧 TableEnvironment 对比

在 Flink 1.9 之前,原来的 Flink Table 模块,有 7 个 Environment,使用和维护上相对困难。

7 个 Environment 包括:StreamTableEnvironment、BatchTableEnvironment 两类,JAVA 和 Scala 分别 2 个,一共 4 个,加上 3 个父类,一共就是 7 个。 

在新的框架之下,社区希望流和批统一,因此对原来的设计进行精简。

首先,提供统一的 TableEnvironment,放在 flink-table-api-java 这个包中。

然后,在 Bridge 中,提供了两个用于衔接 Scala DataStream 和 Java DataStream StreamTableEnvironment。

最后,因为 Flink Planner 中还残存在着 toDataSet 类似的操作,所以,暂时保留 BatchTableEnvironment。

这样,目前一共是 5 个 TableEnvironment。

因为未来 Flink Planner 将会被移除,BatchTableEnvironment 就会被废弃,整个 TableEnvironment 的设计也会更加简洁明了。

3. 新 TableEnvironment 的应用

第一行,简单起见,在后续将新的 TableEnvironment 称为 UnifyTableEnvironment。

在 Blink 中,Batch 被认为是 Stream 的一个特例,因此 Blink 的 Batch 可以使用 UnifyTableEnvironment。

UnifyTableEnvironment 在 1.9 中有一些限制,比如它不能够注册 UDAF 和 UDTF,当前新的 Type System 的类型推导功能还没有完成(Java、Scala 的类型推导还没统一),所以这部分的功能暂时不支持。

此外,UnifyTableEnvironment 无法和 DataStream 和 DataSet 互转。

第二行,Stream TableEnvironment 支持转化成 DataStream,也可以注册 UDAF 和 UDTF。

如果是 JAVA 写的,就注册到 JAVA 的 StreamTableEnvironment,如果是用 Scala 写的,就注册到 Scala 的 StreamTableEnvironment。

注意,Blink Batch 作业不支持 Stream TableEnvironment ,因为目前 Batch 没法和 DataStream 互转,所以 toDataStream 这样的语义暂时不支持。

从图中也可以看出,目前Blink Batch只能使用 TableEnvironment。

最后一行,BatchTableEvironment 能够使用 toDataSet 转化为 DataSet。

从上图中,可以很清晰的看出各个 TableEnvironment 能够做什么事情,以及他们有哪些限制。

接下来,将使用示例对各种情况进行说明

示例1:Blink Batch

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();

TableEnvironment tEnv = TableEnvironment.create(settings);

tEnv…

tEnv.execute(“job name”);

Blink Batch 只能使用 TableEnvironment(即UnifyTableEnvironment),代码中,首先需要创建一个 EnvironmentSetting,同时指定使用 Blink Planner,并且指定用 Batch 模式。

之所以需要指定 Blink Planner,是因为目前 Flink 1.9 中,将 Flink Planner 和 Blink Planner 的 jar 同时放在了 Flink 的 lib 目录下。

如果不指定使用的 Planner,整个框架并不知道需要使用哪个 Planner, 所以必须显示指定。

如果整个框架只有一个blink或者flink,这时候是可以不用加载的。因为通过服务发现的方式可以找到唯一一个planner直接work,如果找到多一个就可以直接去报错。

在 UnifyEnvironment 中,用户是无法获取到 ExecutionEnvironment 的,即用户无法在写完作业流程后,使用 executionEnvironment.execute 方法启动任务。需要显式的使用 tableEnvironment.execute 方法启动任务,这和之前的作业启动很不相同。

示例 2:Blink Stream

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

StreamExecutionEnvironment execEnv = … StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, settings);

tEnv… 

Blink Stream 既可以使用 UnifyTableEnvironment,也可以使用 StreamTableEnvironment,与 Batch 模式基本类似,只是需要将 inBatchMode 换成 inStreamingMode。

示例 3:Flink Batch

ExecutionEnvironment execEnv= ...

BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);

tEnv...

与之前没有变化,不做过多介绍。

示例 4:Flink Stream

EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamMode().build();

TableEnvironment tEnv = TableEnvironment.create(settings);

tEnv…

tEnv.execute(“job name”); 

Flink Stream 也是同时支持 UnifyEnvironment 和 StreamTableEnvironment,只是在指定 Planner 时,需要指定为 useOldPlanner,也即 Flink Planner。

因为未来 Flink Planner 会被移除,因此,特意起了一个 OlderPlanner 的名字,而且只能够使用 inStreamingMode,无法使用 inBatchMode。因为inBatchMode会通过BatchTableEnvironment创建。


二.New Catalog & DDL

1. 新 Catalog 设计

构建一个新的 Catalog API 主要是 FLIP-30 提出的,之前的 ExternalCatalog 将被废弃,Blink Planner 中已经不支持 ExternalCatalog,而Flink Planner 还支持 ExternalCatalog。 

下图是新 Catalog 的整体设计:

image.png

可以看到,新的 Catalog 有三层结构,最顶层是 Catalog 的名字,中间一层是 Database,最底层是各种 MetaObject,如 Table,Partition,Function 等。 

当前,内置了两个 Catalog 实现:MemoryCatalog 和 HiveCatalog。当然,用户也可以实现自己的 Catalog。

Catalog 能够做什么事情呢?

首先,它可以支持 Create,Drop,List,Alter,Exists 等语句,另外它也支持对 Database,Table,Partition,Function,Statistics 等的操作。基本上,常用的 SQL 语法都已经支持

CatalogManager 正如它名字一样,主要是用来管理 Catalog,且可以同时管理多个 Catalog。也就是说,可以通过在一个相同 SQL 中,跨 Catalog 做查询或者关联操作。

例如,支持对 A Hive Catalog 和 B Hive Catalog 做相互关联,这给 Flink 的query带来了很大的灵活性。

CatalogManager 支持的操作包括:

· 注册 Catalog(registerCatalog)

· 获取所有的 Catalog(getCatalogs)

· 获取特定的 Catalog(getCatalog)

· 获取当前的 Catalog(getCurrentCatalog)

· 设置当前的 Catalog(setCurrentCatalog)

· 获取当前的 Database(getCurrentDatabase)

· 设置当前的 Database(setCurrentDatabase)

Catalog 虽然设计了三层结构,但在使用的时候,并不需要完全指定三层结构的值,可以只写Table Name

这时候,系统会使用 getCurrentCatalog,getCurrentDatabase 获取到默认值,自动补齐三层结构,这种设计简化了对 Catalog 的使用。

如果需要switch默认的 Catalog,只需要调用 setCurrentCatalog 就可以了。

在 TableEnvironment 层,提供了操作 Catalog 的方法,例如:

· 注册 Catalog(registerCatalog)

· 列出所有的 Catalog(listCatalogs)

· 获取指定 Catalog(getCatalog)

· 使用某个 Catalog(useCatalog)

在 SQL Client 层,也做了一定的支持,但是功能有一定的限制。

用户不能够使用 Create 语句直接创建 Catalog,只能通过在 yaml 文件中,通过定义 Deion 的方式去描述 Catalog,然后在启动 SQL Client 的时候,通过传入 -e把yaml文件传给sqlClient,让它去加载。

目前 SQL Client 支持列出已定义的 Catalog,使用一个已经存在的 Catalog 等操作。

从纵向的角度看各个catalog在各个层级里面有哪些操作,可以帮助大家更好的认识和使用catalog。

2.DDL 设计与使用

有了 Catalog,就可以使用 DDL 来操作 Catalog 的内容,可以使用 TableEnvironment 的 sqlUpdate 方法执行 DDL 语句,也可以在 SQL Client 执行 DDL 语句。

sqlUpdate 方法中,支持 Create Table、Create View、Drop Table、Drop View 四个命令。

当然,inset into 这样的语句也是支持的。

下面分别对 4 个命令进行说明: 

Create Table:

可以显示的指定 Catalog Name 或者 DB Name,如果缺省,就按照用户设定的 Current Catalog补齐,然后可以指定字段名称,字段的说明也可以支持 Partition By 语法。 

最后是With 参数,用户可以在此处指定使用的 Connector,例如Kafka,CSV,HBase 等。With 参数需要配置一堆的属性值,可以从各个 Connector 的 Factory 定义中找到。Factory 中会指出有哪些必选属性,哪些可选属性值。 

需要注意的是,目前 DDL 中还不支持计算列和 Watermark 的定义,后续的版本中将会继续完善这部分。 

Create Table [[catalog_name.]db_name.]table_name(

a int comment 'column comment',

b bigint,

c varchar

)comment 'table comment'

[partitioned by(b)]

With(

update-mode='append',

connector.type='kafka',

... )

Create View

需要指定 View 的名字,然后紧跟着的是 SQL。View 将会存储在 Catalog 中。

CREATE VIEW view_name AS SELECT xxx

Drop Table&Drop View:

和标准 SQL 语法差不多,支持使用 IF EXISTS 语法,如果未加 IF EXISTS ,Drop 一个不存在的表,会抛出异常。

DROP TABLE

[IF EXISTS] [[catalog_name.]db_name.]table_name

SQL Client 中执行 DDL:

大部分都只支持查看操作,仅可以使用 Create View 和 Drop View。Catalog,Database,Table ,Function 这些只能做查看。 

用户可以在 SQL Client 中 Use 一个已经存在的 Catalog,修改一些属性,或者做 Deion,Explain 这样的一些操作。

CREATE VIEW

DROP VIEW

SHOW CATALOGS/DATABASES/TABLES/FUNCTIONS

USE CATALOG xxx

SET xxx=yyy

DESCRIBE table_name

EXPLAIN SELECT xxx

DDL 部分,在 Flink 1.9 中其实基本已经成型,只是还有一些特性,在未来需要逐渐的完善

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6月前
|
SQL 数据处理 流计算
实时计算 Flink版产品使用合集之sql真正的执行顺序是怎样的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 传感器 API
Flink(十四)【Flink SQL(中)查询】(1)
Flink(十四)【Flink SQL(中)查询】
|
5月前
|
SQL Java API
Flink(十四)【Flink SQL(中)查询】(3)
Flink(十四)【Flink SQL(中)查询】
|
5月前
|
SQL 存储 关系型数据库
Flink(十四)【Flink SQL(中)查询】(2)
Flink(十四)【Flink SQL(中)查询】
|
12月前
|
SQL 关系型数据库 MySQL
Flink教程(16)- Flink Table与SQL
Flink教程(16)- Flink Table与SQL
308 0
|
12月前
|
SQL 消息中间件 Kafka
Flink教程(17)- Flink Table与SQL(案例与SQL算子)
Flink教程(17)- Flink Table与SQL(案例与SQL算子)
202 0
|
SQL 监控 数据可视化
Flink SQL_Table 介绍与实战(一)|学习笔记
快速学习 Flink SQL_Table 介绍与实战
168 0
Flink SQL_Table 介绍与实战(一)|学习笔记
|
SQL 消息中间件 存储
Flink SQL_Table 介绍与实战(二)|学习笔记
快速学习 Flink SQL_Table 介绍与实战
251 0
Flink SQL_Table 介绍与实战(二)|学习笔记
|
SQL 缓存 算法
待待深度探索 Flink SQL(二)| 学习笔记
快速学习待待深度探索 Flink SQL。
待待深度探索 Flink SQL(二)| 学习笔记
|
SQL 消息中间件 JSON
Flink SQL 编程(二)| 学习笔记
快速学习 Flink SQL 编程。
 Flink SQL 编程(二)| 学习笔记
下一篇
无影云桌面