大家好~我是米洛
!
我正在从0到1打造一个开源的接口测试平台, 也在编写一套与之对应的完整
教程
,希望大家多多支持。
回顾
上一节我们设计好了测试计划
表并编写了CRUD接口,还没来得及测试,我们就要马不停蹄地编写定时任务相关内容了。今天我们就来编写一个完善的demo,可以定期执行测试计划
的用例,并生成报告。
至于后面的通知,要等到以后完善了。
调整Executor类里面的部分方法
之前我们虽然支持了多条case
异步运行并写入测试报告,但为了支持测试计划
,我们还需要进行一些调整。
- 修改run_multiple方法
我们的核心
还是这个方法,因为里面已经包装好了运行多条case的方法。但我们要做一些改动:
变更参数
首先我们要调整它的参数,因为我们运行case的模式,有很多种,按照之前定义的mode字段:
这个是测试报告表中定义的
调整一下
这样,当我们调用run_mutiple方法的时候,就可以知道case的模式是什么模式了,究竟是隶属于测试计划
呢,还是属于普通执行的case
。我们调整一下,改成图2的数据。
plan_id可以为空,如果有plan_id的话,可以直接跳转到测试计划
相关页面。方便编辑测试计划,也方便排查问题。
ordered这个方法区分同步还是异步,因为我们先前的run_multiple都是默认异步的,但我们测试计划又新增了同步模式,所以需要定义这样一个参数。
- 修改ReportDao
image
也是添加这2个参数,透传进来。方便报告里面表明,这是什么模式的执行,以及是否有测试计划id。
- 调整执行逻辑
image
根据ordered字段判断是否是同步执行
,如果不是则和以前一致,是的话则用for的方式执行,保证用例执行的顺序性。
新增run_test_plan方法
@staticmethod async def run_test_plan(plan_id: int): """ 通过测试计划id执行测试计划 :param plan_id: :return: """ plan = await PityTestPlanDao.query_test_plan(plan_id) if plan is None: Executor.log.info(f"测试计划: [{plan_id}]不存在") return # 设置为running plan.state = 1 await PityTestPlanDao.update_test_plan(plan, plan.update_user) # if plan.disabled: # # 说明测试计划已禁用 # Executor.log.info(f"测试计划: [{plan.name}]未开启") # return env = list(map(int, plan.env.split(","))) case_list = list(map(int, plan.case_list.split(","))) await asyncio.gather( *(Executor.run_multiple(0, int(e), case_list, mode=1, plan_id=plan.id, ordered=plan.ordered) for e in env)) # TODO 后续通知部分 plan.state = 0 await PityTestPlanDao.update_test_plan(plan, plan.update_user)
先通过query方法查到测试计划数据,如果plan是None,说明没有这个测试计划,可能被删掉
了。
否则我们走执行逻辑: 根据plan的env字段取出环境,并转换为[1, 2]这样的数组。
这里可能有点绕,因为我们数据库存的env和case_list都是这样的形式:
1,2,3,4
通过逗号分开,我们split以后,会得到: ["1", "2", "3", "4"]这样的数组,所以我们需要批量转为int。
最后,由于一般环境之间会隔离
,所以我们的ordered根据环境走就可以了,多环境执行我们可以按照异步处理。
所以最后也选用了gather。
编写Scheduler类
我们在utils目录下新建scheduler.py文件:
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from app.utils.executor import Executor class Scheduler(object): scheduler: AsyncIOScheduler = None @staticmethod def init(scheduler): Scheduler.scheduler = scheduler @staticmethod def configure(**kwargs): Scheduler.scheduler.configure(**kwargs) @staticmethod def start(): Scheduler.scheduler.start() @staticmethod def add_test_plan(plan_id, plan_name, cron): return Scheduler.scheduler.add_job(func=Executor.run_test_plan, args=(plan_id,), name=plan_name, id=str(plan_id), trigger=CronTrigger.from_crontab(cron)) @staticmethod def edit(): pass @staticmethod def remove(): pass @staticmethod def list(): job_list = Scheduler.scheduler.get_jobs() return job_list
这里我们对scheduler进行了非常简单的封装,然后定义了add_test_plan方法。
这个方法可以添加Executor.run_test_plan
到定时任务,他接受id,name和cron3个参数,这个数据最终会落到定时任务表
里面去。
在添加测试计划接口调用add_test_plan方法
当测试计划添加成功的时候,我们自动添加定时任务
测试一下
FastApi深度结合了Swagger,所以我们只需要打开: http://localhost:7777/docs便可以找到我们的测试方法:
填入token和测试计划相关信息即可
由于我已经添加过了,并且是每分钟一次,所以我们只需要启动服务,静静等待
即可。
启动服务的时候,提示我这个每分钟执行的任务被miss了,我们再等一分钟
可以看到,环境1和环境2保持每分钟都在写入测试报告到数据库
image
那今天的简单demo就完成到这里,下一节我们完善删改查的内容。