Processing transaction logs and applying them to a data lake table requires a shift from standard append-only logic. A typical log-based architecture aims to reconstruct the current state of an entity (such as a customer or order) by replaying a sequence of changes, including inserts, updates, and deletes, onto a target table. Here is a demonstration of how to implement a Change Data Capture (CDC) pipeline using Apache Spark and Delta Lake.
We focus on the "Merge" pattern. Unlike a standard insert into a data lake, a merge operation must look up existing records by a primary key and decide whether to update the row, insert a new one, or remove it entirely. This approach allows the data lake to act as a synchronized replica of an operational database.
The flow of data in a CDC pipeline generally follows a specific path: capture, transport, and apply. The capture phase reads the database logs (e.g., Postgres Write-Ahead Logs or MySQL Binlog). The transport layer (often Kafka or Kinesis) buffers these events. The apply phase, which we build here, reads the events and updates the storage layer.
Architecture flow showing the movement of data from source transaction logs to the final target table in the data lake.
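To make the transport-to-apply handoff concrete, the sketch below shows one way the raw events could be read from a Kafka topic into Spark. The broker address and topic name are placeholders, and the rest of this section simulates these events rather than consuming a live stream.

# Sketch: reading raw CDC events from the transport layer (Kafka assumed here).
# The broker address and topic name are placeholders for illustration.
raw_events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "customers.cdc") \
    .option("startingOffsets", "earliest") \
    .load()

# Kafka delivers the payload as bytes; cast it to a string before parsing
raw_payloads = raw_events.selectExpr("CAST(value AS STRING) AS json_payload")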
Raw CDC events typically contain metadata about the operation alongside the data payload. A common structure includes:
- op: Indicates if the change was a create (c), update (u), or delete (d).
- ts_ms: The exact time the change occurred in the source.

For this practical exercise, we simulate an incoming stream of customer data changes. We assume the data has already been ingested into a raw DataFrame and now requires processing into a refined table.
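For reference, a single raw event from a log-based capture tool such as Debezium often has a shape similar to the dictionary below; the exact field names vary by connector, so treat it as illustrative rather than a fixed contract.

# Illustrative shape of one raw CDC event (Debezium-like, heavily simplified).
# "before" and "after" carry the row images; field names vary by connector.
sample_event = {
    "op": "u",                # c = create, u = update, d = delete
    "ts_ms": 1698400200000,   # change time at the source, in epoch milliseconds
    "before": {"id": 101, "name": "John Doe", "email": "[email protected]"},
    "after": {"id": 101, "name": "Johnathan Doe", "email": "[email protected]"}
}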
Before processing changes, the target table must exist. In a production environment, you might perform an initial snapshot load (bootstrap) to populate the table. Here, we initialize an empty Delta table representing our customers dataset.
from delta.tables import *
from pyspark.sql.functions import *
# In a real scenario, this location would be on S3, ADLS, or GCS
table_path = "/tmp/delta/customers"

# Create an empty Delta table with the customer schema if it does not exist
if not DeltaTable.isDeltaTable(spark, table_path):
    spark.createDataFrame([], schema="id INT, name STRING, email STRING, updated_at TIMESTAMP") \
        .write \
        .format("delta") \
        .mode("overwrite") \
        .save(table_path)
target_table = DeltaTable.forPath(spark, table_path)
One of the most critical challenges in distributed CDC pipelines is handling multiple changes for the same record within a single processing batch. If a customer updates their email address three times in one minute, the ingestion batch might contain all three events.
Applying these events blindly can lead to race conditions or incorrect final states. You must deduplicate the incoming micro-batch to keep only the most recent change for each primary key before merging.
# Simulated batch of CDC events
# Operations:
# 1. Insert Customer 101
# 2. Insert Customer 102
# 3. Update Customer 101 (Correcting name)
# 4. Delete Customer 103 (Assume 103 existed previously)
cdc_data = [
    (101, "John Doe", "[email protected]", "2023-10-27 10:00:00", "c"),
    (102, "Jane Smith", "[email protected]", "2023-10-27 10:05:00", "c"),
    (101, "Johnathan Doe", "[email protected]", "2023-10-27 10:10:00", "u"),
    (103, None, None, "2023-10-27 10:15:00", "d")
]
columns = ["id", "name", "email", "updated_at", "op"]

# Cast the timestamp strings so they match the TIMESTAMP column in the target table
updates_df = spark.createDataFrame(cdc_data, columns) \
    .withColumn("updated_at", to_timestamp("updated_at"))
# Deduplication Logic:
# Window function to rank changes by timestamp descending for each ID
from pyspark.sql.window import Window
window_spec = Window.partitionBy("id").orderBy(col("updated_at").desc())
deduplicated_updates = updates_df \
    .withColumn("rank", row_number().over(window_spec)) \
    .filter(col("rank") == 1) \
    .drop("rank")
In this logic, we define a window over the primary key id and order by updated_at. By filtering for rank == 1, we ensure that only the final state of Customer 101 (the update) is passed to the merge operation, discarding the initial insert event from the same batch.
With a clean set of updates, we apply the changes to the target Delta table. The MERGE statement in Delta Lake allows us to handle inserts, updates, and deletes in a single atomic transaction. This guarantees that the data lake remains consistent even if the job fails midway.
The logic follows these rules:
- If the record matches and op is d (delete), remove the row.
- If the record matches and op is u (update), update the row values.
- If the record does not exist in the target and op is not d, insert the new row.

target_table.alias("target") \
    .merge(
        deduplicated_updates.alias("source"),
        "target.id = source.id"
    ) \
    .whenMatchedDelete(
        condition = "source.op = 'd'"
    ) \
    .whenMatchedUpdate(
        set = {
            "name": "source.name",
            "email": "source.email",
            "updated_at": "source.updated_at"
        }
    ) \
    .whenNotMatchedInsert(
        condition = "source.op != 'd'",
        values = {
            "id": "source.id",
            "name": "source.name",
            "email": "source.email",
            "updated_at": "source.updated_at"
        }
    ) \
    .execute()
When Spark executes a merge, it must locate the files containing the matching IDs. If the target table is large, this can trigger an expensive full table scan or massive data shuffling across the cluster.
To optimize this, data engineers often partition the target table by a high-level attribute (like date or region) or use Z-Order indexing. However, CDC pipelines often operate purely on primary keys (IDs), which typically do not align with partition columns.
The chart below illustrates the cost difference between a standard merge (which scans all files) and a partition-pruned merge (which scans only the relevant files). Partition pruning is not always possible when updates arrive by arbitrary keys such as UUIDs, but enabling features like Deletion Vectors or Bloom Filter indexes in Delta Lake or Iceberg still helps reduce the I/O overhead significantly.
Comparison of execution time between a standard merge and an optimized merge using file-skipping techniques (Z-Order) as data volume grows.
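As one sketch of such an optimization, the commands below compact the table and cluster its files by the merge key. They assume an open-source Delta Lake release that supports OPTIMIZE with Z-Ordering and, optionally, deletion vectors, so verify support in the version you run first.

# Cluster data files by the merge key so the MERGE can skip unrelated files.
# Assumes a Delta Lake release that supports OPTIMIZE ... ZORDER BY.
spark.sql(f"OPTIMIZE delta.`{table_path}` ZORDER BY (id)")

# Optionally enable deletion vectors so deletes and updates avoid rewriting
# whole files (a table property; requires a recent Delta Lake release).
spark.sql(f"ALTER TABLE delta.`{table_path}` SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')")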
CDC pipelines are sensitive to upstream changes. If the operational database adds a new column, the CDC stream will include it, but your merge statement might fail if the target schema is rigid.
To handle this robustly, you can enable automatic schema evolution. In Delta Lake, this is done by setting the configuration spark.databricks.delta.schema.autoMerge.enabled to true in the Spark session configuration.
With this enabled, if the source DataFrame contains new columns that do not exist in the target table, the merge operation will alter the target table schema to include them before applying the data.
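A minimal sketch of enabling this setting is shown below; it applies to the whole Spark session, so enable it deliberately rather than globally by default.

# Allow MERGE to add columns that exist in the source but not in the target.
# This is a session-level setting; some newer Delta releases also expose a
# per-merge schema evolution option on the merge builder.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")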
Because the merge operation rewrites entire files even if only a single row in that file has changed, frequent small merges can lead to the "small file problem." It is best practice to avoid running CDC merges continuously (e.g., every second). Instead, batching updates into windows of 5 to 15 minutes creates a balance between data freshness and storage efficiency.
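One way to implement this batching, sketched below, is to wrap the deduplication and merge in a foreachBatch function and drive it from Structured Streaming with a fixed trigger interval. The streaming source (refined_stream), checkpoint path, and interval are placeholders, and parsing the raw payload into columns is omitted.

# Sketch: apply the CDC merge once per micro-batch on a fixed interval.
# Assumes refined_stream is a streaming DataFrame that already exposes the
# id, name, email, updated_at, and op columns (payload parsing omitted).
def apply_cdc_batch(batch_df, batch_id):
    window_spec = Window.partitionBy("id").orderBy(col("updated_at").desc())
    latest = batch_df \
        .withColumn("rank", row_number().over(window_spec)) \
        .filter(col("rank") == 1) \
        .drop("rank")
    target_table.alias("target") \
        .merge(latest.alias("source"), "target.id = source.id") \
        .whenMatchedDelete(condition="source.op = 'd'") \
        .whenMatchedUpdate(set={
            "name": "source.name",
            "email": "source.email",
            "updated_at": "source.updated_at"
        }) \
        .whenNotMatchedInsert(condition="source.op != 'd'", values={
            "id": "source.id",
            "name": "source.name",
            "email": "source.email",
            "updated_at": "source.updated_at"
        }) \
        .execute()

refined_stream.writeStream \
    .foreachBatch(apply_cdc_batch) \
    .option("checkpointLocation", "/tmp/delta/customers_checkpoint") \
    .trigger(processingTime="10 minutes") \
    .start()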
After the pipeline runs, you can verify the state of the data. The expectation for our simulated data is that Customer 101 exists with the name "Johnathan Doe" (the result of the update), Customer 102 exists, and Customer 103 (if it existed previously) is removed.
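A quick way to check is to read the table back and inspect it:

# Read the Delta table back and inspect the final state of each customer
result = spark.read.format("delta").load(table_path)
result.orderBy("id").show(truncate=False)

# Expected: 101 -> "Johnathan Doe" (latest update wins), 102 -> "Jane Smith",
# and no row for 103 (it is deleted only if it existed before this batch).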
This implementation pattern provides a resilient foundation for ingestion, ensuring that the analytical data lake accurately reflects the operational reality.