This section provides a hands-on exercise to implement basic lineage tracking for a small set of features. Building upon the concepts discussed earlier in the chapter regarding governance and reproducibility, we will manually instrument a simplified feature engineering process to capture metadata that traces the origins and transformations of our features. The goal is to understand the fundamental mechanics of lineage tracking, which can then be scaled and automated using more sophisticated tools and frameworks in a production environment.
We'll simulate a scenario where we compute a user's 7-day click-through rate (CTR) based on raw impression and click logs, and user profile data.
Assume we have the following conceptual data sources:

- impression_logs: records containing user_id, timestamp, ad_id.
- click_logs: records containing user_id, timestamp, ad_id.
- user_profiles: static data with user_id, country, registration_date.

Our goal is to create the feature user_ctr_7d.
Optionally, install the Graphviz Python bindings (pip install graphviz) if you want to render the visualization locally.

First, let's represent our raw data sources and associate some initial lineage metadata. In a real system, this might point to specific database tables, file paths, or streaming topics.
# Lineage metadata store (a simple dictionary for this example)
lineage_registry = {}
# Representing raw data sources
impression_log_source = {
"id": "raw_impressions_kafka_topic",
"type": "kafka_topic",
"schema": ["user_id", "timestamp", "ad_id"],
"description": "Raw ad impression events stream",
"owner": "ad_platform_team"
}
lineage_registry[impression_log_source["id"]] = {
"type": "DataSource",
"details": impression_log_source
}
click_log_source = {
"id": "raw_clicks_kafka_topic",
"type": "kafka_topic",
"schema": ["user_id", "timestamp", "ad_id"],
"description": "Raw ad click events stream",
"owner": "ad_platform_team"
}
lineage_registry[click_log_source["id"]] = {
"type": "DataSource",
"details": click_log_source
}
user_profile_source = {
"id": "user_profiles_postgres_table",
"type": "database_table",
"schema": ["user_id", "country", "registration_date"],
"description": "User demographic information",
"owner": "user_service_team"
}
lineage_registry[user_profile_source["id"]] = {
"type": "DataSource",
"details": user_profile_source
}
print("Initial Lineage Registry:")
import json
print(json.dumps(lineage_registry, indent=2))
This establishes the roots of our lineage graph. We store basic metadata about each source.
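Even this flat registry already supports simple governance queries, for instance listing every source owned by a particular team. A minimal sketch against the registry structure above (the sources_owned_by helper is illustrative, not part of the exercise code):

```python
demo_registry = {
    "raw_impressions_kafka_topic": {"type": "DataSource",
        "details": {"owner": "ad_platform_team"}},
    "raw_clicks_kafka_topic": {"type": "DataSource",
        "details": {"owner": "ad_platform_team"}},
    "user_profiles_postgres_table": {"type": "DataSource",
        "details": {"owner": "user_service_team"}},
}

def sources_owned_by(registry, owner):
    """List DataSource ids registered under a given owner."""
    return sorted(
        node_id for node_id, entry in registry.items()
        if entry.get("type") == "DataSource"
        and entry.get("details", {}).get("owner") == owner
    )

print(sources_owned_by(demo_registry, "ad_platform_team"))
# ['raw_clicks_kafka_topic', 'raw_impressions_kafka_topic']
```

Ownership metadata like this is what lets a lineage system route data-quality incidents to the right team.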
Now, let's simulate the feature engineering steps and record lineage information as we go.
Transformation 1: Calculate 7-day Impressions and Clicks per User
Imagine a function or job that processes the logs. We need to capture which sources it reads from and what features it produces.
# Simulate processing function (pseudo-code)
def calculate_user_counts_7d(impressions_data, clicks_data):
# ... complex logic using Spark, Flink, or Pandas ...
# Output: user_id, impressions_7d, clicks_7d
user_counts = {
"user1": {"impressions_7d": 100, "clicks_7d": 5},
"user2": {"impressions_7d": 200, "clicks_7d": 8},
# ... other users
}
return user_counts
# Define the output features and their lineage
feature_impressions_7d = {
"id": "user_impressions_7d",
"description": "Total impressions per user over the last 7 days",
"value_type": "integer",
"entity": "user" # Associated entity
}
feature_clicks_7d = {
"id": "user_clicks_7d",
"description": "Total clicks per user over the last 7 days",
"value_type": "integer",
"entity": "user"
}
# Record lineage for these features
transformation_id_counts = "calculate_user_counts_7d_job_v1.2"
lineage_registry[feature_impressions_7d["id"]] = {
"type": "Feature",
"details": feature_impressions_7d,
"generated_by": {
"transformation_id": transformation_id_counts,
"inputs": [impression_log_source["id"]] # Derived primarily from impressions
}
}
lineage_registry[feature_clicks_7d["id"]] = {
"type": "Feature",
"details": feature_clicks_7d,
"generated_by": {
"transformation_id": transformation_id_counts,
"inputs": [click_log_source["id"]] # Derived primarily from clicks
}
}
# Add details about the transformation itself
lineage_registry[transformation_id_counts] = {
"type": "Transformation",
"details": {
"description": "Aggregates user impressions and clicks over a 7-day window.",
"code_reference": "git://repo/jobs/user_aggregation.py#tag=v1.2",
"compute_framework": "Spark"
}
}
print("\nLineage Registry after Transformation 1:")
print(json.dumps(lineage_registry, indent=2))
We've added the two new features (user_impressions_7d, user_clicks_7d) and linked them back to the specific transformation (calculate_user_counts_7d_job_v1.2) and their respective primary input sources. We also added metadata about the transformation itself.
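The registration code above repeats the same pattern for each feature. In a real pipeline you would typically wrap this in a small helper; here is a minimal sketch (the register_feature name and signature are illustrative, not from any framework):

```python
def register_feature(registry, feature, transformation_id, input_ids):
    """Record a feature definition and its lineage in the registry."""
    registry[feature["id"]] = {
        "type": "Feature",
        "details": feature,
        "generated_by": {
            "transformation_id": transformation_id,
            "inputs": list(input_ids),
        },
    }
    return feature["id"]

# Usage: equivalent to the manual entry for user_impressions_7d above
demo_registry = {}
register_feature(
    demo_registry,
    {"id": "user_impressions_7d", "value_type": "integer", "entity": "user"},
    "calculate_user_counts_7d_job_v1.2",
    ["raw_impressions_kafka_topic"],
)
```

Centralizing the write path like this also gives you one place to validate entries (e.g., rejecting inputs that aren't registered yet).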
Transformation 2: Calculate 7-day CTR
This step takes the outputs from the previous transformation and computes the final CTR feature.
# Simulate CTR calculation function
def calculate_ctr(user_counts_data):
    # Calculate clicks / impressions per user, guarding against division by zero
    user_ctr = {}
    for user_id, counts in user_counts_data.items():
        impressions = counts.get("impressions_7d", 0)
        clicks = counts.get("clicks_7d", 0)
        ctr = clicks / impressions if impressions > 0 else 0.0
        user_ctr[user_id] = {"user_ctr_7d": ctr}  # e.g., user1: 5 / 100 = 0.05
    return user_ctr
# Define the final CTR feature
feature_ctr_7d = {
"id": "user_ctr_7d",
"description": "User click-through rate over the last 7 days",
"value_type": "float",
"entity": "user"
}
# Record lineage for the CTR feature
transformation_id_ctr = "calculate_user_ctr_job_v1.0"
lineage_registry[feature_ctr_7d["id"]] = {
"type": "Feature",
"details": feature_ctr_7d,
"generated_by": {
"transformation_id": transformation_id_ctr,
"inputs": [feature_impressions_7d["id"], feature_clicks_7d["id"]] # Input features
}
}
# Add details about the CTR transformation
lineage_registry[transformation_id_ctr] = {
"type": "Transformation",
"details": {
"description": "Calculates CTR based on 7-day impression and click counts.",
"code_reference": "git://repo/jobs/ctr_calculation.py#tag=v1.0",
"compute_framework": "Python/Pandas"
}
}
print("\nFinal Lineage Registry:")
print(json.dumps(lineage_registry, indent=2))
Now, the lineage for user_ctr_7d correctly points to its transformation, which in turn depends on the intermediate features user_impressions_7d and user_clicks_7d.
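With the registry populated, we can already answer the classic lineage question: which raw sources does a feature ultimately depend on? A small recursive walk over the generated_by links does the job (this helper is a sketch, not part of the exercise code above):

```python
def trace_upstream(registry, node_id, roots=None):
    """Recursively collect the root DataSource ids feeding a node."""
    if roots is None:
        roots = set()
    entry = registry.get(node_id, {})
    if entry.get("type") == "DataSource":
        roots.add(node_id)
        return roots
    for input_id in entry.get("generated_by", {}).get("inputs", []):
        trace_upstream(registry, input_id, roots)
    return roots

# Minimal registry mirroring the shape built in this exercise
demo_registry = {
    "raw_impressions_kafka_topic": {"type": "DataSource"},
    "raw_clicks_kafka_topic": {"type": "DataSource"},
    "user_impressions_7d": {"type": "Feature",
        "generated_by": {"inputs": ["raw_impressions_kafka_topic"]}},
    "user_clicks_7d": {"type": "Feature",
        "generated_by": {"inputs": ["raw_clicks_kafka_topic"]}},
    "user_ctr_7d": {"type": "Feature",
        "generated_by": {"inputs": ["user_impressions_7d", "user_clicks_7d"]}},
}
print(sorted(trace_upstream(demo_registry, "user_ctr_7d")))
# ['raw_clicks_kafka_topic', 'raw_impressions_kafka_topic']
```

This upstream walk is exactly what powers impact analysis: if a Kafka topic has a data-quality incident, you can enumerate every downstream feature it feeds.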
We can use the collected lineage information to visualize the dependencies. Let's generate a Graphviz DOT representation.
def generate_lineage_graph(registry):
dot_lines = ['digraph G {', 'rankdir=LR;']
node_shapes = {
"DataSource": "folder",
"Feature": "ellipse",
"Transformation": "box"
}
node_colors = {
"DataSource": "#a5d8ff", # blue
"Feature": "#b2f2bb", # green
"Transformation": "#ffd8a8" # orange
}
nodes = set()
for item_id, item_data in registry.items():
node_type = item_data.get("type")
if not node_type: continue
shape = node_shapes.get(node_type, "plaintext")
color = node_colors.get(node_type, "#e9ecef") # default gray
label = item_id.replace("_", "\\n") # Basic formatting
nodes.add(f'"{item_id}" [label="{label}", shape={shape}, style=filled, fillcolor="{color}"];')
if node_type == "Feature":
generator = item_data.get("generated_by")
if generator:
transformation_id = generator.get("transformation_id")
if transformation_id in registry:
dot_lines.append(f'"{transformation_id}" -> "{item_id}";')
input_ids = generator.get("inputs", [])
for input_id in input_ids:
if input_id in registry:
dot_lines.append(f'"{input_id}" -> "{transformation_id}";')
dot_lines.extend(list(nodes))
dot_lines.append('}')
return "\n".join(dot_lines)
dot_graph = generate_lineage_graph(lineage_registry)
print("\nGraphviz DOT format:")
print(dot_graph)
# Save to a file to render with Graphviz (optional)
# with open("feature_lineage.dot", "w") as f:
# f.write(dot_graph)
# You can render this using: dot -Tpng feature_lineage.dot -o feature_lineage.png
When rendered, the diagram shows the flow from raw data sources (folders) through transformation steps (boxes) to the final features (ellipses). Note that user_profiles_postgres_table is currently disconnected, as it wasn't used in these specific transformations.
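That disconnected node hints at a useful audit: the same lineage metadata lets you find every registered source that no feature consumes. A short scan of the inputs lists (a sketch against the registry structure used above):

```python
def unused_sources(registry):
    """Return DataSource ids not referenced as an input by any node."""
    used = set()
    for entry in registry.values():
        used.update(entry.get("generated_by", {}).get("inputs", []))
    return sorted(
        node_id for node_id, entry in registry.items()
        if entry.get("type") == "DataSource" and node_id not in used
    )

# Minimal registry mirroring the example
demo_registry = {
    "raw_impressions_kafka_topic": {"type": "DataSource"},
    "user_profiles_postgres_table": {"type": "DataSource"},
    "user_impressions_7d": {"type": "Feature",
        "generated_by": {"inputs": ["raw_impressions_kafka_topic"]}},
}
print(unused_sources(demo_registry))
# ['user_profiles_postgres_table']
```

In production, this kind of audit helps identify stale registrations or sources that can be decommissioned.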
This exercise demonstrates the core idea of lineage tracking: associating metadata with data sources, features, and the transformations that connect them. In practice, this capture is automated: pipeline frameworks expose hooks or helpers (e.g., a call like log_lineage(inputs=[...], outputs=[...])) to record this information during pipeline execution. Our lineage_registry dictionary is a minimal representation; real-world systems use dedicated metadata databases or integrate with platforms like OpenLineage, Amundsen, or DataHub, which provide APIs for storing, querying, and visualizing lineage alongside other metadata (schemas, ownership, descriptions).

By completing this practice, you've implemented the fundamental building blocks of feature lineage. The next step in a real system would involve integrating this capture mechanism into your feature engineering code and leveraging a dedicated metadata platform to manage and expose the lineage information effectively.
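To make the idea of automatic capture concrete, a decorator along the following lines can record lineage as a side effect of running the job. Note that log_lineage here is a hypothetical sketch, not an API from any specific platform:

```python
import functools

captured_lineage = {}  # stand-in for a metadata store

def log_lineage(inputs, outputs, transformation_id):
    """Hypothetical decorator: records lineage each time the job runs."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for output_id in outputs:
                captured_lineage[output_id] = {
                    "type": "Feature",
                    "generated_by": {
                        "transformation_id": transformation_id,
                        "inputs": list(inputs),
                    },
                }
            return func(*args, **kwargs)
        return wrapper
    return decorator

@log_lineage(
    inputs=["user_impressions_7d", "user_clicks_7d"],
    outputs=["user_ctr_7d"],
    transformation_id="calculate_user_ctr_job_v1.0",
)
def calculate_ctr(user_counts):
    return {u: {"user_ctr_7d": c["clicks_7d"] / c["impressions_7d"]}
            for u, c in user_counts.items()}

calculate_ctr({"user1": {"impressions_7d": 100, "clicks_7d": 5}})
```

After the call, captured_lineage holds the same kind of entry we built by hand, but the pipeline author only had to declare inputs and outputs once, next to the code that uses them.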
© 2025 ApX Machine Learning