This hands-on practice applies asynchronous operation techniques: you will refactor synchronous code and implement new features using async/await, run_in_threadpool for blocking tasks, and background tasks. The exercise demonstrates how to improve the responsiveness and efficiency of your FastAPI ML application.

We'll start with a basic, synchronous endpoint that simulates both a potentially slow I/O operation (like fetching feature definitions from an external source) and a CPU-intensive model prediction step.

Initial Synchronous Endpoint

Imagine you have an endpoint that first needs to fetch some configuration or metadata related to the request (simulated I/O) and then runs a model prediction (simulated CPU work).

```python
import time

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

# Simulate a blocking function (e.g., CPU-bound ML inference)
def run_model_prediction(data_point: float) -> float:
    print("Starting model prediction...")
    # Simulate computation time
    time.sleep(1)
    result = data_point * 2  # Simple dummy operation
    print("Finished model prediction.")
    return result

# Simulate a blocking I/O call (e.g., database query, external API call)
def fetch_external_data(item_id: int) -> dict:
    print(f"Fetching external data for item {item_id}...")
    # Simulate I/O delay
    time.sleep(0.5)
    print("Finished fetching external data.")
    # Return some dummy data
    return {"item_id": item_id, "metadata": "some_fetched_value"}

class PredictionRequest(BaseModel):
    item_id: int
    feature_value: float

class PredictionResponse(BaseModel):
    item_id: int
    metadata: str
    prediction: float

@app.post("/predict_sync", response_model=PredictionResponse)
def predict_synchronously(request: PredictionRequest):
    print("Received prediction request.")
    # Step 1: Fetch external data (Blocking I/O)
    external_data = fetch_external_data(request.item_id)
    # Step 2: Run model prediction (Blocking CPU)
    prediction_result = run_model_prediction(request.feature_value)
    print("Sending response.")
    return PredictionResponse(
        item_id=external_data["item_id"],
        metadata=external_data["metadata"],
        prediction=prediction_result,
    )

# To run this: uvicorn your_module_name:app --reload
# Send a POST request to http://127.0.0.1:8000/predict_sync
# Body: {"item_id": 123, "feature_value": 5.0}
```

If you run this application and send multiple requests concurrently (e.g., using tools like ab or by opening multiple browser tabs quickly), you'll notice that the server processes them one after another. The time.sleep calls block the worker, preventing it from handling other requests until the current one is fully completed.
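To observe this for yourself, the short client script below fires a few requests at the endpoint concurrently and reports the total elapsed time, which you can compare across the endpoints built later in this exercise. It is a minimal sketch, assuming the server is running locally on port 8000 and that the third-party httpx package is installed; the exact numbers will depend on your server configuration.

```python
import asyncio
import time

import httpx  # Third-party HTTP client; install with `pip install httpx`

async def hammer_endpoint(path: str = "/predict_sync", n_requests: int = 3) -> None:
    # Assumes the FastAPI app from above is running at http://127.0.0.1:8000
    payload = {"item_id": 123, "feature_value": 5.0}
    async with httpx.AsyncClient(base_url="http://127.0.0.1:8000", timeout=30.0) as client:
        start = time.perf_counter()
        # Fire all requests at once from the client side
        responses = await asyncio.gather(
            *(client.post(path, json=payload) for _ in range(n_requests))
        )
        elapsed = time.perf_counter() - start
    for response in responses:
        print(response.json())
    print(f"{n_requests} concurrent requests to {path} took {elapsed:.2f}s")

if __name__ == "__main__":
    asyncio.run(hammer_endpoint())
```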
Converting to Async for I/O Operations

FastAPI shines when dealing with I/O-bound operations because async/await allows the server to switch contexts while waiting for I/O, handling other requests in the meantime. Let's modify the fetch_external_data function and the endpoint to be asynchronous, using asyncio.sleep to simulate non-blocking I/O.

```python
import asyncio  # Import asyncio
import time

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()  # Assume FastAPI instance exists

# Simulate a non-blocking I/O call
async def fetch_external_data_async(item_id: int) -> dict:  # Note 'async def'
    print(f"Fetching external data async for item {item_id}...")
    # Simulate non-blocking I/O delay
    await asyncio.sleep(0.5)  # Note 'await asyncio.sleep'
    print("Finished fetching external data async.")
    return {"item_id": item_id, "metadata": "some_fetched_value_async"}

# Keep the blocking model prediction function as is for now
def run_model_prediction(data_point: float) -> float:
    print("Starting model prediction...")
    time.sleep(1)  # Still blocking
    result = data_point * 2
    print("Finished model prediction.")
    return result

class PredictionRequest(BaseModel):  # Assume defined
    item_id: int
    feature_value: float

class PredictionResponse(BaseModel):  # Assume defined
    item_id: int
    metadata: str
    prediction: float

@app.post("/predict_async_io", response_model=PredictionResponse)
async def predict_with_async_io(request: PredictionRequest):  # Note 'async def'
    print("Received async I/O prediction request.")
    # Step 1: Fetch external data (Non-blocking I/O)
    external_data = await fetch_external_data_async(request.item_id)  # Note 'await'
    # Step 2: Run model prediction (Still blocking CPU - problematic!)
    # This will still block the event loop when called directly!
    prediction_result = run_model_prediction(request.feature_value)
    print("Sending response.")
    return PredictionResponse(
        item_id=external_data["item_id"],
        metadata=external_data["metadata"],
        prediction=prediction_result,
    )
```

In this version, the fetch_external_data_async call is non-blocking. While the await asyncio.sleep(0.5) happens, the FastAPI server (running on an ASGI server like Uvicorn) can handle other incoming requests or tasks. However, we still have a problem: run_model_prediction uses time.sleep(1), which is a blocking call. Even within an async def endpoint, calling a blocking function directly like this will halt the event loop, negating the benefits of async for concurrency during that phase.
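Because fetch_external_data_async is awaitable, independent I/O calls can also be composed and overlapped within a single request. The sketch below is a hypothetical batch endpoint, added here only for illustration; it assumes the app instance and fetch_external_data_async defined above are in scope, and shows how asyncio.gather lets the event loop overlap several simulated fetches.

```python
import asyncio

# Hypothetical batch endpoint (illustration only); assumes `app` and
# fetch_external_data_async from the example above are in scope.
@app.post("/fetch_batch")
async def fetch_batch(item_ids: list[int]) -> dict:
    # All fetches start together; the event loop overlaps their 0.5s delays,
    # so three items complete in roughly 0.5s total instead of 1.5s.
    results = await asyncio.gather(
        *(fetch_external_data_async(item_id) for item_id in item_ids)
    )
    return {"items": results}
```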
Handling Blocking CPU Operations with run_in_threadpool

To properly handle the CPU-bound run_model_prediction within our asynchronous endpoint, we need to delegate it to a separate thread pool managed by FastAPI/Starlette. This prevents the main event loop from being blocked.

```python
import asyncio
import time

from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool  # Import run_in_threadpool
from pydantic import BaseModel

app = FastAPI()  # Assume FastAPI instance exists

# Non-blocking I/O simulation
async def fetch_external_data_async(item_id: int) -> dict:
    print(f"Fetching external data async for item {item_id}...")
    await asyncio.sleep(0.5)
    print("Finished fetching external data async.")
    return {"item_id": item_id, "metadata": "some_fetched_value_async"}

# Blocking CPU-bound simulation - unchanged
def run_model_prediction(data_point: float) -> float:
    print("Starting model prediction (in thread pool)...")
    time.sleep(1)
    result = data_point * 2
    print("Finished model prediction (in thread pool).")
    return result

class PredictionRequest(BaseModel):  # Assume defined
    item_id: int
    feature_value: float

class PredictionResponse(BaseModel):  # Assume defined
    item_id: int
    metadata: str
    prediction: float

@app.post("/predict_full_async", response_model=PredictionResponse)
async def predict_fully_asynchronous(request: PredictionRequest):  # async def
    print("Received full async prediction request.")
    # Step 1: Fetch external data (Non-blocking I/O)
    external_data = await fetch_external_data_async(request.item_id)  # await
    # Step 2: Run model prediction (Blocking CPU, run in thread pool)
    # Use run_in_threadpool to avoid blocking the event loop
    prediction_result = await run_in_threadpool(run_model_prediction, request.feature_value)
    print("Sending response.")
    return PredictionResponse(
        item_id=external_data["item_id"],
        metadata=external_data["metadata"],
        prediction=prediction_result,
    )
```

Now, when predict_fully_asynchronous is called:

1. It awaits the non-blocking I/O (fetch_external_data_async). The event loop is free during this time.
2. It then awaits run_in_threadpool(run_model_prediction, ...). This submits the run_model_prediction function to execute in a separate thread from the thread pool. The event loop is again free to handle other tasks while the model prediction runs in the background thread.
3. Once the thread pool finishes executing run_model_prediction, the result is returned, and the execution of predict_fully_asynchronous resumes.

This approach correctly uses asynchronous programming for I/O and safely integrates blocking CPU-bound code without stalling the server.
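As an aside, the standard library offers a similar mechanism. The sketch below is a hypothetical variant of the same endpoint (the /predict_to_thread route is not part of this exercise) that uses asyncio.to_thread (Python 3.9+) instead of run_in_threadpool; it assumes the functions and models defined above are in scope, and it hands the blocking call to asyncio's default executor rather than the Starlette-managed thread pool, with the same effect of keeping the event loop free.

```python
import asyncio

# Hypothetical comparison endpoint; assumes `app`, the Pydantic models,
# fetch_external_data_async, and run_model_prediction from above are in scope.
@app.post("/predict_to_thread", response_model=PredictionResponse)
async def predict_with_to_thread(request: PredictionRequest):
    external_data = await fetch_external_data_async(request.item_id)
    # Standard-library alternative to run_in_threadpool: the blocking call
    # runs in a worker thread while the event loop keeps serving requests.
    prediction_result = await asyncio.to_thread(run_model_prediction, request.feature_value)
    return PredictionResponse(
        item_id=external_data["item_id"],
        metadata=external_data["metadata"],
        prediction=prediction_result,
    )
```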
Implementing Background Tasks

Suppose that after returning the prediction, we want to log the request and result to a file or database without making the client wait. This is a perfect use case for background tasks.

```python
import asyncio
import time

from fastapi import BackgroundTasks, FastAPI  # Import BackgroundTasks
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel

app = FastAPI()  # Assume FastAPI instance exists

# Functions from the previous example (async I/O, blocking CPU)
async def fetch_external_data_async(item_id: int) -> dict:
    # ... (as before)
    print(f"Fetching external data async for item {item_id}...")
    await asyncio.sleep(0.5)
    print("Finished fetching external data async.")
    return {"item_id": item_id, "metadata": "some_fetched_value_async"}

def run_model_prediction(data_point: float) -> float:
    # ... (as before)
    print("Starting model prediction (in thread pool)...")
    time.sleep(1)
    result = data_point * 2
    print("Finished model prediction (in thread pool).")
    return result

# Function to be run in the background
def log_prediction_details(request_data: dict, response_data: dict):
    # Simulate writing to a log file or database
    print("\n--- Background Task Started ---")
    print(f"Logging Request: {request_data}")
    print(f"Logging Response: {response_data}")
    # Simulate some work for the background task
    time.sleep(0.2)
    print("--- Background Task Finished ---\n")
    # In a real app, you'd write to a file, DB, or send to a logging service.

class PredictionRequest(BaseModel):  # Assume defined
    item_id: int
    feature_value: float

class PredictionResponse(BaseModel):  # Assume defined
    item_id: int
    metadata: str
    prediction: float

@app.post("/predict_with_background", response_model=PredictionResponse)
async def predict_with_background_task(
    request: PredictionRequest,
    background_tasks: BackgroundTasks,  # Inject BackgroundTasks
):
    print("Received prediction request with background task.")
    # Step 1: Fetch external data (Non-blocking I/O)
    external_data = await fetch_external_data_async(request.item_id)
    # Step 2: Run model prediction (Blocking CPU, run in thread pool)
    prediction_result = await run_in_threadpool(run_model_prediction, request.feature_value)

    response = PredictionResponse(
        item_id=external_data["item_id"],
        metadata=external_data["metadata"],
        prediction=prediction_result,
    )

    # Add the logging task to run AFTER the response is sent
    background_tasks.add_task(
        log_prediction_details,
        request.dict(),   # Pass data to the task
        response.dict(),  # Pass data to the task
    )

    print("Sending response (background task pending).")
    return response  # Response is sent here
```

In this final example:

- We import BackgroundTasks from FastAPI.
- We add background_tasks: BackgroundTasks as a parameter to our endpoint function. FastAPI automatically injects an instance.
- We define a standard Python function, log_prediction_details, which contains the logic we want to run in the background. This function itself can be blocking or async, but if it's blocking and long-running, it will still consume resources; for significant background work, dedicated task queues (like Celery) are often preferred.
- Before returning the response, we call background_tasks.add_task(), passing the function to run and its arguments.
- The endpoint immediately returns the response to the client.
- FastAPI ensures that log_prediction_details is executed after the response has been successfully sent. You will see its print statements appear in the server console after the "Sending response" message.

This practice demonstrates how to combine async/await for I/O, run_in_threadpool for CPU-bound tasks, and BackgroundTasks for post-response processing, creating more performant and responsive FastAPI applications for serving machine learning models. Experiment with these patterns by running the server and sending requests to observe the timing and flow of execution.
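If you want to poke at the background-task endpoint without a separate client process, FastAPI's TestClient is a convenient way to experiment. The sketch below is a minimal example that assumes the module above is saved as main.py (adjust the import to your module name) and that the testing dependency httpx is installed; note that TestClient handles requests synchronously, so it will not show the concurrency benefits, but the background task's log lines should appear in the same console right after the response is printed.

```python
from fastapi.testclient import TestClient

from main import app  # Assumes the application code above lives in main.py

client = TestClient(app)

def try_background_endpoint() -> None:
    response = client.post(
        "/predict_with_background",
        json={"item_id": 123, "feature_value": 5.0},
    )
    # The response is available here; the background task's print output
    # is emitted by the same process shortly afterwards.
    print(response.status_code, response.json())

if __name__ == "__main__":
    try_background_endpoint()
```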