趋近智
随着您的 LangChain 应用超越简单的问答或文本生成功能,您会经常遇到需要多步骤处理、条件逻辑或在不同执行阶段积累信息的情况。在这些复杂的链中,有效管理状态(即在链调用过程中持续并演变的数据)成为设计中的一个主要考虑。无状态执行(其中每个组件只接收前一个组件的直接输出)不足以应对精细的工作流程。
考虑一个应用:它首先分析用户情感,然后根据情感和查询检索相关文档,最后生成一个针对该情感的回复。第一步确定的情感需要供最终生成步骤使用,即使中间有一个文档检索步骤。这要求在主要数据流旁边传递状态信息。
在链中管理状态带来一些难题:
LangChain 表达式语言(LCEL)提供了一些机制和模式来解决这些难题,使您能够顺利构建有状态的复杂序列。
LCEL 的可组合性提供了处理状态的灵活方式。基本思路通常是通过链传递一个字典或自定义数据对象,各组件可以从该对象的特定键读取或写入。
RunnablePassthrough最常见的做法涉及传递字典。RunnablePassthrough 在此特别有用。它允许原始输入(或其选定部分)与并行计算的结果一同传递。通常,输入本身就是持有状态的字典。
您可以在 Runnable 上使用 .assign(**kwargs) 方法来向输出字典添加新键。这是一种简洁的方式,可以随着链的进展扩充状态。
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
# 假设 'llm' 是一个已初始化的 ChatOpenAI 实例
# 步骤 1:初步处理,可能提取实体
prompt1 = ChatPromptTemplate.from_template("Extract names from: {input}")
chain1 = prompt1 | llm | StrOutputParser()
# 步骤 2:将提取的名称(状态)与原始输入一起用于下一步
prompt2 = ChatPromptTemplate.from_template(
"Generate a greeting for {name} based on this context: {original_input}"
)
chain2 = prompt2 | llm | StrOutputParser()
# 组合链,传递原始输入并将 'name' 添加到状态字典中
# 此链的输入应为一个字典,例如 {"input": "John Doe visited Paris."}
complex_chain = RunnablePassthrough.assign(
name=chain1, # 运行 chain1,将其结果添加到 'name' 键下
original_input=lambda x: x["input"] # 显式地将 'input' 键作为 'original_input' 传递
) | chain2 # chain2 现在接收 {'name': 'John Doe', 'original_input': '...'}
# 调用示例:
# result = complex_chain.invoke({"input": "Alice went to the store."})
# print(result)
在此示例中,RunnablePassthrough.assign 运行 chain1 并将其输出以 name 键添加到字典中。它还显式地将原始 input 值作为一个新键 original_input 传递。随后的 chain2 就可以访问 name(由 chain1 添加的状态)和 original_input。
RunnableParallel 用于结构化状态RunnableParallel(通常通过链中的字典字面量语法使用)允许在相同输入(或其转换)上并发运行多个 Runnable,并将结果收集到一个字典中。这有助于明确地组织状态。
from langchain_core.runnables import RunnableParallel
from operator import itemgetter
# 步骤 1a:提取主题
prompt_topic = ChatPromptTemplate.from_template("What is the topic of: {input}?")
chain_topic = prompt_topic | llm | StrOutputParser()
# 步骤 1b:提取情感
prompt_sentiment = ChatPromptTemplate.from_template("What is the sentiment of: {input}?")
chain_sentiment = prompt_sentiment | llm | StrOutputParser()
# 步骤 2:使用主题和情感进行总结
prompt_summary = ChatPromptTemplate.from_template(
"Summarize this text: {original_input}\nFocusing on the topic: {topic}\nAdopt a {sentiment} tone."
)
chain_summary = prompt_summary | llm | StrOutputParser()
# 使用 RunnableParallel 组合以创建状态字典
state_creation = RunnableParallel(
topic=chain_topic,
sentiment=chain_sentiment,
original_input=itemgetter("input") # 传递原始输入字符串
)
full_chain = state_creation | chain_summary
# 调用示例:
# input_text = "The new product launch was a huge success, exceeding all expectations."
# result = full_chain.invoke({"input": input_text})
# print(result)
在这里,state_creation 同时确定主题和情感,将它们与原始输入一起打包成一个字典。这个字典随后成为 chain_summary 的输入。
对于更精细的状态逻辑,您可以使用 RunnableLambda 引入标准 Python 函数,或定义自定义 Runnable 类。这允许对状态对象进行任意计算和操作。
from langchain_core.runnables import RunnableLambda
def complex_state_logic(state_dict):
# 示例:根据中间结果修改状态
if "topic" in state_dict and "sentiment" in state_dict:
state_dict["priority"] = "High" if state_dict["sentiment"] == "Positive" else "Medium"
# ... 可能包含更复杂的逻辑 ...
return state_dict # 返回修改后的状态
# 假设使用前一个示例中的 state_creation 链
full_chain_with_custom_logic = (
state_creation
| RunnableLambda(complex_state_logic)
| chain_summary # chain_summary 现在可能在需要时使用 'priority'
)
# 调用示例:
# input_text = "Customer reported a critical bug. Needs urgent attention."
# result = full_chain_with_custom_logic.invoke({"input": input_text})
# print(result)
使用 RunnableLambda(或继承自 Runnable 的自定义类)提供了实现特定状态转换逻辑的最大灵活性,这些逻辑可能对于单独使用 assign 或 RunnableParallel 来说过于繁杂。
最直接的做法是在整个链中传递一个单一字典。每一步读取所需信息,并可能添加或更新键。
您可以使用 LCEL 的键值获取语法(itemgetter)或 RunnableLambda 函数,为特定组件只选择状态字典中必要的部分。这可以阻止组件访问或修改它们不需要的状态。
from operator import itemgetter
from langchain_core.runnables import RunnableConfig
# 假设 chain_a 需要 {'input': '...'},chain_b 需要 {'data': '...'}
# 状态字典可能为 {'input': '...', 'data': '...', 'temp': '...'}
scoped_chain = RunnableParallel(
result_a=itemgetter('input') | chain_a,
result_b=itemgetter('data') | chain_b
)
# 输出将是 {'result_a': ..., 'result_b': ...}
# 原始状态中的 'temp' 键在此处被有效丢弃/忽略。
# 调用示例
# state = {'input': 'some text', 'data': 'other data', 'temp': 123}
# result = scoped_chain.invoke(state, config=RunnableConfig(max_concurrency=5))
# print(result)
RunnableBranch 进行条件执行状态对于指导执行流程是必不可少的。RunnableBranch 允许您根据对输入评估的条件,将输入(包括状态字典)路由到不同的 Runnable。
from langchain_core.runnables import RunnableBranch
# 根据状态定义分支
# 条件函数检查状态字典
def check_if_urgent(state_dict):
return state_dict.get("priority") == "High"
def check_if_positive(state_dict):
return state_dict.get("sentiment") == "Positive"
# 假设 urgent_chain、positive_chain、default_chain 是已定义的 Runnable
branch = RunnableBranch(
(check_if_urgent, urgent_chain), # 如果紧急,运行此链
(check_if_positive, positive_chain), # 否则,如果积极,运行此链
default_chain # 否则,运行此链
)
# 集成到主链中(使用之前的 state_creation 示例)
chain_with_branching = (
state_creation
| RunnableLambda(complex_state_logic) # 设置 'priority'、'sentiment' 的逻辑
| branch
)
# 调用示例:
# input_text = "The feedback was overwhelmingly positive!"
# result = chain_with_branching.invoke({"input": input_text}) # 应该路由到 positive_chain
# print(result)
了解状态如何传播很有用。图表可以帮助将其可视化。考虑一个链,它提取用户意图,根据意图检索数据,然后生成回复,同时传递意图状态。
此图表显示了状态对象(由黄色便签表示)在通过链组件时如何累积信息(
intent、docs)。每个组件都从上一步接收所需的状态。
在生产应用中管理状态时:
async-concurrency 部分会提到),如果多个执行路径在没有适当同步的情况下并发修改相同的状态对象,您可能会遇到竞争条件,导致状态不一致。使用 RunnableParallel 等结构时,LCEL 的默认行为通常会避免直接修改共享对象,但在异步环境中实现显式修改共享状态的自定义 Runnable 或 Lambda 时需要小心处理。在高级场景中,可能需要不可变状态更新或谨慎的加锁。掌握状态管理是充分展现 LangChain 能力以构建精密复杂的多步骤应用的基本要求。通过 LCEL 的可组合性、透传机制、并行执行和条件分支,您可以设计出处理复杂信息流和逻辑的工作流程,量身定制以满足您特定的生产需求。
简洁的语法。内置调试功能。从第一天起就可投入生产。
为 ApX 背后的 AI 系统而构建
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造