高效处理并发请求是服务大型语言模型(LLMs)时的一项重要挑战。与典型的无状态网络服务不同,LLM推理有其独有的特点:计算成本高昂: 在自回归序列中生成每个token都需要通过大型模型进行一次完整的正向传播。内存占用庞大: 模型参数占用大量GPU内存,并且为避免重复计算先前token的注意力而需要的中间键值(KV)缓存会随每个请求的序列长度增长。请求长度可变: 用户提交不同长度的提示,且需要不同长度的生成序列,这会导致每个请求的计算负载不均衡。简单地按顺序处理请求会导致极低的吞吐量和糟糕的硬件利用率,因为昂贵的GPU大部分时间处于空闲状态。为每个并发请求启动独立的模型实例成本高得令人望而却步,由于庞大的内存需求。因此,批处理技术对于优化吞吐量和成本非常重要。静态批处理最直接的方法是静态批处理。传入的请求会被收集,直到达到预设的批大小,或发生超时。服务器随后会将批次中所有序列进行填充,填充到最长序列的长度,并在一次正向传播中处理整个批次。优点:相比顺序处理,通过利用并行计算能力提升GPU利用率。相对简单易于实现。缺点:队头阻塞: 较快的请求可能不必要地等待较慢的请求完成或批次填满。计算资源浪费: 相当多的计算资源浪费在填充token上,特别是如果批次内序列长度差异很大。整个批次的处理时间由最长序列决定。GPU对填充token执行计算,但这些token最终会被丢弃。延迟增加: 批次形成和处理的等待时间增加了单个请求的延迟。考虑一个包含两个序列的批次:A(10个token)和 B(100个token)。使用静态批处理,序列 A 将被填充到 100 个token。GPU 会为两个序列的所有 100 个位置计算输出,即使序列 A 的 90 个位置只是填充。动态批处理动态批处理提供了一种改进,通过更灵活地形成批次。它不是等待固定的批大小,而是收集在短时间窗口内(例如 10 毫秒)到达的请求,并将它们动态地批处理在一起,直到达到最大批大小限制。digraph G { rankdir=LR; node [shape=box, style=filled, color="#ced4da", fontsize=12]; edge [color="#495057", fontsize=12]; subgraph cluster_0 { label = "传入请求"; bgcolor="#e9ecef"; R1 [label="请求 1 (t=0)"]; R2 [label="请求 2 (t=1)"]; R3 [label="请求 3 (t=3)"]; R4 [label="请求 4 (t=6)"]; R5 [label="请求 5 (t=7)"]; } subgraph cluster_1 { label = "批处理窗口(例如 5 毫秒)"; bgcolor="#e9ecef"; node [shape=box, style=filled, color="#a5d8ff"]; Batch1 [label="批次 1\n(R1, R2, R3)"]; Batch2 [label="批次 2\n(R4, R5)"]; } subgraph cluster_2 { label = "GPU 处理"; bgcolor="#e9ecef"; node [shape=parallelogram, style=filled, color="#b2f2bb"]; Proc1 [label="处理批次 1"]; Proc2 [label="处理批次 2"]; } R1 -> Batch1; R2 -> Batch1; R3 -> Batch1; R4 -> Batch2; R5 -> Batch2; Batch1 -> Proc1; Batch2 -> Proc2; } 在定义的时间窗口内到达的请求会被分组进行批处理。优点:通常比静态批处理具有更高的吞吐量,因为它能更好地适应变化的请求到达率。相比等待大型静态批次填满,它减少了平均等待时间。缺点:在批处理窗口内仍然容易发生队头阻塞。填充问题依然存在;计算浪费仍然相当可观,因为动态形成的批次中最长的序列仍然决定了该批次的整体处理时间。连续批处理(在途批处理)连续批处理(也称为在途批处理或迭代级批处理)是一种更先进的技术,在现代LLM服务框架中实现,例如 vLLM、Text Generation Inference (TGI) 和 NVIDIA Triton 的 TensorRT-LLM 后端。它从根本上改变了自回归解码期间批处理的执行方式。连续批处理不是将整个请求进行批处理,而是在单个生成步骤(迭代)的层面进行操作。其核心思路是:维护一个当前活跃请求(正在生成中的序列)池。在每一步,服务器会收集所有活跃序列,那些需要预测“下一个”token的序列。这些序列会形成一个批次,用于模型的一次正向传播。这个批次的大小可以在每一步中变化。新到达的请求可以立即添加到池中,如果GPU容量允许,并包含在“下一次”迭代的批次中。一旦序列生成完成(例如,生成了EOS token或达到最大长度),它会从活跃池中移除,释放资源。digraph G { rankdir=LR; node [shape=record, style=filled, color="#ced4da", fontsize=12]; edge [color="#495057", fontsize=12]; subgraph cluster_0 { label = "活跃序列(迭代 T)"; bgcolor="#e9ecef"; SeqA [label="{序列 A | 长度: 5 | 下一个 Token}", color="#a5d8ff"]; SeqB [label="{序列 B | 长度: 8 | 下一个 Token}", color="#a5d8ff"]; SeqC [label="{序列 C | 长度: 3 | 下一个 Token}", color="#a5d8ff"]; } subgraph cluster_1 { label = "迭代 T+1"; bgcolor="#e9ecef"; GPU [label="GPU 正向传播\n(批次: A, B, C)", shape=cylinder, color="#b2f2bb"]; SeqA1 [label="{序列 A | 长度: 6 | 下一个 Token}", color="#ffc9c9"]; SeqB1 [label="{序列 B | 长度: 9 | 下一个 Token}", color="#ffc9c9"]; SeqC1 [label="{序列 C | 长度: 4 | 下一个 Token}", color="#ffc9c9"]; NewReqD [label="{新请求 D | 长度: 1 | 已添加}", color="#ffe066"]; // New request added } subgraph cluster_2 { label = "迭代 T+2"; bgcolor="#e9ecef"; GPU2 [label="GPU 正向传播\n(批次: A, B, C, D)", shape=cylinder, color="#b2f2bb"]; // 表示 T+2 传递后的序列节点 } SeqA -> GPU; SeqB -> GPU; SeqC -> GPU; GPU -> SeqA1; GPU -> SeqB1; GPU -> SeqC1; NewReqD -> GPU2; // D 加入下一迭代 T+2 的批次 SeqA1 -> GPU2; SeqB1 -> GPU2; SeqC1 -> GPU2; {rank=same; SeqA; SeqB; SeqC;} {rank=same; SeqA1; SeqB1; SeqC1; NewReqD;} {rank=same; GPU; GPU2;} } 连续批处理在每次迭代中处理下一个token的生成,针对所有活跃序列,从而允许新请求高效加入。优点:最大化GPU利用率: 通过不断向GPU提供需要下一个token的活跃序列批次,使其持续进行有效计算。最小化填充浪费: 填充基本被消除,因为每个序列在每一步中都只处理到其当前长度。高吞吐量: 相比静态或动态批处理,显著增加单位时间内处理的请求数量。更低的平均延迟: 新请求通常可以几乎立即开始处理,无需等待批处理窗口结束或长序列完成。缺点:实现复杂性: 需要复杂的调度逻辑和内存管理。内存管理开销: 有效管理大量并发、动态变化的序列的KV缓存要求很高。并发内存管理:分页注意力(PagedAttention)连续批处理的有效性在很大程度上取决于高效的KV缓存管理。由于内存碎片化,将数百个并发序列的整个KV缓存连续存储通常不可行。vLLM 首创的 分页注意力(PagedAttention) 等技术解决了这个问题。受到操作系统中虚拟内存和分页机制的启发,PagedAttention 以非连续的固定大小块(页)来分配KV缓存。这使得能够:通过避免碎片化,在GPU内存中存储更多序列。通过将逻辑块映射到相同的物理块,更高效地在不同请求之间共享上下文(例如,用于束搜索或从相同提示进行并行采样)。尽管详细的实现很复杂,但理解这个原理可以说明内存管理与LLM服务中实现高并发息息相关。实现考量从头开始实现这些高级批处理策略是复杂的。幸运的是,专业的服务框架处理了大部分这种复杂性:NVIDIA Triton Inference Server 与 TensorRT-LLM 后端提供了优化的管道,集成了连续批处理和高效的内存管理。vLLM 是一个专门为快速LLM推理和服务设计的开源库,具有分页注意力(PagedAttention)和连续批处理功能。Hugging Face Text Generation Inference (TGI) 是另一个流行的开源方案,提供连续批处理和其他优化功能。配置这些系统时,你通常会遇到以下参数:max_num_batched_tokens:在单次迭代批处理中可以处理的最大token总数(所有序列的总和)。这有助于控制GPU内存使用。max_num_seqs:服务器可以处理的最大并发序列数。max_seq_len:支持的最大序列长度。KV缓存配置(例如,分配的GPU内存百分比)。这是一个高度简化的Python代码片段,说明了管理请求的核心循环逻辑,未详细说明KV缓存分页或框架具体细节:# Python 服务器代码片段(示意性——连续批处理思想) import torch import queue import threading import time import uuid # 假设 'model' 是一个已加载的LLM,支持逐步正向传播方法 # 假设 'tokenizer' 可用 request_queue = queue.Queue() # active_requests 存储状态: # {req_id: {'prompt_tokens', 'output_tokens', 'kv_cache', 'is_finished'}} active_requests = {} # result_store 存储已完成的输出:{req_id: '完整输出字符串'} result_store = {} MAX_CONCURRENT_REQUESTS = 16 # 基于内存/计算的限制 SCHEDULING_INTERVAL = 0.005 # 调度器运行的频率(5毫秒) def get_next_batch(): """ 收集准备好进行下一步推理的请求。 """ batch_req_ids = [] batch_input_tokens = [] batch_kv_caches = [] # 优先处理现有活跃请求 for req_id, state in list(active_requests.items()): # Iterate over a copy if not state['is_finished']: batch_req_ids.append(req_id) # 确定下一步要馈送的token if not state['output_tokens']: # 提示后的第一步 input_token = state['prompt_tokens'][:, -1:] else: # 后续步骤 input_token = state['output_tokens'][:, -1:] batch_input_tokens.append(input_token) batch_kv_caches.append(state['kv_cache']) # 可能初始为 None # 如果容量允许,添加新请求 available_slots = MAX_CONCURRENT_REQUESTS - len(batch_req_ids) for _ in range(available_slots): try: new_req_id, prompt_str = request_queue.get_nowait() prompt_tokens = tokenizer.encode( prompt_str, return_tensors="pt" ).to(model.device) active_requests[new_req_id] = { 'prompt_tokens': prompt_tokens, 'output_tokens': None, # 初始化输出 'kv_cache': None, # 初始化KV缓存 'is_finished': False } # 添加到当前批次进行其第一步 batch_req_ids.append(new_req_id) batch_input_tokens.append(prompt_tokens[:, -1:]) # 使用最后一个提示token batch_kv_caches.append(None) print(f"调度器:已添加新请求 {new_req_id}") except queue.Empty: break # 没有更多新请求在等待 return ( batch_req_ids, batch_input_tokens, batch_kv_caches ) def inference_scheduler_loop(): """ 调度和运行推理批次的主循环。 """ while True: start_time = time.time() # 1. 获取下一步请求 ( batch_req_ids, batch_input_tokens, batch_kv_caches ) = get_next_batch() if not batch_req_ids: time.sleep(SCHEDULING_INTERVAL) # 如果没有要处理的,则等待 continue # 2. 为模型准备批次 # (简化:如果模型需要,则需要适当的整理/填充) # 假设模型处理张量列表和KV缓存 # 简单连接不适用于没有填充的不同长度 # input_ids = torch.cat(batch_input_tokens, dim=0) # 3. 运行推理步骤(高度简化) # 这是实际模型正向传播发生的地方。 # 它接收当前输入token和过去的KV状态, # 返回下一个token的logits和更新后的KV状态。 # === 模拟实现 === logits = torch.randn( len(batch_req_ids), 1, tokenizer.vocab_size ).to(model.device) next_token_ids = torch.argmax(logits, dim=-1) # 形状:[批大小, 1] updated_kv_caches = [ f"kv_{req_id}_step_{len(active_requests[req_id].get('output_tokens',[]))}" for req_id in batch_req_ids ] # 模拟KV更新 # === 模拟结束 === print(f"调度器:已处理批次大小为 {len(batch_req_ids)} 的批次") # 4. 更新请求状态 finished_ids = [] for i, req_id in enumerate(batch_req_ids): state = active_requests[req_id] current_next_token = next_token_ids[i:i+1] # 保持形状 [1, 1] if state['output_tokens'] is None: state['output_tokens'] = current_next_token else: state['output_tokens'] = torch.cat( [state['output_tokens'], current_next_token], dim=1 ) state['kv_cache'] = updated_kv_caches[i] # 存储更新后的缓存 # 检查终止条件(EOS token或最大长度) # 简化:假设EOS token ID为2 if (current_next_token.item() == 2 or (state['output_tokens'] is not None and state['output_tokens'].shape[1] > 100)): state['is_finished'] = True full_sequence = torch.cat( [state['prompt_tokens'], state['output_tokens']], dim=1 ) result_store[req_id] = tokenizer.decode( full_sequence[0], skip_special_tokens=True ) finished_ids.append(req_id) print(f"调度器:请求 {req_id} 已完成") # 5. 从活跃池中清理已完成的请求 for req_id in finished_ids: if req_id in active_requests: del active_requests[req_id] # 确保循环大致以期望的间隔运行 elapsed_time = time.time() - start_time sleep_time = max(0, SCHEDULING_INTERVAL - elapsed_time) time.sleep(sleep_time) # --- 示例用法 --- # 在后台线程中启动调度器 # scheduler_thread = threading.Thread( # target=inference_scheduler_loop, daemon=True # ) # scheduler_thread.start() # 模拟传入请求 # req_id_1 = str(uuid.uuid4()) # request_queue.put(( # req_id_1, # "Explain the theory of relativity in simple terms:" # )) # req_id_2 = str(uuid.uuid4()) # request_queue.put(( # req_id_2, # "Write a short poem about autumn leaves:" # )) # 稍后,检查结果 # if req_id_1 in result_store: # print("结果 1:", result_store[req_id_1])综上所述,处理LLM服务的并发请求需要超越简单的静态或动态批处理。连续批处理,结合像分页注意力(PagedAttention)这样精密的内存管理,通过迭代处理所有活跃请求的生成步骤,提供了最高的吞吐量和最佳的资源利用率。使用专业的服务框架通常是在生产环境中实现这些高级技术最实际的方法。