5

运筹帷幄决胜千里,Python3.10原生协程asyncio工业级真实协程异步消费任务调度实践

 2 years ago
source link: https://v3u.cn/a_id_225
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
运筹帷幄决胜千里,Python3.10原生协程asyncio工业级真实协程异步消费任务调度实践

    我们一直都相信这样一种说法:协程是比多线程更高效的一种并发工作方式,它完全由程序本身所控制,也就是在用户态执行,协程避免了像线程切换那样产生的上下文切换,在性能方面得到了很大的提升。毫无疑问,这是颠扑不破的业界共识,是放之四海而皆准的真理。

    但事实上,协程远比大多数人想象中的复杂,正因为协程的“用户态”特性,任务调度权掌握在撰写协程任务的人手里,而仅仅依赖async和await关键字远远达不到“调度”的级别,有时候反而会拖累任务效率,使其在任务执行效率上还不及“系统态”的多线程和多进程,本次我们来探讨一下Python3原生协程任务的调度管理。

    Python3.10协程库async.io的基本操作

    事件循环(Eventloop)是 原生协程库asyncio 的核心,可以理解为总指挥。Eventloop实例提供了注册、取消和执行任务和回调的方法。

    Eventloop可以将一些异步方法绑定到事件循环上,事件循环会循环执行这些方法,但是和多线程一样,同时只能执行一个方法,因为协程也是单线程执行。当执行到某个方法时,如果它遇到了阻塞,事件循环会暂停它的执行去执行其他的方法,与此同时为这个方法注册一个回调事件,当某个方法从阻塞中恢复,下次轮询到它的时候将会继续执行,亦或者,当没有轮询到它,它提前从阻塞中恢复,也可以通过回调事件进行切换,如此往复,这就是事件循环的简单逻辑。

    而上面最核心的动作就是切换别的方法,怎么切换?用await关键字:

import asyncio


async def job1():
print('job1开始')
await asyncio.sleep(1)
print('job1结束')


async def job2():
print('job2开始')


async def main():
await job1()
await job2()


if __name__ == '__main__':
asyncio.run(main())

    系统返回:

job1开始
job1结束
job2开始

    是的,切则切了,可切的对吗?事实上这两个协程任务并没有达成“协作”,因为它们是同步执行的,所以并不是在方法内await了,就可以达成协程的工作方式,我们需要并发启动这两个协程任务:

import asyncio


async def job1():
print('job1开始')
await asyncio.sleep(1)
print('job1结束')


async def job2():
print('job2开始')


async def main():
#await job1()
#await job2()
await asyncio.gather(job1(), job2())


if __name__ == '__main__':
asyncio.run(main())

    系统返回:

job1开始
job2开始
job1结束

    如果没有asyncio.gather的参与,协程方法就是普通的同步方法,就算用async声明了异步也无济于事。而asyncio.gather的基础功能就是将协程任务并发执行,从而达成“协作”。

    但事实上,Python3.10也支持“同步写法”的协程方法:

async def create_task():
task1 = asyncio.create_task(job1())
task2 = asyncio.create_task(job2())
await task1
await task2

    这里我们通过asyncio.create_task对job1和job2进行封装,返回的对象再通过await进行调用,由此两个单独的异步方法就都被绑定到同一个Eventloop了,这样虽然写法上同步,但其实是异步执行:

import asyncio


async def job1():
print('job1开始')
await asyncio.sleep(1)
print('job1结束')


async def job2():
print('job2开始')


async def create_task():
task1 = asyncio.create_task(job1())
task2 = asyncio.create_task(job2())
await task1
await task2


async def main():
#await job1()
#await job2()
await asyncio.gather(job1(), job2())


if __name__ == '__main__':
asyncio.run(create_task())

    系统返回:

job1开始
job2开始
job1结束

    协程任务的上下游监控

    解决了并发执行的问题,现在假设每个异步任务都会返回一个操作结果:

async def job1():
print('job1开始')
await asyncio.sleep(1)
print('job1结束')

return "job1任务结果"


async def job2():
print('job2开始')

return "job2任务结果"

    通过asyncio.gather方法,我们可以收集到任务执行结果:

async def main():

res = await asyncio.gather(job1(), job2())
print(res)

    并发执行任务:

import asyncio


async def job1():
print('job1开始')
await asyncio.sleep(1)
print('job1结束')

return "job1任务结果"


async def job2():
print('job2开始')

return "job2任务结果"

async def main():

res = await asyncio.gather(job1(), job2())
print(res)


if __name__ == '__main__':
asyncio.run(main())

    系统返回:

job1开始
job2开始
job1结束
['job1', 'job2']

    但任务结果仅仅也就是方法的返回值,除此之外,并没有其他有价值的信息,对协程任务的执行明细讳莫如深。

    现在我们换成asyncio.wait方法:

async def main():

res = await asyncio.wait([job1(), job2()])
print(res)

    依然并发执行:

import asyncio


async def job1():
print('job1开始')
await asyncio.sleep(1)
print('job1结束')

return "job1任务结果"


async def job2():
print('job2开始')

return "job2任务结果"

async def main():

res = await asyncio.wait([job1(), job2()])
print(res)


if __name__ == '__main__':
asyncio.run(main())

    系统返回:

job1开始
job2开始
job1结束
({<Task finished name='Task-2' coro=<job1() done, defined at /Users/liuyue/Downloads/upload/test/test_async.py:4> result='job1任务结果'>, <Task finished name='Task-3' coro=<job2() done, defined at /Users/liuyue/Downloads/upload/test/test_async.py:12> result='job2任务结果'>}, set())

    可以看出,asyncio.wait返回的是任务对象,里面存储了大部分的任务信息,包括执行状态。

    在默认情况下,asyncio.wait会等待全部任务完成 (return_when='ALL_COMPLETED'),它还支持 return_when='FIRST_COMPLETED'(第一个协程完成就返回)和 return_when='FIRST_EXCEPTION'(出现第一个异常就返回)。

    这就非常令人兴奋了,因为如果异步消费任务是发短信之类的需要统计达到率的任务,利用asyncio.wait特性,我们就可以第一时间记录任务完成或者异常的具体时间。

    协程任务守护

    假设由于某种原因,我们手动终止任务消费:

import asyncio


async def job1():
print('job1开始')
await asyncio.sleep(1)
print('job1结束')

return "job1任务结果"


async def job2():
print('job2开始')

return "job2任务结果"

async def main():
task1 = asyncio.create_task(job1())
task2 = asyncio.create_task(job2())
task1.cancel()
res = await asyncio.gather(task1, task2)
print(res)


if __name__ == '__main__':
asyncio.run(main())

     系统报错:

File "/Users/liuyue/Downloads/upload/test/test_async.py", line 23, in main
res = await asyncio.gather(task1, task2)
asyncio.exceptions.CancelledError

    这里job1被手动取消,但会影响job2的执行,这违背了协程“互相提携”的特性。

    事实上,asyncio.gather方法可以捕获协程任务的异常:

import asyncio


async def job1():
print('job1开始')
await asyncio.sleep(1)
print('job1结束')

return "job1任务结果"


async def job2():
print('job2开始')

return "job2任务结果"

async def main():
task1 = asyncio.create_task(job1())
task2 = asyncio.create_task(job2())
task1.cancel()
res = await asyncio.gather(task1, task2,return_exceptions=True)
print(res)


if __name__ == '__main__':
asyncio.run(main())

    系统返回:

job2开始
[CancelledError(''), 'job2任务结果']

    可以看到job1没有被执行,并且异常替代了任务结果作为返回值。

    但如果协程任务启动之后,需要保证任务情况下都不会被取消,此时可以使用asyncio.shield方法守护协程任务:

import asyncio


async def job1():
print('job1开始')
await asyncio.sleep(1)
print('job1结束')

return "job1任务结果"


async def job2():
print('job2开始')

return "job2任务结果"

async def main():
task1 = asyncio.shield(job1())
task2 = asyncio.create_task(job2())

res = await asyncio.gather(task1, task2,return_exceptions=True)

task1.cancel()
print(res)


if __name__ == '__main__':
asyncio.run(main())

    系统返回:

job1开始
job2开始
job1结束
['job1任务结果', 'job2任务结果']

    协程任务回调

    假设协程任务执行完毕之后,需要立刻进行回调操作,比如将任务结果推送到其他接口服务上:

import asyncio


async def job1():
print('job1开始')
await asyncio.sleep(1)
print('job1结束')

return "job1任务结果"


async def job2():
print('job2开始')

return "job2任务结果"


def callback(future):
print(f'回调任务: {future.result()}')

async def main():
task1 = asyncio.shield(job1())
task2 = asyncio.create_task(job2())

task1.add_done_callback(callback)

res = await asyncio.gather(task1, task2,return_exceptions=True)

print(res)


if __name__ == '__main__':
asyncio.run(main())

    这里我们通过add_done_callback方法对job1指定了callback方法,当任务执行完以后,callback会被调用,系统返回:

job1开始
job2开始
job1结束
回调任务: job1任务结果
['job1任务结果', 'job2任务结果']

    与此同时,add_done_callback方法不仅可以获取协程任务返回值,它自己也支持参数参数传递:

import asyncio
from functools import partial

async def job1():
print('job1开始')
await asyncio.sleep(1)
print('job1结束')

return "job1任务结果"


async def job2():
print('job2开始')

return "job2任务结果"


def callback(future,num):
print(f"回调参数{num}")
print(f'回调任务: {future.result()}')

async def main():
task1 = asyncio.shield(job1())
task2 = asyncio.create_task(job2())

task1.add_done_callback(partial(callback,num=1))

res = await asyncio.gather(task1, task2,return_exceptions=True)

print(res)


if __name__ == '__main__':
asyncio.run(main())

    系统返回:

job1开始
job2开始
job1结束
回调参数1
回调任务: job1任务结果
['job1任务结果', 'job2任务结果']

    成也用户态,败也用户态。所谓水能载舟亦能覆舟,协程消费任务的调度远比多线程的系统级调度要复杂,稍不留神就会造成业务上的“同步”阻塞,弄巧成拙,适得其反。这也解释了为什么相似场景中多线程的出场率要远远高于协程,就是因为多线程不需要考虑启动后的“切换”问题,无为而为,简单粗暴。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK