While optimizing individual retriever and generator components can yield significant gains, achieving truly responsive and scalable RAG systems in production demands a look at how requests flow through the entire pipeline, especially under concurrent load. Synchronous, one-at-a-time processing can quickly become a bottleneck, leading to poor user experience and underutilized resources. This is where asynchronous processing and request batching come into play, transforming how your RAG system handles load and executes its core tasks.
In a synchronous RAG pipeline, if a user makes a request, the system typically blocks at each long-running step, such as querying the vector database or waiting for the LLM to generate a response. If ten users make requests simultaneously, the later users will experience considerable delays as earlier requests are processed sequentially.
Asynchronous processing changes this by allowing the system to handle multiple operations concurrently without waiting for each one to complete. When an asynchronous operation (like a network call to an LLM API or a query to a vector database) is initiated, the system doesn't halt. Instead, it can switch to work on other tasks, such as accepting new incoming requests or processing other stages of different requests. Once the asynchronous operation finishes, its result is made available, often via a callback, future, or an event loop mechanism, allowing the originating request flow to resume.
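The timing sketch below is a minimal, self-contained illustration of this behavior: two simulated I/O waits (stand-ins for a vector database query and an LLM call) run concurrently on one event loop, so the elapsed time is close to the slower of the two rather than their sum.

import asyncio
import time

async def fake_io(name: str, seconds: float) -> str:
    # asyncio.sleep stands in for any awaitable I/O call (network, database).
    await asyncio.sleep(seconds)
    return f"{name} done"

async def main():
    start = time.perf_counter()
    # Both "calls" are in flight at once; the event loop is free while they wait.
    results = await asyncio.gather(fake_io("vector search", 0.3), fake_io("llm call", 0.5))
    print(results, f"elapsed: {time.perf_counter() - start:.2f}s")  # roughly 0.5s, not 0.8s

asyncio.run(main())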
How it Works in RAG: Many components in a RAG pipeline are I/O-bound, meaning they spend a significant amount of time waiting for external operations (network, disk). Vector database queries, calls to embedding services, and requests to hosted LLM APIs are all prime candidates for asynchronous execution. By employing async/await patterns (common in languages like Python with asyncio, JavaScript, or C#) or by using message queues with worker services, you can design your RAG API endpoints to be non-blocking.
Comparison of synchronous versus asynchronous request handling. Asynchronous processing allows the system to manage multiple requests concurrently, improving overall responsiveness and throughput by not blocking on I/O-bound operations.
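As a sketch of what a non-blocking endpoint can look like, the handler below awaits a retrieval call and then a generation call. Here vector_db.search and llm.generate are hypothetical async clients standing in for whatever SDK your vector database and LLM provider expose; the point is only that each await yields control so many requests can progress concurrently.

import asyncio

async def handle_rag_request(query: str, vector_db, llm) -> str:
    # Retrieval is I/O-bound: while we await the vector database,
    # the event loop can accept and advance other requests.
    documents = await vector_db.search(query, top_k=5)    # hypothetical async client
    prompt = "Context:\n" + "\n".join(documents) + f"\n\nQuestion: {query}"
    # Generation against a hosted LLM API is also I/O-bound.
    return await llm.generate(prompt)                     # hypothetical async client

async def handle_many(queries, vector_db, llm):
    # Several user requests progress concurrently on a single event loop.
    return await asyncio.gather(*(handle_rag_request(q, vector_db, llm) for q in queries))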
Benefits of Asynchronous Processing: The main gains are improved responsiveness under concurrent load, better use of compute that would otherwise sit idle during I/O waits, and higher overall throughput from the same hardware.
While asynchronous processing helps manage concurrent requests and I/O waits, request batching focuses on improving the efficiency of computation-heavy parts of your RAG pipeline, particularly those involving neural models running on hardware accelerators like GPUs.
Operations like generating embeddings or running inference on large language models (LLMs) often have a significant per-call overhead. Processing items one by one can be inefficient, leaving the GPU underutilized. Batching groups multiple individual requests (e.g., multiple texts for embedding, or multiple prompts for an LLM) into a single "batch" that is then processed in one go.
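The toy timing below (a simulated model call, not a real embedding model) shows why: a fixed per-call overhead is paid once for the whole batch instead of once per item.

import time

def fake_model_call(batch):
    # Stand-in for a GPU model call: fixed per-call overhead plus a small per-item cost.
    time.sleep(0.010 + 0.001 * len(batch))
    return [f"embedding_for({item})" for item in batch]

texts = [f"chunk {i}" for i in range(64)]

start = time.perf_counter()
one_by_one = [fake_model_call([t])[0] for t in texts]   # 64 calls: overhead paid 64 times
print(f"one-by-one: {time.perf_counter() - start:.3f}s")

start = time.perf_counter()
batched = fake_model_call(texts)                        # 1 call: overhead paid once
print(f"batched:    {time.perf_counter() - start:.3f}s")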
Why Batching is Effective for RAG: Fixed per-call costs (kernel launches, network round trips, framework overhead) are amortized across many items, and processing items together keeps the GPU's parallel compute units busy, raising throughput for embedding generation and LLM inference alike.
Typical relationship between batch size, throughput, and average request latency for an LLM inference task. Increasing batch size generally boosts throughput but can also increase latency for individual requests if they must wait for a batch to fill.
Dynamic Batching: The Sweet Spot

Simply picking a fixed batch size (static batching) can be suboptimal. If traffic is low, requests might wait a long time to fill a large batch. If traffic is high, a small static batch size underutilizes the hardware.
Dynamic batching offers a more adaptive solution. Incoming requests are collected in a queue. A batch is formed and sent for processing when either:

- The batch reaches a configured maximum size (MAX_BATCH_SIZE), or
- A maximum waiting time (BATCH_TIMEOUT_SECONDS) has passed since the first request in the current pending batch arrived.

The timeout is important to ensure that requests don't wait indefinitely during periods of low traffic, balancing throughput and individual request latency. The total latency for a request becomes $L_{\text{total}} = L_{\text{wait\_for\_batch}} + L_{\text{batch\_processing\_time}}$.
Flow of dynamic batching: requests are queued, and a batch is formed either when a maximum size is reached or a timeout occurs. This batch is then processed efficiently by the underlying model.
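To make the latency relation above concrete, here is a small back-of-the-envelope calculation with assumed (not measured) numbers for the timeout and batch processing time:

# Illustrative numbers only (assumed, not measured), applying
# L_total = L_wait_for_batch + L_batch_processing_time.
BATCH_TIMEOUT_SECONDS = 0.05    # maximum time the first request waits for its batch to fill
batch_processing_time = 0.120   # model time for a full batch

# Low traffic: the first request typically waits out the whole timeout.
low_traffic_latency = BATCH_TIMEOUT_SECONDS + batch_processing_time

# High traffic: the batch fills almost immediately, so the wait is negligible.
high_traffic_latency = 0.002 + batch_processing_time

print(f"low traffic:  ~{low_traffic_latency * 1000:.0f} ms per request")
print(f"high traffic: ~{high_traffic_latency * 1000:.0f} ms per request")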
Asynchronous processing and request batching are not mutually exclusive; in fact, they work powerfully together. A high-performance RAG system might accept requests asynchronously at the API layer, place embedding and generation work onto internal queues, and let worker processes drain those queues in dynamic batches on the accelerator.
This combination allows the system to remain responsive to new users (thanks to async ingestion) while maximizing the throughput of its most computationally intensive parts (thanks to batching).
Implementing asynchronous and batching systems requires attention to detail:

- Parameter tuning: MAX_BATCH_SIZE and BATCH_TIMEOUT_SECONDS are highly dependent on your specific models, hardware (GPU memory, compute capability), and expected traffic patterns. These usually require empirical tuning and continuous monitoring. Start with conservative values and adjust based on performance metrics.
- Framework support: built-in facilities such as Hugging Face Transformers' TextStreamer or pipeline batching can be useful for LLMs.

Here's a simplified Python sketch using asyncio to illustrate the core logic of a dynamic batcher:
import asyncio
import time
import random

# Configuration for the dynamic batcher
MAX_BATCH_SIZE_CONFIG = 8
BATCH_TIMEOUT_CONFIG_SEC = 0.1

# A global queue to hold incoming requests
# In a real app, this might be specific to a stage (e.g., embedding_request_queue)
global_request_queue = asyncio.Queue()

# Dictionary to store results for client retrieval (simplified)
# Key: request_id, Value: result or error
processed_request_results = {}

async def perform_batched_operation(batch_of_requests):
    """
    Simulates a time-consuming batched operation, e.g., LLM inference.
    Each item in batch_of_requests is assumed to be a dict like:
    {'id': request_id, 'payload': data, 'future': asyncio.Future}
    """
    request_ids = [req['id'] for req in batch_of_requests]
    print(f"[{time.strftime('%X')}] Processing batch of size {len(batch_of_requests)}: {request_ids}")

    # Simulate work proportional to batch size, plus some base latency
    await asyncio.sleep(0.05 + 0.02 * len(batch_of_requests))

    # Simulate individual results within the batch
    for req_item in batch_of_requests:
        if random.random() < 0.05:  # Simulate an occasional error
            result = {"error": "Simulated processing error"}
            req_item['future'].set_exception(RuntimeError(f"Error processing {req_item['id']}"))
        else:
            result = f"Successfully processed: {req_item['payload']}"
            req_item['future'].set_result(result)
        # Store result for polling (alternative to only using future)
        processed_request_results[req_item['id']] = result

    print(f"[{time.strftime('%X')}] Batch {request_ids} processed.")

async def dynamic_batching_worker():
    """
    Worker that pulls requests from the queue and processes them in dynamic batches.
    """
    print(f"[{time.strftime('%X')}] Dynamic batching worker started (Max Size: {MAX_BATCH_SIZE_CONFIG}, Timeout: {BATCH_TIMEOUT_CONFIG_SEC}s).")
    while True:
        current_batch = []
        first_item_received_at = None
        try:
            # Loop to accumulate items for a batch
            while len(current_batch) < MAX_BATCH_SIZE_CONFIG:
                if not current_batch:
                    # If batch is empty, wait indefinitely for the first item
                    item = await global_request_queue.get()
                    current_batch.append(item)
                    first_item_received_at = time.monotonic()
                    global_request_queue.task_done()
                else:
                    # Batch has items, wait for more with a timeout
                    time_since_first_item = time.monotonic() - first_item_received_at
                    remaining_time_for_batch = BATCH_TIMEOUT_CONFIG_SEC - time_since_first_item
                    if remaining_time_for_batch <= 0:
                        # Timeout reached, process current batch
                        break
                    try:
                        item = await asyncio.wait_for(global_request_queue.get(),
                                                      timeout=remaining_time_for_batch)
                        current_batch.append(item)
                        global_request_queue.task_done()
                    except asyncio.TimeoutError:
                        # Timeout waiting for the *next* item, process current batch
                        break

            if current_batch:
                await perform_batched_operation(current_batch)
                # Reset for the next batch
                current_batch = []
                first_item_received_at = None

        except asyncio.CancelledError:
            print(f"[{time.strftime('%X')}] Batching worker received cancellation.")
            # Optionally, process any remaining items in current_batch before exiting
            if current_batch:
                print(f"[{time.strftime('%X')}] Processing final batch of {len(current_batch)} items before shutdown.")
                await perform_batched_operation(current_batch)
            break  # Exit the worker loop
        except Exception as e:
            # General error handling for the worker loop itself
            print(f"[{time.strftime('%X')}] Critical error in batching_worker: {e}. Affected batch: {[req['id'] for req in current_batch]}")
            # Mark all items in current batch as errored if not already handled by perform_batched_operation
            for req_item in current_batch:
                if not req_item['future'].done():
                    req_item['future'].set_exception(e)
            current_batch = []  # Avoid reprocessing problematic batch
            await asyncio.sleep(1)  # Prevent rapid error looping

# To run this (example, not part of the final content block):
# async def client_request(request_id, data_payload):
#     future = asyncio.Future()
#     await global_request_queue.put({'id': request_id, 'payload': data_payload, 'future': future})
#     print(f"[{time.strftime('%X')}] Client enqueued {request_id}.")
#     try:
#         result = await asyncio.wait_for(future, timeout=5.0)  # Client waits for its specific result
#         print(f"[{time.strftime('%X')}] Client received result for {request_id}: {result}")
#     except asyncio.TimeoutError:
#         print(f"[{time.strftime('%X')}] Client timed out waiting for {request_id}.")
#     except Exception as e:
#         print(f"[{time.strftime('%X')}] Client received error for {request_id}: {e}")
#
# async def main_example():
#     worker = asyncio.create_task(dynamic_batching_worker())
#     await asyncio.sleep(0.1)  # Give worker time to start
#
#     # Simulate a burst of client requests
#     client_tasks = [client_request(f"req_{i}", f"data_for_req_{i}") for i in range(15)]
#     await asyncio.gather(*client_tasks)
#
#     await asyncio.sleep(2)  # Allow any final batches to process
#     worker.cancel()
#     await worker
#
# if __name__ == "__main__":
#     asyncio.run(main_example())
This Python snippet illustrates the core logic: items are added to a queue, and the dynamic_batching_worker collects them until either MAX_BATCH_SIZE_CONFIG is met or BATCH_TIMEOUT_CONFIG_SEC elapses since the first item for the current batch was received. The asyncio.Future objects allow individual client requests to await their specific results even when they are processed as part of a batch.
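To connect this batcher back to the earlier point about combining the two techniques, the fragment below sketches one way an async API layer could sit in front of it. It assumes FastAPI (any async web framework would work similarly) and reuses the global_request_queue and dynamic_batching_worker defined above; it is an illustration of the wiring, not a production-ready endpoint.

import asyncio
import uuid
from fastapi import FastAPI  # assumed dependency for this sketch

app = FastAPI()

@app.on_event("startup")
async def start_batching_worker():
    # Run the dynamic batching worker alongside the web server's event loop.
    app.state.batch_worker = asyncio.create_task(dynamic_batching_worker())

@app.post("/rag/query")
async def rag_query(question: str):
    # Async ingestion: enqueue the request and await only this request's result.
    future = asyncio.get_running_loop().create_future()
    await global_request_queue.put(
        {"id": str(uuid.uuid4()), "payload": question, "future": future}
    )
    answer = await asyncio.wait_for(future, timeout=10.0)
    return {"answer": answer}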
By strategically implementing asynchronous processing and request batching, you can build RAG systems that are not only intelligent but also highly efficient and responsive, capable of handling substantial production workloads with grace. This is a significant step towards creating enterprise-grade AI applications.