趋近智
许多LangChain应用侧重于交互式、实时响应,但并非所有任务都需要即时结果。处理大量数据、生成报告或进行广泛分析通常能从离线、面向批处理的方法中受益。这种策略对于优化和扩展LangChain应用十分有益,能够为非时间敏感型任务实现吞吐量最大化、有效管理成本和高效利用资源。
批量处理涉及对输入集合(例如链或智能体调用)顺序或并行地执行LangChain管道,通常在运行时不直接与用户交互。这与请求-响应模式不同,在请求-响应模式中,每个输入都会触发即时计算和响应。采用批量处理在以下情况中特别有利:
LangChain组件可以轻松应用于批处理工作流,用于各种离线任务:
基本思想很简单:遍历输入数据并应用LangChain逻辑。然而,简单的实现可能缓慢且效率低下。
最直接的方法是使用一个简单的循环:
import time
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
# 假设 'inputs' 是一个字典列表,例如 [{'topic': 'AI ethics'}, {'topic': 'quantum computing'}]
# 假设 'llm' 和 'prompt' 已配置
# 示例链:为主题生成一个简短解释
prompt = ChatPromptTemplate.from_template("Explain the basics of {topic} in one sentence.")
llm = ChatOpenAI(model="gpt-4o-mini") # 用您的API密钥配置
chain = prompt | llm
inputs = [
{"topic": "large language models"},
{"topic": "vector databases"},
{"topic": "prompt engineering"},
# ... 可能还有数千个
]
results = []
start_time = time.time()
for item in inputs:
try:
result = chain.invoke(item)
results.append(result.content)
# 基本进度指示
if len(results) % 10 == 0:
print(f"Processed {len(results)} items...")
except Exception as e:
print(f"Error processing item {item}: {e}")
results.append(None) # 失败项的占位符
end_time = time.time()
print(f"Sequential processing took: {end_time - start_time:.2f} seconds")
# 'results' 现在包含生成的解释或错误情况下的None
这可行,但会逐个处理项。如果每次LLM调用需要一秒,处理1000个项将耗时超过16分钟,外加任何开销。对于大型作业,这通常太慢了。
由于大多数涉及LLM调用的LangChain操作是I/O密集型(等待网络响应),并行执行可以显著提速。Python的concurrent.futures模块是管理线程池或进程池的便捷方法。对于API调用等I/O密集型任务,ThreadPoolExecutor通常适用。
LangChain的表达语言(LCEL)提供了用于并行执行的内置方法,例如.batch()和.map(),这些方法简化了部分样板代码。
import time
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableConfig
# 假设 'llm' 和 'prompt' 已像之前一样配置
# chain = prompt | llm (之前已定义)
inputs = [
{"topic": "large language models"},
{"topic": "vector databases"},
{"topic": "prompt engineering"},
{"topic": "retrieval-augmented generation"},
{"topic": "agentic systems"},
# ... 可能还有数千个
]
start_time = time.time()
# 使用LCEL的.batch()方法进行并行调用
# max_concurrency 控制并行线程的数量
try:
# 使用 return_exceptions=True 将错误捕获到结果列表中,而不是中止执行
results_batch = chain.batch(inputs, config=RunnableConfig(max_concurrency=10), return_exceptions=True)
# 过滤结果,将成功输出与异常分开
successful_results = [res.content for res in results_batch if not isinstance(res, Exception)]
errors = [err for err in results_batch if isinstance(err, Exception)]
print(f"Processed {len(inputs)} items. Success: {len(successful_results)}, Errors: {len(errors)}")
if errors:
print("Sample Error:", errors[0])
except Exception as e:
# 捕获批处理设置过程中可能出现的错误
print(f"An error occurred during batch processing setup: {e}")
successful_results = [] # 确保即使批处理提前失败,结果列表也存在
end_time = time.time()
print(f"Batch processing took: {end_time - start_time:.2f} seconds")
# 'successful_results' 包含成功调用的内容
使用.batch()(或.map())通过并发运行多个调用,可以显著减少执行时间。max_concurrency参数对于根据API速率限制和资源约束调整性能非常重要。
对于超出单个机器内存或计算能力的超大型数据集,请考虑分布式计算框架:
pyspark.sql.functions.pandas_udf。与这些系统的集成需要更多设置,但可以在更大规模上进行处理。
批量处理通常涉及快速连续地进行多次API调用。
tenacity等库)以优雅地处理瞬时速率限制错误。根据您的API计划仔细配置并发(.batch()中的max_concurrency,或ThreadPoolExecutor中的工作线程数)。.batch()中使用return_exceptions=True是管理此问题的一种方法。请考虑以下优化点:
gpt-4o-mini)可能足以进行批量分类。InMemoryCache、SQLiteCache、RedisCache)。批量处理是扩展LangChain应用的一种有效技术。通过仔细选择您的实现策略(顺序、并行、分布式)、管理API限制和错误,以及优化成本和效率,您可以使用LangChain有效处理大规模离线任务。这补充了您应用的实时能力,为各种生产场景提供了全面的工具集。
简洁的语法。内置调试功能。从第一天起就可投入生产。
为 ApX 背后的 AI 系统而构建
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造