2

Django 如何使用 Celery 完成异步任务或定时任务 - Désiré

 1 year ago
source link: https://www.cnblogs.com/desireroot7/p/17352598.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

以前版本的 Celery 需要一个单独的库(django-celery)才能与 Django 一起工作, 但从 Celery 3.1 开始,情况便不再如此,我们可以直接通过 Celery 库来完成在 Django 中的任务。

安装 Redis 服务端

以 Docker 安装为例,安装一个密码为 mypassword 的 Redis 服务端

docker run -itd --name redis -p 127.0.0.1:6379:6379 redis:alpine redis-server --requirepass mypassword

在 Python 中安装 Celery 和 Redis

pip install celery redis

在 Django 项目中添加 Celery 配置

在 Django 项目中创建一个 celery.py 文件,并配置 Celery 应用程序。这个文件应该与 settings.py 文件位于同一目录下:

import os

from celery import Celery

# 设置 Django 的默认环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# 使用 Django 的 settings.py 文件配置 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')

# 从所有已安装的应用中自动发现并加载任务模块
app.autodiscover_tasks()

然后在 settings.py 文件中添加配置:

# 使用 Redis 作为消息代理(broker)来传递任务消息,连接地址为 localhost:6379/0,并提供密码 mypassword 进行身份验证。
CELERY_BROKER_URL = 'redis://:mypassword@localhost:6379/0'

# 使用 Redis 作为结果存储后端,连接地址同上,使用相同的密码进行身份验证。
CELERY_RESULT_BACKEND = 'redis://:mypassword@localhost:6379/0'

# 指定发送到代理(broker)的任务消息序列化格式为 JSON 格式。
CELERY_TASK_SERIALIZER = 'json'

# 指定从结果后端获取的结果序列化格式为 JSON 格式。
CELERY_RESULT_SERIALIZER = 'json'

# 指定支持接收的内容类型为 JSON 格式。
CELERY_ACCEPT_CONTENT = ['json']

# 将时区设置为亚洲/上海时区。
CELERY_TIMEZONE = 'Asia/Shanghai'

# 启用 UTC 时间。
CELERY_ENABLE_UTC = True

在 Django 应用程序中创建一个 tasks.py 文件,并编写要运行的任务函数。例如,此处我们将编写一个名为 send_email() 的任务,来定期发送电子邮件:

from django.core.mail import send_mail
from celery import shared_task

@shared_task
def send_email():
    # 发送电子邮件的代码
    pass

如果想要实现异步任务的功能,在 Django 项目中的任何位置调用任务函数即可。例如,在 views.py 文件中,我们可以从视图函数中启动任务,如下所示:

from myapp.tasks import send_email

def my_view(request):
    send_email.delay()
    return HttpResponse('任务已经在后台执行。')

如果想要实现定时任务的功能,可以在 Celery 的配置文件中设置定时任务的调度方式。例如,要每小时运行一次 send_email() 任务,我们可以添加以下代码:

from celery.task.schedules import crontab

app.conf.beat_schedule = {
    'send-email-every-hour': {
        'task': 'myapp.tasks.send_email',
        'schedule': crontab(minute=0, hour='*/1'),
    },
}

定时任务的具体写法可以参考官方文档:https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html?highlight=crontab

运行 Celery-worker 与 Celery-beat

Celery是一个分布式任务队列,由三个主要组件组成:Celery worker、Celery beat 和消息代理(例如 Redis 或 RabbitMQ)。这些组件一起协作,让开发者能够轻松地执行异步任务和定时任务。

Celery worker:负责接收任务请求并执行任务。当您在 Django 应用程序中调用 apply_async 方法时,任务将被发送到 Celery worker,然后由 worker 执行。

Celery beat:负责调度定时任务。它会根据定义的规则定期触发任务,并将其发送到 Celery worker 处理。

所以,对于需要运行定时任务的情况,我们需要同时启动 Celery worker 和 Celery beat 进程来确保所有任务都可以被正确地处理和执行。

如果只需要使用 Celery 来执行异步任务,那么只需启动 Celery worker 即可。但如果需要周期性地执行任务,那么需要启动 Celery beat 来帮助完成调度这些任务。

# 运行 worker 与 beat
celery -A proj worker --loglevel=info --detach --pidfile=worker.pid --logfile=./logs/worker.log
celery -A proj beat --loglevel=info --detach --pidfile=beat.pid --logfile=./logs/beat.log
  • -A proj:指定 Celery 应用程序所在的模块或包,这里假设其名为 proj。
  • worker 或 beat:启动的进程名称,分别对应 worker 和 beat 两种类型的 Celery 进程。
  • --loglevel=info:设置日志级别为 info,即只记录 info 级别及以上的日志信息。
  • --detach:以守护进程(daemonized)方式启动 Celery 进程,使其在后台运行。
  • --pidfile=worker.pid 或 --pidfile=beat.pid:将进程 ID(PID)写入指定的 PID 文件,方便后续管理和监控。
  • --logfile=./logs/worker.log 或 --logfile=./logs/beat.log:指定日志文件路径,所有日志信息都会输出到该文件中。

随后我们设定的定时任务便会按规则执行,可以通过指定的日志文件查看执行结果。当我们需要停止 Celery worker 与 Celery beat 时,可以执行以下操作:

kill -TERM $(cat worker.pid)
kill -TERM $(cat beat.pid)

[1] [Using Celery with Django]: https://docs.celeryq.dev/en/stable/django/first-steps-with-django.html#using-celery-with-django
[2] [Periodic Tasks]: https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html?highlight=crontab


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK