7

FastAPI 异步后台任务阻塞其他请求如何处理?

 9 months ago
source link: https://liruilongs.github.io/2023/11/03/%E5%BE%85%E5%8F%91%E5%B8%83/FastAPI%20%E5%BC%82%E6%AD%A5%E5%90%8E%E5%8F%B0%E4%BB%BB%E5%8A%A1%E9%98%BB%E5%A1%9E%E5%85%B6%E4%BB%96%E8%AF%B7%E6%B1%82%E5%A6%82%E4%BD%95%E5%A4%84%E7%90%86%EF%BC%9F/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

FastAPI 异步后台任务阻塞其他请求如何处理?

对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》


  • 工作中遇到,有大佬做了解答,简单整理
  • 阻塞的主要原因是 网络IO 密集型CPU 密集型是两个不同的概念, ASGI 更多的是面向 网络/IO 密集型的非阻塞处理,不适用 CPU 密集型
  • 理解不足小伙伴帮忙指正

对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》


在使用 FastAPIweb 服务的时候, 使用 BackgroundTasks 执行CPU密集型或者 IO 密集型任务,会阻塞当前 web 服务的所有接口。

@app.get('/face_recognition_activated')
async def face_recognition_activated(level : int,background_tasks: BackgroundTasks,token: bool = Depends(get_current_token)):
"""
@Time : 2023/10/20 03:28:11
@Author : [email protected]
@Version : 1.0
@Desc : 开始进行人脸识别
"""
# 提取人脸库数据

background_tasks.add_task(face_recognition, data = {
"dir_name":"A205_20237test4",
"class_code": "A0205",
"real_real_id": [2747,2745,345435]
})
return {"status": 200,"message": "人脸识别开始了 🚀🚀🚀🚀🚀" }

对应的后台任务

async def face_recognition(c_data):
"""
@Time : 2023/10/20 05:09:23
@Author : [email protected]
@Version : 1.0
@Desc : 人脸识别
"""
logging.info("开始人脸数据库特征提取!")
....................
pbar_i = tqdm(
range(0, len(face_list)),
desc="特征载入内存:👀👀👀",
mininterval=0.001,
smoothing=0.0001,
colour='green',
)
for idx in pbar_i:
face_id = face_list[idx]
face_id = face_id.decode('utf-8')
face_score_key = class_code_account_period +"_face_score"
faces = pkl.rc.zrange(face_score_key + ":" + face_id, 0, -1)
meta_dates = []
for hallmark_id in faces:
face_recognition_key = class_code_account_period + "_face_recognition"
face_r = pkl.rc.hget(
face_recognition_key + ":" + face_id, hallmark_id)
face_path_key = class_code_account_period + ':Path'
f_path = pkl.rc.hget(face_path_key, face_id)

meta_date = {"hallmark_id": hallmark_id.decode('utf-8'),
"face_r": pickle.loads(face_r),
"f_path": f_path.decode('utf-8'),
"mass": 500,
}
#logging.info(meta_date)
meta_dates.append(meta_date)
checks.append({"face_id": face_id, "meta_dates": meta_dates})

logging.info("构建内存人脸库完成")
# 开始人脸识别
logging.info("开始人脸识别..")
r_p = RedisClient(1)
logging.info("人脸识别后台任务启动......") #
consumer_task = asyncio.create_task(AdafaceFaceIdentification.consumer_facial_feature_clustering(global_object,checks,r_p,class_code_account_period,c_data))
await asyncio.gather(consumer_task)

对于这种情况,这是因为 对应的 后台任务被定义为 async , 意味着 fastapi 会在 asyncio 事件循环中运行它。并且因为 对应后台任务的某一环节是同步的(即不等待某些 IO或者是网络请求,而是进行计算)只要它正在运行,它就会阻塞事件循环。

这有在涉及异步IO网络操作的情况下,asyncio 才不会阻塞,能够以非阻塞的方式运行,从而充分利用系统资源并提高应用程序的并发性能。

解决这个问题的几种方法:

  • 使用更多的工人(例如 uvicorn main:app --workers 4 )。这将允许最多 4 个 后台任务 并行。
  • 将任务重写为不是 async (即将其定义为 def task(data): … 等)。然后 starlette 将在单独的线程中运行它。
  • 使用 fastapi.concurrency.run_in_threadpool ,这也将在单独的线程中运行它。像这样:
 from fastapi.concurrency import run_in_threadpool
async def task(data):
otherdata = await db.fetch("some sql")
newdata = await run_in_threadpool(lambda: somelongcomputation(data, otherdata))
await db.execute("some sql", newdata)
  • 或者直接使用 asynciosrun_in_executor (其中 run_in_threadpool 在后台使用):
 import asyncio
async def task(data):
otherdata = await db.fetch("some sql")
loop = asyncio.get_running_loop()
newdata = await loop.run_in_executor(None, lambda: somelongcomputation(data, otherdata))
await db.execute("some sql", newdata)

  • 自己生成一个单独的线程/进程。例如使用 concurrent.futures
  • 使用更重的东西,如芹菜。 (也在 此处 的 fastapi 文档中提到)。

博文部分内容参考

© 文中涉及参考链接内容版权归原作者所有,如有侵权请告知 :)


https://segmentfault.com/q/1010000043296883

https://stackoverflow.com/questions/63169865/how-to-do-multiprocessing-in-fastapi/63171013#63171013


© 2018-2023 [email protected], All rights reserved. 保持署名-非商用-相同方式共享(CC BY-NC-SA 4.0)


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK