随着您的 LangChain 应用超越简单的问答或文本生成功能,您会经常遇到需要多步骤处理、条件逻辑或在不同执行阶段积累信息的情况。在这些复杂的链中,有效管理状态(即在链调用过程中持续并演变的数据)成为设计中的一个主要考虑。无状态执行(其中每个组件只接收前一个组件的直接输出)不足以应对精细的工作流程。考虑一个应用:它首先分析用户情感,然后根据情感和查询检索相关文档,最后生成一个针对该情感的回复。第一步确定的情感需要供最终生成步骤使用,即使中间有一个文档检索步骤。这要求在主要数据流旁边传递状态信息。状态管理中的难题在链中管理状态带来一些难题:信息传递: 如何将早期步骤中计算出的信息传给后续的步骤,同时跳过不需要它的中间组件?数据累积: 如何将多个步骤的结果或中间计算收集起来并一同提供,供最终的综合步骤使用?条件逻辑: 链的执行路径如何根据计算出的状态动态改变(例如,路由到不同的子链)?清晰度和调试: 随着状态管理变得更为繁杂,如何保持易懂性并更方便地追踪状态在执行过程中如何变化?LangChain 表达式语言(LCEL)提供了一些机制和模式来解决这些难题,使您能够顺利构建有状态的复杂序列。LCEL 的主要状态机制LCEL 的可组合性提供了处理状态的灵活方式。基本思路通常是通过链传递一个字典或自定义数据对象,各组件可以从该对象的特定键读取或写入。使用字典和 RunnablePassthrough最常见的做法涉及传递字典。RunnablePassthrough 在此特别有用。它允许原始输入(或其选定部分)与并行计算的结果一同传递。通常,输入本身就是持有状态的字典。您可以在 Runnable 上使用 .assign(**kwargs) 方法来向输出字典添加新键。这是一种简洁的方式,可以随着链的进展扩充状态。from langchain_core.runnables import RunnablePassthrough from langchain_core.output_parsers import StrOutputParser from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate # 假设 'llm' 是一个已初始化的 ChatOpenAI 实例 # 步骤 1:初步处理,可能提取实体 prompt1 = ChatPromptTemplate.from_template("Extract names from: {input}") chain1 = prompt1 | llm | StrOutputParser() # 步骤 2:将提取的名称(状态)与原始输入一起用于下一步 prompt2 = ChatPromptTemplate.from_template( "Generate a greeting for {name} based on this context: {original_input}" ) chain2 = prompt2 | llm | StrOutputParser() # 组合链,传递原始输入并将 'name' 添加到状态字典中 # 此链的输入应为一个字典,例如 {"input": "John Doe visited Paris."} complex_chain = RunnablePassthrough.assign( name=chain1, # 运行 chain1,将其结果添加到 'name' 键下 original_input=lambda x: x["input"] # 显式地将 'input' 键作为 'original_input' 传递 ) | chain2 # chain2 现在接收 {'name': 'John Doe', 'original_input': '...'} # 调用示例: # result = complex_chain.invoke({"input": "Alice went to the store."}) # print(result)在此示例中,RunnablePassthrough.assign 运行 chain1 并将其输出以 name 键添加到字典中。它还显式地将原始 input 值作为一个新键 original_input 传递。随后的 chain2 就可以访问 name(由 chain1 添加的状态)和 original_input。RunnableParallel 用于结构化状态RunnableParallel(通常通过链中的字典字面量语法使用)允许在相同输入(或其转换)上并发运行多个 Runnable,并将结果收集到一个字典中。这有助于明确地组织状态。from langchain_core.runnables import RunnableParallel from operator import itemgetter # 步骤 1a:提取主题 prompt_topic = ChatPromptTemplate.from_template("What is the topic of: {input}?") chain_topic = prompt_topic | llm | StrOutputParser() # 步骤 1b:提取情感 prompt_sentiment = ChatPromptTemplate.from_template("What is the sentiment of: {input}?") chain_sentiment = prompt_sentiment | llm | StrOutputParser() # 步骤 2:使用主题和情感进行总结 prompt_summary = ChatPromptTemplate.from_template( "Summarize this text: {original_input}\nFocusing on the topic: {topic}\nAdopt a {sentiment} tone." ) chain_summary = prompt_summary | llm | StrOutputParser() # 使用 RunnableParallel 组合以创建状态字典 state_creation = RunnableParallel( topic=chain_topic, sentiment=chain_sentiment, original_input=itemgetter("input") # 传递原始输入字符串 ) full_chain = state_creation | chain_summary # 调用示例: # input_text = "The new product launch was a huge success, exceeding all expectations." # result = full_chain.invoke({"input": input_text}) # print(result)在这里,state_creation 同时确定主题和情感,将它们与原始输入一起打包成一个字典。这个字典随后成为 chain_summary 的输入。自定义函数和 Runnable对于更精细的状态逻辑,您可以使用 RunnableLambda 引入标准 Python 函数,或定义自定义 Runnable 类。这允许对状态对象进行任意计算和操作。from langchain_core.runnables import RunnableLambda def complex_state_logic(state_dict): # 示例:根据中间结果修改状态 if "topic" in state_dict and "sentiment" in state_dict: state_dict["priority"] = "High" if state_dict["sentiment"] == "Positive" else "Medium" # ... 可能包含更复杂的逻辑 ... return state_dict # 返回修改后的状态 # 假设使用前一个示例中的 state_creation 链 full_chain_with_custom_logic = ( state_creation | RunnableLambda(complex_state_logic) | chain_summary # chain_summary 现在可能在需要时使用 'priority' ) # 调用示例: # input_text = "Customer reported a critical bug. Needs urgent attention." # result = full_chain_with_custom_logic.invoke({"input": input_text}) # print(result)使用 RunnableLambda(或继承自 Runnable 的自定义类)提供了实现特定状态转换逻辑的最大灵活性,这些逻辑可能对于单独使用 assign 或 RunnableParallel 来说过于繁杂。状态管理的方法与做法集中式状态字典最直接的做法是在整个链中传递一个单一字典。每一步读取所需信息,并可能添加或更新键。优点: 容易理解基本流程;所有状态都集中在一处。缺点: 字典可能变得庞大且难以处理;组件可能通过共享键隐式耦合;更难追踪哪个组件修改了哪个状态部分。使用选择的作用域状态您可以使用 LCEL 的键值获取语法(itemgetter)或 RunnableLambda 函数,为特定组件只选择状态字典中必要的部分。这可以阻止组件访问或修改它们不需要的状态。from operator import itemgetter from langchain_core.runnables import RunnableConfig # 假设 chain_a 需要 {'input': '...'},chain_b 需要 {'data': '...'} # 状态字典可能为 {'input': '...', 'data': '...', 'temp': '...'} scoped_chain = RunnableParallel( result_a=itemgetter('input') | chain_a, result_b=itemgetter('data') | chain_b ) # 输出将是 {'result_a': ..., 'result_b': ...} # 原始状态中的 'temp' 键在此处被有效丢弃/忽略。 # 调用示例 # state = {'input': 'some text', 'data': 'other data', 'temp': 123} # result = scoped_chain.invoke(state, config=RunnableConfig(max_concurrency=5)) # print(result)使用 RunnableBranch 进行条件执行状态对于指导执行流程是必不可少的。RunnableBranch 允许您根据对输入评估的条件,将输入(包括状态字典)路由到不同的 Runnable。from langchain_core.runnables import RunnableBranch # 根据状态定义分支 # 条件函数检查状态字典 def check_if_urgent(state_dict): return state_dict.get("priority") == "High" def check_if_positive(state_dict): return state_dict.get("sentiment") == "Positive" # 假设 urgent_chain、positive_chain、default_chain 是已定义的 Runnable branch = RunnableBranch( (check_if_urgent, urgent_chain), # 如果紧急,运行此链 (check_if_positive, positive_chain), # 否则,如果积极,运行此链 default_chain # 否则,运行此链 ) # 集成到主链中(使用之前的 state_creation 示例) chain_with_branching = ( state_creation | RunnableLambda(complex_state_logic) # 设置 'priority'、'sentiment' 的逻辑 | branch ) # 调用示例: # input_text = "The feedback was overwhelmingly positive!" # result = chain_with_branching.invoke({"input": input_text}) # 应该路由到 positive_chain # print(result)状态流的可视化了解状态如何传播很有用。图表可以帮助将其可视化。考虑一个链,它提取用户意图,根据意图检索数据,然后生成回复,同时传递意图状态。digraph G { bgcolor="transparent"; rankdir=LR; node [shape=box, style=filled, fillcolor="#e9ecef", fontname="Arial"]; edge [fontname="Arial", fontsize=10]; Start [label="输入\n{query}", shape=ellipse, fillcolor="#a5d8ff"]; ExtractIntent [label="提取意图\n(提示词 + LLM)", fillcolor="#bac8ff"]; State1 [label="状态\n{query, intent}", shape=note, fillcolor="#ffec99"]; RetrieveDocs [label="检索文档\n(RAG)", fillcolor="#96f2d7"]; State2 [label="状态\n{query, intent, docs}", shape=note, fillcolor="#ffec99"]; GenerateResponse [label="生成回复\n(提示词 + LLM)", fillcolor="#bac8ff"]; Output [label="最终回复", shape=ellipse, fillcolor="#a5d8ff"]; Start -> ExtractIntent [label="{query}"]; ExtractIntent -> State1 [label="+ intent"]; State1 -> RetrieveDocs [label="{query, intent}"]; RetrieveDocs -> State2 [label="+ docs"]; State2 -> GenerateResponse [label="{query, intent, docs}"]; GenerateResponse -> Output; }此图表显示了状态对象(由黄色便签表示)在通过链组件时如何累积信息(intent、docs)。每个组件都从上一步接收所需的状态。生产环境注意事项在生产应用中管理状态时:序列化: 如果状态需要持久化(例如,在用户交互之间存储到数据库中)或跨网络边界发送(例如,在分布式任务队列中),请确保您的状态对象(字典或自定义类)易于序列化(例如,为 JSON)。包含原始类型、列表和嵌套字典的标准 Python 字典通常是安全的。对于复杂的自定义对象要小心处理。复杂情况管理: 具有精细状态依赖的深层嵌套链可能变得难以调试和维护。清晰地组织您的状态字典。考虑将非常繁杂的流程分解为多个更小、相互关联的链,可能由一个总体的协调器或代理来管理。并发问题: 在异步应用中(在 async-concurrency 部分会提到),如果多个执行路径在没有适当同步的情况下并发修改相同的状态对象,您可能会遇到竞争条件,导致状态不一致。使用 RunnableParallel 等结构时,LCEL 的默认行为通常会避免直接修改共享对象,但在异步环境中实现显式修改共享状态的自定义 Runnable 或 Lambda 时需要小心处理。在高级场景中,可能需要不可变状态更新或谨慎的加锁。掌握状态管理是充分展现 LangChain 能力以构建精密复杂的多步骤应用的基本要求。通过 LCEL 的可组合性、透传机制、并行执行和条件分支,您可以设计出处理复杂信息流和逻辑的工作流程,量身定制以满足您特定的生产需求。