Kubernetes fror Flink 硬气功实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Kubernetes fror Flink 硬气功实践


Kubernetes fror Flink 硬气功实践


CTO 技术共享以前做的笔记,拿出来大家分享分享

一、概述


Flink 核心是一个流式的数据流执行引擎,并且能够基于同一个 Flink 运行时,提供支持流处理和批处理两种类型应用。其针对数据流的分布式计算提供了数据分布,数据通信及容错机制等功能。

二、Flink 运行模式

官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/deployment/overview/

FLink on yarn 有三种运行模式:

  • yarn-session 模式(Seesion Mode)
  • yarn-cluster 模式(Per-Job Mode)
  • Application 模式(Application Mode)

网络异常,图片无法展示
|

【温馨提示】Per-Job 模式(已弃用),Per-job 模式仅由 YARN 支持,并已在 Flink 1.15 中弃用。它将被丢弃在 FLINK-26000 中。

三、Flink on k8s 实战操作

网络异常,图片无法展示
|

1)flink 下载

下载地址:https://flink.apache.org/downloads.html

wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz

2)构建基础镜像

docker pull apache/flink:1.14.6-scala_2.12docker tag apache/flink:1.14.6-scala_2.12 myharbor.com/bigdata/flink:1.14.6-scala_2.12docker push myharbor.com/bigdata/flink:1.14.6-scala_2.12

3)session 模式

Flink Session 集群作为长时间运行的 Kubernetes Deployment 执行。你可以在一个 Session 集群上运行多个 Flink 作业。每个作业都需要在集群部署完成后提交到集群。 Kubernetes 中的 Flink Session 集群部署至少包含三个组件:

  • 运行JobManager的部署
  • TaskManagers池的部署
  • 暴露JobManager 的 REST 和 UI 端口的服务

1、Native Kubernetes 模式

参数配置: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#kubernetes-namespace

【1】构建镜像 Dockerfile
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezoneRUN export LANG=zh_CN.UTF-8

开始构建镜像

docker build -t myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 . --no-cache
# 上传镜像docker push myharbor.com/bigdata/flink-session:1.14.6-scala_2.12
【2】创建命名空间和 serviceaccount
# 创建namespacekubectl create ns flink# 创建serviceaccountkubectl create serviceaccount flink-service-account -n flink# 用户授权kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
【3】创建 flink 集群
./bin/kubernetes-session.sh \ -Dkubernetes.cluster-id=my-first-flink-cluster  \ -Dkubernetes.container.image=myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 \ -Dkubernetes.namespace=flink \ -Dkubernetes.jobmanager.service-account=flink-service-account \ -Dkubernetes.rest-service.exposed.type=NodePort


网络异常,图片无法展示
|


网络异常,图片无法展示
|


【4】提交任务
./bin/flink run \    --target kubernetes-session \    -Dkubernetes.cluster-id=my-first-flink-cluster \    -Dkubernetes.namespace=flink \    -Dkubernetes.jobmanager.service-account=flink-service-account \    ./examples/streaming/TopSpeedWindowing.jar            #   参数配置    ./examples/streaming/WordCount.jar    -Dkubernetes.taskmanager.cpu=2000m \    -Dexternal-resource.limits.kubernetes.cpu=4000m \ -Dexternal-resource.limits.kubernetes.memory=10Gi \ -Dexternal-resource.requests.kubernetes.cpu=2000m \ -Dexternal-resource.requests.kubernetes.memory=8Gi \ -Dkubernetes.taskmanager.cpu=2000m \

【温馨提示】注意 jdk 版本,目前 jdk8 是正常的。

网络异常,图片无法展示
|

【5】查看
kubectl get pods -n flinkkubectl logs -f my-first-flink-cluster-taskmanager-1-1


网络异常,图片无法展示
|


网络异常,图片无法展示
|

【6】删除 flink 集群
kubectl delete deployment/my-first-flink-cluster -n flinkkubectl delete ns flink --force

2、Standalone 模式

【1】构建镜像

默认用户是 flink 用户,这里我换成 admin,根据企业需要更换用户,脚本可以通过上面运行的 pod 拿到。

启动脚本 docker-entrypoint.sh
#!/usr/bin/env bash
################################################################################  Licensed to the Apache Software Foundation (ASF) under one#  or more contributor license agreements.  See the NOTICE file#  distributed with this work for additional information#  regarding copyright ownership.  The ASF licenses this file#  to you under the Apache License, Version 2.0 (the#  "License"); you may not use this file except in compliance#  with the License.  You may obtain a copy of the License at##      http://www.apache.org/licenses/LICENSE-2.0##  Unless required by applicable law or agreed to in writing, software#  distributed under the License is distributed on an "AS IS" BASIS,#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.#  See the License for the specific language governing permissions and# limitations under the License.###############################################################################
COMMAND_STANDALONE="standalone-job"COMMAND_HISTORY_SERVER="history-server"
# If unspecified, the hostname of the container is taken as the JobManager addressJOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"
drop_privs_cmd() {    if [ $(id -u) != 0 ]; then        # Don't need to drop privs if EUID != 0        return    elif [ -x /sbin/su-exec ]; then        # Alpine        echo su-exec admin    else        # Others        echo gosu admin    fi}
copy_plugins_if_required() {  if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then    return 0  fi
  echo "Enabling required built-in plugins"  for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do    echo "Linking ${target_plugin} to plugin directory"    plugin_name=${target_plugin%.jar}
    mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"    if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then      echo "Plugin ${target_plugin} does not exist. Exiting."      exit 1    else      ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"      echo "Successfully enabled ${target_plugin}"    fi  done}
set_config_option() {  local option=$1  local value=$2
  # escape periods for usage in regular expressions  local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")
  # either override an existing entry, or append a new one  if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"  else        echo "${option}: ${value}" >> "${CONF_FILE}"  fi}
prepare_configuration() {    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}    set_config_option blob.server.port 6124    set_config_option query.server.port 6125
    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}    fi
    if [ -n "${FLINK_PROPERTIES}" ]; then        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"    fi    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"}
maybe_enable_jemalloc() {    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"        if [ -f "$JEMALLOC_PATH" ]; then            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH        elif [ -f "$JEMALLOC_FALLBACK" ]; then            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK        else            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then                MSG_PATH=$JEMALLOC_PATH            else                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"            fi            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."        fi    fi}
maybe_enable_jemalloc
copy_plugins_if_required
prepare_configuration
args=("$@")if [ "$1" = "help" ]; then    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"    printf "    Or $(basename "$0") help\n\n"    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"    exit 0elif [ "$1" = "jobmanager" ]; then    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"elif [ "$1" = ${COMMAND_STANDALONE} ]; then    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then    args=("${args[@]:1}")
    echo "Starting History Server"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"elif [ "$1" = "taskmanager" ]; then    args=("${args[@]:1}")
    echo "Starting Task Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"fi
args=("${args[@]}")
# Running command in pass-through modeexec $(drop_privs_cmd) "${args[@]}"

编排 Dockerfile

FROM myharbor.com/bigdata/centos:7.9.2009
USER root
# 安装常用工具RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof
# 设置时区,默认是UTC时区RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN mkdir -p /opt/apache
ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz  /opt/apache/
ENV FLINK_HOME /opt/apache/flink-1.14.6ENV JAVA_HOME /opt/apache/jdk1.8.0_212ENV PATH $JAVA_HOME/bin:$PATH
# 创建用户应用jar目录RUN mkdir $FLINK_HOME/usrlib/
#RUN mkdir homeCOPY docker-entrypoint.sh /opt/apache/RUN chmod +x /opt/apache/docker-entrypoint.sh
RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache
#设置的工作目录WORKDIR $FLINK_HOME
# 对外暴露端口EXPOSE 6123 8081
# 执行脚本,构建镜像时不执行,运行实例才会执行ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]CMD ["help"]

开始构建镜像

docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache
# 上传镜像docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
# 删除镜像docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12crictl rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
【2】创建命名空间和 serviceaccount
# 创建namespacekubectl create ns flink# 创建serviceaccountkubectl create serviceaccount flink-service-account -n flink# 用户授权kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

【3】编排 yaml 文件 flink-configuration-configmap.yamlapiVersion: v1kind: ConfigMapmetadata:name: flink-configlabels:app: flinkdata:flink-conf.yaml: |+jobmanager.rpc.address: flink-jobmanagertaskmanager.numberOfTaskSlots: 2blob.server.port: 6124jobmanager.rpc.port: 6123taskmanager.rpc.port: 6122queryable-state.proxy.ports: 6125jobmanager.memory.process.size: 3200mtaskmanager.memory.process.size: 2728mtaskmanager.memory.flink.size: 2280mparallelism.default: 2


log4j-console.properties: |+# This affects logging for both user code and FlinkrootLogger.level = INFOrootLogger.appenderRef.console.ref = ConsoleAppenderrootLogger.appenderRef.rolling.ref = RollingFileAppender


# Uncomment this if you want to _only_ change Flink's logging#logger.flink.name = org.apache.flink#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on# log level INFO. The root logger does not override this. You have to manually# change the log levels here.logger.akka.name = akkalogger.akka.level = INFOlogger.kafka.name= org.apache.kafkalogger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFOlogger.zookeeper.name = org.apache.zookeeperlogger.zookeeper.level = INFO
# Log all infos to the consoleappender.console.name = ConsoleAppenderappender.console.type = CONSOLEappender.console.layout.type = PatternLayoutappender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Log all infos in the given rolling fileappender.rolling.name = RollingFileAppenderappender.rolling.type = RollingFileappender.rolling.append = falseappender.rolling.fileName = ${sys:log.file}appender.rolling.filePattern = ${sys:log.file}.%iappender.rolling.layout.type = PatternLayoutappender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%nappender.rolling.policies.type = Policiesappender.rolling.policies.size.type = SizeBasedTriggeringPolicyappender.rolling.policies.size.size=100MBappender.rolling.strategy.type = DefaultRolloverStrategyappender.rolling.strategy.max = 10
# Suppress the irrelevant (wrong) warnings from the Netty channel handlerlogger.netty.name = org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFF
  • jobmanager-service.yaml可选服务,仅非 HA 模式需要。
apiVersion: v1kind: Servicemetadata:  name: flink-jobmanagerspec:  type: ClusterIP  ports:  - name: rpc    port: 6123  - name: blob-server    port: 6124  - name: webui    port: 8081  selector:    app: flink    component: jobmanager
  • jobmanager-rest-service.yaml 可选服务,将 jobmanager rest端口公开为公共 Kubernetes 节点的端口。
apiVersion: v1kind: Servicemetadata:  name: flink-jobmanager-restspec:  type: NodePort  ports:  - name: rest    port: 8081    targetPort: 8081    nodePort: 30081  selector:    app: flink    component: jobmanager
  • taskmanager-query-state-service.yaml 可选服务,公开 TaskManager
  • 端口以访问可查询状态作为公共 Kubernetes 节点的端口。
apiVersion: v1kind: Servicemetadata:  name: flink-taskmanager-query-statespec:  type: NodePort  ports:  - name: query-state    port: 6125    targetPort: 6125    nodePort: 30025  selector:    app: flink    component: taskmanager

以上几个配置文件是公共的

  • jobmanager-session-deployment-non-ha.yaml
apiVersion: apps/v1kind: Deploymentmetadata:  name: flink-jobmanagerspec:  replicas: 1  selector:    matchLabels:      app: flink      component: jobmanager  template:    metadata:      labels:        app: flink        component: jobmanager    spec:      containers:      - name: jobmanager        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12        args: ["jobmanager"]        ports:        - containerPort: 6123          name: rpc        - containerPort: 6124          name: blob-server        - containerPort: 8081          name: webui        livenessProbe:          tcpSocket:            port: 6123          initialDelaySeconds: 30          periodSeconds: 60        volumeMounts:        - name: flink-config-volume          mountPath: /opt/apache/flink-1.14.6/conf/        securityContext:          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary      volumes:      - name: flink-config-volume        configMap:          name: flink-config          items:          - key: flink-conf.yaml            path: flink-conf.yaml          - key: log4j-console.properties            path: log4j-console.properties
  • taskmanager-session-deployment.yaml
apiVersion: apps/v1kind: Deploymentmetadata:  name: flink-taskmanagerspec:  replicas: 2  selector:    matchLabels:      app: flink      component: taskmanager  template:    metadata:      labels:        app: flink        component: taskmanager    spec:      containers:      - name: taskmanager        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12        args: ["taskmanager"]        ports:        - containerPort: 6122          name: rpc        - containerPort: 6125          name: query-state        livenessProbe:          tcpSocket:            port: 6122          initialDelaySeconds: 30          periodSeconds: 60        volumeMounts:        - name: flink-config-volume          mountPath: /opt/apache/flink-1.14.6/conf/        securityContext:          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary      volumes:      - name: flink-config-volume        configMap:          name: flink-config          items:          - key: flink-conf.yaml            path: flink-conf.yaml          - key: log4j-console.properties            path: log4j-console.properties
【4】创建 flink 集群
kubectl create ns flink# Configuration and service definitionkubectl create -f flink-configuration-configmap.yaml -n flink
# servicekubectl create -f jobmanager-service.yaml -n flinkkubectl create -f jobmanager-rest-service.yaml -n flinkkubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the deployments for the clusterkubectl create -f jobmanager-session-deployment-non-ha.yaml -n flinkkubectl create -f taskmanager-session-deployment.yaml -n flink

镜像逆向解析 dockerfile

alias whaler="docker run -t --rm -v /var/run/docker.sock:/var/run/docker.sock:ro pegleg/whaler"whaler flink:1.14.6-scala_2.12

查看

kubectl get pods,svc -n flink -owide


网络异常,图片无法展示
|

web:http://192.168.182.110:30081/#/overview

网络异常,图片无法展示
|

【5】提交任务
./bin/flink run -m local-168-182-110:30081 ./examples/streaming/WordCount.jar


网络异常,图片无法展示
|


kubectl logs flink-taskmanager-54649bf96c-zjtkh -n flink


网络异常,图片无法展示
|


网络异常,图片无法展示
|

【6】删除 flink 集群
kubectl delete -f jobmanager-service.yaml -n flinkkubectl delete -f flink-configuration-configmap.yaml -n flinkkubectl delete -f taskmanager-session-deployment.yaml -n flinkkubectl delete -f jobmanager-session-deployment.yaml -n flinkkubectl delete ns flink --force
【7】访问 flink web

端口就是jobmanager-rest-service.yaml文件中的 NodePort

http://192.168.182.110:30081/#/overview

网络异常,图片无法展示
|

4)application 模式(推荐)

Kubernetes 中一个基本的 Flink Application 集群部署包含三个组件:

  • 运行JobManager的应用程序
  • TaskManagers池的部署
  • 暴露JobManager 的 REST 和 UI 端口的服务

1、Native Kubernetes 模式(常用)

【1】构建镜像 Dockerfile
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezoneRUN export LANG=zh_CN.UTF-8RUN mkdir -p $FLINK_HOME/usrlibCOPY  TopSpeedWindowing.jar $FLINK_HOME/usrlib/

开始构建镜像

docker build -t myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 . --no-cache
# 上传镜像docker push myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
# 删除镜像docker rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12crictl rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
【2】创建命名空间和 serviceacount
# 创建namespacekubectl create ns flink# 创建serviceaccountkubectl create serviceaccount flink-service-account -n flink# 用户授权kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
【3】创建 flink 集群并提交任务
./bin/flink run-application \    --target kubernetes-application \    -Dkubernetes.cluster-id=my-first-application-cluster  \ -Dkubernetes.container.image=myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 \ -Dkubernetes.jobmanager.replicas=1 \ -Dkubernetes.namespace=flink \ -Dkubernetes.jobmanager.service-account=flink-service-account \ -Dexternal-resource.limits.kubernetes.cpu=2000m \ -Dexternal-resource.limits.kubernetes.memory=2Gi \ -Dexternal-resource.requests.kubernetes.cpu=1000m \ -Dexternal-resource.requests.kubernetes.memory=1Gi \ -Dkubernetes.rest-service.exposed.type=NodePort \ local:///opt/flink/usrlib/TopSpeedWindowing.jar

【注意】 local是应用模式中唯一支持的方案。local 代表本地环境,这里即 pod 或者容器环境,并非宿主机。

查看

kubectl get pods pods,svc -n flink


网络异常,图片无法展示
|


kubectl logs -f my-first-application-cluster-taskmanager-1-1 -n flink


网络异常,图片无法展示
|


网络异常,图片无法展示
|


网络异常,图片无法展示
|


【4】删除flink集群kubectl delete deployment/my-first-application-cluster -n flinkkubectl delete ns flink --force2、Standalone模式【1】构建镜像 Dockerfile启动脚本 docker-entrypoint.sh
#!/usr/bin/env bash
################################################################################  Licensed to the Apache Software Foundation (ASF) under one#  or more contributor license agreements.  See the NOTICE file#  distributed with this work for additional information#  regarding copyright ownership.  The ASF licenses this file#  to you under the Apache License, Version 2.0 (the#  "License"); you may not use this file except in compliance#  with the License.  You may obtain a copy of the License at##      http://www.apache.org/licenses/LICENSE-2.0##  Unless required by applicable law or agreed to in writing, software#  distributed under the License is distributed on an "AS IS" BASIS,#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.#  See the License for the specific language governing permissions and# limitations under the License.###############################################################################
COMMAND_STANDALONE="standalone-job"COMMAND_HISTORY_SERVER="history-server"
# If unspecified, the hostname of the container is taken as the JobManager addressJOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"
drop_privs_cmd() {    if [ $(id -u) != 0 ]; then        # Don't need to drop privs if EUID != 0        return    elif [ -x /sbin/su-exec ]; then        # Alpine        echo su-exec admin    else        # Others        echo gosu admin    fi}
copy_plugins_if_required() {  if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then    return 0  fi
  echo "Enabling required built-in plugins"  for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do    echo "Linking ${target_plugin} to plugin directory"    plugin_name=${target_plugin%.jar}
    mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"    if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then      echo "Plugin ${target_plugin} does not exist. Exiting."      exit 1    else      ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"      echo "Successfully enabled ${target_plugin}"    fi  done}
set_config_option() {  local option=$1  local value=$2
  # escape periods for usage in regular expressions  local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")
  # either override an existing entry, or append a new one  if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"  else        echo "${option}: ${value}" >> "${CONF_FILE}"  fi}
prepare_configuration() {    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}    set_config_option blob.server.port 6124    set_config_option query.server.port 6125
    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}    fi
    if [ -n "${FLINK_PROPERTIES}" ]; then        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"    fi    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"}
maybe_enable_jemalloc() {    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"        if [ -f "$JEMALLOC_PATH" ]; then            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH        elif [ -f "$JEMALLOC_FALLBACK" ]; then            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK        else            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then                MSG_PATH=$JEMALLOC_PATH            else                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"            fi            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."        fi    fi}
maybe_enable_jemalloc
copy_plugins_if_required
prepare_configuration
args=("$@")if [ "$1" = "help" ]; then    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"    printf "    Or $(basename "$0") help\n\n"    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"    exit 0elif [ "$1" = "jobmanager" ]; then    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"elif [ "$1" = ${COMMAND_STANDALONE} ]; then    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then    args=("${args[@]:1}")
    echo "Starting History Server"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"elif [ "$1" = "taskmanager" ]; then    args=("${args[@]:1}")
    echo "Starting Task Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"fi
args=("${args[@]}")
# Running command in pass-through modeexec $(drop_privs_cmd) "${args[@]}"

编排Dockerfile

FROM myharbor.com/bigdata/centos:7.9.2009
USER root
# 安装常用工具RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof
# 设置时区,默认是UTC时区RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN mkdir -p /opt/apache
ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz  /opt/apache/
ENV FLINK_HOME /opt/apache/flink-1.14.6ENV JAVA_HOME /opt/apache/jdk1.8.0_212ENV PATH $JAVA_HOME/bin:$PATH
# 创建用户应用jar目录RUN mkdir $FLINK_HOME/usrlib/
#RUN mkdir homeCOPY docker-entrypoint.sh /opt/apache/
RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apacheRUN chmod +x ${FLINK_HOME}/docker-entrypoint.sh
#设置的工作目录WORKDIR $FLINK_HOME
# 对外暴露端口EXPOSE 6123 8081
# 执行脚本,构建镜像时不执行,运行实例才会执行ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]CMD ["help"]docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache
# 上传镜像docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
# 删除镜像docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
【2】创建命名空间和 serviceacount
# 创建namespacekubectl create ns flink# 创建serviceaccountkubectl create serviceaccount flink-service-account -n flink# 用户授权kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
【3】编排 yaml 文件

flink-configuration-configmap.yamlapiVersion: v1kind: ConfigMapmetadata:name: flink-configlabels:app: flinkdata:flink-conf.yaml: |+jobmanager.rpc.address: flink-jobmanagertaskmanager.numberOfTaskSlots: 2blob.server.port: 6124jobmanager.rpc.port: 6123taskmanager.rpc.port: 6122queryable-state.proxy.ports: 6125jobmanager.memory.process.size: 3200mtaskmanager.memory.process.size: 2728mtaskmanager.memory.flink.size: 2280mparallelism.default: 2


log4j-console.properties: |+# This affects logging for both user code and FlinkrootLogger.level = INFOrootLogger.appenderRef.console.ref = ConsoleAppenderrootLogger.appenderRef.rolling.ref = RollingFileAppender


# Uncomment this if you want to _only_ change Flink's logging#logger.flink.name = org.apache.flink#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on# log level INFO. The root logger does not override this. You have to manually# change the log levels here.logger.akka.name = akkalogger.akka.level = INFOlogger.kafka.name= org.apache.kafkalogger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFOlogger.zookeeper.name = org.apache.zookeeperlogger.zookeeper.level = INFO
# Log all infos to the consoleappender.console.name = ConsoleAppenderappender.console.type = CONSOLEappender.console.layout.type = PatternLayoutappender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Log all infos in the given rolling fileappender.rolling.name = RollingFileAppenderappender.rolling.type = RollingFileappender.rolling.append = falseappender.rolling.fileName = ${sys:log.file}appender.rolling.filePattern = ${sys:log.file}.%iappender.rolling.layout.type = PatternLayoutappender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%nappender.rolling.policies.type = Policiesappender.rolling.policies.size.type = SizeBasedTriggeringPolicyappender.rolling.policies.size.size=100MBappender.rolling.strategy.type = DefaultRolloverStrategyappender.rolling.strategy.max = 10
# Suppress the irrelevant (wrong) warnings from the Netty channel handlerlogger.netty.name = org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFF

jobmanager-service.yaml可选服务,仅非 HA 模式需要。

apiVersion: v1kind: Servicemetadata:  name: flink-jobmanagerspec:  type: ClusterIP  ports:  - name: rpc    port: 6123  - name: blob-server    port: 6124  - name: webui    port: 8081  selector:    app: flink    component: jobmanager

jobmanager-rest-service.yaml 可选服务,将 jobmanager rest端口公开为公共 Kubernetes 节点的端口。

apiVersion: v1kind: Servicemetadata:  name: flink-jobmanager-restspec:  type: NodePort  ports:  - name: rest    port: 8081    targetPort: 8081    nodePort: 30081  selector:    app: flink    component: jobmanager

taskmanager-query-state-service.yaml 可选服务,公开 TaskManager 端口以访问可查询状态作为公共 Kubernetes 节点的端口。

apiVersion: v1kind: Servicemetadata:  name: flink-taskmanager-query-statespec:  type: NodePort  ports:  - name: query-state    port: 6125    targetPort: 6125    nodePort: 30025  selector:    app: flink    component: taskmanager

jobmanager-application-non-ha.yaml ,非高可用

apiVersion: batch/v1kind: Jobmetadata:  name: flink-jobmanagerspec:  template:    metadata:      labels:        app: flink        component: jobmanager    spec:      restartPolicy: OnFailure      containers:        - name: jobmanager          image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12          env:          args: ["standalone-job", "--job-classname", "org.apache.flink.examples.java.wordcount.WordCount","--output","/tmp/result"]          ports:            - containerPort: 6123              name: rpc            - containerPort: 6124              name: blob-server            - containerPort: 8081              name: webui          livenessProbe:            tcpSocket:              port: 6123            initialDelaySeconds: 30            periodSeconds: 60          volumeMounts:            - name: flink-config-volume              mountPath: /opt/apache/flink-1.14.6/conf            - name: job-artifacts-volume              mountPath: /opt/apache/flink-1.14.6/usrlib          securityContext:            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary      volumes:        - name: flink-config-volume          configMap:            name: flink-config            items:              - key: flink-conf.yaml                path: flink-conf.yaml              - key: log4j-console.properties                path: log4j-console.properties        - name: job-artifacts-volume          hostPath:            path: /mnt/nfsdata/flink/application/job-artifacts

【温馨提示】注意这里的挂载/mnt/bigdata/flink/usrlib,最好这里使用共享目录。

taskmanager-job-deployment.yamlapiVersion: apps/v1kind: Deploymentmetadata:  name: flink-taskmanagerspec:  replicas: 2  selector:    matchLabels:      app: flink      component: taskmanager  template:    metadata:      labels:        app: flink        component: taskmanager    spec:      containers:      - name: taskmanager        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12        env:        args: ["taskmanager"]        ports:        - containerPort: 6122          name: rpc        - containerPort: 6125          name: query-state        livenessProbe:          tcpSocket:            port: 6122          initialDelaySeconds: 30          periodSeconds: 60        volumeMounts:        - name: flink-config-volume          mountPath: /opt/apache/flink-1.14.6/conf        - name: job-artifacts-volume          mountPath: /opt/apache/flink-1.14.6/usrlib        securityContext:          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary      volumes:      - name: flink-config-volume        configMap:          name: flink-config          items:          - key: flink-conf.yaml            path: flink-conf.yaml          - key: log4j-console.properties            path: log4j-console.properties      - name: job-artifacts-volume        hostPath:          path: /mnt/nfsdata/flink/application/job-artifacts
【4】创建 flink 集群并提交任务
kubectl create ns flink# Configuration and service definitionkubectl create -f flink-configuration-configmap.yaml -n flink
# servicekubectl create -f jobmanager-service.yaml -n flinkkubectl create -f jobmanager-rest-service.yaml -n flinkkubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the deployments for the clusterkubectl create -f  jobmanager-application-non-ha.yaml -n flinkkubectl create -f  taskmanager-job-deployment.yaml -n flink

查看

kubectl get pods,svc -n flink


网络异常,图片无法展示
|

【5】删除 flink 集群
kubectl delete -f flink-configuration-configmap.yaml -n flinkkubectl delete -f jobmanager-service.yaml -n flinkkubectl delete -f jobmanager-rest-service.yaml -n flinkkubectl delete -f taskmanager-query-state-service.yaml -n flinkkubectl delete -f jobmanager-application-non-ha.yaml -n flinkkubectl delete -f taskmanager-job-deployment.yaml -n flink
kubectl delete ns flink --force
【6】查看
kubectl get pods,svc -n flinkkubectl exec -it flink-taskmanager-54cb7fc57c-g484q -n flink -- bash


网络异常,图片无法展示
|



相关文章
|
2月前
|
Kubernetes Cloud Native Docker
云原生时代的容器化实践:Docker和Kubernetes入门
【10月更文挑战第37天】在数字化转型的浪潮中,云原生技术成为企业提升敏捷性和效率的关键。本篇文章将引导读者了解如何利用Docker进行容器化打包及部署,以及Kubernetes集群管理的基础操作,帮助初学者快速入门云原生的世界。通过实际案例分析,我们将深入探讨这些技术在现代IT架构中的应用与影响。
134 2
|
2月前
|
SQL 运维 网络安全
【实践】基于Hologres+Flink搭建GitHub实时数据查询
本文介绍了如何利用Flink和Hologres构建GitHub公开事件数据的实时数仓,并对接BI工具实现数据实时分析。流程包括创建VPC、Hologres、OSS、Flink实例,配置Hologres内部表,通过Flink实时写入数据至Hologres,查询实时数据,以及清理资源等步骤。
|
12天前
|
SQL 存储 Apache
基于 Flink 进行增量批计算的探索与实践
本文整理自阿里云高级技术专家、Apache Flink PMC朱翥老师在Flink Forward Asia 2024的分享,内容分为三部分:背景介绍、工作介绍和总结展望。首先介绍了增量计算的定义及其与批计算、流计算的区别,阐述了增量计算的优势及典型需求场景,并解释了为何选择Flink进行增量计算。其次,详细描述了当前的工作进展,包括增量计算流程、执行计划生成、控制消费数据量级及执行进度记录恢复等关键技术点。最后,展示了增量计算的简单示例、性能测评结果,并对未来工作进行了规划。
419 5
基于 Flink 进行增量批计算的探索与实践
|
26天前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
397 2
探索Flink动态CEP:杭州银行的实战案例
|
5天前
|
消息中间件 关系型数据库 MySQL
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
112 0
Flink CDC 在阿里云实时计算Flink版的云上实践
|
1月前
|
人工智能 运维 监控
阿里云ACK容器服务生产级可观测体系建设实践
本文整理自2024云栖大会冯诗淳(花名:行疾)的演讲,介绍了阿里云容器服务团队在生产级可观测体系建设方面的实践。冯诗淳详细阐述了容器化架构带来的挑战及解决方案,强调了可观测性对于构建稳健运维体系的重要性。文中提到,阿里云作为亚洲唯一蝉联全球领导者的容器管理平台,其可观测能力在多项关键评测中表现优异,支持AI、容器网络、存储等多个场景的高级容器可观测能力。此外,还介绍了阿里云容器服务在多云管理、成本优化等方面的最新进展,以及即将推出的ACK AI助手2.0,旨在通过智能引擎和专家诊断经验,简化异常数据查找,缩短故障响应时间。
阿里云ACK容器服务生产级可观测体系建设实践
|
1月前
|
运维 Kubernetes 调度
阿里云容器服务 ACK One 分布式云容器企业落地实践
阿里云容器服务ACK提供强大的产品能力,支持弹性、调度、可观测、成本治理和安全合规。针对拥有IDC或三方资源的企业,ACK One分布式云容器平台能够有效解决资源管理、多云多集群管理及边缘计算等挑战,实现云上云下统一管理,提升业务效率与稳定性。
|
1月前
|
流计算 开发者
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
|
2月前
|
Kubernetes Cloud Native 微服务
云原生入门与实践:Kubernetes的简易部署
云原生技术正改变着现代应用的开发和部署方式。本文将引导你了解云原生的基础概念,并重点介绍如何使用Kubernetes进行容器编排。我们将通过一个简易的示例来展示如何快速启动一个Kubernetes集群,并在其上运行一个简单的应用。无论你是云原生新手还是希望扩展现有知识,本文都将为你提供实用的信息和启发性的见解。
|
2月前
|
运维 数据挖掘 网络安全
场景实践 | 基于Flink+Hologres搭建GitHub实时数据分析
基于Flink和Hologres构建的实时数仓方案在数据开发运维体验、成本与收益等方面均表现出色。同时,该产品还具有与其他产品联动组合的可能性,能够为企业提供更全面、更智能的数据处理和分析解决方案。

热门文章

最新文章