While native deep learning frameworks provide foundational tools for data parallelism, their setup can be verbose, requiring significant boilerplate code to manage process groups and communication. Horovod, originally developed at Uber, was created to simplify this process. It provides a high-level, framework-agnostic API that sits on top of a communication backend like MPI (Message Passing Interface) or Gloo, making it straightforward to convert a single-GPU training script into a multi-GPU distributed one.
Horovod operates on a single-program, multiple-data (SPMD) model. You write one script, and horovodrun launches multiple copies of it as independent processes. Each process, identified by its unique "rank," runs the same code but operates on a different shard of the training data.
To adapt a script for Horovod, you only need to insert a few important function calls. The core components of the Horovod API (hvd) are:
hvd.init(): Initializes Horovod. This must be the first Horovod operation. It establishes the communication backend and allows processes to discover each other.
hvd.rank(): Returns the unique ID or rank of the current process, from 0 to hvd.size() - 1. The process with rank 0 typically handles special tasks like logging or saving checkpoints.
hvd.local_rank(): Returns the unique rank of the process within a single server node. This is essential for pinning a process to a specific GPU.
hvd.size(): Returns the total number of processes participating in the distributed job.
The most important task in data parallelism is synchronizing the model's gradients across all workers after the backward pass. A naive approach might designate one worker (e.g., rank 0) as a parameter server to collect gradients, average them, and send them back. This creates a communication bottleneck, as rank 0's bandwidth limits the entire cluster's performance.
Horovod solves this with an efficient, decentralized algorithm called Ring-Allreduce. Instead of a central server, processes are arranged in a logical ring, and the gradient tensor on each GPU is split into N chunks (where N is the number of GPUs). In the first phase, scatter-reduce, each process sends a chunk to its next neighbor in the ring while receiving a different chunk from its previous neighbor, adding the received values to its own copy. After N-1 steps, each process holds the complete sum for one specific chunk. In the second phase, all-gather, processes pass these summed chunks around the ring for another N-1 steps until every process has a copy of the entire summed gradient tensor, which is divided by N to obtain the average. Over both phases each worker sends and receives roughly 2(N-1)/N times the size of the tensor, so per-worker traffic is close to constant as workers are added and no single node becomes a hotspot.
The Ring-Allreduce algorithm avoids a central bottleneck by organizing workers in a logical ring. Gradients are efficiently averaged in two phases without overwhelming a single node.
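The two phases described above can be traced with a small in-memory simulation. This is only an illustrative sketch in plain Python, not Horovod's implementation: the function name ring_allreduce and the list-of-lists "workers" are assumptions made for the example, and it assumes the tensor length divides evenly by the number of workers.

```python
def ring_allreduce(worker_grads):
    """Simulate ring all-reduce (sum, then average) over N in-memory workers.

    worker_grads: list of N equal-length lists, one gradient per worker.
    Returns (averaged gradient held by each worker, elements sent by each worker).
    """
    n = len(worker_grads)
    length = len(worker_grads[0])
    assert length % n == 0, "sketch assumes the tensor splits evenly into N chunks"
    chunk = length // n
    # Each worker views its own gradient copy as N chunks.
    bufs = [[list(g[c * chunk:(c + 1) * chunk]) for c in range(n)]
            for g in worker_grads]
    sent = [0] * n

    # Phase 1: scatter-reduce. At step s, worker r sends chunk (r - s) % n to
    # its next neighbor, which adds it to its own copy of that chunk.
    for s in range(n - 1):
        # Snapshot all messages first so the sends behave as simultaneous.
        msgs = [(r, (r - s) % n, list(bufs[r][(r - s) % n])) for r in range(n)]
        for r, c, payload in msgs:
            dst = (r + 1) % n
            bufs[dst][c] = [a + b for a, b in zip(bufs[dst][c], payload)]
            sent[r] += len(payload)
    # Worker r now holds the complete sum for chunk (r + 1) % n.

    # Phase 2: all-gather. Completed chunks travel around the ring and
    # overwrite the partial sums on each receiver.
    for s in range(n - 1):
        msgs = [(r, (r + 1 - s) % n, list(bufs[r][(r + 1 - s) % n]))
                for r in range(n)]
        for r, c, payload in msgs:
            bufs[(r + 1) % n][c] = payload
            sent[r] += len(payload)

    # Divide the summed tensor by N to obtain the average on every worker.
    averaged = [[x / n for c in range(n) for x in bufs[r][c]] for r in range(n)]
    return averaged, sent


if __name__ == "__main__":
    grads = [[float(10 * r + i) for i in range(8)] for r in range(4)]
    averaged, sent = ring_allreduce(grads)
    print("worker 0 result:", averaged[0])
    print("elements sent per worker:", sent)
```

Each simulated worker sends 2(N-1) chunks of size 1/N of the tensor, which is where the roughly 2(N-1)/N per-worker traffic figure comes from.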
Let's walk through the five modifications needed to convert a standard PyTorch training script to use Horovod for distributed training.
Add hvd.init() at the beginning of your script to set up the communication backend.
To prevent processes on the same machine from competing for GPUs, each process must be pinned to an exclusive GPU. Use hvd.local_rank() for this.
# Pin GPU to be used to process local rank (one GPU per process)
torch.cuda.set_device(hvd.local_rank())
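To see why local_rank rather than rank is used for pinning, it helps to lay out the IDs for a small cluster. The sketch below is plain Python and does not call Horovod; describe_layout is a hypothetical helper, and it assumes ranks are assigned node by node with the same number of processes on every node, which is a common layout but not something the code above guarantees.

```python
def describe_layout(num_nodes, gpus_per_node):
    """Return (rank, node, local_rank) for every process.

    Assumption for illustration: ranks are assigned contiguously, node by
    node, with the same number of processes on each node.
    """
    layout = []
    for rank in range(num_nodes * gpus_per_node):
        node, local_rank = divmod(rank, gpus_per_node)
        layout.append((rank, node, local_rank))
    return layout


if __name__ == "__main__":
    for rank, node, local_rank in describe_layout(num_nodes=2, gpus_per_node=4):
        # The global rank can exceed the number of GPUs on a machine,
        # while local_rank always indexes a GPU that exists on that node.
        print(f"rank {rank}: node {node}, local_rank {local_rank} -> cuda:{local_rank}")
```

Under this layout, global rank 5 lives on the second node and should use that node's GPU 1, which is exactly what indexing by the local rank gives.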
When using data parallelism, the global batch size becomes local_batch_size * hvd.size(). A larger batch size often requires a higher learning rate to maintain convergence speed. A common heuristic is to scale the base learning rate by the number of workers.
# A common practice is to scale learning rate by the number of workers.
optimizer = torch.optim.Adam(model.parameters(), lr=base_lr * hvd.size())
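As a quick arithmetic check of this heuristic, the following sketch computes the global batch size and the linearly scaled learning rate for a few worker counts. The helper name scaled_hyperparameters and the example values are assumptions for illustration; linear scaling is a starting point, not a guarantee of good convergence.

```python
def scaled_hyperparameters(local_batch_size, base_lr, num_workers):
    """Return (global batch size, linearly scaled learning rate)."""
    global_batch_size = local_batch_size * num_workers
    scaled_lr = base_lr * num_workers
    return global_batch_size, scaled_lr


if __name__ == "__main__":
    for num_workers in (1, 4, 16):
        batch, lr = scaled_hyperparameters(32, 0.001, num_workers)
        print(f"{num_workers:>2} workers: global batch {batch}, learning rate {lr:.4f}")
```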
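This is the central step that enables gradient synchronization. Horovod provides a DistributedOptimizer wrapper around your existing optimizer. As gradients become available during the backward pass, the wrapper starts All-Reduce operations on them in the background, and optimizer.step() waits for those operations to finish before updating the weights. Every worker therefore applies the same averaged gradients.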
import horovod.torch as hvd
# ... (define model and optimizer)
# Wrap the optimizer with Horovod's DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(optimizer,
                                     named_parameters=model.named_parameters())
The named_parameters argument helps Horovod map gradients to model parameters, which is important for certain advanced features like gradient compression.
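The effect of averaging gradients before the update can be shown with a toy example. This is a conceptual sketch in plain Python, not Horovod code: allreduce_average and sgd_step are hypothetical stand-ins, the four "workers" are ordinary lists, and plain SGD is used instead of Adam to keep the arithmetic visible.

```python
def allreduce_average(per_worker_grads):
    """Element-wise average of the gradients held by each worker."""
    n = len(per_worker_grads)
    return [sum(values) / n for values in zip(*per_worker_grads)]


def sgd_step(weights, grads, lr):
    """One plain SGD update: w <- w - lr * g."""
    return [w - lr * g for w, g in zip(weights, grads)]


# Four hypothetical workers start from identical weights...
weights = [[0.5, -1.0] for _ in range(4)]
# ...but compute different gradients because each sees a different data shard.
local_grads = [[0.1, 0.2], [0.3, 0.0], [-0.1, 0.4], [0.5, -0.2]]

# Every worker receives the same averaged gradient and applies the same update,
# so the model replicas remain identical after the step.
avg_grad = allreduce_average(local_grads)
weights = [sgd_step(w, avg_grad, lr=0.1) for w in weights]

if __name__ == "__main__":
    print("averaged gradient:", avg_grad)
    print("replicas identical:", all(w == weights[0] for w in weights))
```

Had each worker applied only its local gradient, the four replicas would have drifted apart after a single step.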
To ensure a consistent starting point, the initial model weights from rank 0 must be broadcast to all other processes. This prevents the workers from diverging due to different random initializations. Horovod handles this with two calls after the optimizer has been wrapped: one for the model parameters and one for the optimizer state.
# Broadcast parameters from rank 0 to all other processes.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
Additionally, you must ensure that each process receives a unique, non-overlapping subset of the data for each epoch. PyTorch's DistributedSampler integrates directly with Horovod for this purpose.
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)
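The partitioning idea behind a distributed sampler can be sketched without PyTorch. The function below is a simplified illustration, not the library's implementation: partition_indices and its seed handling are assumptions for the example. It shuffles with a seed that depends on the epoch, pads the index list so it divides evenly, and gives each rank every num_replicas-th index.

```python
import math
import random


def partition_indices(dataset_len, num_replicas, rank, epoch=0, seed=0):
    """Return the sample indices that one rank would process in one epoch."""
    indices = list(range(dataset_len))
    # Every rank shuffles with the same seed, so all ranks agree on the order.
    random.Random(seed + epoch).shuffle(indices)
    # Pad by repeating leading samples so each rank gets the same count.
    per_replica = math.ceil(dataset_len / num_replicas)
    total = per_replica * num_replicas
    indices += indices[: total - dataset_len]
    # Strided slice: rank r takes positions r, r + num_replicas, ...
    return indices[rank:total:num_replicas]


if __name__ == "__main__":
    shards = [partition_indices(10, num_replicas=4, rank=r) for r in range(4)]
    for r, shard in enumerate(shards):
        print(f"rank {r}: {shard}")
```

When the dataset size is a multiple of the worker count, the shards are disjoint; otherwise a few samples are repeated so that every worker runs the same number of batches.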
Here is a condensed PyTorch script showing these five modifications in context.
import torch
import horovod.torch as hvd
# 1. Initialize Horovod
hvd.init()
# 2. Pin GPU to local rank
torch.cuda.set_device(hvd.local_rank())
# Define a simple model
model = torch.nn.Sequential(torch.nn.Linear(784, 128),
                            torch.nn.ReLU(),
                            torch.nn.Linear(128, 10))
model.cuda()
# Define loss function and optimizer
base_lr = 0.001
criterion = torch.nn.CrossEntropyLoss()
# 3. Scale learning rate by the number of workers
optimizer = torch.optim.Adam(model.parameters(), lr=base_lr * hvd.size())
# 4. Wrap optimizer with DistributedOptimizer
optimizer = hvd.DistributedOptimizer(
    optimizer,
    named_parameters=model.named_parameters(),
)
# 5. Broadcast initial state from rank 0
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
# Create a dummy dataset for demonstration
data = torch.randn(1000, 784)
labels = torch.randint(0, 10, (1000,))
dataset = torch.utils.data.TensorDataset(data, labels)
# 5 (continued). Partition data using DistributedSampler
sampler = torch.utils.data.distributed.DistributedSampler(
    dataset, num_replicas=hvd.size(), rank=hvd.rank())
data_loader = torch.utils.data.DataLoader(dataset, batch_size=32, sampler=sampler)
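# Training loop
model.train()
for epoch in range(5):
    # Reshuffle so the sampler uses a different ordering each epoch
    sampler.set_epoch(epoch)
    for batch_idx, (inputs, target) in enumerate(data_loader):
        inputs, target = inputs.cuda(), target.cuda()
        optimizer.zero_grad()
        output = model(inputs)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()  # Gradients are all-reduced across workers before the update
    # Only rank 0 logs, to avoid duplicate output (loss is from its last local batch)
    if hvd.rank() == 0:
        print(f"Epoch {epoch}: loss={loss.item()}")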
To run this script, you use the horovodrun command-line tool, which launches the worker processes over the configured backend (MPI or Gloo). To run the script on a single machine with four GPUs, the command would be:
# -np specifies the total number of processes (workers)
# -H specifies the hosts. 'localhost:4' means 4 processes on the local machine.
horovodrun -np 4 -H localhost:4 python train_horovod.py
Horovod's strength lies in its simplicity and efficiency for pure data-parallel workloads. By adding just a few lines of code, you can distribute a single-GPU script across many accelerators. However, its model assumes the entire model replica, along with its optimizer states and gradients, can fit into the memory of a single accelerator. As we will see next, for models that exceed this limit, we need more advanced memory optimization and model-sharding techniques provided by frameworks like Microsoft DeepSpeed.