趋近智
原始血缘元数据记录了系统事件,但不足以进行全面的健康分析。尽管一个包含描述输入和输出的JSON事件的存储库能追踪发生的事情,但它不能立即回答有关系统健康的重要问题。例如,要确定“如果此表延迟,哪些仪表板会出问题”,这些事件必须被组织成一个结构化模型。
我们将数据平台建模为有向图。在这种数学结构中,您的数据堆栈组件,如表、流、仪表板和机器学习模型,成为节点,而在它们之间传输数据的管道和转换作业成为边。这种图结构使我们能够应用标准遍历算法,以编程方式确定根本原因和下游影响。
我们把数据生态系统定义为图 。
顶点集 代表数据资产和计算单元。为了保持图的清晰性,我们区分两种类型的节点:
边集 代表信息流。如果数据集是输入,则存在从数据集到作业的边;如果数据集是输出,则存在从作业到数据集的边。
形式上,如果作业 从数据集 读取并写入数据集 ,我们的图包含两条有向边:
这种二分结构,即边严格在数据集和作业之间交替,对于准确的血缘记录是必需的。将表 A 直接连接到表 B () 会使连接它们的转换逻辑模糊不清。通过包含作业节点 (),我们保留了关于数据如何被转换的元数据,包括所使用的代码的具体版本和运行时参数。
一个二分依赖图,显示了从原始数据通过计算作业到下游仪表板的数据流。
构建这个图需要处理上一节中提到的血缘事件流。无论是使用OpenLineage还是自定义格式,解析逻辑都保持一致。我们读取事件流,提取输入和输出,并更新图结构(通常表示为邻接列表)。
在Python实现中,我们可以使用字典来存储邻接列表。键代表上游节点,值是下游节点的列表。
考虑一个简化的OpenLineage事件:
event = {
"job": {"name": "spark_process_sales"},
"inputs": [{"name": "s3://landing/sales.csv"}],
"outputs": [{"name": "snowflake://db/public/fact_sales"}]
}
为了从这个事件构建图,我们解析关系并将它们更新到我们的结构中。我们遍历输入,将它们链接到作业,然后将作业链接到输出。
class LineageGraph:
def __init__(self):
# 邻接列表:节点 -> 下游节点集合
self.graph = {}
def add_edge(self, source, target):
if source not in self.graph:
self.graph[source] = set()
self.graph[source].add(target)
# 确保目标节点存在于图中,即使它没有子节点
if target not in self.graph:
self.graph[target] = set()
def process_event(self, event):
job_name = event['job']['name']
# 链接输入 -> 作业
for input_node in event.get('inputs', []):
self.add_edge(input_node['name'], job_name)
# 链接作业 -> 输出
for output_node in event.get('outputs', []):
self.add_edge(job_name, output_node['name'])
# 使用示例
tracker = LineageGraph()
tracker.process_event(event)
这种方法基于最近的事件构建血缘的静态快照。在生产系统中,您通常会将这些数据持久化到图数据库(如Neo4j或Amazon Neptune)或具有递归查询能力的RDBMS中,以处理数千个节点的规模。
大多数数据管道被设计为有向无环图(DAG),这意味着数据向前流动,从不循环回前一个步骤。然而,在复杂的环境中可能会出现循环。例如,一个“客户360”表可能由原始日志构建,但原始日志的摄取过程可能会查询前一天的“客户360”表以进行用户去重。
在构建图时,我们必须决定如何处理这些循环。对于可靠性工程,我们通常关注的是执行DAG,而不是抽象定义。如果作业 A 在上午 10:00 运行,作业 B 在上午 10:05 使用作业 A 的输出运行,则依赖关系很明确。如果作业 A 在上午 11:00 再次使用作业 B 的输出运行,则这是一个新的实例。
为了准确地建模,我们通常在图中的作业节点上附加一个运行ID或时间戳:
这创建了一个时间序列图,我们可以在其中追踪特定数据行的血缘,精确到转换代码执行的毫秒。
图构建完成后,我们用它来解决可靠性问题。两个主要操作是上游遍历(根本原因分析)和下游遍历(影响分析)。
当在数据集 中检测到异常(例如,数据质量测试失败)时,我们需要找到源头。我们通过反转边的方向进行搜索(广度优先或深度优先)。
此遍历标识了所有对 当前状态有贡献的表和作业。通过将此列表与最近的警报日志进行交叉比对,我们可以查明上游原始摄取作业中的故障是否导致了最终报告中的指标偏差。
在部署模式更改或弃用表之前,我们执行下游遍历。这标识了更改的“影响范围”。
有效实现这一点需要递归图遍历。
def get_impacted_nodes(graph, start_node):
visited = set()
stack = [start_node]
while stack:
node = stack.pop()
if node not in visited:
visited.add(node)
# 将所有子节点添加到栈中
children = graph.get(node, [])
stack.extend(children)
return visited
此函数返回依赖于 start_node 的所有资产。在治理环境中,这种程序化检查可以在CI/CD管道中运行。如果开发人员试图删除 Descendants 集中某个节点引用的列,管道可以自动拒绝更改请求。
上述示例描述的是表级血缘。尽管有用,但它缺乏精确性。如果您有一个包含200列的表,并且您更改其中一列,表级血缘会标记所有下游仪表板为“受影响”,即使这些仪表板只使用了其他199列。
列级血缘将每一列视为图中的一个节点。边表示SQL或DataFrame操作中的转换逻辑。
构建列级血缘要复杂得多,因为它需要解析SQL查询的抽象语法树(AST),以理解哪些输入列贡献于哪些输出列。SQLGlot或OpenLineage的列级方面等工具用于自动化这种提取。尽管存在这种复杂性,图构建逻辑仍然相同:我们只需增加节点 和边 的数量,以表示更细的粒度。
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造