Setting up a distributed training job involves coordinating multiple processes, often running on different devices (GPUs), to work together on training a single model. A foundational distributed training environment can be established using PyTorch's native DistributedDataParallel (DDP) module, which implements the data parallelism strategy. While frameworks like DeepSpeed offer more advanced features, understanding the core mechanics with DDP provides a solid base.

We'll walk through a scenario where we train a simple model across multiple GPUs. If you only have a single GPU (or none), you can still follow the structure of the code, but note that the `nccl` backend expects one GPU per process; in that case, launch a single process or switch to the `gloo` backend on CPU. The performance benefits of data parallelism only appear with multiple physical devices.

### Prerequisites

Ensure you have PyTorch installed (`pip install torch torchvision torchaudio`). For actual multi-GPU training, you'll also need an appropriate CUDA setup and NVIDIA drivers.

### Core Components of a PyTorch DDP Setup

1. **Initialization:** Each process needs to join a communication group. This is done with `torch.distributed.init_process_group()`. This function requires specifying a backend (`nccl` for NVIDIA GPUs or `gloo` for CPU/mixed environments), and it typically relies on environment variables (`MASTER_ADDR`, `MASTER_PORT`, `RANK`, `WORLD_SIZE`) set by the launch utility (a minimal sketch follows this list).
2. **Model Wrapping:** The standard PyTorch model is wrapped with `torch.nn.parallel.DistributedDataParallel`. This wrapper handles gradient synchronization across processes during the backward pass and ensures the model is placed on the correct device for each process.
3. **Data Partitioning:** Each process should operate on a unique subset of the data in each epoch. `torch.utils.data.distributed.DistributedSampler` is used with the `DataLoader` to achieve this partitioning automatically.
4. **Launch Utility:** A utility like `torchrun` (the recommended tool, superseding `torch.distributed.launch`) spawns the worker processes and sets the environment variables needed for initialization.
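Before looking at the full script, the sketch below illustrates how these pieces typically fit together when a script is launched with `torchrun`: the launcher exports `RANK`, `LOCAL_RANK`, and `WORLD_SIZE`, the default `env://` initialization picks up `MASTER_ADDR`/`MASTER_PORT`, and each process pins itself to one GPU. The `init_distributed` helper name is purely illustrative; the full example below inlines the same logic in a `setup()` function.

```python
# Minimal initialization sketch (illustrative helper, not part of the example
# script below). Assumes the script is launched with torchrun, which exports
# RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR, and MASTER_PORT.
import os

import torch
import torch.distributed as dist


def init_distributed():
    rank = int(os.environ["RANK"])              # global rank across all nodes
    local_rank = int(os.environ["LOCAL_RANK"])  # rank within this node
    world_size = int(os.environ["WORLD_SIZE"])  # total number of processes

    # The default init_method is "env://", so MASTER_ADDR/MASTER_PORT set by
    # torchrun are picked up automatically. Use nccl on GPUs, gloo on CPU.
    backend = "nccl" if torch.cuda.is_available() else "gloo"
    dist.init_process_group(backend=backend, rank=rank, world_size=world_size)

    if torch.cuda.is_available():
        torch.cuda.set_device(local_rank)  # pin this process to one GPU
    return rank, local_rank, world_size
```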
### Example Implementation

Let's create a Python script (`basic_ddp_train.py`) demonstrating these components. We'll use a very simple model and synthetic data to keep the focus on the distributed setup.

```python
import os

import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler


# 1. Define a Simple Model
class SimpleModel(nn.Module):
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.linear1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(10, 5)

    def forward(self, x):
        x = self.linear1(x)
        x = self.relu(x)
        x = self.linear2(x)
        return x


# 2. Setup Function for the Distributed Environment
def setup(rank, world_size):
    """Initializes the distributed environment."""
    os.environ['MASTER_ADDR'] = 'localhost'  # Address of the master node
    os.environ['MASTER_PORT'] = '12355'      # Port for communication

    # Initialize the process group; 'nccl' is preferred for NVIDIA GPUs
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)  # Pin the process to a specific GPU


# 3. Cleanup Function
def cleanup():
    """Destroys the process group."""
    dist.destroy_process_group()


# 4. Main Training Function for a Single Process
def train_process(rank, world_size, epochs=3):
    """The core training logic executed by each process."""
    print(f"Initializing process {rank} of {world_size}...")
    setup(rank, world_size)

    # Create the model and move it to the GPU assigned to this rank
    model = SimpleModel().to(rank)

    # Wrap the model with DDP
    ddp_model = DDP(model, device_ids=[rank])

    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.01)

    # Create synthetic data
    # torch.manual_seed(42)  # Optional: for consistent random data generation
    inputs = torch.randn(128, 10)         # 128 samples, 10 features
    labels = torch.randint(0, 5, (128,))  # 128 labels for 5 classes
    dataset = TensorDataset(inputs, labels)

    # Use DistributedSampler to partition the data across processes
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)

    # shuffle=False because the DistributedSampler handles shuffling
    dataloader = DataLoader(dataset, batch_size=32, sampler=sampler, shuffle=False)

    print(f"Rank {rank} starting training...")
    for epoch in range(epochs):
        sampler.set_epoch(epoch)  # Ensure proper shuffling each epoch
        epoch_loss = 0.0
        num_batches = 0
        for batch_inputs, batch_labels in dataloader:
            # Move data to the process's assigned GPU
            batch_inputs = batch_inputs.to(rank)
            batch_labels = batch_labels.to(rank)

            # Forward pass
            outputs = ddp_model(batch_inputs)
            loss = criterion(outputs, batch_labels)

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()  # DDP handles gradient averaging here
            optimizer.step()

            epoch_loss += loss.item()
            num_batches += 1

        avg_loss = epoch_loss / num_batches
        # Print the loss only from rank 0 to avoid clutter
        if rank == 0:
            print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}")

    print(f"Rank {rank} finished training.")
    cleanup()


# 5. Launcher Code (using the torchrun convention)
if __name__ == "__main__":
    # torchrun automatically sets RANK, LOCAL_RANK, WORLD_SIZE
    local_rank = int(os.environ["LOCAL_RANK"])
    world_size = int(os.environ["WORLD_SIZE"])  # Total processes across all nodes

    print(f"Starting DDP example on rank {local_rank} (size {world_size}).")
    train_process(rank=local_rank, world_size=world_size, epochs=5)
    print("DDP Example Finished.")
```

### Running the Example

To run this script with two GPU processes on a single machine, save it as `basic_ddp_train.py` and execute it from your terminal using torchrun. (The `nccl` backend needs one GPU per process; if you have fewer than two GPUs, see the CPU fallback sketch after this section or launch with `--nproc_per_node=1`.)

```bash
# Run with 2 processes (one per GPU) on the current machine
torchrun --standalone --nnodes=1 --nproc_per_node=2 basic_ddp_train.py
```

- `--standalone`: Indicates running without an external cluster manager like Slurm.
- `--nnodes=1`: Specifies running on a single machine (node).
- `--nproc_per_node=2`: Sets the number of processes to launch on this node (and implicitly the number of GPUs used, one per process).

You should see output indicating initialization from two processes (Rank 0 and Rank 1), followed by epoch losses printed only by Rank 0, and finally completion messages from both.
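If your machine does not have two GPUs, you can still exercise the same code path on CPU. The sketch below assumes you adapt the script accordingly (the `setup_cpu` name is illustrative): it swaps the backend to `gloo` and drops the GPU-specific calls, while the sampler, training loop, and torchrun command stay the same.

```python
# CPU/gloo variant for machines without multiple GPUs (illustrative sketch).
# torchrun still provides MASTER_ADDR/MASTER_PORT, RANK, LOCAL_RANK, WORLD_SIZE.
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP


def setup_cpu(rank, world_size):
    """Initialize the process group with the gloo backend (no GPUs needed)."""
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

# Inside train_process(), the GPU-specific lines would change to:
#   model = SimpleModel()      # model stays on CPU
#   ddp_model = DDP(model)     # no device_ids for CPU training
# and the .to(rank) calls on the batches are no longer needed.
```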
### Explanation Breakdown

- `setup(rank, world_size)`: Initializes the connection between processes. `MASTER_ADDR` and `MASTER_PORT` tell every process where to find the coordination point (the process with rank 0). `init_process_group` establishes the communication backend (`nccl` is highly optimized for NVIDIA GPUs). `torch.cuda.set_device(rank)` ensures each process controls a specific GPU, preventing conflicts.
- `SimpleModel`: A standard PyTorch model. No changes are needed here for DDP itself.
- `DDP(model, device_ids=[rank])`: The important wrapper. It takes the local model replica and the GPU ID it should run on. DDP automatically adds hooks that synchronize gradients across all processes in the group during `loss.backward()`, which also keeps the model parameters in sync.
- `DistributedSampler`: Essential for data parallelism. It ensures each process receives a distinct, non-overlapping portion of the dataset in each epoch. Without it, all processes would train on the same data, defeating the purpose. `sampler.set_epoch(epoch)` is necessary so that shuffling produces a different ordering each epoch.
- `DataLoader`: The sampler is passed to the `DataLoader`. Note that `shuffle` must remain `False` when a sampler is provided, because the `DistributedSampler` handles shuffling in a distributed manner.
- Training loop: The core loop looks very similar to single-GPU training. The main differences are that data is moved to the rank-specific device (`batch_inputs.to(rank)`) and that `loss.backward()` implicitly triggers gradient synchronization via the DDP wrapper.
- Logging/saving: You typically perform actions like printing logs or saving checkpoints from one process only (usually `rank == 0`) to avoid redundancy and potential race conditions.
- `torchrun`: This utility launches the `nproc_per_node` copies of your script and injects the necessary environment variables (`RANK`, `LOCAL_RANK`, `WORLD_SIZE`, `MASTER_ADDR`, `MASTER_PORT`). `LOCAL_RANK` is the process rank within the current node, while `RANK` is the global rank across all nodes. For our single-node example (`--nnodes=1`), `RANK` and `LOCAL_RANK` are the same.

### Connecting to Advanced Concepts

This example uses basic data parallelism. As discussed earlier in the chapter:

- Model size limits: DDP replicates the entire model on each GPU. This works well until the model becomes too large to fit in a single GPU's memory.
- Frameworks like DeepSpeed: DeepSpeed builds on these concepts, integrating techniques like ZeRO (Zero Redundancy Optimizer), which partitions optimizer states, gradients, and even model parameters across GPUs (a more advanced form of data parallelism combined with elements of model parallelism), drastically reducing the memory footprint per GPU. It also supports Tensor and Pipeline Parallelism for models that are too large even for ZeRO.
- Orchestration: In a multi-node scenario, torchrun is used alongside cluster managers (like Slurm or Kubernetes) to launch processes across different machines, coordinating `MASTER_ADDR` and the network configuration, which relates back to the infrastructure sections.

This hands-on exercise provides the fundamental code structure for distributed training. Building complex LLM training pipelines involves scaling this pattern, integrating specialized frameworks, managing checkpoints robustly (via the `torch.distributed.checkpoint` module or framework-specific utilities), and handling potential hardware failures within these long-running jobs.
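To make the checkpointing point concrete, here is a minimal sketch of the common rank-0 save/load pattern using plain `torch.save`/`torch.load`. The helper names and the `checkpoint.pt` path are placeholders and not part of the example script above.

```python
# Rank-0 checkpointing sketch for a DDP-wrapped model (illustrative only).
import torch
import torch.distributed as dist

CHECKPOINT_PATH = "checkpoint.pt"  # placeholder path


def save_checkpoint(ddp_model, optimizer, epoch, rank):
    # Parameters are identical on every rank because DDP keeps them in sync,
    # so only rank 0 needs to write to disk.
    if rank == 0:
        torch.save(
            {
                "model": ddp_model.module.state_dict(),  # unwrap the DDP module
                "optimizer": optimizer.state_dict(),
                "epoch": epoch,
            },
            CHECKPOINT_PATH,
        )
    # Keep ranks in step so none tries to load before the file exists.
    dist.barrier()


def load_checkpoint(ddp_model, optimizer, rank):
    # Remap tensors saved from rank 0's GPU onto this rank's GPU.
    map_location = {"cuda:0": f"cuda:{rank}"}
    checkpoint = torch.load(CHECKPOINT_PATH, map_location=map_location)
    ddp_model.module.load_state_dict(checkpoint["model"])
    optimizer.load_state_dict(checkpoint["optimizer"])
    return checkpoint["epoch"]
```

For very large models, sharded approaches such as the `torch.distributed.checkpoint` module or framework utilities avoid funneling the entire state through a single rank.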