【阅读原文】戳:批处理系统:Batch批量计算与云原生Serverless Argo Workflows
随着自动驾驶、科学计算等领域对技术需求的不断深化,以及Kubernetes生态系统日趋丰富,容器化已成为批处理任务执行的主流模式。面对这一趋势,市场提供了两大类解决方案:一类是以Batch批量计算为代表的云服务商自主研发的封闭式平台,另一类则是围绕开源项目Argo Workflows构建的开放兼容平台。
对于企业研发团队而言,明智地选择符合自身业务需求的批处理任务平台至关重要,这直接关系到开发效率、成本控制及未来技术的可扩展性。本文将以一个典型的数据处理应用场景为案例,深入对比Batch批量计算与Argo Workflows的核心特性及适用场景,辅助技术决策者做出更加贴切的选择。
一、案例
如下图所示是一个典型的数据处理任务,第一步使用64个Pod进行数据处理,将128个文件合并成成64个文件,第二步使用32个Pod进行第二步的数据处理,将64个文件合并成32个文件,最后一步启动一个Pod进行最终结果的计算并输出到对象存储中。
架构图:
二、通过Batch批量计算实现
1. 原理
Batch批量计算通常是一项完全托管服务,可让您以任何规模运行批处理式工作负载。以下流程描述 Batch批量计算如何运行每个作业。
1. 创建作业定义,其指定如何运行作业,同时提供权限、内存和CPU要求以及其他配置选项。
2. 将作业提交到托管的Batch批量计算作业队列,作业将一直驻留在该队列中,直到被安排在计算环境中进行处理。
3. Batch批量计算计算队列中每个作业的CPU、内存和GPU要求,并在计算环境中调度计算资源以处理作业。
4. Batch批量计算调度程序将作业放入相应的Batch计算环境进行处理。
5. 作业成功或失败退出,将输出结果写入用户定义的存储空间。
2. 创建任务定义
第一步任务定义,构建process-data、merge-data等任务定义,需要准备镜像,启动参数,所需资源等。Batch批量计算服务通常提供友好的控制台交互,为易于编程和避免频繁的控制台演示,我们直接使用Json的方式定义任务。
a. process-data
{ "type": "container", "containerProperties": { # 执行命令 "command": [ "python", "process.py" ], "image": "python:3.11-amd", # 镜像地址 "resourceRequirements": [ # 资源需要 { "type": "VCPU", "value": "1.0" }, { "type": "MEMORY", "value": "2048" } ], "runtimePlatform": { "cpuArchitecture": "X86_64", # cpu架构 "operatingSystemFamily": "LINUX" }, "networkConfiguration": { "assignPublicIp": "DISABLED" }, "executionRoleArn": "role::xxxxxxx", # 权限 }, "platformCapabilities": [ # 后端资源:服务器或者Serverless Container "Serverless Container" ], "jobDefinitionName": "process-data" # Job名称 }
b. merge-data:
{ "type": "container", "containerProperties": { # 执行命令,merge.py "command": [ "python", "merge.py" ], "image": "python:3.11-amd", # 镜像 "resourceRequirements": [ # 资源需要 { "type": "VCPU", "value": "1.0" }, { "type": "MEMORY", "value": "2048" } ], "runtimePlatform": { "cpuArchitecture": "X86_64", "operatingSystemFamily": "LINUX" }, "networkConfiguration": { "assignPublicIp": "ENABLED" }, "executionRoleArn": "role::xxxx", # 权限 "repositoryCredentials": {}, }, "platformCapabilities": [ # 后端资源:服务器或者Serverless Container "Serverless Container" ], "jobDefinitionName": "merge-data" # Job名称 }
3. 提交任务并构建依赖关系
a. 定义并提交process-data-l1 Job
Job定义:
{ "jobName": "process-data-l1", "jobDefinition": "arn::xxxx:job-definition/process-data:1", # Job使用的定义 "jobQueue": "arn::xxxx:job-queue/process-data-queue", # Job使用的队列 "dependsOn": [], "arrayProperties": { # 启动任务数 "size": 64 }, "retryStrategy": {}, "timeout": {}, "parameters": {}, "containerOverrides": { "resourceRequirements": [], "environment": [] } }
提交获取Job id:
# batch submit process-data-l1 | get job-id job-id: b617f1a3-6eeb-4118-8142-1f855053b347
b. 提交process-data-l2 Job
该Job依赖process-data-l1 Job。
{ "jobName": "process-data-l2", "jobDefinition": "arn::xxxx:job-definition/process-data:2", # Job使用的定义 "jobQueue": "arn::xxxx:job-queue/process-data-queue", # Job使用的队列 "dependsOn": [ { "jobId": "b617f1a3-6eeb-4118-8142-1f855053b347" # process-data-l1的job Id } ], "arrayProperties": { # 启动任务数 "size": 32 }, "retryStrategy": {}, "timeout": {}, "parameters": {}, "containerOverrides": { "resourceRequirements": [], "environment": [] } }
提交获取Job id:
# batch submit process-data-l2 | get job-id job-id: 6df68b3e-4962-4e4f-a71a-189be25b189c
c. 提交merge-data Job
该Job依赖process-data-l2 Job。
{ "jobName": "merge-data", "jobDefinition": "arn::xxxx:job-definition/merge-data:1", # Job使用的定义 "jobQueue": "arn::xxxx:job-queue/process-data-queue", # Merge Job使用的队列 "dependsOn": [ { "jobId": "6df68b3e-4962-4e4f-a71a-189be25b189c" # process-data-l2的job Id } ], "arrayProperties": {}, "retryStrategy": {}, "timeout": {}, "parameters": {}, "containerOverrides": { "resourceRequirements": [], "environment": [] } }
提交Job:
batch submit merge-data
4. 观察任务运行
所有任务均按序正常运行。
三、通过Argo Workflows实现
1. 原理
Serverless Argo Workflows是阿里云的一项全托管服务,基于开源Argo Workflow项目构建,完全符合开源工作流标准,可以让您在Kubernetes上运行任何规模的批处理负载,采用无服务器模式,使用阿里云弹性容器实例ECI运行工作流,通过优化Kubernetes集群参数,实现大规模工作流的高效弹性调度,同时配合抢占式ECI实例,优化成本。下面流程介绍Serverless Argo如何运行每个作业:
1)作业定义,指定如何运行作业,包括每个作业的CPU、内存、镜像、执行命令等。作业依赖关系定义,包括串行、并行循环、重试等。
2)提交Workflow到Serverless Argo集群。
3)Serverless Argo评估每个作业的资源,并调度弹性实例运行作业。
2. 创建任务定义并构建依赖关系
在构建Workflow时,定义Job和其依赖关系在同一个文件中定义。
第一步,定义process-data、和merge-data任务Template,描述每个任务的的镜像和启动参数。
第二步,定义Step/DAG Template,描述任务的并、串行执行关系。
第三步,将Template、依赖关系,存储、输入参数等整合成Workflow。
构建Workflow通常有两种方式,一种是通过Yaml方式构建,另一种是通过Python SDK构建,下边分别展示这两种方式的构建方法。
a. 通过Yaml方式构建
apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: generateName: process-data- # 数据处理工作流 spec: entrypoint: main volumes: # 对象存储挂载 - name: workdir persistentVolumeClaim: claimName: pvc-oss arguments: parameters: - name: numbers value: "64" templates: - name: main steps: - - name: process-data-l1 # 第一级处理,启动64个Pods,Merge 128 个files template: process-data arguments: parameters: - name: file_number value: "{{item}}" - name: level value: "1" withSequence: count: "{{workflow.parameters.numbers}}" - - name: process-data-l2 # 第二级处理,启动32个Pods,Merge 64 个files, 上一步处理完后启动 template: process-data arguments: parameters: - name: file_number value: "{{item}}" - name: level value: "2" withSequence: count: "{{=asInt(workflow.parameters.numbers)/2}}" - - name: merge-data # 最后一级处理,启动一个Pod,Merge 32 files, 上一步处理完后启动 template: merge-data arguments: parameters: - name: number value: "{{=asInt(workflow.parameters.numbers)/2}}" - name: process-data # process-data 任务定义 inputs: parameters: - name: file_number - name: level container: image: argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python:3.11-amd imagePullPolicy: Always command: [python3] # command args: ["process.py", "{{inputs.parameters.file_number}}", "{{inputs.parameters.level}}"]# 接收输入的参数,启动多少个pod进行处理 volumeMounts: - name: workdir mountPath: /mnt/vol - name: merge-data # merge-data 任务定义 inputs: parameters: - name: number container: image: argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python:3.11-amd imagePullPolicy: Always command: [python3] args: ["merge.py", "0", "{{inputs.parameters.number}}"] # 接收输入的参数,处理多少个文件。 volumeMounts: - name: workdir mountPath: /mnt/vol
提交Workflow:
argo submit process-data.yaml
b. 通过Python SDK构建
from hera.workflows import Container, Parameter, Steps, Workflow, Volume import urllib3 urllib3.disable_warnings() # 配置访问地址和token global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746" global_config.token = "abcdefgxxxxxx" # 填入集群的token global_config.verify_ssl = "" with Workflow( generate_name="process-data-", # 数据处理工作流 entrypoint="main", volumes=[m.Volume(name="workdir", persistent_volume_claim={"claim_name": "pvc-oss"})], # 对象存储挂载 arguments=[Parameter(name="numbers", value="64")] ) as w: process-data = Container( # process-data 任务定义 name="process-data", inputs=[Parameter(name="file_number"), Parameter(name="level")], image="argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python:3.11-amd", command=["python3"], args=["process.py","{{inputs.parameters.file_number}}", "{{inputs.parameters.level}}"], volume_mounts=[ m.VolumeMount(name="workdir", mount_path="/mnt/vol"), ], ) merge-data = Container( # merge-data 任务定义 name="merge-data", inputs=[Parameter(name="number")], image="argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python:3.11-amd", command=["python3"], args=["merge.py", "0", "{{inputs.parameters.number}}"], volume_mounts=[ m.VolumeMount(name="workdir", mount_path="/mnt/vol"), ], ) with Steps(name="main") as s: process-data( name="process-data-l1", arguments=[Parameter(name="file_number", value="{{item}}"), Parameter(name="level", value="1")], ) # 第一级处理,启动64个Pods,Merge 128 个files process-data( name="process-data-l2", arguments=[Parameter(name="file_number", value="{{item}}"), Parameter(name="level", value="2")], ) # 第二级处理,启动32个Pods,Merge 64 个files, 上一步处理完后启动 merge-data( name="merge-data", arguments=[Parameter(name="number", value="{{=asInt(workflow.parameters.numbers)/2}}")], ) # 最后一级处理,启动一个Pod,Merge 32 files, 上一步处理完后启动 # 创建workflow w.create()
提交任务:
python process.py
3. 提交并观察任务运行
通过Yaml或者Python SDK方式构建并提交后,即可在Argo Server控制台查看工作流运行状态。
工作流按序正常执行成功。
四、对比
可以看出Serverless Argo Workflows和Batch批量计算对容器批处理都有非常完善的支持。尽管它们的核心目标相似,但在任务定义、使用场景、灵活性以及资源管理等方面存在一些关键差异。以下是一个简要的对比。
五、总结
Serverless Argo Workflows和Batch批量计算对容器批处理都有非常完善的支持,选择Argo Workflows还是Batch批量计算主要取决于您的的技术栈、对云供应商的依赖程度、工作流的复杂性和对控制权的需求。如果您的团队熟悉Kubernetes并且需要高度定制化的工作流,Argo Workflows可能是更好的选择。相反,如果你在云厂商生态系统内运作,寻求简单易用且能与云厂商其他服务紧密集成的解决方案,Batch批量计算可能更适合您。
ACK One Serverles Argo团队是国内最早使用和维护Argo Workflows的团队之一,在Argo Workflows使用方面积累众多的最佳实践,如果您需要使用Argo Workflows调度大规模工作流,欢迎加入钉钉群号一同交流:35688562。
参考:
全托管Serverless Argo工作流:
Argo Workflows:
https://github.com/argoproj/argo-workflows
创建工作流集群:
PythonSDK助力大规模Argo Workflows构建:
https://mp.weixin.qq.com/s/_2Glhuy6YJEM4ZRMDwE7JA
我们是阿里巴巴云计算和大数据技术幕后的核心技术输出者。
获取关于我们的更多信息~