定制组件在生产环境中是必不可少的,尤其当标准的LangChain构建模块无法完全满足特定要求时。这些要求常常包括与专有的内部服务集成、执行独特的验证逻辑,或执行默认的加载器或解析器无法处理的复杂数据转换。创建自定义的Runnable组件是扩展LangChain功能以应对这些情况的标准方法,从而提供量身定制的解决方案。本次实践练习将指导您创建一个执行特定任务的自定义组件:根据模式验证输入数据,并用元数据对其进行丰富,然后再将其传递给链。这将巩固组件结构、异步执行以及LCEL管道中集成的这些知识点。定义自定义逻辑:输入验证与数据丰富设想一种情况,用户输入需要符合特定格式(例如订单ID),才能由LLM处理以查询状态。此外,我们希望自动向请求数据添加时间戳,用于日志记录或审计。标准的提示模板或解析器并非为这种组合的验证和丰富逻辑而设计。这是一个自定义组件的理想用例。我们的自定义组件,我们称之为InputValidatorEnricher,将会:接受一个字典作为输入,并期待一个特定键(例如user_query)。根据预定义的正则表达式,验证与user_query关联的值。如果验证失败,则抛出信息性错误。如果验证成功,则向字典添加一个新的键值对timestamp,包含当前的UTC时间戳。返回丰富后的字典。实现自定义Runnable组件LangChain组件基本遵循Runnable接口。对于创建自定义的、可能具有状态的或可序列化的组件,继承自RunnableSerializable(在langchain_core.runnables中)通常是一个不错的选择。它提供了一个坚实的基础,并与更广泛的LangChain生态系统良好集成,包括LangSmith追踪。我们还将使用Pydantic模型来定义组件清晰的输入和输出模式,提高类型安全性和清晰度。import re import datetime from typing import Dict, Any from pydantic import BaseModel, Field, validator from langchain_core.runnables import RunnableSerializable from langchain_core.runnables.config import RunnableConfig # 使用 Pydantic 定义输入和输出模式 class InputSchema(BaseModel): user_query: str = Field(..., description="用户的输入查询,应匹配特定模式。") class OutputSchema(BaseModel): user_query: str timestamp: datetime.datetime = Field(description="输入处理时的UTC时间戳。") is_valid: bool = Field(default=True, description="表示验证是否成功的标志。") # 定义自定义组件 class InputValidatorEnricher(RunnableSerializable[InputSchema, OutputSchema]): """ 一个自定义的Runnable,用于根据正则表达式模式验证“user_query” 并用时间戳丰富输入。 """ pattern: str # 存储正则表达式模式 class Config: # 允许在初始化时设置“pattern” arbitrary_types_allowed = True def __init__(self, pattern: str, **kwargs): super().__init__(**kwargs) self.pattern = pattern # 预编译正则表达式以提高效率 self._compiled_pattern = re.compile(pattern) @validator('pattern') def validate_regex_pattern(cls, v): try: re.compile(v) except re.error: raise ValueError("提供的正则表达式模式无效。") return v def _validate_and_enrich(self, input_data: InputSchema) -> OutputSchema: """同步验证和丰富逻辑。""" if not self._compiled_pattern.match(input_data.user_query): # 在实际应用中,您可能会抛出自定义异常 # 或返回特定的错误结构。这里我们抛出ValueError。 raise ValueError(f"输入查询 '{input_data.user_query}' 与模式 '{self.pattern}' 不匹配。") now_utc = datetime.datetime.now(datetime.timezone.utc) enriched_data = OutputSchema( user_query=input_data.user_query, timestamp=now_utc, is_valid=True ) return enriched_data def invoke(self, input: Dict[str, Any], config: RunnableConfig | None = None) -> OutputSchema: """同步执行方法。""" # 根据模式验证输入 validated_input = InputSchema(**input) # 执行核心逻辑 result = self._validate_and_enrich(validated_input) return result async def ainvoke(self, input: Dict[str, Any], config: RunnableConfig | None = None) -> OutputSchema: """异步执行方法。""" # 对于此特定组件,其逻辑本身是同步的。 " # 在涉及I/O(如API调用)的实际场景中," # 您将使用异步库(例如 httpx、aiohttp)。 # 在这里,我们只是封装同步调用。 # 或更复杂的情况下,您可以使用 asyncio.to_thread # 或实现原生异步逻辑。 validated_input = InputSchema(**input) result = self._validate_and_enrich(validated_input) # 如果需要,模拟异步操作,否则直接返回同步结果 # await asyncio.sleep(0) # 实际异步工作的示例占位符 return result # 定义输入和输出类型,以实现更好的内省和验证 @property def InputType(self): return InputSchema @property def OutputType(self): return OutputSchema 在此实现中:我们使用Pydantic定义InputSchema和OutputSchema,以实现清晰的数据契约。InputValidatorEnricher继承自RunnableSerializable。__init__方法接受正则表达式pattern并对其进行预编译。Pydantic的validator确保提供的模式是有效的正则表达式。核心逻辑封装在_validate_and_enrich中。invoke处理同步调用,首先根据InputSchema验证输入字典,然后调用核心逻辑。ainvoke提供异步接口。由于我们目前的逻辑是CPU密集型的,因此我们重用同步方法。对于I/O密集型任务,您将在此处实现真正的异步逻辑。InputType和OutputType属性公开Pydantic模型,有助于LangChain的内部机制和潜在的LangSmith追踪。将组件集成到LCEL链中现在,让我们将InputValidatorEnricher集成到一个简单的LCEL链中。我们将把自定义组件的输出通过管道传递给提示模板,然后再传递给语言模型。# 假设已导入所需内容:ChatOpenAI、PromptTemplate、StrOutputParser from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnablePassthrough # 用于传递输入 # --- 定义链 --- # 1. 使用特定模式实例化我们的自定义组件 # 示例:匹配类似“ORD-12345”的订单ID validator_enricher = InputValidatorEnricher(pattern=r"^ORD-\d{5}$") # 2. 定义提示模板 # 它期望从我们的组件中获得输出字典 prompt = ChatPromptTemplate.from_template( "用户查询 '{user_query}' 在 {timestamp} 收到。请查询状态。" ) # 3. 实例化LLM # 确保您的环境中设置了OPENAI_API_KEY llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0) # 4. 定义输出解析器 parser = StrOutputParser() # 5. 使用LCEL管道语法构建链 # 我们使用RunnablePassthrough来在需要时提供原始输入, # 但这里我们主要将一步的输出传递给下一步。 # 链的输入应该是一个匹配InputSchema的字典:{"user_query": "..."} # 选项A:将自定义组件输出直接传递给提示 chain_a = validator_enricher | prompt | llm | parser # 选项B:如果您稍后需要原始输入和丰富后的输出 # (对于此特定提示不那么常见,但展示了模式) # chain_b = RunnablePassthrough.assign(enriched=validator_enricher) | prompt | llm | parser # 输入:{"user_query": "ORD-12345"} # RunnablePassthrough.assign的输出:{'user_query': 'ORD-12345', 'enriched': {'user_query': 'ORD-12345', 'timestamp': ..., 'is_valid': True}} # 如果使用chain_b访问{'enriched']['user_query'}等,则需要调整提示。 # 为简洁起见,我们在此处使用chain_a。 # --- 测试链 --- # 使用有效输入进行测试 valid_input = {"user_query": "ORD-98765"} print(f"正在使用有效输入进行测试:{valid_input}") try: # 使用invoke进行同步执行 result_valid = chain_a.invoke(valid_input) print("结果(有效输入):") print(result_valid) except Exception as e: print(f"错误(有效输入):{e}") print("\n" + "="*20 + "\n") # 使用无效输入进行测试 invalid_input = {"user_query": "lookup order 123"} print(f"正在使用无效输入进行测试:{invalid_input}") try: # 使用invoke进行同步执行 result_invalid = chain_a.invoke(invalid_input) print("结果(无效输入):") print(result_invalid) except ValueError as e: # 捕获我们预期的特定验证错误 print(f"捕获到预期错误(无效输入):{e}") except Exception as e: print(f"捕获到意外错误(无效输入):{e}") # 使用ainvoke的示例(需要异步环境) # import asyncio # async def run_async(): # result_async = await chain_a.ainvoke(valid_input) # print("\n异步结果(有效输入):") # print(result_async) # # # 运行异步函数: # # asyncio.run(run_async())在此集成中:我们使用所需的正则表达式模式创建了InputValidatorEnricher的一个实例。ChatPromptTemplate被设计为接受我们自定义组件的OutputSchema字典。我们使用标准的LCEL | 运算符将validator_enricher的输出直接传递给prompt。测试展示了有效输入如何流经整个链,而无效输入则在到达LLM之前,从我们的自定义组件中正确抛出ValueError,从而避免了不必要的API调用。验证与调试如测试用例所示,直接执行有助于验证组件在处理有效和无效输入时的行为。在更复杂的情况下,请记住之前讨论的调试技术:详细模式: 设置 langchain.debug = True 以获取详细的执行日志。LangSmith: 如果配置了LangSmith,它会自动追踪执行过程,使您能够检查每个步骤的输入和输出,包括您的自定义组件。这对于理解生产环境中的故障或意外行为非常有用。您可以清晰地看到InputValidatorEnricher执行的数据转换。本次实践练习演示了使用自定义逻辑扩展LangChain的基本过程。通过掌握Runnable接口和LCEL集成,您可以获得灵活性,将LangChain适应几乎任何任务,构建满足您特定需求的复杂且可用于生产的应用程序。这种将标准组件与自定义代码结合的能力是该框架的核心优势。