随着用LangChain构建的应用变得更复杂且需要处理更多流量,同步执行常会成为一个主要瓶颈。许多核心操作,比如通过网络与大型语言模型(LLMs)互动、查询向量数据库或通过工具调用外部API,本质上都受I/O限制。在同步模型中,应用线程在这些操作期间会空闲等待,这会妨碍它处理其他请求或执行后台任务的能力。这会导致吞吐量降低,用户体验也变得不那么即时。异步编程,主要使用Python的asyncio库,提供了一种有效处理这些I/O密集型操作的方法。异步操作不会阻塞整个线程,而是在遇到等待时间(如等待网络响应)时将控制权交还给事件循环。事件循环随后可以运行其他任务,从而大幅提高资源利用率和整体应用性能。在LangChain中应用异步操作LangChain在设计时就考虑到了异步操作。大多数涉及潜在I/O等待的核心组件都为其标准同步方法提供了异步版本。约定很简单:如果一个组件有像invoke()这样的同步方法,其异步版本通常会被命名为ainvoke()。类似地,你会找到与batch()、stream()、call()和transform_documents()对应的abatch()、astream()、acall()(针对旧接口)和atransform_documents()。使用这些方法需要你的应用代码运用Python的async和await关键字。LangChain中asyncio的核心要点要有效使用LangChain的异步能力,需要对asyncio有基本的了解:async def: 这种语法将函数声明为协程。协程是可暂停和恢复的特殊函数。await: 在async def函数内部使用,await会暂停当前协程的执行,直到所等待的操作(通常是另一个协程或可等待对象)完成。暂停期间,事件循环可以运行其他任务。事件循环: asyncio的核心。它管理并分配不同异步任务的执行。asyncio.run(coroutine()): 一种启动asyncio事件循环并运行顶层协程直至完成的常用方法。asyncio.gather(*coroutines): 并发处理的重要函数。它接受多个协程(或可等待对象)作为参数,并并发运行它们。它会等待所有协程完成并返回它们运行结果的列表。实现异步调用让我们考虑一个简单的同步链,并将其转换为异步执行。# 同步示例 # 假设已完成提示、模型、解析器的设置 # from langchain_core.prompts import ChatPromptTemplate # from langchain_openai import ChatOpenAI # from langchain_core.output_parsers import StrOutputParser # prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}") # model = ChatOpenAI() # output_parser = StrOutputParser() # sync_chain = prompt | model | output_parser # result = sync_chain.invoke({"topic": "programmers"}) # print(result)要使其异步运行,只需在async def函数中使用ainvoke方法:import asyncio # 假设已完成提示、模型、解析器的设置(如上所示) # sync_chain = prompt | model | output_parser # 如果组件支持异步,LCEL链会自动支持异步 async def run_async_chain(): print("正在运行异步链...") # 使用 ainvoke 而不是 invoke result = await sync_chain.ainvoke({"topic": "数据科学家"}) print(result) # 如果模型支持流式传输: # print("流式响应:") # async for chunk in sync_chain.astream({"topic": "异步编程"}): # print(chunk, end="", flush=True) # print() async def main(): await run_async_chain() if __name__ == "__main__": # 启动 asyncio 事件循环 asyncio.run(main())注意,链定义本身(prompt | model | output_parser)保持不变。LangChain表达式语言(LCEL)会自动处理执行流程。如果你在LCEL链上调用ainvoke,它将尝试按顺序调用每个组件的ainvoke(或等效的异步)方法。如果一个组件只有同步的invoke方法,LCEL通常会在一个单独的线程池执行器中运行它,以避免阻塞主asyncio事件循环,尽管这会引入一些开销。为了获得最佳性能,请确保链中所有I/O密集型组件都具有原生异步支持。使用asyncio.gather实现并发异步的真正优势在于并发执行多个操作。例如,你需要使用多个不同提示来查询LLM,或同时从多个来源获取文档时,asyncio.gather便是解决此问题的工具。import asyncio import time # 假设已完成提示、模型、解析器和sync_chain的设置 async def call_llm(topic): # 模拟带延迟的LLM调用 print(f"开始LLM调用,主题为:{topic}") await asyncio.sleep(1.5) # 模拟网络延迟 # 在实际场景中: # result = await sync_chain.ainvoke({"topic": topic}) result = f"这是一个关于{topic}的笑话。" print(f"完成LLM调用,主题为:{topic}") return result async def run_concurrent_chains(): start_time = time.time() topics = ["猫", "狗", "鹦鹉"] # 创建协程对象列表 tasks = [call_llm(topic) for topic in topics] # 并发运行它们 results = await asyncio.gather(*tasks) end_time = time.time() print(f"\n所有结果:{results}") print(f"总耗时:{end_time - start_time:.2f} 秒") # 大约1.5秒 async def main(): await run_concurrent_chains() if __name__ == "__main__": asyncio.run(main()) 如果这些调用是同步进行的,总耗时将大约是1.5秒 * 3 = 4.5秒。而使用asyncio.gather,因为模拟的asyncio.sleep(代表I/O等待)允许其他任务运行,所以总耗时更接近于单个最长操作的持续时间(约1.5秒),外加少量开销。digraph G { rankdir=TB; node [shape=rect, style=filled, color="#ced4da", fontname="Arial"]; edge [fontname="Arial"]; subgraph cluster_sync { label = "同步执行"; style=dashed; bgcolor="#e9ecef"; s_start [label="开始", shape=ellipse, style=filled, color="#a5d8ff"]; s_op1 [label="操作 1 (等待 1.5秒)"]; s_op2 [label="操作 2 (等待 1.5秒)"]; s_op3 [label="操作 3 (等待 1.5秒)"]; s_end [label="结束 (总计约 4.5秒)", shape=ellipse, style=filled, color="#a5d8ff"]; s_start -> s_op1 [label="0.0s"]; s_op1 -> s_op2 [label="1.5s"]; s_op2 -> s_op3 [label="3.0s"]; s_op3 -> s_end [label="4.5s"]; } subgraph cluster_async { label = "异步执行 (asyncio.gather)"; style=dashed; bgcolor="#e9ecef"; a_start [label="开始", shape=ellipse, style=filled, color="#96f2d7"]; a_op1 [label="开始操作 1"]; a_op2 [label="开始操作 2"]; a_op3 [label="开始操作 3"]; a_wait [label="并发等待 (1.5秒)", shape=box3d, style=filled, color="#ffec99"]; a_end [label="结束 (总计约 1.5秒)", shape=ellipse, style=filled, color="#96f2d7"]; a_start -> a_op1 [label="0.0s"]; a_start -> a_op2 [label="0.0s"]; a_start -> a_op3 [label="0.0s"]; a_op1 -> a_wait; a_op2 -> a_wait; a_op3 -> a_wait; a_wait -> a_end [label="1.5s"]; } }三个独立的I/O密集型操作(每个耗时1.5秒)在同步执行与并发异步执行下的总耗时比较。异步流式传输对于支持流式传输的组件(主要是LLMs),astream()方法提供了一个异步迭代器。这使得你可以在响应块可用时立即处理它们,而不是等待整个响应生成完毕。这对于聊天机器人等面向用户的应用尤其适用,能提升感知响应速度。import asyncio # 假设已完成提示、模型、解析器和sync_chain的设置 async def stream_joke(): print("正在异步流式传输一个笑话:") async for chunk in sync_chain.astream({"topic": "异步编程"}): # 在每个数据块到达时进行处理(例如,打印到控制台,通过WebSocket发送) print(chunk, end="", flush=True) print("\n--- 流式传输结束 ---") async def main(): await stream_joke() if __name__ == "__main__": asyncio.run(main())使用abatch进行批处理与batch()类似,abatch()方法允许并发处理多个输入。如果底层组件(例如LLM提供商的API)支持,LangChain会尝试在单个批处理请求中将这些输入传递给它,这可能比使用多个ainvoke调用配合asyncio.gather更高效,并减少API调用开销。import asyncio import time # 假设已完成提示、模型、解析器和sync_chain的设置 async def run_batch_chains(): start_time = time.time() topics = ["AI伦理", "量子计算", "无服务器架构"] inputs = [{"topic": t} for t in topics] print(f"正在为 {len(inputs)} 个输入运行批处理...") # 使用 abatch 进行并发处理 results = await sync_chain.abatch(inputs) end_time = time.time() print(f"\n批处理结果:{results}") print(f"总耗时:{end_time - start_time:.2f} 秒") # 取决于后端批处理效率 async def main(): await run_batch_chains() if __name__ == "__main__": asyncio.run(main())abatch相对于asyncio.gather的性能优势,很大程度上取决于底层服务(LLM API、嵌入模型API等)是否真的针对批处理请求进行了优化。生产环境的重要注意事项错误处理: 使用asyncio.gather时,如果一个协程引发异常,gather会立即传播该异常,这可能会取消其他正在进行的任务(取决于return_exceptions参数)。你需要在gather调用周围进行错误处理,例如在每个协程内部使用try...except块,或者在return_exceptions=True时检查返回结果。并发限制: 同时运行过多的异步操作可能会使下游服务(LLM API、数据库)不堪重负,或耗尽本地资源(套接字、内存)。使用asyncio.Semaphore等机制来限制访问特定资源的并发任务数量。调试: 由于非线性的执行流程,调试异步代码可能比同步代码更具挑战性。LangSmith等工具对于跟踪异步链和代理步骤的执行路径非常有价值。Python内置的asyncio调试工具也很有帮助。同步-异步集成: 尽管纯异步通常是I/O密集型应用的理想选择,但你可能需要与同步库集成。asyncio提供了loop.run_in_executor()等机制,可在单独的线程池中运行阻塞的同步代码,而不会阻塞主事件循环,尽管这会增加开销。事件循环策略(高级): 在某些环境中(如Jupyter notebooks或特定的Web框架),你可能需要了解asyncio的事件循环策略或显式管理循环生命周期。通过理解和应用异步模式,你可以构建出性能更优、可扩展性更强、响应更快的LangChain应用,这些都是生产就绪应用的重要特点。熟练使用ainvoke、astream、abatch和asyncio.gather为在LangChain框架内高效处理并发操作提供了支持。