虽然许多 LangChain 应用专注于交互式、实时响应,但并非所有任务都需要即时结果。处理大量数据、生成报告或进行大量分析通常得益于离线、批处理方法。这种策略与本章讨论的优化和扩展目标非常契合,使您能够最大化吞吐量,有效管理成本,并高效运用资源来完成不那么时间敏感的任务。批量处理是指在输入集合上顺序或并行执行 LangChain 管道(一个链、一个代理调用),通常在运行期间没有直接的用户交互。这与请求-响应模式形成对比,后者每个输入都会引发即时计算和响应。当满足以下条件时,采用批量处理特别有益:需要成本效益: 在非高峰时段进行计算或使用可能较慢、更便宜的 LLM 端点可以显著降低运营开支。吞吐量比延迟更重要: 对于像丰富大型数据集这样的任务,处理所有项目的总时间比处理单个项目所需的时间更重要。处理大型数据集: 处理数千或数百万条记录需要一种能够管理大规模单次实时请求的方法。资源调度: 批处理作业可以安排在计算资源可用时运行,从而平滑基础设施上的负载。批量处理的常见用途LangChain 组件可以很方便地应用于批量处理工作流中,用于各种离线任务:批量数据丰富: 想象一个大型客户评论数据库。批处理作业可以使用 LangChain 链来分类每条评论的情感,提取重要主题,或将其翻译成不同语言,将有价值的结构化信息添加回您的数据库。大规模文档分析: 处理整个研究论文库或法律文档,以生成摘要、提取实体、回答预定义问题或识别语料库中的趋势。合成数据生成: 通过 LangChain 使用 LLM 生成现有数据点的变体,或创建全新的示例来训练机器学习模型或增强测试套件。周期性报告: 通过查询多个数据源,汇总信息并使用 LLM 链对其进行结构化,自动生成每日、每周或每月报告。例如,将销售数据、支持工单趋势和市场新闻汇总成一份综合高管简报。离线评估运行: 如第 5 章所述,评估模型或链的性能通常涉及针对大型静态数据集运行它。批量处理很适合定期执行这些评估管道。实现批量处理工作流基本想法很简单:遍历您的输入数据并应用 LangChain 逻辑。然而,简单的实现可能缓慢且效率低下。基本顺序处理最直接的方法是使用简单的循环:import time from langchain_core.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI # 假设 'inputs' 是一个字典列表,例如:[{'topic': 'AI ethics'}, {'topic': 'quantum computing'}] # 假设 'llm' 和 'prompt' 已配置 # 示例链:为某个主题生成简短解释 prompt = ChatPromptTemplate.from_template("Explain the basics of {topic} in one sentence.") llm = ChatOpenAI(model="gpt-3.5-turbo") # 使用您的 API 密钥进行配置 chain = prompt | llm inputs = [ {"topic": "large language models"}, {"topic": "vector databases"}, {"topic": "prompt engineering"}, # ... 可能还有数千个 ] results = [] start_time = time.time() for item in inputs: try: result = chain.invoke(item) results.append(result.content) # 基本进度指示 if len(results) % 10 == 0: print(f"Processed {len(results)} items...") except Exception as e: print(f"Error processing item {item}: {e}") results.append(None) # 失败项的占位符 end_time = time.time() print(f"Sequential processing took: {end_time - start_time:.2f} seconds") # 'results' 现在包含生成的解释,或失败项的 None这种方法可行,但它逐个处理项目。如果每次 LLM 调用需要一秒钟,处理 1000 个项目将花费超过 16 分钟,外加任何开销。对于大型任务,这通常太慢。并行处理以提高速度由于大多数涉及 LLM 调用的 LangChain 操作是 I/O 密集型(等待网络响应),并行执行可以提供显著加速。Python 的 concurrent.futures 模块是管理线程或进程池的方便方式。对于像 API 调用这样的 I/O 密集型任务,ThreadPoolExecutor 通常很适合。LangChain 的表达式语言(LCEL)通常提供内置的并行执行方法,例如 .batch() 和 .map(),它们抽象化了一些样板代码。import time from langchain_core.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI from langchain_core.runnables import RunnableConfig # 假设 'llm' 和 'prompt' 像之前一样已配置 # chain = prompt | llm (之前已定义) inputs = [ {"topic": "large language models"}, {"topic": "vector databases"}, {"topic": "prompt engineering"}, {"topic": "retrieval-augmented generation"}, {"topic": "agentic systems"}, # ... 可能还有数千个 ] start_time = time.time() # 使用 LCEL 的 .batch() 方法进行并行调用 # max_concurrency 控制并行线程的数量 try: # 注意:.batch() 中的错误处理可能因 LangChain 版本而异。 # 请查阅文档,了解异常如何聚合或处理的具体信息。 # 'return_exceptions=True' 参数可能很有帮助。 results_batch = chain.batch(inputs, config=RunnableConfig(max_concurrency=10), return_exceptions=True) # 筛选结果,将成功输出与异常分开 successful_results = [res.content for res in results_batch if not isinstance(res, Exception)] errors = [err for err in results_batch if isinstance(err, Exception)] print(f"Processed {len(inputs)} items. Success: {len(successful_results)}, Errors: {len(errors)}") if errors: print("Sample Error:", errors[0]) except Exception as e: # 捕获批量设置过程中可能出现的错误 print(f"An error occurred during batch processing setup: {e}") successful_results = [] # 确保即使批量处理早期失败,结果列表也存在 end_time = time.time() print(f"Batch processing took: {end_time - start_time:.2f} seconds") # 'successful_results' 包含成功调用的内容使用 .batch()(或 .map())通过并发运行多个调用,显著减少执行时间。max_concurrency 参数对于根据 API 速率限制和资源约束调整性能很重要。处理规模:分布式框架对于超出单机内存或计算能力限制的超大型数据集,可以考虑分布式计算框架:Ray: 一个用于构建分布式应用的开源框架。您可以将 LangChain 组件包装在 Ray 任务或 actor 中。Dask: 提供并行和分布式集合,模仿 NumPy 数组和 Pandas DataFrames,支持并行处理大型数据集。Apache Spark: 一个广泛用于大规模数据处理的引擎。LangChain 组件可以在 Spark UDF(用户定义函数)中应用,或使用连接 Spark 和 Python 执行的库,如 pyspark.sql.functions.pandas_udf。与这些系统的集成需要更多设置,但可以实现更大规模的处理。重要考量:速率限制和错误处理批量处理通常涉及快速连续地进行大量 API 调用。速率限制: LLM 提供商会强制执行速率限制(每分钟请求数、每分钟令牌数)。您的批量处理代码必须遵守这些限制。实施指数退避和重试逻辑(使用 tenacity 等库)以优雅地处理瞬时速率限制错误。根据您的 API 计划,仔细配置并发数(.batch() 中的 max_concurrency 或 ThreadPoolExecutor 中的 worker 数量)。错误处理: 批处理中的单个项目可能由于各种原因(无效输入、API 错误、超时)而失败。您的批处理作业应该有弹性。清晰地记录错误,识别是哪个输入导致了失败,并决定是存储失败的项目以便稍后重新处理还是跳过它们。在 .batch() 中使用 return_exceptions=True 是管理此问题的一种方法。优化批处理任务考虑以下优化点:成本追踪: 在批量运行期间密切监控令牌使用情况。记录每个项目的令牌计数,或使用 LangSmith(第 5 章)进行详细追踪。这有助于估算成本并找出意外昂贵的项目。模型选择: 使用最符合您批量任务质量要求的经济高效的 LLM。与复杂的推理任务相比,更简单、更便宜的模型可能足以应对批量分类。缓存: 如果相同的输入可能在不同的批处理运行中(甚至在同一次运行中)被多次处理,请为 LLM 响应实施缓存。LangChain 提供缓存集成(例如 InMemoryCache、SQLiteCache、RedisCache)。输入/输出效率: 优化加载输入数据和存储结果的方式。高效读取大型文件或使用优化的数据库查询可以影响整体作业持续时间。检查点: 对于运行时间很长的任务(数小时或数天),实施检查点。定期保存已处理项目和结果的状态。如果任务失败,可以从上一个检查点恢复,而不是从头开始,从而节省大量时间与成本。批量处理是扩展 LangChain 应用的强大技术。通过仔细选择您的实现策略(顺序、并行、分布式),管理 API 限制和错误,并优化成本和效率,您可以使用 LangChain 有效地处理大规模离线任务。这补充了您应用的实时功能,为各种生产场景提供了全面的工具包。