Data Modeling for ML: Feature Engineering & Training Data
Difficulty: Senior Level | Companies: Netflix, Uber, Stripe, Airbnb, Spotify
1. ML Data Pipeline Architecture
2. Training Data Preparation
Point-in-Time Correct Dataset
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Shared SparkSession used by the examples below; getOrCreate() returns an
# already-active session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("MLDataPipeline")
    .getOrCreate()
)
def create_training_dataset(events_df, labels_df, feature_store):
    """Build a point-in-time correct training dataset.

    Each label row is joined to the most recent feature-store row for the same
    ``user_id`` whose ``computed_at`` is at or before the label's
    ``event_timestamp``. Label rows with no such feature row are kept, with
    null feature values (left join).

    Args:
        events_df: Not used by this function; kept for interface compatibility.
        labels_df: DataFrame with ``user_id``, ``event_timestamp`` (when the
            label was generated) and ``label`` (the target variable).
        feature_store: DataFrame with ``user_id``, ``computed_at`` and any
            number of feature columns.

    Returns:
        One row per input label row with ``user_id``, ``event_timestamp``,
        ``label`` followed by every feature-store column except ``user_id`` and
        ``computed_at``.
    """
    # Tag every label row with its own id so that two labels sharing the same
    # (user_id, event_timestamp) are not collapsed into one by the dedup below.
    labels = labels_df.select("user_id", "event_timestamp", "label").withColumn(
        "_label_row_id", F.monotonically_increasing_id()
    )

    # Join each label to every feature snapshot computed at or before label time.
    # Qualified l./f. references keep the two user_id columns unambiguous.
    joined = labels.alias("l").join(
        feature_store.alias("f"),
        on=(F.col("l.user_id") == F.col("f.user_id"))
        & (F.col("f.computed_at") <= F.col("l.event_timestamp")),
        how="left",
    )

    # Newest eligible snapshot first within each label row.
    # NOTE: ties on computed_at are broken arbitrarily.
    latest_first = Window.partitionBy("l._label_row_id").orderBy(
        F.col("f.computed_at").desc()
    )

    # Project explicitly instead of drop("f.user_id", ...): drop() does not
    # resolve alias-qualified string names, which left a duplicate user_id.
    feature_cols = [
        c for c in feature_store.columns if c not in ("user_id", "computed_at")
    ]

    return (
        joined
        .withColumn("_rn", F.row_number().over(latest_first))
        .filter(F.col("_rn") == 1)
        .select(
            "l.user_id",
            "l.event_timestamp",
            "l.label",
            *[F.col(f"f.`{c}`") for c in feature_cols],
        )
    )
⚠️
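Critical: Without point-in-time correctness, the model is trained on feature values that did not yet exist when each label was observed. It effectively sees future information during training, which produces overly optimistic offline metrics that don't hold up in production.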
3. Feature Engineering Patterns
User-Level Features
def compute_user_features(interactions_df, as_of_date=None):
    """Compute per-user engagement, recency, behavioral and diversity features.

    Args:
        interactions_df: Interaction rows with ``user_id``, ``session_id``,
            ``session_duration_seconds``, ``event_timestamp``,
            ``clicks_per_session``, ``purchases``, ``revenue``, ``category``
            and ``device`` columns.
        as_of_date: Optional reference date (a date, datetime or
            ``'YYYY-MM-DD'`` string).
            - When given, only interactions before the end of that day are
              aggregated, and the ``days_since_*`` features are measured from
              it. This makes the output reproducible and point-in-time correct.
            - When None (the default), all rows are used and ``current_date()``
              is the reference, as in the original behavior.

    Returns:
        One row per ``user_id``. ``engagement_rate`` is sessions per day since
        first activity. It is null when that day count is not positive (e.g. the
        user was first active on the reference date), rather than dividing by
        zero, which raises under Spark's ANSI mode.
    """
    if as_of_date is None:
        reference_date = F.current_date()
        source = interactions_df
    else:
        reference_date = F.to_date(F.lit(as_of_date))
        # Keep only events up to and including the reference day so no feature
        # looks past it.
        source = interactions_df.filter(
            F.col("event_timestamp") < F.date_add(reference_date, 1)
        )

    user_features = (
        source
        .groupBy("user_id")
        .agg(
            # Engagement features
            F.count("*").alias("total_interactions"),
            F.countDistinct("session_id").alias("total_sessions"),
            F.avg("session_duration_seconds").alias("avg_session_duration"),
            # Recency features
            F.max("event_timestamp").alias("last_active_timestamp"),
            F.min("event_timestamp").alias("first_active_timestamp"),
            # Behavioral features
            F.avg("clicks_per_session").alias("avg_clicks_per_session"),
            F.sum("purchases").alias("total_purchases"),
            F.sum("revenue").alias("lifetime_revenue"),
            # Diversity features
            F.countDistinct("category").alias("unique_categories"),
            F.countDistinct("device").alias("unique_devices"),
        )
        .withColumn(
            "days_since_first_active",
            F.datediff(reference_date, F.col("first_active_timestamp")),
        )
        .withColumn(
            "days_since_last_active",
            F.datediff(reference_date, F.col("last_active_timestamp")),
        )
        .withColumn(
            "engagement_rate",
            # Guard the denominator: 0 days would be a division by zero.
            F.when(
                F.col("days_since_first_active") > 0,
                F.col("total_sessions") / F.col("days_since_first_active"),
            ),
        )
    )
    return user_features
Windowed Features
def compute_windowed_features(events_df, windows=None):
    """Compute rolling per-user spend and purchase features over lookback windows.

    For every event row and every window, the same user's events whose
    ``event_timestamp`` falls within ``days`` days before the current row's
    timestamp are aggregated. The frame is range-based on epoch seconds and
    inclusive at both ends, so it includes the current row and any other row of
    that user with the same timestamp.

    Args:
        events_df: DataFrame with ``user_id``, ``event_timestamp``,
            ``event_type`` and ``amount`` columns.
        windows: Optional mapping of column-name suffix to lookback length in
            days (fractional days allowed). Defaults to
            ``{"1d": 1, "7d": 7, "30d": 30, "90d": 90}``.

    Returns:
        ``events_df`` with three extra columns per window:
        - ``spend_<name>``: sum of ``amount`` over all event types.
        - ``purchase_count_<name>``: count of ``PURCHASE`` events.
        - ``avg_order_value_<name>``: mean ``amount`` of ``PURCHASE`` events,
          null when the window has none.
    """
    if windows is None:
        windows = {
            "1d": 1,
            "7d": 7,
            "30d": 30,
            "90d": 90,
        }

    seconds_per_day = 86400
    # Loop-invariant expressions shared by every window.
    is_purchase = F.col("event_type") == "PURCHASE"
    # rangeBetween needs a numeric ordering key; timestamp -> long is epoch seconds.
    order_key = F.col("event_timestamp").cast("long")

    features = events_df
    for window_name, days in windows.items():
        window_spec = (
            Window.partitionBy("user_id")
            .orderBy(order_key)
            .rangeBetween(-int(days * seconds_per_day), 0)
        )
        features = (
            features
            .withColumn(
                f"spend_{window_name}",
                F.sum("amount").over(window_spec),
            )
            .withColumn(
                f"purchase_count_{window_name}",
                F.sum(F.when(is_purchase, 1).otherwise(0)).over(window_spec),
            )
            .withColumn(
                f"avg_order_value_{window_name}",
                # Non-purchase rows become null and are ignored by avg().
                F.avg(F.when(is_purchase, F.col("amount"))).over(window_spec),
            )
        )
    return features
4. Data Leakage Detection
Common Leakage Patterns
class LeakageDetector:
    """Detect data leakage in training datasets.

    Each ``check_*`` method appends its findings to ``self.leakage_patterns``
    as dicts with a ``"type"`` and ``"column"`` key plus check-specific details.
    """

    def __init__(self):
        # Findings accumulated across every check run on this instance.
        self.leakage_patterns = []

    def check_future_features(self, df, label_timestamp_col, feature_timestamp_cols):
        """Flag feature timestamp columns with values later than the label timestamp.

        Args:
            df: Training DataFrame containing the label timestamp column.
            label_timestamp_col: Name of the label timestamp column.
            feature_timestamp_cols: Candidate feature timestamp column names.
                Names not present in ``df.columns`` are skipped silently.

        Records a ``"future_feature"`` finding with ``affected_rows`` for each
        column that has at least one such row.
        """
        for feat_col in feature_timestamp_cols:
            if feat_col not in df.columns:
                continue
            leakage = df.filter(F.col(feat_col) > F.col(label_timestamp_col)).count()
            if leakage > 0:
                self.leakage_patterns.append({
                    "type": "future_feature",
                    "column": feat_col,
                    "affected_rows": leakage,
                })

    def check_target_leakage(self, df, target_col, feature_cols, threshold=0.9):
        """Flag features whose Pearson correlation with the target is suspiciously high.

        Args:
            df: Training DataFrame (``df.stat.corr`` is used per column).
            target_col: Name of the numeric target column.
            feature_cols: Numeric feature column names to test. The target
                column itself is skipped if it appears here.
            threshold: Absolute correlation above which a feature is flagged.

        Records a ``"target_leakage"`` finding with the observed ``correlation``.
        """
        for feat_col in feature_cols:
            # A column always correlates perfectly with itself; not a finding.
            if feat_col == target_col:
                continue
            corr = df.stat.corr(target_col, feat_col)
            # NaN (e.g. a constant column) compares False, so it is never flagged.
            if abs(corr) > threshold:
                self.leakage_patterns.append({
                    "type": "target_leakage",
                    "column": feat_col,
                    "correlation": corr,
                })

    def check_temporal_leakage(self, df, date_col, target_col):
        """Check whether data from the future is used to predict the past.

        Placeholder: not implemented yet. It performs no check, records
        nothing and returns None.
        """
        # TODO: e.g. group by date_col and check whether predictive performance
        # improves over time (it shouldn't if there is no leakage).
        return None
⚠️
Common Leakage Patterns:
- Using future data to compute features (e.g., "next month's revenue" to predict churn)
- Including the target variable, or a direct proxy for it, as a feature (e.g., using purchase_amount to predict whether a purchase happens)
- Including identifiers that encode the target (e.g., user_id when users are sampled by label)
5. Label Engineering
class LabelEngineer:
    """Create labels for different ML tasks."""

    @staticmethod
    def create_churn_label(events_df, churn_window_days=30, observation_date=None):
        """Create a binary churn label for every user active up to ``observation_date``.

        ``observation_date`` separates the feature period (events at or before
        it) from the label period
        (observation_date, observation_date + churn_window_days]. Labels:
        - ``churned = 1``: the user had an event in the feature period but none
          in the label period.
        - ``churned = 0``: the user also had an event in the label period.

        Features paired with these labels should only use data at or before
        ``observation_date``. Measuring "retention" inside the feature period
        instead would make the label a function of the features (leakage).

        Args:
            events_df: DataFrame with ``user_id`` and ``event_timestamp``.
            churn_window_days: Length of the label period in days.
            observation_date: Cut-off (timestamp, date, or string castable to
                timestamp). Defaults to the latest ``event_timestamp`` minus
                ``churn_window_days``, so the full label period is observable.

        Returns:
            DataFrame with ``user_id`` and an integer ``churned`` (0 or 1).

        Raises:
            ValueError: if ``observation_date`` is None and ``events_df`` has no
                non-null ``event_timestamp``.
        """
        seconds_per_day = 86400
        if observation_date is None:
            from datetime import timedelta

            latest = events_df.agg(F.max("event_timestamp")).collect()[0][0]
            if latest is None:
                raise ValueError(
                    "events_df has no event_timestamp values; "
                    "pass observation_date explicitly"
                )
            # Leave a full label window after the cut-off.
            observation_date = latest - timedelta(days=churn_window_days)

        # Compare in epoch seconds, wrapping the cut-off in lit(): passing a raw
        # Python value to F.date_sub would be treated as a column name.
        event_s = F.col("event_timestamp").cast("long")
        cutoff_s = F.lit(observation_date).cast("timestamp").cast("long")
        window_end_s = cutoff_s + churn_window_days * seconds_per_day

        # Users seen in the feature period: the population being labeled.
        active_users = (
            events_df.filter(event_s <= cutoff_s).select("user_id").distinct()
        )
        # Users seen again during the label period.
        retained_users = (
            events_df
            .filter((event_s > cutoff_s) & (event_s <= window_end_s))
            .select("user_id")
            .distinct()
            .withColumn("_retained", F.lit(True))
        )

        return (
            active_users
            .join(retained_users, "user_id", "left")
            .select(
                "user_id",
                F.when(F.col("_retained").isNull(), 1).otherwise(0).alias("churned"),
            )
        )

    @staticmethod
    def create_regression_label(events_df, target_col, window_days=30):
        """Create a regression label (e.g., spend in the next N days).

        Placeholder: not implemented yet; returns None. A correct version must
        aggregate ``target_col`` only over events after the observation cut-off
        to avoid leakage.
        """
        # TODO: implement with an explicit observation cut-off, mirroring
        # create_churn_label.
        return None
6. ML Data Pipeline Orchestration
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime
# Daily pipeline: compute -> validate features -> build training set -> train -> evaluate.
with DAG(
    'ml_data_pipeline',
    start_date=datetime(2024, 1, 1),
    # `schedule` supersedes `schedule_interval` (deprecated in Airflow 2.4,
    # removed in Airflow 3).
    schedule='@daily',
    # Don't backfill one run per day since start_date when catchup is enabled
    # (the Airflow 2.x default); each run would launch the full Spark pipeline.
    catchup=False,
) as dag:
    # Step 1: Compute features
    compute_features = SparkSubmitOperator(
        task_id='compute_features',
        application='s3://apps/compute_features.py',
        conf={'spark.sql.shuffle.partitions': '200'},
    )

    # Step 2: Validate features
    validate_features = SparkSubmitOperator(
        task_id='validate_features',
        application='s3://apps/validate_features.py',
    )

    # Step 3: Create training dataset
    create_training = SparkSubmitOperator(
        task_id='create_training_dataset',
        application='s3://apps/create_training.py',
    )

    # Step 4: Train model
    train_model = SparkSubmitOperator(
        task_id='train_model',
        application='s3://apps/train_model.py',
    )

    # Step 5: Evaluate and register
    evaluate = SparkSubmitOperator(
        task_id='evaluate_model',
        application='s3://apps/evaluate.py',
    )

    compute_features >> validate_features >> create_training >> train_model >> evaluate
ℹ️
Best Practice: Always separate feature computation from training data creation. Features should be reusable across models, while training data is specific to each model.
Follow-Up Questions
- How would you handle class imbalance in training data creation?
- Design a feature store that supports both batch and streaming features.
- How do you version training datasets for reproducibility?
- Design a data pipeline for a real-time recommendation system.
- How would you detect and prevent data leakage in a complex ML pipeline?