当你的 LangChain 应用不再是简单的问答或文本生成时,你将经常遇到需要多步骤处理、条件逻辑或在不同执行阶段累积信息的场景。在这些复杂链中,有效地管理状态——即在链调用过程中持续存在和演变的数据——成为设计中的一个重要考虑。无状态执行,即每个组件只接收前一个组件的直接输出,不足以处理精细的工作流程。设想一个应用,它首先分析用户情感,然后根据情感和查询获取相关文档,最后生成一个根据该情感定制的响应。第一步中确定的情感需要对最终的生成步骤可用,即使中间有一个文档检索步骤。这要求在主要数据流之外传递状态信息。状态管理面临的挑战在链中管理状态存在几个挑战:信息传递: 如何将早期步骤计算出的信息传递到后续步骤,同时跳过不需要它的中间组件?数据累积: 如何收集多个步骤的结果或中间计算,并使它们统一可用于最终的合成步骤?条件逻辑: 链的执行路径如何根据计算出的状态动态改变(例如,路由到不同的子链)?清晰度和调试: 随着状态管理变得更复杂,如何保持可理解性,并更容易地追踪状态在执行过程中如何变化?LangChain 表达式语言 (LCEL) 提供多种机制和模式来应对这些挑战,让你能够有效地构建有状态的、复杂序列。LCEL 状态管理核心机制LCEL 的可组合性提供了处理状态的灵活方式。基本思想通常是通过链传递一个字典或自定义数据对象,不同的组件可以读取或写入该对象中的特定键。使用字典和 RunnablePassthrough最常见的方法是传递字典。RunnablePassthrough 在此处特别有用。它允许原始输入(或其选定部分)与并行计算的结果一起传递。通常,输入本身就是持有状态的字典。你可以在 Runnable 上使用 .assign(**kwargs) 方法向输出字典添加新键。这是一个在链执行过程中扩展状态的整洁方法。from langchain_core.runnables import RunnablePassthrough from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate # 假设 'llm' 是一个已初始化的 ChatOpenAI 实例 # 步骤 1: 初始处理,可能提取实体 prompt1 = ChatPromptTemplate.from_template("Extract names from: {input}") chain1 = prompt1 | llm # 步骤 2: 使用提取出的名称(状态)和原始输入进行下一步 prompt2 = ChatPromptTemplate.from_template( "Generate a greeting for {name} based on this context: {original_input}" ) chain2 = prompt2 | llm # 组合,传递原始输入并将 'name' 添加到状态字典中 # 此链的输入预计为一个字典,例如 {"input": "John Doe visited Paris."} complex_chain = RunnablePassthrough.assign( name=chain1, # 运行 chain1,将其结果添加到 'name' 键下 lambda x: x["input"] # 将 'input' 键作为 'original_input' 传递 ) | chain2 # chain2 现在接收 {'name': 'John Doe', 'original_input': '...'} # 示例调用: # result = complex_chain.invoke({"input": "Alice went to the store."}) # print(result)在此示例中,RunnablePassthrough.assign 运行 chain1 并将其输出添加到字典的 name 键下。它还明确地将原始 input 值以新键 original_input 传递。随后的 chain2 就能访问 name(由 chain1 添加的状态)和 original_input。RunnableParallel 用于结构化状态RunnableParallel(通常在链中通过字典字面量语法使用)允许在相同输入(或其转换)上并行运行多个 Runnable,并将它们的结果收集到一个字典中。这对于明确地组织状态很有用。from langchain_core.runnables import RunnableParallel # 步骤 1a: 提取主题 prompt_topic = ChatPromptTemplate.from_template("What is the topic of: {input}?") chain_topic = prompt_topic | llm # 步骤 1b: 提取情感 prompt_sentiment = ChatPromptTemplate.from_template("What is the sentiment of: {input}?") chain_sentiment = prompt_sentiment | llm # 步骤 2: 使用主题和情感进行总结 prompt_summary = ChatPromptTemplate.from_template( "Summarize this text: {original_input}\nFocusing on the topic: {topic}\nAdopt a {sentiment} tone." ) chain_summary = prompt_summary | llm # 使用 RunnableParallel 组合以创建状态字典 state_creation = RunnableParallel( topic=chain_topic, sentiment=chain_sentiment, original_input=RunnablePassthrough() # 传递原始输入 ) full_chain = state_creation | chain_summary # 示例调用: # input_text = "The new product launch was a huge success, exceeding all expectations." # result = full_chain.invoke({"input": input_text}) # print(result)在这里,state_creation 同时确定主题和情感,将它们与原始输入一起打包成一个字典。这个字典随后成为 chain_summary 的输入。自定义函数和 Runnables对于更精细的状态逻辑,你可以使用 RunnableLambda 整合标准 Python 函数或定义自定义 Runnable 类。这允许对状态对象进行任意计算和操作。from langchain_core.runnables import RunnableLambda def complex_state_logic(state_dict): # 示例:根据中间结果修改状态 if "topic" in state_dict and "sentiment" in state_dict: state_dict["priority"] = "High" if state_dict["sentiment"] == "Positive" else "Medium" # ... 可能有更复杂的逻辑 ... return state_dict # 返回修改后的状态 # 假设来自之前示例的 state_creation 链 full_chain_with_custom_logic = ( state_creation | RunnableLambda(complex_state_logic) # 设置 'priority', 'sentiment' 的逻辑 | chain_summary # 如果需要,chain_summary 现在可以使用 'priority' ) # 示例调用: # input_text = "Customer reported a critical bug. Needs urgent attention." # result = full_chain_with_custom_logic.invoke({"input": input_text}) # print(result)使用 RunnableLambda(或继承自 Runnable 的自定义类)为实现特定的状态转换逻辑提供最大灵活性,否则仅靠 assign 或 RunnableParallel 可能过于复杂。状态管理策略与模式集中式状态字典最直接的模式是在整个链中传递一个字典。每一步读取其所需信息,并可能添加或更新键。优点: 基本流程易于理解;所有状态集中在一处。缺点: 字典可能变得庞大且难以管理;组件可能通过共享键隐式耦合;更难追踪哪个组件修改了哪部分状态。使用选择进行作用域状态管理你可以使用 LCEL 的项获取语法(itemgetter)或 RunnableLambda 函数为特定组件仅选择状态字典中必需的部分。这能防止组件访问或修改它们不需要的状态。from operator import itemgetter from langchain_core.runnables import RunnableConfig # 假设 chain_a 需要 {'input': '...'},chain_b 需要 {'data': '...'} # 状态字典可能是 {'input': '...', 'data': '...', 'temp': '...'} scoped_chain = RunnableParallel( result_a=itemgetter('input') | chain_a, result_b=itemgetter('data') | chain_b ) # 输出将是 {'result_a': ..., 'result_b': ...} # 原始状态中的 'temp' 键在此处被有效丢弃/忽略。 # 示例调用 # state = {'input': 'some text', 'data': 'other data', 'temp': 123} # result = scoped_chain.invoke(state, config=RunnableConfig(max_concurrency=5)) # print(result)使用 RunnableBranch 进行条件执行状态对于引导执行流程至关重要。RunnableBranch 允许你根据对输入评估的条件,将输入(包括状态字典)路由到不同的 Runnable。from langchain_core.runnables import RunnableBranch # 根据状态定义分支 # 条件函数检查状态字典 def check_if_urgent(state_dict): return state_dict.get("priority") == "High" def check_if_positive(state_dict): return state_dict.get("sentiment") == "Positive" # 假设 urgent_chain, positive_chain, default_chain 是已定义的 Runnables branch = RunnableBranch( (check_if_urgent, urgent_chain), # 如果紧急,运行此链 (check_if_positive, positive_chain), # 否则,如果积极,运行此链 default_chain # 否则,运行此链 ) # 整合到主链中(使用之前的 state_creation 示例) chain_with_branching = ( state_creation | RunnableLambda(complex_state_logic) # 设置 'priority', 'sentiment' 的逻辑 | branch ) # 示例调用: # input_text = "The feedback was overwhelmingly positive!" # result = chain_with_branching.invoke({"input": input_text}) # 应该路由到 positive_chain # print(result)状态流可视化了解状态如何传播很重要。图表有助于将其可视化。设想一个链,它提取用户意图,根据意图检索数据,然后生成响应,并传递意图状态。digraph G { bgcolor="transparent"; rankdir=LR; node [shape=box, style=filled, fillcolor="#e9ecef", fontname="Arial"]; edge [fontname="Arial", fontsize=10]; Start [label="输入\n{查询}", shape=ellipse, fillcolor="#a5d8ff"]; ExtractIntent [label="提取意图\n(提示词 + LLM)", fillcolor="#bac8ff"]; State1 [label="状态\n{查询, 意图}", shape=note, fillcolor="#ffec99"]; RetrieveDocs [label="检索文档\n(RAG)", fillcolor="#96f2d7"]; State2 [label="状态\n{查询, 意图, 文档}", shape=note, fillcolor="#ffec99"]; GenerateResponse [label="生成响应\n(提示词 + LLM)", fillcolor="#bac8ff"]; Output [label="最终响应", shape=ellipse, fillcolor="#a5d8ff"]; Start -> ExtractIntent [label="{查询}"]; ExtractIntent -> State1 [label="+ 意图"]; State1 -> RetrieveDocs [label="{查询, 意图}"]; RetrieveDocs -> State2 [label="+ 文档"]; State2 -> GenerateResponse [label="{查询, 意图, 文档}"]; GenerateResponse -> Output; }该图表展示了状态对象(由黄色便签表示)在通过链组件时如何累积信息(intent、docs)。每个组件都从上一步接收所需的状态。生产环境考量在生产应用中管理状态时:序列化: 如果状态需要在持久化(例如,在用户交互之间存储到数据库)或跨网络边界传输(例如,在分布式任务队列中),请确保你的状态对象(字典或自定义类)易于序列化(例如,为 JSON)。包含基本类型、列表和嵌套字典的标准 Python 字典通常是安全的。对复杂自定义对象要谨慎。复杂性管理: 具有复杂状态依赖的深度嵌套链可能变得难以调试和维护。清晰地组织你的状态字典。考虑将非常复杂的过程分解为多个更小、相互关联的链,可能由一个总体的协调器或代理来管理。并发问题: 在异步应用中(在 async-concurrency 部分会涉及),如果多个执行路径在没有适当同步的情况下并发修改同一个状态对象,可能会遇到导致状态不一致的竞态条件。LCEL 的默认行为在使用 RunnableParallel 等构造时通常避免直接修改共享对象,但在异步环境中实现显式修改共享状态的自定义 Runnables 或 Lambdas 时,需要谨慎。在高级场景中,可能需要不可变状态更新或细致的加锁。掌握状态管理对于发挥 LangChain 的全部潜力至关重要,用于构建精细的多步骤应用。通过利用 LCEL 的可组合性、直通机制、并行执行和条件分支,你可以设计出能处理复杂信息流和逻辑的工作流程,以满足你的特定生产需求。