本文介绍了任务状态轮询和 Serverless 工作流实现的具体步骤。
简介 在长时间任务的场景中如果任务结束后没有回调机制,开发者通常会采用轮询的方式来判断任务的结束。可靠的轮询实现需要维护状态的持久化以保证即使当前轮询进程失败退出,进程恢复后轮询也会继续进行。本示例通过一个假设场景:用户调用函数计算提交了一个多媒体处理任务,该任务耗时从 1 分钟到几小时不等,任务执行状态可以通过 API 查询,介绍如何使用 Serverless 工作流实现一个通用可靠的任务轮询工作流。
Serverless 工作流实现 下面的教程会将两个 FC 函数编排成一个任务轮询工作流,该示例需要以下 3 个步骤:
创建 FC 函数 创建 Serverless 工作流流程 开始执行并查看结果 步骤1:创建 FC 函数 首先创建一个名为 fnf-demo 的 FC 服务,并在该服务下创建两个 Python2.7 的函数,详细步骤请参见 FC 文档。 StartJob 函数:模拟通过调用 API 开始一个长时间的任务,返回一个任务 ID。 import logging import uuid
def handler(event, context): logger = logging.getLogger() id = uuid.uuid4() logger.info('Started job with ID %s' % id) return {"job_id": str(id)}
GetJobStatus 函数: 模拟通过调用 API 获取指定任务的执行结果,比较当前的时间和函数第一次执行的时间的差值和输入中 delay 的值,返回不同的状态:“success” 或 “running”。 import logging import uuid import time import json
start_time = int(time.time())
def handler(event, context): evt = json.loads(event) logger = logging.getLogger() job_id = evt["job_id"] logger.info('Started job with ID %s' % job_id)
now = int(time.time()) status = "running"
delay = 60 if "delay" in evt: delay = evt["delay"]
if now - start_time > delay: status = "success"
try_count = 0 if "try_count" in evt: try_count = evt["try_count"]
try_count = try_count + 1 logger.info('Job %s, status %s, try_count %d' % (job_id, status, try_count)) return {"job_id": job_id, "job_status":status, "try_count":try_count}
步骤 2:创建 Serverless 工作流流程 该流程的主要逻辑描述如下:
StartJob 步骤: 调用 StartJob 函数开始一个任务。 Wait10s 步骤: 等待 10 秒。 GetJobStatus 步骤: 调用 GetJobStatus 函数查询当前任务状态。 CheckJobComplete 步骤: 检查 GetJobStatus 函数返回的结果: 如果返回 "success" 整个流程执行成功。 如果轮询尝试次数大于 3 次,认为任务执行失败,流程执行失败。 如果返回 "running" 则跳回到 Wait10s 步骤,继续执行。 version: v1 type: flow steps: - type: task name: StartJob resourceArn: acs:fc:cn-hangzhou:{accountID}:services/fnf-demo/functions/StartJob - type: pass name: Init outputMappings: - target: try_count source: 0 - type: wait name: Wait10s duration: 10 - type: task name: GetJobStatus resourceArn: acs:fc:cn-hangzhou:{accountID}:services/fnf-demo/functions/GetJobStatus inputMappings: - target: job_id source: $local.job_id - target: delay source: $input.delay - target: try_count source: $local.try_count - type: choice name: CheckJobComplete inputMappings: - target: status source: $local.job_status - target: try_count source: $local.try_count choices: - condition: $.status == "success" goto: JobSucceeded - condition: $.try_count > 3 goto: JobFailed - condition: $.status == "running" goto: Wait10s - type: succeed name: JobSucceeded - type: fail name: JobFailed
步骤 3:开始执行并查看结果 在控制台创建好的流程中单击 新执行 并提供以下 JSON 对象作为输入,其中 delay 字段的值模拟任务完成需要的时间,这里预期任务在开始 20秒 后, GetJobStatus 函数返回 “success”,在此之前均返回 “running”,您可以调整 delay 的值观察不同的执行结果。
{ "delay": 20 }
下图展示的是轮询从开始到结束的流程执行可视化。Screen Shot 2019-06-26 at 12.30.01 PM 下图展示的是任务需要 20 秒完成,可以看到流程执行历史中第一次 GetJobStatus 返回 “running” 因此 CheckJobComplete 的后续步骤眺回到 Wait10s 进行等待和下一次查询,第二次查询返回 “success”,流程执行结束。Screen Shot 2019-06-26 at 12.39.26 PM
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。