6

celery笔记五之消息队列的介绍 - XHunter

 1 year ago
source link: https://www.cnblogs.com/hunterxiong/p/17497326.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本文首发于公众号:Hunter后端
原文链接:celery笔记五之消息队列的介绍

前面我们介绍过 task 的处理方式,将 task 发送到队列 queue,然后 worker 从 queue 中一个个的获取 task 进行处理。

task 的队列 queue 可以是多个,处理 task 的 worker 也可以是多个,worker 可以处理任意 queue 的 task,也可以处理指定 queue 的 task,这个我们在介绍 queue 的时候再做介绍。

这一篇我们来介绍一下存储 task 的队列 queue。

  1. 默认队列 task_default_queue
  2. 将 task 指定到队列 queue 消费

以下的操作都是在 Django 系统的配置中使用。

1、默认队列 task_default_queue

当我们运行一个最简单的延时任务比如 add.delay(1, 2) 时,并没有设置一个消息队列,因为如果我们没有指定,系统会为我们创建一个默认队列。

这个默认的队列被命名为 celery,值在 app.conf.task_default_queue,我们可以查看一下:

from hunter.celery import app
app.conf.task_default_queue

# 输出为 'celery'

2、定义队列

我们可以设想一下这个场景,我们只有一个 worker 处理 task,每个 task 需要处理的时间很长,因为 worker 被占用,这样在我们的任务队列里就会积压很多的 task。

有一些需要即时处理的任务则会被推迟处理,这样的情况下,我们理想的设计是设置多个 worker,多个 worker 分别处理指定队列的 task。

关于 worker 的设置,比如添加多个 worker,给 worker 消费指定队列的 task,我们在 worker 的笔记中再介绍,这里我们介绍一下如何定义队列。

任务队列的定义如下:

# hunter/celery.py

from kombu import Queue

app.conf.task_queues = (
    Queue('blog_tasks', ),
)

当我们定义了任务队列之后,我们可以将 task 指定输出到对应的 queue,假设 blog/tasks.py 下有这样一个 task:

# blog/tasks.py
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

接下来我们调用这个 task 的时候,需要指定队列:

from blog.tasks import add
add.apply_async((1, 2), queue='blog_tasks')

如果我们就这样配置 celery,这个时候如果我们直接再调用 delay() 函数,也就是不指定 queue 的话,会发现我们发出的 task 是不能被 worker 处理的。

也就是说,下面的操作是不起作用的:

from blog.tasks import add
add.delay(1, 2)  # 此时,我们的调用不会被队列接收到

如果需要在调用 task 的时候不指定队列,使用系统默认的队列,这个时候我们需要额外来指定一个 task_default_queue,celery 的配置如下:

# hunter/celery.py

app.conf.task_queues = (
    Queue('blog_tasks'),
    Queue('default_queue'),
)
app.conf.task_default_queue = 'default_queue'

这样,我们在使用延时任务的时候,就不需要指定 queue 参数了,都会走我们的默认 task 队列:

from blog.tasks import add
add.delay(1, 2)  # 队列会被 default_queue 接收到

而如果我们想实现 add 的延时任务走的是 blog_tasks 这个队列,但是我们在调用的时候不想那么麻烦每次都指定 queue 参数,这个就需要用到 task_routes 配置项了。

3、将 task 指定到队列 queue 消费

如果我们想某些函数使用指定的 queue,我们可以使用 task_routes 配置项来操作。

现在我们有两个 application,blog 和 polls,这两个 application 下都有各自的 tasks,文件的内容如下:

# blog/tasks.py
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def minus(x, y):
    return x - y
# polls/tasks.py
from celery import shared_task

@shared_task
def multi(x, y):
    return x * y

我们想要实现的最终的目的是在调用延时任务的时候,可以直接使用 delay() 的方式,不需要使用 apply_async(queue='xx')。

我们想要实现的功能是,polls/tasks.py 下的所有的延时任务以及 blog/tasks.py 下的 add() 函数进入 queue_1 队列

blog 下的 minus() 函数进入 queue_2 队列

其他所有的 task 都走默认的队列,default_queue。

我们可以如下配置:

app.conf.task_queues = (
    Queue('queue_1'),
    Queue('queue_2'),
    Queue('default_queue'),
)

app.conf.task_routes = {
    'polls.tasks.*': {
        'queue': 'queue_1',
    },
    'blog.tasks.add': {
        'queue': 'queue_1',
    },
    'blog.tasks.minus': {
        'queue': 'queue_2',
    },
}

app.conf.task_default_queue = 'default_queue'

如果想获取更多后端相关文章,可扫码关注阅读:

image

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK