🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Services · ℹ️ About · ✉️ Contact · View Pricing Plans from $10

Real-Time Feature Store

⭐ Premium

Advertisement

Real-Time Feature Store

Feature stores bridge the gap between data engineering and ML, ensuring features are computed consistently for training and serving.

Feature Store Architecture

Feature Store Online/Offline Architecture — Data Sources (Events, Logs) | Offline Store (Spark / BigQuery, Batch Computation, Historical Features) | Online Store (Redis / DynamoDB, Sub-ms Latency, Current Features) | Model Training (Historical Features) | Model Serving (Real-time Predictions) | Consistency (Online ↔ Offline Validation)

Feast Implementation

from datetime import timedelta

import pandas as pd
from feast import FeatureStore, Entity, Feature, FeatureView, ValueType
from feast import FileSource, PushSource
from feast.feature_store import FeatureStore

# Define entities
# `driver` is the entity both feature views below are keyed on; they refer to
# it by its name, "driver_id".
driver = Entity(
    name="driver_id",
    value_type=ValueType.INT64,
    description="Unique driver identifier"
)

# Define features
# Batch-sourced driver statistics read from a parquet file. Values are given a
# one-hour TTL and the view is flagged for online serving.
# NOTE(review): `Feature` / `ValueType` / `features=` / `event_timestamp_column`
# look like the older Feast declaration style, while `PushSource` below is a
# newer addition -- confirm the pinned Feast version accepts both together.
driver_stats = FeatureView(
    name="driver_statistics",
    entities=["driver_id"],
    ttl=timedelta(hours=1),
    features=[
        Feature(name="avg_daily_rides", value_type=ValueType.FLOAT),
        Feature(name="total_rides_7d", value_type=ValueType.INT64),
        Feature(name="avg_rating", value_type=ValueType.FLOAT),
        Feature(name="acceptance_rate", value_type=ValueType.FLOAT),
    ],
    online=True,
    source=FileSource(
        path="data/driver_stats.parquet",
        event_timestamp_column="event_timestamp"
    )
)

# Push source for real-time features
# Registered under the name "driver_push_source"; its batch source points at
# the same parquet file as `driver_stats`.
driver_push = PushSource(
    name="driver_push_source",
    batch_source=FileSource(
        path="data/driver_stats.parquet",
        event_timestamp_column="event_timestamp"
    )
)

# Real-time driver features fed through `driver_push`, with a much shorter
# TTL (five minutes) than the batch statistics above.
driver_realtime = FeatureView(
    name="driver_realtime",
    entities=["driver_id"],
    ttl=timedelta(minutes=5),
    features=[
        Feature(name="current_location_lat", value_type=ValueType.FLOAT),
        Feature(name="current_location_lon", value_type=ValueType.FLOAT),
        Feature(name="last_ride_minutes_ago", value_type=ValueType.INT64),
    ],
    online=True,
    source=driver_push
)

# Initialize and apply
# The store is opened on the repository in the current working directory and
# the entity plus both feature views are registered in a single call.
store = FeatureStore(repo_path=".")
store.apply([driver, driver_stats, driver_realtime])

# Online retrieval
# Fetch three features for three drivers from the online store. Each feature
# reference has the form "<feature_view_name>:<feature_name>"; the result is
# converted to a plain dict.
feature_vector = store.get_online_features(
    features=[
        "driver_statistics:avg_daily_rides",
        "driver_statistics:avg_rating",
        "driver_realtime:last_ride_minutes_ago"
    ],
    entity_rows=[
        {"driver_id": 1001},
        {"driver_id": 1002},
        {"driver_id": 1003}
    ]
).to_dict()

# Push real-time features
# `FeatureStore.push` is addressed by the *push source* name (the `name=`
# given to `PushSource`), not by the name of the feature view that consumes
# it, so the target is "driver_push_source" rather than "driver_realtime".
store.push(
    "driver_push_source",
    pd.DataFrame({
        "driver_id": [1001],
        # Timezone-aware UTC timestamp, so the five-minute TTL of the
        # real-time view is measured against an unambiguous instant instead
        # of the machine's local wall clock.
        "event_timestamp": [pd.Timestamp.now(tz="UTC")],
        "current_location_lat": [37.7749],
        "current_location_lon": [-122.4194],
        "last_ride_minutes_ago": [5]
    })
)

Feature Engineering Pipeline

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

class FeatureEngineeringPipeline:
    """Compute per-driver features from raw ride data and push them to a store.

    The store passed to the constructor is only used by ``write_to_store`` and
    only needs a ``push(name, dataframe)`` method.
    """

    def __init__(self, feature_store):
        """Keep a reference to the feature store used by ``write_to_store``."""
        self.store = feature_store

    def compute_driver_features(self, rides_df, window_hours=24):
        """Compute driver features from ride data.

        Args:
            rides_df: DataFrame of rides with the columns ``driver_id``,
                ``ride_id``, ``fare_amount``, ``distance_km``,
                ``duration_minutes``, ``rating``, ``status`` and
                ``timestamp``. ``timestamp`` is compared against a naive
                ``datetime.now()``.
            window_hours: Size, in hours, of the trailing window used for the
                aggregations.

        Returns:
            DataFrame with one row per driver that has a ride inside the
            window. Its columns are the windowed aggregates,
            ``acceptance_rate``, ``efficiency_score``, ``hours_active``,
            ``rides_{n}d_ago`` / ``fare_{n}d_ago`` for ``n`` in 1, 7, 30, and
            ``event_timestamp``.
        """
        # Read the clock once so the window filter, the lag cut-offs and the
        # event timestamp all refer to the same instant.
        now = datetime.now()

        # Time-windowed aggregations
        recent_rides = rides_df[
            rides_df['timestamp'] > now - timedelta(hours=window_hours)
        ]

        # Named aggregation yields flat, explicitly named columns, so there is
        # no positional renaming of a MultiIndex to keep in sync.
        features = recent_rides.groupby('driver_id').agg(
            ride_count=('ride_id', 'count'),
            total_fare=('fare_amount', 'sum'),
            avg_fare=('fare_amount', 'mean'),
            fare_std=('fare_amount', 'std'),
            total_distance=('distance_km', 'sum'),
            avg_distance=('distance_km', 'mean'),
            avg_duration=('duration_minutes', 'mean'),
            max_duration=('duration_minutes', 'max'),
            avg_rating=('rating', 'mean'),
            min_rating=('rating', 'min'),
            first_ride=('timestamp', 'min'),
            last_ride=('timestamp', 'max'),
        ).reset_index()

        # Derived features
        # The acceptance rate is indexed by driver_id while `features` has a
        # positional index after reset_index(), so it has to be looked up per
        # driver; a plain assignment would align on the wrong labels.
        acceptance = self._compute_acceptance_rate(rides_df)
        features['acceptance_rate'] = features['driver_id'].map(acceptance)

        # A driver with zero recorded distance gets NaN rather than +/-inf.
        features['efficiency_score'] = (
            features['total_fare'] / features['total_distance'].replace(0, np.nan)
        )
        features['hours_active'] = (
            pd.to_datetime(features['last_ride']) - pd.to_datetime(features['first_ride'])
        ).dt.total_seconds() / 3600

        # Lag features; the left merge keeps drivers without older rides (NaN).
        for lag in [1, 7, 30]:
            lag_features = self._compute_lag_features(rides_df, lag, now=now)
            features = features.merge(lag_features, on='driver_id', how='left')

        features['event_timestamp'] = now

        return features

    def _compute_acceptance_rate(self, rides_df):
        """Return, per driver, the share of rides whose status is 'completed'.

        The rate is computed over every row of ``rides_df`` (not just the
        aggregation window). Drivers without any completed ride get 0.
        """
        offered = rides_df.groupby('driver_id').size()
        completed = rides_df[rides_df['status'] == 'completed'].groupby('driver_id').size()
        return completed.reindex(offered.index, fill_value=0) / offered

    def _compute_lag_features(self, rides_df, lag_days, now=None):
        """Aggregate ride count and total fare over rides older than ``lag_days`` days.

        Args:
            rides_df: Ride data with ``driver_id``, ``ride_id``,
                ``fare_amount`` and ``timestamp`` columns.
            lag_days: Rides strictly older than this many days are included.
            now: Reference instant; defaults to ``datetime.now()``.

        Returns:
            DataFrame with ``driver_id``, ``rides_{lag_days}d_ago`` and
            ``fare_{lag_days}d_ago`` columns.
        """
        if now is None:
            now = datetime.now()
        cutoff = now - timedelta(days=lag_days)
        lag_data = rides_df[rides_df['timestamp'] < cutoff]

        return lag_data.groupby('driver_id').agg({
            'ride_id': 'count',
            'fare_amount': 'sum'
        }).reset_index().rename(columns={
            'ride_id': f'rides_{lag_days}d_ago',
            'fare_amount': f'fare_{lag_days}d_ago'
        })

    def write_to_store(self, features, feature_view):
        """Write features to the feature store.

        Args:
            features: DataFrame of feature rows to push.
            feature_view: Name handed to ``store.push`` as its first argument.
                With a Feast ``FeatureStore`` this must be the name of a
                registered push source.
        """
        # `PushSource` has no `INCREMENTAL` attribute, so the previous
        # `to=PushSource.INCREMENTAL` failed before anything was pushed. The
        # store's default push mode is used instead.
        self.store.push(feature_view, features)

# Usage
# NOTE(review): `rides_df` is not defined anywhere in this snippet; it is
# presumably a DataFrame of raw rides loaded by the caller -- confirm.
pipeline = FeatureEngineeringPipeline(store)
features = pipeline.compute_driver_features(rides_df)
# NOTE(review): "driver_statistics" is declared earlier as a feature view
# backed by a FileSource, whereas the only push source declared is
# "driver_push_source" -- verify that this is the intended push target.
pipeline.write_to_store(features, "driver_statistics")

Online-Offline Consistency

import pandas as pd
import numpy as np
from datetime import datetime

class FeatureConsistencyChecker:
    """Compare the offline and online values of a feature view's features."""

    def __init__(self, feature_store):
        """Keep a reference to the feature store that is queried on each check."""
        self.store = feature_store

    def check_consistency(self, feature_view, entity_ids, timestamp,
                          tolerance=0.01, entity_key='driver_id'):
        """Compare online and offline feature values.

        Args:
            feature_view: Name of the feature view whose features are compared.
            entity_ids: Entity key values to look up.
            timestamp: Event timestamp used for the offline (point-in-time)
                lookup; it is echoed back in the result.
            tolerance: Largest absolute difference still treated as equal.
            entity_key: Name of the entity join-key column.

        Returns:
            Dict with ``consistent`` (bool), ``mismatches`` (one dict per
            inconsistent feature with ``feature``, ``max_diff``, ``mean_diff``
            and ``affected_entities``) and ``timestamp``. An entity counts as
            affected when the values differ by more than ``tolerance`` or
            when exactly one of the two stores has no value for it.
        """
        entity_ids = list(entity_ids)
        feature_refs = self._feature_refs(feature_view)

        # Get offline features
        offline_df = self.store.get_historical_features(
            entity_df=pd.DataFrame({
                entity_key: entity_ids,
                'event_timestamp': [timestamp] * len(entity_ids)
            }),
            features=feature_refs
        ).to_df()

        # Get online features
        online_df = pd.DataFrame(self.store.get_online_features(
            features=feature_refs,
            entity_rows=[{entity_key: eid} for eid in entity_ids]
        ).to_dict())

        # Index both frames by entity once, instead of once per column.
        offline_by_entity = offline_df.set_index(entity_key)
        online_by_entity = online_df.set_index(entity_key)

        # Compare
        mismatches = []
        for col in offline_by_entity.columns:
            # Columns present only offline (e.g. event_timestamp) are skipped.
            if col not in online_by_entity.columns:
                continue

            aligned = pd.DataFrame({
                'offline': offline_by_entity[col],
                'online': online_by_entity[col]
            })

            # A value missing in exactly one store is an inconsistency even
            # though the numeric difference is undefined (NaN) for that row.
            missing_one_side = aligned['offline'].isna() ^ aligned['online'].isna()

            # Coercing keeps an all-None online column (object dtype) from
            # breaking the subtraction. Values that cannot be read as numbers
            # become NaN and do not contribute to the difference.
            diff = (
                pd.to_numeric(aligned['offline'], errors='coerce')
                - pd.to_numeric(aligned['online'], errors='coerce')
            ).abs()

            affected = (diff > tolerance) | missing_one_side
            if affected.any():
                mismatches.append({
                    'feature': col,
                    'max_diff': float(diff.max()),
                    'mean_diff': float(diff.mean()),
                    'affected_entities': int(affected.sum())
                })

        return {
            'consistent': len(mismatches) == 0,
            'mismatches': mismatches,
            'timestamp': timestamp
        }

    def _feature_refs(self, feature_view):
        """Build explicit "<view>:<feature>" references for every feature of a view.

        Feature references have no wildcard form, so the feature names are
        read from the registered feature view instead of using "<view>:*".
        """
        view = self.store.get_feature_view(feature_view)
        return [f"{feature_view}:{feature.name}" for feature in view.features]

# Feature freshness monitoring
class FeatureFreshnessMonitor:
    """Report how old each feature view's data is relative to an allowed age."""

    def __init__(self, feature_store):
        """Keep a reference to the store that is asked for update timestamps."""
        self.store = feature_store

    def check_freshness(self, feature_views):
        """Check if features are fresh enough.

        Args:
            feature_views: Mapping of feature view name to the maximum
                acceptable age in minutes.

        Returns:
            List with one dict per feature view holding ``feature_view``,
            ``last_updated``, ``age_minutes``, ``is_fresh`` and
            ``staleness_ratio`` (age divided by the allowed age). A view
            without an update timestamp is reported with an infinite age, and
            a zero allowed age yields an infinite staleness ratio.
        """
        results = []

        for fv_name, max_age_minutes in feature_views.items():
            # NOTE(review): `get_latest_feature_view(...).last_updated` is what
            # this monitor expects from its store object; confirm that the
            # store actually passed in exposes this accessor.
            last_updated = self.store.get_latest_feature_view(fv_name).last_updated

            if last_updated is None:
                # Never updated: as stale as it gets.
                age_minutes = float('inf')
            else:
                # Match "now" to the timestamp's timezone-awareness so that
                # aware timestamps do not raise on subtraction; naive
                # timestamps keep being compared against naive local time.
                now = datetime.now(tz=last_updated.tzinfo)
                age_minutes = (now - last_updated).total_seconds() / 60

            # A zero allowed age must not raise ZeroDivisionError.
            if max_age_minutes == 0:
                staleness_ratio = float('inf')
            else:
                staleness_ratio = age_minutes / max_age_minutes

            results.append({
                'feature_view': fv_name,
                'last_updated': last_updated,
                'age_minutes': age_minutes,
                'is_fresh': age_minutes <= max_age_minutes,
                'staleness_ratio': staleness_ratio
            })

        return results

Feature Discovery and Documentation

from dataclasses import dataclass
from typing import List, Dict

@dataclass
class FeatureMetadata:
    """Descriptive record for a single feature, as registered in a catalog."""

    name: str  # feature name, e.g. "driver_avg_daily_rides"
    description: str  # free-text explanation of what the feature measures
    owner: str  # owning team or person, e.g. "ml-platform"
    data_type: str  # value type given as a string, e.g. "float"
    entity: str  # entity the feature is keyed on, e.g. "driver_id"
    freshness: str  # freshness expectation as free text, e.g. "1 hour"
    tags: List[str]  # labels attached to the feature
    # Optional summary statistics keyed by statistic name (the usage example
    # supplies "mean", "std", "null_rate" and "cardinality"). Defaults to None.
    statistics: Dict[str, float] = None

class FeatureCatalog:
    """In-memory registry of feature metadata, keyed by feature name."""

    def __init__(self):
        self.features: Dict[str, FeatureMetadata] = {}

    def register(self, metadata: FeatureMetadata):
        """Store ``metadata`` under its name, replacing any earlier entry."""
        self.features[metadata.name] = metadata

    def search(self, query=None, tags=None, entity=None):
        """Return the registered features that satisfy every given filter.

        ``query`` is a case-insensitive substring match on the description,
        ``tags`` matches when the feature carries at least one of the given
        tags, and ``entity`` must equal the feature's entity. Filters that are
        empty or ``None`` are ignored.
        """
        def is_match(candidate):
            if query and query.lower() not in candidate.description.lower():
                return False
            if tags and not any(tag in candidate.tags for tag in tags):
                return False
            return not (entity and candidate.entity != entity)

        return [candidate for candidate in self.features.values() if is_match(candidate)]

    def get_statistics(self, feature_name):
        """Return the standard statistics of a feature, or ``None``.

        ``None`` is returned when the feature is unknown or has no statistics.
        Otherwise the result always has the keys ``mean``, ``std``,
        ``null_rate`` and ``cardinality``; a statistic that was not recorded
        maps to ``None``.
        """
        entry = self.features.get(feature_name)
        if not entry or not entry.statistics:
            return None

        recorded = entry.statistics
        return {
            key: recorded.get(key)
            for key in ('mean', 'std', 'null_rate', 'cardinality')
        }

# Usage
# Create an empty catalog and register one fully described feature in it.
catalog = FeatureCatalog()
catalog.register(FeatureMetadata(
    name="driver_avg_daily_rides",
    description="Average number of rides per day over the last 30 days",
    owner="ml-platform",
    data_type="float",
    entity="driver_id",
    freshness="1 hour",
    tags=["driver", "engagement", "activity"],
    statistics={"mean": 12.5, "std": 4.2, "null_rate": 0.01, "cardinality": 1500}
))

# Search for features
# Tag filtering matches a feature that carries at least one of the given tags,
# so this returns features tagged "driver" or "engagement".
results = catalog.search(tags=["driver", "engagement"])

Best Practices

  1. Start with a small set of high-value features, then expand
  2. Monitor consistency between online and offline stores continuously
  3. Document everything – feature names, descriptions, owners, and freshness
  4. Use push sources for real-time features that need sub-second freshness
  5. Version feature definitions alongside code for reproducibility

Advertisement