Django+Celery 进阶:动态定时任务的添加、修改与智能调度实战

简介: Celery Beat 是 Celery 的定时任务调度组件,支持固定间隔、CRON 表达式等规则,可实现任务的周期性执行。本文详解了其原理、配置方式、与 Django 的集成步骤及实战代码示例,适用于构建具备定时任务管理功能的后台系统。

一、Celery定时任务

Celery Beat 介绍

Celery Beat 是 Celery 框架的一个内置组件,专门用于定时任务调度。它可以按照预设的时间规则(如固定间隔、特定时间点、CRON 表达式等)自动触发 Celery 任务,广泛应用于需要周期性执行的场景(如定时数据备份、日志清理、报表生成等)。

工作原理

  • Beat 进程:独立运行的调度进程,负责解析定时规则并生成任务消息。
  • 任务发送:当到达预设时间,Beat 进程将任务发送到 Celery 的消息队列(如 Redis)。
  • 任务执行:Celery Worker 进程从队列中获取任务并执行。

简单来说,Celery Beat 是 “定时发令枪”,而 Worker 是 “执行者”。

Celery Beat 配置持久化

默认情况下,任务配置存储在内存中,重启后会丢失。需要通过后端存储(如数据库)实现持久化,确保任务配置不丢失。

项目名称 说明
django-celery-beat 通过数据库实现任务配置持久化
django-celery-results 通过数据库实现任务结果持久化
django-celery 只支持Celery 3.X版本(不推荐)

二、Celery Beat与Django集成

安装配置

安装

pip install django-celery-beat django-celery-results

在Django项目settings.py中添加

INSTALLED_APPS = [
    ...
    'django_celery_beat',
    'django_celery_results'
]

### Celery 配置
CELERY_BROKER_URL = f"{REDIS_URL}/{REDIS_DB}"  # 使用Redis作为消息代理
CELERY_RESULT_BACKEND = "django-db"  # 使用数据库存储结果
CELERY_BEAT_SCHEDULER = (
    "django_celery_beat.schedulers:DatabaseScheduler"  # 使用数据库保存定时任务
)
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = True
CELERY_RESULT_EXTENDED = True  # 启用后才会记录 task_name、date_started 等字段
CELERY_TASK_TRACK_STARTED = True  # 记录任务开始时间

定义 Celery 实例:创建文件mysite\mysite\celery.py

"""定义和配置 Celery 实例"""

import os
from celery import Celery
from django.conf import settings


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
# 创建 Celery 实例
app = Celery("mysite")
# 加载配置文件中的 Celery 配置
app.config_from_object("django.conf:settings", namespace="CELERY")
# 自动发现并加载任务
app.autodiscover_tasks(["myapp_infra", "myapp_system"] + settings.MY_APPS, force=True)

配置 Django 启动时会加载应用:修改文件mysite\mysite\__init__.py

"""Django 启动时加载Celery实例"""

from .celery import app as celery_app

__all__ = ("celery_app",)

数据库迁移

执行数据库迁移,创建相关数据库表。其中:

  • django_celery_beat_periodictask:用于存储任务名称、任务路径、任务参数等元数据。
  • django_celery_beat_crontabschedule:用于存储CRON表达式。
# 在Django项目根目录(包括manage.py的目录)执行
python manage.py migrate django_celery_beat
python manage.py migrate django_celery_results

image-20250416091830705.png

三、Celery Beat项目实战

定义Celery任务

发现任务:Celery 将自动从所有已安装的应用APP中发现任务,需要遵守以下目录结构

- myapp_system/
    - tasks.py
    - models.py
- myapp_infra/
    - tasks.py
    - models.py

定义任务:创建文件myapp_infra/tasks.py,使用@shared_task装饰器定义 Celery 任务

"""定义 Celery 任务"""

from time import sleep
from celery import shared_task
from django.utils import timezone


@shared_task
def send_daily_report():
    # 示例:发送日报
    print(f"开始发送日报,现在时间:{timezone.now()}")
    sleep(30)
    print("发送成功")
    return "发送成功"


@shared_task
def cleanup_expired_data():
    # 示例:清理过期数据
    print("清理过期数据")
    sleep(15)
    print("清理完成")
    return "清理完成"

通过视图集动态管理定时任务

下面是通过 DRF 视图集,动态管理定时任务示例,实现对定时任务的增删改查、手动触发、开启暂停等操作

  • 定义视图:myapp_infra\job\views.py
import json
from celery import current_app
from django_celery_beat.models import PeriodicTask
from drf_spectacular.utils import extend_schema
from rest_framework.decorators import action
from rest_framework.generics import get_object_or_404

from mars_framework.viewsets.base import CustomModelViewSetNoSimple
from mars_framework.permissions.base import HasPermission
from mars_framework.response.base import CommonResponse
from .serializers import JobSaveSerializer, JobSerializer
from .filters import JobFilter
from .services import infra_job_service


@extend_schema(tags=["管理后台-infra-定时任务"])
class JobViewSet(CustomModelViewSetNoSimple):
    queryset = PeriodicTask.objects.all()
    serializer_class = JobSerializer
    filterset_class = JobFilter
    action_serializers = {
   
        "create": JobSaveSerializer,
        "update": JobSaveSerializer,
    }
    action_permissions = {
   
        "create": [HasPermission("infra:job:create")],
        "destroy": [HasPermission("infra:job:delete")],  # TODO 是否需要删除对应shedule
        "update": [HasPermission("infra:job:update")],
        "retrieve": [HasPermission("infra:job:query")],
        "list": [HasPermission("infra:job:query")],
        "export": [HasPermission("infra:job:export")],
        "update_status": [HasPermission("infra:job:update")],
        "trigger": [HasPermission("infra:job:trigger")],
        "sync": [HasPermission("infra:job:create")],
        "get_next_times": [HasPermission("infra:job:query")],
    }
    action_querysets = {
   
        # 排除name=celery.backend_cleanup
        "list": PeriodicTask.objects.exclude(name="celery.backend_cleanup"),
        "export": PeriodicTask.objects.exclude(name="celery.backend_cleanup"),
    }

    export_name = "定时任务列表"
    export_fields_labels = {
   
        "id": "任务编号",
        "name": "任务名称",
        "task": "处理器名字",
        "kwargs": "处理器参数",
        "cron_expression": "CRON表达式",
        "status": "任务状态",
    }
    export_data_map = {
   
        "status": {
   1: "开启", 2: "暂停"},
    }

    @extend_schema(summary="新增")
    def create(self, request, *args, **kwargs):
        """创建定时任务"""
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        # CRON表达式
        cron_expression = serializer.validated_data.pop("cron_expression")
        schedule = infra_job_service.get_or_create_crontab_schedule(cron_expression)
        # 创建任务
        task_data = {
   
            "name": serializer.validated_data.get("name"),
            "task": serializer.validated_data.get("task"),
            "kwargs": serializer.validated_data.get("kwargs"),
            "crontab": schedule,
            "enabled": False,  # 默认禁用
        }
        PeriodicTask.objects.create(**task_data)
        return CommonResponse.success()

    @extend_schema(summary="更新")
    def update(self, request, *args, **kwargs):
        """更新定时任务"""
        instance = self.get_object()
        serializer = self.get_serializer(instance, data=request.data)
        serializer.is_valid(raise_exception=True)
        # 任务CRON表达式
        cron_expression = serializer.validated_data.pop("cron_expression")
        schedule = infra_job_service.get_or_create_crontab_schedule(cron_expression)
        # 更新任务
        task_data = {
   
            "name": serializer.validated_data.get("name"),
            "task": serializer.validated_data.get("task"),
            "kwargs": serializer.validated_data.get("kwargs"),
            "crontab": schedule,
        }
        PeriodicTask.objects.filter(id=instance.id).update(**task_data)
        return CommonResponse.success()

    @extend_schema(summary="触发定时任务")
    @action(
        methods=["put"],
        detail=True,
        url_path="trigger",
    )
    def trigger(self, request, *args, **kwargs):
        """触发定时任务"""
        instance = self.get_object()
        # 获取任务函数并手动触发
        task_name = instance.task  # 任务路径如 "myapp_infra.tasks.send_daily_report"
        task_kwargs = json.loads(instance.kwargs or "{}")

        try:
            # 动态加载任务函数
            task = current_app.tasks[task_name]
            task.delay(**task_kwargs)
            return CommonResponse.success()
        except KeyError:
            return CommonResponse.error(
                code=121101,
                msg=f"找不到 {task_name}  任务,或该任务未注册",
            )
        except Exception as e:
            return CommonResponse.error(
                code=121102,
                msg=f"触发任务 {task_name} 失败,错误信息:{e}",
            )

    @extend_schema(summary="更新定时任务状态")
    @action(
        methods=["put"],
        detail=True,
        url_path="status",
    )
    def update_status(self, request, *args, **kwargs):
        """更新定时任务状态"""
        status = request.query_params.get("status")
        if status is None or status not in ["1", "2"]:  # 1:开启 2:暂停
            return CommonResponse.error(code=121104, msg="任务状态值错误")
        instance = get_object_or_404(PeriodicTask, pk=kwargs.get("pk"))
        instance.enabled = status == "1"
        instance.save()
        return CommonResponse.success()

    @extend_schema(summary="获取定时任务的下 n 次执行时间")
    @action(
        methods=["get"],
        detail=True,
        url_path="next-times",
    )
    def get_next_times(self, request, *args, **kwargs):
        """获取定时任务的下 n 次执行时间"""
        count = int(request.query_params.get("count", 5))
        task = self.get_object()
        # 生成CORN 表达式
        crontab = task.crontab
        cron_expression = f"{crontab.minute} {crontab.hour} {crontab.day_of_month} {crontab.month_of_year} {crontab.day_of_week}"
        try:
            data = infra_job_service.get_next_times(cron_expression, count)
        except Exception as e:
            return CommonResponse.error(code=121102, msg=str(e))
        return CommonResponse.success(data=data)
  • 配置路由:myapp_infra\urls.py
from .job.views import JobViewSet

# 管理后台 - 定时任务
router.register(r"job", JobViewSet, basename="job")

启动Celery Beat

  • 启动Celery Worker和Celery Beat调度器
# 在项目目录(与manage.py同级),启动Celery Worker
celery -A mysite worker -l info -P solo

# 新建另一个终端窗口,在项目目录(与manage.py同级),启动Celery Beat
celery -A mysite beat -l info -S django_celery_beat.schedulers:DatabaseScheduler

# 新建另一个终端窗口,在项目目录(与manage.py同级),启动Django
python manage.py runserver

四、实战效果

通过上面定义的DRF视图集API,配合 Vue3 前端界面实现效果

  • 定时任务的增、删、开启暂停功能

image-20250715095809854.png

  • 定时任务的修改功能
    • 处理器名字:填写定义任务的全路径名称
    • CRON表达式:填写标准的CRON表达式

image-20250715095918553.png

  • 定时任务执行结果查询功能:能看到定时任务的执行时间、状态、返回结果等信息

image-20250715100221840.png

点击查看完整代码


您正在阅读的是《Django从入门到实战》专栏!关注不迷路~

相关文章
|
5月前
|
数据挖掘 数据库 Python
Django实战:基于Django和openpyxl实现Excel导入导出功能
`openpyxl` 是用于处理 Excel 文件的 Python 库。本文详解其在 Django 项目中的实战应用,涵盖 Excel 文件的生成、下载、上传与解析。
181 0
Django实战:基于Django和openpyxl实现Excel导入导出功能
|
5月前
|
监控 NoSQL 数据可视化
Django+Celery 进阶:Flower可视化监控与排错
本文介绍了Celery命令行工具与图形监控工具的使用,涵盖查看Worker状态、任务信息及集成至Django项目的方法,同时提供Redis监控与常见问题排错方案。
482 1
|
5月前
|
监控 NoSQL 网络协议
Django 实时通信实战:WebSocket 与 ASGI 全解析(上)
WebSocket 是一种全双工通信协议,支持实时数据传输,适用于聊天、协作、监控等场景。ASGI 是异步 Web 标准,配合 Uvicorn 服务器和 Django Channels,可实现 Django 的 WebSocket 功能,提升实时应用性能。
288 0
|
5月前
|
自然语言处理 开发者 Python
Django 实战:I18N 国际化与本地化配置、翻译与切换一步到位
Django国际化与本地化指南,涵盖i18n和l10n的定义、配置、视图与模型中的翻译使用、消息文件生成与编译,以及多语言登录实战。助你打造多语言支持的Web应用。
276 0
|
5月前
|
人工智能 开发工具 数据库
Django实战:Python代码规范指南
PEP 8 是 Python 官方代码风格指南,提升代码可读性与团队协作效率。本文详解命名规范、注释写法、常用工具(如 Black、flake8)、编程实践与代码优化技巧,助力写出规范、易维护的 Python 代码。
320 7
|
4月前
|
缓存 监控 中间件
Django中间件自定义开发指南:从原理到实战的深度解析
Django中间件是Web应用的“交通警察”,在请求与响应过程中进行全局处理,适用于身份验证、日志记录、性能监控等功能。本文详解中间件的工作原理、开发步骤及实战案例,帮助开发者掌握自定义中间件的构建方法,提升Django应用的可维护性与扩展性。
284 0
|
5月前
|
存储 前端开发 应用服务中间件
Django 实战:静态文件与媒体文件从开发配置到生产部署
Django项目中,静态文件(Static Files)和媒体文件(Media Files)是两类不同用途的文件。本文详细介绍了它们的区别、配置方法以及在开发与生产环境中的处理方式,并结合用户头像上传功能进行实战演示,最后讲解了如何通过Nginx或OpenResty部署静态与媒体文件服务。
276 1
|
5月前
|
消息中间件 存储 NoSQL
Django 实战:Celery 异步任务从环境搭建到调用全掌握
本文详解 Celery 核心概念、架构组成及工作流程,并实战演示如何在 Django 项目中集成 Celery,实现异步任务调用与事务提交控制,助你掌握从配置到部署的全流程开发技巧。
532 3
|
4月前
|
缓存 NoSQL 数据库
Django缓存机制详解:从配置到实战应用
本文全面解析Django缓存技术,涵盖配置方法与六大缓存后端,结合实战场景演示四种典型应用方式,帮助开发者提升Web应用性能,应对高并发挑战。
142 0
|
4月前
|
存储 缓存 数据库
Django模型开发全解析:字段、元数据与继承的实战指南
Django模型是业务逻辑与数据库的核心桥梁,本文详解模型开发三大核心:字段类型选择、元数据配置与继承模式应用,涵盖实战技巧与常见问题解决方案,助你构建高效可维护的数据模型。
162 0