开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 :客户端操作(二)】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/632/detail/10035
客户端操作(二)
四、 基本用法
select 查询
Flink SQL> SELECT 'Hello World:
按Q”退出这个界面
打开ttp127.0.0.18081能看到这条 http://127.0.01:01 select 语句产生的查询任务已经结束了。这个查询采用
的是读取固定数据集的 Custom Source,输出用的是 Stream Collect Sink.且只输出一条结果。
(1) explain
explain 命可以查看 SQL 的执行计划。
Flink SQL> explain SELECT name, COUNT()AS cnt FROM (VALUES (Bob"). ("Alice).
Greg"). (Bob') AS NameTable(name) GROUP BY name:
= Abstract Syntax Tree==
∥抽象语法树
LogicalAggregate(group=HO), cnt=[COUNTOD)
LogicalValues(tuples=[( _UTF-16LE'Bob ') {_UTF-16LE'Alice'), {_UTF-16LE'Greg').
(_UTF-16LE'Bob')
==Optimized Logical Plan==
优化后的逻辑执行计划
DataStreamGroupAggregate(groupBy=[ name], select=[name, COUNT(")AS cntl)
DataStreamValues(tuples=( UTF-16LE'Bob') _UTF-16LE'Alice').
_UTF-16LE'Greg') _UTF-16LE'Bob')'))
== Physical Execution Plan==
物理执行计划
Stage 3: Data Source
content collect elements with CollectionInputFormat
Stage 5:Operator
content groupBy: (name), select: (name, COUNT(") AS cnt)
ship_strategy: HASH
五、 结果展示
SOL Clent 支持两种模式来维护并展示查询结果
(1)table mode
在内存中物化查询结果,并以分页 table 形式展示。用户可以通过以下命用 table
mode
SET execution.result-mode=table
(2).changelog mode
不会物化查询结果,而是直接对 continuous query 产生的添加和撤回(retractions)结果进行展示。
SET execution. result-mode=changelog
接下来通过实际的例子进行演示。
table mode
Flink SOL> SET execution.result-mode=table:
[INFO] Session property has been set.
Flink SQL> SELECT name, COUNT( )AS cnt FROM (VALUES ("Bob"), ("Alice). (Greg).
(Bob')AS NameTable(name)GROUP BY name;
(2)changlog mode
Flink SQL> SET execution. result-mode=changelog:
[INFO] Session property has been set
Flink SOL> SELECT name, COUNT()AS cnt FROM (VALUES ('Bob"). (Alice). (Greg).
(Bob')) AS NameTable(name) GROUP BY name:
(3)Environment Files
目前的 SQL Client 还不支持 DD 语句,只能通过 yaml 文件的方式来定义 SQL 查询需要的表,udf和运行参数等信息。
首先,准备 envyaml 和 input csv 两个文件。
flink-1.7.2 cat /tmp/env.yaml
tables:
-name: MyTableSource
type: source-table
update-mode: append
connector:
type: filesystem
path: "/tmp/input. csv"
result-mode: table
which table programs are submitted to.
deployment:
response-timeout: 5000
+flink-1.7.2 cat /tmp/input.csv
1,hello
皮
2.world
3.hello world
1.ok
3,bye bye
4. Yes
(4)启动 SQL Client
fink-1.7.2/bin/sgl--client.sh embedded-mp/envyam
No default environment specified.
Searching for
/Users/baoniu/Documents/work/tool/flink/fink-1.7.2/conf/sql-client-defaults.yamnl"..found
Reading default environment from:
file:
Flink SQL> describe MyView1;
root
-MyField1: Integer
Flink SQL> select"from MyTableSource;
使用 insert into写入结果表:
Flink SoL> insert into MyTableSink select from My TableSourcer
[INFO] Submitting SQL update statement to the cluster
[INFO] Table update statement has been successfully submitted to the cluster
Cluster ID: StandaloneClusterld
Job ID: 3fac2be1fd891e3e07595c684bb7b7a0
web interface: http://ocalhost:8081
查询生成的结果数据文件:
flink-1.7.2 cat /tmp/output. csv
1.hello
2.world
3,hello world
1.ok
3,bye bye
4.yes
也可以在 environment 件里面定义 udf,在 SQL Client 面通过 SHOW FUNCTIONSJ
查询和使用,这里就不再说明了。
SOL Client 功能社区还在开发中详见 FLIP-24
(5)Restful APl
下来们演示如何通过 rest api 来提交 jar 包和执行任务
更详的操作请参考 flink 的 restful api 文档:
bttos anache vrolectsifink flink-docs- stableimonitoring/rest api.html
4ink-1.7 2 curt htt:/127.0.0.: 8081/overview
Caskmanagers": 1. slots-total":,"slots-available:,"jobs-running": 3. "jobs-finished": 0."
cancelled 0. obs-tailed":0. "flink-version"."1.7.2", "flink-commit": "cebaBary
fink-1.7.2 POSTH"Expect:"-F
jartile=@/Users/baoniu/Documents/work/tool/flink/flink-1.7. 1/examples/streaming/ TopSpe
edwindowing. jar" http://127.0.0.1:8081/jars/upload
filename"varr/folders/2b/r6d49pcs23z43b8fqs8885c0000gn/t/flin-web-124c4895-cfo
-4eec-8e15-8263d347efc2/flink-web-upload6077eca7-6db0-4570-a4d0-4c3e05a5dc59T
opSpeedWindowing. jar", "status": "success")%
flink-1.7.2 curl http:/127.0.0.1: 8081/jars
(address": "http://ocalhost:8081", "files":i("id"."
TopSpeedWindowing.jar,"name:"TopSpeedW.jar,uploaded:1553743438000
entry":["name": "org. apache. flink. streamingexamples. windowing. TopSpeedWindowing"."d
escription":) 1)%
flink-1.7.2 curl
htto1278081ars6077eca7-6db0 TopSpeedWindowin
g.jar/plan
planid:41029eb3feb9132619e454ec9b2a89fb,"name":"CarTopSpeedWindowingEx
ample,nodesid:obea66dec231edf33913ecd54406c1,"parallelism1.operator
,"operator_strategy".,"description". "Window(GlobalWindows(). Delta Trigger, TimeEvictor.
ComparableAggregator, PassThroughWindowF)->: Sink: Print to Std.
Out, inputs((num0id:cbc357ccb63d2852fee8c4fc7df2,ship_ strategy
:"HASH"
,"exchange" "pipelined_bounded"),"optimizer_":)."cbc357ccb763d(2852fe
e8c4fc7d5512"."parallelism": 1,"operator ." "operator_strategy".","description"."Source:
Custom Source->: Timestamps/Watermarks, "optimizer_properties": 0))%
flink-1..2 curl-X POST
http://127.0.0.1:8081/jars/6077eca7-6db0-4570_ TopSpeedWindowin
g.jar/run
(jobid":"04d80a24b076523d3dc5fbaa0ad5e1ad")%
Restful API 还提供了很多监控和 Metrics 相关的功能,对任务提交的操作比如命行方式支
持的全面。
(6)Web
在 Flink Dashboard 页面左侧可以看到有个 ISubmit new Job 的地方。用户可以上传 jar 包和显示执行计划和提交任务。Web 提交功能主要用于新手入门和演示用。
六、示例 Flink 应用程序启动预提交阶段
但是,当进程具有外部状态时,需要作些额外的处理,外部状态通常以写入外部系统(如 aka)的形式出现,在这种情况下,为了提供 Exactly--onc 保证,外部系统必须支持事务,这样才能和两阶段提交协议集成。
在本示例中的数据需要写入 Kafka,因此数据输出端(Data Sink)有外部状态,在这种情况下,在预提交阶段,除了将其状态写入 state backend 之外,数据输出端还必须预先提交其外部事务。
当 Scheckpoint barrieroperator 在所有都传递了一遍,并且发的 chockpoint 调成功完成时,预提交经就结束了,所有触发的状态快照都破为该 checkpoint 一部分, checkpoin 是整个应用程序状态的快照,预先提交的外部状态,如果发生放障,我们可以回滚到上次成功完成快照的时间点。
下一步是通知所有 operator, checkpoint 已经成功了,这是两阶段提交协议的提交阶段, JobManagn 为应用程序中的每个 operatorcheckpoint 发出已完成的回调数据源和 widnow operator 没有外部状态,因此在提交阶段,这些 operator 不必执行任何操作,但是,数据出(Data Sink)拥有外部状态,此时应该提交外部事务。