This practical exercise synthesizes the concepts covered in this chapter. We will build a Retrieval-Augmented Generation (RAG) pipeline that goes beyond basic vector search, incorporating advanced document handling, hybrid retrieval, and re-ranking techniques suitable for production environments. The goal is to construct a system that provides more relevant and accurate answers by leveraging multiple retrieval signals and refining the retrieved context before generation.
Assume you have a collection of documents (e.g., technical articles, project documentation, or research papers in PDF or Markdown format) that you want to use as the knowledge base for a question-answering system.
Prerequisites: Ensure you have the necessary libraries installed:
pip install langchain langchain-openai langchain-community sentence-transformers faiss-cpu tiktoken pypdf rank_bm25 # Or faiss-gpu if you have CUDA
# Or substitute faiss with another vector store client like chromadb or pinecone-client
pip install chromadb # Example using Chroma
pip install unstructured # For more document types
You will also need access to an LLM (such as one of OpenAI's models), and you may need to set API keys as environment variables (for example, OPENAI_API_KEY for the OpenAI integrations used here).
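A small pre-flight check can catch a missing key before any API call is made. The sketch below assumes the OpenAI integrations, which read OPENAI_API_KEY from the environment; the helper name missing_env_vars is hypothetical and not part of any library.

```python
import os

def missing_env_vars(required=("OPENAI_API_KEY",)):
    """Return the names of required environment variables that are unset or empty."""
    return [name for name in required if not os.environ.get(name)]

# Report problems up front instead of failing on the first API request
missing = missing_env_vars()
if missing:
    print(f"Set these environment variables before continuing: {', '.join(missing)}")
```

If you use a different provider, pass its variable names to the helper instead.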
Instead of cutting text at fixed character offsets, we'll use RecursiveCharacterTextSplitter, which tries to split on natural boundaries first (paragraph breaks, then line breaks, then spaces) and falls back to splitting between individual characters only when a piece still exceeds chunk_size. This often preserves context better. We also need to handle loading potentially diverse document types.
import os
from langchain_community.document_loaders import PyPDFLoader, DirectoryLoader, UnstructuredMarkdownLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings # Or use another embedding provider
from langchain_community.vectorstores import Chroma # Or Pinecone, FAISS, etc.
# Configure for your document path and types
DOCS_PATH = "./your_documents"
# Use DirectoryLoader for flexibility, configure loaders per file type if needed
# Example: Load only PDFs for simplicity here
loader = DirectoryLoader(DOCS_PATH, glob="**/*.pdf", loader_cls=PyPDFLoader, show_progress=True)
documents = loader.load()
# Advanced Chunking Strategy
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,  # Overlap helps maintain context between chunks
    length_function=len,
    add_start_index=True,  # Useful for potential parent document retrieval later
)
chunks = text_splitter.split_documents(documents)
print(f"Loaded {len(documents)} documents.")
print(f"Split into {len(chunks)} chunks.")
# Initialize embedding model (ensure OPENAI_API_KEY is set if using OpenAI)
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
Consider using UnstructuredMarkdownLoader or other specific loaders from langchain_community.document_loaders if you have different file types. The parameters for RecursiveCharacterTextSplitter (chunk_size, chunk_overlap) often require tuning based on your document characteristics and downstream LLM context window size.
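To build intuition for what recursive splitting does when tuning chunk_size, here is a simplified pure-Python sketch of the idea. It is an illustration only, not LangChain's implementation: the function name recursive_split is hypothetical, chunk overlap is omitted, and small leftover pieces are not merged across recursion levels.

```python
def recursive_split(text, chunk_size, separators=("\n\n", "\n", " ", "")):
    """Split text into chunks of at most chunk_size characters,
    preferring coarse separators and falling back to finer ones."""
    if len(text) <= chunk_size:
        return [text] if text.strip() else []
    sep, finer = separators[0], separators[1:]
    if sep == "":
        # Last resort: hard cut at fixed character offsets
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    pieces = text.split(sep)
    if len(pieces) == 1:
        # Separator not present, try the next finer one
        return recursive_split(text, chunk_size, finer)
    chunks, current = [], ""
    for piece in pieces:
        candidate = piece if not current else current + sep + piece
        if len(candidate) <= chunk_size:
            current = candidate  # keep packing pieces into the current chunk
            continue
        if current:
            chunks.append(current)
        if len(piece) > chunk_size:
            # A single piece is too large: split it with finer separators
            chunks.extend(recursive_split(piece, chunk_size, finer))
            current = ""
        else:
            current = piece
    if current:
        chunks.append(current)
    return chunks

sample = ("Para one is short.\n\n"
          "Para two is a bit longer than the first one.\n\n"
          "Three.")
for chunk in recursive_split(sample, chunk_size=40):
    print(repr(chunk))
```

The short first paragraph stays intact, while the longer second paragraph is broken at a word boundary rather than mid-word, which is the behavior that motivates this splitter over fixed offsets.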
A robust RAG system often benefits from combining dense (vector) and sparse (keyword-based) retrieval. We'll set up both.
a) Dense Retrieval (Vector Store)
We index the document chunks into a vector store. Here, we use Chroma for a local example, but substitute your preferred production-grade vector store (Pinecone, Weaviate, etc.) as needed.
# Initialize Vector Store and Index Chunks
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory="./chroma_db_hybrid",  # Choose a directory to save the index
)
vector_retriever = vectorstore.as_retriever(search_kwargs={"k": 10}) # Retrieve more initially for re-ranking
print("Vector store initialized and chunks indexed.")
b) Sparse Retrieval (BM25)
BM25 (Best Matching 25) is a popular keyword-based algorithm. LangChain provides a retriever for it.
from langchain_community.retrievers import BM25Retriever  # Backed by the rank_bm25 package installed above
# BM25 requires the raw text of the chunks
# Make sure your 'chunks' contain page_content attribute
bm25_retriever = BM25Retriever.from_documents(chunks)
bm25_retriever.k = 10 # Retrieve top 10 results based on keywords
print("BM25 retriever initialized.")
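To make the keyword-based scoring concrete, the following is a minimal sketch of textbook Okapi BM25 in plain Python. It is an illustration under assumptions: whitespace tokenization, a non-empty corpus, commonly cited defaults k1=1.5 and b=0.75, and a non-negative IDF variant. The rank_bm25 package used by the retriever may differ in its IDF details, and bm25_scores is a hypothetical name.

```python
import math
from collections import Counter

def bm25_scores(query, docs, k1=1.5, b=0.75):
    """Score each document in a non-empty corpus against the query with Okapi BM25."""
    tokenized = [doc.lower().split() for doc in docs]
    n_docs = len(tokenized)
    avg_len = sum(len(tokens) for tokens in tokenized) / n_docs
    # Document frequency: number of documents containing each term
    doc_freq = Counter(term for tokens in tokenized for term in set(tokens))
    scores = []
    for tokens in tokenized:
        term_freq = Counter(tokens)
        score = 0.0
        for term in query.lower().split():
            tf = term_freq[term]
            if tf == 0:
                continue  # term absent from this document contributes nothing
            # Rare terms get a higher inverse document frequency
            idf = math.log((n_docs - doc_freq[term] + 0.5) / (doc_freq[term] + 0.5) + 1)
            # Length normalization penalizes longer-than-average documents
            norm = tf + k1 * (1 - b + b * len(tokens) / avg_len)
            score += idf * tf * (k1 + 1) / norm
        scores.append(score)
    return scores

corpus = [
    "hybrid search combines dense and sparse retrieval",
    "the cross encoder re-ranks retrieved chunks",
    "sparse retrieval uses keyword matching",
]
print(bm25_scores("sparse retrieval", corpus))
```

Documents that contain none of the query terms score zero, which is exactly the gap that dense retrieval is meant to cover when a query is phrased differently from the source text.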
Now, combine the dense and sparse retrievers using EnsembleRetriever. This allows weighting the contribution of each method. The optimal weights often depend on the specific dataset and query types and require experimentation.
from langchain.retrievers import EnsembleRetriever
# Initialize the Ensemble Retriever
ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, vector_retriever],
    weights=[0.4, 0.6],  # Example weights: prioritize vector search slightly
)
print("Ensemble retriever created.")
# Test retrieval (optional)
# sample_query = "What are the best practices for prompt engineering?"
# retrieved_docs = ensemble_retriever.invoke(sample_query)
# print(f"Retrieved {len(retrieved_docs)} docs for sample query.")
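To see how the weights influence the merged ranking, here is a sketch of weighted Reciprocal Rank Fusion, the rank-fusion approach that EnsembleRetriever is documented as using. Treat the details as assumptions: the function name weighted_rrf, the document IDs, and the smoothing constant c=60 (a commonly used value) are illustrative rather than taken from the library's source.

```python
from collections import defaultdict

def weighted_rrf(ranked_lists, weights, c=60):
    """Fuse several ranked lists of document IDs into one ranking.
    Each list contributes weight / (rank + c) to a document's score."""
    scores = defaultdict(float)
    for ranking, weight in zip(ranked_lists, weights):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += weight / (rank + c)
    # Highest fused score first
    return sorted(scores, key=scores.get, reverse=True)

bm25_ranked = ["a", "b", "c"]    # hypothetical sparse results, best first
vector_ranked = ["b", "d", "a"]  # hypothetical dense results, best first
print(weighted_rrf([bm25_ranked, vector_ranked], weights=[0.4, 0.6]))
```

Because only ranks are used, the raw BM25 and cosine-similarity scores never need to be put on a common scale, and documents found by both retrievers naturally rise to the top.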
Hybrid search retrieves a diverse set of documents, but some might still be only marginally relevant. A re-ranking step using a more computationally intensive cross-encoder model can significantly improve the final context quality passed to the LLM.
We use ContextualCompressionRetriever, which wraps our ensemble retriever and applies a re-ranking model.
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from langchain.retrievers import ContextualCompressionRetriever
# Initialize a cross-encoder model
# Models like 'cross-encoder/ms-marco-MiniLM-L-6-v2' are efficient and effective
model = HuggingFaceCrossEncoder(model_name="cross-encoder/ms-marco-MiniLM-L-6-v2")
# Initialize the reranker compressor
compressor = CrossEncoderReranker(model=model, top_n=5) # Keep top 5 most relevant docs after re-ranking
# Create the compression retriever
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=ensemble_retriever,  # Use the hybrid retriever as the base
)
print("Re-ranking retriever configured.")
# Test retrieval with re-ranking (optional)
# reranked_docs = compression_retriever.invoke(sample_query)
# print(f"Re-ranked to {len(reranked_docs)} docs for sample query.")
# Compare content of reranked_docs vs retrieved_docs
The top_n parameter controls how many documents are kept after re-ranking. Keeping only the highest-scoring documents focuses the LLM on the most promising information and helps the prompt fit within context window limits.
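The re-ranking step itself is simple once a relevance scorer is available: score every (query, document) pair, sort, and keep the top_n. The sketch below shows that logic with a hypothetical stand-in scorer based on token overlap; in the pipeline above, the cross-encoder model plays the role of score_fn and produces far better relevance estimates.

```python
def rerank(query, docs, score_fn, top_n=5):
    """Score each document against the query and keep the top_n best."""
    scored = [(score_fn(query, doc), doc) for doc in docs]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [doc for _, doc in scored[:top_n]]

def overlap_score(query, doc):
    """Hypothetical stand-in scorer: fraction of query tokens found in the document."""
    query_tokens = set(query.lower().split())
    doc_tokens = set(doc.lower().split())
    return len(query_tokens & doc_tokens) / len(query_tokens) if query_tokens else 0.0

candidates = [
    "bm25 is a keyword ranking function",
    "hybrid search mixes keyword and vector retrieval",
    "paris is the capital of france",
]
print(rerank("hybrid keyword search", candidates, overlap_score, top_n=2))
```

Since every candidate is scored jointly with the query, cost grows linearly with the number of retrieved documents, which is why re-ranking is applied to a short candidate list rather than the whole corpus.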
Finally, integrate the optimized retriever into a complete RAG chain using LangChain Expression Language (LCEL). The chain fetches context using our compression_retriever, formats it into a prompt, sends it to the LLM, and parses the output.
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
# Initialize the LLM
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0) # Or gpt-4, claude, etc.
# Define the prompt template
template = """You are an assistant for question-answering tasks.
Use only the following retrieved context to answer the question.
If you don't know the answer from the context, just say that you don't know.
Keep the answer concise and directly based on the provided information.
Context:
{context}
Question: {question}
Answer:"""
prompt = ChatPromptTemplate.from_template(template)
# Helper function to format retrieved documents
def format_docs(docs):
    # Join the text of each retrieved chunk, separated by a blank line
    return "\n\n".join(doc.page_content for doc in docs)
# Build the RAG chain
rag_chain = (
    {"context": compression_retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
print("Optimized RAG chain constructed.")
This LCEL structure makes the data flow explicit: the user's question is passed through unchanged (RunnablePassthrough()) and is also sent to compression_retriever, whose results format_docs joins into a single context string. Both values fill the prompt template, the LLM processes the resulting prompt, and StrOutputParser extracts the final answer as a string.
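The same data flow can be traced with plain Python functions, which may help when debugging what each stage receives. Everything here is a stand-in: fake_retriever returns fixed strings in place of compression_retriever, build_prompt uses a shortened version of the template, and the LLM and output parser steps are left out.

```python
def fake_retriever(question):
    """Hypothetical retriever returning fixed chunks, standing in for compression_retriever."""
    return [
        "Hybrid search combines BM25 with vector similarity.",
        "A cross-encoder re-ranks the merged candidates.",
    ]

def format_docs(docs):
    # Plain strings here; the real helper reads doc.page_content
    return "\n\n".join(docs)

def build_prompt(inputs):
    return f"Context:\n{inputs['context']}\n\nQuestion: {inputs['question']}\nAnswer:"

def run_chain(question):
    # Stage 1: the dict step sends the same question down both branches
    inputs = {
        "context": format_docs(fake_retriever(question)),  # retriever | format_docs
        "question": question,                              # RunnablePassthrough()
    }
    # Stage 2: fill the prompt; a real chain would then call the LLM and parser
    return build_prompt(inputs)

print(run_chain("What is hybrid search?"))
```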
You can now invoke the chain with user questions:
query = "Explain the concept of hybrid search in RAG."
final_answer = rag_chain.invoke(query)
print("\n--- Final Answer ---")
print(final_answer)
query_2 = "What is the capital of France?" # Example of out-of-context question
final_answer_2 = rag_chain.invoke(query_2)
print("\n--- Final Answer 2 ---")
print(final_answer_2)
Visualizing the Flow:
Figure: Flow of the optimized RAG pipeline, incorporating hybrid search and re-ranking stages before final answer generation by the LLM. Dashed lines indicate optional query transformation.
Evaluation: This optimized pipeline should yield better results than a basic vector-search-only RAG. However, rigorous evaluation is necessary. As discussed in Chapter 5, use tools like LangSmith to trace executions and define metrics (e.g., RAGAS metrics like faithfulness, context precision, context recall, answer relevance) evaluated over a representative dataset of question-answer pairs. Compare this pipeline's performance against simpler baselines to quantify the improvements gained from hybrid search and re-ranking. Tuning the ensemble weights (weights in EnsembleRetriever) and the number of documents kept after re-ranking (top_n in CrossEncoderReranker) are common optimization steps guided by evaluation results.
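Before adopting a full evaluation framework, a quick retrieval-only comparison can already show whether hybrid search and re-ranking help. The sketch below computes hit rate and mean reciprocal rank (MRR) at k. These are simple stand-ins, not the RAGAS metrics named above, and the query IDs, document IDs, and rankings are toy values invented for illustration, not measured results.

```python
def hit_rate_and_mrr(results, relevant, k=5):
    """results: {query: ranked doc IDs}; relevant: {query: set of relevant doc IDs}.
    Returns (hit rate at k, mean reciprocal rank at k)."""
    hits, reciprocal_ranks = 0, []
    for query, retrieved in results.items():
        # Rank (1-based) of the first relevant document within the top k, if any
        rank = next(
            (i for i, doc_id in enumerate(retrieved[:k], start=1)
             if doc_id in relevant[query]),
            None,
        )
        hits += rank is not None
        reciprocal_ranks.append(1 / rank if rank else 0.0)
    n = len(results)
    return hits / n, sum(reciprocal_ranks) / n

# Toy labels and rankings for two hypothetical pipelines
relevant = {"q1": {"d1"}, "q2": {"d7"}}
baseline = {"q1": ["d3", "d1", "d4"], "q2": ["d2", "d5", "d6"]}
hybrid = {"q1": ["d1", "d3", "d4"], "q2": ["d5", "d7", "d2"]}

print("baseline:", hit_rate_and_mrr(baseline, relevant))
print("hybrid:  ", hit_rate_and_mrr(hybrid, relevant))
```

Running the same function over each configuration (different weights or top_n values) gives a like-for-like number to guide tuning before moving on to answer-level metrics.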
This exercise provides a template for a production-oriented RAG system. You can further enhance it by integrating query transformations (like HyDE), implementing parent document retrieval strategies for better context coherence, and refining the data loading/chunking specific to your document corpus. Remember that monitoring (Chapter 5) and robust deployment practices (Chapter 7) are essential for maintaining performance and reliability in production.
© 2025 ApX Machine Learning