LangChain components fundamentally adhere to the Runnable interface. For creating custom, potentially stateful, or serializable components, inheriting from RunnableSerializable (found in langchain_core.runnables) is often a good choice. It provides a solid foundation and integrates well with the broader LangChain ecosystem, including LangSmith tracing. We'll also use Pydantic models to define clear input and output schemas for our component, enhancing type safety and clarity.
import re
import datetime
from typing import Any, Dict, Optional, Type, Union

from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator
from langchain_core.runnables import RunnableSerializable
from langchain_core.runnables.config import RunnableConfig


# Define Input and Output Schemas using Pydantic
class InputSchema(BaseModel):
    user_query: str = Field(..., description="The user's input query, expected to match a pattern.")


class OutputSchema(BaseModel):
    user_query: str
    timestamp: datetime.datetime = Field(description="UTC timestamp when the input was processed.")
    is_valid: bool = Field(default=True, description="Flag indicating successful validation.")


# Define the Custom Component
class InputValidatorEnricher(RunnableSerializable[InputSchema, OutputSchema]):
    """
    A custom Runnable that validates the 'user_query' against a regex
    pattern and enriches the input with a timestamp.
    """

    pattern: str  # Store the regex pattern
    _compiled_pattern: re.Pattern = PrivateAttr()

    # Configuration for Pydantic v2
    model_config = ConfigDict(arbitrary_types_allowed=True)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Pre-compile the regex for efficiency
        self._compiled_pattern = re.compile(self.pattern)

    @field_validator('pattern')
    @classmethod
    def validate_regex_pattern(cls, v: str) -> str:
        try:
            re.compile(v)
        except re.error as exc:
            # Chain the original error so the regex problem stays visible
            raise ValueError("Invalid regex pattern provided.") from exc
        return v

    def _validate_and_enrich(self, input_data: InputSchema) -> OutputSchema:
        """Synchronous validation and enrichment logic."""
        # Note: match() anchors only at the start of the string
        if not self._compiled_pattern.match(input_data.user_query):
            # In a real application, you might raise a custom exception
            # or return a specific error structure. Here we raise ValueError.
            raise ValueError(f"Input query '{input_data.user_query}' does not match pattern '{self.pattern}'")
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        enriched_data = OutputSchema(
            user_query=input_data.user_query,
            timestamp=now_utc,
            is_valid=True
        )
        return enriched_data

    def invoke(self, input: Union[Dict[str, Any], InputSchema], config: Optional[RunnableConfig] = None, **kwargs: Any) -> OutputSchema:
        """Synchronous execution method."""
        # Validate input against the schema, handling both dict and object inputs
        if isinstance(input, dict):
            validated_input = InputSchema(**input)
        else:
            validated_input = input
        # Perform the core logic
        result = self._validate_and_enrich(validated_input)
        return result

    async def ainvoke(self, input: Union[Dict[str, Any], InputSchema], config: Optional[RunnableConfig] = None, **kwargs: Any) -> OutputSchema:
        """Asynchronous execution method."""
        # For this specific component, the logic is inherently synchronous.
        # In practical scenarios involving I/O (like API calls),
        # you would use async libraries (e.g., httpx, aiohttp).
        # Here, we simply wrap the synchronous call.
        if isinstance(input, dict):
            validated_input = InputSchema(**input)
        else:
            validated_input = input
        result = self._validate_and_enrich(validated_input)
        return result

    # Define input and output types for better introspection and validation
    @property
    def InputType(self) -> Type[InputSchema]:
        return InputSchema

    @property
    def OutputType(self) -> Type[OutputSchema]:
        return OutputSchema
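One detail of the validation step is worth checking before relying on it: the match method of a compiled pattern anchors only at the start of the string, so a pattern without a trailing $ also accepts inputs that carry extra text after the matching prefix. The following is a minimal, standard-library-only sketch of that difference. The pattern and sample strings are hypothetical and chosen purely for illustration; they are not part of the component above.

```python
import re

# Hypothetical pattern: queries that look like "order <digits>".
pattern = re.compile(r"order \d+")


def starts_with_pattern(query: str) -> bool:
    """True if the pattern matches at the beginning of the query."""
    return pattern.match(query) is not None


def matches_entirely(query: str) -> bool:
    """True only if the whole query matches the pattern."""
    return pattern.fullmatch(query) is not None


# Hypothetical sample inputs.
samples = ["order 123", "order 123; DROP TABLE", "my order 123"]

prefix_results = [starts_with_pattern(s) for s in samples]
full_results = [matches_entirely(s) for s in samples]

print(prefix_results)  # [True, True, False]
print(full_results)    # [True, False, False]
```

If the whole query must conform, either anchor the pattern explicitly or switch the check to fullmatch. Which behavior is appropriate depends on the application.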
In this implementation:
- We define InputSchema and OutputSchema using Pydantic for clear data contracts.
- InputValidatorEnricher inherits from RunnableSerializable.
- We use PrivateAttr to store the compiled regex pattern, ensuring it is excluded from serialization but available for internal logic. The __init__ method initializes this attribute, and a Pydantic field_validator rejects any pattern that is not a valid regex.
- The core logic lives in _validate_and_enrich.
- invoke handles synchronous calls, validating the input (accepting either a dictionary or an InputSchema object) before calling the core logic.
- ainvoke provides the asynchronous interface. Since the current logic is CPU-bound, it reuses the synchronous logic. For I/O-bound tasks, you would implement genuinely asynchronous logic here.
- The InputType and OutputType properties expose the Pydantic models, aiding LangChain's internal mechanisms and potentially LangSmith tracing.
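The note on ainvoke leaves open what to do when the underlying work is blocking and has no async equivalent. One common option is to hand the blocking call to a worker thread with asyncio.to_thread (Python 3.9+), which keeps the event loop free to serve other tasks. The sketch below uses only the standard library. The function blocking_validate and its pattern are hypothetical stand-ins for a component's synchronous logic, not LangChain APIs.

```python
import asyncio
import re
import time

# Hypothetical pattern: lowercase letters and spaces only.
PATTERN = re.compile(r"^[a-z ]+$")


def blocking_validate(query: str) -> bool:
    """Hypothetical stand-in for synchronous, blocking validation."""
    time.sleep(0.05)  # simulate blocking work
    return PATTERN.match(query) is not None


async def validate_async(query: str) -> bool:
    # Run the blocking function in a worker thread so the event loop
    # is not stalled while it executes.
    return await asyncio.to_thread(blocking_validate, query)


async def main() -> list:
    queries = ["hello world", "HELLO", "find flights"]
    # The three validations run concurrently in separate threads;
    # gather returns results in the order the awaitables were passed.
    return await asyncio.gather(*(validate_async(q) for q in queries))


results = asyncio.run(main())
print(results)  # [True, False, True]
```

Threads help most with blocking I/O. For logic that is genuinely CPU-heavy, the interpreter lock limits the benefit, and for calls that already have an async client (such as httpx or aiohttp) awaiting that client directly is usually the simpler choice.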