客户端操作(一)| 学习笔记

简介: 快速学习客户端操作。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 客户端操作(一)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10035


客户端操作

 

目录

一、 环境说明

二、 课程概要

(1) Scala Shell

(2)SQL Client

(3) CommendLine

(4) Restful

(5) Web

三、flink 命令行

四、基本用法

五、结果展示

 

一、环境说明

本期课程基于社区 fink-172 版本运行环境的准备和集群(Slandalone,yarn)的部参考前面的课程本文所有操作的演示都是基于 Mac/ linux 系统和  Chrome 浏览器

 

二、课程概要

Flink 提供了丰富的客户端操作来提交任务和与任务进行交互。本期课程将分为5个部分进行讲解包括 Fink 行, Scala Shell SQL Client Restful API Web

在 fink 安装目录的 bin 目录下面可以看到有link,start-scala-shell-- sh和s -clientsh 等文件,这些都是客户端操作的入口。

 

三、 Flink 命令行

flink 命行参数多,输入 flink-h 能看到完整的说明

+fiink-1.7. bin/fink-h

如果想看某一个命的参数,比如 run 命令,输入:

fink-1.7.2 bin/ run-h

(1)standalone

启动 standalone 集群

flink-1.7 bin/start-cluster .sh

Starting cluster

Starting standalonesession daemon on host zkb-MBP.local

Starting taskexecutor daemon on host zkb-MBP.local.

打开 tt127.0.0.1:8081 能看到 Web 界面。

Run

运行任务

4flink-1..2 bin/fink run-d examples/streaming/TopSpeedWindowing jar

Starting execution of program

Executing TopSpeedWindowing example with default input data set

Use -input to specily file input.

Printing result to stdout. Use-output to specify output path.

Job has been submitted with JoblDe20cb6b357591172eeade

默认是1个开发。

使用官方自带的例子。任务启动后,可以在页面上看到已经启动完成。在任务里面有两个 winters。

点左侧『 Task Manager,然后点 Stdout 能看到输出日志。

在 log 下面点out文件,在这里也可以看到程序的输出。

list 命令,当前一个任务正在运行。

查看任务列表

flink-1.7.2 bin/fink list-m 127.0.0.1:8081

Waiting for response...

------Running/Restarting Jobs ---------

24.03201910:14:06:5e20cb6b0f575911712eeao9de:

------------------------------

CarTopSpeedWindowingExample (RUNNING)

No scheduled jobs.

(2)stop

停止任务

在 stop 任务的时候发现有一个错误,这个任务不能被停止。

一个 job 能够 stop 要求所有的  sourco 都是可以  stoppatle 的,即实现了StoppableFunchon 接口。

需要能 stoppable 的函数必须实现这个接口例如流式任务的 source

stop 方法在任务收到 STOP 信号的时候调用。

source 在接收到这个信号后,必须停止发送新的数据且优雅的停止。

(3)PublicEvolving

public interfacs StoppableFunction

停止 source,与 cance)不同的是:这是一个 source 优停止的请求。

等待中的数据可以继续发送出去,不需要立即停止。

void stop

)

cancel

取消任务如果在 con/ink-cont.yaml 里面配 state savepointsdir会保存 savepoi nt否则不会保存 savepoint

flink-1.7.2 bin/fink cancel -m 127.0.0.1:8081 5e20cb6b0r357591171dfcca2eea09de

Cancelling job 5e20cb6b0f357591171dfcca2eea09de.

Cancelled job 5e20cb6b0f357591171dfcca2eea09de

也可以在停止的时候显示指定 savepoint 目录

可以看到这个任务已经停掉了。把任务再启动,可以看到现在任务在启动,准备一个等级目录,目录已经存在,可以把数据清空,调一下 cancel,可以指定savepoint,已经有数据在里面。

取消和停止(流作业)的区别如下:

cancel 调用立即调用作业算子的 cancel 方法,以快取消它们,如果算子在到 cancel 调用后没有停止 flink 将开始定期中断算子线程的执行,到所有算子停止为止。

stop 调用,是更优雅的停止正在运行流作业的方式。 stop仅适用于 source实现了StoppableFunction 接口的作业。当用户请求停止作业时,作业的所有 sourca都将接收 stop 方法调用。直到所有  source 正常关闭时,作业才会正常结束。这种方式使作业正常处理完所有作业。

(4) savepoint

触发 savepoint

flink-1.7.2 bin/ savepoint-m 127.0.0.1:8081 ec53edcfaeb96b2a5dadbfbe5f52btb

说明: savepoint 和 checkpoint 的区别(详见文档):

checkpoint是 增量做的每次的时间较短,数据量较小,只要在程序里面后用后会自动触发,用户无须感知; savepoint 是全量做的,每次的时间较长,数据量较大需要用户主动去触发。

checkpoint 是作业  failover 的时候自动使用,不需要用户指定 savepoint 一般用干程序的版本更新(详见文档),bug 修复,A/B Test 等场景,需要用户指定。

从指定的 savepoint 启动

先把任务停掉,指定 savepoint,任务启动,看一下日志文件。这个任务在启动的时候可以看到它在恢复。

查看 JobManager 的日志,能够看到类似这样的 log:

2019-03-2810:30:53,957INFo

org apache.fink. runtime. checkpoint CheckpointC-Starting j

(5)modify

修改任务并行度。

为了方便演示我们修改 conflink-c-conf.yal 将 task slot 数调为

4.并置 savepoint 目录

(modify 参数后面接 -s 指定 savepoint 路径可能有 bug.提示无法识别)

taskmanager. numberOfTaskSlots: 4

State. savepoint. dir file:// tmp /savepoint

修改参数后需要重后集群生效,然后再后动任务

从页面上能看到 Task Slots 变为了4,这时候任务的默认并发度是1。

通过 modify 命并发度修改为4和3.可以看到每次 modily 命都会触发一次 savepoint

查看 JobManager 的日志,可以看到:

2019-03-2810:33:11.179INFo

(6)info

拷贝输出的 json 内容,粘贴到这个网站 ison : http://flink.ap org/visualizer

可以和实际运行的物理执行计划对比:

(7) yarn per-job

单任务 attach 模式

默认是 attach 模式即客户端会一等待值到程序结束才会退出

通过 myam- -cluster 指定 yam 模式

客户端能看到结果输出

yam 上显示 Flink session cluster这个 batch 任务行完会 FINISHED

[adminz17.sqa. zth /home/admin/llink/flink-1.7.2)

Secho SHADOOP CONF_DIR

/etc/hadoop/cont/

[adminez17. sqa. zth /home/admin/ink/llink-1.7.2)

s /bin/ run -m yarn-cluster Jexamplesbatch/WordCount in

单任务 detached

由于是 detached 模式,客户端提交完任务就退出了

yarn 上显示为 Flink per-job- cluster

S. /bin/flink run-yd-m

(8)启动 session

/bin/yam-session.sh -tm 2048-s 3

表示动一个  yarn session 集群,每个 TM 的内存是2G每个 TM 有3个 slot(-n参数不生效)

客户端默认是 attach 模式,不会退出

a.可以 ctr+c 退出,然后再通过 bin/yar--sessionsh-id

application1532332183347_0708连上来。

b.或者启动的时候用 d 则为 detached 模式

yam 上显示为 Flink session cluster

/tmp下生成了一个文件

提交任务

/bin/flink run Jexamples/batch/WordCount.jar

将会根据 /tmp/yam -properties-admin--文件内容提交到了刚动的 session

运行结束后 TM 的资源会释放。

提交到指定的 session

通过 yid 参数来提交到指定的 session

blink 的 session 与 fink 的  session 的区别:

flink session-n 参数不生效,而且不会提前启动 TM

blink 的  session 可以通过-n指定启动多少个 TM 而且 TM 会提前起来

(9) Scala Shell

官方文档 :

https: llci.apache  org/projects/flink/flink-./scala shell. html.org/projectsfikfin-docs-rease-.7oosscalashell.html

Deploy 

任务运行说明

a.Batch 任务内置了 benv 变量,通 过 prit 结果输出到控制台

b. Streaming 任务内置了 senv 变量,通过 env v.executeob name )来提交任务且 Datastream的输出只有在 local 模式下打印到控制台。

remote

先启动一个 yam session cluster

. /bin/yamn-session.sh -Im 2048-s 3

(10) yarn

按 CTRL+C 退出后这个 flink cluster 还会继续运行不会退出

(11) Execute

Dataset

flink--1.7.2bin/stop-cluster-.sh

No taskexecutor daemon to stop on host zkb-MBP.local.

(that,)

(the,1)

to,2)

对 DataSet 任务来说  print() 会触发任务的执行。

也可以将结果输出到文件(先删除 /tmp/ou1, 不然会报错同名文件已经存在),继续执行以

下命令:

scala> counts writeAsText("/mp/out1")

res1: org. apache. flink. api.java.operatorsDataSink[(String. Int)] DataSink'<unnameds'

ITextOutputFormat (/mp/out1)-UTF-8)

scala> benv.execute("batch test")

res2: org apache flink. api. common.JobExecutionResult=

org-apache.fink.api. common.JobExecutionRes@737652a9

查看/tmp/out1文件就能看到输出结果。

4flink-1.7.2 cat /tmp/out1

be,2)

(is,1)

(not,)

(or1)

(question,1)

(that, 1)

(the,1)

(to,2)

(12)DataStream

(13)TableAPI

在 Blink 开源版本里面,支持了 TableAPI 方式提交任务(可以用  btenv sqlQuery提交 sq 查询)

社区版本1.8会支持  tableapi: https: /ssuesssues

radira/browse/FLINK-9555

SQL Client Beta

SOL Client 目前还只是测试版处干开发阶段只能用于 SQL 的原型验证不能在生产环境使用。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
5月前
|
监控 Linux 应用服务中间件
linux查看日志文件tail -f用法
在 Linux 中,查看和监控日志文件是系统管理员和开发者常用的操作之一。tail 命令就是用来查看文件内容的,它默认显示文件的最后部分。tail -f 是 tail 命令的一个非常有用的选项,用于实时查看和跟踪日志文件的更新,尤其是在监控运行中的服务时非常有用。
741 0
|
SQL 消息中间件 存储
Flink报错问题之Flink报错:Table sink 'a' doesn't support consuming update and delete changes which is produced by node如何解决
Flink报错通常是指在使用Apache Flink进行实时数据处理时遇到的错误和异常情况;本合集致力于收集Flink运行中的报错信息和解决策略,以便开发者及时排查和修复问题,优化Flink作业的稳定性。
|
Java 关系型数据库 数据库连接
实时计算 Flink版操作报错之在使用JDBC连接MySQL数据库时遇到报错,识别不到jdbc了,怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
机器学习/深度学习 搜索推荐 大数据
深度解析:如何通过精妙的特征工程与创新模型结构大幅提升推荐系统中的召回率,带你一步步攻克大数据检索难题
【10月更文挑战第2天】在处理大规模数据集的推荐系统项目时,提高检索模型的召回率成为关键挑战。本文分享了通过改进特征工程(如加入用户活跃时段和物品相似度)和优化模型结构(引入注意力机制)来提升召回率的具体策略与实现代码。严格的A/B测试验证了新模型的有效性,为改善用户体验奠定了基础。这次实践加深了对特征工程与模型优化的理解,并为未来的技术探索提供了方向。
579 2
深度解析:如何通过精妙的特征工程与创新模型结构大幅提升推荐系统中的召回率,带你一步步攻克大数据检索难题
|
前端开发 JavaScript Java
谷粒商城笔记+踩坑(3)——商品服务-三级分类、网关跨域
商品服务-三级分类增删改查、跨域问题、逻辑删除
|
SQL 分布式计算 DataWorks
MaxCompute操作报错合集之表增加字段,提示创建表失败:DDL execute error, OdpsException: ODPS-0130071:[1,60] Semantic analysis exception - column,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
408 2
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之cdc postgres数据库,当表行记录修改后报错,该如何修改
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
监控 NoSQL MongoDB
MongoDB中的TTL索引:自动过期数据的深入解析与使用方式
MongoDB中的TTL索引:自动过期数据的深入解析与使用方式
|
关系型数据库 MySQL Linux
DolphinScheduler2.x 伪分布式部署
DolphinScheduler2.x 伪分布式部署
440 0
|
机器学习/深度学习 人工智能 网络协议
人工智能平台PAI操作报错合集之报错 "curl: (35) TCP connection reset by peer" 表示什么
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。