1

基于Celery的后台任务

 2 years ago
source link: https://yinwc.github.io/2021/01/12/Python-Celery/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Celery简介

Celery 是一个异步任务队列/基于分布式消息传递的作业队列,Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。通常使用它来实现异步任务(async task)和定时任务(crontab)

应用程序可能需要执行任何消耗资源的任务都可以交给任务队列,让应用程序自由和快速地响应客户端请求。

Celery是用Python编写的,但该协议可以在任何语言实现。它也可以与其他语言通过webhooks实现。

Celery主要包括以下四个模块:

  • 任务模块 Task
    包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。
  • 消息中间件 Broker
    • Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。
    • Broker部分负责任务消息的分发以及任务结果的存储这部分任务主要由中间数据存储系统完成,比如消息队列服务器RabbitMQ、redis、Amazon SQS、MongoDB、IronMQ等或者关系型数据库,使用关系型数据库依赖sqlalchemy或者django的ORM
  • 任务执行单元 Worker
    • Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。在我的理解中工作线程就是写的python代码,当然还包括python调用系统工具功能
  • 任务结果存储 Backend
    • Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。

使用 Celery 实现异步任务主要包含三个步骤:

  1. 创建一个 Celery 实例
  2. 启动 Celery Worker
  3. 应用程序调用异步任务

创建Celery 实例

创建test.py文件,写入:

# -*- coding: utf-8 -*-
# Author:V1ZkRA
# Time:2021/4/12

import time
from celery import Celery

# 指定消息中间件用 redis,URL 为 redis://127.0.0.1:6379;
broker = 'redis://127.0.0.1:6379'

# 指定存储用 redis,URL 为 redis://127.0.0.1:6379/0;

backend = 'redis://127.0.0.1:6379/0'

# 创建一个 Celery 实例 app,名称为 my_task;
app = Celery('my_task', broker=broker, backend=backend)

# 创建了一个 Celery 任务 add,当函数被 @app.task 装饰后,就成为可被 Celery 调度的任务;
@app.task
def add(x, y):
time.sleep(5) # 模拟耗时操作
return x + y

启动Celery Worker
在当前目录,使用如下方式启动 Celery Worker

celery worker -A tasks --loglevel=info

Flask中使用Celery

flask-celery-example

示例代码:

from flask import Flask

app = Flask(__name__)
app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(app)


@celery.task()
def add_together(a, b):
return a + b

在后台进行调度:

>>> result = add_together.delay(23, 42)
>>> result.wait()
65

Reference

异步任务神器 Celery 快速入门教程


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK