Building a basic Denoising Diffusion Probabilistic Model (DDPM) comes down to a small set of essential components for generating images. PyTorch is used for demonstration purposes, targeting a standard dataset like MNIST or CIFAR-10; dataloaders are assumed to be prepared. Our goal is to implement the core mechanics: the forward noise process, the U-Net model for noise prediction, the simplified training loss calculation, and the reverse sampling process.

## Setting Up the Diffusion Schedule

The diffusion process relies on a predefined variance schedule $\beta_t$ for $t = 1, \dots, T$. Common choices include linear or cosine schedules. From $\beta_t$, we derive $\alpha_t = 1 - \beta_t$ and $\bar{\alpha}_t = \prod_{s=1}^{t} \alpha_s$. These values control the noise level at each timestep and are essential for both the forward and reverse processes.

Let's define a function to precompute these values for a linear schedule over $T$ timesteps.

```python
import torch
import torch.nn.functional as F

def linear_beta_schedule(timesteps, beta_start=0.0001, beta_end=0.02):
    """Generates a linear schedule for beta values."""
    return torch.linspace(beta_start, beta_end, timesteps)

# Example setup
T = 200  # Number of diffusion timesteps
betas = linear_beta_schedule(timesteps=T)

# Precompute alphas and cumulative alphas
alphas = 1. - betas
alphas_cumprod = torch.cumprod(alphas, axis=0)
# Shifted cumulative product for easy indexing (alpha_bar_{t-1})
alphas_cumprod_prev = F.pad(alphas_cumprod[:-1], (1, 0), value=1.0)

# Precompute terms needed for q_sample and the posterior calculation
sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
sqrt_one_minus_alphas_cumprod = torch.sqrt(1. - alphas_cumprod)
sqrt_recip_alphas = torch.sqrt(1.0 / alphas)  # Needed for the sampling step

# Variance term for the posterior q(x_{t-1} | x_t, x_0)
posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod)

# We often need to extract the correct value for a given batch of timesteps t
def extract(a, t, x_shape):
    """
    Extracts the values from 'a' corresponding to the timesteps 't' and
    reshapes them to broadcast across the image dimensions.
    """
    batch_size = t.shape[0]
    out = a.gather(-1, t.cpu())  # Get values corresponding to timesteps t
    # Reshape to [batch_size, 1, 1, 1] for broadcasting
    return out.reshape(batch_size, *((1,) * (len(x_shape) - 1))).to(t.device)
```

It can be helpful to visualize the cumulative product $\bar{\alpha}_t$. As $t$ increases, $\bar{\alpha}_t$ decreases towards zero, signifying that more noise has been added.

*Chart: Cumulative Alpha Schedule. $\bar{\alpha}_t$ plotted against timestep $t$ for the linear schedule with $T = 200$, falling from 1.0 at $t = 0$ to roughly 0.1 at $t = 199$.*

The cumulative product $\bar{\alpha}_t$ decreases smoothly as the timestep $t$ increases, indicating a gradual addition of noise according to the linear variance schedule $\beta_t$.
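The linear schedule above is the simplest option. For the cosine schedule mentioned earlier, a sketch following Nichol & Dhariwal (2021) is shown below; the helper name `cosine_beta_schedule` is illustrative rather than part of the code above, and it can be swapped in for `linear_beta_schedule` with all downstream quantities derived exactly as before.

```python
import math

def cosine_beta_schedule(timesteps, s=0.008):
    """
    Cosine schedule (sketch): alpha_bar follows a squared-cosine curve and the
    betas are recovered from ratios of consecutive alpha_bar values.
    """
    steps = timesteps + 1
    x = torch.linspace(0, timesteps, steps)
    alphas_cumprod = torch.cos(((x / timesteps) + s) / (1 + s) * math.pi * 0.5) ** 2
    alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
    betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
    # Clip to avoid degenerate values near t = T
    return torch.clip(betas, 0.0001, 0.9999)
```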
## Implementing the Forward Process (q_sample)

The forward process $q(\mathbf{x}_t | \mathbf{x}_0)$ adds Gaussian noise to an image $\mathbf{x}_0$ to produce $\mathbf{x}_t$ at a given timestep $t$. This is defined by the equation:

$$
\mathbf{x}_t = \sqrt{\bar{\alpha}_t}\,\mathbf{x}_0 + \sqrt{1-\bar{\alpha}_t}\,\boldsymbol{\epsilon}, \quad \text{where } \boldsymbol{\epsilon} \sim \mathcal{N}(0, \mathbf{I})
$$

We can implement this directly using the precomputed values.

```python
def q_sample(x_start, t, noise=None):
    """
    Samples x_t given x_0 and t using the reparameterization trick.
    x_start: The initial image (x_0) [Batch, Channels, Height, Width]
    t: Timestep tensor [Batch]
    noise: Optional noise tensor; if None, sample standard Gaussian noise.
    """
    if noise is None:
        noise = torch.randn_like(x_start)

    # Extract sqrt(alpha_bar_t) and sqrt(1 - alpha_bar_t) for the given timesteps
    sqrt_alphas_cumprod_t = extract(sqrt_alphas_cumprod, t, x_start.shape)
    sqrt_one_minus_alphas_cumprod_t = extract(sqrt_one_minus_alphas_cumprod, t, x_start.shape)

    # Apply the formula: x_t = sqrt(alpha_bar_t) * x_0 + sqrt(1 - alpha_bar_t) * noise
    noisy_image = sqrt_alphas_cumprod_t * x_start + sqrt_one_minus_alphas_cumprod_t * noise
    return noisy_image
```

This function allows us to generate noisy versions of our input data for any timestep $t$, which is necessary for training the noise prediction network.

## Model Architecture: U-Net for Noise Prediction

The core of the DDPM is a neural network, typically a U-Net architecture, trained to predict the noise $\boldsymbol{\epsilon}$ that was added to an image at a given timestep $t$. The model $\boldsymbol{\epsilon}_\theta(\mathbf{x}_t, t)$ takes the noisy image $\mathbf{x}_t$ and the timestep $t$ as input.

The U-Net architecture is well suited to this task due to its combination of downsampling (encoder), upsampling (decoder), and skip connections. The skip connections allow the decoder to reuse high-resolution features from the encoder, helping preserve image details while processing information across multiple scales.

A critical adaptation for diffusion models is incorporating the timestep information $t$. This is usually done by converting $t$ into a time embedding vector (often using sinusoidal positional embeddings, similar to Transformers) and adding this embedding to the intermediate feature maps within the U-Net blocks.

```python
# Placeholder for the U-Net definition.
# Assume a U-Net class `UNetModel` exists, accepting image dimensions and a time embedding dimension.
# Example signature:
#   model = UNetModel(image_channels=1, model_channels=64, time_embed_dim=256, num_res_blocks=2)
# The model's forward pass would look like:
#   predicted_noise = model(noisy_image, time_steps)
# where `time_steps` is processed internally to generate embeddings.
#
# Implementing a full U-Net is beyond the scope of this section; many standard
# implementations exist. Refer to repositories like 'denoising-diffusion-pytorch'
# or the original papers for details.

# Example usage (assuming the model is defined and instantiated)
# model = UNetModel(...)
# noisy_image = ...  # from q_sample
# t = ...            # sampled timesteps tensor
# predicted_noise = model(noisy_image, t)
```
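To make the placeholder concrete, here is a minimal, illustrative sketch of such a model. It is deliberately tiny (a single downsampling level) and is not the architecture from the DDPM paper; the names `SinusoidalPositionEmbeddings`, `ConvBlock`, and `TinyUNet` are hypothetical, chosen only for this example. It does demonstrate the two ideas described above: sinusoidal time embeddings and injecting the embedding into each convolutional block.

```python
import math
import torch
import torch.nn as nn

class SinusoidalPositionEmbeddings(nn.Module):
    """Maps integer timesteps to sinusoidal embedding vectors (Transformer-style)."""
    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, t):
        half_dim = self.dim // 2
        freqs = torch.exp(
            -math.log(10000) * torch.arange(half_dim, device=t.device) / (half_dim - 1)
        )
        args = t.float()[:, None] * freqs[None, :]
        return torch.cat([args.sin(), args.cos()], dim=-1)  # [batch, dim]

class ConvBlock(nn.Module):
    """Two convolutions with a projected time embedding added between them."""
    def __init__(self, in_ch, out_ch, time_embed_dim):
        super().__init__()
        self.conv1 = nn.Conv2d(in_ch, out_ch, 3, padding=1)
        self.conv2 = nn.Conv2d(out_ch, out_ch, 3, padding=1)
        self.time_proj = nn.Linear(time_embed_dim, out_ch)
        self.act = nn.SiLU()

    def forward(self, x, t_emb):
        h = self.act(self.conv1(x))
        # Broadcast the projected time embedding across the spatial dimensions
        h = h + self.time_proj(t_emb)[:, :, None, None]
        return self.act(self.conv2(h))

class TinyUNet(nn.Module):
    """A deliberately small U-Net: one downsampling level and one skip connection.
    Assumes even spatial dimensions (e.g., 28x28 MNIST images)."""
    def __init__(self, image_channels=1, base_channels=32, time_embed_dim=128):
        super().__init__()
        self.time_mlp = nn.Sequential(
            SinusoidalPositionEmbeddings(time_embed_dim),
            nn.Linear(time_embed_dim, time_embed_dim),
            nn.SiLU(),
        )
        self.down1 = ConvBlock(image_channels, base_channels, time_embed_dim)
        self.down2 = ConvBlock(base_channels, base_channels * 2, time_embed_dim)
        self.pool = nn.MaxPool2d(2)
        self.up = nn.Upsample(scale_factor=2, mode="nearest")
        # Skip connection: the decoder block sees encoder features concatenated on channels
        self.up1 = ConvBlock(base_channels * 2 + base_channels, base_channels, time_embed_dim)
        self.out = nn.Conv2d(base_channels, image_channels, 1)

    def forward(self, x, t):
        t_emb = self.time_mlp(t)
        h1 = self.down1(x, t_emb)              # [B, C, H, W]
        h2 = self.down2(self.pool(h1), t_emb)  # [B, 2C, H/2, W/2]
        u = self.up(h2)                        # back to [B, 2C, H, W]
        u = self.up1(torch.cat([u, h1], dim=1), t_emb)
        return self.out(u)                     # predicted noise, same shape as x
```

For real experiments, a deeper U-Net with residual blocks, group normalization, and attention (as in standard DDPM implementations) will perform substantially better; the sketch only fixes the `(noisy_image, t)` interface used by the rest of the code.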
## Training Loss Implementation

As discussed earlier, we often use the simplified training objective:

$$
L_{\text{simple}}(\theta) = \mathbb{E}_{t, \mathbf{x}_0, \boldsymbol{\epsilon}} \left[ \left\| \boldsymbol{\epsilon} - \boldsymbol{\epsilon}_\theta(\mathbf{x}_t, t) \right\|^2 \right]
$$

where $\mathbf{x}_t = \sqrt{\bar{\alpha}_t}\,\mathbf{x}_0 + \sqrt{1-\bar{\alpha}_t}\,\boldsymbol{\epsilon}$.

In practice, the expectation is approximated using mini-batches. For each image $\mathbf{x}_0$ in a batch, we sample a random timestep $t \sim \text{Uniform}(\{1, \dots, T\})$, sample noise $\boldsymbol{\epsilon} \sim \mathcal{N}(0, \mathbf{I})$, compute $\mathbf{x}_t$ using `q_sample`, predict the noise $\boldsymbol{\epsilon}_\theta(\mathbf{x}_t, t)$ using the U-Net, and calculate the mean squared error (MSE) between the true noise $\boldsymbol{\epsilon}$ and the predicted noise $\boldsymbol{\epsilon}_\theta(\mathbf{x}_t, t)$.

```python
def p_losses(denoise_model, x_start, t):
    """
    Calculates the loss for training the noise prediction model.
    denoise_model: The U-Net model (epsilon_theta).
    x_start: The initial image (x_0).
    t: Sampled timesteps for the batch.
    """
    # 1. Sample noise epsilon ~ N(0, I)
    noise = torch.randn_like(x_start)

    # 2. Compute x_t using q_sample
    x_noisy = q_sample(x_start=x_start, t=t, noise=noise)

    # 3. Predict the noise using the U-Net model
    predicted_noise = denoise_model(x_noisy, t)

    # 4. Calculate the MSE loss between the true noise and the predicted noise
    loss = F.mse_loss(noise, predicted_noise)
    return loss
```

## Implementing the Sampling Process (Reverse)

Generating new images involves reversing the diffusion process. We start with pure Gaussian noise $\mathbf{x}_T \sim \mathcal{N}(0, \mathbf{I})$ and iteratively denoise it, using the trained model $\boldsymbol{\epsilon}_\theta$ to estimate $\mathbf{x}_{t-1}$ from $\mathbf{x}_t$. The basic DDPM sampling step is:

$$
\mathbf{x}_{t-1} = \frac{1}{\sqrt{\alpha_t}} \left( \mathbf{x}_t - \frac{1-\alpha_t}{\sqrt{1-\bar{\alpha}_t}}\, \boldsymbol{\epsilon}_\theta(\mathbf{x}_t, t) \right) + \sigma_t \mathbf{z}
$$

where $\mathbf{z} \sim \mathcal{N}(0, \mathbf{I})$ for $t > 1$, and $\mathbf{z} = 0$ for $t = 1$. The variance $\sigma_t^2$ is often set to $\beta_t$ or $\tilde{\beta}_t = \frac{1-\bar{\alpha}_{t-1}}{1-\bar{\alpha}_t} \beta_t$. Using $\tilde{\beta}_t$ corresponds to the variance of the true posterior $q(\mathbf{x}_{t-1} | \mathbf{x}_t, \mathbf{x}_0)$.

Let's implement one step of this reverse process.

```python
@torch.no_grad()  # Important: disable gradients during sampling
def p_sample(model, x, t, t_index):
    """
    Performs one sampling (denoising) step from x_t to x_{t-1}.
    model: The trained U-Net model.
    x: The current noisy image (x_t).
    t: The current timestep repeated for the batch (e.g., torch.full((b,), t_index)).
    t_index: The index corresponding to timestep t (for accessing precomputed values).
    """
    betas_t = extract(betas, t, x.shape)
    sqrt_one_minus_alphas_cumprod_t = extract(sqrt_one_minus_alphas_cumprod, t, x.shape)
    sqrt_recip_alphas_t = extract(sqrt_recip_alphas, t, x.shape)

    # Equation 11 in the DDPM paper: the mean of p_theta(x_{t-1} | x_t),
    # computed from the model's noise prediction epsilon_theta(x_t, t)
    model_mean = sqrt_recip_alphas_t * (
        x - betas_t * model(x, t) / sqrt_one_minus_alphas_cumprod_t
    )

    if t_index == 0:
        # If t = 1 (index 0), no noise is added (z = 0)
        return model_mean
    else:
        # Posterior variance sigma_t^2
        posterior_variance_t = extract(posterior_variance, t, x.shape)
        noise = torch.randn_like(x)
        # Add noise: sigma_t * z
        return model_mean + torch.sqrt(posterior_variance_t) * noise
```
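Before assembling the full loop, a quick shape check on a single reverse step can catch wiring mistakes early. This sketch uses the illustrative `TinyUNet` defined above; any untrained model with the same `(x, t)` interface would serve equally well.

```python
# Sanity check: one reverse step on random noise (shapes only, untrained model)
model = TinyUNet(image_channels=1, base_channels=32, time_embed_dim=128)
x_t = torch.randn(4, 1, 28, 28)                  # a small batch of pure noise
t = torch.full((4,), T - 1, dtype=torch.long)    # every sample at the final timestep
x_prev = p_sample(model, x_t, t, t_index=T - 1)
print(x_prev.shape)  # expected: torch.Size([4, 1, 28, 28])
```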
""" b = shape[0] # Batch size # Start with random noise N(0, I) at T img = torch.randn(shape, device=device) imgs = [] # Store intermediate images if desired # Iterate backwards from T-1 down to 0 for i in reversed(range(0, T)): timestep = torch.full((b,), i, device=device, dtype=torch.long) img = p_sample(model, img, timestep, i) # Optional: append intermediate results # if i % 50 == 0: # Append every 50 steps # imgs.append(img.cpu()) return img # Return the final generated image x_0The following diagram illustrates the relationship between the forward (q) and reverse (p) processes.digraph G { bgcolor="transparent"; node [shape=box, style=rounded, fontname="Arial", fontsize=10, margin=0.2]; edge [fontname="Arial", fontsize=9]; subgraph cluster_forward { label = "Forward Process (Known)"; style = "filled"; color = "#e9ecef"; node [fillcolor="#ced4da"]; x0 [label="x_0 (Data)"]; xt_minus_1 [label="x_{t-1}"]; xt [label="x_t"]; xT [label="x_T (Noise)", fillcolor="#adb5bd"]; x0 -> xt_minus_1 [label="q(x_1|x_0)"]; xt_minus_1 -> xt [label="q(x_t|x_{t-1})"]; xt -> xT [label="... q(x_T|x_{T-1})"]; } subgraph cluster_reverse { label = "Reverse Process (Learned)"; style = "filled"; color = "#e9ecef"; node [fillcolor="#a5d8ff"]; p_xT [label="Sample x_T ~ N(0,I)", fillcolor="#74c0fc"]; p_xt [label="x_t"]; p_xt_minus_1 [label="x_{t-1}"]; p_x0 [label="x_0 (Generated)", fillcolor="#4dabf7"]; model [label="ε_θ(x_t, t)", shape=ellipse, fillcolor="#ffec99"]; p_xT -> p_xt [label="... p_θ(x_{T-1}|x_T)"]; p_xt -> p_xt_minus_1 [label="p_θ(x_{t-1}|x_t)"]; p_xt_minus_1 -> p_x0 [label="p_θ(x_0|x_1)"]; p_xt -> model [style=dashed, arrowhead=open, color="#495057"]; p_xt_minus_1 -> model [style=dashed, arrowhead=open, color="#495057"]; } xt -> p_xt [style=invis]; // Align vertically }The forward process transforms data $x_0$ into noise $x_T$ through fixed steps $q$. The reverse process learns to undo this, starting from noise $x_T$ and using the model $\epsilon_\theta$ at each step $p_\theta$ to progressively generate data $x_0$.Training Loop OutlineThe training process combines these components:Initialize the U-Net model $\boldsymbol{\epsilon}_\theta$ and an optimizer (e.g., Adam).Load the dataset (e.g., MNIST).Loop for a desired number of epochs:Iterate through batches of data $\mathbf{x}_0$.For each batch:Sample a random timestep $t$ for each image in the batch: $t \sim \text{Uniform}({1, \dots, T})$.Calculate the loss using the p_losses function: $L = \text{p_losses}(\boldsymbol{\epsilon}_\theta, \mathbf{x}_0, t)$.Perform backpropagation: loss.backward().Update model parameters: optimizer.step().Zero gradients: optimizer.zero_grad().Periodically save model checkpoints and generate sample images using p_sample_loop to monitor progress.# Simplified Training Loop Sketch # Assume dataloader, model, optimizer are initialized # model.train() # Set model to training mode # for epoch in range(num_epochs): # for step, batch in enumerate(dataloader): # optimizer.zero_grad() # # batch = batch.to(device) # Assuming batch contains images x_0 # batch_size = batch.shape[0] # # # Sample timesteps t for the batch # t = torch.randint(0, T, (batch_size,), device=device).long() # # # Calculate loss # loss = p_losses(model, batch, t) # # # Backpropagate and update # loss.backward() # optimizer.step() # # # Logging, checkpointing, sampling... 
## Running the Implementation

With these components (the diffusion schedule setup, `q_sample`, the U-Net, `p_losses`, `p_sample`, `p_sample_loop`, and the training loop), you have a complete, albeit basic, DDPM implementation. Training these models requires significant computational resources and time, especially for larger images and deeper networks. Start with smaller datasets like MNIST and a moderate number of timesteps ($T \approx 200$ to $1000$) to verify your implementation before scaling up.

After sufficient training, the `p_sample_loop` function should generate images that resemble the training data distribution. The quality will depend heavily on the model capacity, dataset complexity, number of timesteps $T$, and training duration.

This hands-on implementation provides a foundation for understanding DDPMs. From here, you can explore the improvements discussed elsewhere in this course, such as faster sampling with DDIM, conditional generation using classifier or classifier-free guidance, and more sophisticated U-Net architectures.