Now that we've discussed various strategies for optimizing different parts of your RAG system, it's time to get practical. This section will guide you through profiling a sample RAG pipeline to identify latency bottlenecks and then apply targeted optimizations. We'll measure the impact of our changes, illustrating how a systematic approach can lead to significant performance gains. Remember, low latency is often a critical requirement for user-facing applications.
Our goal here is not just to show you what to optimize, but how to approach the problem of performance analysis in your own RAG systems.
Let's define a simple RAG pipeline consisting of:
- a query embedding step using a sentence-transformer model,
- a retrieval step over a FAISS vector index,
- a re-ranking step using a cross-encoder, and
- a generation step (simulated here with a fixed delay standing in for a real LLM call).
We'll use Python for this exercise. First, ensure you have the necessary libraries installed:
pip install sentence-transformers faiss-cpu numpy
Here's a basic structure for our pipeline. For simplicity, we'll use an in-memory FAISS index and mock the generator's processing time.
import time
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer, CrossEncoder
# 1. Initialize Models
print("Loading models...")
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
reranker_model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
print("Models loaded.")
# 2. Prepare Sample Data and Vector Store
documents = [
    "The Eiffel Tower is a wrought-iron lattice tower on the Champ de Mars in Paris, France.",
    "Photosynthesis is a process used by plants and other organisms to convert light energy into chemical energy.",
    "The Amazon rainforest is the largest tropical rainforest, famed for its biodiversity.",
    "Quantum computing studies theoretical computation systems that make direct use of quantum-mechanical phenomena.",
    "The Colosseum is an oval amphitheatre in the centre of the city of Rome, Italy, built of travertine limestone, tuff, and brick-faced concrete."
]
print("Embedding documents...")
doc_embeddings = embedding_model.encode(documents)
dimension = doc_embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(doc_embeddings)
print("FAISS index created.")
# 3. Define Pipeline Stages
def embed_query(query_text):
    start_time = time.perf_counter()
    query_vector = embedding_model.encode([query_text])
    end_time = time.perf_counter()
    print(f"Query Embedding Latency: {end_time - start_time:.4f}s")
    return query_vector

def retrieve_documents(query_vector, top_k=3):
    start_time = time.perf_counter()
    distances, indices = index.search(query_vector, top_k)
    retrieved_docs = [documents[i] for i in indices[0]]
    end_time = time.perf_counter()
    print(f"Retrieval Latency: {end_time - start_time:.4f}s")
    return retrieved_docs, indices[0]

def rerank_documents(query_text, retrieved_docs):
    if not retrieved_docs:
        return []
    start_time = time.perf_counter()
    pairs = [[query_text, doc] for doc in retrieved_docs]
    scores = reranker_model.predict(pairs)
    # Sort documents by re-ranker score (highest first)
    reranked_docs_with_scores = sorted(zip(scores, retrieved_docs), key=lambda x: x[0], reverse=True)
    reranked_docs = [doc for score, doc in reranked_docs_with_scores]
    end_time = time.perf_counter()
    print(f"Re-ranking Latency: {end_time - start_time:.4f}s")
    return reranked_docs

def generate_answer(query_text, context_docs):
    start_time = time.perf_counter()
    # Simulate LLM generation latency.
    # In a real system, this involves formatting context and calling an LLM.
    prompt = f"Query: {query_text}\n\nContext:\n" + "\n".join(context_docs)
    # print(f"Prompt length for LLM: {len(prompt)} characters")
    time.sleep(0.5)  # Simulate LLM processing time
    generated_text = f"Based on the context, the answer related to '{query_text}' is synthesized here."
    end_time = time.perf_counter()
    print(f"Generation Latency: {end_time - start_time:.4f}s (simulated)")
    return generated_text
# 4. End-to-End RAG Function
def full_rag_pipeline(query_text):
    print(f"\nProcessing query: '{query_text}'")
    total_start_time = time.perf_counter()
    query_vector = embed_query(query_text)
    # Pass top-k positionally so this call still works after retrieve_documents
    # is later updated to take a top_k_retrieve parameter.
    retrieved_docs, _ = retrieve_documents(query_vector, 3)
    print(f"Retrieved: {retrieved_docs}")
    reranked_docs = rerank_documents(query_text, retrieved_docs)
    print(f"Re-ranked: {reranked_docs}")
    # Use top N re-ranked documents for generation context
    context_for_generation = reranked_docs[:2] if reranked_docs else []
    answer = generate_answer(query_text, context_for_generation)
    total_end_time = time.perf_counter()
    print(f"Generated Answer: {answer}")
    print(f"Total Pipeline Latency: {total_end_time - total_start_time:.4f}s")
    return answer
# Run the pipeline
sample_query = "Tell me about ancient Rome"
_ = full_rag_pipeline(sample_query)
This script provides a basic RAG flow with print statements for timing each major step. Running this will give you an initial idea of where time is spent.
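If you find yourself repeating the start/stop timing pattern above, a small context manager keeps the instrumentation in one place. This helper is a sketch of one possible approach, not part of the original script:

import time
from contextlib import contextmanager

@contextmanager
def timed(stage_name):
    # Prints the elapsed wall-clock time for the wrapped block.
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f"{stage_name} Latency: {elapsed:.4f}s")

# Example usage inside a pipeline stage:
# with timed("Query Embedding"):
#     query_vector = embedding_model.encode([query_text])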
Profiling with cProfile and SnakeViz
While time.perf_counter() is useful for coarse-grained timing, Python's built-in cProfile module offers a more detailed breakdown of function call times. SnakeViz can then visualize this profiling data, making it easier to spot bottlenecks.
Install SnakeViz:
pip install snakeviz
To profile our full_rag_pipeline function, you can run your script with cProfile:
python -m cProfile -o rag_profile.prof your_script_name.py
Replace your_script_name.py with the name of your Python file. This command will execute your script and save the profiling data to rag_profile.prof.
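Note that running the whole script this way also profiles model loading, which can dominate the results. If you want to scope the profile to the pipeline call alone, one option is to drive cProfile programmatically from inside the script; a minimal sketch, assuming the functions above are defined in the same module:

import cProfile

profiler = cProfile.Profile()
profiler.enable()
_ = full_rag_pipeline("Tell me about ancient Rome")
profiler.disable()
profiler.dump_stats("rag_profile.prof")  # Same file name SnakeViz expects below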
Then, visualize it with SnakeViz:
snakeviz rag_profile.prof
This will open a web browser interface. Look for functions with a high tottime (time spent in the function itself) or cumtime (cumulative time, including sub-function calls). You'll likely see significant time spent in model inference (encode for sentence-transformers, predict for cross-encoders) and in our simulated time.sleep for generation.
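If you prefer to stay in the terminal, the standard pstats module can summarize the same profile without a browser. For example:

import pstats

stats = pstats.Stats("rag_profile.prof")
# Sort by cumulative time and show the 15 most expensive entries
stats.strip_dirs().sort_stats("cumulative").print_stats(15)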
From the initial run and the cProfile output, you might observe:
- Query embedding and FAISS retrieval are fast relative to the other stages on this tiny corpus.
- Cross-encoder re-ranking takes noticeably longer than retrieval, since it runs a model forward pass for every query-document pair.
- Simulated LLM generation (the time.sleep(0.5)) is typically the most time-consuming part of a RAG pipeline.

Let's assume our profiling highlights the re-ranking step and the LLM generation as major contributors.
The re-ranker processes each query-document pair, so if the initial retrieval brings back k documents, we perform k cross-encoder predictions. We can reduce this load by re-ranking only a smaller subset of the top initially retrieved documents, say the top top_n_rerank where top_n_rerank < k.
Let's modify the full_rag_pipeline and retrieve_documents functions slightly. We'll retrieve more documents initially (e.g., top_k_retrieve = 5) but only re-rank a smaller number (e.g., top_n_rerank = 3).
Update retrieve_documents to accept top_k_retrieve:
# ... (previous code) ...
def retrieve_documents(query_vector, top_k_retrieve=3):  # Renamed top_k to top_k_retrieve
    start_time = time.perf_counter()
    distances, indices = index.search(query_vector, top_k_retrieve)
    retrieved_docs = [documents[i] for i in indices[0]]
    end_time = time.perf_counter()
    print(f"Retrieval Latency ({top_k_retrieve} docs): {end_time - start_time:.4f}s")
    return retrieved_docs, indices[0]
# ... (rest of the pipeline stages) ...
Now, modify full_rag_pipeline to implement selective re-ranking:
# ... (previous functions: embed_query, retrieve_documents, rerank_documents, generate_answer)
def full_rag_pipeline_optimized_reranking(query_text):
    print(f"\nProcessing query with optimized re-ranking: '{query_text}'")
    total_start_time = time.perf_counter()
    query_vector = embed_query(query_text)

    # Retrieve more initially, e.g., top 5
    initial_retrieval_count = 5
    documents_to_consider, _ = retrieve_documents(query_vector, top_k_retrieve=initial_retrieval_count)
    print(f"Initially Retrieved ({initial_retrieval_count}): {documents_to_consider[:3]}...")  # Show first few

    # Re-rank only the top, e.g., 3, of these
    docs_for_reranking = documents_to_consider[:3]
    reranked_docs = rerank_documents(query_text, docs_for_reranking)
    print(f"Re-ranked (from {len(docs_for_reranking)} docs): {reranked_docs}")

    # Use top N re-ranked documents for generation context
    context_for_generation = reranked_docs[:2] if reranked_docs else []
    answer = generate_answer(query_text, context_for_generation)

    total_end_time = time.perf_counter()
    print(f"Generated Answer: {answer}")
    print(f"Total Pipeline Latency (Optimized Re-ranking): {total_end_time - total_start_time:.4f}s")
    return answer
# Run the original and optimized pipelines to compare
sample_query = "Tell me about ancient Rome"
print("\n--- Running Baseline Pipeline ---")
_ = full_rag_pipeline(sample_query)
print("\n--- Running Pipeline with Optimized Re-ranking ---")
_ = full_rag_pipeline_optimized_reranking(sample_query)
After running this, compare the "Re-ranking Latency" and "Total Pipeline Latency" outputs. Re-ranking cost grows with the number of documents scored, so capping the re-ranked subset keeps that cost bounded even as you retrieve more candidates. In this tiny example both pipelines happen to score three documents, so the absolute difference is small, but with dozens of retrieved candidates the savings become substantial. The trade-off is that potentially relevant documents ranked below the re-ranked subset by the initial retriever won't get a chance to be promoted by the re-ranker. This balance between performance and accuracy is common in RAG optimization.
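To keep the accuracy side of this trade-off visible, you can check on a small labeled set how often a known-relevant document survives the re-ranking cut. The sketch below is hypothetical: labeled_queries and the second query are illustrative placeholders built from the sample documents above, not part of the original pipeline.

# Hypothetical labeled set: query -> index of the known-relevant document
labeled_queries = {
    "Tell me about ancient Rome": 4,   # Colosseum document
    "How do plants make energy?": 1,   # Photosynthesis document (illustrative query)
}

def recall_within_rerank_cutoff(cutoff=3, top_k_retrieve=5):
    hits = 0
    for query, relevant_idx in labeled_queries.items():
        query_vector = embed_query(query)
        _, retrieved_indices = retrieve_documents(query_vector, top_k_retrieve=top_k_retrieve)
        # Does the relevant document survive the re-ranking cutoff?
        if relevant_idx in retrieved_indices[:cutoff]:
            hits += 1
    return hits / len(labeled_queries)

print(f"Recall within re-ranked subset: {recall_within_rerank_cutoff(cutoff=3):.2f}")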
If similar contexts are frequently generated for certain types of queries, caching the LLM's response can save significant time and cost. Here, we'll implement a simple in-memory cache for the generate_answer function. For production, you'd use a more robust solution such as Redis.
# ... (previous code, including model initializations and other pipeline stages) ...
llm_response_cache = {}
def generate_answer_with_cache(query_text, context_docs):
    # Create a cache key from the query and context.
    # A more robust approach might involve hashing or normalizing the text.
    cache_key_list = [query_text] + sorted(context_docs)  # Sort docs for a consistent key
    cache_key = "##".join(cache_key_list)

    if cache_key in llm_response_cache:
        start_time = time.perf_counter()
        cached_answer = llm_response_cache[cache_key]
        end_time = time.perf_counter()
        print(f"Generation Latency (Cache Hit): {end_time - start_time:.4f}s (negligible)")
        return cached_answer

    # If not in cache, proceed with generation
    start_time = time.perf_counter()
    prompt = f"Query: {query_text}\n\nContext:\n" + "\n".join(context_docs)
    time.sleep(0.5)  # Simulate LLM processing time
    generated_text = f"Based on the context, the answer related to '{query_text}' is synthesized here (freshly generated)."
    end_time = time.perf_counter()

    llm_response_cache[cache_key] = generated_text  # Store in cache
    print(f"Generation Latency (Cache Miss - Simulated): {end_time - start_time:.4f}s")
    return generated_text
# Update the optimized pipeline to use the cached generator
def full_rag_pipeline_optimized_reranking_and_cache(query_text):
    print(f"\nProcessing query with optimized re-ranking and cache: '{query_text}'")
    total_start_time = time.perf_counter()
    query_vector = embed_query(query_text)

    initial_retrieval_count = 5
    documents_to_consider, _ = retrieve_documents(query_vector, top_k_retrieve=initial_retrieval_count)
    docs_for_reranking = documents_to_consider[:3]
    reranked_docs = rerank_documents(query_text, docs_for_reranking)
    context_for_generation = reranked_docs[:2] if reranked_docs else []

    # Use the generator with caching
    answer = generate_answer_with_cache(query_text, context_for_generation)

    total_end_time = time.perf_counter()
    print(f"Generated Answer: {answer}")
    print(f"Total Pipeline Latency (Optimized Re-ranking & Cache): {total_end_time - total_start_time:.4f}s")
    return answer
# Test the caching
sample_query = "Eiffel Tower information"
print("\n--- Running Pipeline with Optimized Re-ranking and Cache (1st time) ---")
_ = full_rag_pipeline_optimized_reranking_and_cache(sample_query)
print("\n--- Running Pipeline with Optimized Re-ranking and Cache (2nd time - should hit cache) ---")
_ = full_rag_pipeline_optimized_reranking_and_cache(sample_query)
When you run this, the first call to full_rag_pipeline_optimized_reranking_and_cache for a given query will be a cache miss for the generation step. The second call with the exact same query and resulting context should be a cache hit, and you'll see a dramatically reduced "Generation Latency" and "Total Pipeline Latency".
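As noted above, a production deployment would typically move this cache out of process so it can be shared across workers and survive restarts. The following is a minimal sketch using the redis-py client, assuming a Redis server on localhost and pip install redis; the key prefix, SHA-256 hashing, and one-hour TTL are illustrative choices, not requirements:

import hashlib
import redis

# Assumes a local Redis server; adjust host/port for your environment.
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

def cache_key_for(query_text, context_docs):
    # Hash the query plus sorted context so the key stays short and consistent.
    raw = "##".join([query_text] + sorted(context_docs))
    return "rag:gen:" + hashlib.sha256(raw.encode("utf-8")).hexdigest()

def generate_answer_with_redis_cache(query_text, context_docs, ttl_seconds=3600):
    key = cache_key_for(query_text, context_docs)
    cached = redis_client.get(key)
    if cached is not None:
        return cached
    answer = generate_answer(query_text, context_docs)  # Fall back to the (simulated) generator
    redis_client.set(key, answer, ex=ttl_seconds)       # Expire stale entries after an hour
    return answer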
After applying optimizations, it's useful to visualize the impact. Collect the per-stage timings from three runs: the baseline pipeline, the pipeline with selective re-ranking (e.g., retrieving 5, re-ranking 3), and the cached pipeline on a second, identical request. We can represent these measurements with a chart:
Latency breakdown for different RAG pipeline configurations. Baseline shows initial timings. "Selective Re-rank" reduces re-ranking latency. "Optimized + Cache (Hit)" demonstrates significant generation latency reduction due to caching.
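To produce a similar chart from your own runs, here is a minimal matplotlib sketch (assuming pip install matplotlib). The values in stage_latencies are placeholders only and should be replaced with the latencies you actually measured:

import numpy as np
import matplotlib.pyplot as plt

configs = ["Baseline", "Selective Re-rank", "Optimized + Cache (Hit)"]
stages = ["Embedding", "Retrieval", "Re-ranking", "Generation"]
# Placeholder values in seconds; substitute your measured timings.
stage_latencies = {
    "Embedding":  [0.02, 0.02, 0.02],
    "Retrieval":  [0.001, 0.001, 0.001],
    "Re-ranking": [0.08, 0.03, 0.03],
    "Generation": [0.50, 0.50, 0.001],
}

x = np.arange(len(configs))
bottom = np.zeros(len(configs))
for stage in stages:
    values = np.array(stage_latencies[stage])
    plt.bar(x, values, bottom=bottom, label=stage)  # Stacked bars per configuration
    bottom += values

plt.xticks(x, configs)
plt.ylabel("Latency (s)")
plt.title("RAG pipeline latency by stage")
plt.legend()
plt.show()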
This hands-on exercise touched upon a few important areas. In a real-world scenario, you would also consider:
- GPU utilization: tools like nvidia-smi can monitor GPU usage, while the PyTorch Profiler or TensorFlow Profiler can give insights into GPU kernel execution times.
- Asynchronous operations: overlapping independent or I/O-bound work, such as remote LLM or vector database calls, using asyncio (see the sketch below).
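As a rough illustration of the asynchronous angle, the sketch below serves several queries concurrently by pushing the blocking pipeline call onto worker threads with asyncio.to_thread. This is one possible pattern under the assumption that the synchronous functions defined earlier are available, not the only way to structure it:

import asyncio

async def answer_query_async(query_text):
    # Run the blocking pipeline in a worker thread so the event loop
    # can interleave other requests while it executes.
    return await asyncio.to_thread(full_rag_pipeline_optimized_reranking_and_cache, query_text)

async def serve_batch(queries):
    # Handle several independent queries concurrently.
    return await asyncio.gather(*(answer_query_async(q) for q in queries))

answers = asyncio.run(serve_batch([
    "Tell me about ancient Rome",
    "Eiffel Tower information",
]))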
Profiling is an iterative process. Optimize one bottleneck, then re-profile to find the next. Always measure the impact of your changes on both latency and overall system quality (e.g., retrieval and generation accuracy). This practical approach to identifying and addressing performance issues is fundamental to building fast, responsive, and scalable RAG systems for production.