趋近智
高效处理并发请求是服务大型语言模型(LLMs)时的一项重要挑战。与典型的无状态网络服务不同,LLM推理 (inference)有其独有的特点:
简单地按顺序处理请求会导致极低的吞吐量 (throughput)和糟糕的硬件利用率,因为昂贵的GPU大部分时间处于空闲状态。为每个并发请求启动独立的模型实例成本高得令人望而却步,由于庞大的内存需求。因此,批处理技术对于优化吞吐量和成本非常重要。
最直接的方法是静态批处理。传入的请求会被收集,直到达到预设的批大小,或发生超时。服务器随后会将批次中所有序列进行填充,填充到最长序列的长度,并在一次正向传播中处理整个批次。
优点:
缺点:
考虑一个包含两个序列的批次:A(10个token)和 B(100个token)。使用静态批处理,序列 A 将被填充到 100 个token。GPU 会为两个序列的所有 100 个位置计算输出,即使序列 A 的 90 个位置只是填充。
动态批处理提供了一种改进,通过更灵活地形成批次。它不是等待固定的批大小,而是收集在短时间窗口内(例如 10 毫秒)到达的请求,并将它们动态地批处理在一起,直到达到最大批大小限制。
在定义的时间窗口内到达的请求会被分组进行批处理。
优点:
缺点:
连续批处理(也称为在途批处理或迭代级批处理)是一种更先进的技术,在现代LLM服务框架中实现,例如 vLLM、Text Generation Inference (TGI) 和 NVIDIA Triton 的 TensorRT-LLM 后端。它从根本上改变了自回归 (autoregressive)解码期间批处理的执行方式。
连续批处理不是将整个请求进行批处理,而是在单个生成步骤(迭代)的层面进行操作。其核心思路是:
连续批处理在每次迭代中处理下一个token的生成,针对所有活跃序列,从而允许新请求高效加入。
优点:
缺点:
连续批处理的有效性在很大程度上取决于高效的KV缓存管理。由于内存碎片化,将数百个并发序列的整个KV缓存连续存储通常不可行。
vLLM 首创的 分页注意力(PagedAttention) 等技术解决了这个问题。受到操作系统中虚拟内存和分页机制的启发,PagedAttention 以非连续的固定大小块(页)来分配KV缓存。这使得能够:
尽管详细的实现很复杂,但理解这个原理可以说明内存管理与LLM服务中实现高并发息息相关。
从头开始实现这些高级批处理策略是复杂的。幸运的是,专业的服务框架处理了大部分这种复杂性:
配置这些系统时,你通常会遇到以下参数 (parameter):
max_num_batched_tokens:在单次迭代批处理中可以处理的最大token总数(所有序列的总和)。这有助于控制GPU内存使用。max_num_seqs:服务器可以处理的最大并发序列数。max_seq_len:支持的最大序列长度。这是一个高度简化的Python代码片段,说明了管理请求的核心循环逻辑,未详细说明KV缓存分页或框架具体细节:
# Python 服务器代码片段(示意性——连续批处理思想)
import torch
import queue
import threading
import time
import uuid
# 假设 'model' 是一个已加载的LLM,支持逐步正向传播方法
# 假设 'tokenizer' 可用
request_queue = queue.Queue()
# active_requests 存储状态:
# {req_id: {'prompt_tokens', 'output_tokens', 'kv_cache', 'is_finished'}}
active_requests = {}
# result_store 存储已完成的输出:{req_id: '完整输出字符串'}
result_store = {}
MAX_CONCURRENT_REQUESTS = 16 # 基于内存/计算的限制
SCHEDULING_INTERVAL = 0.005 # 调度器运行的频率(5毫秒)
def get_next_batch():
""" 收集准备好进行下一步推理的请求。 """
batch_req_ids = []
batch_input_tokens = []
batch_kv_caches = []
# 优先处理现有活跃请求
for req_id, state in list(active_requests.items()): # Iterate over a copy
if not state['is_finished']:
batch_req_ids.append(req_id)
# 确定下一步要馈送的token
if not state['output_tokens']: # 提示后的第一步
input_token = state['prompt_tokens'][:, -1:]
else: # 后续步骤
input_token = state['output_tokens'][:, -1:]
batch_input_tokens.append(input_token)
batch_kv_caches.append(state['kv_cache']) # 可能初始为 None
# 如果容量允许,添加新请求
available_slots = MAX_CONCURRENT_REQUESTS - len(batch_req_ids)
for _ in range(available_slots):
try:
new_req_id, prompt_str = request_queue.get_nowait()
prompt_tokens = tokenizer.encode(
prompt_str, return_tensors="pt"
).to(model.device)
active_requests[new_req_id] = {
'prompt_tokens': prompt_tokens,
'output_tokens': None, # 初始化输出
'kv_cache': None, # 初始化KV缓存
'is_finished': False
}
# 添加到当前批次进行其第一步
batch_req_ids.append(new_req_id)
batch_input_tokens.append(prompt_tokens[:, -1:]) # 使用最后一个提示token
batch_kv_caches.append(None)
print(f"调度器:已添加新请求 {new_req_id}")
except queue.Empty:
break # 没有更多新请求在等待
return (
batch_req_ids,
batch_input_tokens,
batch_kv_caches
)
def inference_scheduler_loop():
""" 调度和运行推理批次的主循环。 """
while True:
start_time = time.time()
# 1. 获取下一步请求
(
batch_req_ids,
batch_input_tokens,
batch_kv_caches
) = get_next_batch()
if not batch_req_ids:
time.sleep(SCHEDULING_INTERVAL) # 如果没有要处理的,则等待
continue
# 2. 为模型准备批次
# (简化:如果模型需要,则需要适当的整理/填充)
# 假设模型处理张量列表和KV缓存
# 简单连接不适用于没有填充的不同长度
# input_ids = torch.cat(batch_input_tokens, dim=0)
# 3. 运行推理步骤(高度简化)
# 这是实际模型正向传播发生的地方。
# 它接收当前输入token和过去的KV状态,
# 返回下一个token的logits和更新后的KV状态。
# === 模拟实现 ===
logits = torch.randn(
len(batch_req_ids), 1, tokenizer.vocab_size
).to(model.device)
next_token_ids = torch.argmax(logits, dim=-1) # 形状:[批大小, 1]
updated_kv_caches = [
f"kv_{req_id}_step_{len(active_requests[req_id].get('output_tokens',[]))}"
for req_id in batch_req_ids
] # 模拟KV更新
# === 模拟结束 ===
print(f"调度器:已处理批次大小为 {len(batch_req_ids)} 的批次")
# 4. 更新请求状态
finished_ids = []
for i, req_id in enumerate(batch_req_ids):
state = active_requests[req_id]
current_next_token = next_token_ids[i:i+1] # 保持形状 [1, 1]
if state['output_tokens'] is None:
state['output_tokens'] = current_next_token
else:
state['output_tokens'] = torch.cat(
[state['output_tokens'], current_next_token],
dim=1
)
state['kv_cache'] = updated_kv_caches[i] # 存储更新后的缓存
# 检查终止条件(EOS token或最大长度)
# 简化:假设EOS token ID为2
if (current_next_token.item() == 2 or
(state['output_tokens'] is not None and
state['output_tokens'].shape[1] > 100)):
state['is_finished'] = True
full_sequence = torch.cat(
[state['prompt_tokens'], state['output_tokens']],
dim=1
)
result_store[req_id] = tokenizer.decode(
full_sequence[0], skip_special_tokens=True
)
finished_ids.append(req_id)
print(f"调度器:请求 {req_id} 已完成")
# 5. 从活跃池中清理已完成的请求
for req_id in finished_ids:
if req_id in active_requests:
del active_requests[req_id]
# 确保循环大致以期望的间隔运行
elapsed_time = time.time() - start_time
sleep_time = max(0, SCHEDULING_INTERVAL - elapsed_time)
time.sleep(sleep_time)
# --- 示例用法 ---
# 在后台线程中启动调度器
# scheduler_thread = threading.Thread(
# target=inference_scheduler_loop, daemon=True
# )
# scheduler_thread.start()
# 模拟传入请求
# req_id_1 = str(uuid.uuid4())
# request_queue.put((
# req_id_1,
# "Explain the theory of relativity in simple terms:"
# ))
# req_id_2 = str(uuid.uuid4())
# request_queue.put((
# req_id_2,
# "Write a short poem about autumn leaves:"
# ))
# 稍后,检查结果
# if req_id_1 in result_store:
# print("结果 1:", result_store[req_id_1])
综上所述,处理LLM服务的并发请求需要超越简单的静态或动态批处理。连续批处理,结合像分页注意力(PagedAttention)这样精密的内存管理,通过迭代处理所有活跃请求的生成步骤,提供了最高的吞吐量 (throughput)和最佳的资源利用率。使用专业的服务框架通常是在生产环境中实现这些高级技术最实际的方法。
这部分内容有帮助吗?
© 2026 ApX Machine LearningAI伦理与透明度•