🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Services · ℹ️ About · ✉️ Contact · View Pricing Plans from $10

Feature Engineering: Feature Creation, Selection, Importance

Data Science Interview PremiumFeature Engineering⭐ Premium

Advertisement

AMAZON & APPLE INTERVIEW QUESTION

Feature Engineering: Feature Creation, Selection, Importance

Feature Development & Optimization

The Interview Question

ℹ️

Question: You're building a customer churn prediction model for an e-commerce platform. The raw data contains:

  • user_id, signup_date, last_login, total_purchases, total_spend, avg_order_value, category_preferences, device_type, location, support_tickets

Walk through your feature engineering process:

  1. What features would you create from this data?
  2. How would you handle categorical variables?
  3. How do you select the most important features?
  4. How do you ensure your features don't introduce data leakage?

Detailed Answer

1. Feature Creation Framework

Feature engineering transforms raw data into meaningful inputs for machine learning models. Good features capture domain knowledge and patterns that models can leverage.

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif
from sklearn.ensemble import RandomForestClassifier
import warnings
warnings.filterwarnings('ignore')

class FeatureEngineer:
    """Chainable feature-engineering helpers operating on a private copy of a frame.

    Every ``create_*`` method adds columns to ``self.df`` and returns ``self``
    so calls can be chained; read the result from the ``df`` attribute.

    Args:
        df: Source DataFrame. It is copied; the caller's frame is not modified.
        reference_date: Optional "as of" date used for every "days since"
            computation. When ``None`` (default) the wall-clock time at the
            moment of each call is used, as before. Passing the snapshot /
            label cut-off date makes features reproducible and keeps them from
            depending on when the pipeline happens to run.
    """

    def __init__(self, df, reference_date=None):
        self.df = df.copy()
        self.feature_descriptions = {}
        self.reference_date = reference_date

    def _as_of(self):
        """Return the timestamp that "now" resolves to for elapsed-time features."""
        if self.reference_date is None:
            return pd.Timestamp.now()
        return pd.Timestamp(self.reference_date)

    def create_temporal_features(self, date_column, prefix='temporal'):
        """Derive calendar, cyclical and elapsed-time columns from a datetime column.

        Args:
            date_column: Name of a datetime64 column in ``self.df``.
            prefix: Prefix for the generated column names.

        Returns:
            ``self`` for chaining.
        """
        df = self.df
        dates = df[date_column]

        # Basic temporal features
        df[f'{prefix}_year'] = dates.dt.year
        df[f'{prefix}_month'] = dates.dt.month
        df[f'{prefix}_day'] = dates.dt.day
        df[f'{prefix}_dayofweek'] = dates.dt.dayofweek
        df[f'{prefix}_is_weekend'] = dates.dt.dayofweek.isin([5, 6]).astype(int)
        df[f'{prefix}_quarter'] = dates.dt.quarter

        # Cyclical encoding so that December/January and day 31/day 1 end up close
        df[f'{prefix}_month_sin'] = np.sin(2 * np.pi * df[f'{prefix}_month'] / 12)
        df[f'{prefix}_month_cos'] = np.cos(2 * np.pi * df[f'{prefix}_month'] / 12)
        df[f'{prefix}_day_sin'] = np.sin(2 * np.pi * df[f'{prefix}_day'] / 31)
        df[f'{prefix}_day_cos'] = np.cos(2 * np.pi * df[f'{prefix}_day'] / 31)

        # Elapsed time relative to the reference date
        elapsed_days = (self._as_of() - dates).dt.days
        df[f'{prefix}_days_since'] = elapsed_days
        # Truncate toward zero like the former ``astype(int)``, but only cast
        # when no dates are missing: casting NaN to int raises.
        elapsed_months = np.trunc(elapsed_days / 30)
        if not elapsed_months.isna().any():
            elapsed_months = elapsed_months.astype(int)
        df[f'{prefix}_months_since'] = elapsed_months

        self.df = df
        return self

    def create_recency_frequency_monetary_features(self):
        """Add recency / frequency / monetary (RFM) columns.

        Each group of features is only produced when its input columns exist.
        Tenure (``days_since_signup``) is taken from the frame when present,
        otherwise derived from ``signup_date``; tenure-based features are
        skipped when neither is available instead of raising ``KeyError``.

        Note that ``avg_order_value`` is overwritten with a smoothed
        ``total_spend / (total_purchases + 1)``, and ``spend_consistency`` is
        currently the same quantity.

        Returns:
            ``self`` for chaining.
        """
        df = self.df
        as_of = self._as_of()

        # Recency: days since last activity
        if 'last_login' in df.columns:
            df['recency_days'] = (as_of - df['last_login']).dt.days
            df['recency_weeks'] = df['recency_days'] / 7
            df['recency_months'] = df['recency_days'] / 30

        # Tenure is needed by the frequency and spend-per-day features below.
        if 'days_since_signup' not in df.columns and 'signup_date' in df.columns:
            df['days_since_signup'] = (as_of - df['signup_date']).dt.days
        has_tenure = 'days_since_signup' in df.columns

        # Frequency: count-based features
        if 'total_purchases' in df.columns and has_tenure:
            # +1 keeps same-day signups finite, consistent with spend_per_day.
            df['purchase_frequency'] = df['total_purchases'] / (df['days_since_signup'] + 1)
            df['avg_days_between_purchases'] = df['days_since_signup'] / (df['total_purchases'] + 1)

        # Monetary: value-based features
        if 'total_spend' in df.columns and 'total_purchases' in df.columns:
            df['avg_order_value'] = df['total_spend'] / (df['total_purchases'] + 1)
            if has_tenure:
                df['spend_per_day'] = df['total_spend'] / (df['days_since_signup'] + 1)
            df['spend_consistency'] = df['total_spend'] / (df['total_purchases'] + 1)

        self.df = df
        return self

    def create_ratio_features(self):
        """Add ratio and interaction columns for whichever inputs are present.

        All denominators are offset by 1 to avoid division by zero.

        Returns:
            ``self`` for chaining.
        """
        df = self.df

        # Ratios between related features
        if 'total_spend' in df.columns and 'total_purchases' in df.columns:
            df['spend_per_purchase'] = df['total_spend'] / (df['total_purchases'] + 1)

        if 'support_tickets' in df.columns and 'total_purchases' in df.columns:
            df['tickets_per_purchase'] = df['support_tickets'] / (df['total_purchases'] + 1)

        if 'total_spend' in df.columns and 'days_since_signup' in df.columns:
            df['monthly_spend'] = df['total_spend'] / (df['days_since_signup'] / 30 + 1)

        # Interaction features
        if 'total_purchases' in df.columns and 'avg_order_value' in df.columns:
            df['purchase_value_interaction'] = df['total_purchases'] * df['avg_order_value']

        if 'recency_days' in df.columns and 'total_purchases' in df.columns:
            df['engagement_score'] = df['total_purchases'] / (df['recency_days'] + 1)

        self.df = df
        return self

    def create_rolling_features(self, window_columns, windows=(7, 30, 90)):
        """Add rolling mean/std/max/min columns over the frame's current row order.

        Windows are counted in rows, not days, and run across the whole frame
        in its existing order (no grouping, no sorting is applied here).

        Args:
            window_columns: Columns to compute rolling statistics for; names
                missing from the frame are ignored.
            windows: Window sizes in rows. An immutable tuple default is used
                so the default can never be mutated between calls.

        Returns:
            ``self`` for chaining.
        """
        df = self.df

        for col in window_columns:
            if col not in df.columns:
                continue
            for window in windows:
                roller = df[col].rolling(window=window, min_periods=1)
                df[f'{col}_rolling_{window}'] = roller.mean()
                df[f'{col}_rolling_{window}_std'] = roller.std()
                df[f'{col}_rolling_{window}_max'] = roller.max()
                df[f'{col}_rolling_{window}_min'] = roller.min()

        self.df = df
        return self

    def create_aggregation_features(self, group_column, value_columns):
        """Add per-group statistics and row-vs-group comparison columns.

        Uses ``groupby(...).transform`` rather than a merge so the frame keeps
        its original index and row order.

        Args:
            group_column: Column to group by.
            value_columns: Numeric columns to aggregate; names missing from the
                frame are ignored.

        Returns:
            ``self`` for chaining.
        """
        df = self.df

        if group_column in df.columns:
            for value_col in value_columns:
                if value_col not in df.columns:
                    continue

                # Group-level aggregations broadcast back onto every row
                for stat in ('mean', 'std', 'min', 'max', 'median'):
                    df[f'{value_col}_{group_column}_{stat}'] = (
                        df.groupby(group_column)[value_col].transform(stat)
                    )

                # Relative features (how does this row compare to its group)
                group_mean = df[f'{value_col}_{group_column}_mean']
                group_std = df[f'{value_col}_{group_column}_std']
                df[f'{value_col}_vs_{group_column}_mean'] = df[value_col] / (group_mean + 1)
                df[f'{value_col}_vs_{group_column}_std'] = (df[value_col] - group_mean) / (group_std + 1)

        self.df = df
        return self

    def create_text_features(self, text_column, prefix='text'):
        """Add simple length / count features for a text column.

        Args:
            text_column: Name of a string column; nothing happens if it is absent.
            prefix: Prefix for the generated column names.

        Returns:
            ``self`` for chaining.
        """
        df = self.df

        if text_column in df.columns:
            text = df[text_column]

            # Basic text features
            df[f'{prefix}_length'] = text.str.len()
            df[f'{prefix}_word_count'] = text.str.split().str.len()
            df[f'{prefix}_avg_word_length'] = df[f'{prefix}_length'] / (df[f'{prefix}_word_count'] + 1)

            # Special character counts
            df[f'{prefix}_exclamation_count'] = text.str.count('!')
            df[f'{prefix}_question_count'] = text.str.count(r'\?')
            df[f'{prefix}_uppercase_ratio'] = text.apply(
                lambda x: sum(1 for c in str(x) if c.isupper()) / (len(str(x)) + 1)
            )

        self.df = df
        return self

2. Categorical Variable Encoding

class CategoricalEncoder:
    """Collection of categorical encoding strategies.

    Every method returns an encoded copy of the input frame and ignores
    requested columns that are not present. Fitted encoders and learned
    mappings are kept in ``self.encoders`` / ``self.encoding_maps`` so they
    can be inspected or reused on new data.
    """

    def __init__(self):
        self.encoders = {}
        self.encoding_maps = {}

    def one_hot_encoding(self, df, columns, drop_first=True):
        """One-hot encode low-cardinality columns, replacing each source column.

        Args:
            df: Input frame (not modified).
            columns: Columns to encode.
            drop_first: Drop the first dummy level of each column.

        Returns:
            Encoded copy of ``df``.
        """
        df_encoded = df.copy()
        for col in columns:
            if col in df_encoded.columns:
                dummies = pd.get_dummies(df_encoded[col], prefix=col, drop_first=drop_first)
                df_encoded = pd.concat([df_encoded, dummies], axis=1)
                df_encoded.drop(col, axis=1, inplace=True)
        return df_encoded

    def label_encoding(self, df, columns):
        """Add an integer-coded ``<col>_encoded`` column per requested column.

        Values are cast to ``str`` first. ``LabelEncoder`` numbers classes in
        sorted order, so the integers only reflect a real ordinal ranking when
        the sorted order of the labels happens to match it.

        Returns:
            Encoded copy of ``df``; fitted encoders are stored in ``self.encoders``.
        """
        df_encoded = df.copy()
        for col in columns:
            if col in df_encoded.columns:
                le = LabelEncoder()
                df_encoded[f'{col}_encoded'] = le.fit_transform(df_encoded[col].astype(str))
                self.encoders[col] = le
        return df_encoded

    def frequency_encoding(self, df, columns):
        """Add ``<col>_frequency`` holding each value's relative frequency in ``df``.

        Returns:
            Encoded copy of ``df``; the value->frequency maps are stored in
            ``self.encoding_maps``.
        """
        df_encoded = df.copy()
        for col in columns:
            if col in df_encoded.columns:
                freq_map = df_encoded[col].value_counts(normalize=True).to_dict()
                df_encoded[f'{col}_frequency'] = df_encoded[col].map(freq_map)
                self.encoding_maps[f'{col}_frequency'] = freq_map
        return df_encoded

    def target_encoding(self, df, columns, target, smoothing=10):
        """Add smoothed target-mean encodings as ``<col>_target_enc``.

        Each category mean is shrunk toward the global mean:
        ``(count * mean + smoothing * global_mean) / (count + smoothing)``.

        The statistics are computed from ``df`` itself, so every row's own
        target value contributes to its encoding. Fit on training data only
        and apply the stored map from ``self.encoding_maps`` to validation /
        test data to avoid target leakage.

        Returns:
            Encoded copy of ``df``.
        """
        df_encoded = df.copy()
        for col in columns:
            if col in df_encoded.columns:
                global_mean = df_encoded[target].mean()
                agg = df_encoded.groupby(col)[target].agg(['mean', 'count'])

                # Smoothing formula
                smooth = (agg['count'] * agg['mean'] + smoothing * global_mean) / (agg['count'] + smoothing)

                df_encoded[f'{col}_target_enc'] = df_encoded[col].map(smooth)
                self.encoding_maps[f'{col}_target_enc'] = smooth.to_dict()
        return df_encoded

    def binary_encoding(self, df, columns):
        """Replace each column with the binary digits of its category code.

        Bit ``i`` of the code goes to ``<col>_bit_<i>`` (least significant
        first). Missing values get their own code, one past the last real
        category, so they cannot share a bit pattern with a real category. At
        least one bit column is always produced, even for a single-category
        or empty column, so the feature never silently disappears.

        Returns:
            Encoded copy of ``df``.
        """
        df_encoded = df.copy()
        for col in columns:
            if col in df_encoded.columns:
                # Work on a plain integer array; pandas marks missing values with -1.
                codes = df_encoded[col].astype('category').cat.codes.to_numpy(dtype=np.int64)
                if codes.size and (codes < 0).any():
                    codes = np.where(codes < 0, codes.max() + 1, codes)

                # Bits needed to represent the largest code (minimum one).
                max_code = int(codes.max()) if codes.size else 0
                n_bits = max(1, max_code.bit_length())
                for i in range(n_bits):
                    df_encoded[f'{col}_bit_{i}'] = (codes >> i) & 1

                df_encoded.drop(col, axis=1, inplace=True)
        return df_encoded

    def hash_encoding(self, df, columns, n_features=32):
        """Replace each column with ``n_features`` hashed columns.

        Values are cast to ``str`` and hashed with scikit-learn's
        ``FeatureHasher``; the output width is fixed regardless of how many
        distinct values exist, at the cost of possible collisions.

        Returns:
            Encoded copy of ``df``.
        """
        from sklearn.feature_extraction import FeatureHasher

        df_encoded = df.copy()
        for col in columns:
            if col in df_encoded.columns:
                hasher = FeatureHasher(n_features=n_features, input_type='string')
                # One single-token "document" per row.
                hashed = hasher.transform(df_encoded[col].astype(str).values.reshape(-1, 1))

                # Create column names
                hash_cols = [f'{col}_hash_{i}' for i in range(n_features)]
                hash_df = pd.DataFrame(hashed.toarray(), columns=hash_cols, index=df_encoded.index)

                df_encoded = pd.concat([df_encoded, hash_df], axis=1)
                df_encoded.drop(col, axis=1, inplace=True)
        return df_encoded

Encoding Strategy Guide:

| Cardinality | Encoding Method | Pros | Cons |
|---|---|---|---|
| Low (≤10) | One-Hot | Simple, interpretable | Increases dimensionality |
| Medium (10-100) | Binary | Compact representation | Loss of interpretability |
| High (>100) | Target/Mean | Preserves target info | Risk of overfitting |
| Very High (>1000) | Hash | Fixed dimensions | Collisions possible |

3. Feature Selection Methods

class FeatureSelector:
    """Feature selection via filter, wrapper, embedded and correlation methods.

    Args:
        X: Feature DataFrame (``.columns`` is used to label results).
        y: Target vector aligned with ``X``.
    """

    def __init__(self, X, y):
        self.X = X
        self.y = y
        self.feature_scores = {}

    def filter_methods(self, k=10):
        """Score every feature with univariate statistical tests.

        Args:
            k: Number of features for ``SelectKBest`` to keep. Clamped to the
                number of available columns, since asking for more raises.

        Returns:
            Dict with ``'f_test'`` (score and p-value) and ``'mutual_info'``
            (score) DataFrames, each sorted by descending score.
        """
        results = {}
        k = min(k, self.X.shape[1])

        # ANOVA F-test
        selector_f = SelectKBest(f_classif, k=k)
        selector_f.fit(self.X, self.y)
        results['f_test'] = pd.DataFrame({
            'feature': self.X.columns,
            'score': selector_f.scores_,
            'p_value': selector_f.pvalues_
        }).sort_values('score', ascending=False)

        # Mutual Information
        selector_mi = SelectKBest(mutual_info_classif, k=k)
        selector_mi.fit(self.X, self.y)
        results['mutual_info'] = pd.DataFrame({
            'feature': self.X.columns,
            'score': selector_mi.scores_
        }).sort_values('score', ascending=False)

        return results

    def wrapper_methods(self, method='recursive', n_features_to_select=10):
        """Select features by repeatedly refitting a random forest.

        Args:
            method: ``'recursive'`` for plain RFE, ``'recursive_cv'`` for
                RFE with 5-fold cross-validation.
            n_features_to_select: Number of features RFE keeps (only used by
                ``'recursive'``).

        Returns:
            For ``'recursive'``: DataFrame of feature / selected / ranking,
            sorted by ranking. For ``'recursive_cv'``: the fitted ``RFECV``.

        Raises:
            ValueError: If ``method`` is not one of the supported names
                (previously this silently returned ``None``).
        """
        from sklearn.feature_selection import RFE, RFECV

        if method == 'recursive':
            # Recursive Feature Elimination
            estimator = RandomForestClassifier(n_estimators=100, random_state=42)
            selector = RFE(estimator, n_features_to_select=n_features_to_select, step=1)
            selector.fit(self.X, self.y)

            return pd.DataFrame({
                'feature': self.X.columns,
                'selected': selector.support_,
                'ranking': selector.ranking_
            }).sort_values('ranking')

        if method == 'recursive_cv':
            # RFECV with cross-validation
            estimator = RandomForestClassifier(n_estimators=100, random_state=42)
            selector = RFECV(estimator, step=1, cv=5, scoring='accuracy')
            selector.fit(self.X, self.y)

            print(f"Optimal number of features: {selector.n_features_}")
            return selector

        raise ValueError(
            f"Unknown wrapper method {method!r}; expected 'recursive' or 'recursive_cv'"
        )

    def embedded_methods(self):
        """Rank features by importances learned while fitting models.

        Returns:
            Dict with ``'random_forest'`` and ``'decision_tree'`` (impurity
            importances) and ``'lasso'`` (absolute coefficients of an
            L1-penalised logistic regression), each sorted descending.
            The L1 coefficients are only comparable across features when the
            inputs share a common scale.
        """
        results = {}

        # Random Forest feature importance
        rf = RandomForestClassifier(n_estimators=100, random_state=42)
        rf.fit(self.X, self.y)
        results['random_forest'] = pd.DataFrame({
            'feature': self.X.columns,
            'importance': rf.feature_importances_
        }).sort_values('importance', ascending=False)

        # L1 regularization (Lasso)
        from sklearn.linear_model import LogisticRegression

        lr = LogisticRegression(penalty='l1', solver='liblinear', random_state=42)
        lr.fit(self.X, self.y)
        results['lasso'] = pd.DataFrame({
            'feature': self.X.columns,
            'coefficient': np.abs(lr.coef_[0])
        }).sort_values('coefficient', ascending=False)

        # Tree-based importance
        from sklearn.tree import DecisionTreeClassifier
        dt = DecisionTreeClassifier(random_state=42)
        dt.fit(self.X, self.y)
        results['decision_tree'] = pd.DataFrame({
            'feature': self.X.columns,
            'importance': dt.feature_importances_
        }).sort_values('importance', ascending=False)

        return results

    def correlation_analysis(self, threshold=0.8):
        """List features that are highly correlated with an earlier column.

        Only numeric/boolean columns are considered; other dtypes cannot be
        correlated and are skipped rather than raising. For each correlated
        pair the later column (in column order) is the one reported.

        Args:
            threshold: Absolute Pearson correlation above which a feature is
                considered redundant.

        Returns:
            List of column names suggested for removal.
        """
        numeric_X = self.X.select_dtypes(include=[np.number, 'bool'])
        corr_matrix = numeric_X.corr().abs()

        # Keep only the strict upper triangle so each pair is inspected once
        upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))

        # Find features with correlation > threshold
        to_drop = [column for column in upper.columns if (upper[column] > threshold).any()]

        print(f"Features to drop due to high correlation (>{threshold}):")
        for col in to_drop:
            correlated_with = upper[col][upper[col] > threshold].index.tolist()
            print(f"  {col}: correlated with {correlated_with}")

        return to_drop

    def comprehensive_selection(self, n_features=15):
        """Combine filter, embedded and correlation results into one shortlist.

        Features flagged by ``correlation_analysis`` (threshold 0.85) are
        removed. The rest are ordered by the sum of their positions in the
        top-``n_features`` F-test and random-forest lists. A feature missing
        from a list is charged ``n_features`` for it, which is worse than any
        position inside the list, so unranked features can no longer sort
        ahead of ranked ones. Ties are broken by feature name so the result
        is deterministic.

        Args:
            n_features: Maximum number of features to return.

        Returns:
            List of selected feature names, best first.
        """
        print("Running comprehensive feature selection...")
        print("=" * 50)

        # 1. Filter methods
        print("\n1. Filter Methods (ANOVA F-test):")
        filter_results = self.filter_methods(k=n_features)
        top_filter = filter_results['f_test'].head(n_features)['feature'].tolist()
        print(f"   Top {n_features} features: {top_filter[:5]}...")

        # 2. Embedded methods
        print("\n2. Embedded Methods (Random Forest):")
        embedded_results = self.embedded_methods()
        top_embedded = embedded_results['random_forest'].head(n_features)['feature'].tolist()
        print(f"   Top {n_features} features: {top_embedded[:5]}...")

        # 3. Correlation analysis
        print("\n3. Correlation Analysis:")
        to_drop = self.correlation_analysis(threshold=0.85)

        # Combine results
        selected_features = set(self.X.columns) - set(to_drop)

        # Rank by consensus (lower is better)
        filter_position = {feature: pos for pos, feature in enumerate(top_filter)}
        embedded_position = {feature: pos for pos, feature in enumerate(top_embedded)}
        feature_rank = {
            feature: (
                filter_position.get(feature, n_features)
                + embedded_position.get(feature, n_features)
            )
            for feature in selected_features
        }

        final_features = sorted(
            feature_rank, key=lambda name: (feature_rank[name], str(name))
        )[:n_features]

        print(f"\nFinal selected features ({len(final_features)}):")
        print(final_features)

        return final_features

4. Data Leakage Prevention

class LeakagePrevention:
    """Helpers for spotting and avoiding data leakage during feature engineering.

    Args:
        df: DataFrame holding features and the target. It is stored by
            reference, not copied.
        target_column: Name of the target column in ``df``.
    """

    def __init__(self, df, target_column):
        self.df = df
        self.target = target_column

    def temporal_split_validation(self, date_column, test_days=30):
        """Split chronologically so the test set is strictly later than training.

        Args:
            date_column: Datetime column to order and split on.
            test_days: Length of the test window, counted back from the
                latest date in the data.

        Returns:
            ``(train, test)`` where ``train`` holds rows up to and including
            the cut-off date and ``test`` holds rows after it.
        """
        # Sort by date
        df_sorted = self.df.sort_values(date_column)

        # Split by time
        cutoff_date = df_sorted[date_column].max() - timedelta(days=test_days)

        train = df_sorted[df_sorted[date_column] <= cutoff_date]
        test = df_sorted[df_sorted[date_column] > cutoff_date]

        print(f"Train set: {len(train)} rows (up to {cutoff_date.date()})")
        print(f"Test set: {len(test)} rows (after {cutoff_date.date()})")

        return train, test

    def check_target_leakage(self, features, threshold=0.9):
        """Flag features whose correlation with the target is suspiciously high.

        Non-numeric columns are skipped (a Pearson correlation cannot be
        computed for them), as is the target itself if it appears in
        ``features``. Names missing from the frame are ignored.

        Args:
            features: Candidate feature names.
            threshold: Absolute correlation above which a feature is reported.

        Returns:
            List of dicts with ``feature``, ``correlation`` and ``severity``
            (``'high'`` above 0.95, otherwise ``'medium'``).
        """
        suspicious_features = []
        target_present = self.target in self.df.columns

        for feature in features:
            if not target_present or feature not in self.df.columns:
                continue
            if feature == self.target:
                # The target correlates perfectly with itself; not a leak.
                continue
            if not pd.api.types.is_numeric_dtype(self.df[feature]):
                continue

            correlation = self.df[feature].corr(self.df[self.target])

            if abs(correlation) > threshold:
                suspicious_features.append({
                    'feature': feature,
                    'correlation': correlation,
                    'severity': 'high' if abs(correlation) > 0.95 else 'medium'
                })

        if suspicious_features:
            print("Warning: Potential target leakage detected!")
            for sf in suspicious_features:
                print(f"  {sf['feature']}: correlation = {sf['correlation']:.3f} ({sf['severity']})")
        else:
            print("No target leakage detected")

        return suspicious_features

    def time_aware_features(self, date_column, feature_definition):
        """Add per-group rolling means that only look at earlier rows.

        Args:
            date_column: Column used to order rows within each group.
            feature_definition: Dict with keys ``'group_column'``,
                ``'value_column'`` and ``'windows'`` (iterable of window
                sizes in rows).

        Returns:
            A sorted copy of the frame with one
            ``<value_column>_past_<window>`` column per window.
        """
        df = self.df.copy()
        group_column = feature_definition['group_column']
        value_column = feature_definition['value_column']

        # Sort by group and date so "previous rows" means "earlier in time"
        df = df.sort_values([group_column, date_column])

        for window in feature_definition['windows']:
            # shift(1) excludes the current row, so each value is built from
            # strictly earlier observations of the same group.
            df[f'{value_column}_past_{window}'] = (
                df.groupby(group_column)[value_column]
                .transform(lambda x: x.shift(1).rolling(window=window, min_periods=1).mean())
            )

        return df

    def cross_validation_strategy(self, n_splits=5, groups=None):
        """Build cross-validation index splits.

        With ``groups`` a ``GroupKFold`` is used; otherwise a
        ``TimeSeriesSplit`` over the rows in their current order, so the frame
        should already be sorted chronologically for the latter to be
        meaningful.

        Args:
            n_splits: Number of folds.
            groups: Optional group labels, one per row.

        Returns:
            List of ``(train_indices, test_indices)`` tuples.
        """
        from sklearn.model_selection import TimeSeriesSplit

        if groups is not None:
            # Group-based CV
            from sklearn.model_selection import GroupKFold
            cv = GroupKFold(n_splits=n_splits)
            splits = list(cv.split(self.df, self.df[self.target], groups))
        else:
            # Time series CV
            cv = TimeSeriesSplit(n_splits=n_splits)
            splits = list(cv.split(self.df))

        print(f"Cross-validation strategy: {type(cv).__name__}")
        print(f"Number of splits: {n_splits}")

        return splits

⚠️

Critical Warning: Data leakage is the #1 cause of overly optimistic model performance. Always ensure:

  • Features are computed using only past data
  • No target information leaks into features
  • Test set is completely unseen during feature engineering

5. Real-World Application: Churn Prediction Features

def create_churn_features(df, reference_date=None):
    """Build the churn-model feature frame from raw customer data.

    Args:
        df: Raw data with ``user_id``, ``signup_date``, ``last_login``
            (datetime columns), ``total_purchases``, ``total_spend`` and
            ``support_tickets``. ``device_type`` and ``location`` are
            one-hot encoded when present.
        reference_date: Optional "as of" date for the recency/tenure
            features. Defaults to the current time; pass the snapshot date
            to get reproducible features that do not depend on when the
            code runs.

    Returns:
        DataFrame indexed like ``df`` with one row per input row.
    """
    as_of = pd.Timestamp.now() if reference_date is None else pd.Timestamp(reference_date)

    features = pd.DataFrame(index=df.index)
    features['user_id'] = df['user_id']

    # 1. Recency features
    features['days_since_last_login'] = (as_of - df['last_login']).dt.days
    features['days_since_signup'] = (as_of - df['signup_date']).dt.days

    # 2. Activity features (+1 in denominators avoids division by zero)
    features['total_purchases'] = df['total_purchases']
    features['total_spend'] = df['total_spend']
    features['avg_order_value'] = df['total_spend'] / (df['total_purchases'] + 1)

    # 3. Engagement features
    tenure_months = features['days_since_signup'] / 30 + 1
    features['purchases_per_month'] = df['total_purchases'] / tenure_months
    features['spend_per_month'] = df['total_spend'] / tenure_months
    features['engagement_score'] = df['total_purchases'] / (features['days_since_last_login'] + 1)

    # 4. Support features
    features['support_tickets'] = df['support_tickets']
    features['tickets_per_purchase'] = df['support_tickets'] / (df['total_purchases'] + 1)

    # 5. Trend features (purchase/spend trends) would require per-period
    # history, which the raw snapshot described here does not contain.

    # 6. Categorical features (encoded). The raw columns must be copied into
    # the feature frame first; get_dummies raises KeyError for columns that
    # are not in the frame it is given.
    categorical_cols = [col for col in ('device_type', 'location') if col in df.columns]
    for col in categorical_cols:
        features[col] = df[col]
    if categorical_cols:
        features = pd.get_dummies(features, columns=categorical_cols, drop_first=True)

    return features

6. Common Follow-Up Questions

Follow-up 1: How do you handle feature engineering for different model types?

# Feature engineering varies by model type
def feature_engineering_by_model(df, model_type):
    """Adapt a feature frame to the needs of a model family.

    Works on a copy, so the caller's frame is never modified.

    Args:
        df: Feature DataFrame.
        model_type: ``'linear'`` standardises every numeric column and adds
            all pairwise products of the standardised columns. ``'tree'``,
            ``'neural_network'`` and any other value currently return the
            data unchanged.

    Returns:
        The adapted copy of ``df``.
    """
    from itertools import combinations

    df = df.copy()

    if model_type == 'linear':
        # Linear models need:
        # - No multicollinearity
        # - Normalized/standardized features
        # - Interaction terms explicit
        from sklearn.preprocessing import StandardScaler

        # Snapshot the numeric columns before interaction columns are added.
        numeric_cols = list(df.select_dtypes(include=[np.number]).columns)
        if numeric_cols:
            scaler = StandardScaler()
            df[numeric_cols] = scaler.fit_transform(df[numeric_cols])

        # Add interaction terms; the number of new columns grows
        # quadratically with the number of numeric columns.
        for left, right in combinations(numeric_cols, 2):
            df[f'{left}_x_{right}'] = df[left] * df[right]

    elif model_type == 'tree':
        # Tree models need:
        # - Raw values (no scaling needed)
        # - Can handle non-linear relationships
        # - Feature engineering focuses on domain knowledge
        pass

    elif model_type == 'neural_network':
        # Neural networks need:
        # - Scaled features
        # - Embedding layers for categoricals
        # - Batch normalization helps
        pass

    return df

Follow-up 2: How do you monitor feature drift?

# Feature drift detection
def detect_feature_drift(reference_data, current_data, feature_columns, threshold=0.1, alpha=0.05):
    """Compare feature distributions between a reference and a current sample.

    Each feature present in both frames is tested with a two-sample
    Kolmogorov-Smirnov test and a Population Stability Index (PSI). A feature
    counts as drifted when the KS p-value is below ``alpha`` or the PSI
    exceeds ``threshold``.

    Args:
        reference_data: Baseline DataFrame (e.g. training data).
        current_data: Recent DataFrame to compare against the baseline.
        feature_columns: Feature names to check; names missing from either
            frame are skipped.
        threshold: PSI value above which a feature is flagged.
        alpha: Significance level for the KS test.

    Returns:
        DataFrame with columns ``feature``, ``ks_statistic``, ``ks_p_value``,
        ``psi`` and ``drifted``; empty (but with these columns) when no
        feature could be compared.
    """
    from scipy.stats import ks_2samp

    result_columns = ['feature', 'ks_statistic', 'ks_p_value', 'psi', 'drifted']
    drift_results = []

    for feature in feature_columns:
        if feature in reference_data.columns and feature in current_data.columns:
            # KS test for distribution shift
            stat, p_value = ks_2samp(reference_data[feature].dropna(),
                                     current_data[feature].dropna())

            # Population Stability Index (PSI)
            psi = calculate_psi(reference_data[feature], current_data[feature])

            drift_results.append({
                'feature': feature,
                'ks_statistic': stat,
                'ks_p_value': p_value,
                'psi': psi,
                'drifted': bool(p_value < alpha or psi > threshold)
            })

    # Explicit columns keep the schema stable even when nothing was compared;
    # an empty frame built from [] would have no 'drifted' column to filter on.
    drift_df = pd.DataFrame(drift_results, columns=result_columns)
    drifted_features = [row['feature'] for row in drift_results if row['drifted']]

    if drifted_features:
        print(f"Warning: {len(drifted_features)} features have drifted:")
        print(drifted_features)
    else:
        print("No significant feature drift detected")

    return drift_df

def calculate_psi(reference, current, bins=10):
    """Return the Population Stability Index of ``current`` against ``reference``.

    Bin edges are the percentiles of the non-null reference values, with the
    outermost edges widened to +/- infinity so every current value lands in a
    bin. Empty bins are given a share of 0.0001 so the logarithm stays finite.

    Args:
        reference: Baseline values (pandas Series).
        current: Values to compare against the baseline (pandas Series).
        bins: Number of percentile bins.

    Returns:
        The PSI, ``sum((cur - ref) * ln(cur / ref))`` over the bin shares.
    """
    ref_values = reference.dropna()
    cur_values = current.dropna()

    # Percentile edges taken from the reference sample only
    edges = np.percentile(ref_values, np.linspace(0, 100, bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf

    def _bin_shares(values):
        """Share of ``values`` per bin, with zero shares floored at 0.0001."""
        counts, _ = np.histogram(values, bins=edges)
        shares = counts / len(values)
        return np.where(shares == 0, 0.0001, shares)

    expected = _bin_shares(ref_values)
    actual = _bin_shares(cur_values)

    return np.sum((actual - expected) * np.log(actual / expected))

Company-Specific Tips

ℹ️

Amazon Tips:

  • Amazon values business-driven feature engineering
  • Be prepared to discuss features for recommendation systems
  • Know how to engineer features for real-time serving
  • Understand feature stores and feature pipelines

Apple Tips:

  • Apple focuses on privacy-preserving feature engineering
  • Know how to do federated learning feature engineering
  • Be comfortable with on-device feature computation
  • Understand differential privacy in feature creation

Quiz Section


Related Topics

Advertisement