趋近智
流水线并行 (PP) 通过垂直划分整个模型来分担大型语言模型的计算负荷。这种方法将连续的层分配给不同设备,形成类似装配线的处理流水线。与复制模型的数据并行或拆分层内单一操作的张量并行不同,流水线并行提供了一种独特的模型分发策略。
设想一个包含许多层的大型Transformer模型。与试图将所有层放入一个设备或在层内拆分复杂的矩阵乘法不同,流水线并行将例如第1-12层分配给GPU 0,第13-24层分配给GPU 1,第25-36层分配给GPU 2,以此类推。在单个设备上执行的每组层被称为一个阶段或分区。
在流水线并行设置中,数据批次首先被分解成更小的微批次。这对于使流水线阶段有效发挥作用很重要,我们很快就会看到这一点。
此过程如下进行:
这种流程使得不同设备可以同时处理不同的微批次,实现了模型深度方向的并行计算。
一个四阶段流水线,显示了GPU上的前向(fwd)激活流和反向(bwd)梯度流。
流水线并行中的一个重要问题是流水线气泡或空闲时间。在处理批次开始时,只有阶段0是活跃的。阶段1必须等待阶段0完成第一个微批次,阶段2必须等待阶段1,以此类推。同样,在反向传播 (backpropagation)期间,初始阶段会因等待来自后续阶段的梯度而变得空闲。这种启动和结束期间会导致硬件利用率不足。
这个气泡的大小取决于流水线阶段数 () 和微批次数 ()。如果每个微批次通过一个阶段大约需要相同的时间 (),那么简单的前向-后向顺序调度,其前向传播的总时间近似为 ,反向传播也类似。总有效工作量是 。效率(设备忙碌时间所占比例)大约是 ,对于较大的 ,这可以简化为 。气泡部分(空闲时间)近似为 。
为了减少气泡,我们需要相对于阶段数()增加微批次数()。然而,增加意味着更小的微批次,这可能无法充分发挥每个GPU的计算能力,并且还会增加所有正在处理的微批次所需的总激活内存。
为了减轻气泡问题,已经开发出各种调度策略,超越了简单的“所有前向,然后所有反向”方法(通常与GPipe相关)。一种常见且有效的策略是**1F1B(一次前向,一次反向)**调度,它通过PipeDream等框架而广为人知。
在1F1B调度中,阶段轮流执行即将到来的微批次的前向传播和已完成微批次的反向传播 (backpropagation)。一旦一个阶段完成微批次 i 的前向传播,它可能会立即执行微批次 i-k 的反向传播(其中 k 与阶段数有关),前提是下一阶段的梯度已可用。这使得设备更忙碌,与简单调度相比,显著减少了空闲时间气泡。
简单调度与交错式1F1B调度在3个阶段和多个微批次下的GPU时间利用率比较。蓝色(Fwd)代表前向传播,红色(Bwd)代表反向传播,灰色(Idle)代表气泡时间。1F1B显著减少了空闲时间。
有效实现流水线并行需要仔细考虑几个因素:
这是一个高度简化的PyTorch代码片段,用来说明阶段和数据传递的思路(实际实现要复杂得多):
import torch
import torch.nn as nn
# --- 假设这些在其他地方定义 ---
# get_my_stage_id() -> int
# get_num_stages() -> int
# get_device_for_stage(stage_id) -> torch.device
# send_tensor(tensor, to_stage_id)
# recv_tensor(from_stage_id) -> tensor
# global_micro_batch_size = ...
# model_layers = [...] # 所有模型层的列表
class PipelineStage(nn.Module):
def __init__(self, layers, stage_id):
super().__init__()
self.layers = nn.ModuleList(layers)
self.stage_id = stage_id
self.device = get_device_for_stage(stage_id)
self.to(self.device)
def forward(self, x):
# 简化:假设x是从前一阶段接收的
# 如果 stage_id > 0
if x is not None:
x = x.to(self.device)
for layer in self.layers:
x = layer(x)
return x
# --- 模型分区(例子) ---
my_stage_id = get_my_stage_id()
num_stages = get_num_stages()
# 简化负载均衡
layers_per_stage = len(model_layers) // num_stages
start_layer = my_stage_id * layers_per_stage
if my_stage_id < num_stages - 1:
end_layer = (my_stage_id + 1) * layers_per_stage
else:
end_layer = len(model_layers)
my_layers = model_layers[start_layer:end_layer]
pipeline_module = PipelineStage(my_layers, my_stage_id)
# --- 简化训练步骤(无调度逻辑) ---
def training_step(micro_batch_data):
activations = None
if my_stage_id == 0:
activations = micro_batch_data # 第一阶段的输入数据
else:
# 从前一阶段接收激活值
activations = recv_tensor(from_stage_id=my_stage_id - 1)
# 通过此阶段的层进行前向传播
output_activations = pipeline_module(activations)
if my_stage_id < num_stages - 1:
# 将激活值发送到下一阶段
send_tensor(output_activations, to_stage_id=my_stage_id + 1)
# 如果使用1F1B等调度,需要存储output_activations用于反向传播
# 如果使用1F1B等调度
else:
# 最后阶段计算损失
# 假设目标标签可用
loss = compute_loss(output_activations, target_labels)
# 开始反向传播
loss.backward()
# 将梯度发送回前一阶段
# 简化处理 - 实际梯度取决于损失的输入
grad_to_send = output_activations.grad
# send_tensor(grad_to_send, to_stage_id=my_stage_id - 1)
# ... 反向传播逻辑对中间阶段继续 ...
# 接收梯度,计算局部梯度,发送梯度回传
return loss # 或相关指标
注意:此代码仅为说明用途。实际实现需要复杂的调度逻辑(如1F1B)、处理激活值检查点或重新计算、微批次间的梯度累积以及通信原语。
优点:
缺点:
流水线并行很少单独用于大型模型。相反,它通常与数据并行和张量并行结合,采用混合方法。例如,一个常见的配置是在不同多GPU节点之间使用数据并行,同时在每个节点内部使用流水线并行和/或张量并行来管理节点内GPU上的模型大小。这允许同时扩展批次大小(通过数据并行)和模型大小(通过流水线并行/张量并行)。
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造