批处理系统:Batch批量计算与云原生Serverless Argo Workflows

简介: 本文对比了Batch批量计算与Serverless Argo Workflows在容器化批处理任务中的应用,分析了两者在任务定义、依赖关系、规模并发、高级编排、可移植性等方面的异同,帮助技术决策者根据自身需求选择合适的平台。

【阅读原文】戳:批处理系统:Batch批量计算与云原生Serverless Argo Workflows

随着自动驾驶、科学计算等领域对技术需求的不断深化,以及Kubernetes生态系统日趋丰富,容器化已成为批处理任务执行的主流模式。面对这一趋势,市场提供了两大类解决方案:一类是以Batch批量计算为代表的云服务商自主研发的封闭式平台,另一类则是围绕开源项目Argo Workflows构建的开放兼容平台。

 

对于企业研发团队而言,明智地选择符合自身业务需求的批处理任务平台至关重要,这直接关系到开发效率、成本控制及未来技术的可扩展性。本文将以一个典型的数据处理应用场景为案例,深入对比Batch批量计算与Argo Workflows的核心特性及适用场景,辅助技术决策者做出更加贴切的选择。

 

 

 

 

一、案例

 

 

 

如下图所示是一个典型的数据处理任务,第一步使用64个Pod进行数据处理,将128个文件合并成成64个文件,第二步使用32个Pod进行第二步的数据处理,将64个文件合并成32个文件,最后一步启动一个Pod进行最终结果的计算并输出到对象存储中。

 

架构图:

 

 

 

 

 

二、通过Batch批量计算实现

 

 

1. 原理

 

 

Batch批量计算通常是一项完全托管服务,可让您以任何规模运行批处理式工作负载。以下流程描述 Batch批量计算如何运行每个作业。

 

1. 创建作业定义,其指定如何运行作业,同时提供权限、内存和CPU要求以及其他配置选项。

 

2. 将作业提交到托管的Batch批量计算作业队列,作业将一直驻留在该队列中,直到被安排在计算环境中进行处理。

 

3. Batch批量计算计算队列中每个作业的CPU、内存和GPU要求,并在计算环境中调度计算资源以处理作业。

 

4. Batch批量计算调度程序将作业放入相应的Batch计算环境进行处理。

 

5. 作业成功或失败退出,将输出结果写入用户定义的存储空间。

 

 

 

2. 创建任务定义

 

 

第一步任务定义,构建process-data、merge-data等任务定义,需要准备镜像,启动参数,所需资源等。Batch批量计算服务通常提供友好的控制台交互,为易于编程和避免频繁的控制台演示,我们直接使用Json的方式定义任务。

 

a. process-data

 

{
  "type": "container",
  "containerProperties": { # 执行命令
    "command": [
      "python",
      "process.py"
    ],
    "image": "python:3.11-amd", # 镜像地址
    "resourceRequirements": [ # 资源需要
      {
        "type": "VCPU", 
        "value": "1.0"
      },
      {
        "type": "MEMORY",
        "value": "2048"
      }
    ],
    "runtimePlatform": {
      "cpuArchitecture": "X86_64", # cpu架构
      "operatingSystemFamily": "LINUX"
    },
    "networkConfiguration": {
      "assignPublicIp": "DISABLED"
    },
    "executionRoleArn": "role::xxxxxxx", # 权限
  },
  "platformCapabilities": [ # 后端资源:服务器或者Serverless Container
    "Serverless Container"
  ],
  "jobDefinitionName": "process-data" # Job名称
}

 

b. merge-data:

 

{
  "type": "container",
  "containerProperties": { # 执行命令,merge.py
    "command": [
      "python",
      "merge.py"
    ],
    "image": "python:3.11-amd", # 镜像
    "resourceRequirements": [ # 资源需要
      {
        "type": "VCPU",
        "value": "1.0"
      },
      {
        "type": "MEMORY",
        "value": "2048"
      }
    ],
    "runtimePlatform": {
      "cpuArchitecture": "X86_64",
      "operatingSystemFamily": "LINUX"
    },
    "networkConfiguration": {
      "assignPublicIp": "ENABLED"
    },
    "executionRoleArn": "role::xxxx", # 权限
    "repositoryCredentials": {},
  },
  "platformCapabilities": [ # 后端资源:服务器或者Serverless Container
    "Serverless Container"
  ],
  "jobDefinitionName": "merge-data" # Job名称
}

 

 

3. 提交任务并构建依赖关系

 

a. 定义并提交process-data-l1 Job

 

Job定义:

 

{
  "jobName": "process-data-l1",
  "jobDefinition": "arn::xxxx:job-definition/process-data:1", # Job使用的定义
  "jobQueue": "arn::xxxx:job-queue/process-data-queue", # Job使用的队列
  "dependsOn": [],
  "arrayProperties": { # 启动任务数
    "size": 64
  },
  "retryStrategy": {},
  "timeout": {},
  "parameters": {},
  "containerOverrides": {
    "resourceRequirements": [],
    "environment": []
  }
}

 

提交获取Job id:

 

# batch submit process-data-l1 | get job-id
job-id: b617f1a3-6eeb-4118-8142-1f855053b347

 

b. 提交process-data-l2 Job

 

该Job依赖process-data-l1 Job。

 

{
  "jobName": "process-data-l2",
  "jobDefinition": "arn::xxxx:job-definition/process-data:2", # Job使用的定义
  "jobQueue": "arn::xxxx:job-queue/process-data-queue", # Job使用的队列
  "dependsOn": [
    {
      "jobId": "b617f1a3-6eeb-4118-8142-1f855053b347" # process-data-l1的job Id
    }
  ],
  "arrayProperties": { # 启动任务数
    "size": 32
  },
  "retryStrategy": {},
  "timeout": {},
  "parameters": {},
  "containerOverrides": {
    "resourceRequirements": [],
    "environment": []
  }
}

 

提交获取Job id:

 

# batch submit process-data-l2 | get job-id
job-id: 6df68b3e-4962-4e4f-a71a-189be25b189c

 

c. 提交merge-data Job

 

该Job依赖process-data-l2 Job。

 

{
  "jobName": "merge-data",
  "jobDefinition": "arn::xxxx:job-definition/merge-data:1", # Job使用的定义
  "jobQueue": "arn::xxxx:job-queue/process-data-queue", # Merge Job使用的队列
  "dependsOn": [
    {
      "jobId": "6df68b3e-4962-4e4f-a71a-189be25b189c" # process-data-l2的job Id
    }
  ],
  "arrayProperties": {},
  "retryStrategy": {},
  "timeout": {},
  "parameters": {},
  "containerOverrides": {
    "resourceRequirements": [],
    "environment": []
  }
}

 

提交Job:

 

batch submit merge-data

 

 

4. 观察任务运行

 

 

所有任务均按序正常运行。

 

 

 

 

三、通过Argo Workflows实现

 

 

 

1. 原理

 

 

Serverless Argo Workflows是阿里云的一项全托管服务,基于开源Argo Workflow项目构建,完全符合开源工作流标准,可以让您在Kubernetes上运行任何规模的批处理负载,采用无服务器模式,使用阿里云弹性容器实例ECI运行工作流,通过优化Kubernetes集群参数,实现大规模工作流的高效弹性调度,同时配合抢占式ECI实例,优化成本。下面流程介绍Serverless Argo如何运行每个作业:

 

1)作业定义,指定如何运行作业,包括每个作业的CPU、内存、镜像、执行命令等。作业依赖关系定义,包括串行、并行循环、重试等。

 

2)提交Workflow到Serverless Argo集群。

 

3)Serverless Argo评估每个作业的资源,并调度弹性实例运行作业。

 

 

 

 

2. 创建任务定义并构建依赖关系

 

 

 

在构建Workflow时,定义Job和其依赖关系在同一个文件中定义。

 

第一步,定义process-data、和merge-data任务Template,描述每个任务的的镜像和启动参数。

 

第二步,定义Step/DAG Template,描述任务的并、串行执行关系。

 

第三步,将Template、依赖关系,存储、输入参数等整合成Workflow。

 

构建Workflow通常有两种方式,一种是通过Yaml方式构建,另一种是通过Python SDK构建,下边分别展示这两种方式的构建方法。

 

a. 通过Yaml方式构建

 

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: process-data- # 数据处理工作流
spec:
  entrypoint: main
  volumes: # 对象存储挂载
    - name: workdir
      persistentVolumeClaim:
        claimName: pvc-oss
  arguments:
    parameters:
      - name: numbers
        value: "64"
  templates:
    - name: main
      steps:
        - - name: process-data-l1 # 第一级处理,启动64个Pods,Merge 128 个files
            template: process-data
            arguments:
              parameters:
                - name: file_number
                  value: "{{item}}"
                - name: level
                  value: "1"
            withSequence:
              count: "{{workflow.parameters.numbers}}"
        - - name: process-data-l2 # 第二级处理,启动32个Pods,Merge 64 个files, 上一步处理完后启动
            template: process-data
            arguments:
              parameters:
                - name: file_number
                  value: "{{item}}"
                - name: level
                  value: "2"
            withSequence:
              count: "{{=asInt(workflow.parameters.numbers)/2}}"
        - - name: merge-data # 最后一级处理,启动一个Pod,Merge 32 files, 上一步处理完后启动
            template: merge-data
            arguments:
              parameters:
                - name: number
                  value: "{{=asInt(workflow.parameters.numbers)/2}}"
    - name: process-data # process-data 任务定义
      inputs:
        parameters:
          - name: file_number
          - name: level
      container:
        image: argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python:3.11-amd
        imagePullPolicy: Always
        command: [python3] # command
        args: ["process.py", "{{inputs.parameters.file_number}}", "{{inputs.parameters.level}}"]# 接收输入的参数,启动多少个pod进行处理
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol
    - name: merge-data # merge-data 任务定义
      inputs:
        parameters:
          - name: number
      container:
        image: argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python:3.11-amd
        imagePullPolicy: Always
        command: [python3]
        args: ["merge.py", "0", "{{inputs.parameters.number}}"] # 接收输入的参数,处理多少个文件。
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol

 

提交Workflow:

 

argo submit process-data.yaml

 

b. 通过Python SDK构建

 

from hera.workflows import Container, Parameter, Steps, Workflow, Volume
import urllib3
urllib3.disable_warnings()
# 配置访问地址和token
global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"
global_config.token = "abcdefgxxxxxx" # 填入集群的token
global_config.verify_ssl = ""
with Workflow(
    generate_name="process-data-", # 数据处理工作流
    entrypoint="main",
    volumes=[m.Volume(name="workdir", persistent_volume_claim={"claim_name": "pvc-oss"})], # 对象存储挂载
    arguments=[Parameter(name="numbers", value="64")]
) as w:
    process-data = Container( # process-data 任务定义
        name="process-data",
        inputs=[Parameter(name="file_number"), Parameter(name="level")],
        image="argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python:3.11-amd",
        command=["python3"],
        args=["process.py","{{inputs.parameters.file_number}}", "{{inputs.parameters.level}}"],
        volume_mounts=[
            m.VolumeMount(name="workdir", mount_path="/mnt/vol"),
        ],
    )
    merge-data = Container( # merge-data 任务定义
        name="merge-data",
        inputs=[Parameter(name="number")],
        image="argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python:3.11-amd",
        command=["python3"],
        args=["merge.py", "0", "{{inputs.parameters.number}}"],
        volume_mounts=[
            m.VolumeMount(name="workdir", mount_path="/mnt/vol"),
        ],
    )
    with Steps(name="main") as s:
        process-data( 
            name="process-data-l1",
            arguments=[Parameter(name="file_number", value="{{item}}"), Parameter(name="level", value="1")],
        ) # 第一级处理,启动64个Pods,Merge 128 个files
        process-data(
            name="process-data-l2",
            arguments=[Parameter(name="file_number", value="{{item}}"), Parameter(name="level", value="2")],
        ) # 第二级处理,启动32个Pods,Merge 64 个files, 上一步处理完后启动
        merge-data(
            name="merge-data",
            arguments=[Parameter(name="number", value="{{=asInt(workflow.parameters.numbers)/2}}")],
        ) # 最后一级处理,启动一个Pod,Merge 32 files, 上一步处理完后启动
# 创建workflow
w.create()

 

提交任务:

 

python process.py

 

 

 

3. 提交并观察任务运行

 

 

通过Yaml或者Python SDK方式构建并提交后,即可在Argo Server控制台查看工作流运行状态。

 

 

工作流按序正常执行成功。

 

 

 

 

四、对比

 

 

 

可以看出Serverless Argo Workflows和Batch批量计算对容器批处理都有非常完善的支持。尽管它们的核心目标相似,但在任务定义、使用场景、灵活性以及资源管理等方面存在一些关键差异。以下是一个简要的对比。

 

 

 

 

 

五、总结

 

 

 

Serverless Argo Workflows和Batch批量计算对容器批处理都有非常完善的支持,选择Argo Workflows还是Batch批量计算主要取决于您的的技术栈、对云供应商的依赖程度、工作流的复杂性和对控制权的需求。如果您的团队熟悉Kubernetes并且需要高度定制化的工作流,Argo Workflows可能是更好的选择。相反,如果你在云厂商生态系统内运作,寻求简单易用且能与云厂商其他服务紧密集成的解决方案,Batch批量计算可能更适合您。

 

ACK One Serverles Argo团队是国内最早使用和维护Argo Workflows的团队之一,在Argo Workflows使用方面积累众多的最佳实践,如果您需要使用Argo Workflows调度大规模工作流,欢迎加入钉钉群号一同交流:35688562

 

 

参考:

 

全托管Serverless Argo工作流:

https://www.alibabacloud.com/help/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/overview-12

 

Argo Workflows:

https://github.com/argoproj/argo-workflows

 

创建工作流集群:

https://www.alibabacloud.com/help/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/create-a-workflow-cluster

 

PythonSDK助力大规模Argo Workflows构建:

https://mp.weixin.qq.com/s/_2Glhuy6YJEM4ZRMDwE7JA




我们是阿里巴巴云计算和大数据技术幕后的核心技术输出者。

欢迎关注 “阿里云基础设施”同名微信微博知乎

获取关于我们的更多信息~

相关文章
|
2天前
|
编解码 Java 程序员
写代码还有专业的编程显示器?
写代码已经十个年头了, 一直都是习惯直接用一台Mac电脑写代码 偶尔接一个显示器, 但是可能因为公司配的显示器不怎么样, 还要接转接头 搞得桌面杂乱无章,分辨率也低,感觉屏幕还是Mac自带的看着舒服
|
4天前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1540 5
|
1月前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
7天前
|
人工智能 Rust Java
10月更文挑战赛火热启动,坚持热爱坚持创作!
开发者社区10月更文挑战,寻找热爱技术内容创作的你,欢迎来创作!
585 22
|
4天前
|
存储 SQL 关系型数据库
彻底搞懂InnoDB的MVCC多版本并发控制
本文详细介绍了InnoDB存储引擎中的两种并发控制方法:MVCC(多版本并发控制)和LBCC(基于锁的并发控制)。MVCC通过记录版本信息和使用快照读取机制,实现了高并发下的读写操作,而LBCC则通过加锁机制控制并发访问。文章深入探讨了MVCC的工作原理,包括插入、删除、修改流程及查询过程中的快照读取机制。通过多个案例演示了不同隔离级别下MVCC的具体表现,并解释了事务ID的分配和管理方式。最后,对比了四种隔离级别的性能特点,帮助读者理解如何根据具体需求选择合适的隔离级别以优化数据库性能。
201 3
|
10天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
11天前
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
581 5
|
23天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
7天前
|
XML 安全 Java
【Maven】依赖管理,Maven仓库,Maven核心功能
【Maven】依赖管理,Maven仓库,Maven核心功能
235 3
|
9天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
327 2