客户端操作(二)| 学习笔记

简介: 快速学习客户端操作。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 客户端操作(二)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10035


客户端操作(二)


四、 基本用法

select 查询

Flink SQL> SELECT 'Hello World:

按Q”退出这个界面

打开ttp127.0.0.18081能看到这条 http://127.0.01:01 select 语句产生的查询任务已经结束了。这个查询采用

的是读取固定数据集的 Custom Source,输出用的是 Stream Collect Sink.且只输出一条结果。

(1) explain

explain 命可以查看 SQL 的执行计划。

Flink SQL> explain SELECT name, COUNT()AS cnt FROM (VALUES (Bob"). ("Alice).

Greg"). (Bob') AS NameTable(name) GROUP BY name:

= Abstract Syntax Tree==抽象语法树

LogicalAggregate(group=HO), cnt=[COUNTOD)

LogicalValues(tuples=[( _UTF-16LE'Bob ') {_UTF-16LE'Alice'), {_UTF-16LE'Greg').

(_UTF-16LE'Bob')

==Optimized Logical Plan==优化后的逻辑执行计划

DataStreamGroupAggregate(groupBy=[ name], select=[name, COUNT(")AS cntl)

DataStreamValues(tuples=( UTF-16LE'Bob') _UTF-16LE'Alice').

_UTF-16LE'Greg') _UTF-16LE'Bob')'))

== Physical Execution Plan==物理执行计划

Stage 3: Data Source

content collect elements with CollectionInputFormat

Stage 5:Operator

content groupBy: (name), select: (name, COUNT(") AS cnt)

ship_strategy: HASH

 

五、 结果展示

SOL Clent 支持两种模式来维护并展示查询结果

(1)table mode

在内存中物化查询结果,并以分页 table 形式展示。用户可以通过以下命用 table

mode

SET execution.result-mode=table

(2).changelog mode

不会物化查询结果,而是直接对 continuous query 产生的添加和撤回(retractions)结果进行展示。

SET execution. result-mode=changelog

接下来通过实际的例子进行演示。

table mode

Flink SOL> SET execution.result-mode=table:

[INFO] Session property has been set.

Flink SQL> SELECT name, COUNT( )AS cnt FROM (VALUES ("Bob"), ("Alice). (Greg).

(Bob')AS NameTable(name)GROUP BY name;

(2)changlog mode

Flink SQL> SET execution. result-mode=changelog:

[INFO] Session property has been set

Flink SOL> SELECT name, COUNT()AS cnt FROM (VALUES ('Bob"). (Alice). (Greg).

(Bob')) AS NameTable(name) GROUP BY name:

(3)Environment Files

目前的 SQL Client 还不支持 DD 语句只能通过 yaml 文件的方式来定义 SQL 查询需要的表udf和运行参数等信息。

首先准备 envyaml 和 input csv 两个文件。

flink-1.7.2 cat /tmp/env.yaml

tables:

-name: MyTableSource

type: source-table

update-mode: append

connector:

type: filesystem

path: "/tmp/input. csv"

result-mode: table 

which table programs are submitted to.

deployment:

response-timeout: 5000

+flink-1.7.2 cat /tmp/input.csv

1,hello

2.world

3.hello world

1.ok

3,bye bye

4. Yes

(4)启动 SQL Client

fink-1.7.2/bin/sgl--client.sh embedded-mp/envyam

No default environment specified.

Searching for

/Users/baoniu/Documents/work/tool/flink/fink-1.7.2/conf/sql-client-defaults.yamnl"..found

Reading default environment from:

file:

Flink SQL> describe MyView1;

root

-MyField1: Integer

Flink SQL> select"from MyTableSource; 

使用 insert into写入结果表:

Flink SoL> insert into MyTableSink select from My TableSourcer

[INFO] Submitting SQL update statement to the cluster

[INFO] Table update statement has been successfully submitted to the cluster

Cluster ID: StandaloneClusterld

Job ID: 3fac2be1fd891e3e07595c684bb7b7a0

web interface: http://ocalhost:8081

查询生成的结果数据文件:

flink-1.7.2 cat /tmp/output. csv

1.hello

2.world

3,hello world

1.ok

3,bye bye

4.yes

也可以在 environment 件里面定义 udf,在  SQL Client 面通过 SHOW FUNCTIONSJ

查询和使用,这里就不再说明了。

SOL Client 功能社区还在开发中详见 FLIP-24

(5)Restful APl

下来们演示如何通过 rest api 来提交 jar 包和执行任务

更详的操作请参考 flink 的 restful api 文档:

bttos anache vrolectsifink flink-docs- stableimonitoring/rest api.html

4ink-1.7 2 curt htt:/127.0.0.: 8081/overview

Caskmanagers": 1. slots-total":,"slots-available:,"jobs-running": 3. "jobs-finished": 0."

cancelled 0. obs-tailed":0. "flink-version"."1.7.2", "flink-commit": "cebaBary

fink-1.7.2 POSTH"Expect:"-F

jartile=@/Users/baoniu/Documents/work/tool/flink/flink-1.7. 1/examples/streaming/ TopSpe

edwindowing. jar" http://127.0.0.1:8081/jars/upload

filename"varr/folders/2b/r6d49pcs23z43b8fqs8885c0000gn/t/flin-web-124c4895-cfo

-4eec-8e15-8263d347efc2/flink-web-upload6077eca7-6db0-4570-a4d0-4c3e05a5dc59T

opSpeedWindowing. jar", "status": "success")%

flink-1.7.2 curl http:/127.0.0.1: 8081/jars

(address": "http://ocalhost:8081", "files":i("id"."TopSpeedWindowing.jar,"name:"TopSpeedW.jar,uploaded:1553743438000

entry":["name": "org. apache. flink. streamingexamples. windowing. TopSpeedWindowing"."d

escription":) 1)%

flink-1.7.2 curl

htto1278081ars6077eca7-6db0 TopSpeedWindowin

g.jar/planplanid:41029eb3feb9132619e454ec9b2a89fb,"name":"CarTopSpeedWindowingExample,nodesid:obea66dec231edf33913ecd54406c1,"parallelism1.operator

,"operator_strategy".,"description". "Window(GlobalWindows(). Delta Trigger, TimeEvictor.

ComparableAggregator, PassThroughWindowF)->: Sink: Print to Std.

Out, inputs((num0id:cbc357ccb63d2852fee8c4fc7df2,ship_ strategy:"HASH"

,"exchange" "pipelined_bounded"),"optimizer_":)."cbc357ccb763d(2852fe

e8c4fc7d5512"."parallelism": 1,"operator ." "operator_strategy".","description"."Source:

Custom Source->: Timestamps/Watermarks, "optimizer_properties": 0))%

flink-1..2 curl-X POST

http://127.0.0.1:8081/jars/6077eca7-6db0-4570_ TopSpeedWindowin

g.jar/run

(jobid":"04d80a24b076523d3dc5fbaa0ad5e1ad")%

Restful API 还提供了很多监控和 Metrics 相关的功能,对任务提交的操作比如命行方式支

持的全面。

(6)Web

在 Flink Dashboard 页面左侧可以看到有个  ISubmit new Job 的地方。用户可以上传 jar 包和显示执行计划和提交任务。Web 提交功能主要用于新手入门和演示用。


六、示例 Flink 应用程序启动预提交阶段

但是当进程具有外部状态时,需要作些额外的处理外部状态通常以写入外部系统(如 aka)的形式出现在这种情况下为了提供 Exactly--onc 保证,外部系统必须支持事务这样才能和两阶段提交协议集成

在本示例中的数据需要写入 Kafka,因此数据输出端(Data Sink)有外部状态,在这种情况下在预提交阶段除了将其状态写入  state backend 之外数据输出端还必须预先提交其外部事务

当 Scheckpoint barrieroperator 在所有都传递了一遍,并且发的 chockpoint 调成功完成时,预提交经就结束了,所有触发的状态快照都破为该 checkpoint 一部分, checkpoin 是整个应用程序状态的快照,预先提交的外部状态,如果发生放障,我们可以回滚到上次成功完成快照的时间点。

下一步是通知所有 operator, checkpoint 已经成功了,这是两阶段提交协议的提交阶段, JobManagn 为应用程序中的每个 operatorcheckpoint 发出已完成的回调数据源和 widnow operator 没有外部状态,因此在提交阶段,这些 operator 不必执行任何操作,但是,数据出(Data Sink)拥有外部状态,此时应该提交外部事务。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
5月前
|
监控 Linux 应用服务中间件
linux查看日志文件tail -f用法
在 Linux 中,查看和监控日志文件是系统管理员和开发者常用的操作之一。tail 命令就是用来查看文件内容的,它默认显示文件的最后部分。tail -f 是 tail 命令的一个非常有用的选项,用于实时查看和跟踪日志文件的更新,尤其是在监控运行中的服务时非常有用。
741 0
|
SQL Web App开发 资源调度
客户端操作(一)| 学习笔记
快速学习客户端操作。
|
1天前
|
数据采集 人工智能 安全
|
10天前
|
云安全 监控 安全
|
2天前
|
自然语言处理 API
万相 Wan2.6 全新升级发布!人人都能当导演的时代来了
通义万相2.6全新升级,支持文生图、图生视频、文生视频,打造电影级创作体验。智能分镜、角色扮演、音画同步,让创意一键成片,大众也能轻松制作高质量短视频。
910 150
|
15天前
|
机器学习/深度学习 人工智能 自然语言处理
Z-Image:冲击体验上限的下一代图像生成模型
通义实验室推出全新文生图模型Z-Image,以6B参数实现“快、稳、轻、准”突破。Turbo版本仅需8步亚秒级生成,支持16GB显存设备,中英双语理解与文字渲染尤为出色,真实感和美学表现媲美国际顶尖模型,被誉为“最值得关注的开源生图模型之一”。
1646 8
|
6天前
|
人工智能 前端开发 文件存储
星哥带你玩飞牛NAS-12:开源笔记的进化之路,效率玩家的新选择
星哥带你玩转飞牛NAS,部署开源笔记TriliumNext!支持树状知识库、多端同步、AI摘要与代码高亮,数据自主可控,打造个人“第二大脑”。高效玩家的新选择,轻松搭建专属知识管理体系。
365 152
|
7天前
|
人工智能 自然语言处理 API
一句话生成拓扑图!AI+Draw.io 封神开源组合,工具让你的效率爆炸
一句话生成拓扑图!next-ai-draw-io 结合 AI 与 Draw.io,通过自然语言秒出架构图,支持私有部署、免费大模型接口,彻底解放生产力,绘图效率直接爆炸。
604 152
|
9天前
|
人工智能 安全 前端开发
AgentScope Java v1.0 发布,让 Java 开发者轻松构建企业级 Agentic 应用
AgentScope 重磅发布 Java 版本,拥抱企业开发主流技术栈。
570 13