Establishing a connection between the processes involved in your distributed training job is a fundamental first step. PyTorch's torch.distributed
package provides the necessary tools to manage this communication setup. The core concept revolves around creating a process group, which includes all the processes participating in the job. Each process is assigned a unique rank, and collectively they know the total number of processes, known as the world size.
The primary function for setting up the distributed environment is torch.distributed.init_process_group. This function initializes the default process group and must be called by every process participating in the distributed job.
```python
import torch
import torch.distributed as dist
import os


def setup_distributed(backend='nccl'):
    """Initializes the distributed environment."""
    if not dist.is_available():
        print("Distributed training is not available.")
        return

    if not dist.is_initialized():
        # These environment variables are typically set by the launch utility
        # (e.g., torchrun, Slurm)
        rank = int(os.environ.get("RANK", "0"))
        world_size = int(os.environ.get("WORLD_SIZE", "1"))
        master_addr = os.environ.get("MASTER_ADDR", "localhost")
        master_port = os.environ.get("MASTER_PORT", "29500")  # Default port

        print(f"Initializing process group: Rank {rank}/{world_size}")
        dist.init_process_group(
            backend=backend,
            init_method=f'tcp://{master_addr}:{master_port}',
            rank=rank,
            world_size=world_size
        )
        print(f"Process group initialized ({backend}).")

        # Set the device for the current process. This is important!
        # Assumes one process per GPU.
        if backend == 'nccl' and torch.cuda.is_available():
            local_rank = int(os.environ.get("LOCAL_RANK", "0"))
            torch.cuda.set_device(local_rank)
            print(f"Rank {dist.get_rank()} using GPU {local_rank}")


# Example usage (typically called at the start of your script)
# setup_distributed(backend='nccl')  # Or 'gloo'
```
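Once the process group is initialized, any process can query its own rank and the world size. A quick sketch, assuming setup_distributed() has already run:

```python
import torch.distributed as dist

# After setup_distributed() has run, each process can inspect the group.
rank = dist.get_rank()              # unique id of this process, 0 .. world_size - 1
world_size = dist.get_world_size()  # total number of participating processes

# Rank 0 is commonly used for one-time work such as logging or checkpointing.
if rank == 0:
    print(f"Training with {world_size} processes.")
```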
Let's break down the arguments to init_process_group:

- backend: Specifies the communication library to use. This choice depends on your hardware and requirements.
- init_method: Defines how processes discover each other. The most common method uses TCP and requires the address and port of the rank 0 process (the master). Environment variable-based initialization ('env://') is also frequently used, relying on specific environment variables being preset (see the sketch after the environment variable list below).
- rank: The unique identifier for the current process, ranging from 0 to world_size - 1.
- world_size: The total number of processes involved in the training job.

PyTorch supports several backends for handling the underlying communication between processes:
- NCCL (NVIDIA Collective Communication Library): This is the preferred backend for GPU-based distributed training on NVIDIA hardware. It offers optimized implementations of collective communication operations (like all_reduce, broadcast) specifically for CUDA tensors, providing high bandwidth and low latency. Use NCCL when all your processes are running on machines equipped with NVIDIA GPUs connected via high-speed interconnects like NVLink or InfiniBand.
- Gloo: Gloo is a more general-purpose backend that works for both CPU and GPU communication. It's a good choice for heterogeneous environments, CPU-only training, or when NCCL is not available or suitable (e.g., certain cloud environments or older hardware). While generally slower than NCCL for GPU-to-GPU communication, it offers broader compatibility.
- MPI (Message Passing Interface): MPI is a standard for high-performance computing. If your cluster environment is already configured with an MPI implementation (like Open MPI), you can use the MPI backend. PyTorch needs to be built with MPI support for this option to be available. It's often used in academic or research clusters.
The choice of backend impacts performance significantly. For typical multi-GPU training on NVIDIA hardware, NCCL is almost always the best option.
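A common pattern is to select the backend at runtime, preferring NCCL when CUDA is available and falling back to Gloo for CPU-only runs. A minimal sketch using the setup_distributed helper defined above:

```python
import torch

# Prefer NCCL when NVIDIA GPUs are available; otherwise fall back to Gloo.
backend = 'nccl' if torch.cuda.is_available() else 'gloo'
setup_distributed(backend=backend)
```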
Diagram: process group initialization using a communication backend and a discovery method (like TCP with master address/port). Each process calls init_process_group.
Distributed training setups commonly rely on environment variables for configuration, especially when using launch utilities. The most important ones are:

- MASTER_ADDR: The IP address or hostname of the machine hosting process rank 0. All other processes need to connect to this address.
- MASTER_PORT: An open network port on the rank 0 machine for processes to coordinate initialization. Choose a port that is unlikely to be in use (e.g., above 10000).
- RANK: The unique rank of the current process.
- WORLD_SIZE: The total number of processes participating in the job.
- LOCAL_RANK (often used): When running multiple processes on the same node (e.g., one process per GPU), LOCAL_RANK typically identifies the process's index within that node (often from 0 to num_gpus_on_node - 1). This is frequently used to assign a specific GPU to each process using torch.cuda.set_device(local_rank).

These variables are usually set automatically by launch scripts or cluster schedulers.
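When these variables are already present in the environment, you can let init_process_group read them directly via the 'env://' method instead of building a TCP URL yourself. A minimal sketch, assuming one process per GPU (the setup_from_env name is just for illustration):

```python
import os

import torch
import torch.distributed as dist


def setup_from_env(backend='nccl'):
    """Initialize via 'env://', which reads MASTER_ADDR, MASTER_PORT,
    RANK, and WORLD_SIZE from the environment."""
    dist.init_process_group(backend=backend, init_method='env://')

    # Bind this process to its GPU (assumes one process per GPU).
    if backend == 'nccl' and torch.cuda.is_available():
        local_rank = int(os.environ.get("LOCAL_RANK", "0"))
        torch.cuda.set_device(local_rank)
```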
Manually setting environment variables and launching Python scripts on each machine can be tedious and error-prone. PyTorch provides the torchrun (previously torch.distributed.launch) utility to simplify this process.

torchrun handles setting the necessary environment variables (RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT, LOCAL_RANK) and launches the specified number of processes per node.
Example using torchrun on a single machine with 2 GPUs:

```bash
# Assume your training script is named train.py
# and it contains the setup_distributed() function shown earlier.

# --nproc_per_node specifies how many processes to launch on this machine.
#   Usually set to the number of GPUs available.
# --nnodes=1 indicates we are running on a single machine.
# train.py is your script, followed by its arguments.
torchrun --nproc_per_node=2 --nnodes=1 train.py --arg1 value1 --arg2 value2
```
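Inside the launched script, it can be helpful to confirm what torchrun injected for each process. A small debugging sketch:

```python
import os

# Print the values torchrun set for this process (useful when debugging launches).
for var in ("RANK", "WORLD_SIZE", "LOCAL_RANK", "MASTER_ADDR", "MASTER_PORT"):
    print(f"{var}={os.environ.get(var)}")
```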
For multi-node training, you would typically run torchrun on each node, specifying the total number of nodes (--nnodes), the rank of the current node (--node_rank), and the master node's address and port (--rdzv_endpoint):
```bash
# On Master Node (Rank 0):
torchrun \
    --nproc_per_node=<gpus_on_master> \
    --nnodes=<total_nodes> \
    --node_rank=0 \
    --rdzv_id=<job_id> \
    --rdzv_backend=c10d \
    --rdzv_endpoint="<master_node_ip>:<port>" \
    train.py --args...

# On Worker Node 1 (Rank 1):
torchrun \
    --nproc_per_node=<gpus_on_worker1> \
    --nnodes=<total_nodes> \
    --node_rank=1 \
    --rdzv_id=<job_id> \
    --rdzv_backend=c10d \
    --rdzv_endpoint="<master_node_ip>:<port>" \
    train.py --args...

# ... and so on for other worker nodes
```
Here, rdzv stands for rendezvous. torchrun uses this mechanism (often built on top of torch.distributed's c10d backend) for coordinating the launch across nodes. The <job_id> should be unique to this specific training run.
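For concreteness, here is the same template filled in with purely hypothetical values (2 nodes with 4 GPUs each, master node at 10.0.0.1, port 29500, job id my_job):

```bash
# Hypothetical example: 2 nodes, 4 GPUs each, master at 10.0.0.1:29500
# On node 0 (10.0.0.1):
torchrun --nproc_per_node=4 --nnodes=2 --node_rank=0 \
    --rdzv_id=my_job --rdzv_backend=c10d \
    --rdzv_endpoint="10.0.0.1:29500" train.py

# On node 1:
torchrun --nproc_per_node=4 --nnodes=2 --node_rank=1 \
    --rdzv_id=my_job --rdzv_backend=c10d \
    --rdzv_endpoint="10.0.0.1:29500" train.py
```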
After training, it's good practice to clean up the distributed environment resources:
```python
def cleanup_distributed():
    """Destroys the default process group."""
    if dist.is_initialized():
        dist.destroy_process_group()
        print("Process group destroyed.")


# Example usage (typically at the end of your script or in a finally block)
# try:
#     setup_distributed(...)
#     # ... training loop ...
# finally:
#     cleanup_distributed()
```
Calling dist.destroy_process_group() releases the resources associated with the process group. While Python's exit often handles this, explicit cleanup is safer, especially in complex applications or long-running services.
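Putting these pieces together, a typical script might follow a pattern like the sketch below. It assumes the setup_distributed and cleanup_distributed helpers defined earlier in this section; the model and training loop are placeholders:

```python
import torch
import torch.distributed as dist


def main():
    # Prefer NCCL on GPU machines; fall back to Gloo for CPU-only runs.
    backend = 'nccl' if torch.cuda.is_available() else 'gloo'
    setup_distributed(backend=backend)
    try:
        rank = dist.get_rank()
        world_size = dist.get_world_size()
        print(f"Rank {rank}/{world_size} starting training.")
        # ... build the model, wrap it (e.g., in DistributedDataParallel),
        # and run the training loop here ...
    finally:
        # Always tear down the process group, even if training fails.
        cleanup_distributed()


if __name__ == "__main__":
    main()
```

Launching this script with torchrun ensures each process receives its own RANK, WORLD_SIZE, and LOCAL_RANK before main() runs.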
Properly configuring the distributed environment is the foundation upon which all parallel training strategies are built. Understanding process groups, ranks, world size, communication backends, and launch utilities like torchrun
is essential for scaling your PyTorch training jobs effectively.