3

任务系统之并行任务

 2 years ago
source link: https://blog.ops-coffee.cn/s/0q31jzeybtewdrlmufdig
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

任务系统之并行任务

上篇文章讲了手动审批任务的实现,这篇文章来讲下并行任务,通常情况下一个任务流里的多个子任务都是采用串行执行的,所谓串行就是第一个子任务结束才开始第二个,第二个子任务结束才开始第三个,依次类推,直到所有的子任务执行完成,整个任务流也就算完成了

20220227.01.png

串行任务逻辑清晰,但执行完成整个任务流的时间会比较长,这在某些情况下是完全没有必要的,可以通过并行执行来节约时间,从而提高效率,所谓并行就是第一个子任务执行后可以同步执行第二个第三个子任务,第二第三个子任务都执行完才会进行下一个

20220227.02.png

如何实现并行任务呢?先来回顾下我们任务流是如何实现的,模板包含了若干个子任务,每个子任务会有一个排序

20220227.03.png

在执行任务时根据任务关联的模板拿到所有子任务,然后按照子任务的执行顺序循环依次执行,这就是串行任务的逻辑,简单的理解就是任务流的子任务存在一个有序的列表里,每次就按顺序去列表去取一个执行,代码实现就是上篇文章里的exec_subtask函数

def exec_subtask(tasklog, subtasklog=None):
    # 获取所有子任务
    subtasklogs = tasklog.get_subtasklogs

    if subtasklog:
        # 如果所传subtask为任务下的最后一个子任务,则结束整个任务
        if subtasklog == subtasklogs.last():
            subtasklog.tasklog.state = 1
            subtasklog.tasklog.save()

            return True, 'Finished'
        else:
            # 否则获取下一条要执行的subtask
            next_subtask = subtasklogs.filter(id__gt=subtasklog.id).first()
    else:
        # 如果没有传subtask则默认取第一条
        next_subtask = subtasklogs.first()

    # 将要执行的子任务传给run_subtask异步执行
    celery_task = run_subtask.delay(next_subtask.id)

    # 记录celery任务的ID,终止任务执行时使用
    next_subtask.celery_task_id = celery_task
    next_subtask.save()

    return True, 'Next'

而对于并行任务,只需要每次取多个同时执行就好了,想清楚了这个逻辑接下来就是定义每次取多个的规则了,继续从执行顺序下手,执行顺序不一样可以排序依次串行执行,而如果实行顺序一样了那不就自然可以理解为并行执行嘛,逻辑通了就改代码,代码上只需要多一层判断,判断下是否有与当前要执行的子任务相同执行顺序的子任务,如果有就一并执行,逻辑仍然在exec_subtask函数里

def exec_subtask(tasklog, subtasklog=None):
    # 获取所有子任务
    subtasklogs = tasklog.get_subtasklogs

    if not subtasklog:
        # 如果没有传subtask则默认取第一组
        next_subtasks = [i for i in subtasklogs.filter(sortnum=subtasklogs.first().sortnum)]
    else:
        # 判断当前任务组任务是否都已执行完成,若有未执行完成的任务则直接返回,等待所有任务执行完成
        if subtasklogs.filter(sortnum=subtasklog.sortnum, state__in=[6, 7, 9]):
            return True, 'Pass'
        else:
            # 若当前任务组所有子任务都已执行完成,则判断当前任务组是否有任务执行失败或被中止或被拒绝,若有则直接修改任务状态未失败
            _finished_state = subtasklogs.filter(sortnum=subtasklog.sortnum).values_list('state', flat=True)

            # 若并行任务有执行失败或被拒绝的,则直接修改整个任务为失败并返回
            if list(set([0, 2, 12]).intersection(set(_finished_state))):
                subtasklog.tasklog.state = 0
                subtasklog.tasklog.save()

                return True, 'Finished'

        # 当前任务组任务都执行成功,则获取当前任务还未执行子任务列表
        _not_run_subtasks = subtasklogs.filter(sortnum__gt=subtasklog.sortnum)

        # 如果不存在未执行的子任务则结束
        if not _not_run_subtasks:
            # 非周期任务修改任务状态为成功
            if subtasklog.tasklog.type != 3:
                subtasklog.tasklog.state = 1
                subtasklog.tasklog.save()

            return True, 'Finished'
        else:
            # 否则获取下一组要执行的subtask,当sortnum一样时并行执行
            next_subtasks = [i for i in subtasklogs.filter(sortnum=_not_run_subtasks.first().sortnum)]

    # 获取到下一组要执行的子任务后,循环异步执行子任务,当sortnum相同时并行执行
    for next_subtask in next_subtasks:
        # 然后将要执行的子任务传给run_subtask异步执行
        celery_task = run_subtask.delay(next_subtask.id)

        # 记录celery任务的ID,终止任务执行时使用
        next_subtask.celery_task_id = celery_task
        next_subtask.save()

    return True, 'Next'

想清楚了执行逻辑,对于代码修改并不算复杂,当在模板里把IOS更新和安卓更新两个子任务执行顺序改成一样时再执行就看到他们两个同时开始执行了

20220227.04.png


能看到这里一定是真爱,关注一下吧

wx.sou1.png

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK