A foundational data pipeline will be constructed that ingests raw data, applies a set of defined transformations, and saves the resulting features to a structured, file-based feature store. This process creates a single, reliable source of truth for features, forming the first line of defense against training-serving skew. The objective is to centralize the feature creation logic, ensuring that the transformation function, let's call it f, produces identical outputs for both training data (xtrain) and serving data (xserve).
Our pipeline will be a Python script that can be executed manually, scheduled as a cron job, or integrated into a larger orchestration system like Kubeflow or Airflow. While we will use the pandas library for simplicity, the core logic is designed to be portable to distributed frameworks like Spark or Ray.
Imagine we are building a model to predict the duration of a taxi ride. Our raw data source is a collection of Parquet files, each containing records of completed trips. Our task is to ingest this data and compute a set of features that our model will use for training.
Raw Data Schema:
trip_id: A unique identifier for the trip.
pickup_datetime_utc: The timestamp when the trip started.
dropoff_datetime_utc: The timestamp when the trip ended.
passenger_count: The number of passengers.
start_location_id: An identifier for the pickup zone.
Target Features:
trip_duration_minutes: The total duration of the trip in minutes.
hour_of_day: The hour the trip started (0-23).
day_of_week: The day of the week the trip started (0=Monday, 6=Sunday).
is_weekend: A boolean flag indicating if the trip started on a weekend.
The most significant part of this exercise is isolating the feature engineering logic into its own reusable module. This separation is what enables consistency across different environments. Let's create a file named feature_transforms.py.
# feature_transforms.py
import pandas as pd
def compute_trip_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Computes trip-related features from a raw data DataFrame.

    This function contains the authoritative logic for our features.
    It can be imported by our batch pipeline, a streaming job,
    or the online inference service.
    """
    # Ensure datetime columns are in the correct format
    df['pickup_datetime_utc'] = pd.to_datetime(df['pickup_datetime_utc'])
    df['dropoff_datetime_utc'] = pd.to_datetime(df['dropoff_datetime_utc'])

    # Feature 1: Trip Duration
    duration = df['dropoff_datetime_utc'] - df['pickup_datetime_utc']
    df['trip_duration_minutes'] = duration.dt.total_seconds() / 60.0

    # Feature 2: Hour of Day
    df['hour_of_day'] = df['pickup_datetime_utc'].dt.hour

    # Feature 3: Day of Week
    df['day_of_week'] = df['pickup_datetime_utc'].dt.dayofweek  # Monday=0, Sunday=6

    # Feature 4: Is Weekend
    df['is_weekend'] = (df['pickup_datetime_utc'].dt.dayofweek >= 5)

    # Select and return only the feature columns along with keys.
    # The pickup_datetime_utc is kept as the event timestamp for point-in-time correctness.
    feature_df = df[[
        'trip_id',
        'pickup_datetime_utc',
        'trip_duration_minutes',
        'hour_of_day',
        'day_of_week',
        'is_weekend'
    ]].copy()

    return feature_df
This function is self-contained and testable. It takes a DataFrame with the raw schema and returns a new DataFrame containing only the primary key (trip_id), an event timestamp, and the computed features.
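Because compute_trip_features is pure pandas in, pandas out, it can be covered by an ordinary unit test. Below is a minimal sketch of such a test; the file name test_feature_transforms.py and the sample values are illustrative choices, not part of the pipeline itself.
# test_feature_transforms.py -- a minimal, illustrative unit test sketch
import pandas as pd

from feature_transforms import compute_trip_features


def test_compute_trip_features():
    raw = pd.DataFrame({
        'trip_id': [1],
        'pickup_datetime_utc': ['2023-10-28 11:00:00'],    # a Saturday
        'dropoff_datetime_utc': ['2023-10-28 11:18:00'],
        'passenger_count': [2],
        'start_location_id': [7],
    })

    features = compute_trip_features(raw)

    assert features.loc[0, 'trip_duration_minutes'] == 18.0
    assert features.loc[0, 'hour_of_day'] == 11
    assert features.loc[0, 'day_of_week'] == 5   # Saturday
    assert bool(features.loc[0, 'is_weekend']) is True
A test runner such as pytest can execute this file directly, which makes the feature logic verifiable before it is wired into any pipeline.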
Next, we create the script that orchestrates the ingestion process. This script will handle reading raw data, applying our transformation logic from feature_transforms.py, and writing the output to a structured location. We will structure the output directory by date, a common pattern for managing time-series data in data lakes.
Create a file named ingest_features.py.
# ingest_features.py
import argparse
import pandas as pd
from pathlib import Path
from feature_transforms import compute_trip_features
def run_ingestion(raw_data_path: str, feature_store_path: str):
    """
    Runs the feature ingestion pipeline for a single raw data file.
    """
    print(f"Reading raw data from: {raw_data_path}")
    raw_df = pd.read_parquet(raw_data_path)

    print("Computing features...")
    features_df = compute_trip_features(raw_df)

    # Use the date from the pickup timestamp to partition the data.
    # In a real system, you'd handle data spanning multiple days.
    event_date = features_df['pickup_datetime_utc'].iloc[0].strftime('%Y-%m-%d')
    output_dir = Path(feature_store_path) / f"event_date={event_date}"
    output_dir.mkdir(parents=True, exist_ok=True)

    output_path = output_dir / "features.parquet"
    print(f"Writing features to: {output_path}")
    features_df.to_parquet(output_path, index=False)
    print("Ingestion complete.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Feature Ingestion Pipeline")
    parser.add_argument(
        "--raw-data-path",
        type=str,
        required=True,
        help="Path to the raw input data Parquet file."
    )
    parser.add_argument(
        "--feature-store-path",
        type=str,
        required=True,
        help="Root path to the output feature store."
    )
    args = parser.parse_args()
    run_ingestion(args.raw_data_path, args.feature_store_path)
The process we have just defined can be visualized as a simple, directed flow from the raw data source to the structured feature store.
The ingestion pipeline reads raw data, applies a shared transformation logic module, and writes partitioned feature data into the feature store.
To execute this pipeline, you first need to create some sample raw data.
# Create a sample raw data file for testing
import pandas as pd

data = {
    'trip_id': range(100, 105),
    'pickup_datetime_utc': pd.to_datetime(['2023-10-26 08:05:00', '2023-10-26 08:10:00', '2023-10-26 18:30:00', '2023-10-28 11:00:00', '2023-10-28 23:15:00']),
    'dropoff_datetime_utc': pd.to_datetime(['2023-10-26 08:25:00', '2023-10-26 08:22:00', '2023-10-26 18:45:00', '2023-10-28 11:18:00', '2023-10-28 23:55:00']),
    'passenger_count': [1, 2, 1, 3, 1],
    'start_location_id': [10, 12, 5, 22, 50]
}
sample_df = pd.DataFrame(data)
sample_df.to_parquet('raw_trips.parquet')
Now, you can run the ingestion script from your terminal. We will designate a directory named feature_store as the output location.
# Create the output directory
mkdir -p feature_store
# Run the ingestion script
python ingest_features.py \
--raw-data-path raw_trips.parquet \
--feature-store-path feature_store
After running the script, your directory structure will look like this:
.
├── feature_store/
│   └── event_date=2023-10-26/
│       └── features.parquet
├── raw_trips.parquet
├── ingest_features.py
└── feature_transforms.py
Note: Our simple script partitions all records by the date of the first record. A production system would need to handle data that spans multiple days, grouping by date and writing to the corresponding partitions.
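One way to lift that limitation is to group the computed features by calendar date and write one file per partition. The sketch below illustrates the idea; the helper name write_partitioned is hypothetical and not part of ingest_features.py.
# Hypothetical helper sketch: write one partition per event date
from pathlib import Path

import pandas as pd


def write_partitioned(features_df: pd.DataFrame, feature_store_path: str) -> None:
    # Derive each record's calendar date from its pickup timestamp
    event_dates = features_df['pickup_datetime_utc'].dt.strftime('%Y-%m-%d')

    # Write each date's records into its own event_date=YYYY-MM-DD directory
    for event_date, partition_df in features_df.groupby(event_dates):
        output_dir = Path(feature_store_path) / f"event_date={event_date}"
        output_dir.mkdir(parents=True, exist_ok=True)
        partition_df.to_parquet(output_dir / "features.parquet", index=False)
With a change along these lines, the two 2023-10-28 trips in the sample data would land in their own event_date=2023-10-28 partition rather than in the 2023-10-26 one.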
Let's inspect the output to verify the results:
import pandas as pd
# Load the feature data we just created
output_df = pd.read_parquet('feature_store/event_date=2023-10-26/features.parquet')
print(output_df)
The output will be a clean, feature-engineered table:
trip_id pickup_datetime_utc trip_duration_minutes hour_of_day day_of_week is_weekend
0 100 2023-10-26 08:05:00 20.0 8 3 False
1 101 2023-10-26 08:10:00 12.0 8 3 False
2 102 2023-10-26 18:30:00 15.0 18 3 False
3 103 2023-10-28 11:00:00 18.0 11 5 True
4 104 2023-10-28 23:15:00 40.0 23 5 True
You have successfully built a foundational feature ingestion pipeline. This simple structure is the building block for a production-grade data system for machine learning.
Consistency: All feature logic lives in the shared feature_transforms.py module. A training pipeline would read from the feature_store directory. An online inference service, when receiving a new trip request, would import and call the exact same compute_trip_features function on the incoming data to generate features in real time. This guarantees that f(xtrain) and f(xserve) are identical (see the sketch below).
Reproducibility: Version-controlling feature_transforms.py, ingest_features.py, and the resulting feature_store data gives you full reproducibility.
Scalability: If raw_trips.parquet grew to terabytes, the pandas-based script would fail. However, the logic inside compute_trip_features can be almost directly translated into a user-defined function (UDF) for Spark or a remote task for Ray. The architectural pattern remains the same; only the execution engine changes.
This exercise demonstrates that a well-designed data system prioritizes the centralization and reusability of its transformation logic. By doing so, you build a reliable foundation for the entire machine learning lifecycle.
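To make the consistency point concrete, here is a hedged sketch of how an online service might reuse the shared transform on a single incoming request. The wrapper name build_serving_features and the request shape are assumptions for illustration. Note that the dropoff timestamp, and therefore trip_duration_minutes (the target), is not known at request time, so only the predictor features would actually be used.
# serving_sketch.py -- hypothetical online reuse of the shared transform
import pandas as pd

from feature_transforms import compute_trip_features


def build_serving_features(request: dict) -> pd.DataFrame:
    # Wrap the single incoming trip request in a one-row DataFrame
    raw = pd.DataFrame([request])
    # Reuse exactly the same logic the batch pipeline used
    return compute_trip_features(raw)


features = build_serving_features({
    'trip_id': 999,
    'pickup_datetime_utc': '2023-10-29 09:30:00',
    'dropoff_datetime_utc': '2023-10-29 09:30:00',  # placeholder: unknown at request time
    'passenger_count': 1,
    'start_location_id': 42,
})
print(features[['hour_of_day', 'day_of_week', 'is_weekend']])
Similarly, for the scalability point, the same function can be handed to Spark without rewriting the logic. This is a sketch assuming PySpark 3.x and hypothetical input and output paths:
# spark_sketch.py -- hypothetical reuse of the same logic on Spark (PySpark >= 3.0 assumed)
from pyspark.sql import SparkSession

from feature_transforms import compute_trip_features

spark = SparkSession.builder.appName("feature-ingestion").getOrCreate()
raw_sdf = spark.read.parquet("s3://example-bucket/raw_trips/")  # hypothetical input path


def transform_batches(batches):
    # Each batch arrives as a pandas DataFrame; apply the shared transform unchanged
    for pdf in batches:
        yield compute_trip_features(pdf)


schema = ("trip_id long, pickup_datetime_utc timestamp, trip_duration_minutes double, "
          "hour_of_day long, day_of_week long, is_weekend boolean")
features_sdf = raw_sdf.mapInPandas(transform_batches, schema=schema)
features_sdf.write.mode("overwrite").parquet("s3://example-bucket/feature_store/")  # hypothetical output path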