趋近智
随着应用复杂程度和用户负载的增加,异步处理对于保持响应能力和吞吐量不可或缺。Python 的 asyncio 等框架使应用能够并发处理多个操作,例如处理用户请求或与外部 API 交互,而不会阻塞主执行线程。然而,引入并发会在管理对话内存时带来一些特定难题,对话内存通常依赖于共享状态。
当多个异步任务尝试同时读写同一个内存对象时,可能会遇到竞态条件和状态不一致的问题。设想同一用户的两个并发请求与聊天机器人交互。如果这两个请求都读取当前的对话历史,并基于此生成响应,然后尝试保存更新后的历史记录,那么最终状态可能只反映其中一个请求的更改,而丢失另一个请求的上下文。
在异步应用中管理内存的主要策略是,在可能的情况下,为每个并发执行上下文隔离内存状态。这通常意味着以每个请求或每个会话为基础管理内存,而不是为所有可能涉及不同用户或会话的并发操作使用单个共享内存对象。
对于使用 FastAPI 或 Starlette 等框架构建的许多 Web 应用或 API 服务,你可以采用 RunnableWithMessageHistory 模式。这使你能够定义一个单一的链结构,同时确保每个传入请求都根据会话 ID 检索和更新其独立的消息历史。
# 使用 FastAPI 的示例
from fastapi import FastAPI
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
# 全局初始化 LLM 和 Prompt
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个乐于助人的助手。"),
("placeholder", "{history}"),
("human", "{input}"),
])
chain = prompt | llm
app = FastAPI()
# 在生产应用中,请使用持久化存储(Redis、Postgres 等)
store = {}
def get_session_history(session_id: str) -> BaseChatMessageHistory:
# 从存储中检索 session_id 的历史记录
if session_id not in store:
store[session_id] = ChatMessageHistory()
return store[session_id]
# 封装链以自动处理每个会话的消息历史记录
conversation_with_history = RunnableWithMessageHistory(
chain,
get_session_history,
input_messages_key="input",
history_messages_key="history",
)
@app.post("/chat/{user_id}")
async def chat_endpoint(user_id: str, user_input: str):
# 使用会话配置调用链
# 这确保了并发请求之间的隔离
response = await conversation_with_history.ainvoke(
{"input": user_input},
config={"configurable": {"session_id": user_id}}
)
# 持久化由 get_session_history 工厂和历史对象方法处理
return {"response": response.content}
# 运行此代码(需要 FastAPI, Uvicorn, LangChain, OpenAI):
# pip install fastapi uvicorn langchain langchain-openai langchain-community python-dotenv
# 设置 OPENAI_API_KEY 环境变量
# 运行方式:uvicorn your_module_name:app --reload
在此模式中,get_session_history 从持久化存储(如 Redis 或数据库)获取特定用户的历史记录,并使用它初始化 ChatMessageHistory。在链执行 (conversation_with_history.ainvoke) 期间,系统会自动检索正确的历史记录,将其注入到提示中,并将新消息保存回存储。每个并发请求处理其自身的历史记录检索,从而防止直接干扰。
如果你使用的内存类型是由外部存储支持的,例如使用数据库的自定义实现,请确保底层客户端库支持异步操作。
asyncpg,MongoDB 的 motor)。在 LangChain 应用的异步代码中与这些存储交互时(例如,在自定义历史记录 aget_messages 或 aadd_messages 方法中),请使用 await 与异步客户端方法配合。如果异步客户端不可用,你可能需要使用 asyncio.to_thread 封装阻塞调用,以避免事件循环停滞,尽管这比原生异步支持效率更低。
虽然通常不推荐使用共享内存而更倾向于隔离,但如果你必须在多个异步任务之间使用共享历史对象(可能用于全局上下文或聚合统计),那么适当的同步是必要的。
asyncio.Lock: 你可以使用锁来确保一次只有一个任务可以访问或修改共享历史对象的关键部分。import asyncio
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.messages import HumanMessage, AIMessage
# 警告:异步中的共享内存很复杂,通常不是最佳模式。
# 尽可能优先使用请求作用域内存。此示例仅作说明。
class AsyncSafeHistory:
def __init__(self):
self._history = ChatMessageHistory()
self._lock = asyncio.Lock()
async def add_user_message(self, message: str):
async with self._lock:
# 关键代码段:一次只有一个任务可以执行此部分
print(f"Task {asyncio.current_task().get_name()} acquired lock to add user msg.")
await asyncio.sleep(0.1) # 模拟工作
await self._history.aadd_message(HumanMessage(content=message))
print(f"Task {asyncio.current_task().get_name()} releasing lock.")
async def add_ai_message(self, message: str):
async with self._lock:
print(f"Task {asyncio.current_task().get_name()} acquired lock to add AI msg.")
await asyncio.sleep(0.1) # 模拟工作
await self._history.aadd_message(AIMessage(content=message))
print(f"Task {asyncio.current_task().get_name()} releasing lock.")
async def get_messages(self):
# 如果同时发生写入,读取也可能需要加锁
async with self._lock:
print(f"Task {asyncio.current_task().get_name()} acquired lock to read history.")
await asyncio.sleep(0.05) # 模拟工作
result = await self._history.aget_messages()
print(f"Task {asyncio.current_task().get_name()} releasing lock.")
return result
# 使用示例
async def worker(history_wrapper, task_id, message):
print(f"Task {task_id} started.")
await history_wrapper.add_user_message(f"User message from {task_id}: {message}")
current_messages = await history_wrapper.get_messages()
# 基于历史记录模拟 LLM 调用
await asyncio.sleep(0.2)
ai_response = f"AI response to {task_id}"
await history_wrapper.add_ai_message(ai_response)
print(f"Task {task_id} finished. History length: {len(current_messages)}")
async def main():
shared_safe_history = AsyncSafeHistory()
tasks = [
asyncio.create_task(worker(shared_safe_history, i, f"Hello from task {i}"), name=f"Worker-{i}")
for i in range(3)
]
await asyncio.gather(*tasks)
final_messages = await shared_safe_history.get_messages()
print("\nFinal History:")
for msg in final_messages:
print(msg.content)
# asyncio.run(main()) # 执行异步主函数
这种加锁机制可以防止竞态条件,但会引入潜在的瓶颈,因为任务可能不得不等待锁释放。过度使用锁会使执行序列化,并减少异步处理的优势。
思考当两个并发请求在没有适当处理的情况下尝试更新内存,与使用隔离的、请求作用域的内存时的工作流程。
说明了共享内存可能出现的竞态条件,与使用请求作用域内存实例的安全处理方式之间的对比。在第二种场景中,每个请求操作自己的副本,防止直接覆盖,尽管持久化存储中的最终组合状态受应用程序逻辑的影响,用于处理同一用户会话的并发更新(如果这可行)。
在异步 LangChain 应用中正确处理内存对于构建可靠和可伸缩的系统不可或缺。虽然并发提供了性能优势,但它也带来了与状态管理相关的难题。通常的模式是为每个请求或会话隔离内存实例,使用异步兼容的持久化存储来加载和保存历史记录。除非绝对必要,否则应避免在并发任务之间共享可变内存状态;如果必须共享,则应实施严格的加锁机制,并理解潜在的性能权衡。通过仔细考虑这些模式,你可以有效地将高级内存管理整合到你的高性能异步 LangChain 应用中。
简洁的语法。内置调试功能。从第一天起就可投入生产。
为 ApX 背后的 AI 系统而构建
这部分内容有帮助吗?
asyncio - Asynchronous I/O, Python Software Foundation, 2025 (Python Software Foundation) - 官方文档,提供了Python异步编程框架的基础知识,包括事件循环、任务以及锁等同步原语。async/await, Sebastián Ramírez, 2024 - FastAPI关于处理异步操作和上下文管理的文档,与在Web应用程序中实现请求范围内存直接相关。asyncio及其同步原语,这对于安全地管理共享状态很重要。© 2026 ApX Machine Learning用心打造