3

Django+Celery+Rabbitmq+Flower使用小记

 2 years ago
source link: https://www.hi-roy.com/posts/django-celery-rabbitmq-flower%E4%BD%BF%E7%94%A8%E5%B0%8F%E8%AE%B0/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Django+Celery+Rabbitmq+Flower使用小记

2016-02-23

之前的博客中简单的介绍了celery的安装配置以及如何在python程序中使用,这里记录一下我使用django结合celery以及rabbitmq提供web服务,同时使用flower进行监控的过程。至于这几样东西是什么、怎么安装这里就不再细说了。

涉及到的关键点如下:

  1. 如何在django中使用celery?
  2. 如何触发定时任务?
  3. 如何为不同的任务绑定不同的消息队列?
  4. 如何重试出错的任务?
  5. 如何发送报错邮件?
  6. 如何使用flower进行监控?

首先,项目依赖:

celery==3.1.20
Django==1.9.2
flower==0.8.4

注意版本,有可能你看到本文的时候相关配置会有所变化。

首先创建一个空的django项目,我这里就叫djce好了:

tree djce
djce
├── djce
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── manage.py

接下来在settings.py的同级目录中新建celery.py:

from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djce.settings')
app = Celery('djce')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

修改__init__.py添加:

from __future__ import absolute_import
from .celery import app as celery_app

修改settings.py添加:

# 添加celery配置
BROKER_URL ='amqp://[email protected]//'
CELERY_RESULT_BACKEND = 'amqp://[email protected]//'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = TIME_ZONE
CELERY_IGNORE_RESULT = True
# 启用报错邮件
CELERY_SEND_TASK_ERROR_EMAILS = True
# 分离2个任务队列
from kombu import Exchange, Queue
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('for_task_a', Exchange('for_task_a'), routing_key='for_task_a'),#这个是主动任务的队列
    Queue('for_task_b', Exchange('for_task_b'), routing_key='for_task_b'),#这个是定时任务的队列
)
CELERY_ROUTES = {
    'task_a': {'queue': 'for_task_a', 'routing_key': 'for_task_a'},
    'task_b': {'queue': 'for_task_b', 'routing_key': 'for_task_b'},
}
# 收件人
ADMINS = (
    ('xxxx', '[email protected]'),
)
SERVER_EMAIL = '[email protected]'
EMAIL_HOST = 'xxxx'
EMAIL_PORT = 25
EMAIL_HOST_USER = '[email protected]'
EMAIL_HOST_PASSWORD = 'asdasd'

在上面的设置中,我们启用了报错邮件并设置了2个队列用于接收不同的消息,记得把配置信息修改成你使用的环境相应配置。接下来我们新建一个app,这里就叫mytask好了,并在mytask中新建tasks.py:

from celery.utils.log import get_task_logger
from celery.decorators import periodic_task
from celery.task.schedules import crontab
from celery.decorators import task
logger = get_task_logger(__name__)
@task(name="task_a", default_retry_delay=5, max_retries=3, bind=True)
def task_a(self):
    try:
        print "in task_a"
    except Exception as e:
        # 隔5S重试,最多3次
        logger.info(str(e))
        raise self.retry(exc=e)
@periodic_task(
    run_every=(crontab(minute='*/1')),
    name="task_b",
    ignore_result=True, bind=True)
def task_b(self):
    #每1分钟执行一次
    print "in task b"

在views.py中写一个最简单的逻辑即可:

from django.http import HttpResponse
from .tasks import task_a
class MyTask(View):
    def get(self, request, *args, **kwargs):
        task_a.delay() #发送消息,触发后台任务
        return HttpResponse("django and celery!")

编写相应的urls.py并在settings.py中引入我们的mytask后(很简单,这里就不贴出来了),确定rabbitmq服务成功运行后,首先我们打开第一个终端运行django程序: python manage.py runserver

然后打开第二个终端,运行worker处理task_a: celery -A djce worker -l info -Q for_task_a

再然后打开第三个终端,运行worker处理task_b: celery -A djce worker -l info -Q for_task_b

注意使用-Q参数绑定对应的队列。

再再然后打开第四个终端,用于触发我们的计划任务task_b: celery -A djce beat -l info

至此,一个没卵用但是说明了整个逻辑的程序就完成了,只要我们访问相应的url,task_a的终端就会有输出,并且每分钟task_b的终端也会有输出。

接下来说说监控的flower,想使用这个,首先我们需要在rabbitmq中启用rabbitmq_management并重启服务

rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart

其次我们需要新增一个管理员用户,怎么添加请看rabbitmq的文档。上述工作没问题的话打开第五个终端: flower -A djce --port=5555 --broker_api=http://rbtuser:[email protected]:15672/api/

其中rbtuser,passwd以及服务器ip根据真实情况修改,这里需要注意的是,必须先启用worker后再启用flower,否则点击worker后会提示找不到对应的worker。

没问题的话,使用浏览器访问5555端口就可以看见flower的界面了。

其它问题:

  1. 如果修改了task的逻辑,必须重启worker才能生效。
  2. 想清空某个队列的话,可以直接访问rabbitmq服务器的15672端口并登录,选择“Queues”后点击“Purge”,注意:“delete”是删除这个队列的意思!
  3. 默认情况下,机器有多少个cpu则worker可以同时处理多少任务,可以在同一台机器上启动多个worker进程。
  4. 需要的终端有点多?请使用supervisor!

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK