#coding:utf-8importrequests,json,time,queueimportgeventfromlocustimportHttpUser, TaskSet, task# 定义每个用户的任务集合classperformance(TaskSet): # 任务A--GET示例1) (defManagement_entry(self): # 管理人员录入sstimestr=time.strftime("%Y%m%d%H%M%S") sstime=time.strftime("%Y-%m-%d %H:%M:%S") #print("获取:",self.parent.queue_data.get())for_num="guanliry"+str(self.parent.Management_entry_data.get()) url="接口地址"data= { "appid": "10086", "timestamp": "1627378948605", "sign": "qwer", "persons": [{ "personName": for_num, "effTime": sstime, "phone": "15110102020", "email": "7777777@qq.com", "picBase64": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaxxxxxxxxxxcasdcascassacasdcawdsadasdsaccc", "personCode": for_num }] } withself.client.post(url=url, json=data, verify=False, catch_response=True) asresponse: ifresponse.status_code!=200: response.failure('Failed!') print("管理人员录入失败", response.json()) else: response.success() print("管理人员录入", response.json()) 1) (defmmmmmmm(self): # 隔离人员录入sstimestr=time.strftime("%Y%m%d%H%M%S") sstime=time.strftime("%Y-%m-%d %H:%M:%S") #print("获取:",self.parent.queue_data.get())for_num="gelirenyuan"+str(self.parent.Household_entry_data.get()) url="接口地址"data= { "appid": "10086", "timestamp": "1627378948605", "sign": "qwer", "persons": [{ "personName": for_num, "effTime": sstime, "phone": "15110102020", "email": "7777777@qq.com", "picBase64": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaxxxxxxxxxxcasdcascassacasdcawdsadasdsaccc", "personCode": for_num, "grantType": 0 }] } print("测试",data) withself.client.post(url=url, json=data, verify=False, catch_response=True) asresponse: ifresponse.status_code!=200: response.failure('Failed!') print("隔离录入失败", response.json()) else: response.success() print("隔离人员录入", response.json()) # 继承Locust类# 生成的每一个虚拟的HTTP用户# 用来发送请求到进行负载测试的系统classLogin(HttpUser): tasks= [performance] # 权重weight=1# 执行事务之间用户最小等待时间min_wait=1000# 执行事务之间用户最大等待时间max_wait=3000# 超时时间stop_timeout=5host="ip:端口"#这里是填写需要测试的ip地址Management_entry_data=queue.Queue()#queue函数参数化测试数据Household_entry_data=queue.Queue()queue函数参数化测试数据foriinrange(100000): user=inum=i#print(user)Management_entry_data.put_nowait(user) Household_entry_data.put_nowait(num) if__name__=='__main__': importosos.system("locust -f locust_test.py --web-host=0.0.0.0")
主要是搜索全网现在不怎么能看到locust的博客了,所以写一份最新的供大家探讨研究,咱们写代码的测试还是要有拿得出手的性能测试脚本,如果写的不好还望大家多多包涵
@task()装饰器是用来控制测试接口的如需要测试多个task则使用多个task即可
打开对应地址生成页面,第一行是用户总数,第二行是生成率,第三行是你代码中填写的测试地址。
点击start开始运行
下方图片显示运行中的数据,这就跟jmeter一致
测试报告输出:在最后一栏里可以根据需求导出测试报告,也可以在线查看只要能访问你的端口即可
如果单台电脑测试无法满足大数量人员,可以采用分布式压测。详情请看我的另一个博客