趋近智
异步路由处理器(async def)在处理I/O密集型任务时性能优异,但若直接在其中运行CPU密集型操作,如机器学习模型推理,则会引发严重问题。Python的asyncio依赖于单线程事件循环来管理并发任务。如果async def路由中的某个函数执行长时间计算,且未将控制权交回(即,未对允许事件循环切换任务的操作使用await),它将实际冻结事件循环。在此期间,服务器无法响应其他传入请求,这与使用异步框架实现高并发的初衷相悖。
考虑一个典型的ML预测端点:
# 假设 'model' 是一个已加载的ML模型(例如 scikit-learn)
# 假设 'preprocess_input' 和 'format_output' 存在
# 有问题的方法:阻塞事件循环
@app.post("/predict_blocking")
async def predict_blocking(data: InputData): # InputData is a Pydantic model
processed_data = preprocess_input(data)
# 如果 model.predict 是CPU密集型操作,这行会阻塞事件循环
prediction = model.predict(processed_data)
results = format_output(prediction)
return {"prediction": results}
在这个例子中,如果model.predict()的运行需要数百毫秒甚至数秒(这对于复杂模型或大型输入来说很常见),那么整个FastAPI应用程序在此期间将无法响应。
FastAPI提供了一种简洁的方法来处理这种情况,它允许在单独的线程池中运行阻塞的CPU密集型代码。这样,主事件循环可以保持非阻塞,在繁重计算在另一个线程中进行时,继续处理其他请求。
这里的实用工具是run_in_threadpool,这是Starlette(FastAPI所使用的底层ASGI工具包)提供的一个函数,在FastAPI中可以直接使用。你await这个函数,并将你希望执行的阻塞函数及其参数传递给它。
以下是正确重构之前示例的方法:
from fastapi.concurrency import run_in_threadpool
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
# 假设 'model' 已加载,且 'preprocess_input' 和 'format_output' 存在
# 阻塞函数的示例占位符
def run_model_inference(processed_data):
# 模拟一个CPU密集型任务
import time
time.sleep(0.5) # 代表 model.predict() 的时间
# 实际情况中:prediction = model.predict(processed_data)
prediction = [1] # 占位符结果
return prediction
# 定义输入数据模型
class InputData(BaseModel):
feature1: float
feature2: float
# 正确的方法:使用 run_in_threadpool
@app.post("/predict_non_blocking")
async def predict_non_blocking(data: InputData):
# 如果预处理涉及I/O,通常可以是异步的,
# 但这里我们假设它是同步的CPU工作或快速的。
processed_data = preprocess_input(data) # 假设这返回所需的格式
# 将阻塞调用卸载到线程池
# 传递函数及其参数
prediction = await run_in_threadpool(run_model_inference, processed_data)
# 后处理
results = format_output(prediction)
return {"prediction": results}
# 完整性的虚拟实现
def preprocess_input(data: InputData): return [[data.feature1, data.feature2]]
def format_output(prediction): return prediction[0]
在predict_non_blocking中,调用await run_in_threadpool(run_model_inference, processed_data)会执行以下操作:
run_model_inference函数(包含阻塞的model.predict()调用)调度到由线程池管理的独立线程中执行。run_model_inference函数在其线程中完成,run_in_threadpool会获取结果。await操作完成,predict_non_blocking函数的执行会带着prediction结果继续进行。图示说明
run_in_threadpool如何在与直接调用对比时,阻止事件循环被阻塞。
run_in_threadpool在async def路由中使用run_in_threadpool的主要场景是处理CPU密集型同步代码,这类代码通常难以直接转换为异步(例如大多数标准机器学习库的推理调用)。
用于:
model.predict()、model.transform()。请勿用于:
httpx、用于数据库的asyncpg或databases),并直接对其进行await。将I/O操作包装在run_in_threadpool中会增加不必要的线程开销,并且无法发挥事件循环在I/O方面的效率。async def的函数。直接await一个async def函数是其标准运行方式。通过正确识别并使用run_in_threadpool卸载阻塞的CPU密集型操作,可以确保你的FastAPI应用程序保持响应性,并能高效处理并发请求,即使在执行计算量大的机器学习推理时也是如此。这是将同步机器学习工作流融入现代异步网络框架的一种标准模式。
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造