This section provides a hands-on exercise to solidify your understanding of implementing complex feature transformations within a feature store environment. We move beyond simple lookups or aggregations to tackle scenarios involving multiple data sources, custom logic, and time-sensitive computations, reflecting the challenges encountered in production ML systems. You will apply the concepts discussed earlier in this chapter, such as transformation pipelines, streaming feature processing, and managing different computation cadences.
We will simulate a common scenario: enhancing user features by combining static user profile information with real-time transaction activity. Our goal is to compute features that capture recent user behavior patterns, which are often highly predictive in domains like fraud detection or recommendation systems.
Imagine we have two primary data sources:

A batch source of relatively static user profile information (for example, user segment and account creation date), updated infrequently.
A streaming source of user transaction events (for example, transaction amount and event timestamp), arriving continuously.
Our objective is to create derived features that combine information from both sources and require non-trivial computations, such as:

Average transaction amount and transaction counts over recent time windows (e.g., 1 hour and 24 hours).
An activity burst ratio comparing short-term to longer-term transaction counts.
A spending rate that normalizes recent spending by the user's account age.
The process typically involves defining a feature view or transformation function that orchestrates the following steps:

Aggregate the transaction stream over the required time windows, keyed by user.
Retrieve the relevant static profile attributes for the same users.
Join the two inputs on the entity key and apply the custom feature logic.
Materialize the results to the online and offline stores for serving and training.
Data flow for computing combined streaming and batch features.
While the specific syntax depends heavily on the chosen feature store framework (e.g., Feast, Tecton, Vertex AI Feature Store, SageMaker Feature Store), the underlying concepts remain similar. Let's illustrate using Python-based pseudo-code, assuming a hypothetical feature store client library.
First, we declare our sources. This usually involves pointing the feature store to the location and format of the data.
# Assume 'fs' is an initialized feature store client
# Batch Source: User profiles stored in S3
user_profiles_source = fs.define_batch_source(
name="user_profiles_batch",
path="s3://my-data-bucket/user_profiles/",
format="parquet",
timestamp_field="profile_update_ts",
created_timestamp_column="account_created_ts"
)
# Streaming Source: User transactions from Kafka
user_transactions_source = fs.define_stream_source(
name="user_transactions_stream",
kafka_bootstrap_servers="kafka.example.com:9092",
topic="user_transactions",
format="json",
timestamp_field="event_timestamp"
)
We define transformations on the streaming source to calculate aggregates over specific time windows.
# Define aggregations on the transaction stream
# Assume a schema like {'user_id': ..., 'event_timestamp': ..., 'amount': ...}
transaction_aggregations = fs.aggregate_stream(
source=user_transactions_source,
entity_key="user_id",
aggregations=[
fs.Aggregation(column="amount", function="mean", time_window="1h"),
fs.Aggregation(column="amount", function="mean", time_window="24h"),
fs.Aggregation(column="*", function="count", time_window="1h"),
fs.Aggregation(column="*", function="count", time_window="24h"),
],
# Define how late data should be handled (e.g., allow 15 mins delay)
event_timestamp_column="event_timestamp",
watermark_delay="15m"
)
# The output might be features like:
# amount_mean_1h, amount_mean_24h, transaction_count_1h, transaction_count_24h
Now, we combine the aggregated stream features with static profile features using a custom transformation function. This function receives data corresponding to an entity (user) from different sources and performs the final calculation.
# Define user profile features needed
user_profile_features = fs.select_features(
source=user_profiles_source,
features=["user_segment", "account_age_days"] # account_age_days might be precomputed daily
)
# Define the custom transformation logic
@fs.transformation(sources=[transaction_aggregations, user_profile_features])
def compute_advanced_user_features(agg_data, profile_data):
"""
Combines transaction aggregates and user profile data.
Args:
agg_data (DataFrame/Dict): Aggregated data from the stream source.
Contains 'user_id', 'amount_mean_1h', etc.
profile_data (DataFrame/Dict): Static data from the batch source.
Contains 'user_id', 'user_segment', 'account_age_days'.
Returns:
DataFrame/Dict: Final computed features for the user.
"""
# Example: Calculate ratio of recent spending vs account age
# Handle potential division by zero or missing data
    account_age_days = profile_data.get("account_age_days")  # .get() guards against a missing key
    if account_age_days and account_age_days > 0:
        spending_per_day_1h = (agg_data.get("amount_mean_1h", 0) * 24) / account_age_days
    else:
        spending_per_day_1h = 0.0  # Or None, based on downstream requirements
# Example: Calculate activity burst (count in last hour vs count in last 24 hours)
count_1h = agg_data.get("transaction_count_1h", 0)
count_24h = agg_data.get("transaction_count_24h", 0)
activity_burst_ratio = count_1h / count_24h if count_24h > 0 else 0.0
return {
"user_id": agg_data["user_id"], # Ensure entity key is returned
"avg_spending_1h": agg_data.get("amount_mean_1h"),
"avg_spending_24h": agg_data.get("amount_mean_24h"),
"transaction_count_1h": count_1h,
"transaction_count_24h": count_24h,
"spending_rate_1h": spending_per_day_1h,
"activity_burst_ratio": activity_burst_ratio,
"user_segment": profile_data.get("user_segment") # Pass through profile feature
}
# Register this transformation as a Feature View or equivalent concept
advanced_user_feature_view = fs.register_feature_view(
name="advanced_user_features",
entities=["user_id"],
transformation=compute_advanced_user_features,
# Define materialization targets (online/offline stores)
online_store=my_online_store,
offline_store=my_offline_store,
ttl="90d" # Time-to-live for offline storage
)
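After registration and materialization, serving code can read the latest values back from the online store. The call below is an assumption about the hypothetical client (most feature stores expose a similar online lookup, e.g., Feast's get_online_features); treat it as a sketch, not a specific API.

# Hypothetical online lookup against the registered feature view.
online_features = fs.get_online_features(
    feature_view="advanced_user_features",
    features=["avg_spending_1h", "activity_burst_ratio", "user_segment"],
    entity_rows=[{"user_id": "user_123"}],
)
print(online_features)  # e.g., {"user_id": "user_123", "avg_spending_1h": 12.4, ...}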
This example uses a Python function, but many feature stores allow SQL transformations or integration with distributed compute frameworks like Spark or Flink for executing these pipelines at scale.
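For instance, if the feature store hands the joined inputs to Spark, the combination logic might look roughly like the sketch below. It is written under that assumption, uses made-up sample rows, and mirrors the column names from the pseudo-code; it is not tied to any particular feature store API.

# Minimal PySpark sketch of the same combination logic, assuming the stream
# aggregates and profile columns have already been joined per user_id.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("advanced_user_features").getOrCreate()

# Hypothetical joined input rows for illustration only.
joined = spark.createDataFrame(
    [
        ("u1", 12.5, 9.8, 3, 20, 730, "premium"),
        ("u2", 0.0, 0.0, 0, 0, 0, "basic"),
    ],
    ["user_id", "amount_mean_1h", "amount_mean_24h",
     "transaction_count_1h", "transaction_count_24h",
     "account_age_days", "user_segment"],
)

features = joined.select(
    "user_id",
    F.col("amount_mean_1h").alias("avg_spending_1h"),
    F.col("amount_mean_24h").alias("avg_spending_24h"),
    "transaction_count_1h",
    "transaction_count_24h",
    # Guard against division by zero, mirroring the Python function above.
    F.when(F.col("account_age_days") > 0,
           (F.col("amount_mean_1h") * 24) / F.col("account_age_days"))
     .otherwise(F.lit(0.0)).alias("spending_rate_1h"),
    F.when(F.col("transaction_count_24h") > 0,
           F.col("transaction_count_1h") / F.col("transaction_count_24h"))
     .otherwise(F.lit(0.0)).alias("activity_burst_ratio"),
    "user_segment",
)
features.show()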
Implementing complex transformations necessitates thorough testing. At a minimum, unit test the transformation function (compute_advanced_user_features in our example) in isolation with mock input data representing various scenarios, such as missing profile data, zero transactions, and edge cases in the calculations.
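A couple of pytest-style cases might look like the sketch below. It assumes the decorated function is still directly callable with plain dictionaries (or that you keep an undecorated copy of the logic for testing); the expected values follow from the formulas above.

# Unit tests for the transformation logic, run with pytest.

def test_missing_account_age_falls_back_to_zero_rate():
    agg_data = {
        "user_id": "u1",
        "amount_mean_1h": 10.0,
        "amount_mean_24h": 8.0,
        "transaction_count_1h": 2,
        "transaction_count_24h": 10,
    }
    profile_data = {"user_id": "u1", "user_segment": "premium", "account_age_days": None}

    result = compute_advanced_user_features(agg_data, profile_data)

    # A missing account age should produce the safe default, not an exception.
    assert result["spending_rate_1h"] == 0.0
    assert result["activity_burst_ratio"] == 0.2  # 2 transactions / 10 transactions
    assert result["user_segment"] == "premium"


def test_zero_transactions_avoids_division_by_zero():
    agg_data = {"user_id": "u2", "transaction_count_1h": 0, "transaction_count_24h": 0}
    profile_data = {"user_id": "u2", "user_segment": "basic", "account_age_days": 30}

    result = compute_advanced_user_features(agg_data, profile_data)

    # No transactions in 24h must not cause a division-by-zero error.
    assert result["activity_burst_ratio"] == 0.0
    assert result["spending_rate_1h"] == 0.0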
Now, it's your turn to implement a set of complex transformations. Assume you have access to a feature store environment (this could be a local setup using Docker with Feast, or a cloud provider's platform sandbox).
Provided Data (Simulated):

user_profiles.csv: Contains user_id, account_created_ts, user_segment.
user_transactions.csv: Contains user_id, event_timestamp, amount, transaction_type. You can treat this as either a batch file for simulating a stream or configure a tool to feed it row-by-row.

Your Task:
1. Process user_profiles.csv to calculate account_age_days based on a reference date (a minimal pandas starter for this step appears at the end of the section).
2. Compute the following features for each user_id:
   transaction_count_1h: Count of transactions in the last hour (streaming aggregation).
   transaction_count_7d: Count of transactions in the last 7 days (streaming aggregation).
   avg_amount_1h: Average transaction amount in the last hour (streaming aggregation).
   activity_ratio_1h_vs_7d: The ratio transaction_count_1h / (transaction_count_7d / 7.0). Handle division by zero appropriately (e.g., return 0 or null). This combines multiple stream aggregates.
   normalized_avg_amount_1h: The avg_amount_1h divided by the user's account_age_days (if available and greater than 0, otherwise handle appropriately). This combines stream aggregates and batch features.
3. Register and materialize your feature definitions (e.g., by running feature_store apply or a materialization job).
4. Retrieve the computed features for a few user_ids to check if the features are available and have plausible values.

This exercise will give you practical experience in defining data sources, implementing multi-step transformations involving aggregations and custom logic, and integrating these computations within a feature store framework, preparing you for building sophisticated feature engineering pipelines in real-world systems.
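As referenced in step 1 above, here is a minimal pandas sketch for computing account_age_days from user_profiles.csv. The reference date and the timezone handling are assumptions; adjust them to your setup.

# Starter sketch for step 1 of the exercise.
import pandas as pd

REFERENCE_DATE = pd.Timestamp("2025-01-01", tz="UTC")  # assumed reference date

profiles = pd.read_csv("user_profiles.csv", parse_dates=["account_created_ts"])

# If the timestamps are naive, localize them so the subtraction is well defined.
if profiles["account_created_ts"].dt.tz is None:
    profiles["account_created_ts"] = profiles["account_created_ts"].dt.tz_localize("UTC")

profiles["account_age_days"] = (REFERENCE_DATE - profiles["account_created_ts"]).dt.days
print(profiles[["user_id", "account_age_days"]].head())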