2

Celery多队列解决生产环境下的任务优先级问题

 6 months ago
source link: https://blog.ops-coffee.cn/s/django-celery-beat-queues-priority
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Celery多队列解决生产环境下的任务优先级问题

运维自动化系统需要处理大量的周期任务,例如上篇文章运维自动化之域名系统中提到的域名临期提醒、域名解析绑定资源下线提醒、域名证书扫描及临期提醒等,同时也会有大量的异步任务和定时任务需要执行,例如异步的手动资源同步、定时的系统发布任务等等,这些周期/定时/异步任务的执行都依赖Celery这个Python环境下的知名工具,关于Celery我之前也有介绍过他的基本使用,通常情况下这些用法已经能够满足我们的需求,但随着系统对功能强大用户增多,就会用到Celery的一些进阶的用法,例如今天这篇文章要写的多队列

为什么会用到多队列?实际情况是系统中有大量的定时资源同步任务在周期运行,同时也会有一些临时性的发布部署任务要执行,但这些任务是有优先级的,定时资源同步任务优先级较低,发布部署任务优先级较高,往往会出现定时任务占用进程而发布部署任务等待的情况

出现这个问题如何解决呢,首先最简单的是调整同时可以执行的任务数量,同一时间可以执行更多的任务,我调整了控制并发线程的concurrency参数,由20增加一倍到40,高优先级任务执行等待的情况有所缓解,但仍能不时遇到,偷懒的方法有用但不多。于是就要找找更好的方法了,既然是优先级,Celery用RabbitMQ或是Redis作为Broker时本身就是支持消息优先级的,只是两个不同的Broker对优先级的实现方式差异巨大,RabbitMQ本身支持优先级,Celery也直接使用了RabbitMQ的优先级来实现,而Redis并没有优先级的概念,Redis作为Broker时Celery对优先级的支持是通过为每个队列创建n个列表来实现的。虽然都是支持,但这里有个非常重要的问题,不同broker对优先级的级别x-max-priority定义不同,一个是越大越优先一个是越小越优先,这要是更换Broker那就涉及到代码的修改,维护难度增加,理解起来也不容易,能用但不是最优解。相比之下多队列的解决方案就很合适了,启动多个队列,不同任务路由到不同级别的队列,然后启动多个worker分别去消费不同的队列,即便是同步资源的低优先级队列阻塞也不会影响发布部署的高优先级队列任务执行,就很完美了

20231121.01.png

配置也很简单,没有太多代码的修改

Celery默认会生成一个名为celery的队列,这个队列我们保持不变,让其作为低优先级队列,同时配置一个名为priroity的队列作为高优先级队列

from kombu import Exchange, Queue

CELERY_QUEUES = (
    Queue("priority", Exchange("priority"), routing_key="priority"),
)

然后添加一条路由策略将位于englne.tasks下的所有任务,也就是发布部署任务路由到队列priority中去处理,其他默认没有配置的任务将依然会由默认队列celery处理

CELERY_ROUTES = {
    'engine.tasks.*': {"queue": "priority", "routing_key": "priority"},
}

配置engine.tasks.*表示的是engine.tasks下的所有任务,当然也可以仅指定一条任务engine.tasks.build或者通过正则的方式re.compile(r'(video|image)\.tasks\..*')去匹配多条任务

这里要明白并不是我们起了名字叫priority它就是高优先级队列,我们自己定义的priority队列和默认的celery队列并没有优先级的区别,我们需要启动两个worker来分别消费这两个队列,这样就能做到单独处理了

celery -A devops worker -n celery.%h -Q celery --loglevel=warning
celery -A devops worker -n priority.%h -Q priority --loglevel=warning

其中通过-Q来指定队列名字,-n则定义新worker的名称,名称中的%h表示运行worker的包含域名的完整主机名称,除了%h外还可以用%n来表示主机名,或是%d表示domain域名。想要控制不同队列消费的进程数量的话可以通过--concurrency参数来灵活配置

至此,问题解决。当然除了解决优先级的问题外,对于自动化运维系统的更新和维护也有了一定的优化,资源同步任务的更新只需要重启消费celery的worker即可,对任务将没有影响,一举两得


能看到这里一定是真爱,关注一下吧

wx.sou1.png

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK