Real-Time Feature Store
Feature stores bridge the gap between data engineering and ML, ensuring features are computed consistently for training and serving.
Feature Store Architecture
Feast Implementation
from feast import FeatureStore, Entity, Feature, ValueType
from feast import FileSource, PushSource
from feast.feature_store import FeatureStore
from datetime import timedelta
import pandas as pd
# Bring the view/schema types into scope: FeatureView was used below but never
# imported, and Field + feast.types replace the deprecated Feature/ValueType
# feature declarations (``features=`` is superseded by ``schema=``).
from feast import FeatureView, Field
from feast.types import Float32, Int64

# Define entities
driver = Entity(
    name="driver_id",
    value_type=ValueType.INT64,
    description="Unique driver identifier",
)

# Define features: batch-computed driver statistics read from a parquet file.
# `timestamp_field` is the current name of the deprecated
# `event_timestamp_column` argument; entities are passed as Entity objects.
driver_stats = FeatureView(
    name="driver_statistics",
    entities=[driver],
    ttl=timedelta(hours=1),
    schema=[
        Field(name="avg_daily_rides", dtype=Float32),
        Field(name="total_rides_7d", dtype=Int64),
        Field(name="avg_rating", dtype=Float32),
        Field(name="acceptance_rate", dtype=Float32),
    ],
    online=True,
    source=FileSource(
        path="data/driver_stats.parquet",
        timestamp_field="event_timestamp",
    ),
)

# Push source for real-time features; batch_source is the offline table that
# backs rows pushed through this source.
# NOTE(review): this reuses data/driver_stats.parquet, which presumably holds
# the driver_statistics columns rather than the location / last-ride columns
# pushed below — confirm the intended backing file.
driver_push = PushSource(
    name="driver_push_source",
    batch_source=FileSource(
        path="data/driver_stats.parquet",
        timestamp_field="event_timestamp",
    ),
)

driver_realtime = FeatureView(
    name="driver_realtime",
    entities=[driver],
    ttl=timedelta(minutes=5),
    schema=[
        Field(name="current_location_lat", dtype=Float32),
        Field(name="current_location_lon", dtype=Float32),
        Field(name="last_ride_minutes_ago", dtype=Int64),
    ],
    online=True,
    source=driver_push,
)

# Initialize and apply. The push source is listed explicitly so it is
# registered together with the view that reads from it.
store = FeatureStore(repo_path=".")
store.apply([driver, driver_push, driver_stats, driver_realtime])

# Online retrieval
feature_vector = store.get_online_features(
    features=[
        "driver_statistics:avg_daily_rides",
        "driver_statistics:avg_rating",
        "driver_realtime:last_ride_minutes_ago",
    ],
    entity_rows=[
        {"driver_id": 1001},
        {"driver_id": 1002},
        {"driver_id": 1003},
    ],
).to_dict()

# Push real-time features. FeatureStore.push takes the *push source* name
# (not the feature view name). The frame must carry the entity key, the
# source's timestamp field, and the pushed feature columns.
store.push(
    "driver_push_source",
    pd.DataFrame({
        "driver_id": [1001],
        "event_timestamp": [pd.Timestamp.now()],
        "current_location_lat": [37.7749],
        "current_location_lon": [-122.4194],
        "last_ride_minutes_ago": [5],
    }),
)
Feature Engineering Pipeline
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
class FeatureEngineeringPipeline:
    """Compute per-driver features from ride records and push them to a store."""

    def __init__(self, feature_store):
        self.store = feature_store

    def compute_driver_features(self, rides_df, window_hours=24):
        """Compute driver features from ride data.

        Args:
            rides_df: one row per ride. Columns read here: ``driver_id``,
                ``ride_id``, ``fare_amount``, ``distance_km``,
                ``duration_minutes``, ``rating``, ``status`` and ``timestamp``.
            window_hours: length of the trailing window used for the
                aggregate statistics.

        Returns:
            One row per driver with at least one ride in the window. It holds
            the windowed aggregates, derived features (acceptance rate,
            efficiency score, hours active), lag columns
            ``rides_{N}d_ago`` / ``fare_{N}d_ago`` for N in 1, 7, 30, and an
            ``event_timestamp`` column.
        """
        # One reference time so the window, the lag cutoffs and
        # event_timestamp all agree with each other.
        # NOTE(review): assumes rides_df['timestamp'] is naive local time like
        # datetime.now(); a tz-aware column will not compare — confirm.
        now = datetime.now()

        # Time-windowed aggregations
        recent_rides = rides_df[
            rides_df['timestamp'] > now - timedelta(hours=window_hours)
        ]
        features = recent_rides.groupby('driver_id').agg({
            'ride_id': 'count',
            'fare_amount': ['sum', 'mean', 'std'],
            'distance_km': ['sum', 'mean'],
            'duration_minutes': ['mean', 'max'],
            'rating': ['mean', 'min'],
            'timestamp': ['min', 'max'],
        }).reset_index()
        # Flatten the (column, aggregation) MultiIndex; order follows the agg spec.
        features.columns = ['driver_id', 'ride_count', 'total_fare', 'avg_fare',
                            'fare_std', 'total_distance', 'avg_distance',
                            'avg_duration', 'max_duration', 'avg_rating',
                            'min_rating', 'first_ride', 'last_ride']

        # Derived features.
        # The acceptance rate is a Series indexed by driver_id, while
        # `features` has a RangeIndex after reset_index(). So map by driver_id
        # rather than relying on index alignment, which would misplace values.
        acceptance = self._compute_acceptance_rate(rides_df)
        features['acceptance_rate'] = features['driver_id'].map(acceptance).fillna(0)
        # Zero total distance gives NaN instead of an infinite score.
        features['efficiency_score'] = (
            features['total_fare'] / features['total_distance'].replace(0, np.nan)
        )
        features['hours_active'] = (
            pd.to_datetime(features['last_ride']) - pd.to_datetime(features['first_ride'])
        ).dt.total_seconds() / 3600

        # Lag features
        for lag in [1, 7, 30]:
            lag_features = self._compute_lag_features(rides_df, lag, now=now)
            features = features.merge(lag_features, on='driver_id', how='left')

        features['event_timestamp'] = now
        return features

    def _compute_acceptance_rate(self, rides_df):
        """Return completed/total rides per driver (Series indexed by driver_id)."""
        accepted = rides_df[rides_df['status'] == 'completed'].groupby('driver_id').size()
        offered = rides_df.groupby('driver_id').size()
        # Drivers with no completed rides are absent from `accepted` -> NaN -> 0.
        return (accepted / offered).fillna(0)

    def _compute_lag_features(self, rides_df, lag_days, now=None):
        """Count rides and sum fares strictly older than ``lag_days`` per driver.

        This is cumulative over everything before the cutoff, not a single-day
        bucket. ``now`` defaults to the current time.
        """
        if now is None:
            now = datetime.now()
        cutoff = now - timedelta(days=lag_days)
        lag_data = rides_df[rides_df['timestamp'] < cutoff]
        return lag_data.groupby('driver_id').agg({
            'ride_id': 'count',
            'fare_amount': 'sum',
        }).reset_index().rename(columns={
            'ride_id': f'rides_{lag_days}d_ago',
            'fare_amount': f'fare_{lag_days}d_ago',
        })

    def write_to_store(self, features, feature_view):
        """Write features to the feature store.

        Args:
            features: frame to push. Its columns must match the schema of the
                view(s) fed by the target, including the entity key and the
                timestamp column.
            feature_view: despite its name, Feast's ``FeatureStore.push``
                expects the name of a *push source* here.
        """
        # PushSource has no INCREMENTAL member; the push mode enum is PushMode.
        # ONLINE_AND_OFFLINE keeps both stores in step, which the
        # online/offline consistency check depends on.
        from feast.data_source import PushMode

        self.store.push(
            feature_view,
            features,
            to=PushMode.ONLINE_AND_OFFLINE,
        )
# Usage
# NOTE(review): `rides_df` is not defined in this snippet; the caller must
# supply a ride-level DataFrame with the columns compute_driver_features reads.
pipeline = FeatureEngineeringPipeline(store)
features = pipeline.compute_driver_features(rides_df)
# NOTE(review): write_to_store forwards this name to FeatureStore.push, which
# expects a push-source name. "driver_statistics" is defined earlier as a
# FeatureView backed by a FileSource, and the computed columns (ride_count,
# total_fare, ...) do not match its declared features — confirm the target.
pipeline.write_to_store(features, "driver_statistics")
Online-Offline Consistency
import pandas as pd
import numpy as np
from datetime import datetime
class FeatureConsistencyChecker:
    """Compare a feature view's offline (historical) values with its online values."""

    def __init__(self, feature_store):
        self.store = feature_store

    def check_consistency(self, feature_view, entity_ids, timestamp, *,
                          tolerance=0.01, entity_key='driver_id'):
        """Compare online and offline feature values.

        The online store serves the latest values, so the comparison is
        meaningful when ``timestamp`` is close to the most recent write.

        Args:
            feature_view: name of the feature view to check.
            entity_ids: entity key values to compare (assumed unique).
            timestamp: point-in-time used for the offline lookup.
            tolerance: numeric differences strictly above this are mismatches.
            entity_key: join-key column name.

        Returns:
            A dict with ``consistent`` (bool), ``mismatches`` and ``timestamp``.
            ``mismatches`` is a list of dicts with ``feature``, ``max_diff``,
            ``mean_diff``, ``affected_entities`` and ``missing_values``.
            ``missing_values`` counts entities that have a value on only one
            side.
        """
        # Feast references are "<view>:<feature>" and there is no "<view>:*"
        # wildcard, so expand the view's declared features explicitly.
        fv = self.store.get_feature_view(feature_view)
        feature_names = [f.name for f in fv.features]
        feature_refs = [f"{feature_view}:{name}" for name in feature_names]

        # Get offline features
        offline_df = self.store.get_historical_features(
            entity_df=pd.DataFrame({
                entity_key: entity_ids,
                'event_timestamp': [timestamp] * len(entity_ids),
            }),
            features=feature_refs,
        ).to_df()

        # Get online features
        online_df = pd.DataFrame(self.store.get_online_features(
            features=feature_refs,
            entity_rows=[{entity_key: eid} for eid in entity_ids],
        ).to_dict())

        # Index both sides once so each column comparison aligns by entity.
        offline_by_entity = offline_df.set_index(entity_key)
        online_by_entity = online_df.set_index(entity_key)

        mismatches = []
        for col in feature_names:
            if col not in offline_by_entity.columns or col not in online_by_entity.columns:
                continue
            mismatch = self._compare_column(
                col, offline_by_entity[col], online_by_entity[col], tolerance
            )
            if mismatch is not None:
                mismatches.append(mismatch)

        return {
            'consistent': len(mismatches) == 0,
            'mismatches': mismatches,
            'timestamp': timestamp,
        }

    @staticmethod
    def _compare_column(col, offline_vals, online_vals, tolerance):
        """Return a mismatch record for one feature column, or None if it agrees."""
        aligned = pd.DataFrame({'offline': offline_vals, 'online': online_vals})
        off, on = aligned['offline'], aligned['online']
        # A value present on only one side is a mismatch. Otherwise NaN would
        # be silently skipped by max() and by the > comparison.
        one_sided = off.isna() != on.isna()

        def _is_number(s):
            return (pd.api.types.is_numeric_dtype(s)
                    and not pd.api.types.is_bool_dtype(s))

        if _is_number(off) and _is_number(on):
            diff = (off - on).abs()
            over = diff > tolerance
            max_diff, mean_diff = diff.max(), diff.mean()
        else:
            # Strings/bools/objects cannot be subtracted: require exact equality.
            over = off.notna() & on.notna() & (off != on)
            max_diff = mean_diff = float('nan')

        bad = over | one_sided
        if not bad.any():
            return None
        return {
            'feature': col,
            'max_diff': max_diff,
            'mean_diff': mean_diff,
            'affected_entities': int(bad.sum()),
            'missing_values': int(one_sided.sum()),
        }
# Feature freshness monitoring
class FeatureFreshnessMonitor:
    """Report how stale each feature view is relative to a per-view age budget."""

    def __init__(self, feature_store):
        self.store = feature_store

    def _last_updated(self, fv_name):
        """Return the last-update datetime recorded for ``fv_name``.

        NOTE(review): ``get_latest_feature_view`` does not appear in Feast's
        documented ``FeatureStore`` API (which exposes ``get_feature_view``).
        Confirm which store object this runs against, and override this method
        if the lookup differs.
        """
        return self.store.get_latest_feature_view(fv_name).last_updated

    def check_freshness(self, feature_views):
        """Check if features are fresh enough.

        Args:
            feature_views: mapping of feature-view name to maximum allowed age
                in minutes.

        Returns:
            One dict per view with ``feature_view``, ``last_updated``,
            ``age_minutes``, ``is_fresh`` and ``staleness_ratio`` (age divided
            by budget). The ratio is ``inf`` when the budget is not positive.
        """
        results = []
        for fv_name, max_age_minutes in feature_views.items():
            last_updated = self._last_updated(fv_name)
            # Take "now" in last_updated's own timezone. Subtracting a tz-aware
            # timestamp (e.g. UTC) from a naive now() raises TypeError.
            now = datetime.now(last_updated.tzinfo)
            age_minutes = (now - last_updated).total_seconds() / 60
            if max_age_minutes > 0:
                staleness_ratio = age_minutes / max_age_minutes
            else:
                # A zero budget would otherwise raise ZeroDivisionError.
                staleness_ratio = float('inf')
            results.append({
                'feature_view': fv_name,
                'last_updated': last_updated,
                'age_minutes': age_minutes,
                'is_fresh': age_minutes <= max_age_minutes,
                'staleness_ratio': staleness_ratio,
            })
        return results
Feature Discovery and Documentation
from dataclasses import dataclass
from typing import List, Dict
@dataclass
class FeatureMetadata:
    """Descriptive metadata for a single catalogued feature."""

    name: str  # key under which FeatureCatalog stores the entry
    description: str  # free text; matched by FeatureCatalog.search(query=...)
    owner: str
    data_type: str
    entity: str  # entity name compared exactly by FeatureCatalog.search(entity=...)
    freshness: str
    tags: List[str]
    statistics: Dict[str, float] = None  # optional summary stats; None when absent


class FeatureCatalog:
    """In-memory registry of FeatureMetadata, searchable by text, tag and entity."""

    def __init__(self):
        self.features: Dict[str, FeatureMetadata] = {}

    def register(self, metadata: FeatureMetadata):
        """Store ``metadata`` under its name, replacing any previous entry."""
        self.features.update({metadata.name: metadata})

    def search(self, query=None, tags=None, entity=None):
        """Return entries (in registration order) passing every supplied filter.

        A falsy filter is ignored. ``query`` is a case-insensitive substring of
        the description. ``tags`` matches when the entry carries at least one
        of them. ``entity`` must equal the entry's entity exactly.
        """
        needle = query.lower() if query else None

        def _matches(meta):
            if needle is not None and needle not in meta.description.lower():
                return False
            if tags and all(tag not in meta.tags for tag in tags):
                return False
            return not entity or meta.entity == entity

        return [meta for meta in self.features.values() if _matches(meta)]

    def get_statistics(self, feature_name):
        """Return mean/std/null_rate/cardinality for a feature, or None.

        None is returned for an unknown feature or one whose statistics are
        missing or empty. Individual keys absent from the stored statistics
        come back as None.
        """
        meta = self.features.get(feature_name)
        stats = meta.statistics if meta else None
        if not stats:
            return None
        return {key: stats.get(key) for key in ('mean', 'std', 'null_rate', 'cardinality')}
# Usage: document one driver feature in the catalog, then look features up by tag.
catalog = FeatureCatalog()
catalog.register(FeatureMetadata(
    name="driver_avg_daily_rides",
    entity="driver_id",
    data_type="float",
    description="Average number of rides per day over the last 30 days",
    owner="ml-platform",
    freshness="1 hour",
    statistics={"mean": 12.5, "std": 4.2, "null_rate": 0.01, "cardinality": 1500},
    tags=["driver", "engagement", "activity"],
))
# Tag search matches any entry carrying at least one of the listed tags.
results = catalog.search(tags=["driver", "engagement"])
Best Practices
- Start with a small set of high-value features, then expand
- Monitor consistency between online and offline stores continuously
- Document everything — feature names, descriptions, owners, and freshness
- Use push sources for real-time features that need sub-second freshness
- Version feature definitions alongside code for reproducibility