开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 :Flink on Yarn_K8S 原理剖析及实践(二)】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/632/detail/10043
Flink on Yarn_K8S 原理剖析及实践(二)
三、Flink on Kubernetes 原理及实践
重点讲 Flink Kubernetes 的一个原理剖析,相比 Yarn ,它相当于是下载了一个资源管理, Kubernetes 它一般是提供容器的一个编排,但它同时提供了一个资源管理,包括启了很多功能相负载均衡、自动扩缩容、包括这种升级, Kubernetes 可以认为是当前很火的一个。
(1)Kubernetes - 基本概念
第一个是 master ,它有一个 master 节点,它是负责整个管理集群,它有一个集训的资源访问入口,它里面会运行一些 API Server 、Controller Manager 及 Schedule 的服务,其实,它包含一个 Etcd的一个 KV 存储;第二个是 Node 的节点,它集训操作的单元,是Pod 运行的宿主机, Node 节点里面会包含一个 agent 的进程,就是 Kubelet ,它会维护管理 Node 上的所有容器的一个创建和启停等等,还有一个服务就是 kube-proxy ,它是用来做服务发现的,同时可以用于反向代理和负载均衡, docker engine 中有个 docker引擎,它是主要负责容器的创建和管理工作;第三个是 Pod ,是由若干个容器的一个组合,是一个创建、调度和管理的一个最小单位.
(2)Kubernetes - 架构图
通过这个图里面,基本上能看出它的整个运行的一个过程, APIServer 相当于是一个用户的一个请求请求入口,用户通过可以通过命令通过 etcd ,然后提交命令给 APIServer ,它会将这些请求存储在 etcd , KV 存储里面去, Etcd 是一个键值存储,然后Scheduler 会对这个 etcd 这个存储服务进行扫描,然后再分配具体的机器,然后在每个 Node 节点上的 Kubelet 会找到自己需要跑的Container ,因为是用户指定的,然后在本机上去运行;用户在提交RC 的描述,会有一个 replication controller 的主角,它会监视群众的容器。并且它能够保证容器的副本数。这也是 Kubernetes 比较强大的地方,需要指定 Task 的运行,它一定会保证有两个在运行,如果你退出之后,它会帮你重新掉落;最后是用户提交一个 service描述文件,由 kube proxy 负责整个流程的一个转发。这是Kubernetes 的基本架构图,如下图:
(3)Kubernetes - 核心概念
第一个是 Replication Controller ,俗称为 RC ,是负责 Pod 副本管理,它可以保证集群中存在指定数量的 Pod 副本,就用户指定多少个,就一定会存在多少个;第二个是 service ,它是提供一个统一的服务访问入口,包括服务代理和发行机制,通过 service 可以很方便地构建一个微服务的价格,其实是一个 PV 和 PC ,它主要就是用于那个数据,就是数据要存在哪个地方,因为那个 Pod 容器是可以随时调度的,可以随时掉落在不同的机子上,并且可以自动飞,这时如果用户需要一个指定一个职业化的存储的话,可以定一个 PVC ;第三个是 ConfigMap ,是用户程序里面一些 KV 一些配置,比如要给这个程序传销的 KV 的配置。
(4)Flink on Kubernetes - 架构
在 Flink上运行的时候,它是分着下图几个步骤,第一个就是会启一个 Flint master process ,就是启 master container ,master 启来之后,它里面也包含一个 Flink 的 ResourceManager ,还有 JobManager 和 Program Runner , Program Runner 是负责资源管理的,然后 TaskManager 启来之后,它会向 ResourceManager进行注册,注册完成之后,由 JobManager 具体的任务分给这个container ,由 container 去执行,其实整个流程和 Yarn 也比较类似,只是说它的 container 的启动,是由 KPS 启动的,在 flink里面,它的 master 和 worker ,实际上用的都是一个镜像,启动的 worker 的脚本的命令是一样的,通过这个命令选择是启动 master ,还是启动 worker ?如下图:
(5)Flink on Kubernetes - JobManager
用户一般需要订一个 JobManager Deployment , Deployment 它是可以保证一个副本,一般是定义一个副本就够,因为是 JobManager ,还可以定一个标签,比如 Flink-jobmanager ;其次,还需要订一个桌, JobManager Service ,通过 service name 和 port 暴露JobManager 服务,通过标签选择对应的pods,例如flink-jobmanager。
(6)Flink on Kubernetes - TsakManager
它也是通过 Deployment 来描述,比如是多少个并发,这里就需要定义成几个副本,就会帮提几个 taskmanager ,同时它也需要一个标签,这个标签是用来进行选取的,这样 JobManager 才知道哪些TaskManager 是属于它的,还有一个是 JobManager Service ,它通过 service name 和 port 暴露 JobManager 服务,通过标签选择对应的 pods ,例如 flink-jobmanager 。
(7)Flink on Kubernetes -交互
下图是 Flink on Kubernetes 的一个交互图,第一个是 JobManager service ,它是通过一个标签来进行选择对应的 JobManager ,JobManager 是运行在 Pod 上,然后这里 TaskManager 有三个 Pod , JobManager Deployment 和 TaskManager Deployment 都可以通过副本数,由 Kubernetes 来保证,有多少个副本,同时, Deployment 还有一个特点就是它可以出现自动升级,如果修改 Deployment 配置之后,它可以做个自动升级,比如改了一些配置文件,改里面的一个配置项,调入内存什么的,然后 Kubernetes 会帮你自动重启;接着是 ConfigMap ,在每个 Pod 上可以挂着一个目录,里面包含一个 flink-conf.yaml 的一个信息,可以通过挂载的方式,如下图:
(8)Flink on Kubernetes - 实践
会讲一下在 Yarn 和 Kubernetes 上是怎么运行的,首先启动一个Session Cluster ,然后通过这个启动命令,比如 kubectl create -f ,可以将这个 Flink 的那个 jobmanager 收起来,后面,是启动做 taskmanager department ,启动完成之后,用户就可以通过这个接口进行访问,这是 service name ,因为它是在 Kubernetes 集群里面的,所以说需要通过一个 Powerful word 经端口转发,端口转化之后,就可以在 Kubernetes 集群之外,运行这个命令,如果这时是在 Kubernetes 里面,可以直接访问,就不需要通过这个转发了;停任务就直接用 flink cancel 就可以停了,如果要将整个资源销毁,可以用 Kubernetes 里的 f ,就将整个资源进行删除,从这几个命令可以看出,对 Kubernetes 的操作其实比较简单的,用户只要定义出几个资源描述文件,然后就可以通过一个简单的命令,将整个Flink Session Cluster 记下来;然后整个资源销毁也很简单,就一个命令就可以完成。
(9)jobmanager-deployment.yaml
这是 Flink官方提供的一个例子,就是做 JobManagerh 和TaskManager 的配置。上面第一行就是 API 在线,是一个 APP 的版本号,版本号里面,比如是以 extensions ,可以制定一个其他的版本,也可以有一个正式版;第二就是就是基本类型,实际上分很多种的,除了 Deployment 之外,还有 service ,还有其他很多种资源,还有 Job ;接着试一下 metadata 元数据,这是一个副本数,比如这个任务需要有副本,因为一个副本就够了,所以说一,接下来要在metadata 里面要订一份 Label,Label 就是一个标签,它适用于那个Pod 的选取,因为在运行前起来之后, Pod 可能会很多,怎么找到对应的 Pod ;接着是 container ,container 里面是有第一个就是它的名字,第二个是 image , image 里面实际上是包含一个仓库和一个当前版的号,因为现在写的 flink latest ,后面讲那个仓库里面镜像是怎么上传的;然后接下来是一个 args ,它就是一个启动参数,通过启动参数决定要写 jobmanager ,而不是一个其他的程序,因为是个 flink 镜像,所以说他拿的是一个 flink 完整镜像,然后通过参数,启动一个 jobmanager 的参数;然后后面是 ports ,就是要暴露的服务端口,比如现在有 RPC 端口,常见就是 8081 端口;然后最后面是 env , env 里面就是一些环境变量,比如说我要启动Jobmanager ,可能会有自己的一些参数,比如有一些线下的一些环境,是有很多的一些环境变量都可以传进去,左边这个 JobManager Deployment 的配置文件,右边是 TaskManager ,它的那个配置文件有几个不一样的地方,首先是第一个就是它的副本数,这里是两个副本,比如要取两个 TaskManager ,然后它上面有一个标签,通过标签来说明要在 Pod 上运行TaskManager,它的镜像也同样和JobManager 是一个镜像,就是 Flink ,它的参数就是 TaskManager ,这样那个 flink 里面有一个入口脚本,这个脚本在运行的时候,通过 taskmanager 了解参数来决定启动的是,它也会对外报一些服务,比如说像 data 端口、 RPC , TaskManager 之间也会暴露一些端口进行通信,同样的 TaskManager 也可以传递环境变量,比如说像做 Manager 的 RPC ,可以通过这个环境质量。
(10)jobmanager-service,yaml
jobmanager的资源类型是 Service 类型,表示它要启动的是一个服务,前面这些类型是一个 deployment ,在 service 里面它的配置会相对简单一点,主要的第一个就是 ports ,这个service是包括服务的端口,比如 RPC 端口,其实这些端口都是前面定的 JobManager 端口,然后它通过审核进行选取,这就是 name 标签,就是 APP flink ,主见叫 jobmanager ,它通过标签来选取 flink ,选取 jobmanager 的 Pod ,然后 jobmanager service 都是通过标签,选取到对应的 jobmanager 。
(11)Flink on Kubernetes - 实践
讲一下 Per Job 的方式, Per Job 的方式其实在生产环境使用的比较少,它的原因就是Job的方式,它是将整个 Job 打成一个镜像,这样不是很灵活,如果 Job 里面的逻辑有改动,比如说 Job 需要修改,这样它每次都需要重新打镜像,整个过程非常繁琐,所以在生产环境可能使用的比较少。
第一个说 build 镜像,这是 Flink 源代码目录,复制源代码目录下有一个 flink/flink-container/docker ,它提供了一个脚本叫 bill.sh 这个脚本,这个脚本可以指定从哪个 flink版本开始去构建你的镜像,还可以进行很多版本,这个脚本打完镜像之后,它就会提示一个成功的信息,整个打进的过程可能需要几十分钟,主要和网络有关系,网速好的话可能会快点;第二步是传镜像,在http://hub.docker.com/注册账号和创建仓库,然后上传镜像(以我的仓库zkb555为例),其中①docker tag topspeed zkb555/topspeedwindowing②docker push zkb555/topspeedwindowing,在整个上传过程也比较慢,也和网速有关系。
(12)代码实现
①先实现 SessionCluster :
首先看看 kube.kubectl.get.pods
看 kube kubectl get nodes
和 kube kubectl get service
如下图:
看 kube kubectl get service -n defult
,如下图:
接下来是 flink
flink-K&S-conf vlm jobmanager-deployment.yaml
flink-K&S-conf vlm jobmanager-service.yaml ,
如下图:
link-K&S-conf kubectl create -f jobmanager-deployment.yam
link-K&S-conf kubectl create -f taskmanager-deployment.yam
link-k&s-conf kubectl descirbe pod
flink-k&s-conf kubectl get pod
flink-k&s-conf kubectl descirbe pod
flin-k&s-conf kubectl apply -f jobmanager-service.yaml
如下图:
看 flink-k&s-conf kubectl get pods
,如下图:
启动完成之后,就能在本地访问到此服务。可以提交任务
这样任务就提交成功了。
②接下来实现 Per Job 模式
将任务暂停整个任务就结束了,会发现再进也进不去。
四、答疑环节
(1)问: fink on K8S 模式中正在运行的 flink 如果资源不够了,能自动调整志愿吗?
答:其实这个可以 K8S 提供的功能,就是如果资源不够,实际上需要 flink 自己有这种能力才可以。
(2)问: flink 有维特模式吗?
答:现在官方版的还没有,但是在社区有,其他公司已经开发了。
(3)问:flink 这几年是不是很火,是不是因为它具备 spark 和很多的优势的整合福利?
答:这几年火可能因为包括它自己最开始特长是有预算这一块,同时它有很好的知识来源,对于很多业务场景,因为都需要用很多产品,然后用户需要写一份代码, flink 可以同时支持,因为它那个模型比较好支持这些特性的,所以它这几年会发展比较好。
(4)问:公司想做一个多支付的平台,多个用户同时使用同一个 flink集群,在 flink on yarn 当中有两种方式,第一个是 flink Session ,第二是 flink Per Job ,这两种方式,哪种方式适合生产中的服务平台?
答:这个问题前面也说过了, Per Job 和 Session 的区别主要看服务平台的任务的类型,像 Per Job 一般是任务运行的时间会比较长,比如可能一千或者几千块运行的任务,它不需要频繁申请资源,而 Session 模式比较适合有数据仓库的任务,比如都是临时查询,运行完之后会很快就结束,运行几分钟或者十几分钟就结束,任务特别多的场景就比较适合用 Session 模式,可以提前将资源申请好,所以这个主要是看公司的任务类型。
(5)问:flink 申请之后能不能实现动态?
答:目前这一块社区还在讨论,可能也会有机会做,但是目前是不行的,但是有计划在做,这个地方准确的说 flink Session 可能在Yarn 的 Session 模式和目前阿里这边使用的可能还有一些差距,比如阿里这边 flink 的 Session 模式它启来之后,资源是全都申请好的,然后 flink 社区的 Yarn 的 Session 模式,它启来之后实际上只有一个 JobManager ,然后任务提交之后,它才启 taskManager。
(6)问:flink 的 Per Job 提交多个任务,只能 try 执行,作为多住户平台,是否如上所述?
答:是的, Per Job 现在提交一个任务之后会退出,但是最近往社区推一个新的特性,叫 drivemode ,可以看一下社区邮件列表,就是说 drivemode 可以将任务,调完之后远程执行内函数,然后提交完运行完的第一个任务之后,它可以运行第二个任务,第二个任务结束之后,接着提交第三个任务, flink 的 drivemode 目前社区应该是有一些讨论的那个邮件,正在往社区推这一块儿。
(7)问:flink on yarn的Session模式是否可以查看所有 Session的作业?
答:flink Yarn的 ResourceManager 的页面上是可以看到每一个 SessionCluster ,然后在每个 SessionCluster 点进去之后,可以看到当前正在运行的 Session 的作业,因为结束作业,它那个页面上是看不到的,因为它需要一个参数,配置多长时间过期,好像是两个小时,两个小时之后删除,就结束了。
(8)问:flink on yarn 中利用 Per Job 方式可以同时处理多作业吗?
答:这个前面也说了,Per Job模式现在就是在一个某方面的任务,它只能提一个,可能需要改造一下,才可以提高的,它目前是一个创新的方式。
(9)问:work 数据库中有三张表的数据,当 word 数据通过 OZZ 发送到对面的几个 topic ,想通过 flink 中去清洗对应的 topic 的数据,然后做 join 操作,追踪做完以后的数据,写到一张表,由于存在网络延迟, Word 中的部分数据延迟三到五天才到达数据库,导致部分表数据对不上,需要第二天守城作用,将前几天的数据写入注册表,每几小时去做一下原来的数据。
答:如果用 flink ,应该不存在这样的情况,如果需要,每小时就会一下写入,这个应该没问题,因为它会进行实时的操作。
(10)问:flink on yarn的 JobManager 是随机选择一台ResourceManager,只能获取这个 IP 的端口吗?
答:确实是随机取的,因为是 flink on Yarn的ResourceManager分配的机器,所以是随机分配的,如果想获取这个 IP 的端口办法是什么?首先有两个方法,第一个方法就是如果知道它的 ID ,通过Yarn 的 ID 是可以通过 Yarn 的 SDK Java API 接口去查到他的IP 和端口;第二个办法就是如果不知道 ID ,可以在提交一项任务的时候,是需要传一个参数 Yarn 上有一个标签的功能,通过标签可以查到是哪个 JobManager ,然后通过 JobManager 的 ID ,再到Yarn 去查它的 IP 端口。
(11)问:flink 实现是在同一个文件吗?
答:目前确实是这样的,因为和 Yarn 的工作机制有关系,它是用 taskManager 进行一个汇总,目前的解决方法一般是 Job 里面会加一些特定的一些 K ,然后自己做一个区分,比如不同的 task ,将自己的一些信息打印,就打印出一份进行区分。
(12)问:flink on yarn使用 Per Job 方式提交,每个
NodeManager 都需要提前安装 flink 吗?
答:不需要提前安装,因为它是个客户端的一个夹包,它会一起上传上去的。