11

Django动态添加定时任务之django-celery的使用

 2 years ago
source link: https://blog.ops-coffee.cn/s/h3lpkehj4-bwefrhnqsfcq
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Django动态添加定时任务之django-celery的使用

定时任务和周期任务在我们日常工作中应用广泛,例如定时发布、周期巡检等,通常我们会借助Linux下的Crontab来实现,但如何将这一功能搬进我们自研的运维系统呢?借助django-celery即可轻松完成,本篇文章就通过自定义任务引擎Probius中计划任务的实现来介绍django-celery的使用

20201229.01.gif

Celery是基于Python开发的一个分布式任务队列框架,主要用来实现异步任务,具体介绍和用法可以看我之前写的这篇文章:Django配置Celery执行异步任务和定时任务,Django本身不支持异步,要想实现异步的话借助Celery是个不错的选择,上边这篇文章就提供了django集成celery的方法,但其配置稍微复杂,且不支持动态添加定时任务,django-celery的出现很好的解决了这个问题

django-celery为django提供了celery集成,同时支持将结果存储在django的orm或cache中,最重要的是支持从数据库动态读取计划任务并执行,这也就是说我们只需要将需要执行的加护任务插入数据库,django-celery就可以自动发现并执行了,接下来就具体看下如何实现

1.安装django-celery

pip install django-celery

2.settings.pyINSTALLED_APPS中加入djcelery

INSTALLED_APPS = [
    ...
    'djcelery',
]

3.settings.py中添加如下配置

import djcelery
djcelery.setup_loader()

# BROKER和BACKEND配置,这里用了本地的redis,其中1和2表示分别用redis的第一个和第二个db
BROKER_URL = 'redis://localhost/1'
CELERY_RESULT_BACKEND = 'redis://localhost/2'

# celery 关闭UTC时区
CELERY_ENABLE_UTC = False

# celery 并发数设置,最多可以有20个任务同时运行
CELERYD_CONCURRENCY = 20
CELERYD_MAX_TASKS_PER_CHILD = 4

# celery开启数据库调度器,数据库修改后即时生效
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

from celery import platforms

# 允许root 用户运行celery
platforms.C_FORCE_ROOT = True

4.启动celery

python manage.py celery worker -l info

至此就可以使用djcelery来处理异步任务了

具体的用法为在app根目录下添加一个tasks.py文件,在文件中编写函数,给函数添加上shared_task装饰器即可

假设一个项目有如下目录结构

project
    - coffee
        - __init__.py
        - admin.py
        - app.py
        - models.py
        - tasks.py
        - tests.py
        - views.py
    - webapp
        - __init__.py
        - settings.py
        - urls.py
        - wsgi.py
    - manage.py

我们在名为coffee的app下添加文件tasks.pytasks.py文件内容如下

from celery import shared_task


@shared_task
def welcome():
    print('Welcome to ops-coffee.cn')

    return True

然后就可以在view或其他地方异步调用这个welcome函数了

from django.http import JsonResponse
from coffee.tasks import welcome


def index(request):
    welcome.delay()

    return JsonResponse({"state": 1, "message": "welcome"})

以上只是将任务变成了异步,如果我们想要周期执行welcome任务,该如何操作呢?

1.首先需要执行migrate命令在数据库创建表

python manage.py migrate

2.然后修改settings.py文件中添加CELERYBEAT_SCHEDULE配置

CELERYBEAT_SCHEDULE = {
    'ops-coffee-1': {
        'task': 'coffee.tasks.welcome',
        'schedule': timedelta(seconds=20)
    },
    'ops-coffee-2': {
        'task': 'coffee.tasks.welcome',
        'schedule': crontab(hour=17, minute=30),
    }
}

以上配置详细的解释可以看文章:Django配置Celery执行异步任务和定时任务

3.最后启动beat

python manage.py celery beat -l info

至此welcome任务就会在设置的时间执行了

PS: 启动beat时可能会有如下报错

TypeError: can't subtract offset-naive and offset-aware datetimes

这主要是因为时区引起的,请修改时区相关的配置

TIME_ZONE = 'Asia/Shanghai'
USE_TZ = False

至此已经可以添加周期任务或定时任务了,但操作方式比较麻烦,还需要改动配置文件,说好的动态添加呢,别急这就来了,打开数据库会看到几张以djcelery_开头的表

20201229.02.png

其中对于动态添加计划任务有用的是计划任务时间定义表djcelery_crontabschedule,循环任务时间定义表djcelery_crontabschedule,以及任务表djcelery_periodictask

只需要在对应的表里插入数据就ok了,以我在自定义任务引擎Probius中的使用为例,前端会传递时间到view,view中关于定时任务的的大概处理逻辑如下

from djcelery.models import CrontabSchedule, PeriodicTask

with transaction.atomic():
    save_id = transaction.savepoint()

    try:
        _c, created = CrontabSchedule.objects.get_or_create(
            minute=str(minute),
            hour=str(hour),
            day_of_week=str(day_of_week),
            day_of_month=str(day_of_month),
            month_of_year=str(month_of_year)
        )

        dt = datetime.now().strftime('%Y%m%d%H%M%S')
        _p = PeriodicTask.objects.create(
            name=dt + '-' + '运维咖啡吧周期任务A',
            task='coffee.tasks.welcome',
            args=[37],
            enabled=True,
            crontab=_c
        )

        print('计划任务添加成功')
    except Exception as e:
        transaction.savepoint_rollback(save_id)
        print('添加计划任务失败,错误原因:' + str(e))

通过with transaction.atomic()创建一个事物,保证时间和任务都能同时添加成功,否则就回滚

然后通过get_or_create方法去检索循环任务时间定义表CrontabSchedule,如果有就获取到实例,没有就创建

最后往任务表PeriodicTask里插入任务,name为任务名称,具有唯一性,所以这里加了时间前缀防止重复,task为celery的task任务,字符串类型,在启动celery的时候就可以看到,args传给任务的参数,这里也可以用kwargs的字典形式传,就把args字段改成kwargs即可,enabled定义了这个任务是启动或关闭状态,crontab为循环任务时间实例,如果这里要用周期任务,就是每n秒n分循环执行这样的,只需要将crontab关联换成interval即可,那就需要事先往IntervalSchedule表里插入数据

还记得开头settings.py配置文件中我们配置的CELERYBEAT_SCHEDULER吗?就因为有这个配置,所以当数据表里的数据变更之后,celery的beat程序就能监听到从而在配置的时间触发worker去执行任务

至此,主要功能我们都已实现,django-celery的计划任务只能支持固定时间吗?其实不然,他支持的语法与linux下的crontab类似,像hour='*/3,9-18'这样的复杂语法也是支持的,在Probius中对于复杂语法我们也直接提供了更为灵活的自定义任务执行方式

20201229.03.png

为了便于使用,减少学习成本,这里就直接用了linux下crontab的格式,传到后端后解析成对应的时间写入数据库。有些小伙伴跟我说开发出来的工具没人用,或者是其他部门推不动,项目开发中的这些个细节考虑到位,尽量减少用户学习使用成本,做的足够好用、易用,还怕没有人用嘛


能看到这里一定是真爱,关注一下吧

wx.sou1.png

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK