待待深度探索 Flink SQL(一)| 学习笔记

简介: 快速学习待待深度探索 Flink SQL。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 待待深度探索 Flink SQL(一)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10041


待待深度探索 Flink SQL(一)

 

内容介绍:

一.New TableEnvironment

二.New Catalog & DDL

三.Blink Planner

四.Q&A

 

一.New TableEnvironment

1.TableEnvironment 整体设计

FLIP-32 中提出,将 Blink 完全开源,合并到 Flink 主分支中。

合并后在 Flink 1.9 中会存在两个 Planner:Flink Planner 和 Blink Planner。

这两个 planner 中有很大一部分的代码是共享的。

Flink table 是从 flink-libraries 这一二级目录移出来的,相当于在之前的版本中,Flink Table 在整个 Flink 中是一个二等公民

Flink SQL 越来越受重视,Flink Table 模块也因此被提升为一等公民。

而 Blink 在设计之初就考虑到流和批的统一,批只是流的一种特殊形式,所以可以用同一个 TableEnvironment 来表述流和批。

image.png

可以看出,TableEnvironment 组成部分如下:

flink-table-common:

这个包中主要是包含 Flink Planner 和 Blink Planner 一些共用的代码。

flink-table-api-java:

这部分是用户编程使用的 API,包含了大部分的 API。

flink-table-api-scala:

这里只是非常薄的一层,仅和 Table API 的 Expression 和 DSL 相关。

④两个 Bridge:

flink-table-api-scala-bridge 和 flink-table-api-java-bridge,从图中可以看出,Flink Planner 和 Blink Planner 都会依赖于具体的 JAVA API,也会依赖于具体的 Bridge,通过 Bridge 可以将 API 操作相应的转化为 Scala 的 DataStream、DataSet,或者转化为 JAVA 的 DataStream 或者 Data Set。 

2.新旧 TableEnvironment 对比

在 Flink 1.9 之前,原来的 Flink Table 模块,有 7 个 Environment,使用和维护上相对困难。

7 个 Environment 包括:StreamTableEnvironment、BatchTableEnvironment 两类,JAVA 和 Scala 分别 2 个,一共 4 个,加上 3 个父类,一共就是 7 个。 

在新的框架之下,社区希望流和批统一,因此对原来的设计进行精简。

首先,提供统一的 TableEnvironment,放在 flink-table-api-java 这个包中。

然后,在 Bridge 中,提供了两个用于衔接 Scala DataStream 和 Java DataStream StreamTableEnvironment。

最后,因为 Flink Planner 中还残存在着 toDataSet 类似的操作,所以,暂时保留 BatchTableEnvironment。

这样,目前一共是 5 个 TableEnvironment。

因为未来 Flink Planner 将会被移除,BatchTableEnvironment 就会被废弃,整个 TableEnvironment 的设计也会更加简洁明了。

3. 新 TableEnvironment 的应用

第一行,简单起见,在后续将新的 TableEnvironment 称为 UnifyTableEnvironment。

在 Blink 中,Batch 被认为是 Stream 的一个特例,因此 Blink 的 Batch 可以使用 UnifyTableEnvironment。

UnifyTableEnvironment 在 1.9 中有一些限制,比如它不能够注册 UDAF 和 UDTF,当前新的 Type System 的类型推导功能还没有完成(Java、Scala 的类型推导还没统一),所以这部分的功能暂时不支持。

此外,UnifyTableEnvironment 无法和 DataStream 和 DataSet 互转。

第二行,Stream TableEnvironment 支持转化成 DataStream,也可以注册 UDAF 和 UDTF。

如果是 JAVA 写的,就注册到 JAVA 的 StreamTableEnvironment,如果是用 Scala 写的,就注册到 Scala 的 StreamTableEnvironment。

注意,Blink Batch 作业不支持 Stream TableEnvironment ,因为目前 Batch 没法和 DataStream 互转,所以 toDataStream 这样的语义暂时不支持。

从图中也可以看出,目前Blink Batch只能使用 TableEnvironment。

最后一行,BatchTableEvironment 能够使用 toDataSet 转化为 DataSet。

从上图中,可以很清晰的看出各个 TableEnvironment 能够做什么事情,以及他们有哪些限制。

接下来,将使用示例对各种情况进行说明

示例1:Blink Batch

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();

TableEnvironment tEnv = TableEnvironment.create(settings);

tEnv…

tEnv.execute(“job name”);

Blink Batch 只能使用 TableEnvironment(即UnifyTableEnvironment),代码中,首先需要创建一个 EnvironmentSetting,同时指定使用 Blink Planner,并且指定用 Batch 模式。

之所以需要指定 Blink Planner,是因为目前 Flink 1.9 中,将 Flink Planner 和 Blink Planner 的 jar 同时放在了 Flink 的 lib 目录下。

如果不指定使用的 Planner,整个框架并不知道需要使用哪个 Planner, 所以必须显示指定。

如果整个框架只有一个blink或者flink,这时候是可以不用加载的。因为通过服务发现的方式可以找到唯一一个planner直接work,如果找到多一个就可以直接去报错。

在 UnifyEnvironment 中,用户是无法获取到 ExecutionEnvironment 的,即用户无法在写完作业流程后,使用 executionEnvironment.execute 方法启动任务。需要显式的使用 tableEnvironment.execute 方法启动任务,这和之前的作业启动很不相同。

示例 2:Blink Stream

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

StreamExecutionEnvironment execEnv = … StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, settings);

tEnv… 

Blink Stream 既可以使用 UnifyTableEnvironment,也可以使用 StreamTableEnvironment,与 Batch 模式基本类似,只是需要将 inBatchMode 换成 inStreamingMode。

示例 3:Flink Batch

ExecutionEnvironment execEnv= ...

BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);

tEnv...

与之前没有变化,不做过多介绍。

示例 4:Flink Stream

EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamMode().build();

TableEnvironment tEnv = TableEnvironment.create(settings);

tEnv…

tEnv.execute(“job name”); 

Flink Stream 也是同时支持 UnifyEnvironment 和 StreamTableEnvironment,只是在指定 Planner 时,需要指定为 useOldPlanner,也即 Flink Planner。

因为未来 Flink Planner 会被移除,因此,特意起了一个 OlderPlanner 的名字,而且只能够使用 inStreamingMode,无法使用 inBatchMode。因为inBatchMode会通过BatchTableEnvironment创建。


二.New Catalog & DDL

1. 新 Catalog 设计

构建一个新的 Catalog API 主要是 FLIP-30 提出的,之前的 ExternalCatalog 将被废弃,Blink Planner 中已经不支持 ExternalCatalog,而Flink Planner 还支持 ExternalCatalog。 

下图是新 Catalog 的整体设计:

image.png

可以看到,新的 Catalog 有三层结构,最顶层是 Catalog 的名字,中间一层是 Database,最底层是各种 MetaObject,如 Table,Partition,Function 等。 

当前,内置了两个 Catalog 实现:MemoryCatalog 和 HiveCatalog。当然,用户也可以实现自己的 Catalog。

Catalog 能够做什么事情呢?

首先,它可以支持 Create,Drop,List,Alter,Exists 等语句,另外它也支持对 Database,Table,Partition,Function,Statistics 等的操作。基本上,常用的 SQL 语法都已经支持

CatalogManager 正如它名字一样,主要是用来管理 Catalog,且可以同时管理多个 Catalog。也就是说,可以通过在一个相同 SQL 中,跨 Catalog 做查询或者关联操作。

例如,支持对 A Hive Catalog 和 B Hive Catalog 做相互关联,这给 Flink 的query带来了很大的灵活性。

CatalogManager 支持的操作包括:

· 注册 Catalog(registerCatalog)

· 获取所有的 Catalog(getCatalogs)

· 获取特定的 Catalog(getCatalog)

· 获取当前的 Catalog(getCurrentCatalog)

· 设置当前的 Catalog(setCurrentCatalog)

· 获取当前的 Database(getCurrentDatabase)

· 设置当前的 Database(setCurrentDatabase)

Catalog 虽然设计了三层结构,但在使用的时候,并不需要完全指定三层结构的值,可以只写Table Name

这时候,系统会使用 getCurrentCatalog,getCurrentDatabase 获取到默认值,自动补齐三层结构,这种设计简化了对 Catalog 的使用。

如果需要switch默认的 Catalog,只需要调用 setCurrentCatalog 就可以了。

在 TableEnvironment 层,提供了操作 Catalog 的方法,例如:

· 注册 Catalog(registerCatalog)

· 列出所有的 Catalog(listCatalogs)

· 获取指定 Catalog(getCatalog)

· 使用某个 Catalog(useCatalog)

在 SQL Client 层,也做了一定的支持,但是功能有一定的限制。

用户不能够使用 Create 语句直接创建 Catalog,只能通过在 yaml 文件中,通过定义 Deion 的方式去描述 Catalog,然后在启动 SQL Client 的时候,通过传入 -e把yaml文件传给sqlClient,让它去加载。

目前 SQL Client 支持列出已定义的 Catalog,使用一个已经存在的 Catalog 等操作。

从纵向的角度看各个catalog在各个层级里面有哪些操作,可以帮助大家更好的认识和使用catalog。

2.DDL 设计与使用

有了 Catalog,就可以使用 DDL 来操作 Catalog 的内容,可以使用 TableEnvironment 的 sqlUpdate 方法执行 DDL 语句,也可以在 SQL Client 执行 DDL 语句。

sqlUpdate 方法中,支持 Create Table、Create View、Drop Table、Drop View 四个命令。

当然,inset into 这样的语句也是支持的。

下面分别对 4 个命令进行说明: 

Create Table:

可以显示的指定 Catalog Name 或者 DB Name,如果缺省,就按照用户设定的 Current Catalog补齐,然后可以指定字段名称,字段的说明也可以支持 Partition By 语法。 

最后是With 参数,用户可以在此处指定使用的 Connector,例如Kafka,CSV,HBase 等。With 参数需要配置一堆的属性值,可以从各个 Connector 的 Factory 定义中找到。Factory 中会指出有哪些必选属性,哪些可选属性值。 

需要注意的是,目前 DDL 中还不支持计算列和 Watermark 的定义,后续的版本中将会继续完善这部分。 

Create Table [[catalog_name.]db_name.]table_name(

a int comment 'column comment',

b bigint,

c varchar

)comment 'table comment'

[partitioned by(b)]

With(

update-mode='append',

connector.type='kafka',

... )

Create View

需要指定 View 的名字,然后紧跟着的是 SQL。View 将会存储在 Catalog 中。

CREATE VIEW view_name AS SELECT xxx

Drop Table&Drop View:

和标准 SQL 语法差不多,支持使用 IF EXISTS 语法,如果未加 IF EXISTS ,Drop 一个不存在的表,会抛出异常。

DROP TABLE

[IF EXISTS] [[catalog_name.]db_name.]table_name

SQL Client 中执行 DDL:

大部分都只支持查看操作,仅可以使用 Create View 和 Drop View。Catalog,Database,Table ,Function 这些只能做查看。 

用户可以在 SQL Client 中 Use 一个已经存在的 Catalog,修改一些属性,或者做 Deion,Explain 这样的一些操作。

CREATE VIEW

DROP VIEW

SHOW CATALOGS/DATABASES/TABLES/FUNCTIONS

USE CATALOG xxx

SET xxx=yyy

DESCRIBE table_name

EXPLAIN SELECT xxx

DDL 部分,在 Flink 1.9 中其实基本已经成型,只是还有一些特性,在未来需要逐渐的完善

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
SQL 关系型数据库 HIVE
hive与postgresql 之爆炸函数
hive与postgresql 之爆炸函数
|
7月前
|
存储 消息中间件 OLAP
基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
本文整理自淘天集团高级数据开发工程师朱奥在Flink Forward Asia 2024的分享,围绕实时数仓优化展开。内容涵盖项目背景、核心策略、解决方案、项目价值及未来计划五部分。通过引入Paimon和Hologres技术,解决当前流批存储不统一、实时数据可见性差等痛点,实现流批一体存储与高效近实时数据加工。项目显著提升了数据时效性和开发运维效率,降低了使用门槛与成本,并规划未来在集团内推广湖仓一体架构,探索更多技术创新场景。
1501 3
基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
|
9月前
|
存储 数据采集 数据处理
【数据结构进阶】位图
位图是一种高效的数据结构,通过二进制的0和1表示数据的存在状态,适用于海量数据的压缩存储与快速检索。本文从概念、实现到应用场景全面解析位图。核心思想是将数据映射到位图的比特位,利用位运算实现O(1)时间复杂度的增删查操作。文章通过C++代码示例展示了位图的三大接口(set、unset、test)实现,并对比自定义位图与标准库`bitset`的异同。位图优点在于极高的时间和空间效率,但仅适用于整型数据。它为布隆过滤器等高级结构奠定了基础,在数据处理领域具有重要价值。
648 1
|
10月前
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
本文整理自阿里云智能集团 Apache Flink Committer 刘大龙老师在2024FFA流批一体论坛的分享,涵盖三部分内容:数据工程师用户故事、Materialized Table 构建流批一体 ETL 及 Demo。文章通过案例分析传统 Lambda 架构的挑战,介绍了 Materialized Table 如何简化流批处理,提供统一 API 和声明式 ETL,实现高效的数据处理和维护。最后展示了基于 Flink 和 Paimon 的实际演示,帮助用户更好地理解和应用这一技术。
848 7
Flink Materialized Table:构建流批一体 ETL
|
10月前
|
SQL 消息中间件 Kafka
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
本文介绍了阿里云实时数仓Hologres负责人姜伟华在Flink Forward Asia 2024上的分享,涵盖实时数仓的发展历程、从实时数仓到实时湖仓的演进,以及总结。文章通过三代实时数仓架构的演变,详细解析了Lambda架构、Kafka实时数仓分层+OLAP、Hologres实时数仓分层复用等方案,并探讨了未来从实时数仓到实时湖仓的演进方向。最后,结合实际案例和Demo展示了Hologres + Flink + Paimon在实时湖仓中的应用,帮助用户根据业务需求选择合适的方案。
1463 20
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
|
10月前
|
存储 关系型数据库 MySQL
Flink基于Paimon的实时湖仓解决方案的演进
本文整理自阿里云智能集团苏轩楠老师在Flink Forward Asia 2024论坛的分享,涵盖流式湖仓架构的背景介绍、技术演进和未来发展规划。背景部分介绍了ODS、DWD、DWS三层数据架构及关键组件Flink与Paimon的作用;技术演进讨论了全量与增量数据处理优化、宽表构建及Compaction操作的改进;发展规划则展望了Range Partition、Materialized Table等新功能的应用前景。通过这些优化,系统不仅简化了复杂度,还提升了实时与离线处理的灵活性和效率。
903 3
Flink基于Paimon的实时湖仓解决方案的演进
|
存储 测试技术 Apache
阿里云实时计算企业级状态存储引擎 Gemini 技术解读
本文整理自阿里云 Flink 存储引擎团队李晋忠,兰兆千,梅源关于阿里云实时计算企业级状态存储引擎 Gemini 的研究。
127155 4
阿里云实时计算企业级状态存储引擎 Gemini 技术解读
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
1429 27
|
SQL 关系型数据库 MySQL
flink cdc 同步问题之如何提高用户速度
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
存储 SQL 分布式计算
浅谈MPP架构
浅谈MPP架构