Flink on Yarn_K8S 原理剖析及实践(二)| 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 快速学习 Flink on Yarn_K8S 原理剖析及实践。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 Flink on Yarn_K8S 原理剖析及实践(二)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10043


Flink on Yarn_K8S 原理剖析及实践(二)

三、Flink on Kubernetes 原理及实践

重点讲 Flink Kubernetes 的一个原理剖析,相比 Yarn ,它相当于是下载了一个资源管理, Kubernetes 它一般是提供容器的一个编排,但它同时提供了一个资源管理,包括启了很多功能相负载均衡、自动扩缩容、包括这种升级, Kubernetes 可以认为是当前很火的一个。

(1)Kubernetes - 基本概念

第一个是 master ,它有一个 master 节点,它是负责整个管理集群,它有一个集训的资源访问入口,它里面会运行一些 API Server 、Controller Manager 及 Schedule 的服务,其实,它包含一个 Etcd的一个 KV 存储;第二个是 Node 的节点,它集训操作的单元,是Pod 运行的宿主机, Node 节点里面会包含一个 agent 的进程,就是 Kubelet ,它会维护管理 Node 上的所有容器的一个创建和启停等等,还有一个服务就是 kube-proxy ,它是用来做服务发现的,同时可以用于反向代理和负载均衡, docker engine 中有个 docker引擎,它是主要负责容器的创建和管理工作;第三个是 Pod ,是由若干个容器的一个组合,是一个创建、调度和管理的一个最小单位.

(2)Kubernetes - 架构图

通过这个图里面,基本上能看出它的整个运行的一个过程, APIServer 相当于是一个用户的一个请求请求入口,用户通过可以通过命令通过 etcd ,然后提交命令给 APIServer ,它会将这些请求存储在 etcd , KV 存储里面去, Etcd 是一个键值存储,然后Scheduler 会对这个 etcd 这个存储服务进行扫描,然后再分配具体的机器,然后在每个 Node 节点上的 Kubelet 会找到自己需要跑的Container ,因为是用户指定的,然后在本机上去运行;用户在提交RC 的描述,会有一个 replication controller 的主角,它会监视群众的容器。并且它能够保证容器的副本数。这也是 Kubernetes 比较强大的地方,需要指定 Task 的运行,它一定会保证有两个在运行,如果你退出之后,它会帮你重新掉落;最后是用户提交一个 service描述文件,由 kube proxy 负责整个流程的一个转发。这是Kubernetes 的基本架构图,如下图:

image.png

(3)Kubernetes - 核心概念

第一个是 Replication Controller ,俗称为 RC ,是负责 Pod 副本管理,它可以保证集群中存在指定数量的 Pod 副本,就用户指定多少个,就一定会存在多少个;第二个是 service ,它是提供一个统一的服务访问入口,包括服务代理和发行机制,通过 service 可以很方便地构建一个微服务的价格,其实是一个 PV 和 PC ,它主要就是用于那个数据,就是数据要存在哪个地方,因为那个 Pod 容器是可以随时调度的,可以随时掉落在不同的机子上,并且可以自动飞,这时如果用户需要一个指定一个职业化的存储的话,可以定一个 PVC ;第三个是 ConfigMap ,是用户程序里面一些 KV 一些配置,比如要给这个程序传销的 KV 的配置。

(4)Flink on Kubernetes - 架构

在 Flink上运行的时候,它是分着下图几个步骤,第一个就是会启一个 Flint master process ,就是启 master container ,master 启来之后,它里面也包含一个 Flink 的 ResourceManager ,还有 JobManager 和 Program Runner , Program Runner 是负责资源管理的,然后 TaskManager 启来之后,它会向  ResourceManager进行注册,注册完成之后,由 JobManager 具体的任务分给这个container ,由 container 去执行,其实整个流程和 Yarn 也比较类似,只是说它的 container 的启动,是由 KPS 启动的,在 flink里面,它的 master 和 worker ,实际上用的都是一个镜像,启动的 worker 的脚本的命令是一样的,通过这个命令选择是启动 master ,还是启动 worker ?如下图:

image.png

(5)Flink on Kubernetes - JobManager

用户一般需要订一个 JobManager Deployment , Deployment 它是可以保证一个副本,一般是定义一个副本就够,因为是 JobManager ,还可以定一个标签,比如 Flink-jobmanager ;其次,还需要订一个桌,  JobManager Service ,通过 service name 和 port 暴露JobManager 服务,通过标签选择对应的pods,例如flink-jobmanager。

(6)Flink on Kubernetes - TsakManager

也是通过 Deployment 来描述,比如是多少个并发,这里就需要定义成几个副本,就会帮提几个 taskmanager ,同时也需要一个标签,这个标签是用来进行选取的这样 JobManager 才知道哪些TaskManager 是属于它的,还有一个是 JobManager Service ,它通过 service  name 和 port 暴露 JobManager 服务,通过标签选择对应的 pods ,例如 flink-jobmanager 。

(7)Flink on Kubernetes -交互

下图 Flink on Kubernetes 的一个交互图第一个是 JobManager service ,它是通过一个标签来进行选择对应的 JobManager ,JobManager 是运行在 Pod 上然后这里 TaskManager 有三个 Pod , JobManager Deployment 和 TaskManager Deployment 都可以通过副本数 Kubernetes 来保证,有多少个副本同时, Deployment 还有一个特点就是它可以出现自动升级,如果修改 Deployment 配置之后,可以做个自动升级比如改了一些配置文件,改里面的一个配置项,调入内存什么的,然后 Kubernetes 会帮你自动重启接着是 ConfigMap 每个  Pod 上可以挂着一个目录,里面包含一个 flink-conf.yaml  的一个信息,可以通过挂载的方式,如下图:

image.png

(8)Flink on Kubernetes - 实践

会讲一下在 Yarn 和 Kubernetes 上是怎么运行的首先启动一个Session Cluster ,然后通过这个启动命令,比如 kubectl create -f ,可以将这个 Flink 的那个 jobmanager 收起来,后面,是启动做 taskmanager department ,启动完成之后,用户就可以通过这个接口进行访问,这是 service name ,因为是在 Kubernetes 集群里面的,所以说需要通过一个 Powerful word 经端口转发端口转化之后,就可以在 Kubernetes 集群之外,运行这个命令,如果时是在 Kubernetes 里面,可以直接访问,就不需要通过这个转发了;停任务就直接用 flink cancel 就可以停了,如果要将整个资源销毁,可以用 Kubernetes 里的 f ,就将整个资源进行删除从这几个命令可以看出,对 Kubernetes 的操作其实比较简单的用户只要定义出几个资源描述文件然后就可以通过一个简单的命令,将整个Flink Session Cluster 记下来然后整个资源销毁也很简单就一个命令就可以完成。

(9)jobmanager-deployment.yaml

这是 Flink官方提供的一个例子,就是做 JobManagerh 和TaskManager 的配置。上面第一行就是 API 在线,是一个 APP 的版本号版本号里面,比如是以 extensions ,可以制定一个其他的版本,也可以有一个正式版第二就是就是基本类型,实际上分很多种的除了 Deployment 之外,还有 service ,还有其他很多种资源,还有 Job ;接着试一下 metadata 元数据这是一个副本数比如这个任务需要有副本,因为一个副本就够了,所以说一接下来要在metadata 里面要订一份 Label,Label 就是一个标签,它适用于那个Pod 的选取因为在运行前起来之后, Pod 可能会很多,怎么找到对应的 Pod ;接着是 container ,container 里面是有第一个就是它的名字,第二个是 image , image 里面实际上是包含一个仓库和一个当前版的号因为现在写的 flink latest ,后面讲那个仓库里面镜像是怎么上传的然后接下来是一个 args 就是一个启动参数通过启动参数决定要写 jobmanager ,而不是一个其他的程序因为是个 flink 镜像,所以说他拿的是一个 flink 完整镜像,然后通过参数,启动一个 jobmanager 的参数;然后后面是 ports ,就是要暴露的服务端口,比如现在有 RPC 端口,常见就是 8081 端口然后最后面是 env , env 里面就是一些环境变量,比如说我要启动Jobmanager ,可能会有自己的一些参数,比如有一些线下的一些环境,是有很多的一些环境变量都可以传进去,左边这个 JobManager Deployment 的配置文件右边是 TaskManager ,它的那个配置文件有几个不一样的地方,首先是第一个就是的副本数这里是两个副本,比如要取两个 TaskManager ,然后它上面有一个标签,通过标签来说明要在 Pod 上运行TaskManager,它的镜像也同样和JobManager 是一个镜像,就是 Flink ,它的参数就是 TaskManager ,这样那个 flink 里面有一个入口脚本,这个脚本在运行的时候,通过 taskmanager 了解参数来决定启动的是它也会对外报一些服务,比如说像 data 端口RPC , TaskManager 之间也会暴露一些端口进行通信同样的 TaskManager 也可以传递环境变量,比如说像做 Manager RPC ,可以通过这个环境质量

(10)jobmanager-service,yaml

jobmanager的资源类型Service 类型表示要启动的是一个服务,前面这些类型是一个 deployment , service 里面它的配置会相对简单一点,主要的第一个就是 ports ,这个service是包括服务的端口,比如 RPC 端口其实这些端口都是前面定的 JobManager 端口,然后通过审核进行选取这就是 name 标签,就是 APP flink ,主见叫 jobmanager ,它通过标签来选取 flink ,选取 jobmanager Pod ,然后 jobmanager service 都是通过标签,选取到对应的 jobmanager 。

(11)Flink on Kubernetes - 实践

讲一下 Per Job 的方式, Per Job 的方式其实在生产环境使用的比较少,它的原因就是Job的方式,它是将整个 Job 打成一个镜像这样不是很灵活,如果 Job 里面的逻辑有改动,比如说 Job 需要修改这样每次都需要重新打镜像,整个过程非常繁琐,所以在生产环境可能使用的比较少。

第一个说 build 镜像,这是 Flink 源代码目录,复制源代码目录下有一个 flink/flink-container/docker ,它提供了一个脚本叫 bill.sh 这个脚本这个脚本可以指定从哪个 flink版本开始去构建你的镜像还可以进行很多版本这个脚本打完镜像之后,它就会提示一个成功的信息整个打进的过程可能需要几十分钟主要和网络有关系,网速好的话可能会快点第二步是传镜像在http://hub.docker.com/注册账号和创建仓库,然后上传镜像(以我的仓库zkb555为例),其中①docker tag topspeed zkb555/topspeedwindowing②docker push zkb555/topspeedwindowing,在整个上传过程也比较慢也和网速有关系

(12)代码实现

①先实现 SessionCluster :

首先看看 kube.kubectl.get.pods

kube kubectl get nodeskube kubectl get service 如下图:

image.png

kube kubectl get service -n defult ,如下图:

image.png

接下来是 flink

flink-K&S-conf vlm jobmanager-deployment.yaml

flink-K&S-conf vlm jobmanager-service.yaml ,如下图:

link-K&S-conf kubectl create -f jobmanager-deployment.yam

link-K&S-conf kubectl create -f taskmanager-deployment.yam

link-k&s-conf kubectl descirbe pod

flink-k&s-conf kubectl get pod

flink-k&s-conf kubectl descirbe pod

flin-k&s-conf kubectl apply -f jobmanager-service.yaml如下图:

image.png

flink-k&s-conf kubectl get pods ,如下图:

启动完成之后,就能在本地访问到此服务。可以提交任务

这样任务就提交成功了。

接下来实现 Per Job 模式

将任务暂停整个任务就结束了,会发现再进也进不去。

 

四、答疑环节

(1)问: fink on K8S 模式中正在运行的 flink 如果资源不够了,能自动调整志愿吗?

答:其实这个可以 K8S 提供的功能,就是如果资源不够,实际上需要 flink 自己有这种能力才可以。

(2)问: flink 有维特模式吗?

答:现在官方版的还没有,但是在社区有,其他公司已经开发了。

(3)问:flink 这几年是不是很火,是不是因为它具备 spark 和很多的优势的整合福利?

答:这几年火可能因为包括它自己最开始特长是有预算这一块,同时它有很好的知识来源,对于很多业务场景,因为都需要用很多产品,然后用户需要写一份代码, flink 可以同时支持,因为它那个模型比较好支持这些特性的,所以它这几年会发展比较好。

(4)问:公司想做一个多支付的平台,多个用户同时使用同一个 flink集群,在 flink on  yarn 当中有两种方式,第一个是 flink Session ,第二是 flink Per Job ,这两种方式,哪种方式适合生产中的服务平台?

答:这个问题前面也说过了, Per Job 和 Session 的区别主要看服务平台的任务的类型,像 Per Job 一般是任务运行的时间会比较长,比如可能一千或者几千块运行的任务,它不需要频繁申请资源,而 Session 模式比较适合有数据仓库的任务,比如都是临时查询,运行完之后会很快就结束,运行几分钟或者十几分钟就结束,任务特别多的场景就比较适合用 Session 模式,可以提前将资源申请好,所以这个主要是看公司的任务类型。

(5)问:flink 申请之后能不能实现动态?

答:目前这一块社区还在讨论,可能也会有机会做,但是目前是不行的,但是有计划在做,这个地方准确的说 flink Session 可能在Yarn 的 Session 模式和目前阿里这边使用的可能还有一些差距,比如阿里这边 flink 的 Session 模式它启来之后,资源是全都申请好的,然后 flink 社区的 Yarn 的 Session 模式,它启来之后实际上只有一个 JobManager ,然后任务提交之后,它才启 taskManager。

(6)问:flink 的 Per Job 提交多个任务,只能 try 执行,作为多住户平台,是否如上所述?

答:是的, Per Job 现在提交一个任务之后会退出,但是最近往社区推一个新的特性,叫 drivemode ,可以看一下社区邮件列表,就是说 drivemode 可以将任务,调完之后远程执行内函数,然后提交完运行完的第一个任务之后,它可以运行第二个任务,第二个任务结束之后,接着提交第三个任务, flink 的 drivemode 目前社区应该是有一些讨论的那个邮件,正在往社区推这一块儿。

(7)问:flink on yarn的Session模式是否可以查看所有 Session的作业?

答:flink Yarn的 ResourceManager 的页面上是可以看到每一个 SessionCluster ,然后在每个 SessionCluster 点进去之后,可以看到当前正在运行的 Session 的作业,因为结束作业,它那个页面上是看不到的,因为它需要一个参数,配置多长时间过期,好像是两个小时,两个小时之后删除,就结束了。

(8)问:flink on yarn 中利用 Per Job 方式可以同时处理多作业吗?

答:这个前面也说了,Per Job模式现在就是在一个某方面的任务,它只能提一个,可能需要改造一下,才可以提高的,它目前是一个创新的方式。

(9)问:work 数据库中有三张表的数据,当 word 数据通过 OZZ 发送到对面的几个 topic ,想通过 flink 中去清洗对应的 topic 的数据,然后做 join 操作,追踪做完以后的数据,写到一张表,由于存在网络延迟, Word 中的部分数据延迟三到五天才到达数据库,导致部分表数据对不上,需要第二天守城作用,将前几天的数据写入注册表,每几小时去做一下原来的数据。

答:如果用 flink ,应该不存在这样的情况,如果需要,每小时就会一下写入,这个应该没问题,因为它会进行实时的操作。

(10)问:flink on yarn的 JobManager 是随机选择一台ResourceManager,只能获取这个 IP 的端口吗?

答:确实是随机取的,因为是 flink on Yarn的ResourceManager分配的机器,所以是随机分配的,如果想获取这个 IP 的端口办法是什么?首先有两个方法,第一个方法就是如果知道它的 ID ,通过Yarn 的 ID 是可以通过 Yarn 的 SDK  Java API 接口去查到他的IP 和端口;第二个办法就是如果不知道 ID ,可以在提交一项任务的时候,是需要传一个参数 Yarn 上有一个标签的功能,通过标签可以查到是哪个 JobManager ,然后通过 JobManager 的 ID ,再到Yarn 去查它的 IP 端口。

(11)问:flink 实现是在同一个文件吗?

答:目前确实是这样的,因为和 Yarn 的工作机制有关系,它是用 taskManager 进行一个汇总,目前的解决方法一般是 Job 里面会加一些特定的一些 K ,然后自己做一个区分,比如不同的 task ,将自己的一些信息打印,就打印出一份进行区分。

(12)问:flink on yarn使用 Per Job 方式提交,每个

NodeManager 都需要提前安装 flink 吗?

答:不需要提前安装,因为它是个客户端的一个夹包,它会一起上传上去的。

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
12天前
|
SQL 存储 Apache
基于 Flink 进行增量批计算的探索与实践
本文整理自阿里云高级技术专家、Apache Flink PMC朱翥老师在Flink Forward Asia 2024的分享,内容分为三部分:背景介绍、工作介绍和总结展望。首先介绍了增量计算的定义及其与批计算、流计算的区别,阐述了增量计算的优势及典型需求场景,并解释了为何选择Flink进行增量计算。其次,详细描述了当前的工作进展,包括增量计算流程、执行计划生成、控制消费数据量级及执行进度记录恢复等关键技术点。最后,展示了增量计算的简单示例、性能测评结果,并对未来工作进行了规划。
419 5
基于 Flink 进行增量批计算的探索与实践
|
26天前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
397 2
探索Flink动态CEP:杭州银行的实战案例
|
5天前
|
消息中间件 关系型数据库 MySQL
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
112 0
Flink CDC 在阿里云实时计算Flink版的云上实践
|
1月前
|
流计算 开发者
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
|
2月前
|
运维 数据挖掘 网络安全
场景实践 | 基于Flink+Hologres搭建GitHub实时数据分析
基于Flink和Hologres构建的实时数仓方案在数据开发运维体验、成本与收益等方面均表现出色。同时,该产品还具有与其他产品联动组合的可能性,能够为企业提供更全面、更智能的数据处理和分析解决方案。
|
12天前
|
缓存 容灾 网络协议
ACK One多集群网关:实现高效容灾方案
ACK One多集群网关可以帮助您快速构建同城跨AZ多活容灾系统、混合云同城跨AZ多活容灾系统,以及异地容灾系统。
|
22天前
|
Kubernetes Ubuntu 网络安全
ubuntu使用kubeadm搭建k8s集群
通过以上步骤,您可以在 Ubuntu 系统上使用 kubeadm 成功搭建一个 Kubernetes 集群。本文详细介绍了从环境准备、安装 Kubernetes 组件、初始化集群到管理和使用集群的完整过程,希望对您有所帮助。在实际应用中,您可以根据具体需求调整配置,进一步优化集群性能和安全性。
90 12
|
25天前
|
Prometheus Kubernetes 监控
OpenAI故障复盘 - 阿里云容器服务与可观测产品如何保障大规模K8s集群稳定性
聚焦近日OpenAI的大规模K8s集群故障,介绍阿里云容器服务与可观测团队在大规模K8s场景下我们的建设与沉淀。以及分享对类似故障问题的应对方案:包括在K8s和Prometheus的高可用架构设计方面、事前事后的稳定性保障体系方面。
|
27天前
|
Kubernetes 网络协议 应用服务中间件
Kubernetes Ingress:灵活的集群外部网络访问的利器
《Kubernetes Ingress:集群外部访问的利器-打造灵活的集群网络》介绍了如何通过Ingress实现Kubernetes集群的外部访问。前提条件是已拥有Kubernetes集群并安装了kubectl工具。文章详细讲解了Ingress的基本组成(Ingress Controller和资源对象),选择合适的版本,以及具体的安装步骤,如下载配置文件、部署Nginx Ingress Controller等。此外,还提供了常见问题的解决方案,例如镜像下载失败的应对措施。最后,通过部署示例应用展示了Ingress的实际使用方法。
60 2
|
1月前
|
存储 Kubernetes 关系型数据库
阿里云ACK备份中心,K8s集群业务应用数据的一站式灾备方案
本文源自2024云栖大会苏雅诗的演讲,探讨了K8s集群业务为何需要灾备及其重要性。文中强调了集群与业务高可用配置对稳定性的重要性,并指出人为误操作等风险,建议实施周期性和特定情况下的灾备措施。针对容器化业务,提出了灾备的新特性与需求,包括工作负载为核心、云资源信息的备份,以及有状态应用的数据保护。介绍了ACK推出的备份中心解决方案,支持命名空间、标签、资源类型等维度的备份,并具备存储卷数据保护功能,能够满足GitOps流程企业的特定需求。此外,还详细描述了备份中心的使用流程、控制台展示、灾备难点及解决方案等内容,展示了备份中心如何有效应对K8s集群资源和存储卷数据的灾备挑战。

热门文章

最新文章