Developing a production-grade Change Data Capture (CDC) pipeline involves architecting a system that handles transaction log parsing, event serialization, and state reconstruction, going well beyond what a simple replication script can do. A practical implementation streams database changes from a transactional source (PostgreSQL) to an analytical destination (such as Snowflake or BigQuery). The main engineering challenges are decoding the Write-Ahead Log (WAL), handling schema variations, and applying idempotent merge operations to maintain data consistency.

## Architecture of a Log-Based Pipeline

The fundamental principle of this architecture is the decoupling of log extraction from log application. We do not run `SELECT *` queries against the source. Instead, we attach a listener to the database's transaction log, the binary history of every operation committed to disk.

The pipeline consists of three distinct stages:

1. **Extraction:** A connector reads the WAL segments and converts binary events into a structured format (JSON/Avro).
2. **Transport & Buffering:** A message bus (e.g., Kafka or Pub/Sub) provides a persistent buffer, allowing the source and destination to operate at different velocities.
3. **Application:** The warehouse consumes the stream, lands data in a staging area, and merges it into the target tables.

```dot
digraph G {
  rankdir=TB;
  node [shape=box, style=filled, fontname="Helvetica", fontsize=10, color=white];
  edge [fontname="Helvetica", fontsize=8, color="#adb5bd"];

  subgraph cluster_source {
    label="Source System"; style=filled; color="#e9ecef";
    pg  [label="PostgreSQL\n(Source)", fillcolor="#4dabf7", fontcolor="white"];
    wal [label="Write-Ahead Log\n(WAL)", fillcolor="#74c0fc", fontcolor="white"];
  }

  subgraph cluster_transport {
    label="Transport Layer"; style=filled; color="#f1f3f5";
    connector [label="Debezium/CDC\nConnector", fillcolor="#9775fa", fontcolor="white"];
    topic     [label="Event Log\n(Kafka/PubSub)", fillcolor="#b197fc", fontcolor="white"];
  }

  subgraph cluster_target {
    label="Data Warehouse"; style=filled; color="#e9ecef";
    stage [label="Staging Table\n(Variant/JSON)", fillcolor="#20c997", fontcolor="white"];
    merge [label="Merge Task\n(Dedup & Apply)", fillcolor="#12b886", fontcolor="white"];
    final [label="Target Table\n(Final State)", fillcolor="#0ca678", fontcolor="white"];
  }

  pg -> wal          [label="Committed Txs"];
  wal -> connector   [label="Logical Decoding"];
  connector -> topic [label="JSON Envelopes"];
  topic -> stage     [label="Batch Load"];
  stage -> merge     [label="Scheduled Task"];
  merge -> final     [label="Upsert/Delete"];
}
```

*Data flow from binary logs to analytical tables using an intermediate event bus.*

## Configuring Logical Replication

To enable log extraction in PostgreSQL, we must set `wal_level` to `logical`. This setting instructs the database to log enough information to reconstruct row changes, rather than just the physical disk block changes used for crash recovery.

You must also create a replication slot. The slot acts as a cursor into the transaction log: it ensures the database engine does not purge WAL segments that have not yet been consumed by the connector.

```sql
-- Verify current WAL level
SHOW wal_level;

-- Create a logical replication slot using the pgoutput plugin
SELECT * FROM pg_create_logical_replication_slot('warehouse_cdc_slot', 'pgoutput');
```

If the consumer (the CDC connector) goes offline, the replication slot will cause WAL files to accumulate on the source database disk. This is a critical failure mode to monitor: if the disk fills up, the source database will stop accepting writes to prevent corruption.
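A quick way to keep an eye on this is to query `pg_replication_slots` directly. The following is a minimal sketch using standard PostgreSQL 10+ system views and functions; alert thresholds and scheduling are left to your monitoring stack.

```sql
-- How much WAL is each replication slot pinning on disk?
-- A steadily growing value for an inactive slot means the consumer has stalled.
SELECT
    slot_name,
    active,
    pg_size_pretty(
        pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
    ) AS retained_wal
FROM pg_replication_slots;
```

Exposing the retained WAL size as a gauge and alerting when it crosses a few gigabytes gives you time to restart the connector, or drop the slot, before the disk fills.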
## The Anatomy of a Change Event

When the connector processes a log entry, it emits a message wrapping the data change. Understanding this structure is necessary for writing the merge logic later. A standard Debezium-style envelope contains `before` and `after` states, along with metadata describing the operation type (`op`) and the transaction timestamp (`ts_ms`).

Consider an update operation on a `users` table. The resulting JSON payload typically looks like this:

```json
{
  "payload": {
    "before": {
      "id": 101,
      "email": "user@old-domain.com",
      "status": "active"
    },
    "after": {
      "id": 101,
      "email": "user@new-domain.com",
      "status": "active"
    },
    "source": {
      "version": "1.9.5.Final",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1678892345000,
      "lsn": 23405934
    },
    "op": "u",
    "ts_ms": 1678892345123
  }
}
```

In this structure:

- `op`: Indicates the operation type (`c` for create, `u` for update, `d` for delete).
- `ts_ms`: The epoch timestamp when the connector processed the event.
- `source.ts_ms`: The epoch timestamp when the transaction occurred in the source database.
- `source.lsn`: The Log Sequence Number, strictly increasing, which provides the ordering guarantee.

## Staging and Idempotent Merging

The warehouse ingestion layer receives these JSON events and appends them to a raw staging table. This table is an append-only log of events, not a representation of the current state; it will contain multiple rows for the same primary key if that record changed multiple times.

To materialize the final table, we apply a merge strategy. This is where idempotency becomes essential. If the pipeline crashes and replays the last 1,000 messages, our merge logic must handle the duplicates gracefully without creating data anomalies.

We use the `source.ts_ms` (transaction timestamp) or `source.lsn` to resolve conflicts. The logic is: only update the target table if the incoming event is newer than what is currently stored.
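For concreteness, here is a minimal sketch of the staging table that the merge below reads from. The Snowflake syntax and the names `raw_cdc_staging`, `payload`, and `ingested_at` match those used in the merge query, but the exact DDL is an assumption.

```sql
-- Hypothetical append-only staging table.
-- Each row holds one raw CDC envelope; no uniqueness is enforced here.
CREATE TABLE IF NOT EXISTS raw_cdc_staging (
    payload     VARIANT,                                   -- full Debezium-style JSON envelope
    ingested_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()  -- load time, used as the merge watermark
);
```

Keeping the whole envelope in a single `VARIANT` column is also what makes the schema-drift handling discussed later possible.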
### The Merge Query Pattern

The following SQL pattern demonstrates how to merge a batch of CDC events from the staging table (`raw_cdc_staging`) into a final table (`dim_users`). This pattern handles Inserts, Updates, and Deletes (via soft deletes) in a single atomic transaction.

```sql
MERGE INTO dim_users AS target
USING (
    -- Window function to select only the latest event per primary key
    -- in the current micro-batch. Delete events carry the key in "before"
    -- (their "after" image is null), so we coalesce the two when extracting it.
    SELECT * FROM (
        SELECT
            COALESCE(payload:after:id, payload:before:id)::INT AS user_id,
            payload:after:email::STRING  AS email,
            payload:after:status::STRING AS status,
            payload:op::STRING           AS op_code,
            payload:source:ts_ms::INT    AS event_ts,
            ROW_NUMBER() OVER (
                PARTITION BY COALESCE(payload:after:id, payload:before:id)
                ORDER BY payload:source:ts_ms DESC
            ) AS rn
        FROM raw_cdc_staging
        WHERE ingested_at > :last_watermark
    )
    WHERE rn = 1
) AS source
ON target.user_id = source.user_id
WHEN MATCHED AND source.op_code = 'd' THEN
    -- Soft delete logic
    UPDATE SET
        target.is_deleted = TRUE,
        target.updated_at = source.event_ts
WHEN MATCHED AND source.event_ts > target.updated_at THEN
    -- Idempotent update: only update if source is newer
    UPDATE SET
        target.email      = source.email,
        target.status     = source.status,
        target.updated_at = source.event_ts,
        target.is_deleted = FALSE
WHEN NOT MATCHED AND source.op_code != 'd' THEN
    INSERT (user_id, email, status, updated_at, is_deleted)
    VALUES (source.user_id, source.email, source.status, source.event_ts, FALSE);
```

This query performs several critical functions:

- **Deduplication within the batch:** The `ROW_NUMBER()` window function ensures that if a user updated their profile five times within one micro-batch, we only attempt to merge the final state of that batch.
- **Ordering enforcement:** The `ORDER BY payload:source:ts_ms DESC` respects the chronological order of the source database; where timestamp ties are possible, ordering by `payload:source:lsn` instead gives a strict guarantee, since the LSN is strictly increasing.
- **Delete handling:** We check the `op` code. If it is `d` (delete), we perform a logical delete in the warehouse rather than physically removing the row, preserving audit history.

## Latency vs. Batch Efficiency

When building this pipeline, you must tune the buffer size and flush frequency. There is a non-linear relationship between the batch size (how many CDC events you process in one `MERGE` statement) and the end-to-end latency.

Small batches provide lower latency but incur high overhead due to the setup and teardown costs of distributed query engines. Large batches are efficient for throughput but introduce lag.

```json
{
  "layout": {
    "width": 600, "height": 400,
    "title": {"text": "Ingestion Latency vs Batch Size"},
    "xaxis": {"title": "Batch Size (Rows)", "type": "log"},
    "yaxis": {"title": "Latency (Seconds)"},
    "template": "simple_white",
    "colorscale": [{"color": "#339af0"}]
  },
  "data": [{
    "x": [100, 500, 1000, 5000, 10000, 50000, 100000],
    "y": [2.5, 1.8, 1.5, 2.2, 4.5, 12.0, 22.0],
    "type": "scatter",
    "mode": "lines+markers",
    "line": {"color": "#339af0", "width": 3},
    "marker": {"size": 8, "color": "#1c7ed6"}
  }]
}
```

*Relationship between batch size and processing latency. Note the "sweet spot" where overhead is minimized before processing volume increases duration.*

In the chart above, notice the inflection point. At very small batch sizes (left), latency is dominated by connection overhead and query compilation time. As batch size increases, efficiency improves until the sheer volume of data processing starts to increase duration linearly. For most high-throughput MPP systems, a batch interval of 1 to 5 minutes (or 50k–100k rows) often yields the optimal balance for near real-time reporting.
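To pin down the "Scheduled Task" step from the architecture diagram, the merge can run on a fixed cadence as a Snowflake task. The sketch below is one way to do it; the warehouse name `etl_wh` and the wrapper procedure `run_users_merge()` (which would execute the MERGE above and advance the watermark) are hypothetical names used only for illustration.

```sql
-- Hypothetical scheduled task: run the idempotent merge every 5 minutes,
-- matching the batch interval discussed above.
CREATE OR REPLACE TASK merge_users_task
    WAREHOUSE = etl_wh            -- assumed compute warehouse
    SCHEDULE  = '5 MINUTE'
AS
    CALL run_users_merge();       -- hypothetical procedure wrapping the MERGE and watermark update

-- Tasks are created in a suspended state; resume to start the schedule.
ALTER TASK merge_users_task RESUME;
```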
## Handling Schema Drift

CDC pipelines are brittle regarding schema changes. If a column is added to the source PostgreSQL table, the binary log will immediately start containing data for the new column. If your staging table or MERGE logic does not anticipate this, the pipeline may fail.

To mitigate this, we utilize a "Schema-on-Read" approach for the staging layer. By storing the CDC payload in a semi-structured column (like VARIANT in Snowflake or JSON in BigQuery), the ingestion job does not fail when the schema evolves. The failure is pushed downstream to the transformation layer (dbt or SQL views), which is easier to fix and replay than a broken ingestion connector.

Using the `payload:after:new_column` syntax in the view definition allows the pipeline to continue flowing even if the target schema definitions lag behind the source system changes. This decoupling is essential for maintaining high availability in data platform operations.
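As a closing sketch, assuming the `VARIANT` staging table from earlier, a schema-on-read view keeps the pipeline flowing when the source adds a column. The `signup_source` field and the view name `stg_users_current` below are illustrative assumptions, not part of the pipeline as described above.

```sql
-- Hypothetical schema-on-read view over the raw staging table.
-- A column added on the source simply reads as NULL for older events;
-- nothing upstream has to be reloaded or redeployed.
CREATE OR REPLACE VIEW stg_users_current AS
SELECT
    payload:after:id::INT               AS user_id,
    payload:after:email::STRING         AS email,
    payload:after:status::STRING        AS status,
    payload:after:signup_source::STRING AS signup_source,  -- newly added source column (illustrative)
    payload:op::STRING                  AS op_code,
    payload:source:ts_ms::NUMBER        AS event_ts
FROM raw_cdc_staging;
```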