Running minor tasks with a simple job system in Django
source link: https://nessuent.xyz/posts/2022-03-26_Django_jobs.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
Running minor tasks with a simple job system in Django
by epilys on 2022-03-26
Suppose you have a Django website that needs to run jobs, where neither the exact time a job runs nor idempotence is important, as long as each job eventually runs soon enough.
Examples:
- run and collect statistics on data
- send or process received Webmentions
- cache remote URL contents
You can encode a generic job function as a Django model and store it in the database. The job’s function is a plain text field that must contain a valid Python dotted path to a function; that is, at runtime we must be able to import the function using the path given in the string.
import logging
import types

from django.db import models
from django.utils import timezone
from django.utils.module_loading import import_string
from django.utils.timezone import make_aware
class JobKind(models.Model):
    """A kind of job, identified by the dotted import path of its function.

    The function is looked up by path every time a job of this kind runs, so
    it must be importable from the string stored in ``dotted_path``.
    """

    id = models.AutoField(primary_key=True)
    # Dotted Python path ("package.module.function") resolved in run().
    dotted_path = models.TextField(null=False, blank=False, unique=True)
    created = models.DateTimeField(auto_now_add=True, null=False, blank=False)
    # auto_now (not auto_now_add) so the timestamp is refreshed on every
    # save() instead of being frozen at creation time.
    last_modified = models.DateTimeField(auto_now=True, null=False, blank=False)

    def __str__(self):
        return self.dotted_path

    @staticmethod
    def from_func(func):
        """Return the JobKind for *func*, creating the row if it is missing.

        The stored path is ``<func.__module__>.<func.__name__>``; run() later
        imports the function from that string.

        Raises:
            TypeError: if *func* is not a plain Python function.
        """
        if not isinstance(func, types.FunctionType):
            raise TypeError(
                f"expected a plain function, got {type(func).__name__}"
            )
        dotted_path = f"{func.__module__}.{func.__name__}"
        ret, _ = JobKind.objects.get_or_create(dotted_path=dotted_path)
        return ret

    def run(self, job):
        """Import this kind's function and call it with *job*.

        Returns whatever the job function returns.

        Raises:
            ImportError: if ``dotted_path`` cannot be imported. The original
                exception is re-raised so its message and traceback survive.
        """
        # Only the import is guarded: an ImportError raised *inside* the job
        # function must not be reported as a path-resolution failure.
        try:
            func = import_string(self.dotted_path)
        except ImportError:
            logging.error(
                "Could not resolve job dotted_path: %s", self.dotted_path
            )
            raise
        return func(job)
You can implement a Job Django model that can run a JobKind as follows:
class Job(models.Model):
    """A single job: a JobKind plus its JSON payload and run bookkeeping."""

    id = models.AutoField(primary_key=True)
    # SET_NULL: deleting a JobKind keeps its jobs, which run() then skips.
    kind = models.ForeignKey(JobKind, null=True, on_delete=models.SET_NULL)
    created = models.DateTimeField(auto_now_add=True)
    active = models.BooleanField(default=True, null=False, blank=False)
    periodic = models.BooleanField(default=False, null=False, blank=False)
    failed = models.BooleanField(default=False, null=False, blank=False)
    last_run = models.DateTimeField(default=None, null=True, blank=True)
    # Accumulated string results and exception messages from every run.
    logs = models.TextField(null=True, blank=True)
    data = models.JSONField(null=True, blank=True)

    def __str__(self):
        return f"{self.kind} {self.data}"

    def _append_log(self, text):
        """Append *text* to ``logs``, treating a NULL log as empty."""
        self.logs = (self.logs or "") + text

    def run(self):
        """Run the job once and persist the outcome.

        Does nothing when the job has no kind. On success ``failed`` is
        cleared, a truthy result deactivates a non-periodic job, and a string
        result is appended to ``logs``. Any exception raised by the job (or
        while saving the success state) is recorded in ``logs`` and the job is
        marked failed; it is not re-raised.
        """
        if not self.kind_id:
            return
        # timezone.now() honours USE_TZ and avoids the DST ambiguity of
        # make_aware(datetime.now()).
        self.last_run = timezone.now()
        try:
            res = self.kind.run(self)
            if res and not self.periodic:
                self.active = False
            if isinstance(res, str):
                self._append_log(res)
            self.failed = False
            self.save(update_fields=["last_run", "failed", "active", "logs"])
        except Exception as exc:
            # Deliberately broad: one misbehaving job must not stop the
            # caller from running the remaining jobs.
            self._append_log(str(exc))
            self.failed = True
            self.save(update_fields=["last_run", "failed", "logs"])
Now you can run pending jobs with cron by making a Django management command:
# my_project/management/commands/run_jobs.py
from django.core.management.base import BaseCommand
from my_project.jobs import Job
class Command(BaseCommand):
    """Management command that runs every job currently flagged active."""

    help = "Run pending jobs"

    def handle(self, *args, **kwargs):
        """Fetch the active jobs and run them one after another."""
        active_jobs = Job.objects.filter(active=True)
        for pending in active_jobs:
            pending.run()
You can also set up a thread that sleeps and periodically wakes up to run any pending tasks by overriding the ready method on your django.apps.AppConfig:
# my_project/apps.py
import threading
def ready(self):
    """Start a daemon thread that runs pending jobs every 15 minutes.

    The thread is stored on ``self.scheduling_thread`` and named
    "scheduling_thread".
    """
    # NOTE(review): presumably imported for its import-time side effects;
    # confirm before removing.
    import my_project.jobs

    def sched_jobs():
        """Thread body: wait 15 minutes, run the pending jobs, repeat."""
        import logging
        import sched
        import time

        from my_project.jobs import Job

        def exec_fn():
            """Run every active job that has not failed."""
            # Job.run() handles errors raised by the job itself, but the
            # query can still fail (e.g. a database error). Left uncaught,
            # that would end the loop below and silently stop all scheduling.
            try:
                for job in Job.objects.filter(active=True, failed=False):
                    job.run()
            except Exception:
                logging.exception("Scheduled job run failed")

        s = sched.scheduler(time.time, time.sleep)
        while True:
            # Queue one run 15 minutes from now and block until it finishes.
            s.enter(15 * 60, 1, exec_fn)
            s.run(blocking=True)

    self.scheduling_thread = threading.Thread(target=sched_jobs, daemon=True)
    self.scheduling_thread.name = "scheduling_thread"
    self.scheduling_thread.start()
You can easily inspect jobs from the Django admin panel by registering the models to the admin app:
@admin.action(description="Run jobs")
def run_jobs(modeladmin, request, queryset):
    """Admin action: run each job selected in the change list."""
    selected = queryset.all()
    for selected_job in selected:
        selected_job.run()
class JobAdmin(ModelAdmin):
    """Admin for Job: status columns, filters, a "Run jobs" action and a
    read-only pretty-printed view of the job's JSON data."""

    ordering = ["-created", "-last_run"]
    actions = [run_jobs]
    list_display = ["__str__", "created", "active", "periodic", "success", "last_run"]
    list_filter = [
        "kind",
        "active",
        "failed",
    ]
    readonly_fields = (
        "json_pprint",
    )

    @admin.display(boolean=True)
    def success(self, obj):
        """Return None if the job never ran, otherwise whether it succeeded."""
        if obj.last_run is None:
            return None
        return not obj.failed

    @admin.display(description="JSON pretty print")
    def json_pprint(self, instance):
        """Render ``instance.data`` as indented, key-sorted JSON in a <pre>."""
        import json

        from django.utils.html import format_html

        # Job data can hold externally supplied strings (e.g. Webmention
        # URLs). format_html escapes the dump, so markup inside the data is
        # shown as text rather than injected into the admin page.
        return format_html(
            "<pre>{}</pre>",
            json.dumps(instance.data, sort_keys=True, indent=4),
        )
class JobKindAdmin(ModelAdmin):
    """Admin for JobKind with a column showing whether its path imports."""

    ordering = ["-created", "-last_modified"]
    list_display = ["__str__", "created", "last_modified", "resolves"]

    def resolves(self, obj):
        """Return True when ``obj.dotted_path`` can be imported, else False."""
        from django.utils.module_loading import import_string

        try:
            import_string(obj.dotted_path)
        except ImportError:
            return False
        else:
            return True

    # Render the column as a yes/no icon in the change list.
    resolves.boolean = True
Now you can create new jobs from the admin panel and from code elsewhere in your app. Suppose you have an API endpoint to receive Webmentions. You can avoid blocking the HTTP response by scheduling the processing for later in the view:
from my_project.jobs import Job, JobKind
# Schedule the job instead of doing the work inside the request.
# `webmention_receive`, `source` and `target` come from the surrounding view,
# which is not shown here.
# from_func() looks up (or creates) the JobKind row for the function's path.
kind = JobKind.from_func(webmention_receive)
# get_or_create reuses an existing Job matching these exact field values,
# otherwise it inserts a new one; neither returned value is used afterwards.
_job_obj, _ = Job.objects.get_or_create(
kind=kind, periodic=False, data={"source": source, "target": target}
)
Improvements
- Make a job to periodically delete/cleanup old jobs
- Override Job’s save method to limit the maximum number of jobs in the database
Real life example
This pattern is used in the sic.pm link aggregator community: https://github.com/epilys/sic/blob/158284451097ab94da0efe5cbdfae14b0bb3a1a8/sic/jobs.py
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK