数据并行(DP)是分配深度学习模型(包括大型语言模型)训练计算负担的最直接和常用策略。在单个设备上训练这些大型模型通常会遇到计算时间和内存限制的瓶颈。数据并行通过在多个设备上同时处理数据的不同部分,直接解决计算时间方面的问题。核心原理很简单:将整个模型复制到每个可用的处理单元(如GPU)上,然后将每个全局数据批次分成更小的部分,称为微批次。每个设备独立处理其分配到的微批次。数据并行如何工作让我们分析使用$K$个设备进行数据并行训练的一个迭代中的典型步骤:模型复制: 训练开始前,整个模型(参数、缓冲区)的相同副本被加载到$K$个设备中的每一个上。数据分片: 全局训练数据批次被分割成$K$个微批次。每个设备接收一个微批次。例如,如果全局批次大小为$B$,我们有$K$个设备,每个设备通常处理大小为$B/K$的微批次。前向传播: 每个设备使用其本地模型副本和分配到的微批次执行前向传播。这会计算该部分数据的损失。反向传播: 每个设备通过反向传播计算损失相对于其本地模型参数的梯度。此时,每个设备$i$持有一组梯度$g_i$,仅根据其微批次计算得出。梯度同步: 这是数据并行的定义性步骤。在每个设备上独立计算的梯度必须结合起来,以代表整个全局批次的梯度。这通常通过一种称为AllReduce的集体通信操作实现。AllReduce操作将所有$K$个设备中的梯度$g_i$求和(或平均),并将得到的同步梯度$g_{sync}$分发回每个设备。通常使用平均: $$ g_{sync} = \frac{1}{K} \sum_{i=1}^{K} g_i $$优化器步骤: 每个设备使用同步梯度$g_{sync}$,通过所选优化器(例如AdamW)更新其本地的模型参数副本。因为所有设备初始参数相同,并基于同步梯度应用相同的更新,所以在优化器步骤之后,模型副本保持相同。这个循环针对训练数据集中的每个批次重复。digraph DP_Flow { rankdir=TB; // 设置所有元素的全局字体大小 fontsize=12; // 节点设置,明确字体大小 node [shape=box, style=rounded, fontname="sans-serif", margin=0.2, color="#ced4da", fontsize=12]; // 边设置,明确字体大小 edge [fontname="sans-serif", color="#868e96", fontsize=12]; subgraph cluster_data { label = "数据输入"; fontsize=12; // Explicitly set cluster label fontsize style = "filled"; color = "#e9ecef"; GlobalBatch [label="全局批次", shape=cylinder, style=filled, color="#a5d8ff"]; } Splitter [label="分割数据", shape=invtriangle, style=filled, color="#ffd8a8"]; subgraph cluster_workers { label = "并行处理(K个设备)"; fontsize=12; // Explicitly set cluster label fontsize style = "filled"; color = "#e9ecef"; rank=same; Device1 [label="设备 1\n微批次 1\n模型副本\n前向/反向\n(本地梯度 g₁)", shape=Mrecord, style=filled, color="#b2f2bb"]; DeviceK [label="设备 K\n微批次 K\n模型副本\n前向/反向\n(本地梯度 gₖ)", shape=Mrecord, style=filled, color="#b2f2bb"]; Dots [label="...", shape=plaintext]; } AllReduce [label="AllReduce\n(同步梯度)", shape=diamond, style=filled, color="#ffc9c9"]; Optimizer [label="优化器步骤\n(更新模型)", shape=cds, style=filled, color="#d0bfff"]; GlobalBatch -> Splitter; Splitter -> Device1 [label="微批次 1"]; Splitter -> DeviceK [label="微批次 K"]; Splitter -> Dots [style=invis]; Device1 -> AllReduce [label="g₁"]; DeviceK -> AllReduce [label="gₖ"]; Dots -> AllReduce [style=invis]; AllReduce -> Optimizer [label="g_sync"]; Optimizer -> Device1 [label="更新权重", style=dashed]; Optimizer -> DeviceK [label="更新权重", style=dashed]; }数据并行工作流程:全局批次被分割,在持有模型副本的设备上并行处理,梯度通过AllReduce同步,并将同步更新应用到每个副本。数据并行的优点简洁性: PyTorch等框架(如DistributedDataParallel)提供了高级抽象,使得数据并行的实现相对简单,通常只需要对标准单设备训练代码进行少量修改。吞吐量增加: 通过并行处理数据,数据并行大幅减少了每个训练步骤的时间,从而能够更快地遍历大型数据集或使用更大的有效批次大小,这有时可以改进模型收敛和泛化能力。广泛适用性: 数据并行本身不需要对模型架构进行根本性改变。它对大多数标准网络设计来说是“开箱即用”的。数据并行的局限性尽管有其优点,数据并行存在一个显著的局限性,特别是对于本课程关注的大型语言模型:内存限制: 主要缺点是每个设备都必须保存模型的参数、梯度、优化器状态的完整副本,以及其微批次在前向传播期间计算的激活值。对于具有数十亿或数万亿参数的模型,所需的内存通常会超出即使是最大可用加速器(GPU/TPU)的容量。这从根本上限制了仅使用数据并行可以训练的最大模型大小。通信开销: AllReduce操作需要在所有相关设备之间通信梯度。传输的数据量与模型参数的大小成比例。随着设备数量($K$)的增加,或者如果设备之间的互联带宽有限,这个同步步骤可能成为一个显著瓶颈,从而减弱并行计算带来的加速效果。AllReduce所需的时间有时会占用大部分计算时间,特别是对于较小的模型或每个设备计算速度较快的情况。使用PyTorch DistributedDataParallel (DDP) 的实现概述PyTorch 的 torch.nn.parallel.DistributedDataParallel 模块是在多进程设置中实现数据并行的标准方式,通常优于旧的 torch.nn.DataParallel,因为它具有更好的性能和灵活性。以下是一个概要:import torch import torch.distributed as dist import torch.multiprocessing as mp import torch.nn as nn from torch.nn.parallel import DistributedDataParallel as DDP from torch.utils.data import DataLoader, DistributedSampler import os # 假设一个简单的模型定义存在: # class MyLargeModel(nn.Module): ... def setup(rank, world_size): """初始化进程组。""" os.environ['MASTER_ADDR'] = 'localhost' os.environ['MASTER_PORT'] = '12355' # 初始化进程组 # 对NVIDIA GPU使用'nccl'后端 dist.init_process_group("nccl", rank=rank, world_size=world_size) torch.cuda.set_device(rank) def cleanup(): """销毁进程组。""" dist.destroy_process_group() def train_worker(rank, world_size, model_args, data_args, train_args): """单个工作进程的主要训练函数。""" print(f"在 {rank} 进程上运行DDP训练。") setup(rank, world_size) # 实例化模型并将其移动到分配的GPU model = MyLargeModel(**model_args).to(rank) # 使用DDP包装模型 # DDP会自动处理梯度同步 ddp_model = DDP(model, device_ids=[rank]) # 准备数据集和DistributedSampler # DistributedSampler确保每个进程获取不同的 # 数据切片 dataset = YourDataset(**data_args) sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank) # 使用pin_memory=True和num_workers > 0以优化数据加载 dataloader = DataLoader( dataset, batch_size=train_args['micro_batch_size'], sampler=sampler, pin_memory=True, num_workers=4 ) optimizer = torch.optim.AdamW( ddp_model.parameters(), lr=train_args['learning_rate'] ) # --- 简化训练循环 --- ddp_model.train() for epoch in range(train_args['num_epochs']): sampler.set_epoch(epoch) # 对于DDP中的洗牌很重要 for batch in dataloader: inputs = batch['input_ids'].to(rank) labels = batch['labels'].to(rank) optimizer.zero_grad() outputs = ddp_model(inputs, labels=labels) # 前向传播 loss = outputs.loss # 反向传播 - DDP自动平均梯度 loss.backward() # 优化器步骤 - 根据平均梯度应用更新 optimizer.step() if rank == 0: # 仅在主进程上记录 print(f"Epoch: {epoch}, Loss: {loss.item()}") # --- 简化循环结束 --- cleanup() if __name__ == '__main__': # 示例配置(请替换为实际参数) world_size = torch.cuda.device_count() # 例如,4个GPU model_args = {'vocab_size': 50257, 'hidden_size': 768, 'num_layers': 12} data_args = {'data_path': '/path/to/data'} train_args = { 'micro_batch_size': 8, 'learning_rate': 1e-4, 'num_epochs': 3 } # 注意:全局批次大小 = micro_batch_size * world_size # 生成工作进程 mp.spawn( train_worker, args=(world_size, model_args, data_args, train_args), nprocs=world_size, join=True )在这个概述中:setup 初始化分布式环境。每个进程获得一个从0到world_size - 1的唯一rank。DistributedSampler 与 DataLoader 一起使用,以确保每个进程获得不重叠的数据分区。主要变化是包装模型:ddp_model = DDP(model, device_ids=[rank])。DDP会自动拦截反向传播,对梯度执行AllReduce操作,并确保在调用optimizer.step()之前所有进程都拥有平均后的梯度。数据并行是扩展训练吞吐量的一种基本技术。然而,其内存限制使得有必要考虑张量并行和流水线并行等其他策略,尤其是在处理现代大型语言模型的庞大规模时。通常,最有效的方法是将数据并行与其他这些技术结合使用,我们接下来将研究这些技术。