Now that you've explored the internal workings of LCEL, asynchronous patterns, and standard component customization, it's time to put this knowledge into practice by building your own bespoke component. In production systems, you'll often encounter scenarios where the standard LangChain building blocks don't perfectly match your specific requirements. You might need to integrate with a proprietary internal service, enforce unique validation logic, or perform complex data transformations not covered by default loaders or parsers. Creating custom Runnable
components is the standard way to extend LangChain's capabilities for these situations.
This practical exercise guides you through creating a custom component that performs a specific task: validating input data against a pattern and enriching it with metadata before passing it along the chain. This reinforces the concepts of component structure, asynchronous execution, and integration within LCEL pipelines.
Imagine a scenario where user input needs to adhere to a specific format (e.g., an order ID) before being processed by an LLM for status lookup. Additionally, we want to automatically add a timestamp to the request data for logging or auditing purposes. A standard prompt template or parser isn't designed for this combined validation and enrichment logic. This is an ideal case for a custom component.
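Before wrapping anything in LangChain machinery, note that the core logic is plain Python: match the query against a regex and attach a UTC timestamp. A minimal, framework-free sketch of that logic (the ORD-style pattern is just an assumed example format):

```python
import re
import datetime

# Assumed order-ID format for illustration: 'ORD-' followed by five digits.
# Adjust the pattern to your real format.
ORDER_ID_PATTERN = re.compile(r"^ORD-\d{5}$")

def validate_and_enrich(data: dict) -> dict:
    """Reject malformed queries; stamp valid ones with the current UTC time."""
    query = data["user_query"]
    if not ORDER_ID_PATTERN.match(query):
        raise ValueError(f"Query {query!r} does not match the expected format")
    return {
        "user_query": query,
        "timestamp": datetime.datetime.now(datetime.timezone.utc),
        "is_valid": True,
    }

result = validate_and_enrich({"user_query": "ORD-12345"})
print(result["is_valid"])  # True
```

Wrapping this logic in a Runnable, as we do next, is what lets it participate in LCEL pipelines, batching, and tracing.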
Our custom component, let's call it InputValidatorEnricher, will:

- Accept input containing the user's query (user_query).
- Validate user_query against a predefined regular expression.
- Add a new key, timestamp, containing the current UTC timestamp.

LangChain components fundamentally adhere to the Runnable interface. For creating custom, potentially stateful, or serializable components, inheriting from RunnableSerializable (found in langchain_core.runnables) is often a good choice. It provides a solid foundation and integrates well with the broader LangChain ecosystem, including LangSmith tracing. We'll also use Pydantic models to define clear input and output schemas for our component, enhancing type safety and clarity.
import re
import datetime
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field, PrivateAttr, validator
from langchain_core.runnables import RunnableSerializable
from langchain_core.runnables.config import RunnableConfig


# Define Input and Output Schemas using Pydantic
class InputSchema(BaseModel):
    user_query: str = Field(..., description="The user's input query, expected to match a pattern.")


class OutputSchema(BaseModel):
    user_query: str
    timestamp: datetime.datetime = Field(description="UTC timestamp when the input was processed.")
    is_valid: bool = Field(default=True, description="Flag indicating successful validation.")


# Define the Custom Component
class InputValidatorEnricher(RunnableSerializable[InputSchema, OutputSchema]):
    """
    A custom Runnable that validates the 'user_query' against a regex
    pattern and enriches the input with a timestamp.
    """

    pattern: str  # The regex pattern, stored as a regular Pydantic field
    # The compiled pattern is a private attribute, not part of the model's fields
    _compiled_pattern: re.Pattern = PrivateAttr()

    def __init__(self, pattern: str, **kwargs):
        # Pass 'pattern' through to the Pydantic model constructor
        super().__init__(pattern=pattern, **kwargs)
        # Pre-compile the regex for efficiency
        self._compiled_pattern = re.compile(pattern)

    @validator("pattern")
    def validate_regex_pattern(cls, v):
        try:
            re.compile(v)
        except re.error:
            raise ValueError("Invalid regex pattern provided.")
        return v

    def _validate_and_enrich(self, input_data: InputSchema) -> OutputSchema:
        """Synchronous validation and enrichment logic."""
        if not self._compiled_pattern.match(input_data.user_query):
            # In a real application, you might raise a custom exception
            # or return a specific error structure. Here we raise ValueError.
            raise ValueError(
                f"Input query '{input_data.user_query}' does not match pattern '{self.pattern}'"
            )
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        return OutputSchema(
            user_query=input_data.user_query,
            timestamp=now_utc,
            is_valid=True,
        )

    def invoke(self, input: Dict[str, Any], config: Optional[RunnableConfig] = None) -> OutputSchema:
        """Synchronous execution method."""
        # Validate the raw input dictionary against the schema
        validated_input = InputSchema(**input)
        # Perform the core logic
        return self._validate_and_enrich(validated_input)

    async def ainvoke(self, input: Dict[str, Any], config: Optional[RunnableConfig] = None) -> OutputSchema:
        """Asynchronous execution method."""
        # For this specific component, the logic is inherently synchronous
        # and CPU-bound, so we simply reuse it. In real-world scenarios
        # involving I/O (like API calls), you would use async libraries
        # (e.g., httpx, aiohttp) or delegate blocking work via
        # asyncio.to_thread to avoid blocking the event loop.
        validated_input = InputSchema(**input)
        return self._validate_and_enrich(validated_input)

    # Define input and output types for better introspection and validation
    @property
    def InputType(self):
        return InputSchema

    @property
    def OutputType(self):
        return OutputSchema
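As the comments in ainvoke note, blocking work belongs in a worker thread rather than on the event loop. A minimal, LangChain-free sketch of the asyncio.to_thread pattern (blocking_validate is a hypothetical stand-in for slow validation work):

```python
import asyncio
import time

def blocking_validate(query: str) -> bool:
    # Hypothetical stand-in for slow, blocking validation work
    time.sleep(0.05)
    return query.startswith("ORD-")

async def ainvoke_sketch(query: str) -> bool:
    # Run the blocking call in a worker thread (Python 3.9+),
    # keeping the event loop free for other tasks
    return await asyncio.to_thread(blocking_validate, query)

print(asyncio.run(ainvoke_sketch("ORD-12345")))  # True
```

The same delegation could be applied inside an ainvoke override whenever the synchronous path is too slow to call directly.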
In this implementation:

- We define InputSchema and OutputSchema using Pydantic for clear data contracts.
- InputValidatorEnricher inherits from RunnableSerializable.
- The __init__ method accepts the regex pattern and pre-compiles it. A Pydantic validator ensures the provided pattern is valid regex.
- The core logic is encapsulated in _validate_and_enrich.
- invoke handles synchronous calls, first validating the input dictionary against InputSchema, then calling the core logic.
- ainvoke provides the asynchronous interface. Since our current logic is CPU-bound, we reuse the synchronous method. For I/O-bound tasks, you would implement genuinely asynchronous logic here.
- The InputType and OutputType properties expose the Pydantic models, aiding LangChain's internal mechanisms and potentially LangSmith tracing.

Now, let's integrate our InputValidatorEnricher into a simple LCEL chain. We'll pipe the output of our custom component into a prompt template and then to a language model.
# Assume necessary imports: ChatOpenAI, ChatPromptTemplate, StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough  # To pass input through

# --- Define the Chain ---

# 1. Instantiate our custom component with a specific pattern
#    Example: Match order IDs like 'ORD-12345'
validator_enricher = InputValidatorEnricher(pattern=r"^ORD-\d{5}$")

# 2. Define the prompt template
#    It expects a dictionary with the keys our component produces
prompt = ChatPromptTemplate.from_template(
    "User query '{user_query}' received at {timestamp}. Please look up the status."
)

# 3. Instantiate the LLM
#    Ensure you have OPENAI_API_KEY set in your environment
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

# 4. Define the output parser
parser = StrOutputParser()

# 5. Construct the chain using LCEL pipe syntax
# The input to the chain should be a dictionary matching InputSchema: {"user_query": "..."}
# Our component returns an OutputSchema instance, but the prompt template
# expects a dictionary, so we convert it in between. Plain functions in a
# pipe are automatically wrapped in a RunnableLambda; on Pydantic v2, use
# .model_dump() instead of .dict().

# Option A: Pipe the custom component's output (as a dict) into the prompt
chain_a = validator_enricher | (lambda enriched: enriched.dict()) | prompt | llm | parser

# Option B: If you needed the original input *and* the enriched output later
# (less common for this specific prompt, but shows the pattern)
# chain_b = RunnablePassthrough.assign(enriched=validator_enricher) | prompt | llm | parser
# Input: {"user_query": "ORD-12345"}
# Output of RunnablePassthrough.assign:
#   {'user_query': 'ORD-12345', 'enriched': OutputSchema(user_query='ORD-12345', timestamp=..., is_valid=True)}
# The prompt would need adjustment if using chain_b, e.g. to reference the
# 'enriched' fields. We will use chain_a for simplicity here.
# --- Testing the Chain ---

# Test with valid input
valid_input = {"user_query": "ORD-98765"}
print(f"Testing with valid input: {valid_input}")
try:
    # Use invoke for synchronous execution
    result_valid = chain_a.invoke(valid_input)
    print("Result (Valid Input):")
    print(result_valid)
except Exception as e:
    print(f"Error (Valid Input): {e}")

print("\n" + "=" * 20 + "\n")

# Test with invalid input
invalid_input = {"user_query": "lookup order 123"}
print(f"Testing with invalid input: {invalid_input}")
try:
    # Use invoke for synchronous execution
    result_invalid = chain_a.invoke(invalid_input)
    print("Result (Invalid Input):")
    print(result_invalid)
except ValueError as e:
    # Catch the specific validation error we expect
    print(f"Caught Expected Error (Invalid Input): {e}")
except Exception as e:
    print(f"Caught Unexpected Error (Invalid Input): {e}")

# Example of using ainvoke (requires an async environment)
# import asyncio
#
# async def run_async():
#     result_async = await chain_a.ainvoke(valid_input)
#     print("\nAsync Result (Valid Input):")
#     print(result_async)
#
# To run the async function:
# asyncio.run(run_async())
In this integration:

- We instantiate InputValidatorEnricher with our desired regex pattern.
- The ChatPromptTemplate is designed to accept the fields of our component's OutputSchema.
- We use the | operator to pipe the enriched output into the prompt.
- Invalid input raises a ValueError from our custom component before reaching the LLM, preventing unnecessary API calls.

As shown in the test cases, direct execution helps verify the component's behavior for both valid and invalid inputs. In more complex scenarios, remember the debugging techniques discussed earlier:

- Set langchain.debug = True for detailed execution logs.
- Use LangSmith to trace the chain, including the inputs and outputs of InputValidatorEnricher.

This practical exercise demonstrates the fundamental process of extending LangChain with custom logic. By mastering the Runnable interface and LCEL integration, you gain the flexibility to adapt LangChain to nearly any task, building sophisticated, production-ready applications tailored to your specific needs. This ability to seamlessly blend standard components with custom code is a core strength of the framework.
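As a closing aside, the pipe composition at the heart of that "seamless blending" can be mimicked in a few lines of plain Python. This is a toy sketch for intuition only, not LangChain's actual implementation:

```python
class Pipe:
    """Toy stand-in for LCEL-style pipe composition (illustrative only)."""

    def __init__(self, func):
        self.func = func

    def __or__(self, other):
        # 'a | b' produces a new Pipe that runs a, then b
        return Pipe(lambda x: other.func(self.func(x)))

    def invoke(self, x):
        return self.func(x)

upper = Pipe(str.upper)
exclaim = Pipe(lambda s: s + "!")
chain = upper | exclaim
print(chain.invoke("done"))  # DONE!
```

LangChain's real Runnable protocol layers batching, streaming, async, and tracing on top of this same compositional idea.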
© 2025 ApX Machine Learning