目前我有一个用django运行的芹菜批次,如下所示:
Celery.py:
from __future__ import absolute_import, unicode_literals
import os
import celery
from celery import Celery
from celery.schedules import crontab
import django
load_dotenv(os.path.join(os.path.dirname(os.path.dirname(__file__)), '.env'))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'base.settings')
django.setup()
app = Celery('base')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, \*kwargs):
app.control.purge()
sender.add_periodic_task(30.0, check_loop.s())
recursion_function.delay() #need to use recursive because it need to wait for loop to finish(time can't be predict)
print("setup_periodic_tasks")
@app.task()
def check_loop():
.....
start = database start number
end = database end number
callling apis in a list from id=start to id=end
create objects
update database(start number = end, end number = end + 3)
....
@app.task()
def recursion_function(default_retry_delay=10):
.....
do some looping
....
#when finished, call itself again
recursion_function.apply_async(countdown=30)
我的目的是每当芹菜文件被编辑时,它将重新启动所有任务-删除尚未执行的排队任务(我这样做是因为recursion_function
将在完成后再次运行,这是检查表的每条记录的工作在我的数据库中,因此我不必担心它会在中途停止)。
check_loop
函数将调用具有分页功能的api以返回对象列表,我将通过表中的记录与之进行比较,如果匹配则创建另一个模型的新自定义记录
我的问题是,当我清除所有消息时,当前正在运行的任务会中途停止还是继续运行?因为如果check_loop
函数停止在api列表中途循环,则它将再次运行循环,而我将创建新的重复记录,而我不想
例:
在check_loop()
的销毁任务期间,它在途中创建了对象(在api列表中,从元素id = 2到id = 5),服务器重启->再次运行,现在check_loop()
从头开始运行(在元素的api列表上) id = 2到id = 5),然后再次从该列表中创建对象(我不希望100%)
这是怎么运行的?我只需要一个确认
编辑:
https://docs.celeryproject.org/zh-CN/4.4.1/faq.html#how-do-i-purge-all-waiting- 任务
我添加了app.control.purge(),因为当我重新启动时,recursion_function在setup_periodic_tasks中再次被调用,而recursion_function.apply_async(countdown = 30)中的上一个recursion_function也被执行,因此它自身也会倍增
问题来源:stackoverflow
是,除非重新启动工作程序,否则工作程序将继续执行当前正在运行的任务。
另外,* _ Celery Way_ 总是希望*任务在并发环境中运行,并具有以下注意事项:
即使您确定在您的环境中只有一个工人手动启动/停止并且这些工人都不适用,这些工人也不适用-应该以这种方式创建任务以允许所有这些事情发生。
一些有用的技术:
即recursion_function
也可以是定期任务
。成为周期性任务将确保它在每个间隔都运行,即使先前的间隔由于某种原因而失败(因此也无法像常规非周期性任务一样再次将自身排队)。使用锁定,可以确保一次只运行一个。
*check_loop()
:
首先,建议将结果保存在数据库中的一个事务中,以确保所有或不保存或修改数据库中的所有内容。
您还可以保存一些标记,该标记指示已保存对象的数量/状态,因此以后的任务可以仅检查此标记,而不是每个对象。
或者以某种方式在创建每个元素之前检查每个元素是否已经存在于数据库中。
回答来源:stackoverflow
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。