客户端操作(二)| 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 快速学习客户端操作。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 客户端操作(二)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10035


客户端操作(二)


四、 基本用法

select 查询

Flink SQL> SELECT 'Hello World:

按Q”退出这个界面

打开ttp127.0.0.18081能看到这条 http://127.0.01:01 select 语句产生的查询任务已经结束了。这个查询采用

的是读取固定数据集的 Custom Source,输出用的是 Stream Collect Sink.且只输出一条结果。

(1) explain

explain 命可以查看 SQL 的执行计划。

Flink SQL> explain SELECT name, COUNT()AS cnt FROM (VALUES (Bob"). ("Alice).

Greg"). (Bob') AS NameTable(name) GROUP BY name:

= Abstract Syntax Tree==抽象语法树

LogicalAggregate(group=HO), cnt=[COUNTOD)

LogicalValues(tuples=[( _UTF-16LE'Bob ') {_UTF-16LE'Alice'), {_UTF-16LE'Greg').

(_UTF-16LE'Bob')

==Optimized Logical Plan==优化后的逻辑执行计划

DataStreamGroupAggregate(groupBy=[ name], select=[name, COUNT(")AS cntl)

DataStreamValues(tuples=( UTF-16LE'Bob') _UTF-16LE'Alice').

_UTF-16LE'Greg') _UTF-16LE'Bob')'))

== Physical Execution Plan==物理执行计划

Stage 3: Data Source

content collect elements with CollectionInputFormat

Stage 5:Operator

content groupBy: (name), select: (name, COUNT(") AS cnt)

ship_strategy: HASH

 

五、 结果展示

SOL Clent 支持两种模式来维护并展示查询结果

(1)table mode

在内存中物化查询结果,并以分页 table 形式展示。用户可以通过以下命用 table

mode

SET execution.result-mode=table

(2).changelog mode

不会物化查询结果,而是直接对 continuous query 产生的添加和撤回(retractions)结果进行展示。

SET execution. result-mode=changelog

接下来通过实际的例子进行演示。

table mode

Flink SOL> SET execution.result-mode=table:

[INFO] Session property has been set.

Flink SQL> SELECT name, COUNT( )AS cnt FROM (VALUES ("Bob"), ("Alice). (Greg).

(Bob')AS NameTable(name)GROUP BY name;

(2)changlog mode

Flink SQL> SET execution. result-mode=changelog:

[INFO] Session property has been set

Flink SOL> SELECT name, COUNT()AS cnt FROM (VALUES ('Bob"). (Alice). (Greg).

(Bob')) AS NameTable(name) GROUP BY name:

(3)Environment Files

目前的 SQL Client 还不支持 DD 语句只能通过 yaml 文件的方式来定义 SQL 查询需要的表udf和运行参数等信息。

首先准备 envyaml 和 input csv 两个文件。

flink-1.7.2 cat /tmp/env.yaml

tables:

-name: MyTableSource

type: source-table

update-mode: append

connector:

type: filesystem

path: "/tmp/input. csv"

result-mode: table 

which table programs are submitted to.

deployment:

response-timeout: 5000

+flink-1.7.2 cat /tmp/input.csv

1,hello

2.world

3.hello world

1.ok

3,bye bye

4. Yes

(4)启动 SQL Client

fink-1.7.2/bin/sgl--client.sh embedded-mp/envyam

No default environment specified.

Searching for

/Users/baoniu/Documents/work/tool/flink/fink-1.7.2/conf/sql-client-defaults.yamnl"..found

Reading default environment from:

file:

Flink SQL> describe MyView1;

root

-MyField1: Integer

Flink SQL> select"from MyTableSource; 

使用 insert into写入结果表:

Flink SoL> insert into MyTableSink select from My TableSourcer

[INFO] Submitting SQL update statement to the cluster

[INFO] Table update statement has been successfully submitted to the cluster

Cluster ID: StandaloneClusterld

Job ID: 3fac2be1fd891e3e07595c684bb7b7a0

web interface: http://ocalhost:8081

查询生成的结果数据文件:

flink-1.7.2 cat /tmp/output. csv

1.hello

2.world

3,hello world

1.ok

3,bye bye

4.yes

也可以在 environment 件里面定义 udf,在  SQL Client 面通过 SHOW FUNCTIONSJ

查询和使用,这里就不再说明了。

SOL Client 功能社区还在开发中详见 FLIP-24

(5)Restful APl

下来们演示如何通过 rest api 来提交 jar 包和执行任务

更详的操作请参考 flink 的 restful api 文档:

bttos anache vrolectsifink flink-docs- stableimonitoring/rest api.html

4ink-1.7 2 curt htt:/127.0.0.: 8081/overview

Caskmanagers": 1. slots-total":,"slots-available:,"jobs-running": 3. "jobs-finished": 0."

cancelled 0. obs-tailed":0. "flink-version"."1.7.2", "flink-commit": "cebaBary

fink-1.7.2 POSTH"Expect:"-F

jartile=@/Users/baoniu/Documents/work/tool/flink/flink-1.7. 1/examples/streaming/ TopSpe

edwindowing. jar" http://127.0.0.1:8081/jars/upload

filename"varr/folders/2b/r6d49pcs23z43b8fqs8885c0000gn/t/flin-web-124c4895-cfo

-4eec-8e15-8263d347efc2/flink-web-upload6077eca7-6db0-4570-a4d0-4c3e05a5dc59T

opSpeedWindowing. jar", "status": "success")%

flink-1.7.2 curl http:/127.0.0.1: 8081/jars

(address": "http://ocalhost:8081", "files":i("id"."TopSpeedWindowing.jar,"name:"TopSpeedW.jar,uploaded:1553743438000

entry":["name": "org. apache. flink. streamingexamples. windowing. TopSpeedWindowing"."d

escription":) 1)%

flink-1.7.2 curl

htto1278081ars6077eca7-6db0 TopSpeedWindowin

g.jar/planplanid:41029eb3feb9132619e454ec9b2a89fb,"name":"CarTopSpeedWindowingExample,nodesid:obea66dec231edf33913ecd54406c1,"parallelism1.operator

,"operator_strategy".,"description". "Window(GlobalWindows(). Delta Trigger, TimeEvictor.

ComparableAggregator, PassThroughWindowF)->: Sink: Print to Std.

Out, inputs((num0id:cbc357ccb63d2852fee8c4fc7df2,ship_ strategy:"HASH"

,"exchange" "pipelined_bounded"),"optimizer_":)."cbc357ccb763d(2852fe

e8c4fc7d5512"."parallelism": 1,"operator ." "operator_strategy".","description"."Source:

Custom Source->: Timestamps/Watermarks, "optimizer_properties": 0))%

flink-1..2 curl-X POST

http://127.0.0.1:8081/jars/6077eca7-6db0-4570_ TopSpeedWindowin

g.jar/run

(jobid":"04d80a24b076523d3dc5fbaa0ad5e1ad")%

Restful API 还提供了很多监控和 Metrics 相关的功能,对任务提交的操作比如命行方式支

持的全面。

(6)Web

在 Flink Dashboard 页面左侧可以看到有个  ISubmit new Job 的地方。用户可以上传 jar 包和显示执行计划和提交任务。Web 提交功能主要用于新手入门和演示用。


六、示例 Flink 应用程序启动预提交阶段

但是当进程具有外部状态时,需要作些额外的处理外部状态通常以写入外部系统(如 aka)的形式出现在这种情况下为了提供 Exactly--onc 保证,外部系统必须支持事务这样才能和两阶段提交协议集成

在本示例中的数据需要写入 Kafka,因此数据输出端(Data Sink)有外部状态,在这种情况下在预提交阶段除了将其状态写入  state backend 之外数据输出端还必须预先提交其外部事务

当 Scheckpoint barrieroperator 在所有都传递了一遍,并且发的 chockpoint 调成功完成时,预提交经就结束了,所有触发的状态快照都破为该 checkpoint 一部分, checkpoin 是整个应用程序状态的快照,预先提交的外部状态,如果发生放障,我们可以回滚到上次成功完成快照的时间点。

下一步是通知所有 operator, checkpoint 已经成功了,这是两阶段提交协议的提交阶段, JobManagn 为应用程序中的每个 operatorcheckpoint 发出已完成的回调数据源和 widnow operator 没有外部状态,因此在提交阶段,这些 operator 不必执行任何操作,但是,数据出(Data Sink)拥有外部状态,此时应该提交外部事务。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
存储 SQL NoSQL
事务功能使用及原理介绍(二)|学习笔记
快速学习事务功能使用及原理介绍
197 0
事务功能使用及原理介绍(二)|学习笔记
|
SQL 存储 NoSQL
事务功能使用及原理介绍(一)|学习笔记
快速学习事务功能使用及原理介绍
312 0
事务功能使用及原理介绍(一)|学习笔记
|
存储 缓存 监控
ChangeStreams 使用及原理(二)|学习笔记
快速学习 ChangeStreams 使用及原理
661 0
ChangeStreams 使用及原理(二)|学习笔记
|
SQL 运维 监控
ChangeStreams 使用及原理(一)|学习笔记
快速学习 ChangeStreams 使用及原理
424 0
ChangeStreams 使用及原理(一)|学习笔记
|
存储 缓存 边缘计算
缓存基础(二)|学习笔记
快速学习 缓存基础(二)
123 0
缓存基础(二)|学习笔记
|
Web App开发 缓存 负载均衡
缓存基础(一)|学习笔记
快速学习 缓存基础(一)
108 0
缓存基础(一)|学习笔记
|
SQL Kubernetes 关系型数据库
阿里云 K8s 环境创建(上)|学习笔记
快速学习阿里云 K8s 环境创建(上)
阿里云 K8s 环境创建(上)|学习笔记
|
Kubernetes 监控 固态存储
阿里云 K8s 环境创建(下)|学习笔记
快速学习阿里云 K8s 环境创建(下)
阿里云 K8s 环境创建(下)|学习笔记
|
NoSQL Java 数据库
注册功能(接口)| 学习笔记
快速学习 注册功能(接口)
400 0
注册功能(接口)| 学习笔记
|
编译器 Go 开发者
包使用注意事项和细节(1)|学习笔记
快速学习包使用注意事项和细节(1)
包使用注意事项和细节(1)|学习笔记