趋近智
LangChain 组件基本遵循 Runnable 接口。在制作定制的、可能带状态或可序列化的组件时,继承 RunnableSerializable(位于 langchain_core.runnables 中)通常是个不错的选择。它提供了坚实的基础,并与更广泛的 LangChain 生态系统(包括 LangSmith 追踪功能)良好结合。我们还将使用 Pydantic 模型来为组件定义明确的输入和输出结构,从而提高类型安全性和清晰度。
import re
import datetime
from typing import Dict, Any, Union
from pydantic import BaseModel, Field, field_validator, PrivateAttr, ConfigDict
from langchain_core.runnables import RunnableSerializable
from langchain_core.runnables.config import RunnableConfig
# 定义输入和输出结构(使用 Pydantic)
class InputSchema(BaseModel):
user_query: str = Field(..., description="用户的输入查询,应匹配特定模式。")
class OutputSchema(BaseModel):
user_query: str
timestamp: datetime.datetime = Field(description="处理输入时的 UTC 时间戳。")
is_valid: bool = Field(default=True, description="指示验证成功的标记。")
# 定义自定义组件
class InputValidatorEnricher(RunnableSerializable[InputSchema, OutputSchema]):
"""
一个自定义的可运行组件,用于根据正则表达式
验证 'user_query',并用时间戳丰富输入。
"""
pattern: str # 存储正则表达式模式
_compiled_pattern: re.Pattern = PrivateAttr()
# Pydantic v2 的配置
model_config = ConfigDict(arbitrary_types_allowed=True)
def __init__(self, **kwargs):
super().__init__(**kwargs)
# 预编译正则表达式以提高效率
self._compiled_pattern = re.compile(self.pattern)
@field_validator('pattern')
@classmethod
def validate_regex_pattern(cls, v: str) -> str:
try:
re.compile(v)
except re.error:
raise ValueError("提供了无效的正则表达式模式。")
return v
def _validate_and_enrich(self, input_data: InputSchema) -> OutputSchema:
"""同步验证和丰富处理逻辑。"""
if not self._compiled_pattern.match(input_data.user_query):
# 在实际应用中,你可能会抛出自定义异常
# 或返回特定的错误结构。这里我们抛出 ValueError。
raise ValueError(f"输入查询 '{input_data.user_query}' 与模式 '{self.pattern}' 不匹配")
now_utc = datetime.datetime.now(datetime.timezone.utc)
enriched_data = OutputSchema(
user_query=input_data.user_query,
timestamp=now_utc,
is_valid=True
)
return enriched_data
def invoke(self, input: Union[Dict[str, Any], InputSchema], config: RunnableConfig | None = None) -> OutputSchema:
"""同步执行方法。"""
# 根据结构验证输入,同时处理字典和对象输入
if isinstance(input, dict):
validated_input = InputSchema(**input)
else:
validated_input = input
# 执行核心逻辑
result = self._validate_and_enrich(validated_input)
return result
async def ainvoke(self, input: Union[Dict[str, Any], InputSchema], config: RunnableConfig | None = None) -> OutputSchema:
"""异步执行方法。"""
# 对于此特定组件,其逻辑本身是同步的。
# 在涉及 I/O(如 API 调用)的实际场景中,
# 你会使用异步库(例如 httpx, aiohttp)。
# 这里,我们只是简单地包装同步调用。
if isinstance(input, dict):
validated_input = InputSchema(**input)
else:
validated_input = input
result = self._validate_and_enrich(validated_input)
return result
# 定义输入和输出类型,以便更好地自省和验证
@property
def InputType(self):
return InputSchema
@property
def OutputType(self):
return OutputSchema
在此实现中:
InputSchema 和 OutputSchema,以获得清晰的数据契约。InputValidatorEnricher 继承自 RunnableSerializable。PrivateAttr 来存储已编译的正则表达式模式,确保它不被序列化,但可用于内部逻辑。__init__ 方法会初始化此属性。一个 Pydantic 的 field_validator 会确保提供的模式是有效的正则表达式。_validate_and_enrich 方法中。invoke 处理同步调用,在调用核心逻辑之前验证输入(接受字典或 InputSchema 对象)。ainvoke 提供异步接口。由于我们目前的逻辑是 CPU 密集型的,我们重复使用同步方法逻辑。对于 I/O 密集型任务,你应在此处实现真正的异步逻辑。InputType 和 OutputType 属性公开 Pydantic 模型,这有助于 LangChain 的内部机制以及 LangSmith 追踪功能。简洁的语法。内置调试功能。从第一天起就可投入生产。
为 ApX 背后的 AI 系统而构建
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造