πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Design Real-Time Fraud Detection System

ML System DesignAnomaly Detection and Classification⭐ Premium

Advertisement

Stripe, PayPal, Banks, FinTech

Design Real-Time Fraud Detection System

Building real-time fraud detection for millions of transactions with sub-100ms latency

Interview Question

"Design a real-time fraud detection system that can analyze millions of transactions per day, detect fraudulent activities with high precision, and provide explainable decisions for compliance requirements, all within 100ms latency."

Difficulty: Hard | Frequently asked at Stripe, PayPal, Square, JPMorgan, Goldman Sachs


1. Requirements Gathering

Functional Requirements

  1. Real-time Transaction Scoring: Evaluate each transaction for fraud risk in real-time
  2. Rule Engine: Configurable business rules for fraud prevention
  3. Alert Generation: Generate alerts for suspicious activities
  4. Case Management: Workflow for fraud analysts to investigate and resolve cases
  5. Explainable Decisions: Provide reasons for fraud decisions for compliance
  6. Feedback Loop: Incorporate analyst decisions to improve models
  7. Batch Analysis: Historical analysis for model training and pattern discovery

Non-Functional Requirements

  1. Latency: < 100ms for real-time scoring (critical path)
  2. Throughput: 10,000+ transactions per second, 500M+ transactions/day
  3. Availability: 99.999% uptime (financial system requirement)
  4. Accuracy: < 0.1% false positive rate, > 95% fraud recall
  5. Explainability: All decisions must be auditable with reasons
  6. Scalability: Handle 10x growth in transaction volume
  7. Compliance: PCI DSS, GDPR, SOC2 compliance

ℹ️

Scale Perspective: Stripe processes over $1 trillion in payments annually. PayPal handles 25+ billion transactions per year. Real-time fraud detection must evaluate each transaction in under 100ms while maintaining extremely low false positive rates to avoid blocking legitimate transactions.


2. High-Level Architecture Overview

The fraud detection system follows a layered architecture: Data Ingestion β†’ Feature Computation β†’ Model Scoring β†’ Decision Engine β†’ Case Management.

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                           DATA SOURCES                                      β”‚
β”‚  Payment Gateway β”‚ Bank APIs β”‚ Merchant Systems β”‚ User Apps β”‚ Device Telemetryβ”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                    β”‚
                                    β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                         DATA INGESTION LAYER                                β”‚
β”‚  Apache Kafka β”‚ Schema Registry β”‚ Event Validation β”‚ Deduplication          β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                    β”‚
                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                    β–Ό               β–Ό               β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  REAL-TIME PROCESSING  β”‚ β”‚ FEATURE STORE β”‚ β”‚ BATCH PROCESSING     β”‚
β”‚  (Flink/Spark Streaming)β”‚ β”‚ (Redis/DynamoDB)β”‚ β”‚ (Spark/Airflow)     β”‚
β”‚  (< 20ms)              β”‚ β”‚ (< 5ms)        β”‚ β”‚ (Daily/Weekly)       β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                    β”‚
                                    β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                        FRAUD DETECTION ENGINE                               β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚ Rule Engine  β”‚  β”‚ ML Models    β”‚  β”‚ Graph        β”‚  β”‚ Ensemble     β”‚   β”‚
β”‚  β”‚ (Drools)     β”‚  β”‚ (GBDT+NN)    β”‚  β”‚ Analysis     β”‚  β”‚ Scoring      β”‚   β”‚
β”‚  β”‚ (< 5ms)      β”‚  β”‚ (< 30ms)     β”‚  β”‚ (< 20ms)     β”‚  β”‚ (< 10ms)     β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                    β”‚
                                    β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                        DECISION ENGINE                                       β”‚
β”‚  Risk Scoring β”‚ Action Decision β”‚ Explanation Generation β”‚ Logging          β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                    β”‚
                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                    β–Ό               β–Ό               β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  TRANSACTION           β”‚ β”‚ FRAUD          β”‚ β”‚ ANALYST              β”‚
β”‚  PROCESSING            β”‚ β”‚ ANALYTICS      β”‚ β”‚ DASHBOARD            β”‚
β”‚  (Approve/Decline)     β”‚ β”‚ (Real-time)    β”‚ β”‚ (Case Management)    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

πŸ’‘

Key Insight: The fraud detection system must balance three competing objectives: catching fraud (high recall), avoiding false positives (high precision), and making fast decisions (low latency). Different components optimize for different aspects of this trade-off.


3. Data Pipeline Design

3.1 Transaction Data Model

from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Dict, List
from decimal import Decimal

@dataclass
class Transaction:
    # Core transaction fields
    transaction_id: str
    user_id: str
    merchant_id: str
    amount: Decimal
    currency: str
    timestamp: datetime
    
    # Transaction details
    payment_method: str  # card, bank_transfer, wallet
    card_last_four: Optional[str]
    card_brand: Optional[str]
    card_country: Optional[str]
    
    # Merchant details
    merchant_category: str
    merchant_country: str
    merchant_name: str
    
    # Device and location
    device_fingerprint: Optional[str]
    ip_address: str
    ip_country: str
    user_agent: str
    
    # Session context
    session_id: Optional[str]
    referral_source: Optional[str]
    
    # Risk signals (pre-computed)
    is_3d_secure: bool
    avs_result: Optional[str]
    cvv_result: Optional[str]
    
@dataclass
class FraudLabel:
    transaction_id: str
    is_fraud: bool
    fraud_type: Optional[str]  # account_takeover, card_testing, friendly_fraud
    confidence: float
    analyst_id: Optional[str]
    investigation_notes: Optional[str]
    resolved_at: Optional[datetime]

3.2 Real-time Feature Computation

class RealTimeFeatureComputer:
    """Compute features in real-time for fraud scoring"""
    
    def __init__(self, redis_client, feature_store):
        self.redis = redis_client
        self.feature_store = feature_store
        
    async def compute_features(self, transaction: Transaction) -> Dict:
        """Compute all features for a transaction"""
        features = {}
        
        # User velocity features (real-time from Redis)
        user_key = f"user:{transaction.user_id}"
        user_features = await self.compute_user_velocity(transaction, user_key)
        features.update(user_features)
        
        # Card velocity features
        card_key = f"card:{transaction.card_last_four}:{transaction.card_brand}"
        card_features = await self.compute_card_velocity(transaction, card_key)
        features.update(card_features)
        
        # Merchant features
        merchant_key = f"merchant:{transaction.merchant_id}"
        merchant_features = await self.compute_merchant_features(transaction, merchant_key)
        features.update(merchant_features)
        
        # Geographic features
        geo_features = await self.compute_geo_features(transaction)
        features.update(geo_features)
        
        # Device features
        device_features = await self.compute_device_features(transaction)
        features.update(device_features)
        
        # Historical features (from feature store)
        historical_features = await self.feature_store.get_features(
            user_id=transaction.user_id,
            feature_names=['avg_transaction_amount', 'avg_transactions_per_day',
                          'preferred_merchants', 'usual_countries']
        )
        features.update(historical_features)
        
        return features
    
    async def compute_user_velocity(self, transaction, user_key):
        """Compute user transaction velocity features"""
        pipe = self.redis.pipeline()
        
        # Time windows for velocity
        windows = [
            ('1h', 3600),
            ('24h', 86400),
            ('7d', 604800),
            ('30d', 2592000)
        ]
        
        for window_name, window_seconds in windows:
            key = f"{user_key}:transactions:{window_name}"
            pipe.zcount(key, transaction.timestamp.timestamp() - window_seconds, 
                       transaction.timestamp.timestamp())
        
        counts = await pipe.execute()
        
        return {
            'user_txn_count_1h': counts[0],
            'user_txn_count_24h': counts[1],
            'user_txn_count_7d': counts[2],
            'user_txn_count_30d': counts[3],
            'user_txn_count_avg_daily': counts[3] / 30,
            'user_txn_frequency_ratio': counts[0] / max(counts[3] / 30, 0.001)
        }
    
    async def compute_geo_features(self, transaction):
        """Compute geographic risk features"""
        # Check if transaction country matches user's usual countries
        usual_countries = await self.redis.smembers(
            f"user:{transaction.user_id}:usual_countries"
        )
        
        is_unusual_country = transaction.card_country not in usual_countries
        
        # Check distance from last transaction
        last_location = await self.redis.get(
            f"user:{transaction.user_id}:last_location"
        )
        
        if last_location:
            distance = self.calculate_distance(
                last_location, 
                transaction.ip_country
            )
            time_diff = (transaction.timestamp - last_location['timestamp']).total_seconds()
            
            # Impossible travel check
            max_speed_kmh = 1000  # Commercial flight speed
            impossible_travel = distance / (time_diff / 3600) > max_speed_kmh if time_diff > 0 else False
        else:
            distance = None
            impossible_travel = False
        
        return {
            'is_unusual_country': is_unusual_country,
            'distance_from_last_txn_km': distance,
            'impossible_travel_detected': impossible_travel,
            'country_mismatch': transaction.card_country != transaction.ip_country
        }

3.3 Feature Store Design

class FraudFeatureStore:
    """Feature store for fraud detection features"""
    
    def __init__(self):
        # Online store for real-time serving
        self.online_store = RedisCluster(
            host='redis-cluster',
            port=6379,
            decode_responses=True
        )
        
        # Offline store for training
        self.offline_store = SparkSession.builder \
            .appName("FraudFeatureStore") \
            .getOrCreate()
    
    async def get_features(self, user_id: str, feature_names: List[str]) -> Dict:
        """Get features for online serving"""
        pipeline = self.online_store.pipeline()
        
        for feature_name in feature_names:
            key = f"features:{user_id}:{feature_name}"
            pipeline.hgetall(key)
        
        results = await pipeline.execute()
        
        return {
            feature_name: self.deserialize(result)
            for feature_name, result in zip(feature_names, results)
        }
    
    def compute_daily_features(self, date: str):
        """Batch compute daily features"""
        # Read transaction data
        transactions = self.offline_store.read.parquet(
            f"s3://fraud-data/transactions/date={date}"
        )
        
        # Compute user-level features
        user_features = transactions.groupBy("user_id").agg(
            F.avg("amount").alias("avg_amount"),
            F.stddev("amount").alias("std_amount"),
            F.count("*").alias("txn_count"),
            F.countDistinct("merchant_id").alias("unique_merchants"),
            F.countDistinct("country").alias("unique_countries"),
            F.sum(F.when(F.col("is_fraud") == True, 1).otherwise(0)).alias("fraud_count_30d")
        )
        
        # Compute merchant-level features
        merchant_features = transactions.groupBy("merchant_id").agg(
            F.avg("amount").alias("avg_amount"),
            F.count("*").alias("txn_count"),
            F.sum(F.when(F.col("is_fraud") == True, 1).otherwise(0)).alias("fraud_count_30d"),
            F.countDistinct("user_id").alias("unique_users")
        )
        
        # Store features
        user_features.write.mode("overwrite").parquet(
            f"s3://fraud-features/user-features/date={date}"
        )
        
        merchant_features.write.mode("overwrite").parquet(
            f"s3://fraud-features/merchant-features/date={date}"
        )

⚠️

Critical Feature Engineering Considerations:

  1. Time-based features: Be careful about data leakage - never use future information
  2. Velocity features: Must be computed in real-time with sliding windows
  3. Historical features: Balance freshness with computational cost
  4. Cross-entity features: User-merchant, user-card, merchant-device interactions

4. Model Selection and Training Approach

4.1 Multi-Model Architecture

class FraudDetectionEnsemble:
    """Ensemble of models for different fraud types"""
    
    def __init__(self):
        # Model 1: Card fraud detection
        self.card_fraud_model = self.load_model('card_fraud_v2')
        
        # Model 2: Account takeover detection
        self.account_takeover_model = self.load_model('account_takeover_v1')
        
        # Model 3: Friendly fraud detection
        self.friendly_fraud_model = self.load_model('friendly_fraud_v1')
        
        # Model 4: Card testing detection
        self.card_testing_model = self.load_model('card_testing_v1')
        
        # Meta-model for final scoring
        self.meta_model = self.load_model('meta_scorer_v1')
    
    def predict(self, features: Dict) -> Dict:
        """Get predictions from all models"""
        predictions = {}
        
        # Get individual model predictions
        predictions['card_fraud'] = self.card_fraud_model.predict_proba(
            features['card_features']
        )[1]
        
        predictions['account_takeover'] = self.account_takeover_model.predict_proba(
            features['account_features']
        )[1]
        
        predictions['friendly_fraud'] = self.friendly_fraud_model.predict_proba(
            features['transaction_features']
        )[1]
        
        predictions['card_testing'] = self.card_testing_model.predict_proba(
            features['velocity_features']
        )[1]
        
        # Meta-model combines predictions
        meta_features = np.array([
            predictions['card_fraud'],
            predictions['account_takeover'],
            predictions['friendly_fraud'],
            predictions['card_testing'],
            features['amount_normalized'],
            features['risk_score']
        ]).reshape(1, -1)
        
        final_score = self.meta_model.predict_proba(meta_features)[0][1]
        
        return {
            'fraud_score': final_score,
            'component_scores': predictions,
            'risk_level': self.get_risk_level(final_score)
        }
    
    def get_risk_level(self, score):
        if score > 0.8:
            return 'HIGH'
        elif score > 0.5:
            return 'MEDIUM'
        elif score > 0.2:
            return 'LOW'
        else:
            return 'MINIMAL'

4.2 Gradient Boosted Decision Trees (Primary Model)

import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import precision_recall_curve, auc

class CardFraudModel:
    """Gradient boosted model for card fraud detection"""
    
    def __init__(self):
        self.model = lgb.LGBMClassifier(
            objective='binary',
            metric='auc',
            num_leaves=63,
            learning_rate=0.05,
            n_estimators=1000,
            min_child_samples=100,
            subsample=0.8,
            colsample_bytree=0.8,
            reg_alpha=0.1,
            reg_lambda=1.0,
            scale_pos_weight=100,  # Handle class imbalance
            random_state=42
        )
        
    def train(self, X_train, y_train, X_val, y_val):
        """Train model with early stopping"""
        self.model.fit(
            X_train, y_train,
            eval_set=[(X_val, y_val)],
            callbacks=[
                lgb.early_stopping(50),
                lgb.log_evaluation(100)
            ]
        )
        
    def cross_validate(self, X, y, n_folds=5):
        """Perform stratified k-fold cross-validation"""
        skf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)
        
        scores = []
        for train_idx, val_idx in skf.split(X, y):
            X_train, X_val = X[train_idx], X[val_idx]
            y_train, y_val = y[train_idx], y[val_idx]
            
            self.model.fit(
                X_train, y_train,
                eval_set=[(X_val, y_val)],
                callbacks=[lgb.early_stopping(50)]
            )
            
            y_pred = self.model.predict_proba(X_val)[:, 1]
            precision, recall, _ = precision_recall_curve(y_val, y_pred)
            pr_auc = auc(recall, precision)
            scores.append(pr_auc)
        
        return np.mean(scores), np.std(scores)
    
    def get_feature_importance(self):
        """Get feature importance for explainability"""
        importance = self.model.feature_importances_
        feature_names = self.model.feature_name_
        
        return sorted(
            zip(feature_names, importance),
            key=lambda x: x[1],
            reverse=True
        )

4.3 Neural Network for Complex Patterns

class FraudDetectionNeuralNet(tf.keras.Model):
    """Deep neural network for complex fraud patterns"""
    
    def __init__(self):
        super().__init__()
        
        # Feature embedding layers
        self.user_embedding = tf.keras.layers.Embedding(1000000, 32)
        self.merchant_embedding = tf.keras.layers.Embedding(500000, 16)
        self.device_embedding = tf.keras.layers.Embedding(1000000, 8)
        
        # Feature interaction layers
        self.cross_network = CrossNetwork(num_layers=3)
        
        # Deep network
        self.deep_network = tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.3),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(64, activation='relu')
        ])
        
        # Attention mechanism for sequence features
        self.attention = tf.keras.layers.MultiHeadAttention(
            num_heads=4, key_dim=32
        )
        
        # Output layer
        self.output_layer = tf.keras.layers.Dense(1, activation='sigmoid')
    
    def call(self, inputs, training=False):
        # Embed categorical features
        user_emb = self.user_embedding(inputs['user_id'])
        merchant_emb = self.merchant_embedding(inputs['merchant_id'])
        device_emb = self.device_embedding(inputs['device_id'])
        
        # Concatenate all features
        numerical_features = inputs['numerical_features']
        categorical_embeddings = tf.concat([
            user_emb, merchant_emb, device_emb
        ], axis=1)
        
        all_features = tf.concat([
            numerical_features, categorical_embeddings
        ], axis=1)
        
        # Apply cross network for feature interactions
        cross_output = self.cross_network(all_features)
        
        # Apply deep network
        deep_output = self.deep_network(all_features)
        
        # Combine and predict
        combined = tf.concat([cross_output, deep_output], axis=1)
        return self.output_layer(combined)

4.4 Handling Class Imbalance

class ImbalancedDataHandler:
    """Techniques for handling extreme class imbalance in fraud detection"""
    
    def __init__(self):
        pass
    
    def oversampling_smote(self, X, y, sampling_ratio=0.1):
        """SMOTE oversampling for minority class"""
        from imblearn.over_sampling import SMOTE
        
        smote = SMOTE(sampling_strategy=sampling_ratio, random_state=42)
        X_resampled, y_resampled = smote.fit_resample(X, y)
        
        return X_resampled, y_resampled
    
    def focal_loss(self, y_true, y_pred, alpha=0.25, gamma=2.0):
        """Focal loss for handling class imbalance"""
        y_pred = tf.clip_by_value(y_pred, 1e-7, 1 - 1e-7)
        
        # Binary cross entropy
        bce = -y_true * tf.math.log(y_pred) - (1 - y_true) * tf.math.log(1 - y_pred)
        
        # Focal modulating factor
        p_t = y_true * y_pred + (1 - y_true) * (1 - y_pred)
        alpha_t = y_true * alpha + (1 - y_true) * (1 - alpha)
        focal_weight = alpha_t * tf.pow(1 - p_t, gamma)
        
        return focal_weight * bce
    
    def compute_class_weights(self, y):
        """Compute class weights for imbalanced data"""
        n_samples = len(y)
        n_fraud = sum(y)
        n_legitimate = n_samples - n_fraud
        
        weight_fraud = n_samples / (2 * n_fraud)
        weight_legitimate = n_samples / (2 * n_legitimate)
        
        return {0: weight_legitimate, 1: weight_fraud}

ℹ️

Class Imbalance Strategy: For fraud detection with 0.1% fraud rate:

  1. Use focal loss or class weights instead of oversampling
  2. Combine multiple techniques: ensemble of models trained with different sampling strategies
  3. Focus on precision-recall AUC, not accuracy
  4. Use threshold tuning based on business requirements (cost of false positives vs false negatives)

5. Serving Architecture

5.1 Real-time Scoring Pipeline

Architecture Diagram
Transaction β†’ Kafka β†’ Feature Computation β†’ Model Scoring β†’ Decision Engine β†’ Response
                     (< 20ms)              (< 30ms)        (< 10ms)
                              β”‚                    β”‚                β”‚
                              β–Ό                    β–Ό                β–Ό
                        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”          β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                        β”‚ Feature β”‚          β”‚ Model   β”‚      β”‚ Rule    β”‚
                        β”‚ Store   β”‚          β”‚ Serving β”‚      β”‚ Engine  β”‚
                        β”‚ (Redis) β”‚          β”‚ (TF/ONNX)β”‚      β”‚ (Drools)β”‚
                        β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜          β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

5.2 Model Serving Options

class ModelServing:
    """Model serving for fraud detection"""
    
    def __init__(self):
        # Option 1: TensorFlow Serving
        self.tf_serving_client = tf_serving_client.Client(
            host='tensorflow-serving:8501'
        )
        
        # Option 2: ONNX Runtime
        self.onnx_session = ort.InferenceSession(
            "fraud_model.onnx",
            providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
        )
        
        # Option 3: Triton Inference Server
        self.triton_client = tritonclient.http.InferenceServerClient(
            url='triton-server:8000'
        )
    
    async def predict(self, features: Dict) -> Dict:
        """Get prediction from model"""
        # Prepare input tensor
        input_tensor = self.prepare_input(features)
        
        # Get prediction
        prediction = self.onnx_session.run(
            ['fraud_probability'],
            {'input': input_tensor}
        )[0]
        
        return {
            'fraud_probability': float(prediction[0]),
            'model_version': 'v2.1.3',
            'inference_time_ms': self.measure_latency()
        }

5.3 Decision Engine

class FraudDecisionEngine:
    """Make final fraud decision based on model scores and rules"""
    
    def __init__(self):
        self.rule_engine = RuleEngine()
        self.decision_history = DecisionHistory()
        
    async def make_decision(self, transaction, model_scores, features):
        """Make final fraud decision"""
        
        # Step 1: Check hard rules (immediate block/allow)
        rule_decision = await self.rule_engine.evaluate(transaction, features)
        if rule_decision == 'BLOCK':
            return {
                'decision': 'DECLINE',
                'reason': 'Hard rule violation',
                'rule_id': rule_decision.rule_id,
                'confidence': 1.0
            }
        
        # Step 2: Get model score
        ml_score = model_scores['fraud_probability']
        
        # Step 3: Combine with business rules
        final_score = self.combine_scores(ml_score, rule_decision, features)
        
        # Step 4: Make decision based on threshold
        if final_score > 0.9:
            decision = 'DECLINE'
        elif final_score > 0.7:
            decision = 'CHALLENGE'  # 3DS or additional verification
        elif final_score > 0.5:
            decision = 'REVIEW'    # Manual review queue
        else:
            decision = 'APPROVE'
        
        # Step 5: Generate explanation
        explanation = self.generate_explanation(
            transaction, model_scores, features, decision
        )
        
        return {
            'decision': decision,
            'fraud_score': final_score,
            'explanation': explanation,
            'risk_level': self.get_risk_level(final_score),
            'recommended_actions': self.get_recommended_actions(decision, final_score)
        }
    
    def generate_explanation(self, transaction, model_scores, features, decision):
        """Generate human-readable explanation for decision"""
        reasons = []
        
        # High-value transaction
        if features.get('amount_normalized', 0) > 3:
            reasons.append("Transaction amount significantly higher than user's average")
        
        # Unusual location
        if features.get('is_unusual_country', False):
            reasons.append("Transaction from unusual country for this user")
        
        # Velocity anomaly
        if features.get('user_txn_count_1h', 0) > 10:
            reasons.append("Unusually high transaction frequency in last hour")
        
        # Device anomaly
        if features.get('new_device', False):
            reasons.append("Transaction from new device")
        
        # Card mismatch
        if features.get('card_country_mismatch', False):
            reasons.append("Card country differs from IP country")
        
        # Model contribution
        if model_scores['card_fraud'] > 0.5:
            reasons.append("Card fraud model indicates high risk")
        
        return {
            'decision_reasons': reasons,
            'primary_factor': reasons[0] if reasons else "Normal transaction pattern",
            'confidence_explanation': f"Model confidence: {model_scores['fraud_probability']:.2%}"
        }

ℹ️

Decision Engine Discussion: When discussing the decision engine, emphasize:

  1. Hard rules vs ML-based decisions
  2. Threshold tuning based on business requirements
  3. Explainability requirements for compliance
  4. Feedback loop from analyst decisions

6. Monitoring and Observability

6.1 Key Metrics

class FraudMonitoringMetrics:
    """Comprehensive monitoring for fraud detection system"""
    
    # Model performance metrics
    MODEL_METRICS = [
        'precision_at_threshold',
        'recall_at_threshold',
        'f1_score',
        'pr_auc',
        'roc_auc',
        'false_positive_rate',
        'false_negative_rate'
    ]
    
    # Business metrics
    BUSINESS_METRICS = [
        'fraud_rate',
        'fraud_loss_amount',
        'chargeback_rate',
        'false_positive_rate',
        'customer_friction_score'
    ]
    
    # Operational metrics
    OPERATIONAL_METRICS = [
        'scoring_latency_p50',
        'scoring_latency_p95',
        'scoring_latency_p99',
        'throughput_tps',
        'error_rate',
        'feature_freshness'
    ]
    
    # Drift metrics
    DRIFT_METRICS = [
        'feature_drift_psi',
        'prediction_drift_ks',
        'model_calibration_error',
        'data_quality_score'
    ]

6.2 Real-time Monitoring Dashboard

class FraudMonitoringDashboard:
    """Real-time monitoring dashboard for fraud detection"""
    
    def __init__(self):
        self.metrics_collector = MetricsCollector()
        self.alert_manager = AlertManager()
        
    async def update_dashboard(self):
        """Update monitoring dashboard"""
        metrics = await self.collect_metrics()
        
        # Check for anomalies
        anomalies = self.detect_anomalies(metrics)
        
        # Send alerts if needed
        for anomaly in anomalies:
            await self.alert_manager.send_alert(anomaly)
        
        # Update dashboard
        await self.update_visualizations(metrics, anomalies)
    
    async def collect_metrics(self):
        """Collect all monitoring metrics"""
        return {
            'real_time': {
                'transactions_per_second': await self.get_tps(),
                'avg_latency_ms': await self.get_avg_latency(),
                'fraud_rate': await self.get_fraud_rate(),
                'false_positive_rate': await self.get_fpr()
            },
            'hourly': {
                'total_transactions': await self.get_hourly_transactions(),
                'total_fraud': await self.get_hourly_fraud(),
                'total_fraud_amount': await self.get_hourly_fraud_amount(),
                'model_drift_score': await self.get_model_drift()
            },
            'daily': {
                'chargeback_rate': await self.get_daily_chargebacks(),
                'customer_complaints': await self.get_complaints(),
                'model_performance': await self.get_model_performance()
            }
        }
    
    def detect_anomalies(self, metrics):
        """Detect anomalies in metrics"""
        anomalies = []
        
        # Check for sudden spike in fraud rate
        current_fraud_rate = metrics['real_time']['fraud_rate']
        if current_fraud_rate > self.baseline_fraud_rate * 2:
            anomalies.append({
                'type': 'FRAUD_SPIKE',
                'severity': 'HIGH',
                'message': f'Fraud rate spike detected: {current_fraud_rate:.2%}',
                'current': current_fraud_rate,
                'baseline': self.baseline_fraud_rate
            })
        
        # Check for latency degradation
        current_latency = metrics['real_time']['avg_latency_ms']
        if current_latency > 100:  # SLA threshold
            anomalies.append({
                'type': 'LATENCY_BREACH',
                'severity': 'MEDIUM',
                'message': f'Latency SLA breach: {current_latency:.2f}ms',
                'current': current_latency,
                'threshold': 100
            })
        
        return anomalies

⚠️

Critical Monitoring Points:

  1. Model drift: Monitor for changes in feature distributions and prediction patterns
  2. Feedback loop: Track how analyst decisions affect future model performance
  3. Concept drift: Fraud patterns evolve - detect when model performance degrades
  4. Alert fatigue: Balance sensitivity with false alarm rate

7. Scale Considerations and Trade-offs

7.1 Horizontal Scaling

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    SCALING ARCHITECTURE                                      β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                             β”‚
β”‚  Transaction Volume                                                         β”‚
β”‚  └── Partition by user_id or merchant_id                                    β”‚
β”‚      β”œβ”€β”€ Shard 1: Users A-H                                                β”‚
β”‚      β”œβ”€β”€ Shard 2: Users I-P                                                β”‚
β”‚      └── Shard 3: Users Q-Z                                                β”‚
β”‚                                                                             β”‚
β”‚  Model Serving                                                              β”‚
β”‚  └── Horizontal scaling with load balancing                                 β”‚
β”‚      β”œβ”€β”€ GPU instances for neural network models                            β”‚
β”‚      └── CPU instances for GBDT models                                     β”‚
β”‚                                                                             β”‚
β”‚  Feature Store                                                              β”‚
β”‚  └── Redis cluster with replication                                         β”‚
β”‚      β”œβ”€β”€ Master for writes                                                  β”‚
β”‚      └── Replicas for reads                                                 β”‚
β”‚                                                                             β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

7.2 Cost vs Performance Trade-offs

DimensionOption A (Cost Optimized)Option B (Performance Optimized)
Model ComplexitySimple GBDT (fast inference)Deep neural network (better accuracy)
Feature FreshnessBatch features (hourly)Real-time features (streaming)
Model RetrainingWeekly retrainingDaily retraining
ExplainabilityPost-hoc explanationsInherently interpretable models
StorageSampled historical dataFull historical data

7.3 Latency Optimization

class LatencyOptimizer:
    """Optimize latency for fraud detection"""
    
    def __init__(self):
        self.feature_cache = LRUCache(maxsize=100000)
        self.model_cache = ModelCache()
        
    async def optimize_scoring(self, transaction):
        """Optimize fraud scoring latency"""
        
        # Step 1: Check feature cache
        cache_key = self.generate_cache_key(transaction)
        cached_features = self.feature_cache.get(cache_key)
        
        if cached_features:
            features = cached_features
        else:
            # Step 2: Compute features (parallel)
            features = await self.compute_features_parallel(transaction)
            self.feature_cache.set(cache_key, features, ttl=60)
        
        # Step 3: Check model cache
        model_version = self.model_cache.get_latest_version()
        if model_version != self.current_model_version:
            await self.update_model(model_version)
        
        # Step 4: Batch scoring for similar transactions
        if self.can_batch(transaction):
            return await self.batch_score(transaction, features)
        else:
            return await self.individual_score(features)
    
    async def compute_features_parallel(self, transaction):
        """Compute features in parallel for lower latency"""
        tasks = [
            self.compute_user_features(transaction.user_id),
            self.compute_card_features(transaction.card_last_four),
            self.compute_merchant_features(transaction.merchant_id),
            self.compute_geo_features(transaction.ip_country),
            self.compute_device_features(transaction.device_fingerprint)
        ]
        
        results = await asyncio.gather(*tasks)
        
        # Merge results
        features = {}
        for result in results:
            features.update(result)
        
        return features

πŸ’‘

Latency Optimization Tips:

  1. Cache frequently accessed features
  2. Compute features in parallel
  3. Use efficient data structures (numpy arrays vs pandas)
  4. Consider model distillation for faster inference
  5. Use async I/O for database calls

8. Advanced Topics

8.1 Graph-based Fraud Detection

import networkx as nx
from node2vec import Node2Vec

class GraphFraudDetector:
    """Graph-based fraud detection using transaction networks"""
    
    def __init__(self):
        self.graph = nx.DiGraph()
        
    def build_transaction_graph(self, transactions):
        """Build graph from transaction data"""
        for txn in transactions:
            # Add nodes
            self.graph.add_node(txn.user_id, type='user')
            self.graph.add_node(txn.merchant_id, type='merchant')
            self.graph.add_node(txn.device_fingerprint, type='device')
            
            # Add edges with features
            self.graph.add_edge(
                txn.user_id, txn.merchant_id,
                amount=txn.amount,
                timestamp=txn.timestamp,
                is_fraud=txn.is_fraud
            )
    
    def detect_fraud_communities(self):
        """Detect fraud rings using community detection"""
        # Find strongly connected components
        communities = list(nx.strongly_connected_components(self.graph))
        
        fraud_communities = []
        for community in communities:
            # Check if community has high fraud concentration
            fraud_rate = self.compute_community_fraud_rate(community)
            if fraud_rate > 0.3:  # Threshold
                fraud_communities.append({
                    'nodes': community,
                    'fraud_rate': fraud_rate,
                    'size': len(community)
                })
        
        return fraud_communities
    
    def compute_node_features(self, node_id):
        """Compute graph-based features for a node"""
        node = self.graph.nodes[node_id]
        
        # Degree centrality
        in_degree = self.graph.in_degree(node_id)
        out_degree = self.graph.out_degree(node_id)
        
        # PageRank
        pagerank = nx.pagerank(self.graph, personalization={node_id: 1})
        
        # Triangle count (for clustering)
        triangles = nx.triangles(self.graph.to_undirected(), node_id)
        
        return {
            'in_degree': in_degree,
            'out_degree': out_degree,
            'pagerank': pagerank[node_id],
            'triangle_count': triangles,
            'node_type': node['type']
        }

8.2 Real-time Anomaly Detection

class RealTimeAnomalyDetector:
    """Detect anomalies in real-time using streaming algorithms"""
    
    def __init__(self):
        # Count-Min Sketch for frequency estimation
        self.count_min = CountMinSketch(width=1000000, depth=10)
        
        # HyperLogLog for cardinality estimation
        self.hll = HyperLogLog(error_rate=0.01)
        
        # Sliding window for recent patterns
        self.window_size = 3600  # 1 hour
        self.window = SlidingWindow(self.window_size)
        
    def update(self, transaction):
        """Update anomaly detection with new transaction"""
        # Update frequency sketch
        self.count_min.update(transaction.user_id)
        self.count_min.update(transaction.merchant_id)
        self.count_min.update(transaction.card_last_four)
        
        # Update cardinality estimator
        self.hll.add(transaction.user_id)
        
        # Add to sliding window
        self.window.add(transaction)
        
        # Check for anomalies
        anomalies = self.detect_anomalies(transaction)
        
        return anomalies
    
    def detect_anomalies(self, transaction):
        """Detect anomalies based on current state"""
        anomalies = []
        
        # Check frequency anomaly
        user_freq = self.count_min.estimate(transaction.user_id)
        if user_freq > self.get_frequency_threshold():
            anomalies.append({
                'type': 'HIGH_FREQUENCY',
                'entity': transaction.user_id,
                'frequency': user_freq,
                'threshold': self.get_frequency_threshold()
            })
        
        # Check velocity anomaly
        recent_count = self.window.count_entity(
            transaction.user_id, 
            window_seconds=300  # 5 minutes
        )
        if recent_count > self.get_velocity_threshold():
            anomalies.append({
                'type': 'VELOCITY_ANOMALY',
                'entity': transaction.user_id,
                'count': recent_count,
                'window': '5 minutes'
            })
        
        return anomalies

8.3 Explainable AI for Compliance

class FraudExplainer:
    """Generate explanations for fraud decisions"""
    
    def __init__(self):
        self.shap_explainer = shap.TreeExplainer(self.model)
        self.lime_explainer = lime.lime_tabular.LimeTabularExplainer(
            training_data=self.training_data,
            feature_names=self.feature_names,
            class_names=['Legitimate', 'Fraud']
        )
    
    def explain_prediction(self, transaction_features):
        """Generate explanation for a prediction"""
        # SHAP explanation
        shap_values = self.shap_explainer.shap_values(transaction_features)
        
        # Get top contributing features
        feature_importance = list(zip(
            self.feature_names,
            shap_values[0]
        ))
        feature_importance.sort(key=lambda x: abs(x[1]), reverse=True)
        
        # Generate human-readable explanation
        explanation = self.generate_narrative(
            transaction_features,
            feature_importance[:5]  # Top 5 features
        )
        
        return {
            'shap_values': shap_values,
            'top_features': feature_importance[:5],
            'narrative': explanation,
            'confidence': self.compute_confidence(shap_values)
        }
    
    def generate_narrative(self, features, top_features):
        """Generate human-readable explanation"""
        narrative_parts = []
        
        for feature_name, importance in top_features:
            if feature_name == 'is_unusual_country' and features[feature_name]:
                narrative_parts.append(
                    "Transaction from unusual country for this user"
                )
            elif feature_name == 'amount_normalized' and features[feature_name] > 3:
                narrative_parts.append(
                    f"Transaction amount ({features['amount']:.2f}) is "
                    f"{features['amount_normalized']:.1f}x higher than average"
                )
            elif feature_name == 'user_txn_count_1h' and features[feature_name] > 10:
                narrative_parts.append(
                    f"User has made {features['user_txn_count_1h']} transactions "
                    f"in the last hour (unusual frequency)"
                )
        
        return "Primary risk factors: " + "; ".join(narrative_parts)

ℹ️

Explainability Requirements: For financial fraud detection:

  1. Provide both global and local explanations
  2. Use multiple explanation methods (SHAP, LIME, rule-based)
  3. Ensure explanations are auditable and logged
  4. Allow analysts to provide feedback on explanation quality

9. Implementation Roadmap

Phase 1: Rule-based System (Weeks 1-2)

  • Implement basic rule engine
  • Set up transaction ingestion pipeline
  • Create basic monitoring dashboard
  • Establish baseline metrics

Phase 2: ML Models (Weeks 3-6)

  • Feature engineering pipeline
  • Train initial GBDT model
  • Implement model serving
  • Set up A/B testing framework

Phase 3: Advanced Detection (Weeks 7-10)

  • Graph-based fraud detection
  • Real-time anomaly detection
  • Multi-model ensemble
  • Advanced explainability

Phase 4: Optimization (Weeks 11-14)

  • Latency optimization
  • Cost optimization
  • Advanced monitoring
  • Feedback loop implementation

10. Summary and Key Takeaways

Architecture Recap

  1. Multi-layered detection: Rules + ML + Graph analysis
  2. Real-time feature computation: Streaming features with low latency
  3. Ensemble approach: Multiple models for different fraud types
  4. Explainable decisions: All decisions must be auditable

Key Metrics

  • Model Performance: Precision, Recall, F1, PR-AUC
  • Business Impact: Fraud rate, chargeback rate, false positive rate
  • Operational: Latency, throughput, error rate

Common Interview Mistakes

  1. Ignoring class imbalance (99.9% legitimate transactions)
  2. Not discussing explainability requirements
  3. Forgetting about feedback loops from analyst decisions
  4. Not considering latency requirements for real-time decisions
  5. Ignoring concept drift and model degradation

ℹ️

Final Interview Tip: Emphasize the business impact of fraud detection decisions. Discuss the trade-off between catching fraud and customer friction. Show understanding of both ML techniques and production requirements for financial systems.


Further Reading

  • "Fraud Detection in Financial Services" (IEEE Conference)
  • "Graph-Based Fraud Detection" (KDD Conference)
  • "Explainable AI for Financial Services" (NIST)
  • "Real-Time Anomaly Detection" (ACM Computing Surveys)
  • "Production Machine Learning Systems" (O'Reilly)

Advertisement