## Implementing the Custom Runnable Component

LangChain components adhere to the `Runnable` interface. For custom components that may be stateful or need to be serializable, inheriting from `RunnableSerializable` (found in `langchain_core.runnables`) is often a good choice. It provides a solid foundation and integrates well with the broader LangChain ecosystem, including LangSmith tracing. We'll also use Pydantic models to define explicit input and output schemas for our component, which improves type safety and clarity.

```python
import datetime
import re
from typing import Any, Dict, Optional, Type, Union

from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator
from langchain_core.runnables import RunnableSerializable
from langchain_core.runnables.config import RunnableConfig


# Define input and output schemas using Pydantic
class InputSchema(BaseModel):
    user_query: str = Field(..., description="The user's input query, expected to match a pattern.")


class OutputSchema(BaseModel):
    user_query: str
    timestamp: datetime.datetime = Field(description="UTC timestamp when the input was processed.")
    is_valid: bool = Field(default=True, description="Flag indicating successful validation.")


# Define the custom component
class InputValidatorEnricher(RunnableSerializable[InputSchema, OutputSchema]):
    """
    A custom Runnable that validates the 'user_query' against a regex pattern
    and enriches the input with a timestamp.
    """

    pattern: str  # Store the regex pattern
    _compiled_pattern: re.Pattern = PrivateAttr()

    # Configuration for Pydantic v2
    model_config = ConfigDict(arbitrary_types_allowed=True)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Pre-compile the regex for efficiency
        self._compiled_pattern = re.compile(self.pattern)

    @field_validator('pattern')
    @classmethod
    def validate_regex_pattern(cls, v: str) -> str:
        try:
            re.compile(v)
        except re.error as exc:
            # Chain the original error so the cause is visible in tracebacks
            raise ValueError("Invalid regex pattern provided.") from exc
        return v

    def _validate_and_enrich(self, input_data: InputSchema) -> OutputSchema:
        """Synchronous validation and enrichment logic."""
        # Note: re.match only anchors at the start of the string
        if not self._compiled_pattern.match(input_data.user_query):
            # In a real application, you might raise a custom exception
            # or return a specific error structure. Here we raise ValueError.
            raise ValueError(
                f"Input query '{input_data.user_query}' does not match pattern '{self.pattern}'"
            )

        now_utc = datetime.datetime.now(datetime.timezone.utc)
        enriched_data = OutputSchema(
            user_query=input_data.user_query,
            timestamp=now_utc,
            is_valid=True,
        )
        return enriched_data

    def invoke(
        self,
        input: Union[Dict[str, Any], InputSchema],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> OutputSchema:
        """Synchronous execution method."""
        # Validate input against the schema, handling both dict and object inputs
        if isinstance(input, dict):
            validated_input = InputSchema(**input)
        else:
            validated_input = input

        # Perform the core logic
        result = self._validate_and_enrich(validated_input)
        return result

    async def ainvoke(
        self,
        input: Union[Dict[str, Any], InputSchema],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> OutputSchema:
        """Asynchronous execution method."""
        # For this specific component, the logic is inherently synchronous.
        # In practical scenarios involving I/O (like API calls),
        # you would use async libraries (e.g., httpx, aiohttp).
        # Here, we simply reuse the synchronous logic.
        if isinstance(input, dict):
            validated_input = InputSchema(**input)
        else:
            validated_input = input
        result = self._validate_and_enrich(validated_input)
        return result

    # Define input and output types for better introspection and validation
    @property
    def InputType(self) -> Type[InputSchema]:
        return InputSchema

    @property
    def OutputType(self) -> Type[OutputSchema]:
        return OutputSchema
```

In this implementation:

- We define `InputSchema` and `OutputSchema` using Pydantic for clear data contracts.
- `InputValidatorEnricher` inherits from `RunnableSerializable`.
- We use `PrivateAttr` to store the compiled regex pattern, so it is excluded from serialization but available for internal logic. The `__init__` method initializes this attribute. A Pydantic `field_validator` rejects any `pattern` that is not a valid regular expression.
- The core logic is encapsulated in `_validate_and_enrich`.
- `invoke` handles synchronous calls, validating the input (accepting either a dictionary or an `InputSchema` object) before calling the core logic.
- `ainvoke` provides the asynchronous interface. Since our current logic is CPU-bound, it reuses the synchronous logic. For I/O-bound tasks, you would implement genuinely asynchronous logic here.
- The `InputType` and `OutputType` properties expose the Pydantic models, aiding LangChain's internal mechanisms and potentially LangSmith tracing.
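
The note on `ainvoke` above says that I/O-bound components should implement genuinely asynchronous logic. The following is a minimal sketch of that pattern using only the standard library, independent of LangChain. The `AsyncEnricher` class, `fake_lookup`, and `run_batch` are hypothetical names introduced for illustration; `asyncio.sleep` stands in for a real network call that you might make with a library such as httpx or aiohttp.

```python
import asyncio
import time
from typing import Dict, List


async def fake_lookup(query: str, delay: float = 0.1) -> Dict[str, str]:
    """Hypothetical stand-in for an async API call (assumption, not a real service)."""
    # Awaiting yields control to the event loop while the "request" is pending
    await asyncio.sleep(delay)
    return {"user_query": query, "source": "fake_lookup"}


class AsyncEnricher:
    """Hypothetical component mirroring an invoke/ainvoke pair (not a LangChain class)."""

    def invoke(self, query: str) -> Dict[str, str]:
        # Synchronous entry point: run the coroutine to completion.
        # asyncio.run() must not be called from inside a running event loop.
        return asyncio.run(self.ainvoke(query))

    async def ainvoke(self, query: str) -> Dict[str, str]:
        # Genuinely asynchronous path: await the I/O instead of blocking
        return await fake_lookup(query)


async def run_batch(queries: List[str]) -> List[Dict[str, str]]:
    enricher = AsyncEnricher()
    # gather() runs the calls concurrently and returns results in input order
    return await asyncio.gather(*(enricher.ainvoke(q) for q in queries))


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(run_batch(["a", "b", "c"]))
    elapsed = time.perf_counter() - start
    print(results)
    print(f"elapsed: {elapsed:.2f}s")
```

Because the three simulated calls overlap, the batch should take roughly the time of one wait rather than three. In this sketch the synchronous method delegates to the asynchronous one, which is the reverse of the CPU-bound component above, where `ainvoke` reuses the synchronous logic.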