django使用celery执行异步任务时采用信号实现每个任务日志独立存放(after_setup_logge...
source link: https://blog.51cto.com/colinspace/5840163
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
django使用celery执行异步任务时采用信号实现每个任务日志独立存放(after_setup_logger)
精选 原创通过本文可以获取的知识点有:
1、celery信号中的 logging signal
2、Django中如何配置和使用celery
3、Django中如何加载celery 信号
主要是Django中应用入口的 ready(self)
函数认识和使用
4、Python logging自定义 Handler
1、每个任务的日志独立存放,那么肯定是要能获取到任务id,然后按照任务id设定日志文件路径
2、Django程序中执行task,那么程序中日志的写入,肯定不能使用print打印输出到启动程序Django主日志中去, 那肯定是采用logging模块
配置不同的logger来实现
3、Django怎么把自定义的logger和celery关联起来呢, celery有自己自带的logger(from celery.utils.log import get_task_logger
),每个task 独立日志肯定不能放到这个自带的logger
一、我们先创建Django工程和测试用的应用demoapp,然后在应用中利用celery跑一个任务task
先给出工程和应用的结构
│ ├── __init__.py
│ ├── admin.py
│ ├── apps.py
│ ├── celery
│ │ └── __init__.py
│ ├── migrations
│ │ └── __init__.py
│ ├── models.py
│ ├── tasks.py
│ ├── tests.py
│ ├── urls.py
│ └── views.py
├── django_celery_singal
│ ├── __init__.py
│ ├── __pycache__
│ │ ├── __init__.cpython-36.pyc
│ │ └── settings.cpython-36.pyc
│ ├── asgi.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
└── manage.py
注意这里有个知识点
一般情况,Django继承celery的时候,大家默认是在工程文件夹(比如这里的 django_celery_singal)下创建celery,然后在工程的
__init__.py
文件中进行import加载。
其实这不是唯一选择。可以放到任何地方,关键在于启动celery的时候-A
参数后面根的值
1.1、配置celery
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_singal.settings')
app = Celery(__name__)
# 从Django的settings.py加载 celery的配置
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现应用中的tasks(应用中的tasks.py文件中定义的任务)
app.autodiscover_tasks()
# demo_celery_signal/settings.py
# settings for celery
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
CELERY_BROKER_URL = "redis://127.0.0.1:6379/11"
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/11"
CELERY_RESULT_SERIALIZER = 'json'
重点📢: 这里切记两个点:
1、在 demoapp/\init.py 中引入celery ,内容如
from .celery import app as celery_app
2、切记把demoapp加入到
INSTALLED_APPS
中(建议:创建完应用第一时间就加入到该配置项中去)
1.2、在应用中定义task
from celery import shared_task
@shared_task
def task_hello():
print('Hello Task under demoapp')
1.3、task绑定到APP的view中并配置URL
from django.http import HttpResponse
from demoapp.tasks import task_hello
def demo(request):
task_hello.delay()
return HttpResponse("Task Executed")
配置APP的URL
from django.urls import path
from demoapp import views
urlpatterns = [
path('demo/', views.demo, name='demo-task'),
]
把APP的URL加入到工程URL入口中去
from django.urls import include
urlpatterns = [
... ...,
path('demoapp/', include('demoapp.urls')),
]
1.4、启动Django和celery
启动celery,注意这里启动的方式 celery -A demoapp worker -l info
是 demoapp
应用,而不是工程名称django_celery_signal
,因为celery使用的位置在 demoapp 中
启动Django服务 python manage.py runserver 127.0.0.1:8088
1.5、访问测试celery任务
访问 http://127.0.0.1:8088/demoapp/demo/
前端页面显示 Task Executed
(view视图的返回),在后台celery的日志中显示如下,知道Django+celery运行异步任务
,搭建完成
[2022-11-05 09:19:50,974: WARNING/ForkPoolWorker-8]
[2022-11-05 09:19:50,978: INFO/ForkPoolWorker-8] Task demoapp.tasks.task_hello[ac93e84d-9903-4b7c-a444-64cf6be5a3da] succeeded in 0.022122333000879735s: 'ok'
二、自定义logging的Handler
因为要把每个task的日志放到独立的文件中的,这个日志的
处理 handler
就需要自定义了
因为是放到日志文件中,看了logging
模块的介绍我们知道, FileHandler
是继承自StreamHandler
或者我们这里也继承StreamHandler
from logging import StreamHandler
from celery import current_task
from celery.signals import task_prerun, task_postrun
class CeleryTaskLoggerHandler(StreamHandler):
terminator = '\r\n'
def __init__(self, *args, **kwargs):
self.task_id_fd_mapper = {}
super().__init__(*args, **kwargs)
# 使用 celery的task信号,设置任务开始和结束时的执行的东西
# 主要是获取task_id 然后创建对应的独立任务日志文件
task_prerun.connect(self.on_task_start)
task_postrun.connect(self.on_start_end)
@staticmethod
def get_current_task_id():
# celery 内置提供方法获取task_id
if not current_task:
return
task_id = current_task.request.root_id
return task_id
def on_task_start(self, sender, task_id, **kwargs):
# 这里是根据task_id 定义每个任务的日志文件存放
log_path = os.path.join('logs/', f"{task_id}.log")
f = open(log_path, 'a')
self.task_id_fd_mapper[task_id] = f
def on_start_end(self, sender, task_id, **kwargs):
f = self.task_id_fd_mapper.pop(task_id, None)
if f and not f.closed:
f.close()
self.task_id_fd_mapper.pop(task_id, None)
def emit(self, record):
# 自定义Handler必须要重写的一个方法
task_id = self.get_current_task_id()
if not task_id:
return
try:
f = self.task_id_fd_mapper.get(task_id)
self.write_task_log(f, record)
self.flush()
except Exception:
self.handleError(record)
def write_task_log(self, f, record):
# 日志的实际写入
if not f:
raise ValueError('Not found thread task file')
msg = self.format(record)
f.write(msg)
f.write(self.terminator)
f.flush()
def flush(self):
for f in self.task_id_fd_mapper.values():
f.flush()
三、Django配置调用celery的logging signal
3.1、先创建信号处理函数
先定义信号回调处理函数add_celery_logger_handler, 然后进行信号的绑定,绑定一般是采用装饰器
的方式
当然也可以不采用这种方式,然后在需要使用信号的地方,进行单独绑定配置(after_setup_logger.connect(add_celery_logger_handler)
)
from celery.signals import after_setup_logger
@after_setup_logger.connect
def add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs):
if not logger:
return
logger = logging.getLogger('celery_signal')
handler = logging.FileHandler('celery_signal.log')
formatter = logging.Formatter(logging.BASIC_FORMAT)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.info("Here call the celery logging signal - after_setup_logger")
这个时候重新启动celery,是不会产生celery_signal.log
文件,那就更不会调用对应的信号回调了
3.2、Django绑定celery的信号
上面只是定了信号的回调函数
然后和信号进行了绑定,但是Django怎么调用celery的信号处理呢?
答案是利用Django应用的入口ready() 函数
from django.apps import AppConfig
class DemoappConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'demoapp'
verbose_name = "Celery Signal App"
def ready(self):
# 可以添加如下语句测试 Django启动的时候会不会执行到这里
print('我被执行了!')
# 导入上面定义的 信号处理回调
from demoapp.celery import signal_handler
super().ready()
然后重新启动celery, 查看 celery_signal.log 日志文件
INFO:celery_signal:Here call the celery logging signal - after_setup_logger
可以知道,Django 绑定了 celery的信号
四、使用celery的信号after_setup_logger绑定自定义的Handler
修改上面定义的 信号回调函数,绑定自定义的日志处理Handler
from celery.signals import after_setup_logger
from .logger import CeleryTaskLoggerHandler
@after_setup_logger.connect
def add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs):
if not logger:
return
task_handler = CeleryTaskLoggerHandler()
task_handler.setLevel(loglevel)
formatter = logging.Formatter(format)
task_handler.setFormatter(formatter)
logger.addHandler(task_handler)
这里需要先手动在工程目录下创建一个 logs 文件夹,因为handler中没有对应logs不存在做判断处理
然后我们访问Django的view视图 http://127.0.0.1:8088/demoapp/demo/
查看logs 目录发现有个UUID为文件名的log文件
total 8
-rw-r--r-- 1 colinspace wheel 138 11 5 18:17 8ba8d8d6-9d28-4b64-a396-b91f11ae0df8.log
[ 22-11-05 18:17 ] [ colinspace ] [/tmp/django_celery_singal] cat logs/8ba8d8d6-9d28-4b64-a396-b91f11ae0df8.log
Hello Task under demoapp
Task demoapp.tasks.task_hello[8ba8d8d6-9d28-4b64-a396-b91f11ae0df8] succeeded in 0.03539445897331461s: 'ok'
这里的Hello Task under demoapp
是task输出的日志,'ok'
是task的返回值,没有的话是None
至此,完全实现了刚开始的需求。
完美实现~ 项目源码详见 https://gitee.com/colin5063/django-learnning-examples/tree/master/django_celery_singal
如果觉得文章对你有用,请不吝点赞和关注公众号搜索 全栈运维
或者 DailyJobOps
个人博客 http://blog.colinspace.com/
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK