这项动手实践应用异步操作技术。重构同步代码并使用 async/await、用于阻塞任务的 run_in_threadpool 以及后台任务来实现新功能。通过此练习,学习如何提升 FastAPI 机器学习应用的响应能力和效率。我们将从一个基本的同步端点开始,它模拟一个潜在的慢速 I/O 操作(例如从外部源获取特征定义)和一个 CPU 密集型模型预测步骤。初始同步端点想象您有一个端点,它首先需要获取与请求相关的某些配置或元数据(模拟 I/O),然后运行模型预测(模拟 CPU 工作)。import time from fastapi import FastAPI from pydantic import BaseModel app = FastAPI() # 模拟一个阻塞函数(例如,CPU密集型机器学习推理) def run_model_prediction(data_point: float) -> float: print("Starting model prediction...") # 模拟计算时间 time.sleep(1) result = data_point * 2 # Simple dummy operation print("Finished model prediction.") return result # 模拟一个阻塞I/O调用(例如,数据库查询,外部API调用) def fetch_external_data(item_id: int) -> dict: print(f"Fetching external data for item {item_id}...") # 模拟I/O延迟 time.sleep(0.5) print("Finished fetching external data.") # 返回一些模拟数据 return {"item_id": item_id, "metadata": "some_fetched_value"} class PredictionRequest(BaseModel): item_id: int feature_value: float class PredictionResponse(BaseModel): item_id: int metadata: str prediction: float @app.post("/predict_sync", response_model=PredictionResponse) def predict_synchronously(request: PredictionRequest): print("Received prediction request.") # 步骤 1: 获取外部数据(阻塞I/O) external_data = fetch_external_data(request.item_id) # 步骤 2: 运行模型预测(阻塞CPU) prediction_result = run_model_prediction(request.feature_value) print("Sending response.") return PredictionResponse( item_id=external_data["item_id"], metadata=external_data["metadata"], prediction=prediction_result ) # 运行此代码:uvicorn 你的模块名:app --reload # 向 http://127.0.0.1:8000/predict_sync 发送 POST 请求 # 请求体: {"item_id": 123, "feature_value": 5.0}如果您运行此应用程序并同时发送多个请求(例如,使用 ab 等工具或快速打开多个浏览器选项卡),您会注意到服务器会一个接一个地处理它们。time.sleep 调用会阻塞整个工作进程,阻止它处理其他请求,直到当前请求完全完成。将I/O操作转换为异步FastAPI 在处理 I/O 密集型操作时表现出色,因为 async/await 允许服务器在等待 I/O 时切换上下文,同时处理其他请求。让我们修改 fetch_external_data 函数和端点,使其变为异步。我们将使用 asyncio.sleep 来模拟非阻塞 I/O。import asyncio # 导入 asyncio import time from fastapi import FastAPI from pydantic import BaseModel app = FastAPI() # 假设 FastAPI 实例已存在 # 模拟一个非阻塞I/O调用 async def fetch_external_data_async(item_id: int) -> dict: # 注意 'async def' print(f"Fetching external data async for item {item_id}...") # 模拟非阻塞I/O延迟 await asyncio.sleep(0.5) # 注意 'await asyncio.sleep' print("Finished fetching external data async.") return {"item_id": item_id, "metadata": "some_fetched_value_async"} # 暂时保持阻塞模型预测函数不变 def run_model_prediction(data_point: float) -> float: print("Starting model prediction...") time.sleep(1) # 仍然是阻塞的 result = data_point * 2 print("Finished model prediction.") return result class PredictionRequest(BaseModel): # 假设已定义 item_id: int feature_value: float class PredictionResponse(BaseModel): # 假设已定义 item_id: int metadata: str prediction: float @app.post("/predict_async_io", response_model=PredictionResponse) async def predict_with_async_io(request: PredictionRequest): # 注意 'async def' print("Received async I/O prediction request.") # 步骤 1: 获取外部数据(非阻塞I/O) external_data = await fetch_external_data_async(request.item_id) # 注意 'await' # 步骤 2: 运行模型预测(仍然是阻塞CPU - 有问题!) # 直接调用此函数仍会阻塞事件循环! prediction_result = run_model_prediction(request.feature_value) print("Sending response.") return PredictionResponse( item_id=external_data["item_id"], metadata=external_data["metadata"], prediction=prediction_result )在此版本中,fetch_external_data_async 调用现在是非阻塞的。当 await asyncio.sleep(0.5) 发生时,FastAPI 服务器(运行在 Uvicorn 这样的 ASGI 服务器上)可以处理其他传入的请求或任务。然而,我们仍然存在一个问题:run_model_prediction 使用 time.sleep(1),这是一个阻塞调用。即使在 async def 端点内部,像这样直接调用阻塞函数也会停止事件循环,从而抵消异步在特定阶段实现并发的优势。使用 run_in_threadpool 处理阻塞CPU操作为了正确处理异步端点中的 CPU 密集型 run_model_prediction,我们需要将其委托给 FastAPI/Starlette 管理的独立线程池。这可以防止主事件循环被阻塞。import asyncio import time from fastapi import FastAPI from fastapi.concurrency import run_in_threadpool # 导入 run_in_threadpool from pydantic import BaseModel app = FastAPI() # 假设 FastAPI 实例已存在 # 非阻塞I/O模拟 async def fetch_external_data_async(item_id: int) -> dict: print(f"Fetching external data async for item {item_id}...") await asyncio.sleep(0.5) print("Finished fetching external data async.") return {"item_id": item_id, "metadata": "some_fetched_value_async"} # 阻塞CPU密集型模拟 - 未改变 def run_model_prediction(data_point: float) -> float: print("Starting model prediction (in thread pool)...") time.sleep(1) result = data_point * 2 print("Finished model prediction (in thread pool).") return result class PredictionRequest(BaseModel): # 假设已定义 item_id: int feature_value: float class PredictionResponse(BaseModel): # 假设已定义 item_id: int metadata: str prediction: float @app.post("/predict_full_async", response_model=PredictionResponse) async def predict_fully_asynchronous(request: PredictionRequest): # async def print("Received full async prediction request.") # 步骤 1: 获取外部数据(非阻塞I/O) external_data = await fetch_external_data_async(request.item_id) # await # 步骤 2: 运行模型预测(阻塞CPU,在线程池中运行) # 使用 run_in_threadpool 避免阻塞事件循环 prediction_result = await run_in_threadpool(run_model_prediction, request.feature_value) # await + run_in_threadpool print("Sending response.") return PredictionResponse( item_id=external_data["item_id"], metadata=external_data["metadata"], prediction=prediction_result )现在,当 predict_fully_asynchronous 被调用时:它 await 非阻塞 I/O (fetch_external_data_async)。在此期间,事件循环是空闲的。然后它 await run_in_threadpool(run_model_prediction, ...)。这会将 run_model_prediction 函数提交到线程池中的一个单独线程执行。当模型预测在后台线程运行时,事件循环再次空闲,可以处理其他任务。一旦线程池完成 run_model_prediction 的执行,结果就会返回,并且 predict_fully_asynchronous 的执行恢复。这种方法正确地将异步编程用于 I/O,并安全地集成了阻塞的 CPU 密集型代码,而不会使服务器停滞。实现后台任务假设在返回预测结果后,我们希望将请求和结果记录到文件或数据库中,而无需让客户端等待。这是后台任务的一个理想应用场景。import asyncio import time from fastapi import FastAPI, BackgroundTasks # 导入 BackgroundTasks from fastapi.concurrency import run_in_threadpool from pydantic import BaseModel app = FastAPI() # 假设 FastAPI 实例已存在 # 前面示例中的函数(异步 I/O,阻塞 CPU) async def fetch_external_data_async(item_id: int) -> dict: # ...(同前) print(f"Fetching external data async for item {item_id}...") await asyncio.sleep(0.5) print("Finished fetching external data async.") return {"item_id": item_id, "metadata": "some_fetched_value_async"} def run_model_prediction(data_point: float) -> float: # ...(同前) print("Starting model prediction (in thread pool)...") time.sleep(1) result = data_point * 2 print("Finished model prediction (in thread pool).") return result # 在后台运行的函数 def log_prediction_details(request_data: dict, response_data: dict): # 模拟写入日志文件或数据库 print("\n--- Background Task Started ---") print(f"Logging Request: {request_data}") print(f"Logging Response: {response_data}") # 模拟后台任务的一些工作 time.sleep(0.2) print("--- Background Task Finished ---\n") # 在实际应用中,您会将数据写入文件、数据库或发送到日志服务。 class PredictionRequest(BaseModel): # 假设已定义 item_id: int feature_value: float class PredictionResponse(BaseModel): # 假设已定义 item_id: int metadata: str prediction: float @app.post("/predict_with_background", response_model=PredictionResponse) async def predict_with_background_task( request: PredictionRequest, background_tasks: BackgroundTasks # 注入 BackgroundTasks ): print("Received prediction request with background task.") # 步骤 1: 获取外部数据(非阻塞I/O) external_data = await fetch_external_data_async(request.item_id) # 步骤 2: 运行模型预测(阻塞CPU,在线程池中运行) prediction_result = await run_in_threadpool(run_model_prediction, request.feature_value) response = PredictionResponse( item_id=external_data["item_id"], metadata=external_data["metadata"], prediction=prediction_result ) # 添加日志任务,使其在响应发送后运行 background_tasks.add_task( log_prediction_details, request.dict(), # 将数据传递给任务 response.dict() # 将数据传递给任务 ) print("Sending response (background task pending).") return response # 响应在此处发送在这个最终示例中:我们从 FastAPI 导入 BackgroundTasks。我们将 background_tasks: BackgroundTasks 添加为端点函数的参数。FastAPI 会自动注入一个实例。我们定义了一个标准 Python 函数 log_prediction_details,其中包含我们希望在后台运行的逻辑。请注意,这个函数本身可以是阻塞的或异步的,但如果它是阻塞且长时间运行的,它可能仍然会消耗资源。对于重要的后台工作,通常更推荐使用专用的任务队列(如 Celery)。在返回响应之前,我们调用 background_tasks.add_task(),传入要运行的函数及其参数。端点立即将 response 返回给客户端。FastAPI 确保 log_prediction_details 在响应成功发送后执行。您会看到其打印语句在“Sending response”消息之后出现在服务器控制台中。这种实践说明了如何结合 async/await 处理 I/O、run_in_threadpool 处理 CPU 密集型任务以及 BackgroundTasks 进行响应后处理,从而创建更高性能、响应更快的 FastAPI 应用程序来提供机器学习模型服务。通过运行服务器并发送请求来体验这些模式,以观察执行的时间和流程。