趋近智
这项动手实践应用异步操作技术。重构同步代码并使用 async/await、用于阻塞任务的 run_in_threadpool 以及后台任务来实现新功能。通过此练习,学习如何提升 FastAPI 机器学习应用的响应能力和效率。
我们将从一个基本的同步端点开始,它模拟一个潜在的慢速 I/O 操作(例如从外部源获取特征定义)和一个 CPU 密集型模型预测步骤。
想象您有一个端点,它首先需要获取与请求相关的某些配置或元数据(模拟 I/O),然后运行模型预测(模拟 CPU 工作)。
import time
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
# 模拟一个阻塞函数(例如,CPU密集型机器学习推理)
def run_model_prediction(data_point: float) -> float:
print("Starting model prediction...")
# 模拟计算时间
time.sleep(1)
result = data_point * 2 # Simple dummy operation
print("Finished model prediction.")
return result
# 模拟一个阻塞I/O调用(例如,数据库查询,外部API调用)
def fetch_external_data(item_id: int) -> dict:
print(f"Fetching external data for item {item_id}...")
# 模拟I/O延迟
time.sleep(0.5)
print("Finished fetching external data.")
# 返回一些模拟数据
return {"item_id": item_id, "metadata": "some_fetched_value"}
class PredictionRequest(BaseModel):
item_id: int
feature_value: float
class PredictionResponse(BaseModel):
item_id: int
metadata: str
prediction: float
@app.post("/predict_sync", response_model=PredictionResponse)
def predict_synchronously(request: PredictionRequest):
print("Received prediction request.")
# 步骤 1: 获取外部数据(阻塞I/O)
external_data = fetch_external_data(request.item_id)
# 步骤 2: 运行模型预测(阻塞CPU)
prediction_result = run_model_prediction(request.feature_value)
print("Sending response.")
return PredictionResponse(
item_id=external_data["item_id"],
metadata=external_data["metadata"],
prediction=prediction_result
)
# 运行此代码:uvicorn 你的模块名:app --reload
# 向 http://127.0.0.1:8000/predict_sync 发送 POST 请求
# 请求体: {"item_id": 123, "feature_value": 5.0}
如果您运行此应用程序并同时发送多个请求(例如,使用 ab 等工具或快速打开多个浏览器选项卡),您会注意到服务器会一个接一个地处理它们。time.sleep 调用会阻塞整个工作进程,阻止它处理其他请求,直到当前请求完全完成。
FastAPI 在处理 I/O 密集型操作时表现出色,因为 async/await 允许服务器在等待 I/O 时切换上下文,同时处理其他请求。让我们修改 fetch_external_data 函数和端点,使其变为异步。我们将使用 asyncio.sleep 来模拟非阻塞 I/O。
import asyncio # 导入 asyncio
import time
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI() # 假设 FastAPI 实例已存在
# 模拟一个非阻塞I/O调用
async def fetch_external_data_async(item_id: int) -> dict: # 注意 'async def'
print(f"Fetching external data async for item {item_id}...")
# 模拟非阻塞I/O延迟
await asyncio.sleep(0.5) # 注意 'await asyncio.sleep'
print("Finished fetching external data async.")
return {"item_id": item_id, "metadata": "some_fetched_value_async"}
# 暂时保持阻塞模型预测函数不变
def run_model_prediction(data_point: float) -> float:
print("Starting model prediction...")
time.sleep(1) # 仍然是阻塞的
result = data_point * 2
print("Finished model prediction.")
return result
class PredictionRequest(BaseModel): # 假设已定义
item_id: int
feature_value: float
class PredictionResponse(BaseModel): # 假设已定义
item_id: int
metadata: str
prediction: float
@app.post("/predict_async_io", response_model=PredictionResponse)
async def predict_with_async_io(request: PredictionRequest): # 注意 'async def'
print("Received async I/O prediction request.")
# 步骤 1: 获取外部数据(非阻塞I/O)
external_data = await fetch_external_data_async(request.item_id) # 注意 'await'
# 步骤 2: 运行模型预测(仍然是阻塞CPU - 有问题!)
# 直接调用此函数仍会阻塞事件循环!
prediction_result = run_model_prediction(request.feature_value)
print("Sending response.")
return PredictionResponse(
item_id=external_data["item_id"],
metadata=external_data["metadata"],
prediction=prediction_result
)
在此版本中,fetch_external_data_async 调用现在是非阻塞的。当 await asyncio.sleep(0.5) 发生时,FastAPI 服务器(运行在 Uvicorn 这样的 ASGI 服务器上)可以处理其他传入的请求或任务。然而,我们仍然存在一个问题:run_model_prediction 使用 time.sleep(1),这是一个阻塞调用。即使在 async def 端点内部,像这样直接调用阻塞函数也会停止事件循环,从而抵消异步在特定阶段实现并发的优势。
run_in_threadpool 处理阻塞CPU操作为了正确处理异步端点中的 CPU 密集型 run_model_prediction,我们需要将其委托给 FastAPI/Starlette 管理的独立线程池。这可以防止主事件循环被阻塞。
import asyncio
import time
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool # 导入 run_in_threadpool
from pydantic import BaseModel
app = FastAPI() # 假设 FastAPI 实例已存在
# 非阻塞I/O模拟
async def fetch_external_data_async(item_id: int) -> dict:
print(f"Fetching external data async for item {item_id}...")
await asyncio.sleep(0.5)
print("Finished fetching external data async.")
return {"item_id": item_id, "metadata": "some_fetched_value_async"}
# 阻塞CPU密集型模拟 - 未改变
def run_model_prediction(data_point: float) -> float:
print("Starting model prediction (in thread pool)...")
time.sleep(1)
result = data_point * 2
print("Finished model prediction (in thread pool).")
return result
class PredictionRequest(BaseModel): # 假设已定义
item_id: int
feature_value: float
class PredictionResponse(BaseModel): # 假设已定义
item_id: int
metadata: str
prediction: float
@app.post("/predict_full_async", response_model=PredictionResponse)
async def predict_fully_asynchronous(request: PredictionRequest): # async def
print("Received full async prediction request.")
# 步骤 1: 获取外部数据(非阻塞I/O)
external_data = await fetch_external_data_async(request.item_id) # await
# 步骤 2: 运行模型预测(阻塞CPU,在线程池中运行)
# 使用 run_in_threadpool 避免阻塞事件循环
prediction_result = await run_in_threadpool(run_model_prediction, request.feature_value) # await + run_in_threadpool
print("Sending response.")
return PredictionResponse(
item_id=external_data["item_id"],
metadata=external_data["metadata"],
prediction=prediction_result
)
现在,当 predict_fully_asynchronous 被调用时:
await 非阻塞 I/O (fetch_external_data_async)。在此期间,事件循环是空闲的。await run_in_threadpool(run_model_prediction, ...)。这会将 run_model_prediction 函数提交到线程池中的一个单独线程执行。当模型预测在后台线程运行时,事件循环再次空闲,可以处理其他任务。run_model_prediction 的执行,结果就会返回,并且 predict_fully_asynchronous 的执行恢复。这种方法正确地将异步编程用于 I/O,并安全地集成了阻塞的 CPU 密集型代码,而不会使服务器停滞。
假设在返回预测结果后,我们希望将请求和结果记录到文件或数据库中,而无需让客户端等待。这是后台任务的一个理想应用场景。
import asyncio
import time
from fastapi import FastAPI, BackgroundTasks # 导入 BackgroundTasks
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel
app = FastAPI() # 假设 FastAPI 实例已存在
# 前面示例中的函数(异步 I/O,阻塞 CPU)
async def fetch_external_data_async(item_id: int) -> dict: # ...(同前)
print(f"Fetching external data async for item {item_id}...")
await asyncio.sleep(0.5)
print("Finished fetching external data async.")
return {"item_id": item_id, "metadata": "some_fetched_value_async"}
def run_model_prediction(data_point: float) -> float: # ...(同前)
print("Starting model prediction (in thread pool)...")
time.sleep(1)
result = data_point * 2
print("Finished model prediction (in thread pool).")
return result
# 在后台运行的函数
def log_prediction_details(request_data: dict, response_data: dict):
# 模拟写入日志文件或数据库
print("\n--- Background Task Started ---")
print(f"Logging Request: {request_data}")
print(f"Logging Response: {response_data}")
# 模拟后台任务的一些工作
time.sleep(0.2)
print("--- Background Task Finished ---\n")
# 在实际应用中,您会将数据写入文件、数据库或发送到日志服务。
class PredictionRequest(BaseModel): # 假设已定义
item_id: int
feature_value: float
class PredictionResponse(BaseModel): # 假设已定义
item_id: int
metadata: str
prediction: float
@app.post("/predict_with_background", response_model=PredictionResponse)
async def predict_with_background_task(
request: PredictionRequest,
background_tasks: BackgroundTasks # 注入 BackgroundTasks
):
print("Received prediction request with background task.")
# 步骤 1: 获取外部数据(非阻塞I/O)
external_data = await fetch_external_data_async(request.item_id)
# 步骤 2: 运行模型预测(阻塞CPU,在线程池中运行)
prediction_result = await run_in_threadpool(run_model_prediction, request.feature_value)
response = PredictionResponse(
item_id=external_data["item_id"],
metadata=external_data["metadata"],
prediction=prediction_result
)
# 添加日志任务,使其在响应发送后运行
background_tasks.add_task(
log_prediction_details,
request.dict(), # 将数据传递给任务
response.dict() # 将数据传递给任务
)
print("Sending response (background task pending).")
return response # 响应在此处发送
在这个最终示例中:
BackgroundTasks。background_tasks: BackgroundTasks 添加为端点函数的参数。FastAPI 会自动注入一个实例。log_prediction_details,其中包含我们希望在后台运行的逻辑。请注意,这个函数本身可以是阻塞的或异步的,但如果它是阻塞且长时间运行的,它可能仍然会消耗资源。对于重要的后台工作,通常更推荐使用专用的任务队列(如 Celery)。background_tasks.add_task(),传入要运行的函数及其参数。response 返回给客户端。log_prediction_details 在响应成功发送后执行。您会看到其打印语句在“Sending response”消息之后出现在服务器控制台中。这种实践说明了如何结合 async/await 处理 I/O、run_in_threadpool 处理 CPU 密集型任务以及 BackgroundTasks 进行响应后处理,从而创建更高性能、响应更快的 FastAPI 应用程序来提供机器学习模型服务。通过运行服务器并发送请求来体验这些模式,以观察执行的时间和流程。
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造