趋近智
动手实践 RAG 系统优化,对示例 RAG 流水线进行性能分析。识别延迟瓶颈并应用有针对性的优化,并衡量这些改动的影响。系统化的方法可以带来显著的性能提升,因为低延迟通常是面向用户应用程序的重要的要求。
我们这里的目标不仅仅是向你展示 优化什么,更是教你 如何 在自己的 RAG 系统中进行性能分析。
我们来定义一个简单的 RAG 流水线,它包含:
本练习将使用 Python。首先,请确保你已安装必要的库:
pip install sentence-transformers faiss-cpu numpy
这是我们流水线的基本结构。为简单起见,我们将使用内存中的 FAISS 索引并模拟生成器的处理时间。
import time
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer, CrossEncoder
# 1. 初始化模型
print("正在加载模型...")
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
reranker_model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
print("模型加载完成。")
# 2. 准备示例数据和向量存储
documents = [
"The Eiffel Tower is a wrought-iron lattice tower on the Champ de Mars in Paris, France.",
"Photosynthesis is a process used by plants and other organisms to convert light energy into chemical energy.",
"The Amazon rainforest is the largest tropical rainforest, famed for its biodiversity.",
"Quantum computing studies theoretical computation systems that make direct use of quantum-mechanical phenomena.",
"The Colosseum is an oval amphitheatre in the centre of the city of Rome, Italy, built of travertine limestone, tuff, and brick-faced concrete."
]
print("正在嵌入文档...")
doc_embeddings = embedding_model.encode(documents)
dimension = doc_embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(doc_embeddings)
print("FAISS 索引已创建。")
# 3. 定义流水线阶段
def embed_query(query_text):
start_time = time.perf_counter()
query_vector = embedding_model.encode([query_text])
end_time = time.perf_counter()
print(f"查询嵌入延迟: {end_time - start_time:.4f}s")
return query_vector
def retrieve_documents(query_vector, top_k=3):
start_time = time.perf_counter()
distances, indices = index.search(query_vector, top_k)
retrieved_docs = [documents[i] for i in indices[0]]
end_time = time.perf_counter()
print(f"检索延迟: {end_time - start_time:.4f}s")
return retrieved_docs, indices[0]
def rerank_documents(query_text, retrieved_docs):
if not retrieved_docs:
return []
start_time = time.perf_counter()
pairs = [[query_text, doc] for doc in retrieved_docs]
scores = reranker_model.predict(pairs)
# 根据重排序器得分对文档进行排序
reranked_docs_with_scores = sorted(zip(scores, retrieved_docs), key=lambda x: x[0], reverse=True)
reranked_docs = [doc for score, doc in reranked_docs_with_scores]
end_time = time.perf_counter()
print(f"重排序延迟: {end_time - start_time:.4f}s")
return reranked_docs
def generate_answer(query_text, context_docs):
start_time = time.perf_counter()
# 模拟 LLM 生成延迟
# 在真实系统中,这涉及格式化上下文并调用 LLM
prompt = f"查询: {query_text}\n\n上下文:\n" + "\n".join(context_docs)
# print(f"LLM 的提示长度: {len(prompt)} 个字符")
time.sleep(0.5) # 模拟 LLM 处理时间
generated_text = f"根据上下文,与 '{query_text}' 相关联的答案在此合成。"
end_time = time.perf_counter()
print(f"生成延迟: {end_time - start_time:.4f}s (模拟)")
return generated_text
# 4. 端到端 RAG 函数
def full_rag_pipeline(query_text):
print(f"\n正在处理查询: '{query_text}'")
total_start_time = time.perf_counter()
query_vector = embed_query(query_text)
retrieved_docs, _ = retrieve_documents(query_vector, top_k=3)
print(f"已检索到: {retrieved_docs}")
reranked_docs = rerank_documents(query_text, retrieved_docs)
print(f"已重排序: {reranked_docs}")
# 使用重排序后的前 N 个文档作为生成上下文
context_for_generation = reranked_docs[:2] if reranked_docs else []
answer = generate_answer(query_text, context_for_generation)
total_end_time = time.perf_counter()
print(f"生成的答案: {answer}")
print(f"流水线总延迟: {total_end_time - total_start_time:.4f}s")
return answer
# 运行流水线
sample_query = "Tell me about ancient Rome"
_ = full_rag_pipeline(sample_query)
这个脚本提供了一个基本的 RAG 流程,带有打印语句,用于记录每个主要步骤的时间。运行它将使你初步了解时间消耗在哪里。
cProfile 和 SnakeViz 进行性能分析虽然 time.perf_counter() 对于粗粒度计时很有用,但 Python 内置的 cProfile 模块提供了更详细的函数调用时间明细。SnakeViz 随后可以可视化这些性能分析数据,使发现瓶颈更容易。
安装 SnakeViz:
pip install snakeviz
要对我们的 full_rag_pipeline 函数进行性能分析,你可以使用 cProfile 运行你的脚本:
python -m cProfile -o rag_profile.prof your_script_name.py
将 your_script_name.py 替换为你的 Python 文件名。此命令将执行你的脚本并将性能分析数据保存到 rag_profile.prof。
然后,使用 SnakeViz 可视化它:
snakeviz rag_profile.prof
这将打开一个网页浏览器界面。查找具有高“TotalTime”或“CumTime”(累计时间,包括子函数调用)的函数。你可能会看到模型推理(sentence-transformers 的 encode,cross-encoders 的 predict)以及我们模拟的用于生成的 time.sleep 消耗了大量时间。
从初步运行和 cProfile 输出中,你可能会发现:
time.sleep(0.5) 模拟)通常是 RAG 流水线中最耗时的部分。我们假设性能分析突出显示重排序步骤和 LLM 生成是主要的时间消耗者。
重排序器处理每个查询-文档对。如果初始检索返回 k 个文档,我们将执行 k 次交叉编码器预测。我们可以通过仅对最初检索到的顶部文档中较小的子集进行重排序来减少这种负载,例如 top_n_rerank,其中 top_n_rerank < k。
我们稍微修改一下 full_rag_pipeline 和 retrieve_documents 函数。我们将最初检索更多文档(例如,top_k_retrieve = 10),但只对更少的数量进行重排序(例如,top_n_rerank = 3)。
更新 retrieve_documents 以接受 top_k_retrieve 参数:
# ... (之前的代码) ...
def retrieve_documents(query_vector, top_k_retrieve=3): # 将 top_k 重命名为 top_k_retrieve
start_time = time.perf_counter()
distances, indices = index.search(query_vector, top_k_retrieve)
retrieved_docs = [documents[i] for i in indices[0]]
end_time = time.perf_counter()
print(f"检索延迟 ({top_k_retrieve} 个文档): {end_time - start_time:.4f}s")
return retrieved_docs, indices[0]
# ... (流水线其他阶段) ...
现在,修改 full_rag_pipeline 以实现选择性重排序:
# ... (之前的函数:embed_query, retrieve_documents, rerank_documents, generate_answer)
def full_rag_pipeline_optimized_reranking(query_text):
print(f"\n正在处理具有优化重排序的查询: '{query_text}'")
total_start_time = time.perf_counter()
query_vector = embed_query(query_text)
# 最初检索更多,例如,前 5 个
initial_retrieval_count = 5
documents_to_consider, _ = retrieve_documents(query_vector, top_k_retrieve=initial_retrieval_count)
print(f"最初检索到 ({initial_retrieval_count} 个): {documents_to_consider[:3]}...") # 显示前几个
# 只对这些文档中的顶部,例如,3 个,进行重排序
docs_for_reranking = documents_to_consider[:3]
reranked_docs = rerank_documents(query_text, docs_for_reranking)
print(f"已重排序 (从 {len(docs_for_reranking)} 个文档中): {reranked_docs}")
# 使用重排序后的前 N 个文档作为生成上下文
context_for_generation = reranked_docs[:2] if reranked_docs else []
answer = generate_answer(query_text, context_for_generation)
total_end_time = time.perf_counter()
print(f"生成的答案: {answer}")
print(f"流水线总延迟 (优化重排序): {total_end_time - total_start_time:.4f}s")
return answer
# 运行原始和优化后的流水线进行比较
sample_query = "Tell me about ancient Rome"
print("\n--- 正在运行基线流水线 ---")
_ = full_rag_pipeline(sample_query)
print("\n--- 正在运行具有优化重排序的流水线 ---")
_ = full_rag_pipeline_optimized_reranking(sample_query)
运行此代码后,比较“重排序延迟”和“流水线总延迟”的输出。如果重排序是与文档数量成比例的瓶颈,你应该会看到重排序时间的减少。权衡是,初始检索器排名低于 top_n_rerank 的潜在相关文档将没有机会被重排序器提升。性能和准确性之间的这种平衡在 RAG 优化中很常见。
如果某些类型的查询经常生成相似的上下文,缓存 LLM 的响应可以节省大量时间和成本。这里,我们将为 generate_answer 函数实现一个简单的内存缓存。对于生产环境,你会使用更完善的方案,例如 Redis。
# ... (之前的代码,包括模型初始化和其他流水线阶段) ...
llm_response_cache = {}
def generate_answer_with_cache(query_text, context_docs):
# 从查询和上下文创建缓存键
# 更完善的方法可能涉及哈希或文本归一化
cache_key_list = [query_text] + sorted(context_docs) # 对文档进行排序以保持键的一致性
cache_key = "##".join(cache_key_list)
if cache_key in llm_response_cache:
start_time = time.perf_counter()
cached_answer = llm_response_cache[cache_key]
end_time = time.perf_counter()
print(f"生成延迟 (缓存命中): {end_time - start_time:.4f}s (可忽略)")
return cached_answer
# 如果不在缓存中,则继续生成
start_time = time.perf_counter()
prompt = f"查询: {query_text}\n\n上下文:\n" + "\n".join(context_docs)
time.sleep(0.5) # 模拟 LLM 处理时间
generated_text = f"根据上下文,与 '{query_text}' 相关联的答案在此合成 (全新生成)。"
end_time = time.perf_counter()
llm_response_cache[cache_key] = generated_text # 存储到缓存中
print(f"生成延迟 (缓存未命中 - 模拟): {end_time - start_time:.4f}s")
return generated_text
# 更新优化后的流水线以使用带缓存的生成器
def full_rag_pipeline_optimized_reranking_and_cache(query_text):
print(f"\n正在处理具有优化重排序和缓存的查询: '{query_text}'")
total_start_time = time.perf_counter()
query_vector = embed_query(query_text)
initial_retrieval_count = 5
documents_to_consider, _ = retrieve_documents(query_vector, top_k_retrieve=initial_retrieval_count)
docs_for_reranking = documents_to_consider[:3]
reranked_docs = rerank_documents(query_text, docs_for_reranking)
context_for_generation = reranked_docs[:2] if reranked_docs else []
# 使用带缓存的生成器
answer = generate_answer_with_cache(query_text, context_for_generation)
total_end_time = time.perf_counter()
print(f"生成的答案: {answer}")
print(f"流水线总延迟 (优化重排序和缓存): {total_end_time - total_start_time:.4f}s")
return answer
# 测试缓存
sample_query = "Eiffel Tower information"
print("\n--- 正在运行具有优化重排序和缓存的流水线 (第一次) ---")
_ = full_rag_pipeline_optimized_reranking_and_cache(sample_query)
print("\n--- 正在运行具有优化重排序和缓存的流水线 (第二次 - 应命中缓存) ---")
_ = full_rag_pipeline_optimized_reranking_and_cache(sample_query)
当你运行此代码时,第一次调用 full_rag_pipeline_optimized_reranking_and_cache 对于给定查询,生成步骤将是缓存未命中。第二次使用 完全相同的查询和结果上下文 的调用应该会命中缓存,你将看到“生成延迟”和“流水线总延迟”显著减少。
应用优化后,可视化其影响很有用。假设我们的初始计时如下:
选择性重排序后(例如,检索 5 个,重排序 2 个):
加上缓存后(第二次相同请求时):
我们可以用图表表示:
不同 RAG 流水线配置的延迟细分。基线显示初始计时。“选择性重排序”减少重排序延迟。“优化 + 缓存(命中)”说明由于缓存,生成延迟显著减少。
本次实践练习涉及了几个重要方面。在实际应用中,你还会考虑:
nvidia-smi 这样的工具可以监控 GPU 使用情况。PyTorch Profiler 或 TensorFlow Profiler 可以提供关于 GPU 内核执行时间的见解。asyncio 进行异步执行。性能分析是一个迭代过程。优化一个瓶颈后,重新进行性能分析以找出下一个。始终衡量你的更改对延迟和整体系统质量(例如,检索和生成准确性)的影响。这种识别和解决性能问题的实践方法对于构建快速、响应迅速且可扩展的生产 RAG 系统是基础性的。
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造