实现自定义的可运行组件LangChain 组件基本遵循 Runnable 接口。在制作定制的、可能带状态或可序列化的组件时,继承 RunnableSerializable(位于 langchain_core.runnables 中)通常是个不错的选择。它提供了坚实的基础,并与更广泛的 LangChain 生态系统(包括 LangSmith 追踪功能)良好结合。我们还将使用 Pydantic 模型来为组件定义明确的输入和输出结构,从而提高类型安全性和清晰度。import re import datetime from typing import Dict, Any, Union from pydantic import BaseModel, Field, field_validator, PrivateAttr, ConfigDict from langchain_core.runnables import RunnableSerializable from langchain_core.runnables.config import RunnableConfig # 定义输入和输出结构(使用 Pydantic) class InputSchema(BaseModel): user_query: str = Field(..., description="用户的输入查询,应匹配特定模式。") class OutputSchema(BaseModel): user_query: str timestamp: datetime.datetime = Field(description="处理输入时的 UTC 时间戳。") is_valid: bool = Field(default=True, description="指示验证成功的标记。") # 定义自定义组件 class InputValidatorEnricher(RunnableSerializable[InputSchema, OutputSchema]): """ 一个自定义的可运行组件,用于根据正则表达式 验证 'user_query',并用时间戳丰富输入。 """ pattern: str # 存储正则表达式模式 _compiled_pattern: re.Pattern = PrivateAttr() # Pydantic v2 的配置 model_config = ConfigDict(arbitrary_types_allowed=True) def __init__(self, **kwargs): super().__init__(**kwargs) # 预编译正则表达式以提高效率 self._compiled_pattern = re.compile(self.pattern) @field_validator('pattern') @classmethod def validate_regex_pattern(cls, v: str) -> str: try: re.compile(v) except re.error: raise ValueError("提供了无效的正则表达式模式。") return v def _validate_and_enrich(self, input_data: InputSchema) -> OutputSchema: """同步验证和丰富处理逻辑。""" if not self._compiled_pattern.match(input_data.user_query): # 在实际应用中,你可能会抛出自定义异常 # 或返回特定的错误结构。这里我们抛出 ValueError。 raise ValueError(f"输入查询 '{input_data.user_query}' 与模式 '{self.pattern}' 不匹配") now_utc = datetime.datetime.now(datetime.timezone.utc) enriched_data = OutputSchema( user_query=input_data.user_query, timestamp=now_utc, is_valid=True ) return enriched_data def invoke(self, input: Union[Dict[str, Any], InputSchema], config: RunnableConfig | None = None) -> OutputSchema: """同步执行方法。""" # 根据结构验证输入,同时处理字典和对象输入 if isinstance(input, dict): validated_input = InputSchema(**input) else: validated_input = input # 执行核心逻辑 result = self._validate_and_enrich(validated_input) return result async def ainvoke(self, input: Union[Dict[str, Any], InputSchema], config: RunnableConfig | None = None) -> OutputSchema: """异步执行方法。""" # 对于此特定组件,其逻辑本身是同步的。 # 在涉及 I/O(如 API 调用)的实际场景中, # 你会使用异步库(例如 httpx, aiohttp)。 # 这里,我们只是简单地包装同步调用。 if isinstance(input, dict): validated_input = InputSchema(**input) else: validated_input = input result = self._validate_and_enrich(validated_input) return result # 定义输入和输出类型,以便更好地自省和验证 @property def InputType(self): return InputSchema @property def OutputType(self): return OutputSchema在此实现中:我们使用 Pydantic 定义 InputSchema 和 OutputSchema,以获得清晰的数据契约。InputValidatorEnricher 继承自 RunnableSerializable。我们使用 PrivateAttr 来存储已编译的正则表达式模式,确保它不被序列化,但可用于内部逻辑。__init__ 方法会初始化此属性。一个 Pydantic 的 field_validator 会确保提供的模式是有效的正则表达式。核心逻辑被封装在 _validate_and_enrich 方法中。invoke 处理同步调用,在调用核心逻辑之前验证输入(接受字典或 InputSchema 对象)。ainvoke 提供异步接口。由于我们目前的逻辑是 CPU 密集型的,我们重复使用同步方法逻辑。对于 I/O 密集型任务,你应在此处实现真正的异步逻辑。InputType 和 OutputType 属性公开 Pydantic 模型,这有助于 LangChain 的内部机制以及 LangSmith 追踪功能。