当神经网络变得过大以至于单个设备无法容纳其任何一层,或者需要以不同方式重叠计算和通信时,流水线并行提供了一种替代的扩展策略。不同于复制整个模型或拆分单个层的方法,流水线并行将模型本身按顺序分配到多个设备上。每个设备或设备组都成为流水线中的一个“阶段”,负责运行模型层的一个子集。流水线并行的机制设想一个由多个连续层或模块组成的模型。在流水线并行中,你将连续的模块分配给不同的设备。例如,一个在四个GPU上运行的四层模型:GPU 0 (阶段 0): 运行第 1 层。GPU 1 (阶段 1): 运行第 2 层。GPU 2 (阶段 2): 运行第 3 层。GPU 3 (阶段 3): 运行第 4 层并计算损失。输入数据进入第一阶段(GPU 0)。处理后,输出激活被发送到第二阶段(GPU 1)。这会一直持续,直到最后阶段计算出输出和损失。随后,梯度以相反的顺序反向流经流水线。GPU 3 计算第 4 层的梯度,并将第 3 层输出的梯度发回给 GPU 2,然后 GPU 2 计算第 3 层的梯度并将其发回给 GPU 1,依此类推,直到梯度到达第一阶段。流水线气泡问题此过程的简单实现效率低下。思考时间线:当阶段 1 处理第一个数据批次时,阶段 0 处于空闲状态,等待下一个批次。类似地,当阶段 2 处理时,阶段 0 和 1 处于空闲状态(假设只有一个批次流过)。在反向传播期间,各阶段会再次处于空闲状态,因为它们在等待来自后续阶段的梯度。这种空闲时间,被称为“流水线气泡”,大幅降低了硬件利用率。digraph G { rankdir=LR; node [shape=box, style=filled, color="#ced4da"]; splines=ortho; newrank=true; subgraph cluster_gpus { label="设备"; color="#dee2e6"; bgcolor="#e9ecef"; rank=same; GPU0 [label="GPU 0\n(阶段 0)"]; GPU1 [label="GPU 1\n(阶段 1)"]; GPU2 [label="GPU 2\n(阶段 2)"]; GPU3 [label="GPU 3\n(阶段 3)"]; } subgraph cluster_time { label="时间步长"; color="#dee2e6"; bgcolor="#e9ecef"; T1 -> T2 -> T3 -> T4 -> T5 -> T6 -> T7 [style=invis]; node [style=filled]; T1 [label="T1", shape=plaintext]; T2 [label="T2", shape=plaintext]; T3 [label="T3", shape=plaintext]; T4 [label="T4", shape=plaintext]; T5 [label="T5", shape=plaintext]; T6 [label="T6", shape=plaintext]; T7 [label="T7", shape=plaintext]; } F0 [label="正向 0", shape=box, style=filled, color="#a5d8ff"]; F1 [label="正向 1", shape=box, style=filled, color="#74c0fc"]; F2 [label="正向 2", shape=box, style=filled, color="#4dabf7"]; F3 [label="正向 3", shape=box, style=filled, color="#339af0"]; B3 [label="反向 3", shape=box, style=filled, color="#ffc9c9"]; B2 [label="反向 2", shape=box, style=filled, color="#ffa8a8"]; B1 [label="反向 1", shape=box, style=filled, color="#ff8787"]; B0 [label="反向 0", shape=box, style=filled, color="#ff6b6b"]; BubbleF [label="空闲", shape=point, color="#adb5bd", width=0.1, height=0.1]; BubbleB [label="空闲", shape=point, color="#adb5bd", width=0.1, height=0.1]; {rank=same; T1; F0;} {rank=same; T2; BubbleF; F1;} {rank=same; T3; BubbleF; BubbleF; F2;} {rank=same; T4; BubbleF; BubbleF; BubbleF; F3;} {rank=same; T5; BubbleB; BubbleB; BubbleB; B3;} {rank=same; T6; BubbleB; BubbleB; B2; BubbleB;} {rank=same; T7; BubbleB; B1; BubbleB; BubbleB;} {rank=same; T8; B0; BubbleB; BubbleB; BubbleB;} T1 -> F0 [style=invis]; F0 -> GPU0 [style=invis]; T2 -> BubbleF -> F1 [style=invis]; F1 -> GPU1 [style=invis]; T3 -> BubbleF -> BubbleF -> F2 [style=invis]; F2 -> GPU2 [style=invis]; T4 -> BubbleF -> BubbleF -> BubbleF -> F3 [style=invis]; F3 -> GPU3 [style=invis]; T5 -> BubbleB -> BubbleB -> BubbleB -> B3 [style=invis]; B3 -> GPU3 [style=invis]; T6 -> BubbleB -> BubbleB -> B2 -> BubbleB [style=invis]; B2 -> GPU2 [style=invis]; T7 -> BubbleB -> B1 -> BubbleB -> BubbleB [style=invis]; B1 -> GPU1 [style=invis]; T8 -> B0 -> BubbleB -> BubbleB -> BubbleB [style=invis]; B0 -> GPU0 [style=invis]; }单批次简单流水线运行示意图,显示了在时间步长(T1-T8)内的正向(Fwd)和反向(Bwd)传播过程中,GPU上存在大量空闲时间(气泡)。通过微批次处理减少气泡解决流水线气泡问题的标准方法是微批次处理。我们不是一次性将整个小批次数据送入流水线,而是将其分成更小的块,称为微批次。流水线并发处理这些微批次。一旦阶段 0 完成处理第一个微批次并将其激活发送到阶段 1,阶段 0 就可以立即开始处理第二个微批次。这使得多个微批次可以在流水线中同时“飞行”,重叠各阶段的计算,并大幅减少空闲时间。微批次的数量($m$)是一个超参数;更大的 $m$ 通常会带来更好的利用率,但会增加通信开销,并可能因存储每个微批次的中间激活和梯度而增加内存使用量。digraph G { rankdir=LR; node [shape=box, style=filled, color="#ced4da"]; splines=ortho; newrank=true; subgraph cluster_gpus { label="设备"; color="#dee2e6"; bgcolor="#e9ecef"; rank=same; GPU0 [label="GPU 0\n(阶段 0)"]; GPU1 [label="GPU 1\n(阶段 1)"]; GPU2 [label="GPU 2\n(阶段 2)"]; GPU3 [label="GPU 3\n(阶段 3)"]; } subgraph cluster_time { label="时间步长(微批次 m0-m3)"; color="#dee2e6"; bgcolor="#e9ecef"; T1 -> T2 -> T3 -> T4 -> T5 -> T6 -> T7 -> T8 -> T9 -> T10 -> T11 -> T12 -> T13 [style=invis]; node [style=filled, width=0.8, height=0.4, fontsize=8]; T1 [label="T1", shape=plaintext]; T2 [label="T2", shape=plaintext]; T3 [label="T3", shape=plaintext]; T4 [label="T4", shape=plaintext]; T5 [label="T5", shape=plaintext]; T6 [label="T6", shape=plaintext]; T7 [label="T7", shape=plaintext]; T8 [label="T8", shape=plaintext]; T9 [label="T9", shape=plaintext]; T10 [label="T10", shape=plaintext]; T11 [label="T11", shape=plaintext]; T12 [label="T12", shape=plaintext]; T13 [label="T13", shape=plaintext]; } F00 [label="F0(m0)", shape=box, style=filled, color="#a5d8ff"]; F01 [label="F0(m1)", shape=box, style=filled, color="#a5d8ff"]; F02 [label="F0(m2)", shape=box, style=filled, color="#a5d8ff"]; F03 [label="F0(m3)", shape=box, style=filled, color="#a5d8ff"]; F10 [label="F1(m0)", shape=box, style=filled, color="#74c0fc"]; F11 [label="F1(m1)", shape=box, style=filled, color="#74c0fc"]; F12 [label="F1(m2)", shape=box, style=filled, color="#74c0fc"]; F13 [label="F1(m3)", shape=box, style=filled, color="#74c0fc"]; F20 [label="F2(m0)", shape=box, style=filled, color="#4dabf7"]; F21 [label="F2(m1)", shape=box, style=filled, color="#4dabf7"]; F22 [label="F2(m2)", shape=box, style=filled, color="#4dabf7"]; F23 [label="F2(m3)", shape=box, style=filled, color="#4dabf7"]; F30 [label="F3(m0)", shape=box, style=filled, color="#339af0"]; F31 [label="F3(m1)", shape=box, style=filled, color="#339af0"]; F32 [label="F3(m2)", shape=box, style=filled, color="#339af0"]; F33 [label="F3(m3)", shape=box, style=filled, color="#339af0"]; B30 [label="B3(m0)", shape=box, style=filled, color="#ffc9c9"]; B31 [label="B3(m1)", shape=box, style=filled, color="#ffc9c9"]; B32 [label="B3(m2)", shape=box, style=filled, color="#ffc9c9"]; B33 [label="B3(m3)", shape=box, style=filled, color="#ffc9c9"]; B20 [label="B2(m0)", shape=box, style=filled, color="#ffa8a8"]; B21 [label="B2(m1)", shape=box, style=filled, color="#ffa8a8"]; B22 [label="B2(m2)", shape=box, style=filled, color="#ffa8a8"]; B23 [label="B2(m3)", shape=box, style=filled, color="#ffa8a8"]; B10 [label="B1(m0)", shape=box, style=filled, color="#ff8787"]; B11 [label="B1(m1)", shape=box, style=filled, color="#ff8787"]; B12 [label="B1(m2)", shape=box, style=filled, color="#ff8787"]; B13 [label="B1(m3)", shape=box, style=filled, color="#ff8787"]; B00 [label="B0(m0)", shape=box, style=filled, color="#ff6b6b"]; B01 [label="B0(m1)", shape=box, style=filled, color="#ff6b6b"]; B02 [label="B0(m2)", shape=box, style=filled, color="#ff6b6b"]; B03 [label="B0(m3)", shape=box, style=filled, color="#ff6b6b"]; Idle [label="", shape=point, color="#adb5bd", width=0.1, height=0.1]; {rank=same; T1; F00; Idle; Idle; Idle;} {rank=same; T2; F01; F10; Idle; Idle;} {rank=same; T3; F02; F11; F20; Idle;} {rank=same; T4; F03; F12; F21; F30;} {rank=same; T5; Idle; F13; F22; F31;} {rank=same; T6; Idle; Idle; F23; F32;} {rank=same; T7; Idle; Idle; Idle; F33;} {rank=same; T8; Idle; Idle; Idle; B30;} {rank=same; T9; Idle; Idle; B20; B31;} {rank=same; T10; Idle; B10; B21; B32;} {rank=same; T11; B00; B11; B22; B33;} {rank=same; T12; B01; B12; B23; Idle;} {rank=same; T13; B02; B13; Idle; Idle;} {rank=same; T14; B03; Idle; Idle; Idle;} T1 -> F00 -> Idle -> Idle -> Idle [style=invis]; F00 -> GPU0 [style=invis]; T2 -> F01 -> F10 -> Idle -> Idle [style=invis]; F10 -> GPU1 [style=invis]; T3 -> F02 -> F11 -> F20 -> Idle [style=invis]; F20 -> GPU2 [style=invis]; T4 -> F03 -> F12 -> F21 -> F30 [style=invis]; F30 -> GPU3 [style=invis]; T5 -> Idle -> F13 -> F22 -> F31 [style=invis]; F13 -> GPU1 [style=invis]; T6 -> Idle -> Idle -> F23 -> F32 [style=invis]; F23 -> GPU2 [style=invis]; T7 -> Idle -> Idle -> Idle -> F33 [style=invis]; F33 -> GPU3 [style=invis]; T8 -> Idle -> Idle -> Idle -> B30 [style=invis]; B30 -> GPU3 [style=invis]; T9 -> Idle -> Idle -> B20 -> B31 [style=invis]; B20 -> GPU2 [style=invis]; T10 -> Idle -> B10 -> B21 -> B32 [style=invis]; B10 -> GPU1 [style=invis]; T11 -> B00 -> B11 -> B22 -> B33 [style=invis]; B00 -> GPU0 [style=invis]; T12 -> B01 -> B12 -> B23 -> Idle [style=invis]; B12 -> GPU1 [style=invis]; T13 -> B02 -> B13 -> Idle -> Idle [style=invis]; B13 -> GPU1 [style=invis]; T14 -> B03 -> Idle -> Idle -> Idle [style=invis]; B03 -> GPU0 [style=invis]; }使用微批次处理(m0-m3)的流水线运行示意图。不同微批次的正向(F)和反向(B)传播在各阶段(GPU)之间重叠,与简单方法相比,减少了空闲时间。初始填充和最终排空阶段仍存在一些气泡。在PyTorch中实现流水线并行PyTorch提供了实现流水线并行的工具,尽管它通常比DDP需要更多手动设置。其主要构成部分包括:模型划分: 将你的 nn.Module 手动拆分为连续的 nn.Sequential 模块,每个阶段一个。将每个模块放置在其指定设备上。通信: 使用 torch.distributed.send 和 torch.distributed.recv 操作,在相邻阶段之间正向传输激活和反向传输梯度。请记住这些操作是阻塞的。微批次调度: 实现一个循环,遍历微批次,管理每个微批次在各阶段的正向和反向传播。这需要仔细同步。梯度同步: 每个微批次在反向传播期间计算的梯度需要累积。优化器步骤通常仅在小批次中的所有微批次都完成其正向和反向传播后才执行。我们来概述一个带有微批次处理的简单两阶段流水线(GPU 0 和 GPU 1)的流程:import torch import torch.nn as nn import torch.distributed as dist # 假设分布式环境已初始化(rank 0 在 GPU 0,rank 1 在 GPU 1) # 假设模型已拆分为 stage0 和 stage1,并放置在各自的设备上 def run_pipeline_step(stage0, stage1, micro_batches_data, micro_batches_labels, loss_fn, optimizer): num_micro_batches = len(micro_batches_data) activations_storage = [None] * num_micro_batches # 存储用于反向传播的激活 gradients_storage = [None] * num_micro_batches # 存储用于反向传播的梯度 current_rank = dist.get_rank() world_size = dist.get_world_size() # 在此示例中假设 world_size = 2 # --- 正向传播 --- for i in range(num_micro_batches): micro_batch = micro_batches_data[i] if current_rank == 0: # 第一阶段 # 计算阶段 0 的激活 activations = stage0(micro_batch.to(current_rank)) # 将激活发送到下一阶段(rank 1) dist.send(activations.cpu(), dst=1, tag=i) # 发送 CPU 张量以避免 GPU 同步问题 activations_storage[i] = activations # 存储用于反向传播 elif current_rank == 1: # 最后阶段 # 从上一阶段(rank 0)接收激活 received_activations = torch.empty_like(some_prototype_tensor_shape, device='cpu') # 需要形状信息 dist.recv(received_activations, src=0, tag=i) received_activations = received_activations.to(current_rank) received_activations.requires_grad_() # 重要:为接收到的张量启用梯度 # 计算阶段 1 的激活(最终输出) outputs = stage1(received_activations) # 计算损失 labels = micro_batches_labels[i].to(current_rank) loss = loss_fn(outputs, labels) # 存储反向传播所需信息 activations_storage[i] = received_activations # 此阶段的输入 gradients_storage[i] = loss # 存储损失以供稍后启动反向传播 # --- 反向传播 --- # 为确保正确性,对微批次进行反向迭代(GPipe 调度) for i in range(num_micro_batches - 1, -1, -1): if current_rank == 1: # 最后阶段 loss = gradients_storage[i] input_activation = activations_storage[i] # 为此微批次的损失启动反向传播 # 局部计算阶段 1 参数的梯度 # 如果不是此阶段输入的最后一个微批次,则需要保留计算图 retain_graph_flag = (i != 0) loss.backward(retain_graph=retain_graph_flag) # 将输入激活的梯度发回上一阶段(rank 0) grad_to_send = input_activation.grad.cpu() dist.send(grad_to_send, dst=0, tag=i) elif current_rank == 0: # 第一阶段 # 从下一阶段(rank 1)接收梯度 grad_received = torch.empty_like(some_prototype_grad_shape, device='cpu') # 需要形状信息 dist.recv(grad_received, src=1, tag=i) grad_received = grad_received.to(current_rank) # 使用接收到的梯度继续反向传播 output_activation = activations_storage[i] # 局部计算阶段 0 参数的梯度 output_activation.backward(gradient=grad_received) # --- 优化器步骤 --- # 处理完所有微批次后,更新权重 optimizer.step() optimizer.zero_grad() # --- 注意 --- # 1. 这是一个简化示例(GPipe 风格的调度)。 # 2. 需要机制来确定接收缓冲区中的张量形状。 # 3. 错误处理、正确的设备放置和同步很关键。 # 4. 存在更高级的调度(例如,交错式)。 # 5. 像 `torch.distributed.pipeline`(实验性)这样的库旨在简化此过程。这种手动实现突出了其复杂性:显式通信调用,管理每个微批次的中间激活及其梯度,以及阶段之间细致的同步。PyTorch 还有一个实验性的 torch.distributed.pipeline.sync.Pipe 模块,旨在抽象化部分这种复杂性,根据拆分为阶段的模型定义,自动处理微批次处理、通信和梯度传播。然而,使用基本操作理解手动过程,能帮助更好了解其内部机制。考量与权衡负载均衡: 每个阶段的计算成本应大致相等,以避免阶段成为瓶颈。这通常需要仔细的模型划分。通信开销: 在设备之间传输激活和梯度会引入开销。如果张量尺寸较大或互连带宽较低,则这种开销会更加明显。微批次处理由于更频繁、更小的传输而增加了这种开销。内存: 虽然每个设备的峰值内存有所减少(因为每个设备只保存模型的一部分),但存储多个正在处理的微批次的激活的需求,与简单的流水线相比,可能增加整体内存需求。复杂性: 正确实现流水线并行,尤其是在微批次处理和优化调度方面,比使用 DDP 复杂得多。流水线并行对于超大型模型最有用,即使张量并行也不足以应对,或者需要对设备运行和内存进行精细控制时。它通常与数据并行结合使用(例如,在每个流水线阶段内运行 DDP),以实现进一步的扩展。