Production environments often contain legacy systems or third-party tools that do not support modern metadata standards like OpenLineage out of the box. Application logs become the primary source of truth for understanding data movement in these situations. Log parsing allows you to retroactively construct the dependency graph by identifying patterns that indicate input and output operations.
This exercise focuses on writing a parser that ingests unstructured application logs, extracts dataset identifiers using regular expressions, and structures them into a directed edge list. This process bridges the gap between static code analysis and runtime observability.
Data pipelines, regardless of the language they are written in, generally emit signals when they interact with storage systems. To reconstruct lineage, we look for two specific types of events: read events, in which a job consumes data from a source, and write events, in which a job loads or materializes data into a destination.
Consider the following snippet from a standard ETL job log file. The format is unstructured text, but it contains precise references to the data assets involved in the execution.
2023-11-15 08:00:01 [INFO] Job-User-Daily: Starting execution context
2023-11-15 08:00:15 [INFO] Job-User-Daily: Extracted 15000 records from s3://data-lake/raw/users_2023.csv
2023-11-15 08:00:18 [WARN] Job-User-Daily: 15 records dropped due to schema mismatch
2023-11-15 08:02:45 [INFO] Job-User-Daily: Loading data into warehouse.public.dim_users
2023-11-15 08:03:00 [INFO] Job-User-Daily: Materialization complete. Rows affected: 14985
A human reading this log can immediately visualize the flow: s3://data-lake/raw/users_2023.csv → Job-User-Daily → warehouse.public.dim_users. To automate this, we map these log lines to a graph structure. The vertices are the datasets and the job itself; the edges represent the flow of data.
The extraction process relies on identifying stable anchors in the log text. In the example above, phrases like "Extracted ... from" and "Loading data into" serve as these anchors.
We can define patterns that capture the dataset names, using Python's re module with named capture groups to isolate the specific paths.
Extracted \d+ records from (?P<source>\S+)
Loading data into (?P<target>\S+)

The following Python implementation demonstrates how to parse a stream of logs to build an adjacency list representing the lineage.
import re
from typing import List, Tuple


def extract_lineage_from_logs(log_lines: List[str]) -> List[Tuple[str, str, str]]:
    """
    Parses log lines and returns lineage edges as (from_node, to_node, action)
    tuples, where the action is either "read" or "write".
    """
    # Define regex patterns with named groups.
    # We assume the job ID is present in the log prefix after the level tag.
    job_pattern = re.compile(r"\[INFO\] (.*?):")
    source_pattern = re.compile(r"Extracted .* from (?P<source>\S+)")
    target_pattern = re.compile(r"Loading data into (?P<target>\S+)")

    lineage_edges = []

    # State tracking for the current execution context
    current_job = None
    inputs = set()
    outputs = set()

    for line in log_lines:
        # Identify the job context
        job_match = job_pattern.search(line)
        if job_match:
            current_job = job_match.group(1)

        # Check for inputs
        src_match = source_pattern.search(line)
        if src_match:
            inputs.add(src_match.group("source"))

        # Check for outputs
        tgt_match = target_pattern.search(line)
        if tgt_match:
            outputs.add(tgt_match.group("target"))

    # Construct the edges: Input -> Job -> Output
    if current_job:
        for src in inputs:
            lineage_edges.append((src, current_job, "read"))
        for tgt in outputs:
            lineage_edges.append((current_job, tgt, "write"))

    return lineage_edges


# Simulation
logs = [
    "2023-11-15 08:00:15 [INFO] Job-User-Daily: Extracted 15000 records from s3://data-lake/raw/users_2023.csv",
    "2023-11-15 08:02:45 [INFO] Job-User-Daily: Loading data into warehouse.public.dim_users",
]

edges = extract_lineage_from_logs(logs)
for u, v, action in edges:
    print(f"{u} --({action})--> {v}")
A common challenge in log-based lineage is the presence of dynamic parameters. In the example log, the source file is users_2023.csv. If this job runs daily, the filename changes (e.g., users_2024.csv). Naively plotting this would create thousands of disconnected nodes in your lineage graph, cluttering the visualization and making impact analysis difficult.
To solve this, we apply a normalization step before adding the node to the graph. We replace variable components like dates or unique IDs with a static placeholder.
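A minimal sketch of such a helper is shown below. The name normalize_dataset_path and the exact substitution rules are illustrative assumptions; adapt them to the identifier patterns that actually appear in your logs.

import re

def normalize_dataset_path(path: str) -> str:
    """Replace volatile path components with static placeholders (illustrative rules)."""
    # Collapse ISO-style dates first (e.g., 2023-11-15 -> {YYYY-MM-DD})
    path = re.sub(r"\d{4}-\d{2}-\d{2}", "{YYYY-MM-DD}", path)
    # Collapse standalone four-digit years (e.g., users_2023.csv -> users_{YYYY}.csv)
    path = re.sub(r"(?<=[_/])\d{4}(?=[._/]|$)", "{YYYY}", path)
    # Collapse UUID-style run identifiers
    path = re.sub(
        r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
        "{UUID}",
        path,
        flags=re.IGNORECASE,
    )
    return path

print(normalize_dataset_path("s3://data-lake/raw/users_2023.csv"))
# s3://data-lake/raw/users_{YYYY}.csv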
Applying this function transforms s3://data-lake/raw/users_2023.csv into the canonical form s3://data-lake/raw/users_{YYYY}.csv. This ensures that every execution of Job-User-Daily maps to the same logical dataset node, preserving the structural integrity of the lineage graph.
Once the edges are extracted and normalized, we can visualize the relationship. The result is a Directed Acyclic Graph (DAG) that clearly shows the dependencies. While log parsing is reactive (it happens after the code runs), it provides an accurate representation of what actually happened in production, distinct from what the code was intended to do.
The graph illustrates the extracted lineage flow. The normalization step aggregates distinct file versions into a single logical node on the left, connected to the warehouse table on the right via the transformation job.
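As a rough sketch of that assembly step, the snippet below builds the graph from the edges list and the normalize_dataset_path helper defined earlier. It assumes the third-party networkx library is available; any graph structure, including a plain dictionary of adjacency lists, would work equally well.

import networkx as nx  # assumed dependency for this sketch

# Build a directed graph from the normalized lineage edges
G = nx.DiGraph()
for u, v, action in edges:
    G.add_edge(normalize_dataset_path(u), normalize_dataset_path(v), action=action)

# Confirm the structure is a DAG and inspect the dependency order
print(nx.is_directed_acyclic_graph(G))
print(list(nx.topological_sort(G)))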
Log-based extraction is useful but brittle: changes to log formatting by developers can silently break the regex parsers. The most effective mitigation is to emit structured lineage events wherever you control the logging code, since a machine-readable line such as {"event": "read", "dataset": "..."} is significantly more effective than parsing free text.
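The sketch below shows how much simpler the consumer becomes once events are structured. The field names event, job, and dataset are illustrative assumptions rather than a standard schema.

import json
from typing import List, Tuple

def extract_lineage_from_events(log_lines: List[str]) -> List[Tuple[str, str, str]]:
    """Build lineage edges from JSON log events instead of regex matching."""
    edges = []
    for line in log_lines:
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # Ignore lines that are not structured events
        job = event.get("job")
        dataset = event.get("dataset")
        if not job or not dataset:
            continue
        if event.get("event") == "read":
            edges.append((dataset, job, "read"))
        elif event.get("event") == "write":
            edges.append((job, dataset, "write"))
    return edges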
{"event": "read", "dataset": "..."} is significantly more effective than parsing free text.By implementing these extraction techniques, you gain visibility into legacy components of your stack, ensuring that your data governance platform covers the entire ecosystem, not just the modern portions.