Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排

简介: Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排

代码仓库

会同步代码到 GitHub

https://github.com/turbo-duck/flink-demo

背景介绍

单机模式

在之前的章节中,已经验证过,但是实际运行的时候,我们是需要提交到服务器去运行的。


集群模式

在之前章节中,已经验证过,(方便测试) 使用了 Docker 的方式进行部署。同时也利用容器编排工具docker-compose的方式,进行集群的模式部署:


简单测试 1 x JobManager + 1 x TaskManager

集群模式 1 x JobManager + 3 x TaskManager

HA模式 2 x JobManager + Zookeeper + 3 x TaskManager

但是对于上述的 docker-compose 编排的方式,我们依然是在一台机器上的。对于实际的生产情况,我们可能需要面对多台机器进行部署,同时最好的方案是:动态扩容、动态缩容,最大程度的利用资源。


云原生部署

首先确保你有 Kubernetes

然后为了简化 这里借助 Rancher 可视化进行操作

等熟悉了,你可以使用 yaml尝试部署。

最后,生产环境可借助 Helm + Flink Operator 极大简化过程。

JobManager

需要开放的端口如下:

完整的Yaml 如下:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  annotations:
    field.cattle.io/creatorId: u-s4mkr
    field.cattle.io/publicEndpoints: '[{"addresses":["10.10.52.11"],"port":31120,"protocol":"TCP","serviceName":"flink-server:jobmanager-nodeport","allNodes":true},{"addresses":["10.10.52.11"],"port":30985,"protocol":"TCP","serviceName":"flink-server:jobmanager-nodeport","allNodes":true},{"addresses":["10.10.52.11"],"port":30166,"protocol":"TCP","serviceName":"flink-server:jobmanager-nodeport","allNodes":true}]'
  creationTimestamp: "2023-06-08T08:49:14Z"
  generation: 21
  labels:
    cattle.io/creator: norman
    workload.user.cattle.io/workloadselector: statefulSet-flink-server-jobmanager
  managedFields:
  - apiVersion: apps/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .: {}
          f:field.cattle.io/creatorId: {}
          f:field.cattle.io/publicEndpoints: {}
        f:labels:
          .: {}
          f:cattle.io/creator: {}
          f:workload.user.cattle.io/workloadselector: {}
      f:spec:
        f:podManagementPolicy: {}
        f:replicas: {}
        f:revisionHistoryLimit: {}
        f:selector: {}
        f:serviceName: {}
        f:template:
          f:metadata:
            f:annotations:
              .: {}
              f:cattle.io/timestamp: {}
              f:field.cattle.io/ports: {}
            f:labels:
              .: {}
              f:workload.user.cattle.io/workloadselector: {}
          f:spec:
            f:containers:
              k:{"name":"jobmanager"}:
                .: {}
                f:args: {}
                f:env:
                  .: {}
                  k:{"name":"JOB_MANAGER_RPC_ADDRESS"}:
                    .: {}
                    f:name: {}
                    f:value: {}
                f:image: {}
                f:imagePullPolicy: {}
                f:name: {}
                f:ports:
                  .: {}
                  k:{"containerPort":6123,"protocol":"TCP"}:
                    .: {}
                    f:containerPort: {}
                    f:name: {}
                    f:protocol: {}
                  k:{"containerPort":6124,"protocol":"TCP"}:
                    .: {}
                    f:containerPort: {}
                    f:name: {}
                    f:protocol: {}
                  k:{"containerPort":8081,"protocol":"TCP"}:
                    .: {}
                    f:containerPort: {}
                    f:name: {}
                    f:protocol: {}
                f:resources: {}
                f:securityContext:
                  .: {}
                  f:allowPrivilegeEscalation: {}
                  f:capabilities: {}
                  f:privileged: {}
                  f:readOnlyRootFilesystem: {}
                  f:runAsNonRoot: {}
                f:stdin: {}
                f:terminationMessagePath: {}
                f:terminationMessagePolicy: {}
                f:tty: {}
            f:dnsConfig: {}
            f:dnsPolicy: {}
            f:restartPolicy: {}
            f:schedulerName: {}
            f:securityContext: {}
            f:terminationGracePeriodSeconds: {}
        f:updateStrategy:
          f:type: {}
    manager: rancher
    operation: Update
    time: "2023-06-08T11:20:03Z"
  - apiVersion: apps/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:status:
        f:collisionCount: {}
        f:currentReplicas: {}
        f:currentRevision: {}
        f:observedGeneration: {}
        f:readyReplicas: {}
        f:replicas: {}
        f:updateRevision: {}
        f:updatedReplicas: {}
    manager: kube-controller-manager
    operation: Update
    time: "2024-06-27T01:24:45Z"
  name: jobmanager
  namespace: flink-server
  resourceVersion: "126393476"
  uid: 461079aa-69f6-423e-85b7-36fdc5c513e7
spec:
  podManagementPolicy: OrderedReady
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      workload.user.cattle.io/workloadselector: statefulSet-flink-server-jobmanager
  serviceName: jobmanager
  template:
    metadata:
      annotations:
        cattle.io/timestamp: "2023-06-08T11:18:06Z"
        field.cattle.io/ports: '[[{"containerPort":8081,"dnsName":"jobmanager-nodeport","hostPort":0,"kind":"NodePort","name":"jm81","protocol":"TCP","sourcePort":31120},{"containerPort":6123,"dnsName":"jobmanager-nodeport","hostPort":0,"kind":"NodePort","name":"jm23","protocol":"TCP"},{"containerPort":6124,"dnsName":"jobmanager-nodeport","hostPort":0,"kind":"NodePort","name":"jm24","protocol":"TCP"}]]'
      creationTimestamp: null
      labels:
        workload.user.cattle.io/workloadselector: statefulSet-flink-server-jobmanager
    spec:
      containers:
      - args:
        - jobmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: jobmanager
        image: 10.10.52.8/flink/flink
        imagePullPolicy: Always
        name: jobmanager
        ports:
        - containerPort: 8081
          name: jm81
          protocol: TCP
        - containerPort: 6123
          name: jm23
          protocol: TCP
        - containerPort: 6124
          name: jm24
          protocol: TCP
        resources: {}
        securityContext:
          allowPrivilegeEscalation: false
          capabilities: {}
          privileged: false
          readOnlyRootFilesystem: false
          runAsNonRoot: false
        stdin: true
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        tty: true
      dnsConfig: {}
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30
  updateStrategy:
    type: RollingUpdate
status:
  collisionCount: 0
  currentReplicas: 1
  currentRevision: jobmanager-6ddbf8767b
  observedGeneration: 21
  readyReplicas: 1
  replicas: 1
  updateRevision: jobmanager-6ddbf8767b
  updatedReplicas: 1

TaskManager

需要开放的端口如下:

完整Yaml 如下:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  annotations:
    field.cattle.io/creatorId: u-s4mkr
    field.cattle.io/publicEndpoints: '[{"addresses":["10.10.52.11"],"port":31538,"protocol":"TCP","serviceName":"flink-server:taskmanager-nodeport","allNodes":true},{"addresses":["10.10.52.11"],"port":30067,"protocol":"TCP","serviceName":"flink-server:taskmanager-nodeport","allNodes":true}]'
  creationTimestamp: "2023-06-08T08:49:30Z"
  generation: 10
  labels:
    cattle.io/creator: norman
    workload.user.cattle.io/workloadselector: statefulSet-flink-server-taskmanager
  managedFields:
  - apiVersion: apps/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .: {}
          f:field.cattle.io/creatorId: {}
          f:field.cattle.io/publicEndpoints: {}
        f:labels:
          .: {}
          f:cattle.io/creator: {}
          f:workload.user.cattle.io/workloadselector: {}
      f:spec:
        f:podManagementPolicy: {}
        f:replicas: {}
        f:revisionHistoryLimit: {}
        f:selector: {}
        f:serviceName: {}
        f:template:
          f:metadata:
            f:annotations:
              .: {}
              f:cattle.io/timestamp: {}
              f:field.cattle.io/ports: {}
              f:field.cattle.io/publicEndpoints: {}
            f:labels:
              .: {}
              f:workload.user.cattle.io/workloadselector: {}
          f:spec:
            f:containers:
              k:{"name":"taskmanager"}:
                .: {}
                f:args: {}
                f:env:
                  .: {}
                  k:{"name":"JOB_MANAGER_RPC_ADDRESS"}:
                    .: {}
                    f:name: {}
                    f:value: {}
                f:image: {}
                f:imagePullPolicy: {}
                f:name: {}
                f:ports:
                  .: {}
                  k:{"containerPort":6121,"protocol":"TCP"}:
                    .: {}
                    f:containerPort: {}
                    f:name: {}
                    f:protocol: {}
                  k:{"containerPort":6122,"protocol":"TCP"}:
                    .: {}
                    f:containerPort: {}
                    f:name: {}
                    f:protocol: {}
                f:resources: {}
                f:securityContext:
                  .: {}
                  f:allowPrivilegeEscalation: {}
                  f:privileged: {}
                  f:readOnlyRootFilesystem: {}
                  f:runAsNonRoot: {}
                f:stdin: {}
                f:terminationMessagePath: {}
                f:terminationMessagePolicy: {}
                f:tty: {}
            f:dnsPolicy: {}
            f:restartPolicy: {}
            f:schedulerName: {}
            f:securityContext: {}
            f:terminationGracePeriodSeconds: {}
        f:updateStrategy:
          f:type: {}
    manager: rancher
    operation: Update
    time: "2023-06-08T11:20:40Z"
  - apiVersion: apps/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:status:
        f:collisionCount: {}
        f:currentReplicas: {}
        f:currentRevision: {}
        f:observedGeneration: {}
        f:readyReplicas: {}
        f:replicas: {}
        f:updateRevision: {}
        f:updatedReplicas: {}
    manager: kube-controller-manager
    operation: Update
    time: "2024-06-27T01:24:58Z"
  name: taskmanager
  namespace: flink-server
  resourceVersion: "126393537"
  uid: 9ddfa3b3-b497-43af-9bb6-af83ffa0aed3
spec:
  podManagementPolicy: OrderedReady
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      workload.user.cattle.io/workloadselector: statefulSet-flink-server-taskmanager
  serviceName: taskmanager
  template:
    metadata:
      annotations:
        cattle.io/timestamp: "2023-06-08T11:19:53Z"
        field.cattle.io/ports: '[[{"containerPort":6121,"dnsName":"taskmanager-nodeport","kind":"NodePort","name":"tm21","protocol":"TCP"},{"containerPort":6122,"dnsName":"taskmanager-nodeport","kind":"NodePort","name":"tm22","protocol":"TCP"}]]'
        field.cattle.io/publicEndpoints: '[{"addresses":["10.10.52.11"],"allNodes":true,"port":31538,"protocol":"TCP","serviceId":"flink-server:taskmanager-nodeport"},{"addresses":["10.10.52.11"],"allNodes":true,"port":30067,"protocol":"TCP","serviceId":"flink-server:taskmanager-nodeport"}]'
      creationTimestamp: null
      labels:
        workload.user.cattle.io/workloadselector: statefulSet-flink-server-taskmanager
    spec:
      containers:
      - args:
        - taskmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: jobmanager
        image: 10.10.52.8/flink/flink
        imagePullPolicy: Always
        name: taskmanager
        ports:
        - containerPort: 6121
          name: tm21
          protocol: TCP
        - containerPort: 6122
          name: tm22
          protocol: TCP
        resources: {}
        securityContext:
          allowPrivilegeEscalation: false
          privileged: false
          readOnlyRootFilesystem: false
          runAsNonRoot: false
        stdin: true
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        tty: true
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30
  updateStrategy:
    type: RollingUpdate
status:
  collisionCount: 0
  currentReplicas: 1
  currentRevision: taskmanager-6fd48fdb8d
  observedGeneration: 10
  readyReplicas: 1
  replicas: 1
  updateRevision: taskmanager-6fd48fdb8d
  updatedReplicas: 1

测试效果

控制台

(根据自己的情况)

http://10.10.52.11:31120/#/overview

扩容TaskManager

当我们对 TaskManager 进行扩容的时候,这里我扩展了 5个,正在逐步扩容,完毕之后 ,我们可以对应观察到控制台中的:

HA模式

同样的操作,你需要在 K8s 集群中用如下的方案来做高可用方案:

  • 3 x JobManager
  • N x TaskManager
  • Zookeeper
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
4月前
|
Kubernetes 安全 网络协议
Kubernetes实用指令:通过dry-run生成部署与服务的YAML配置
总结起来, 使用 ` -- dry—run = client `- o yam l' 参数能够帮助用户预览 Kubernetes 资源定义并且确保它们符合预期效果且没有立即影响现有集群断层结构. 这种做法对于新手学习 K8s 资源规范、测试新策略或者审核现有策略都非常有效率与安全.
415 4
|
5月前
|
存储 Kubernetes 网络安全
关于阿里云 Kubernetes 容器服务(ACK)添加镜像仓库的快速说明
本文介绍了在中国大陆地区因网络限制无法正常拉取 Docker 镜像的解决方案。作者所在的阿里云 Kubernetes 集群使用的是较旧版本的 containerd(1.2x),且无法直接通过 SSH 修改节点配置,因此采用了一种无需更改 Kubernetes 配置文件的方法。通过为 `docker.io` 添加 containerd 的镜像源,并使用脚本自动修改 containerd 配置文件中的路径错误(将错误的 `cert.d` 改为 `certs.d`),最终实现了通过多个镜像站点拉取镜像。作者还提供了一个可重复运行的脚本,用于动态配置镜像源。虽然该方案能缓解镜像拉取问题,
625 2
|
8月前
|
Java 调度 流计算
基于Java 17 + Spring Boot 3.2 + Flink 1.18的智慧实验室管理系统核心代码
这是一套基于Java 17、Spring Boot 3.2和Flink 1.18开发的智慧实验室管理系统核心代码。系统涵盖多协议设备接入(支持OPC UA、MQTT等12种工业协议)、实时异常检测(Flink流处理引擎实现设备状态监控)、强化学习调度(Q-Learning算法优化资源分配)、三维可视化(JavaFX与WebGL渲染实验室空间)、微服务架构(Spring Cloud构建分布式体系)及数据湖建设(Spark构建实验室数据仓库)。实际应用中,该系统显著提升了设备调度效率(响应时间从46分钟降至9秒)、设备利用率(从41%提升至89%),并大幅减少实验准备时间和维护成本。
436 0
|
11月前
|
存储 监控 对象存储
ACK 容器监控存储全面更新:让您的应用运行更稳定、更透明
ACK 容器监控存储全面更新:让您的应用运行更稳定、更透明
334 0
ACK 容器监控存储全面更新:让您的应用运行更稳定、更透明
|
12月前
|
存储 监控 对象存储
ACK 容器监控存储全面更新:让您的应用运行更稳定、更透明
ACK 容器监控存储全面更新:让您的应用运行更稳定、更透明
259 1
|
12月前
|
监控 Kubernetes Cloud Native
基于阿里云容器服务Kubernetes版(ACK)的微服务架构设计与实践
本文介绍了如何基于阿里云容器服务Kubernetes版(ACK)设计和实现微服务架构。首先概述了微服务架构的优势与挑战,如模块化、可扩展性及技术多样性。接着详细描述了ACK的核心功能,包括集群管理、应用管理、网络与安全、监控与日志等。在设计基于ACK的微服务架构时,需考虑服务拆分、通信、发现与负载均衡、配置管理、监控与日志以及CI/CD等方面。通过一个电商应用案例,展示了用户服务、商品服务、订单服务和支付服务的具体部署步骤。最后总结了ACK为微服务架构提供的强大支持,帮助应对各种挑战,构建高效可靠的云原生应用。
|
11月前
|
存储 运维 Kubernetes
容器数据保护:基于容器服务 Kubernetes 版(ACK)备份中心实现K8s存储卷一键备份与恢复
阿里云ACK备份中心提供一站式容器化业务灾备及迁移方案,减少数据丢失风险,确保业务稳定运行。
|
12月前
|
监控 Cloud Native Java
基于阿里云容器服务(ACK)的微服务架构设计与实践
本文介绍如何利用阿里云容器服务Kubernetes版(ACK)构建高可用、可扩展的微服务架构。通过电商平台案例,展示基于Java(Spring Boot)、Docker、Nacos等技术的开发、容器化、部署流程,涵盖服务注册、API网关、监控日志及性能优化实践,帮助企业实现云原生转型。
|
Kubernetes 调度 Apache
Docker 编排工具比较:Kubernetes、Docker Swarm 和 Mesos,选择最适合你的容器编排方案
Docker 编排工具比较:Kubernetes、Docker Swarm 和 Mesos,选择最适合你的容器编排方案
567 0
|
存储 Kubernetes Linux
在Linux中,如何使用Docker和Kubernetes管理容器?
在Linux中,如何使用Docker和Kubernetes管理容器?

推荐镜像

更多