21

在 Flask 项目中使用 Celery(with 工厂模式 or not)

 3 years ago
source link: https://greyli.com/use-celery-with-flask-factory-or-not/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

在 Flask 项目中使用 Celery(with 工厂模式 or not)

本文隶属于《Flask Web 开发实战》番外系列。这篇文章会介绍如何在 Flask 项目中集成 Celery。

创建 Celery 程序

第一步是创建一个 Celery 程序实例。因为 Flask 程序实例通常会命名为 app,为了避免冲突,我们一般会把 Celery 实例命名为 celery 或 celery_app:

from celery import Celery
celery = Celery(__name__, broker='pyamqp://guest@localhost//')
@celery.task
def add(x, y):
    return x + y

组织和加载配置

大多数教程,包括目前的 Flask 文档里都会介绍用下面的方式加载配置:

celery.conf.update(app.config)  # 这里的 app 是 Flask 程序实例

也就是把 Celery 配置和 Flask 配置写在一起,然后从 Flask 程序实例的配置字典里更新配置。

但问题是,Celery 从 4.0 开始启用新的小写配置名,某些配置被新的名称替换。虽然旧的大写配置仍然支持,但如果你打算使用小写配置名,或是打算在未来进行迁移,这里的配置加载方式就会失效,因为 Flask 在从文件或对象导入配置时只会导入大写形式的配置变量。

因此,我建议将 Celery 配置写在单独的文件里,不要和 Flask 配置混在一起。按照 Celery 文档的示例,你可以在当前目录创建一个 celeryconfig.py 文件(或是其他名字)保存配置:

broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

然后使用下面的方法加载配置(使用其他模块名,或是在其他路径时,记得修改传入的字符串):

celery.config_from_object('celeryconfig')

如果需要在创建 Celery 实例时传入 broker 和 backend 配置,可以直接写出或是从配置模块中导入:

from celeryconfig import broker_url
celery = Celery(__name__, broker=broker_url)

在 Flask 程序中初始化 Celery

你可以单独创建 Celery 程序,但我们通常会需要为它添加 Flask 程序上下文支持,因为有时候你的 Celery 任务函数会用到依赖于 Flask 程序上下文的某些变量。

下面我们为 Celery 创建了一个工厂函数,在工厂函数中创建 Celery 实例,加载配置,并实现 Flask 程序上下文支持:

from flask import Flask
from celery import Celery
from celeryconfig import broker_url
def make_celery(app):
    celery = Celery(__name__, broker=broker_url)
    celery.config_from_object('celeryconfig')
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)
    celery.Task = ContextTask
    return celery
app = Flask(__name__)
celery = make_celery(app)

在定义 Celery 任务的模块里,比如 tasks.py,你可以导入这个 Celery 程序实例:

from app import celery
@celery.task
def add(x, y):
    return x + y

在使用工厂函数的 Flask 程序中初始化 Celery

当 Flask 程序也使用工厂函数创建时,我们可以全局创建 Celery 程序实例,然后在创建 Flask 程序实例的工厂函数里更新 Celery 程序配置并进行上下文设置:

from flask import Flask
from celery import Celery
from celeryconfig import broker_url
celery = Celery(__name__, broker=broker_url)
def create_app():
    app = Flask(__name__)
    register_celery(app)
    return app
def register_celery(app):
    celery.config_from_object('celeryconfig')
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)
    celery.Task = ContextTask

同样直接导入 Celery 实例并创建任务:

from app import celery
@celery.task
def add(x, y):
    return x + y

这本来是一个完整的 Celery 入门教程,但因为去年的一次硬盘损坏,对应的示例程序弄丢了,暂时没有毅力重写一遍,所以这篇文章只抽取了其中和 Flask 相关的内容。

因为距离初稿写作的时间已经过去半年多,Celery 的最新版本也已经是 4.3.0,如果文中有什么疏漏,或是有更好的实现方式,欢迎评论指出。

本条目发布于2019年4月24日。属于计算机与编程分类,被贴了 CeleryFlask 标签。 ← 为已存在的数据库生成 SQLAlchemy / Flask-SQLAlchemy 模型类 如何有效防止久坐、用眼过度和关节损伤? →

《在 Flask 项目中使用 Celery(with 工厂模式 or not)》上有2条评论

  1. 头像季子禾 2020年5月29日下午3:50

    ‘No application found. Either work inside a view function or push’
    RuntimeError: No application found. Either work inside a view function or push an application context. See http://flask-sqlalchemy.pocoo.org/contexts/.

    回复
    1. 头像季子禾 2020年5月29日下午3:51

      可以运行,但是上下文没联上,请辉大神指导下

      回复

撰写评论 取消回复

电子邮件地址不会被公开,必填项已用*标出。

评论

姓名 *

电子邮件 *

站点

在此浏览器中保存我的名字、电邮和网站。

当有人回复我时,发送电子邮件提醒我。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK