批处理系统:Batch批量计算与云原生Serverless Argo Workflows

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
函数计算FC,每月15万CU 3个月
简介: 本文对比了Batch批量计算与Serverless Argo Workflows在容器化批处理任务中的应用,分析了两者在任务定义、依赖关系、规模并发、高级编排、可移植性等方面的异同,帮助技术决策者根据自身需求选择合适的平台。

【阅读原文】戳:批处理系统:Batch批量计算与云原生Serverless Argo Workflows

随着自动驾驶、科学计算等领域对技术需求的不断深化,以及Kubernetes生态系统日趋丰富,容器化已成为批处理任务执行的主流模式。面对这一趋势,市场提供了两大类解决方案:一类是以Batch批量计算为代表的云服务商自主研发的封闭式平台,另一类则是围绕开源项目Argo Workflows构建的开放兼容平台。

 

对于企业研发团队而言,明智地选择符合自身业务需求的批处理任务平台至关重要,这直接关系到开发效率、成本控制及未来技术的可扩展性。本文将以一个典型的数据处理应用场景为案例,深入对比Batch批量计算与Argo Workflows的核心特性及适用场景,辅助技术决策者做出更加贴切的选择。

 

 

 

 

一、案例

 

 

 

如下图所示是一个典型的数据处理任务,第一步使用64个Pod进行数据处理,将128个文件合并成成64个文件,第二步使用32个Pod进行第二步的数据处理,将64个文件合并成32个文件,最后一步启动一个Pod进行最终结果的计算并输出到对象存储中。

 

架构图:

 

 

 

 

 

二、通过Batch批量计算实现

 

 

1. 原理

 

 

Batch批量计算通常是一项完全托管服务,可让您以任何规模运行批处理式工作负载。以下流程描述 Batch批量计算如何运行每个作业。

 

1. 创建作业定义,其指定如何运行作业,同时提供权限、内存和CPU要求以及其他配置选项。

 

2. 将作业提交到托管的Batch批量计算作业队列,作业将一直驻留在该队列中,直到被安排在计算环境中进行处理。

 

3. Batch批量计算计算队列中每个作业的CPU、内存和GPU要求,并在计算环境中调度计算资源以处理作业。

 

4. Batch批量计算调度程序将作业放入相应的Batch计算环境进行处理。

 

5. 作业成功或失败退出,将输出结果写入用户定义的存储空间。

 

 

 

2. 创建任务定义

 

 

第一步任务定义,构建process-data、merge-data等任务定义,需要准备镜像,启动参数,所需资源等。Batch批量计算服务通常提供友好的控制台交互,为易于编程和避免频繁的控制台演示,我们直接使用Json的方式定义任务。

 

a. process-data

 

{
  "type": "container",
  "containerProperties": { # 执行命令
    "command": [
      "python",
      "process.py"
    ],
    "image": "python:3.11-amd", # 镜像地址
    "resourceRequirements": [ # 资源需要
      {
        "type": "VCPU", 
        "value": "1.0"
      },
      {
        "type": "MEMORY",
        "value": "2048"
      }
    ],
    "runtimePlatform": {
      "cpuArchitecture": "X86_64", # cpu架构
      "operatingSystemFamily": "LINUX"
    },
    "networkConfiguration": {
      "assignPublicIp": "DISABLED"
    },
    "executionRoleArn": "role::xxxxxxx", # 权限
  },
  "platformCapabilities": [ # 后端资源:服务器或者Serverless Container
    "Serverless Container"
  ],
  "jobDefinitionName": "process-data" # Job名称
}

 

b. merge-data:

 

{
  "type": "container",
  "containerProperties": { # 执行命令,merge.py
    "command": [
      "python",
      "merge.py"
    ],
    "image": "python:3.11-amd", # 镜像
    "resourceRequirements": [ # 资源需要
      {
        "type": "VCPU",
        "value": "1.0"
      },
      {
        "type": "MEMORY",
        "value": "2048"
      }
    ],
    "runtimePlatform": {
      "cpuArchitecture": "X86_64",
      "operatingSystemFamily": "LINUX"
    },
    "networkConfiguration": {
      "assignPublicIp": "ENABLED"
    },
    "executionRoleArn": "role::xxxx", # 权限
    "repositoryCredentials": {},
  },
  "platformCapabilities": [ # 后端资源:服务器或者Serverless Container
    "Serverless Container"
  ],
  "jobDefinitionName": "merge-data" # Job名称
}

 

 

3. 提交任务并构建依赖关系

 

a. 定义并提交process-data-l1 Job

 

Job定义:

 

{
  "jobName": "process-data-l1",
  "jobDefinition": "arn::xxxx:job-definition/process-data:1", # Job使用的定义
  "jobQueue": "arn::xxxx:job-queue/process-data-queue", # Job使用的队列
  "dependsOn": [],
  "arrayProperties": { # 启动任务数
    "size": 64
  },
  "retryStrategy": {},
  "timeout": {},
  "parameters": {},
  "containerOverrides": {
    "resourceRequirements": [],
    "environment": []
  }
}

 

提交获取Job id:

 

# batch submit process-data-l1 | get job-id
job-id: b617f1a3-6eeb-4118-8142-1f855053b347

 

b. 提交process-data-l2 Job

 

该Job依赖process-data-l1 Job。

 

{
  "jobName": "process-data-l2",
  "jobDefinition": "arn::xxxx:job-definition/process-data:2", # Job使用的定义
  "jobQueue": "arn::xxxx:job-queue/process-data-queue", # Job使用的队列
  "dependsOn": [
    {
      "jobId": "b617f1a3-6eeb-4118-8142-1f855053b347" # process-data-l1的job Id
    }
  ],
  "arrayProperties": { # 启动任务数
    "size": 32
  },
  "retryStrategy": {},
  "timeout": {},
  "parameters": {},
  "containerOverrides": {
    "resourceRequirements": [],
    "environment": []
  }
}

 

提交获取Job id:

 

# batch submit process-data-l2 | get job-id
job-id: 6df68b3e-4962-4e4f-a71a-189be25b189c

 

c. 提交merge-data Job

 

该Job依赖process-data-l2 Job。

 

{
  "jobName": "merge-data",
  "jobDefinition": "arn::xxxx:job-definition/merge-data:1", # Job使用的定义
  "jobQueue": "arn::xxxx:job-queue/process-data-queue", # Merge Job使用的队列
  "dependsOn": [
    {
      "jobId": "6df68b3e-4962-4e4f-a71a-189be25b189c" # process-data-l2的job Id
    }
  ],
  "arrayProperties": {},
  "retryStrategy": {},
  "timeout": {},
  "parameters": {},
  "containerOverrides": {
    "resourceRequirements": [],
    "environment": []
  }
}

 

提交Job:

 

batch submit merge-data

 

 

4. 观察任务运行

 

 

所有任务均按序正常运行。

 

 

 

 

三、通过Argo Workflows实现

 

 

 

1. 原理

 

 

Serverless Argo Workflows是阿里云的一项全托管服务,基于开源Argo Workflow项目构建,完全符合开源工作流标准,可以让您在Kubernetes上运行任何规模的批处理负载,采用无服务器模式,使用阿里云弹性容器实例ECI运行工作流,通过优化Kubernetes集群参数,实现大规模工作流的高效弹性调度,同时配合抢占式ECI实例,优化成本。下面流程介绍Serverless Argo如何运行每个作业:

 

1)作业定义,指定如何运行作业,包括每个作业的CPU、内存、镜像、执行命令等。作业依赖关系定义,包括串行、并行循环、重试等。

 

2)提交Workflow到Serverless Argo集群。

 

3)Serverless Argo评估每个作业的资源,并调度弹性实例运行作业。

 

 

 

 

2. 创建任务定义并构建依赖关系

 

 

 

在构建Workflow时,定义Job和其依赖关系在同一个文件中定义。

 

第一步,定义process-data、和merge-data任务Template,描述每个任务的的镜像和启动参数。

 

第二步,定义Step/DAG Template,描述任务的并、串行执行关系。

 

第三步,将Template、依赖关系,存储、输入参数等整合成Workflow。

 

构建Workflow通常有两种方式,一种是通过Yaml方式构建,另一种是通过Python SDK构建,下边分别展示这两种方式的构建方法。

 

a. 通过Yaml方式构建

 

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: process-data- # 数据处理工作流
spec:
  entrypoint: main
  volumes: # 对象存储挂载
    - name: workdir
      persistentVolumeClaim:
        claimName: pvc-oss
  arguments:
    parameters:
      - name: numbers
        value: "64"
  templates:
    - name: main
      steps:
        - - name: process-data-l1 # 第一级处理,启动64个Pods,Merge 128 个files
            template: process-data
            arguments:
              parameters:
                - name: file_number
                  value: "{{item}}"
                - name: level
                  value: "1"
            withSequence:
              count: "{{workflow.parameters.numbers}}"
        - - name: process-data-l2 # 第二级处理,启动32个Pods,Merge 64 个files, 上一步处理完后启动
            template: process-data
            arguments:
              parameters:
                - name: file_number
                  value: "{{item}}"
                - name: level
                  value: "2"
            withSequence:
              count: "{{=asInt(workflow.parameters.numbers)/2}}"
        - - name: merge-data # 最后一级处理,启动一个Pod,Merge 32 files, 上一步处理完后启动
            template: merge-data
            arguments:
              parameters:
                - name: number
                  value: "{{=asInt(workflow.parameters.numbers)/2}}"
    - name: process-data # process-data 任务定义
      inputs:
        parameters:
          - name: file_number
          - name: level
      container:
        image: argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python:3.11-amd
        imagePullPolicy: Always
        command: [python3] # command
        args: ["process.py", "{{inputs.parameters.file_number}}", "{{inputs.parameters.level}}"]# 接收输入的参数,启动多少个pod进行处理
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol
    - name: merge-data # merge-data 任务定义
      inputs:
        parameters:
          - name: number
      container:
        image: argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python:3.11-amd
        imagePullPolicy: Always
        command: [python3]
        args: ["merge.py", "0", "{{inputs.parameters.number}}"] # 接收输入的参数,处理多少个文件。
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol

 

提交Workflow:

 

argo submit process-data.yaml

 

b. 通过Python SDK构建

 

from hera.workflows import Container, Parameter, Steps, Workflow, Volume
import urllib3
urllib3.disable_warnings()
# 配置访问地址和token
global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"
global_config.token = "abcdefgxxxxxx" # 填入集群的token
global_config.verify_ssl = ""
with Workflow(
    generate_name="process-data-", # 数据处理工作流
    entrypoint="main",
    volumes=[m.Volume(name="workdir", persistent_volume_claim={"claim_name": "pvc-oss"})], # 对象存储挂载
    arguments=[Parameter(name="numbers", value="64")]
) as w:
    process-data = Container( # process-data 任务定义
        name="process-data",
        inputs=[Parameter(name="file_number"), Parameter(name="level")],
        image="argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python:3.11-amd",
        command=["python3"],
        args=["process.py","{{inputs.parameters.file_number}}", "{{inputs.parameters.level}}"],
        volume_mounts=[
            m.VolumeMount(name="workdir", mount_path="/mnt/vol"),
        ],
    )
    merge-data = Container( # merge-data 任务定义
        name="merge-data",
        inputs=[Parameter(name="number")],
        image="argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python:3.11-amd",
        command=["python3"],
        args=["merge.py", "0", "{{inputs.parameters.number}}"],
        volume_mounts=[
            m.VolumeMount(name="workdir", mount_path="/mnt/vol"),
        ],
    )
    with Steps(name="main") as s:
        process-data( 
            name="process-data-l1",
            arguments=[Parameter(name="file_number", value="{{item}}"), Parameter(name="level", value="1")],
        ) # 第一级处理,启动64个Pods,Merge 128 个files
        process-data(
            name="process-data-l2",
            arguments=[Parameter(name="file_number", value="{{item}}"), Parameter(name="level", value="2")],
        ) # 第二级处理,启动32个Pods,Merge 64 个files, 上一步处理完后启动
        merge-data(
            name="merge-data",
            arguments=[Parameter(name="number", value="{{=asInt(workflow.parameters.numbers)/2}}")],
        ) # 最后一级处理,启动一个Pod,Merge 32 files, 上一步处理完后启动
# 创建workflow
w.create()

 

提交任务:

 

python process.py

 

 

 

3. 提交并观察任务运行

 

 

通过Yaml或者Python SDK方式构建并提交后,即可在Argo Server控制台查看工作流运行状态。

 

 

工作流按序正常执行成功。

 

 

 

 

四、对比

 

 

 

可以看出Serverless Argo Workflows和Batch批量计算对容器批处理都有非常完善的支持。尽管它们的核心目标相似,但在任务定义、使用场景、灵活性以及资源管理等方面存在一些关键差异。以下是一个简要的对比。

 

 

 

 

 

五、总结

 

 

 

Serverless Argo Workflows和Batch批量计算对容器批处理都有非常完善的支持,选择Argo Workflows还是Batch批量计算主要取决于您的的技术栈、对云供应商的依赖程度、工作流的复杂性和对控制权的需求。如果您的团队熟悉Kubernetes并且需要高度定制化的工作流,Argo Workflows可能是更好的选择。相反,如果你在云厂商生态系统内运作,寻求简单易用且能与云厂商其他服务紧密集成的解决方案,Batch批量计算可能更适合您。

 

ACK One Serverles Argo团队是国内最早使用和维护Argo Workflows的团队之一,在Argo Workflows使用方面积累众多的最佳实践,如果您需要使用Argo Workflows调度大规模工作流,欢迎加入钉钉群号一同交流:35688562

 

 

参考:

 

全托管Serverless Argo工作流:

https://www.alibabacloud.com/help/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/overview-12

 

Argo Workflows:

https://github.com/argoproj/argo-workflows

 

创建工作流集群:

https://www.alibabacloud.com/help/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/create-a-workflow-cluster

 

PythonSDK助力大规模Argo Workflows构建:

https://mp.weixin.qq.com/s/_2Glhuy6YJEM4ZRMDwE7JA




我们是阿里巴巴云计算和大数据技术幕后的核心技术输出者。

欢迎关注 “阿里云基础设施”同名微信微博知乎

获取关于我们的更多信息~

相关文章
|
26天前
|
Kubernetes Cloud Native 调度
云原生批量任务编排引擎Argo Workflows发布3.6,一文解析关键新特性
Argo Workflows是CNCF毕业项目,最受欢迎的云原生工作流引擎,专为Kubernetes上编排批量任务而设计,本文主要对最新发布的Argo Workflows 3.6版本的关键新特性做一个深入的解析。
|
5月前
|
Prometheus Kubernetes Cloud Native
云原生周刊:Argo Rollouts 支持 Kubernetes Gateway API 1.0 | 2024.7.1
探索开源世界:Kubetools的推荐系统[Krs](https://github.com/kubetoolsca/krs)助力K8s优化,追踪K8s组件清单,指引IAC集成。阅读建议: Prometheus与Thanos的进化故事,Adidas容器平台管理经验,K8s请求实现详解。关注云原生:Argo Rollouts支持Gateway API 1.0,Kubewarden v1.14强化策略与镜像安全。
|
4月前
|
运维 Kubernetes Serverless
Serverless Argo Workflows荣获信通院标杆实践案例,引领大规模离线任务处理新方法
阿里云容器服务Serverless Argo Workflows大规模离线计算工作流平台荣获2024信通院Serveless实践标杆案例。本文介绍其应用场景、平台特性以及领域实践。
|
4月前
|
Kubernetes 安全 Serverless
Kubernetes云原生问题之在Serverless Container中,Pod运行如何解决
Kubernetes云原生问题之在Serverless Container中,Pod运行如何解决
73 5
|
4月前
|
Cloud Native Serverless 云计算
云原生时代的技术演进:从微服务到Serverless
在数字化转型的浪潮中,云原生技术正成为推动企业IT架构现代化的重要力量。本文将探讨云原生技术的关键组成部分—微服务与Serverless架构—如何助力企业实现敏捷开发和高效运维。通过深入分析这两种架构模式的优势与挑战,我们旨在为读者揭示云原生环境下的最佳实践和未来发展趋势。
|
6月前
|
人工智能 运维 Cloud Native
云原生技术实践营 · 深圳站——Serverless + AI 专场邀您参会!
云原生技术实践营深圳站聚焦Serverless+AI,6月21日探索技术结合新思路,助力降低GPU成本,简化开发运维。限额80人,报名审核制,活动涵盖Serverless趋势、AIGC应用、AI算力优化等议题,还有实操体验与现场抽奖。
云原生技术实践营 · 深圳站——Serverless + AI 专场邀您参会!
|
6月前
|
人工智能 Cloud Native Serverless
云原生技术实践营 · 深圳站:Serverless + AI 专场开启报名!
“云原生技术实践营 · 深圳站 ——Serverless + AI 应用开发专场”是一场以 Serverless 为主题的技术活动,通过一个下午的时间增进对 Serverless 技术的理解,快速上手,活动受众以关注 Serverless 技术的开发者、企业决策人、云原生领域创业者为主,活动形式为演讲、动手实操。
|
6月前
|
人工智能 运维 Cloud Native
活动回顾丨云原生技术实践营 Serverless + AI 专场 (深圳站) 回顾 & PPT 下载
云原生技术实践营 Serverless + AI 专场 (深圳站) 回顾。
|
6月前
|
Kubernetes Cloud Native jenkins
云原生时代:从Jenkins到Argo Workflows,构建高效CI Pipeline
基于Argo Workflows可以构建大规模、高效率、低成本的CI流水线
|
7月前
|
运维 监控 JavaScript
【阿里云云原生专栏】Serverless架构下的应用部署与运维:阿里云Function Compute深度探索
【5月更文挑战第21天】阿里云Function Compute是事件驱动的无服务器计算服务,让用户无需关注基础设施,专注业务逻辑。本文详述了在FC上部署应用的步骤,包括创建函数、编写代码和部署,并介绍了运维功能:监控告警、日志管理、版本管理和授权管理,提供高效低成本的计算服务。
308 6