Now that we have explored the theoretical underpinnings of Denoising Diffusion Probabilistic Models (DDPMs), let's translate that theory into a practical implementation. This section guides you through building a basic DDPM, focusing on the essential components required to generate images. We will use PyTorch for demonstration purposes and target a standard dataset like MNIST or CIFAR-10, assuming you have dataloaders prepared.
Our goal is to implement the core mechanics: the forward noise process, the U-Net model for noise prediction, the simplified training loss calculation, and the reverse sampling process.
The diffusion process relies on a predefined variance schedule $\beta_t$ for $t = 1, \dots, T$. Common choices include linear or cosine schedules. From $\beta_t$, we derive $\alpha_t = 1 - \beta_t$ and $\bar{\alpha}_t = \prod_{s=1}^{t} \alpha_s$. These values control the noise level at each timestep and are essential for both the forward and reverse processes.
Let's define a function to precompute these values for a linear schedule over T timesteps.
import torch
import torch.nn.functional as F

def linear_beta_schedule(timesteps, beta_start=0.0001, beta_end=0.02):
    """
    Generates a linear schedule for beta values from beta_start to beta_end.
    """
    return torch.linspace(beta_start, beta_end, timesteps)
# Example setup
T = 200 # Number of diffusion timesteps
betas = linear_beta_schedule(timesteps=T)
# Precompute alphas and cumulative alphas
alphas = 1. - betas
alphas_cumprod = torch.cumprod(alphas, dim=0)
# Shifted cumulative product for easy indexing (alpha_bar_{t-1})
alphas_cumprod_prev = F.pad(alphas_cumprod[:-1], (1, 0), value=1.0)
# Precompute terms needed for q_sample and posterior calculation
sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
sqrt_one_minus_alphas_cumprod = torch.sqrt(1. - alphas_cumprod)
sqrt_recip_alphas = torch.sqrt(1.0 / alphas) # Needed for sampling step
# Variance term for the posterior q(x_{t-1} | x_t, x_0)
posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod)
# We often need to extract the correct value for a given batch of timesteps t
def extract(a, t, x_shape):
    """
    Extracts the values from 'a' corresponding to the timesteps 't'
    and reshapes them to broadcast across the image dimensions.
    """
    batch_size = t.shape[0]
    out = a.gather(-1, t.cpu())  # Get values corresponding to timesteps t
    # Reshape to [batch_size, 1, 1, ..., 1] for broadcasting
    return out.reshape(batch_size, *((1,) * (len(x_shape) - 1))).to(t.device)
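For example (a quick sketch), gathering $\sqrt{\bar{\alpha}_t}$ for a batch of four random timesteps yields a [4, 1, 1, 1] tensor that broadcasts cleanly over an image batch:

t = torch.randint(0, T, (4,), dtype=torch.long)
vals = extract(sqrt_alphas_cumprod, t, (4, 1, 28, 28))
print(vals.shape)  # torch.Size([4, 1, 1, 1])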
It can be helpful to visualize the cumulative product $\bar{\alpha}_t$. As $t$ increases, $\bar{\alpha}_t$ decreases towards zero, signifying that more noise is added.
Figure: the cumulative product $\bar{\alpha}_t$ decreases smoothly as the timestep $t$ increases, reflecting the gradual addition of noise under the linear variance schedule $\beta_t$.
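A quick sketch with matplotlib (assuming it is available) reproduces this curve from the precomputed alphas_cumprod:

import matplotlib.pyplot as plt

plt.plot(range(T), alphas_cumprod.numpy())
plt.xlabel("timestep t")
plt.ylabel("cumulative product of alphas")
plt.title("alpha_bar_t under a linear beta schedule")
plt.show()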
The forward process $q(x_t \mid x_0)$ adds Gaussian noise to an image $x_0$ to produce $x_t$ at a given timestep $t$. This is defined by the equation:

$$x_t = \sqrt{\bar{\alpha}_t}\, x_0 + \sqrt{1 - \bar{\alpha}_t}\, \epsilon, \quad \text{where } \epsilon \sim \mathcal{N}(0, I)$$

We can implement this directly using the precomputed values.
def q_sample(x_start, t, noise=None):
    """
    Samples x_t given x_0 and t using the reparameterization trick.
    x_start: The initial image (x_0) [Batch, Channels, Height, Width]
    t: Timestep tensor [Batch]
    noise: Optional noise tensor; if None, sample standard Gaussian noise.
    """
    if noise is None:
        noise = torch.randn_like(x_start)
    # Extract sqrt(alpha_bar_t) and sqrt(1 - alpha_bar_t) for the given timesteps
    sqrt_alphas_cumprod_t = extract(sqrt_alphas_cumprod, t, x_start.shape)
    sqrt_one_minus_alphas_cumprod_t = extract(sqrt_one_minus_alphas_cumprod, t, x_start.shape)
    # Apply the formula: x_t = sqrt(alpha_bar_t)*x_0 + sqrt(1-alpha_bar_t)*noise
    noisy_image = sqrt_alphas_cumprod_t * x_start + sqrt_one_minus_alphas_cumprod_t * noise
    return noisy_image
This function allows us to generate noisy versions of our input data for any timestep t, which is necessary for training the noise prediction network.
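A quick sanity check (a sketch using random data in place of a real image) shows the signal giving way to noise as $t$ grows:

x0 = torch.rand(1, 1, 28, 28) * 2 - 1  # stand-in "image" scaled to [-1, 1]
for step in [0, 50, 100, 199]:
    t = torch.tensor([step], dtype=torch.long)
    xt = q_sample(x0, t)
    # As t grows, x_t depends less on x_0 and approaches a standard Gaussian
    print(f"t={step:3d}  mean={xt.mean():.3f}  std={xt.std():.3f}")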
The core of the DDPM is a neural network, typically a U-Net architecture, trained to predict the noise $\epsilon$ that was added to an image at a given timestep $t$. The model $\epsilon_\theta(x_t, t)$ takes the noisy image $x_t$ and the timestep $t$ as input.
The U-Net architecture is well-suited for this task due to its combination of downsampling (encoder), upsampling (decoder), and skip connections. The skip connections allow the decoder to reuse high-resolution features from the encoder, helping preserve image details while processing information across multiple scales.
A critical adaptation for diffusion models is incorporating the timestep information $t$. This is usually done by converting $t$ into a time embedding vector (often using sinusoidal positional embeddings, similar to Transformers) and adding this embedding to the intermediate feature maps within the U-Net blocks, as the sketch below illustrates.
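As an illustration (a sketch, not a full U-Net; the class names and layer sizes here are hypothetical), a sinusoidal embedding module and a block that consumes it might look like this:

import math
import torch.nn as nn

class SinusoidalPositionEmbeddings(nn.Module):
    """
    Maps integer timesteps to dense vectors, following the Transformer
    positional encoding scheme. 'dim' is the embedding size (assumed even).
    """
    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, t):
        half_dim = self.dim // 2
        scale = math.log(10000) / (half_dim - 1)
        freqs = torch.exp(torch.arange(half_dim, device=t.device) * -scale)
        args = t[:, None].float() * freqs[None, :]
        return torch.cat([args.sin(), args.cos()], dim=-1)  # [batch, dim]

class TimeConditionedBlock(nn.Module):
    """
    Illustrative convolutional block: the time embedding is projected to the
    channel dimension and added to the feature maps (broadcast over H and W).
    """
    def __init__(self, channels, time_embed_dim):
        super().__init__()
        self.conv = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        self.time_proj = nn.Linear(time_embed_dim, channels)
        self.act = nn.SiLU()

    def forward(self, x, t_emb):
        h = self.act(self.conv(x))
        # Broadcast the projected embedding over the spatial dimensions
        return h + self.time_proj(t_emb)[:, :, None, None]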
# Placeholder for U-Net definition.
# Assume a U-Net class `UNetModel` exists, accepting image dimensions and time embedding dimension.
# Example signature: model = UNetModel(image_channels=1, model_channels=64, time_embed_dim=256, num_res_blocks=2)
# The model's forward pass would look like: predicted_noise = model(noisy_image, time_steps)
# Where `time_steps` is processed internally to generate embeddings.
# Implementing a full U-Net is beyond this snippet, but many standard implementations exist.
# Refer to repositories like 'denoising-diffusion-pytorch' or papers for details.
# Example usage (assuming model is defined and instantiated)
# model = UNetModel(...)
# noisy_image = ... # from q_sample
# t = ... # sampled timesteps tensor
# predicted_noise = model(noisy_image, t)
As discussed earlier, we often use the simplified training objective:

$$L_{\text{simple}}(\theta) = \mathbb{E}_{t, x_0, \epsilon}\left[ \left\| \epsilon - \epsilon_\theta(x_t, t) \right\|^2 \right], \quad \text{where } x_t = \sqrt{\bar{\alpha}_t}\, x_0 + \sqrt{1 - \bar{\alpha}_t}\, \epsilon.$$
In practice, the expectation is approximated using mini-batches. For each image $x_0$ in a batch, we sample a random timestep $t \sim \text{Uniform}(1, \dots, T)$, sample noise $\epsilon \sim \mathcal{N}(0, I)$, compute $x_t$ using q_sample, predict the noise $\epsilon_\theta(x_t, t)$ using the U-Net, and calculate the mean squared error (MSE) between the true noise $\epsilon$ and the predicted noise $\epsilon_\theta(x_t, t)$.
def p_losses(denoise_model, x_start, t):
    """
    Calculates the loss for training the noise prediction model.
    denoise_model: The U-Net model (epsilon_theta).
    x_start: The initial image (x_0).
    t: Sampled timesteps for the batch.
    """
    # 1. Sample noise epsilon ~ N(0, I)
    noise = torch.randn_like(x_start)
    # 2. Compute x_t using q_sample
    x_noisy = q_sample(x_start=x_start, t=t, noise=noise)
    # 3. Predict the noise using the U-Net model
    predicted_noise = denoise_model(x_noisy, t)
    # 4. Calculate the MSE loss between the true noise and predicted noise
    loss = F.mse_loss(noise, predicted_noise)
    return loss
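As a quick sanity check (the stand-in predictor below is hypothetical, not a trained model): a predictor that always outputs zeros should give a loss near 1.0, the variance of the standard Gaussian noise being regressed:

dummy_model = lambda x, t: torch.zeros_like(x)  # stand-in for epsilon_theta
x0 = torch.rand(8, 1, 28, 28) * 2 - 1
t = torch.randint(0, T, (8,), dtype=torch.long)
print(p_losses(dummy_model, x0, t))  # tensor close to 1.0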
Generating new images involves reversing the diffusion process. We start with pure Gaussian noise $x_T \sim \mathcal{N}(0, I)$ and iteratively denoise it using the trained model $\epsilon_\theta$ to estimate $x_{t-1}$ from $x_t$. The basic DDPM sampling step is:

$$x_{t-1} = \frac{1}{\sqrt{\alpha_t}} \left( x_t - \frac{1 - \alpha_t}{\sqrt{1 - \bar{\alpha}_t}}\, \epsilon_\theta(x_t, t) \right) + \sigma_t z$$

where $z \sim \mathcal{N}(0, I)$ for $t > 1$, and $z = 0$ for $t = 1$. The variance $\sigma_t^2$ is often set to $\beta_t$ or $\tilde{\beta}_t = \frac{1 - \bar{\alpha}_{t-1}}{1 - \bar{\alpha}_t} \beta_t$. Using $\tilde{\beta}_t$ corresponds to the variance of the true posterior $q(x_{t-1} \mid x_t, x_0)$; the implementation below makes this choice via the precomputed posterior_variance.
Let's implement one step of this reverse process.
@torch.no_grad()  # Important: disable gradients during sampling
def p_sample(model, x, t, t_index):
    """
    Performs one sampling step (denoising) from x_t to x_{t-1}.
    model: The trained U-Net model.
    x: The current noisy image (x_t).
    t: The current timestep, repeated across the batch (e.g., torch.full((b,), i)).
    t_index: The index corresponding to timestep t (for accessing precomputed values).
    """
    # Extract the precomputed schedule terms for this timestep
    betas_t = extract(betas, t, x.shape)
    sqrt_one_minus_alphas_cumprod_t = extract(sqrt_one_minus_alphas_cumprod, t, x.shape)
    sqrt_recip_alphas_t = extract(sqrt_recip_alphas, t, x.shape)
    # Equation 11 in the DDPM paper: compute the mean from the model's noise prediction
    model_mean = sqrt_recip_alphas_t * (x - betas_t * model(x, t) / sqrt_one_minus_alphas_cumprod_t)
    if t_index == 0:
        # At the final step (t=1, index 0), no noise is added (z = 0)
        return model_mean
    else:
        # Calculate posterior variance sigma_t^2 and add sigma_t * z
        posterior_variance_t = extract(posterior_variance, t, x.shape)
        noise = torch.randn_like(x)
        return model_mean + torch.sqrt(posterior_variance_t) * noise
@torch.no_grad()
def p_sample_loop(model, shape, device):
    """
    Full sampling loop from t = T down to t = 1.
    model: Trained U-Net.
    shape: Shape of the image to generate (e.g., [batch_size, channels, height, width]).
    device: Computation device (e.g., 'cuda').
    """
    b = shape[0]  # Batch size
    # Start from pure noise x_T ~ N(0, I)
    img = torch.randn(shape, device=device)
    imgs = []  # Store intermediate images if desired
    # Iterate backwards over timestep indices T-1 down to 0
    for i in reversed(range(0, T)):
        timestep = torch.full((b,), i, device=device, dtype=torch.long)
        img = p_sample(model, img, timestep, i)
        # Optional: append intermediate results
        # if i % 50 == 0:  # Append every 50 steps
        #     imgs.append(img.cpu())
    return img  # Return the final generated image (an estimate of x_0)
The relationship between the forward (q) and reverse (p) processes can be summarized as follows: the forward process transforms data $x_0$ into noise $x_T$ through fixed steps $q$, while the reverse process learns to undo this, starting from noise $x_T$ and using the model $\epsilon_\theta$ at each step $p_\theta$ to progressively generate data $x_0$.
The training process combines these components:

1. For each batch, sample timesteps and compute the loss with the p_losses function: $L = \text{p\_losses}(\epsilon_\theta, x_0, t)$.
2. Backpropagate with loss.backward().
3. Update the model parameters with optimizer.step().
4. Reset the gradients with optimizer.zero_grad().
5. Periodically run p_sample_loop to monitor progress.

# Simplified Training Loop Sketch
# Assume dataloader, model, optimizer are initialized
# model.train() # Set model to training mode
# for epoch in range(num_epochs):
# for step, batch in enumerate(dataloader):
# optimizer.zero_grad()
#
# batch = batch.to(device) # Assuming batch contains images x_0
# batch_size = batch.shape[0]
#
# # Sample timesteps t for the batch
# t = torch.randint(0, T, (batch_size,), device=device).long()
#
# # Calculate loss
# loss = p_losses(model, batch, t)
#
# # Backpropagate and update
# loss.backward()
# optimizer.step()
#
# # Logging, checkpointing, sampling...
# if step % log_interval == 0:
# print(f"Epoch {epoch} Step {step}: Loss = {loss.item()}")
# if step % sample_interval == 0:
# # Generate samples
# model.eval() # Set model to evaluation mode for sampling
# samples = p_sample_loop(model, shape=[num_samples, channels, height, width], device=device)
# # Save or display samples
# model.train() # Set back to training mode
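To turn the sketch into something you can run, instantiate the pieces it assumes. A minimal setup (the UNetModel constructor below reuses the hypothetical signature from earlier; the learning rate is a common choice, not a tuned value):

device = "cuda" if torch.cuda.is_available() else "cpu"

# Hypothetical U-Net; any epsilon-prediction network with the
# signature model(x, t) -> tensor shaped like x will work here.
model = UNetModel(image_channels=1, model_channels=64,
                  time_embed_dim=256, num_res_blocks=2).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=2e-4)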
With these components (diffusion schedule setup, q_sample, the U-Net, p_losses, p_sample, p_sample_loop, and the training loop), you have a complete, albeit basic, DDPM implementation. Training these models requires significant computational resources and time, especially for larger images and deeper networks. Start with a small dataset like MNIST and a moderate number of timesteps ($T \approx 200$ to $1000$) to verify your implementation before scaling up.
After sufficient training, the p_sample_loop function should generate images that resemble the training data distribution. The quality will depend heavily on model capacity, dataset complexity, the number of timesteps $T$, and training duration.
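Once training has converged, a sketch of generating and saving a grid of samples (assuming MNIST-sized images and inputs normalized to [-1, 1] during training) might look like:

from torchvision.utils import save_image

model.eval()
samples = p_sample_loop(model, shape=[16, 1, 28, 28], device=device)
# Map from the model's [-1, 1] range back to [0, 1] before saving
save_image((samples.clamp(-1, 1) + 1) / 2, "ddpm_samples.png", nrow=4)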
This hands-on implementation provides a foundation for understanding DDPMs. From here, you can explore the improvements discussed elsewhere in this course, such as faster sampling with DDIM, conditional generation using classifier or classifier-free guidance, and applying more sophisticated U-Net architectures.