趋近智
大师班
数据并行(DP)是分配深度学习模型(包括大型语言模型)训练计算负担的最直接和常用策略。在单个设备上训练这些大型模型通常会遇到计算时间和内存限制的瓶颈。数据并行通过在多个设备上同时处理数据的不同部分,直接解决计算时间方面的问题。
核心原理很简单:将整个模型复制到每个可用的处理单元(如GPU)上,然后将每个全局数据批次分成更小的部分,称为微批次。每个设备独立处理其分配到的微批次。
让我们分析使用K个设备进行数据并行训练的一个迭代中的典型步骤:
AllReduce的集体通信操作实现。AllReduce操作将所有K个设备中的梯度gi求和(或平均),并将得到的同步梯度gsync分发回每个设备。通常使用平均:
gsync=K1i=1∑Kgi
这个循环针对训练数据集中的每个批次重复。
数据并行工作流程:全局批次被分割,在持有模型副本的设备上并行处理,梯度通过AllReduce同步,并将同步更新应用到每个副本。
DistributedDataParallel)提供了高级抽象,使得数据并行的实现相对简单,通常只需要对标准单设备训练代码进行少量修改。尽管有其优点,数据并行存在一个显著的局限性,特别是对于本课程关注的大型语言模型:
AllReduce操作需要在所有相关设备之间通信梯度。传输的数据量与模型参数的大小成比例。随着设备数量(K)的增加,或者如果设备之间的互联带宽有限,这个同步步骤可能成为一个显著瓶颈,从而减弱并行计算带来的加速效果。AllReduce所需的时间有时会占用大部分计算时间,特别是对于较小的模型或每个设备计算速度较快的情况。PyTorch 的 torch.nn.parallel.DistributedDataParallel 模块是在多进程设置中实现数据并行的标准方式,通常优于旧的 torch.nn.DataParallel,因为它具有更好的性能和灵活性。以下是一个概要:
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
import os
# 假设一个简单的模型定义存在:
# class MyLargeModel(nn.Module): ...
def setup(rank, world_size):
"""初始化进程组。"""
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# 初始化进程组
# 对NVIDIA GPU使用'nccl'后端
dist.init_process_group("nccl", rank=rank, world_size=world_size)
torch.cuda.set_device(rank)
def cleanup():
"""销毁进程组。"""
dist.destroy_process_group()
def train_worker(rank, world_size, model_args, data_args, train_args):
"""单个工作进程的主要训练函数。"""
print(f"在 {rank} 进程上运行DDP训练。")
setup(rank, world_size)
# 实例化模型并将其移动到分配的GPU
model = MyLargeModel(**model_args).to(rank)
# 使用DDP包装模型
# DDP会自动处理梯度同步
ddp_model = DDP(model, device_ids=[rank])
# 准备数据集和DistributedSampler
# DistributedSampler确保每个进程获取不同的
# 数据切片
dataset = YourDataset(**data_args)
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
# 使用pin_memory=True和num_workers > 0以优化数据加载
dataloader = DataLoader(
dataset,
batch_size=train_args['micro_batch_size'],
sampler=sampler,
pin_memory=True,
num_workers=4
)
optimizer = torch.optim.AdamW(
ddp_model.parameters(),
lr=train_args['learning_rate']
)
# --- 简化训练循环 ---
ddp_model.train()
for epoch in range(train_args['num_epochs']):
sampler.set_epoch(epoch) # 对于DDP中的洗牌很重要
for batch in dataloader:
inputs = batch['input_ids'].to(rank)
labels = batch['labels'].to(rank)
optimizer.zero_grad()
outputs = ddp_model(inputs, labels=labels) # 前向传播
loss = outputs.loss
# 反向传播 - DDP自动平均梯度
loss.backward()
# 优化器步骤 - 根据平均梯度应用更新
optimizer.step()
if rank == 0: # 仅在主进程上记录
print(f"Epoch: {epoch}, Loss: {loss.item()}")
# --- 简化循环结束 ---
cleanup()
if __name__ == '__main__':
# 示例配置(请替换为实际参数)
world_size = torch.cuda.device_count() # 例如,4个GPU
model_args = {'vocab_size': 50257, 'hidden_size': 768, 'num_layers': 12}
data_args = {'data_path': '/path/to/data'}
train_args = {
'micro_batch_size': 8,
'learning_rate': 1e-4,
'num_epochs': 3
}
# 注意:全局批次大小 = micro_batch_size * world_size
# 生成工作进程
mp.spawn(
train_worker,
args=(world_size, model_args, data_args, train_args),
nprocs=world_size,
join=True
)
在这个概述中:
setup 初始化分布式环境。每个进程获得一个从0到world_size - 1的唯一rank。DistributedSampler 与 DataLoader 一起使用,以确保每个进程获得不重叠的数据分区。ddp_model = DDP(model, device_ids=[rank])。DDP会自动拦截反向传播,对梯度执行AllReduce操作,并确保在调用optimizer.step()之前所有进程都拥有平均后的梯度。数据并行是扩展训练吞吐量的一种基本技术。然而,其内存限制使得有必要考虑张量并行和流水线并行等其他策略,尤其是在处理现代大型语言模型的庞大规模时。通常,最有效的方法是将数据并行与其他这些技术结合使用,我们接下来将研究这些技术。
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造