趋近智
处理事务日志并将其应用到数据湖表,这需要从标准追加写入逻辑进行转变。一个常见的基于日志的架构,其目的是通过在目标表上重放一系列变化(包括插入、更新和删除)来重建实体(如客户或订单)的当前状态。以下是使用Apache Spark和Delta Lake实现变更数据捕获(CDC)管道的演示。
我们侧重于“合并”模式。与向数据湖进行标准插入不同,合并操作必须通过主键查找现有记录,并决定是更新行、插入新行还是完全删除该行。这种方法使得数据湖可以作为业务数据库的一个同步副本。
CDC管道中的数据流通常遵循特定路径:捕获、传输和应用。捕获阶段读取数据库日志(例如Postgres预写日志或MySQL二进制日志)。传输层(通常是Kafka或Kinesis)缓冲这些事件。应用阶段,即我们在此构建的部分,读取事件并更新存储层。
该架构流程显示了数据从源事务日志到数据湖中最终目标表的移动过程。
原始CDC事件通常包含操作元数据以及数据载荷。常见结构包括:
op):表明变化是创建(c)、更新(u)还是删除(d)。ts_ms):变化在源端发生的精确时间。在本次实践中,我们模拟传入的客户数据变化流。我们假设数据已摄入到原始DataFrame中,现在需要处理到精炼表中。
在处理变化之前,目标表必须存在。在生产环境中,您可能需要执行初始快照加载(引导)来填充该表。在这里,我们初始化一个空的Delta表来代表我们的customers数据集。
from delta.tables import *
from pyspark.sql.functions import *
# 定义客户表的模式
# 在实际场景中,此位置将在S3、ADLS或GCS上
table_path = "/tmp/delta/customers"
# 如果Delta表不存在,则创建一个空表
if not DeltaTable.isDeltaTable(spark, table_path):
spark.createDataFrame([], schema="id INT, name STRING, email STRING, updated_at TIMESTAMP") \
.write \
.format("delta") \
.mode("overwrite") \
.save(table_path)
target_table = DeltaTable.forPath(spark, table_path)
分布式CDC管道中的一个核心难题是在单个处理批次内处理同一记录的多次变化。如果客户在一分钟内更新了三次电子邮件地址,摄入批次可能包含所有三个事件。
盲目应用这些事件可能导致竞态条件或不正确的最终状态。在合并之前,您必须对传入的微批次进行去重,为每个主键只保留最新变化。
# 模拟的CDC事件批次
# 操作:
# 1. 插入客户101
# 2. 插入客户102
# 3. 更新客户101(修正姓名)
# 4. 删除客户103(假设103之前存在)
cdc_data = [
(101, "John Doe", "[email protected]", "2023-10-27 10:00:00", "c"),
(102, "Jane Smith", "[email protected]", "2023-10-27 10:05:00", "c"),
(101, "Johnathan Doe", "[email protected]", "2023-10-27 10:10:00", "u"),
(103, None, None, "2023-10-27 10:15:00", "d")
]
columns = ["id", "name", "email", "updated_at", "op"]
updates_df = spark.createDataFrame(cdc_data, columns)
# 去重逻辑:
# 窗口函数,按时间戳降序为每个ID的变化排序
from pyspark.sql.window import Window
window_spec = Window.partitionBy("id").orderBy(col("updated_at").desc())
deduplicated_updates = updates_df \
.withColumn("rank", row_number().over(window_spec)) \
.filter(col("rank") == 1) \
.drop("rank")
在此逻辑中,我们定义了基于主键id的窗口,并按updated_at排序。通过筛选rank == 1,我们确保只有客户101的最终状态(即更新操作)被传递给合并操作,从而丢弃同一批次中的初始插入事件。
有了一组干净的更新,我们将变化应用到目标Delta表中。Delta Lake中的MERGE语句使我们能够在单个原子事务中处理插入、更新和删除。这保证了即使作业中途失败,数据湖也能保持一致。
该逻辑遵循以下规则:
d(删除),则删除该行。u(更新),则更新行值。d,则插入新行。target_table.alias("target") \
.merge(
deduplicated_updates.alias("source"),
"target.id = source.id"
) \
.whenMatchedDelete(
condition = "source.op = 'd'"
) \
.whenMatchedUpdate(
set = {
"name": "source.name",
"email": "source.email",
"updated_at": "source.updated_at"
}
) \
.whenNotMatchedInsert(
condition = "source.op != 'd'",
values = {
"id": "source.id",
"name": "source.name",
"email": "source.email",
"updated_at": "source.updated_at"
}
) \
.execute()
当Spark执行合并时,它必须定位包含匹配ID的文件。如果目标表很大,这可能触发开销大的全表扫描或跨集群的大量数据混洗。
为了优化此过程,数据工程师通常会通过高级属性(如date或region)对目标表进行分区,或使用Z-Order索引。然而,CDC管道主要操作主键(ID),这通常与分区列不一致。
下方的图表显示了标准合并(搜索所有文件)与分区裁剪合并(仅搜索相关文件)之间的成本差异。虽然在随机访问的UUID更新中我们无法总是通过分区裁剪,但在Delta Lake或Iceberg中启用删除向量 (vector)或布隆过滤器等功能有助于显著降低I/O开销。
数据量增长时,标准合并与使用文件跳过技术(Z-Order)的优化合并之间的执行时间对比。
CDC管道对上游变化敏感。如果业务数据库添加了新列,CDC流将包含它,但如果目标模式是固定的,您的合并语句可能会失败。
为了稳妥处理此情况,您可以启用自动模式演变。在Delta Lake中,这通过在Spark会话配置中将spark.databricks.delta.schema.autoMerge.enabled配置设置为true来实现。
启用此功能后,如果源DataFrame包含目标表中不存在的新列,合并操作将在应用数据之前更改目标表模式以包含这些列。
因为合并操作会重写整个文件,即使该文件中只有一行发生变化,所以频繁的小合并可能导致“小文件问题”。最佳实践是避免持续运行CDC合并(例如每秒运行)。相反地,将更新批量处理到5到15分钟的窗口中,可以在数据新鲜度和存储效率之间取得平衡。
管道运行后,您检查数据状态。我们模拟数据的预期是:客户101以名称“Johnathan Doe”存在(更新的结果),客户102存在,并且客户103(如果之前存在)被删除。
这种实现模式为数据摄入提供了可靠的基础,确保分析型数据湖准确反映业务实际情况。
这部分内容有帮助吗?
© 2026 ApX Machine LearningAI伦理与透明度•