Okay, let's put the theory into practice. Setting up a distributed training job involves coordinating multiple processes, often running on different devices (GPUs), to work together on training a single model. This practical focuses on establishing a foundational distributed training environment using PyTorch's native DistributedDataParallel (DDP) module, which implements the data parallelism strategy discussed earlier. While frameworks like DeepSpeed offer more advanced features, understanding the core mechanics with DDP provides a solid base.
We'll simulate a scenario where we train a simple model across multiple GPUs. If you don't have multiple physical GPUs, you can still exercise the same machinery by launching multiple processes with the gloo backend on CPU (a variant is shown later), though the performance benefits of data parallelism only appear with multiple physical devices.
Ensure you have PyTorch installed (pip install torch torchvision torchaudio). For actual multi-GPU training, you'll need an appropriate CUDA setup and NVIDIA drivers.
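Before launching anything distributed, it can help to confirm what PyTorch actually sees on your machine. A quick sanity check (plain single-process Python, nothing distributed yet):

import torch

# Report whether CUDA is usable and how many GPUs PyTorch can see
print(f"CUDA available: {torch.cuda.is_available()}")
print(f"GPU count:      {torch.cuda.device_count()}")
for i in range(torch.cuda.device_count()):
    print(f"  GPU {i}: {torch.cuda.get_device_name(i)}")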
The key building blocks are:

- Process group initialization: torch.distributed.init_process_group(). This function requires specifying a backend (nccl for NVIDIA GPUs or gloo for CPU/mixed environments), and it typically relies on environment variables (MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE) set by the launch utility; a stripped-down example follows this list.
- Model wrapping: torch.nn.parallel.DistributedDataParallel. This wrapper handles gradient synchronization across processes during the backward pass. It also ensures the model is placed on the correct device for each process.
- Data partitioning: each process should train on a distinct slice of the data, and torch.utils.data.distributed.DistributedSampler is used with the DataLoader to achieve this partitioning automatically.
- Process launching: torchrun (the recommended tool, superseding torch.distributed.launch) is used to spawn multiple worker processes and set the necessary environment variables for initialization.
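The reliance on environment variables is easiest to see in a stripped-down worker. The sketch below (env_check.py is a hypothetical filename) does nothing but join the process group using the default env:// initialization, reading RANK, WORLD_SIZE, MASTER_ADDR, and MASTER_PORT from the environment that torchrun sets; it uses gloo so it runs even without GPUs:

# Launch with: torchrun --standalone --nnodes=1 --nproc_per_node=2 env_check.py
import torch.distributed as dist

# No rank/world_size arguments: they are read from the environment (env:// init)
dist.init_process_group(backend="gloo")
print(f"Hello from rank {dist.get_rank()} of {dist.get_world_size()}")
dist.destroy_process_group()

The full script below passes rank and world_size explicitly instead, which is equally valid.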
Let's create a Python script (basic_ddp_train.py) demonstrating these components. We'll use a very simple model and synthetic data to keep the focus on the distributed setup.
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler
import os
# 1. Define a Simple Model
class SimpleModel(nn.Module):
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.linear1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(10, 5)

    def forward(self, x):
        x = self.linear1(x)
        x = self.relu(x)
        x = self.linear2(x)
        return x
# 2. Setup Function for Distributed Environment
def setup(rank, world_size):
    """Initializes the distributed environment."""
    # torchrun typically sets these already; we set them explicitly here for a single machine.
    os.environ['MASTER_ADDR'] = 'localhost'  # Address of the coordinating (rank 0) process
    os.environ['MASTER_PORT'] = '12355'      # Port for communication
    # Initialize the process group.
    # 'nccl' is preferred for NVIDIA GPUs.
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)  # Pin this process to a specific GPU
# 3. Cleanup Function
def cleanup():
    """Destroys the process group."""
    dist.destroy_process_group()
# 4. Main Training Function for a Single Process
def train_process(rank, world_size, epochs=3):
    """The core training logic executed by each process."""
    print(f"Initializing process {rank} of {world_size}...")
    setup(rank, world_size)

    # Create the model and move it to the GPU assigned to this rank
    model = SimpleModel().to(rank)
    # Wrap the model with DDP
    ddp_model = DDP(model, device_ids=[rank])

    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.01)

    # Create synthetic data
    # torch.manual_seed(42)  # Optional: fix the seed for consistent random data across processes
    inputs = torch.randn(128, 10)          # 128 samples, 10 features
    labels = torch.randint(0, 5, (128,))   # 128 labels for 5 classes
    dataset = TensorDataset(inputs, labels)

    # Use DistributedSampler to partition the data across processes
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    # shuffle=False because DistributedSampler handles shuffling
    dataloader = DataLoader(dataset, batch_size=32, sampler=sampler, shuffle=False)

    print(f"Rank {rank} starting training...")
    for epoch in range(epochs):
        sampler.set_epoch(epoch)  # Ensure a different shuffle each epoch
        epoch_loss = 0.0
        num_batches = 0
        for batch_inputs, batch_labels in dataloader:
            # Move data to the process's assigned GPU
            batch_inputs = batch_inputs.to(rank)
            batch_labels = batch_labels.to(rank)

            # Forward pass
            outputs = ddp_model(batch_inputs)
            loss = criterion(outputs, batch_labels)

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()  # DDP handles gradient averaging here
            optimizer.step()

            epoch_loss += loss.item()
            num_batches += 1

        avg_loss = epoch_loss / num_batches
        # Print loss only from rank 0 to avoid clutter
        if rank == 0:
            print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}")

    print(f"Rank {rank} finished training.")
    cleanup()
# 5. Launcher Code (using torchrun convention)
if __name__ == "__main__":
    # torchrun automatically sets RANK, LOCAL_RANK, WORLD_SIZE
    local_rank = int(os.environ["LOCAL_RANK"])
    world_size = int(os.environ["WORLD_SIZE"])  # Total number of processes across all nodes
    print(f"Starting DDP example on rank {local_rank} (world size {world_size}).")
    train_process(rank=local_rank, world_size=world_size, epochs=5)
    print("DDP Example Finished.")
To run this script with two GPU processes on a single machine (one process per GPU; see the CPU-only variant after the flag descriptions if you don't have two GPUs), save it as basic_ddp_train.py and execute it from your terminal using torchrun:
# Run with 2 processes (simulating 2 GPUs) on the current machine
torchrun --standalone --nnodes=1 --nproc_per_node=2 basic_ddp_train.py
- --standalone: Indicates running without an external cluster manager like Slurm.
- --nnodes=1: Specifies running on a single machine (node).
- --nproc_per_node=2: Sets the number of processes (and implicitly GPUs used, one per process) to launch on this node.

You should see output indicating initialization from two processes (Rank 0 and Rank 1), followed by epoch loss printed only by Rank 0, and finally cleanup messages from both.
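If you only have a single GPU, or no GPU at all, you can still exercise the launch mechanics by switching to the gloo backend and keeping everything on the CPU. A minimal sketch of the lines you would change (assuming the rest of basic_ddp_train.py stays the same):

# In setup(): use the gloo backend and skip torch.cuda.set_device(rank)
dist.init_process_group("gloo", rank=rank, world_size=world_size)

# In train_process(): keep the model on the CPU and drop device_ids
model = SimpleModel()          # no .to(rank)
ddp_model = DDP(model)         # DDP without device_ids runs on the CPU

# The .to(rank) calls on batch_inputs / batch_labels can simply be removed.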
Let's walk through the important pieces:

- setup(rank, world_size): Initializes the connection between processes. MASTER_ADDR and MASTER_PORT tell processes where to find the coordination point (the rank 0 process). init_process_group establishes the communication backend (nccl is highly optimized for NVIDIA GPUs). torch.cuda.set_device(rank) ensures each process controls a specific GPU, preventing conflicts.
- SimpleModel: A standard PyTorch model. No changes are needed here for DDP itself.
- DDP(model, device_ids=[rank]): This is the important wrapper. It takes the local model replica and the GPU ID it should run on. DDP automatically adds hooks that synchronize gradients during loss.backward() across all processes in the group, and it keeps model parameters synchronized.
- DistributedSampler: Crucial for data parallelism. It ensures each process receives a distinct, non-overlapping portion of the dataset in each epoch; without it, all processes would train on the same data, defeating the purpose. Calling sampler.set_epoch(epoch) is necessary so shuffling works correctly across epochs (see the short sketch after this list).
- DataLoader: The sampler is passed to the DataLoader. Note that shuffle should be False in the DataLoader because the DistributedSampler handles the shuffling logic in a distributed manner.
- Training loop: Each batch is moved to the process's assigned rank device (batch_inputs.to(rank)). loss.backward() implicitly triggers gradient synchronization via the DDP wrapper, and logging happens only from rank 0 (rank == 0) to avoid redundancy and potential race conditions.
- torchrun: This utility handles launching the nproc_per_node copies of your script and injecting the necessary environment variables (RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT). LOCAL_RANK refers to the process rank within the current node, while RANK is the global rank across all nodes. For our single-node example (--nnodes=1), RANK and LOCAL_RANK are the same.
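To see the partitioning behavior in isolation, here is a small single-process sketch (an illustration, not part of the training script) that constructs a DistributedSampler with explicit num_replicas and rank values, so no process group is required:

import torch
from torch.utils.data import TensorDataset
from torch.utils.data.distributed import DistributedSampler

# A tiny dataset of 8 samples so the index splits are easy to read
dataset = TensorDataset(torch.arange(8).float().unsqueeze(1))

for rank in range(2):  # pretend there are two processes
    sampler = DistributedSampler(dataset, num_replicas=2, rank=rank, shuffle=True)
    sampler.set_epoch(0)
    print(f"rank {rank}, epoch 0 indices: {list(sampler)}")
    sampler.set_epoch(1)
    print(f"rank {rank}, epoch 1 indices: {list(sampler)}")

Within an epoch the two ranks receive disjoint index sets, and the sets change between epochs because set_epoch reseeds the shuffle.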
This example uses basic Data Parallelism. As discussed earlier in the chapter, the same pattern extends beyond a single machine: in multi-node scenarios, torchrun would be used alongside cluster managers (such as Slurm or Kubernetes) to launch processes across different machines, coordinating MASTER_ADDR and the network configuration, which relates back to the infrastructure sections.
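As an illustration, a two-node launch might look like the sketch below. The hostnames, port, and job id are placeholders (a scheduler would normally fill them in), and you would also remove the hard-coded MASTER_ADDR/MASTER_PORT overrides in setup() and rely on the values torchrun injects:

# Run the same command on both machines, pointing at a shared rendezvous endpoint
torchrun \
    --nnodes=2 \
    --nproc_per_node=8 \
    --rdzv_backend=c10d \
    --rdzv_endpoint=node0.example.com:29500 \
    --rdzv_id=ddp_job_1 \
    basic_ddp_train.py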
This hands-on exercise provides the fundamental code structure for distributed training. Building complex LLM training pipelines involves scaling this pattern, integrating specialized frameworks, managing checkpoints robustly (with torch.distributed.checkpoint or framework-specific utilities), and handling potential hardware failures within these long-running jobs.
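As one illustration of the checkpointing concern, here is a minimal sketch of a common DDP pattern: only rank 0 writes the file, the underlying model is accessed through ddp_model.module, and a barrier keeps the ranks in step. The save_checkpoint helper is just an illustrative name, not a library API, and this is far from a full fault-tolerant solution:

import torch
import torch.distributed as dist

def save_checkpoint(ddp_model, optimizer, epoch, path="checkpoint.pt"):
    """Illustrative helper: save a DDP-wrapped model from rank 0 only."""
    if dist.get_rank() == 0:
        torch.save({
            "epoch": epoch,
            "model_state": ddp_model.module.state_dict(),   # unwrap the DDP module
            "optimizer_state": optimizer.state_dict(),
        }, path)
    # Keep all ranks in step so nobody tries to read (or exit) before the file exists
    dist.barrier()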