3

django使用celery执行异步任务时采用信号实现每个任务日志独立存放(after_setup_logge...

 1 year ago
source link: https://blog.51cto.com/colinspace/5840163
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

django使用celery执行异步任务时采用信号实现每个任务日志独立存放(after_setup_logger)

精选 原创
django使用celery执行异步任务时采用信号实现每个任务日志独立存放(after_setup_logger)_python

通过本文可以获取的知识点有:

1、celery信号中的 ​​logging signal​

 ​after_setup_logger 参考地址​

2、Django中如何配置和使用celery

3、Django中如何加载celery 信号

主要是Django中应用入口的 ​​ready(self)​​ 函数认识和使用

4、Python logging​​自定义 Handler​

 ​Python logging 模块介绍​

1、每个任务的日志独立存放,那么肯定是要能获取到任务id,然后按照​​任务id设定日志文件路径​

2、Django程序中执行task,那么程序中日志的写入,肯定不能使用print打印输出到启动程序Django主日志中去, 那肯定是采用​​logging模块​​配置不同的logger来实现

3、Django怎么把自定义的logger和celery关联起来呢, celery有自己自带的logger(​​from celery.utils.log import get_task_logger​​),每个task 独立日志肯定不能放到这个自带的logger

一、我们先创建Django工程和测试用的应用demoapp,然后在应用中利用celery跑一个任务task

先给出工程和应用的结构

├── demoapp
│ ├── __init__.py
│ ├── admin.py
│ ├── apps.py
│ ├── celery
│ │ └── __init__.py
│ ├── migrations
│ │ └── __init__.py
│ ├── models.py
│ ├── tasks.py
│ ├── tests.py
│ ├── urls.py
│ └── views.py
├── django_celery_singal
│ ├── __init__.py
│ ├── __pycache__
│ │ ├── __init__.cpython-36.pyc
│ │ └── settings.cpython-36.pyc
│ ├── asgi.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
└── manage.py

注意这里有个知识点

一般情况,Django继承celery的时候,大家默认是在工程文件夹(比如这里的 django_celery_singal)下创建celery,然后在工程的​​__init__.py​​ 文件中进行import加载。

其实这不是唯一选择。可以放到任何地方,关键在于启动celery的时候​​-A​​ 参数后面根的值

1.1、配置celery
# demoapp/celery/__init__.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_singal.settings')
app = Celery(__name__)

# 从Django的settings.py加载 celery的配置
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现应用中的tasks(应用中的tasks.py文件中定义的任务)
app.autodiscover_tasks()

# demo_celery_signal/settings.py
# settings for celery
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60

CELERY_BROKER_URL = "redis://127.0.0.1:6379/11"
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/11"
CELERY_RESULT_SERIALIZER = 'json'

重点📢: 这里切记两个点:

1、在 demoapp/\init.py 中引入celery ,内容如 ​​from .celery import app as celery_app​

2、切记把demoapp加入到 ​​INSTALLED_APPS​​ 中(建议:创建完应用第一时间就加入到该配置项中去)

1.2、在应用中定义task
# demoapp/tasks.py
from celery import shared_task

@shared_task
def task_hello():
print('Hello Task under demoapp')
1.3、task绑定到APP的view中并配置URL
# demoapp/views.py
from django.http import HttpResponse
from demoapp.tasks import task_hello

def demo(request):
task_hello.delay()
return HttpResponse("Task Executed")

配置APP的URL

# demoapp/urls.py
from django.urls import path
from demoapp import views

urlpatterns = [
path('demo/', views.demo, name='demo-task'),
]

把APP的URL加入到工程URL入口中去

# demo_celery_singal/urls.py
from django.urls import include

urlpatterns = [
... ...,
path('demoapp/', include('demoapp.urls')),
]
1.4、启动Django和celery

启动celery,注意这里启动的方式 ​​celery -A demoapp worker -l info​​​ 是 ​​demoapp​​​ 应用,而不是工程名称​​django_celery_signal​​ ,因为celery使用的位置在 demoapp 中

django使用celery执行异步任务时采用信号实现每个任务日志独立存放(after_setup_logger)_django_02

启动Django服务 ​​python manage.py runserver 127.0.0.1:8088​

1.5、访问测试celery任务

访问 ​ ​http://127.0.0.1:8088/demoapp/demo/​

前端页面显示 ​​Task Executed​​​ (view视图的返回),在后台celery的日志中显示如下,知道​​Django+celery运行异步任务​​,搭建完成

[2022-11-05 09:19:50,973: WARNING/ForkPoolWorker-8] Hello Task under demoapp
[2022-11-05 09:19:50,974: WARNING/ForkPoolWorker-8]

[2022-11-05 09:19:50,978: INFO/ForkPoolWorker-8] Task demoapp.tasks.task_hello[ac93e84d-9903-4b7c-a444-64cf6be5a3da] succeeded in 0.022122333000879735s: 'ok'

二、自定义logging的Handler

因为要把每个task的日志放到独立的文件中的,这个日志的​​处理 handler​​ 就需要自定义了

因为是放到日志文件中,看了​​logging​​​ 模块的介绍我们知道, ​​FileHandler​​​ 是继承自​​StreamHandler​​​ 或者我们这里也继承​​StreamHandler​

import os
from logging import StreamHandler
from celery import current_task
from celery.signals import task_prerun, task_postrun


class CeleryTaskLoggerHandler(StreamHandler):
terminator = '\r\n'

def __init__(self, *args, **kwargs):
self.task_id_fd_mapper = {}
super().__init__(*args, **kwargs)
# 使用 celery的task信号,设置任务开始和结束时的执行的东西
# 主要是获取task_id 然后创建对应的独立任务日志文件
task_prerun.connect(self.on_task_start)
task_postrun.connect(self.on_start_end)

@staticmethod
def get_current_task_id():
# celery 内置提供方法获取task_id
if not current_task:
return
task_id = current_task.request.root_id
return task_id

def on_task_start(self, sender, task_id, **kwargs):
# 这里是根据task_id 定义每个任务的日志文件存放
log_path = os.path.join('logs/', f"{task_id}.log")
f = open(log_path, 'a')
self.task_id_fd_mapper[task_id] = f

def on_start_end(self, sender, task_id, **kwargs):
f = self.task_id_fd_mapper.pop(task_id, None)
if f and not f.closed:
f.close()
self.task_id_fd_mapper.pop(task_id, None)

def emit(self, record):
# 自定义Handler必须要重写的一个方法
task_id = self.get_current_task_id()
if not task_id:
return
try:
f = self.task_id_fd_mapper.get(task_id)
self.write_task_log(f, record)
self.flush()
except Exception:
self.handleError(record)

def write_task_log(self, f, record):
# 日志的实际写入
if not f:
raise ValueError('Not found thread task file')
msg = self.format(record)
f.write(msg)
f.write(self.terminator)
f.flush()

def flush(self):
for f in self.task_id_fd_mapper.values():
f.flush()

三、Django配置调用celery的logging signal

3.1、先创建信号处理函数

先定义信号回调处理函数add_celery_logger_handler, 然后进行信号的绑定,绑定一般是采用​​装饰器​​的方式

当然也可以不采用这种方式,然后在需要使用信号的地方,进行单独绑定配置(​​after_setup_logger.connect(add_celery_logger_handler)​​)

import logging
from celery.signals import after_setup_logger

@after_setup_logger.connect
def add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs):
if not logger:
return
logger = logging.getLogger('celery_signal')
handler = logging.FileHandler('celery_signal.log')
formatter = logging.Formatter(logging.BASIC_FORMAT)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.info("Here call the celery logging signal - after_setup_logger")

这个时候重新启动celery,是不会产生​​celery_signal.log​​ 文件,那就更不会调用对应的信号回调了

3.2、Django绑定celery的信号

上面只是定了信号的​​回调函数​​ 然后和信号进行了绑定,但是Django怎么调用celery的信号处理呢?

答案是利用Django应用的入口​​ready() 函数​

# demoapp/apps.py
from django.apps import AppConfig

class DemoappConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'demoapp'
verbose_name = "Celery Signal App"

def ready(self):
# 可以添加如下语句测试 Django启动的时候会不会执行到这里
print('我被执行了!')
# 导入上面定义的 信号处理回调
from demoapp.celery import signal_handler
super().ready()

然后重新启动celery, 查看 celery_signal.log 日志文件

cat celery_signal.log
INFO:celery_signal:Here call the celery logging signal - after_setup_logger

可以知道,Django 绑定了 celery的信号

四、使用celery的信号after_setup_logger绑定自定义的Handler

修改上面定义的 信号回调函数,绑定自定义的日志处理Handler

# demoapp/celery/signal_handler.py
from celery.signals import after_setup_logger
from .logger import CeleryTaskLoggerHandler

@after_setup_logger.connect
def add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs):
if not logger:
return
task_handler = CeleryTaskLoggerHandler()
task_handler.setLevel(loglevel)
formatter = logging.Formatter(format)
task_handler.setFormatter(formatter)
logger.addHandler(task_handler)

这里需要先手动在工程目录下创建一个 logs 文件夹,因为handler中没有对应logs不存在做判断处理

然后我们访问Django的view视图 ​​http://127.0.0.1:8088/demoapp/demo/ ​​ 查看logs 目录发现有个UUID为文件名的log文件

[ 22-11-05 18:17 ] [ colinspace ] [/tmp/django_celery_singal] ls -l logs
total 8
-rw-r--r-- 1 colinspace wheel 138 11 5 18:17 8ba8d8d6-9d28-4b64-a396-b91f11ae0df8.log
[ 22-11-05 18:17 ] [ colinspace ] [/tmp/django_celery_singal] cat logs/8ba8d8d6-9d28-4b64-a396-b91f11ae0df8.log
Hello Task under demoapp

Task demoapp.tasks.task_hello[8ba8d8d6-9d28-4b64-a396-b91f11ae0df8] succeeded in 0.03539445897331461s: 'ok'

这里的​​Hello Task under demoapp​​​ 是task输出的日志,​​'ok'​​ 是task的返回值,没有的话是None

至此,完全实现了刚开始的需求。


完美实现~ 项目源码详见 ​ ​https://gitee.com/colin5063/django-learnning-examples/tree/master/django_celery_singal​

如果觉得文章对你有用,请不吝点赞和关注公众号搜索 ​​全栈运维​​​ 或者 ​​DailyJobOps​

个人博客 ​ ​http://blog.colinspace.com/​

知乎平台 ​ ​https://www.zhihu.com/people/colin-31-49​

简书平台 ​ ​https://www.jianshu.com/u/6d793fbacc88​


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK