流水线并行 (PP) 通过垂直划分整个模型来分担大型语言模型的计算负荷。这种方法将连续的层分配给不同设备,形成类似装配线的处理流水线。与复制模型的数据并行或拆分层内单一操作的张量并行不同,流水线并行提供了一种独特的模型分发策略。设想一个包含许多层的大型Transformer模型。与试图将所有层放入一个设备或在层内拆分复杂的矩阵乘法不同,流水线并行将例如第1-12层分配给GPU 0,第13-24层分配给GPU 1,第25-36层分配给GPU 2,以此类推。在单个设备上执行的每组层被称为一个阶段或分区。流水线处理的机制在流水线并行设置中,数据批次首先被分解成更小的微批次。这对于使流水线阶段有效发挥作用很重要,我们很快就会看到这一点。此过程如下进行:前向传播: 微批次1进入阶段0(GPU 0)。在阶段0的层处理完毕后,产生的激活值被发送到阶段1(GPU 1)。阶段1处理激活值并将其输出发送到阶段2,这个过程一直持续到最后一个阶段计算出微批次1的输出。重要的是,一旦阶段0完成微批次1的处理,它就可以立即开始处理微批次2。反向传播: 一旦为一个微批次计算出损失(在它通过所有阶段后),反向传播就开始了。梯度从最后一个阶段开始计算并反向传播。阶段N计算梯度并将所需的梯度信息发送回阶段N-1。阶段N-1使用这些信息,计算自己的梯度,然后将梯度发送回阶段N-2,以此类推,直到梯度到达阶段0。这种流程使得不同设备可以同时处理不同的微批次,实现了模型深度方向的并行计算。digraph G { rankdir=TB; // 设置全局字体大小 fontsize=12; // 节点设置,带有明确的字体大小 node [shape=box, style=filled, fontname="sans-serif", color="#ced4da", fillcolor="#e9ecef", fontsize=12]; // 边设置,带有明确的字体大小 edge [fontname="sans-serif", fontsize=12]; subgraph cluster_0 { label = "GPU 0 (阶段 0)"; fontsize=12; // 明确设置集群标签字体大小 style=filled; color="#dee2e6"; layers0 [label="第1-12层"]; } subgraph cluster_1 { label = "GPU 1 (阶段 1)"; fontsize=12; // 明确设置集群标签字体大小 style=filled; color="#dee2e6"; layers1 [label="第13-24层"]; } subgraph cluster_2 { label = "GPU 2 (阶段 2)"; fontsize=12; // 明确设置集群标签字体大小 style=filled; color="#dee2e6"; layers2 [label="第25-36层"]; } subgraph cluster_3 { label = "GPU 3 (阶段 3)"; fontsize=12; // 明确设置集群标签字体大小 style=filled; color="#dee2e6"; layers3 [label="第37-48层"]; } Input [label="微批次", shape=ellipse, color="#a5d8ff", fillcolor="#e7f5ff"]; Output [label="输出/损失", shape=ellipse, color="#a5d8ff", fillcolor="#e7f5ff"]; Input -> layers0 [label="前向"]; layers0 -> layers1 [label="前向 (激活值)"]; layers1 -> layers2 [label="前向 (激活值)"]; layers2 -> layers3 [label="前向 (激活值)"]; layers3 -> Output [label="前向"]; Output -> layers3 [label="反向 (梯度)", style=dashed, color="#f03e3e"]; layers3 -> layers2 [label="反向 (梯度)", style=dashed, color="#f03e3e"]; layers2 -> layers1 [label="反向 (梯度)", style=dashed, color="#f03e3e"]; layers1 -> layers0 [label="反向 (梯度)", style=dashed, color="#f03e3e"]; }一个四阶段流水线,显示了GPU上的前向(fwd)激活流和反向(bwd)梯度流。流水线气泡流水线并行中的一个重要问题是流水线气泡或空闲时间。在处理批次开始时,只有阶段0是活跃的。阶段1必须等待阶段0完成第一个微批次,阶段2必须等待阶段1,以此类推。同样,在反向传播期间,初始阶段会因等待来自后续阶段的梯度而变得空闲。这种启动和结束期间会导致硬件利用率不足。这个气泡的大小取决于流水线阶段数 ($S$) 和微批次数 ($M$)。如果每个微批次通过一个阶段大约需要相同的时间 ($t$),那么简单的前向-后向顺序调度,其前向传播的总时间近似为 $T \approx (S + M - 1) \times t$,反向传播也类似。总有效工作量是 $S \times M \times t$。效率(设备忙碌时间所占比例)大约是 $SM / (S(S+M-1))$,对于较大的 $S$,这可以简化为 $M/(M+S-1)$。气泡部分(空闲时间)近似为 $(S-1)/(M+S-1)$。为了减少气泡,我们需要相对于阶段数($S$)增加微批次数($M$)。然而,增加$M$意味着更小的微批次,这可能无法充分发挥每个GPU的计算能力,并且还会增加所有正在处理的微批次所需的总激活内存。流水线调度为了减轻气泡问题,已经开发出各种调度策略,超越了简单的“所有前向,然后所有反向”方法(通常与GPipe相关)。一种常见且有效的策略是**1F1B(一次前向,一次反向)**调度,它通过PipeDream等框架而广为人知。在1F1B调度中,阶段轮流执行即将到来的微批次的前向传播和已完成微批次的反向传播。一旦一个阶段完成微批次 i 的前向传播,它可能会立即执行微批次 i-k 的反向传播(其中 k 与阶段数有关),前提是下一阶段的梯度已可用。这使得设备更忙碌,与简单调度相比,显著减少了空闲时间气泡。// 简单调度图示 digraph NaiveSchedule { rankdir=TD; // 从上到下布局 node [shape=rect, style=filled, fontname="sans-serif", fontsize=22, width=1.5, height=0.5]; edge [fontname="sans-serif", fontsize=22]; // --- 简单调度子图 --- subgraph cluster_naive { label = "简单调度(类似GPipe)"; fontsize=24; // 略大一点的集群标签字体大小 bgcolor="#f8f9fa"; node [color="#adb5bd", style="filled"]; // 确保节点被填充 // 时间轴行 (Rank 0) { rank = same; T0 [label="时间 0", style=filled, fillcolor="#ced4da"]; // 给时间轴背景色 T1 [label="1", style=filled, fillcolor="#ced4da"]; T2 [label="2", style=filled, fillcolor="#ced4da"]; T3 [label="3", style=filled, fillcolor="#ced4da"]; T4 [label="4", style=filled, fillcolor="#ced4da"]; T5 [label="5", style=filled, fillcolor="#ced4da"]; T6 [label="6", style=filled, fillcolor="#ced4da"]; T7 [label="7", style=filled, fillcolor="#ced4da"]; T8 [label="8", style=filled, fillcolor="#ced4da"]; } T0 -> T1 -> T2 -> T3 -> T4 -> T5 -> T6 -> T7 -> T8 [style=invis]; // 水平对齐 // GPU 0 行 (Rank 1) { rank=same; S0_label [label="GPU 0", shape=plaintext, fontcolor="black"]; S0_T0 [label="前向 0", fillcolor="#74c0fc"]; S0_T1 [label="前向 1", fillcolor="#74c0fc"]; S0_T2 [label="前向 2", fillcolor="#74c0fc"]; S0_T3 [label="前向 3", fillcolor="#74c0fc"]; S0_T4 [label="空闲", fillcolor="#e9ecef"]; S0_T5 [label="反向 3", fillcolor="#ffa8a8"]; S0_T6 [label="反向 2", fillcolor="#ffa8a8"]; S0_T7 [label="反向 1", fillcolor="#ffa8a8"]; S0_T8 [label="反向 0", fillcolor="#ffa8a8"]; } S0_label -> S0_T0 [style=invis]; // 标签与第一个块对齐 S0_T0 -> S0_T1 -> S0_T2 -> S0_T3 -> S0_T4 -> S0_T5 -> S0_T6 -> S0_T7 -> S0_T8 [style=invis]; // GPU 1 行 (Rank 2) { rank=same; S1_label [label="GPU 1", shape=plaintext, fontcolor="black"]; S1_T0 [label="空闲", fillcolor="#e9ecef"]; S1_T1 [label="前向 0", fillcolor="#74c0fc"]; S1_T2 [label="前向 1", fillcolor="#74c0fc"]; S1_T3 [label="前向 2", fillcolor="#74c0fc"]; S1_T4 [label="前向 3", fillcolor="#74c0fc"]; S1_T5 [label="空闲", fillcolor="#e9ecef"]; S1_T6 [label="反向 3", fillcolor="#ffa8a8"]; S1_T7 [label="反向 2", fillcolor="#ffa8a8"]; S1_T8 [label="反向 1", fillcolor="#ffa8a8"]; } S1_label -> S1_T0 [style=invis]; S1_T0 -> S1_T1 -> S1_T2 -> S1_T3 -> S1_T4 -> S1_T5 -> S1_T6 -> S1_T7 -> S1_T8 [style=invis]; // GPU 2 行 (Rank 3) { rank=same; S2_label [label="GPU 2", shape=plaintext, fontcolor="black"]; S2_T0 [label="空闲", fillcolor="#e9ecef"]; S2_T1 [label="空闲", fillcolor="#e9ecef"]; S2_T2 [label="前向 0", fillcolor="#74c0fc"]; S2_T3 [label="前向 1", fillcolor="#74c0fc"]; S2_T4 [label="前向 2", fillcolor="#74c0fc"]; S2_T5 [label="前向 3", fillcolor="#74c0fc"]; S2_T6 [label="空闲", fillcolor="#e9ecef"]; S2_T7 [label="反向 3", fillcolor="#ffa8a8"]; S2_T8 [label="反向 2", fillcolor="#ffa8a8"]; } S2_label -> S2_T0 [style=invis]; S2_T0 -> S2_T1 -> S2_T2 -> S2_T3 -> S2_T4 -> S2_T5 -> S2_T6 -> S2_T7 -> S2_T8 [style=invis]; // 用于行垂直对齐的不可见边 T0 -> S0_T0 [style=invis]; S0_T0 -> S1_T0 [style=invis]; S1_T0 -> S2_T0 [style=invis]; } } // 交错式1F1B调度图示 digraph InterleavedSchedule { rankdir=TD; // 从上到下布局 node [shape=rect, style=filled, fontname="sans-serif", fontsize=22, width=1.5, height=0.5]; edge [fontname="sans-serif", fontsize=22]; // --- 交错式1F1B调度子图 --- subgraph cluster_1f1b { label = "交错式1F1B调度"; fontsize=24; // 略大一点的集群标签字体大小 bgcolor="#f8f9fa"; node [color="#adb5bd", style="filled"]; // 确保节点被填充 // 时间轴行 (Rank 0) { rank = same; T10 [label="时间 0", style=filled, fillcolor="#ced4da"]; T11 [label="1", style=filled, fillcolor="#ced4da"]; T12 [label="2", style=filled, fillcolor="#ced4da"]; T13 [label="3", style=filled, fillcolor="#ced4da"]; T14 [label="4", style=filled, fillcolor="#ced4da"]; T15 [label="5", style=filled, fillcolor="#ced4da"]; T16 [label="6", style=filled, fillcolor="#ced4da"]; T17 [label="7", style=filled, fillcolor="#ced4da"]; T18 [label="8", style=filled, fillcolor="#ced4da"]; } T10 -> T11 -> T12 -> T13 -> T14 -> T15 -> T16 -> T17 -> T18 [style=invis]; // 水平对齐 // GPU 0 行 (Rank 1) { rank=same; S10_label [label="GPU 0", shape=plaintext, fontcolor="black"]; S10_T0 [label="前向 0", fillcolor="#74c0fc"]; S10_T1 [label="前向 1", fillcolor="#74c0fc"]; S10_T2 [label="前向 2", fillcolor="#74c0fc"]; S10_T3 [label="反向 0", fillcolor="#ffa8a8"]; S10_T4 [label="前向 3", fillcolor="#74c0fc"]; S10_T5 [label="反向 1", fillcolor="#ffa8a8"]; S10_T6 [label="前向 4", fillcolor="#74c0fc"]; S10_T7 [label="反向 2", fillcolor="#ffa8a8"]; S10_T8 [label="反向 3", fillcolor="#ffa8a8"]; } S10_label -> S10_T0 [style=invis]; S10_T0 -> S10_T1 -> S10_T2 -> S10_T3 -> S10_T4 -> S10_T5 -> S10_T6 -> S10_T7 -> S10_T8 [style=invis]; // GPU 1 行 (Rank 2) { rank=same; S11_label [label="GPU 1", shape=plaintext, fontcolor="black"]; S11_T0 [label="空闲", fillcolor="#e9ecef"]; S11_T1 [label="前向 0", fillcolor="#74c0fc"]; S11_T2 [label="前向 1", fillcolor="#74c0fc"]; S11_T3 [label="反向 0", fillcolor="#ffa8a8"]; S11_T4 [label="前向 2", fillcolor="#74c0fc"]; S11_T5 [label="反向 1", fillcolor="#ffa8a8"]; S11_T6 [label="前向 3", fillcolor="#74c0fc"]; S11_T7 [label="反向 2", fillcolor="#ffa8a8"]; S11_T8 [label="前向 4", fillcolor="#74c0fc"]; } S11_label -> S11_T0 [style=invis]; S11_T0 -> S11_T1 -> S11_T2 -> S11_T3 -> S11_T4 -> S11_T5 -> S11_T6 -> S11_T7 -> S11_T8 [style=invis]; // GPU 2 行 (Rank 3) { rank=same; S12_label [label="GPU 2", shape=plaintext, fontcolor="black"]; S12_T0 [label="空闲", fillcolor="#e9ecef"]; S12_T1 [label="空闲", fillcolor="#e9ecef"]; S12_T2 [label="前向 0", fillcolor="#74c0fc"]; S12_T3 [label="前向 1", fillcolor="#74c0fc"]; S12_T4 [label="反向 0", fillcolor="#ffa8a8"]; S12_T5 [label="前向 2", fillcolor="#74c0fc"]; S12_T6 [label="反向 1", fillcolor="#ffa8a8"]; S12_T7 [label="前向 3", fillcolor="#74c0fc"]; S12_T8 [label="反向 2", fillcolor="#ffa8a8"]; } S12_label -> S12_T0 [style=invis]; S12_T0 -> S12_T1 -> S12_T2 -> S12_T3 -> S12_T4 -> S12_T5 -> S12_T6 -> S12_T7 -> S12_T8 [style=invis]; // 用于行垂直对齐的不可见边 T10 -> S10_T0 [style=invis]; S10_T0 -> S11_T0 [style=invis]; S11_T0 -> S12_T0 [style=invis]; } }简单调度与交错式1F1B调度在3个阶段和多个微批次下的GPU时间利用率比较。蓝色(Fwd)代表前向传播,红色(Bwd)代表反向传播,灰色(Idle)代表气泡时间。1F1B显著减少了空闲时间。实现方面有效实现流水线并行需要仔细考虑几个因素:负载均衡: 各阶段的计算成本(时间)应大致相等。如果一个阶段耗时比其他阶段长很多,它就会成为瓶颈,其他阶段将空闲等待。由于Transformer模型中注意力块和前馈块的计算差异,平衡层间负载可能不易实现。通信: 通信发生在相邻阶段之间,主要是前向传输激活值和反向传输梯度。这些传输的大小取决于隐藏维度和序列长度。虽然可能不如张量并行所需的层内通信频繁,但这些传输仍然可能较大。微批次大小: 如前所述,这个参数($M$)直接影响气泡大小和内存使用。更大的$M$会减少气泡,但会增加存储所有处理中微批次激活值所需的内存。最佳的$M$取决于模型大小、阶段数和可用设备内存。框架支持: 实现高效调度、通信和梯度处理是复杂的。DeepSpeed和Megatron-LM等库提供了流水线并行的实现,通常与其他并行策略结合使用。这是一个高度简化的PyTorch代码片段,用来说明阶段和数据传递的思路(实际实现要复杂得多):import torch import torch.nn as nn # --- 假设这些在其他地方定义 --- # get_my_stage_id() -> int # get_num_stages() -> int # get_device_for_stage(stage_id) -> torch.device # send_tensor(tensor, to_stage_id) # recv_tensor(from_stage_id) -> tensor # global_micro_batch_size = ... # model_layers = [...] # 所有模型层的列表 class PipelineStage(nn.Module): def __init__(self, layers, stage_id): super().__init__() self.layers = nn.ModuleList(layers) self.stage_id = stage_id self.device = get_device_for_stage(stage_id) self.to(self.device) def forward(self, x): # 简化:假设x是从前一阶段接收的 # 如果 stage_id > 0 if x is not None: x = x.to(self.device) for layer in self.layers: x = layer(x) return x # --- 模型分区(例子) --- my_stage_id = get_my_stage_id() num_stages = get_num_stages() # 简化负载均衡 layers_per_stage = len(model_layers) // num_stages start_layer = my_stage_id * layers_per_stage if my_stage_id < num_stages - 1: end_layer = (my_stage_id + 1) * layers_per_stage else: end_layer = len(model_layers) my_layers = model_layers[start_layer:end_layer] pipeline_module = PipelineStage(my_layers, my_stage_id) # --- 简化训练步骤(无调度逻辑) --- def training_step(micro_batch_data): activations = None if my_stage_id == 0: activations = micro_batch_data # 第一阶段的输入数据 else: # 从前一阶段接收激活值 activations = recv_tensor(from_stage_id=my_stage_id - 1) # 通过此阶段的层进行前向传播 output_activations = pipeline_module(activations) if my_stage_id < num_stages - 1: # 将激活值发送到下一阶段 send_tensor(output_activations, to_stage_id=my_stage_id + 1) # 如果使用1F1B等调度,需要存储output_activations用于反向传播 # 如果使用1F1B等调度 else: # 最后阶段计算损失 # 假设目标标签可用 loss = compute_loss(output_activations, target_labels) # 开始反向传播 loss.backward() # 将梯度发送回前一阶段 # 简化处理 - 实际梯度取决于损失的输入 grad_to_send = output_activations.grad # send_tensor(grad_to_send, to_stage_id=my_stage_id - 1) # ... 反向传播逻辑对中间阶段继续 ... # 接收梯度,计算局部梯度,发送梯度回传 return loss # 或相关指标注意:此代码仅为说明用途。实际实现需要复杂的调度逻辑(如1F1B)、处理激活值检查点或重新计算、微批次间的梯度累积以及通信原语。流水线并行的优劣优点:模型深度可扩展性: 能够训练模型深度超出单个设备内存限制的模型,即使结合张量并行也能实现。内存效率(激活值): 与纯数据并行相比,流水线并行有时对激活值更具内存效率,因为每个设备只保存模型自身部分的激活值(尽管此优势很大程度上取决于微批次策略)。通信量减少(潜在): 通信只发生在相邻阶段之间,通常涉及激活值或梯度。与数据并行中频繁的AllReduce操作或张量并行中的张量拆分/聚合操作相比,这可能对带宽要求较低,尤其当激活值小于模型参数时。缺点:流水线气泡: 固有的空闲时间会降低硬件利用率,除非通过大量微批次和复杂调度来缓解。负载均衡敏感性: 性能高度依赖于各阶段间计算负载的平衡。负载不均的阶段会产生瓶颈。复杂性: 实现高效调度和管理跨微批次状态会增加显著的复杂性。延迟: 通过阶段的顺序特性增加了每个微批次处理的延迟。流水线并行很少单独用于大型模型。相反,它通常与数据并行和张量并行结合,采用混合方法。例如,一个常见的配置是在不同多GPU节点之间使用数据并行,同时在每个节点内部使用流水线并行和/或张量并行来管理节点内GPU上的模型大小。这允许同时扩展批次大小(通过数据并行)和模型大小(通过流水线并行/张量并行)。