Anomaly Detection

💡 Anomaly detection identifies data points that deviate significantly from normal patterns. It's critical for fraud detection, network intrusion, manufacturing defects, and medical diagnosis. This lesson covers statistical methods, Isolation Forest, Autoencoders, and clustering-based approaches.


1. Types of Anomalies

Architecture Diagram
                    Types of Anomalies
                            │
            ┌───────────────┼───────────────┐
            │               │               │
        Point           Contextual      Collective
        Anomaly          Anomaly         Anomaly
            │               │               │
     Single point      Normal in        Sequence of
     is anomalous      one context,     points that
                       anomalous in     together are
                       another          anomalous

Point Anomaly

A single data point is far from the rest.

Contextual Anomaly

A data point is anomalous in a specific context but not otherwise.

Collective Anomaly

A collection of data points is anomalous, even if individual points are not.
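
The difference between a point anomaly and a contextual anomaly can be sketched with a few lines of NumPy. The temperature readings, the season labels, and the per-context modified z-score rule below are illustrative assumptions, not data from this lesson.

```python
import numpy as np

# Hypothetical daily temperatures (deg C), each tagged with a season as its context.
temps = np.array([29.0, 31.0, 30.0, 28.0, 30.5, 2.0, 1.0, 3.0, 0.5, 30.0])
season = np.array(["summer"] * 5 + ["winter"] * 5)

def modified_z(values):
    """Median/MAD-based z-score, robust to the outlier being scored."""
    med = np.median(values)
    mad = np.median(np.abs(values - med))
    return 0.6745 * (values - med) / (mad + 1e-8)

def contextual_flags(values, context, threshold=3.5):
    """Flag values that are extreme relative to their own context only."""
    flags = np.zeros(len(values), dtype=bool)
    for c in np.unique(context):
        idx = context == c
        flags[idx] = np.abs(modified_z(values[idx])) > threshold
    return flags

# Global view: plain z-score over all readings, ignoring the season.
global_z = np.abs(temps - temps.mean()) / temps.std()
flags = contextual_flags(temps, season)

print("max global |z|:", round(float(global_z.max()), 2))
print("contextual anomalies at indices:", np.flatnonzero(flags))
```

In this toy data no reading is a point anomaly under a global |z| > 3 rule, yet the 30.0 reading recorded in winter is flagged once each season is scored separately.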

Definition: Anomaly Detection

Anomaly detection (also called outlier detection) is the task of identifying data points, observations, or patterns that deviate significantly from the expected behavior in a dataset. Anomalies are rare events that may indicate errors, fraud, or novel phenomena.


2. Statistical Methods

Z-Score Method

Definition: Z-Score Anomaly Detection

If the data are approximately normally distributed, points whose absolute z-score exceeds a chosen threshold are flagged as anomalies.

Z-Score

$$z = \frac{x - \mu}{\sigma}$$

Here,

  • $x$ = Data point
  • $\mu$ = Mean of distribution
  • $\sigma$ = Standard deviation

Mahalanobis Distance

$$D_M(\mathbf{x}) = \sqrt{(\mathbf{x} - \boldsymbol{\mu})^T \Sigma^{-1} (\mathbf{x} - \boldsymbol{\mu})}$$

Here,

  • $\mathbf{x}$ = Data point vector
  • $\boldsymbol{\mu}$ = Mean vector
  • $\Sigma$ = Covariance matrix
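
A minimal sketch of the Mahalanobis distance in NumPy follows. The synthetic correlated data, the two query points, and the chi-square cutoff at the 99.9th percentile are assumptions chosen for illustration; the cutoff relies on the data being roughly multivariate normal.

```python
import numpy as np
from scipy.stats import chi2

def mahalanobis_distances(X, mean, cov):
    """Return the Mahalanobis distance of each row of X from `mean`."""
    diff = X - mean
    cov_inv = np.linalg.pinv(cov)  # pseudo-inverse tolerates a near-singular covariance
    # Row-wise quadratic form (x - mu)^T Sigma^{-1} (x - mu)
    sq = np.einsum("ij,jk,ik->i", diff, cov_inv, diff)
    return np.sqrt(np.maximum(sq, 0.0))

rng = np.random.default_rng(0)
# Hypothetical strongly correlated 2-D data
cov_true = np.array([[1.0, 0.8], [0.8, 1.0]])
X_demo = rng.multivariate_normal([0.0, 0.0], cov_true, size=500)

mean_hat = X_demo.mean(axis=0)
cov_hat = np.cov(X_demo, rowvar=False)

# Two query points with the same Euclidean distance from the origin
queries = np.array([[2.0, 2.0], [2.0, -2.0]])
d = mahalanobis_distances(queries, mean_hat, cov_hat)

# Under multivariate normality, D_M^2 follows a chi-square law with p degrees of freedom
cutoff = np.sqrt(chi2.ppf(0.999, df=X_demo.shape[1]))
print("distances:", np.round(d, 2), "cutoff:", round(float(cutoff), 2))
print("flagged:", d > cutoff)
```

The point lying along the correlation direction gets a small distance, while the point lying against it gets a large one, which a per-feature z-score would not distinguish.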

Modified Z-Score (Robust)

$$M_i = \frac{0.6745 \cdot (x_i - \tilde{x})}{\text{MAD}}$$

Here,

  • $x_i$ = Data point
  • $\tilde{x}$ = Median of the data
  • $\text{MAD}$ = Median Absolute Deviation, i.e. the median of $|x_i - \tilde{x}|$

ℹ️ Why Modified Z-Score?

The standard z-score assumes normality and is sensitive to outliers in the mean and standard deviation. The modified z-score uses the median and MAD, which are robust statistics that resist the influence of extreme values.
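
The constant 0.6745 in the formula is approximately the 75th percentile of the standard normal distribution; dividing the MAD by it gives an estimate of the standard deviation when the data are normal. The sketch below checks this numerically on a hypothetical sample (mean 10, standard deviation 2).

```python
import numpy as np
from scipy.stats import norm

# 75th percentile of the standard normal distribution
k = norm.ppf(0.75)

rng = np.random.default_rng(1)
sample = rng.normal(loc=10.0, scale=2.0, size=100_000)  # hypothetical data, sigma = 2

mad = np.median(np.abs(sample - np.median(sample)))
sigma_from_mad = mad / k  # rescaled MAD as an estimate of sigma

print("constant:", round(float(k), 4))
print("sigma from MAD:", round(float(sigma_from_mad), 2))
print("sample std:", round(float(sample.std()), 2))
```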

Theorem: Breakdown Point of Robust Statistics

The median has a breakdown point of 50%: as long as fewer than half of the data points are contaminated, the median cannot be pushed arbitrarily far. In contrast, the mean has a breakdown point of 0%, because a single extreme value can shift it arbitrarily.
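
The breakdown point can be seen directly by corrupting a growing share of a small sample. The readings and the replacement value of 1e6 below are hypothetical.

```python
import numpy as np

clean = np.array([4.8, 5.1, 4.9, 5.0, 5.2, 5.0, 4.9])  # hypothetical readings

def contaminate(values, n_bad, bad_value=1e6):
    """Return a copy with the first n_bad entries replaced by an extreme value."""
    out = values.copy()
    out[:n_bad] = bad_value
    return out

for n_bad in (0, 1, 3, 4):
    x = contaminate(clean, n_bad)
    print(f"{n_bad} of {len(x)} corrupted: mean={np.mean(x):.1f}, median={np.median(x):.1f}")
```

With one corrupted value the mean is already useless, while the median stays inside the range of the clean readings until more than half of the seven values (four or more) are corrupted.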

import numpy as np

class ZScoreDetector:
    def __init__(self, threshold=3.0):
        self.threshold = threshold
        self.mean = None
        self.std = None

    def fit(self, X):
        self.mean = np.mean(X, axis=0)
        self.std = np.std(X, axis=0)
        return self

    def predict(self, X):
        z_scores = np.abs((X - self.mean) / (self.std + 1e-8))
        return np.any(z_scores > self.threshold, axis=1)

class RobustZScoreDetector:
    def __init__(self, threshold=3.5):
        self.threshold = threshold
        self.median = None
        self.mad = None

    def fit(self, X):
        self.median = np.median(X, axis=0)
        self.mad = np.median(np.abs(X - self.median), axis=0)
        return self

    def predict(self, X):
        modified_z = 0.6745 * (X - self.median) / (self.mad + 1e-8)
        return np.any(np.abs(modified_z) > self.threshold, axis=1)

# Generate sample data
np.random.seed(42)
normal_data = np.random.randn(200, 2) * 0.5 + [5, 5]
anomalies = np.random.uniform(0, 10, (10, 2))
X = np.vstack([normal_data, anomalies])

# Z-Score
z_detector = ZScoreDetector(threshold=3.0).fit(X)
z_labels = z_detector.predict(X)
print(f"Z-Score anomalies: {z_labels.sum()}")

# Robust Z-Score
rz_detector = RobustZScoreDetector(threshold=3.5).fit(X)
rz_labels = rz_detector.predict(X)
print(f"Robust Z-Score anomalies: {rz_labels.sum()}")

📝 Z-Score Threshold Selection

For a normal distribution, the probability of |z| > 3 is approximately 0.27%. So with 1000 data points, we expect about 3 false positives. The threshold should be chosen based on:

  • Desired false positive rate
  • Cost of missing an anomaly vs. cost of a false alarm
  • Domain knowledge about acceptable deviation
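
The link between a threshold and its false positive rate can be computed rather than looked up. This sketch assumes the scores really are standard normal; the helper names are hypothetical.

```python
from scipy.stats import norm

def two_sided_tail(threshold):
    """P(|Z| > threshold) for a standard normal Z."""
    return 2.0 * norm.sf(threshold)

def threshold_for_rate(false_positive_rate):
    """Threshold whose two-sided tail probability equals the target rate."""
    return norm.isf(false_positive_rate / 2.0)

n_points = 1000
for t in (2.0, 3.0, 4.0):
    p = two_sided_tail(t)
    print(f"threshold={t}: P(|z|>t)={p:.5f}, "
          f"expected false positives in {n_points} points={p * n_points:.2f}")

print(f"threshold for a 0.1% false positive rate: {threshold_for_rate(0.001):.2f}")
```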

3. Isolation Forest

Definition: Isolation Forest

Anomalies are easier to isolate than normal points. The algorithm recursively partitions data with random splits; anomalies require fewer splits to isolate.

Key insight: Anomalies are few and different, so they have shorter average path lengths in random trees.

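Isolation Forest Anomaly Score

$$\text{Anomaly Score} = 2^{-\frac{E[h(x)]}{c(n)}}$$

Here,

  • $E[h(x)]$ = Average path length of point x over all trees
  • $c(n)$ = Average path length of an unsuccessful search in a binary search tree with n elements, used as a normalizer

Scores close to 1 indicate anomalies; scores well below 0.5 indicate normal points.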

💡 Path Length Intuition

In a random binary tree, normal points (which appear frequently) require more splits to isolate because they share similar feature values with many other points. Anomalies, being distinct, are isolated quickly with fewer splits, resulting in shorter path lengths.
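
To see how path length turns into a score, the sketch below evaluates the scoring formula by hand. It assumes the usual normalizer c(n) = 2H(n-1) - 2(n-1)/n with the harmonic number approximated by ln(n-1) plus Euler's constant; the subsample size of 256 and the example path lengths are hypothetical.

```python
import math

EULER_GAMMA = 0.5772156649

def c(n):
    """Average path length of an unsuccessful BST search over n points."""
    if n <= 1:
        return 0.0
    if n == 2:
        return 1.0
    harmonic = math.log(n - 1) + EULER_GAMMA  # approximation of H(n-1)
    return 2.0 * harmonic - 2.0 * (n - 1) / n

def anomaly_score(avg_path_length, n):
    """Score in (0, 1]: close to 1 = anomalous, well below 0.5 = normal."""
    return 2.0 ** (-avg_path_length / c(n))

n = 256  # hypothetical subsample size per tree
for h in (2.0, c(n), 2 * c(n)):
    print(f"E[h(x)]={h:.2f} -> score={anomaly_score(h, n):.3f}")
```

A point isolated after about two splits scores close to 1, a point with exactly the average path length scores 0.5, and a point buried twice as deep scores 0.25.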

from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import numpy as np

class IsolationForestDetector:
    def __init__(self, contamination=0.1, n_estimators=100, random_state=42):
        self.model = IsolationForest(
            contamination=contamination,
            n_estimators=n_estimators,
            random_state=random_state
        )
        self.scaler = StandardScaler()

    def fit(self, X):
        X_scaled = self.scaler.fit_transform(X)
        self.model.fit(X_scaled)
        return self

    def predict(self, X):
        X_scaled = self.scaler.transform(X)
        predictions = self.model.predict(X_scaled)
        return predictions == -1  # True = anomaly

    def score(self, X):
        X_scaled = self.scaler.transform(X)
        return -self.model.score_samples(X_scaled)  # Higher = more anomalous

# Example
np.random.seed(42)
X = np.vstack([
    np.random.randn(200, 3) * 0.5 + [0, 0, 0],    # Normal
    np.random.uniform(-3, 3, (15, 3)),              # Anomalies
])

iso_detector = IsolationForestDetector(contamination=0.07)
iso_detector.fit(X)

labels = iso_detector.predict(X)
print(f"Detected {labels.sum()} anomalies out of {len(X)} points")

scores = iso_detector.score(X)
top_anomalies = np.argsort(scores)[::-1][:5]
print("Top 5 anomalous points:", top_anomalies)

4. Local Outlier Factor (LOF)

Definition: Local Outlier Factor

Measures the local density deviation of a data point with respect to its neighbors. Points with significantly lower density than their neighbors are considered outliers.

Local Outlier Factor

$$\text{LOF}(x) = \frac{\sum_{o \in N_k(x)} \frac{\text{lrd}(o)}{\text{lrd}(x)}}{|N_k(x)|}$$

Here,

  • $N_k(x)$ = Set of k nearest neighbors of x
  • $\text{lrd}(x)$ = Local reachability density of x

ℹ️ LOF vs Global Methods

LOF detects anomalies relative to their local neighborhood, not globally. A point might be normal globally but anomalous locally (or vice versa). This makes LOF effective for datasets with varying density clusters.
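
The following sketch illustrates the local-versus-global distinction on synthetic data with one dense and one loose cluster. The cluster parameters, the probe location, and n_neighbors=10 are assumptions picked for the demonstration.

```python
import numpy as np
from sklearn.neighbors import LocalOutlierFactor

rng = np.random.default_rng(0)
tight = rng.normal(loc=0.0, scale=0.1, size=(50, 2))    # dense cluster
sparse = rng.normal(loc=5.0, scale=1.0, size=(50, 2))   # loose cluster
probe = np.array([[0.8, 0.8]])                          # just outside the dense cluster
X_demo = np.vstack([tight, sparse, probe])

lof = LocalOutlierFactor(n_neighbors=10)
lof.fit(X_demo)
lof_scores = -lof.negative_outlier_factor_  # about 1 = inlier, much larger = outlier

# Global view: distance of every point to the overall mean
dist_to_mean = np.linalg.norm(X_demo - X_demo.mean(axis=0), axis=1)

print(f"probe LOF: {lof_scores[-1]:.2f} (median LOF: {np.median(lof_scores):.2f})")
print(f"probe distance to mean: {dist_to_mean[-1]:.2f} "
      f"(median distance: {np.median(dist_to_mean):.2f})")
```

The probe sits closer to the overall mean than a typical point, so a global distance rule would not single it out, yet its LOF is far above 1 because its neighbors in the dense cluster are packed much more tightly than it is.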

from sklearn.neighbors import LocalOutlierFactor

class LOFDetector:
    def __init__(self, n_neighbors=20, contamination=0.1):
        self.model = LocalOutlierFactor(
            n_neighbors=n_neighbors,
            contamination=contamination
        )

    def fit_predict(self, X):
        predictions = self.model.fit_predict(X)
        return predictions == -1

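    def score(self, X=None):
        # Scores refer to the data passed to fit_predict; X is ignored because
        # LOF in its default mode (novelty=False) cannot score unseen points.
        return -self.model.negative_outlier_factor_  # Higher = more anomalous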

lof_detector = LOFDetector(n_neighbors=20)
lof_labels = lof_detector.fit_predict(X)
print(f"LOF detected {lof_labels.sum()} anomalies")

5. DBSCAN-Based Detection

Definition: DBSCAN Anomaly Detection

Points that are not reachable from any core point (noise points) are considered anomalies.

ℹ️ DBSCAN Parameters

DBSCAN requires two parameters: eps (maximum distance between two points to be considered neighbors) and min_samples (minimum number of points to form a dense region). Points not in any dense region are labeled as noise (-1).
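
One common heuristic for choosing eps is to look at the sorted distance from each point to its min_samples-th nearest neighbor and pick a value near the bend of that curve. The sketch below uses the 95th percentile as a crude stand-in for reading the bend off a plot; that percentile and the synthetic data are assumptions, not a recommendation from this lesson.

```python
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.neighbors import NearestNeighbors

def k_distance_curve(X, min_samples=5):
    """Sorted distance from each point to its min_samples-th nearest point (self included)."""
    # Querying the training data returns each point as its own first neighbor,
    # which matches DBSCAN counting the point itself towards min_samples.
    nn = NearestNeighbors(n_neighbors=min_samples).fit(X)
    distances, _ = nn.kneighbors(X)
    return np.sort(distances[:, -1])

rng = np.random.default_rng(0)
X_demo = np.vstack([
    rng.normal(0.0, 0.5, size=(200, 2)),    # dense "normal" blob
    rng.uniform(-6.0, 6.0, size=(10, 2)),   # scattered points
])

curve = k_distance_curve(X_demo, min_samples=5)
eps = np.percentile(curve, 95)  # assumed cut; in practice inspect the curve

labels = DBSCAN(eps=eps, min_samples=5).fit_predict(X_demo)
n_noise = int(np.sum(labels == -1))
print(f"eps={eps:.3f}, noise points={n_noise}")
```

Every point whose k-distance is at most eps is a core point by construction, so only points in the top 5% of the curve can end up labeled as noise.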

from sklearn.cluster import DBSCAN

class DBSCANDetector:
    def __init__(self, eps=0.5, min_samples=5):
        self.model = DBSCAN(eps=eps, min_samples=min_samples)

    def fit_predict(self, X):
        labels = self.model.fit_predict(X)
        return labels == -1  # Noise points = anomalies

dbscan_detector = DBSCANDetector(eps=0.8, min_samples=5)
dbscan_labels = dbscan_detector.fit_predict(X)
print(f"DBSCAN detected {dbscan_labels.sum()} anomalies")

6. Autoencoder-Based Detection

Reconstruction error as anomaly score:

Autoencoder Anomaly Score

$$\text{Anomaly Score} = \| \mathbf{x} - \text{Decoder}(\text{Encoder}(\mathbf{x})) \|^2$$

Here,

  • $\mathbf{x}$ = Input data point
  • $\text{Encoder}$ = Encoder neural network
  • $\text{Decoder}$ = Decoder neural network

Definition: Autoencoder for Anomaly Detection

An autoencoder learns to compress normal data into a latent representation and reconstruct it. Anomalies, which differ from the training distribution, will have high reconstruction error because the model cannot accurately reconstruct unfamiliar patterns.

💡 Training Strategy

Train the autoencoder only on normal data. The model learns the distribution of normal patterns. During inference, anomalies will produce higher reconstruction error, allowing threshold-based detection.
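
The train-on-normal, threshold-on-error workflow can be sketched without a neural network. The example below swaps in a linear stand-in (projection onto the top principal direction, computed with an SVD) for the encoder and decoder; the synthetic data, the latent size of 1, and the 99th-percentile threshold are all assumptions.

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical "normal" data lying close to a line in 3-D
t = rng.normal(size=(300, 1))
normal = t * np.array([1.0, 2.0, 3.0]) + rng.normal(scale=0.05, size=(300, 3))
train, val = normal[:200], normal[200:]

# Linear stand-in for an autoencoder, fitted on normal data only
mean = train.mean(axis=0)
_, _, vt = np.linalg.svd(train - mean, full_matrices=False)
components = vt[:1]  # latent_dim = 1

def reconstruction_error(X):
    """Mean squared error between X and its encode-decode reconstruction."""
    z = (X - mean) @ components.T       # "encoder"
    recon = z @ components + mean       # "decoder"
    return np.mean((X - recon) ** 2, axis=1)

# Threshold taken from held-out normal data, never from suspected anomalies
threshold = np.percentile(reconstruction_error(val), 99)

queries = np.array([[1.0, 2.0, 3.0], [10.0, -5.0, 0.5]])
errors = reconstruction_error(queries)
print("errors:", errors, "threshold:", threshold)
print("flagged:", errors > threshold)
```

Setting the threshold on held-out normal data ties it to an expected false positive rate (about 1% here) instead of to the share of anomalies in the test set.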

import torch
import torch.nn as nn

class AnomalyAutoencoder(nn.Module):
    def __init__(self, input_dim, hidden_dim=32, latent_dim=8):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Linear(hidden_dim // 2, latent_dim),
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Linear(hidden_dim // 2, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        )

    def forward(self, x):
        z = self.encoder(x)
        recon = self.decoder(z)
        return recon

    def anomaly_score(self, x):
        with torch.no_grad():
            recon = self.forward(x)
            return torch.mean((x - recon) ** 2, dim=1)

# Training
model = AnomalyAutoencoder(input_dim=3, hidden_dim=32, latent_dim=8)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

X_tensor = torch.FloatTensor(X[:200])  # Train on normal data only

for epoch in range(100):
    model.train()
    recon = model(X_tensor)
    loss = nn.MSELoss()(recon, X_tensor)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 20 == 0:
        print(f"Epoch {epoch+1} | Loss: {loss.item():.4f}")

# Detect anomalies
model.eval()
scores = model.anomaly_score(torch.FloatTensor(X))
threshold = scores.quantile(0.95)
anomaly_mask = scores > threshold
print(f"Autoencoder detected {anomaly_mask.sum().item()} anomalies")

📝 Autoencoder Reconstruction Error

For normal data point x_normal = [1.0, 2.0, 3.0]:

  • Encoder: [1.0, 2.0, 3.0] → latent [0.5, -0.3, 0.8, ...]
  • Decoder: [0.5, -0.3, 0.8, ...] → [0.98, 1.97, 3.02]
  • MSE: (0.02² + 0.03² + 0.02²) / 3 ≈ 0.0006 (low)

For anomalous point x_anomaly = [10.0, -5.0, 0.5]:

  • Encoder: [10.0, -5.0, 0.5] → latent [0.3, -0.1, 0.6, ...]
  • Decoder: [0.3, -0.1, 0.6, ...] → [1.2, 2.1, 2.8]
  • MSE: (8.8² + 7.1² + 2.3²) / 3 ≈ 44.4 (high → anomaly detected)

7. Ensemble Methods

Feature Bagging

Train multiple detectors on different feature subsets and combine predictions.

Average of Scores

Ensemble Anomaly Score

$$\text{Ensemble Score}(\mathbf{x}) = \frac{1}{M} \sum_{m=1}^{M} s_m(\mathbf{x})$$

Here,

  • $M$ = Number of detectors in ensemble
  • $s_m(\mathbf{x})$ = Anomaly score from detector m

class EnsembleDetector:
    def __init__(self):
        self.detectors = {
            'zscore': ZScoreDetector(threshold=3.0),
            'isolation': IsolationForestDetector(contamination=0.1),
            'lof': LOFDetector(n_neighbors=20),
        }

    def fit(self, X):
        for name, det in self.detectors.items():
            if name == 'lof':
                continue  # LOF (novelty=False) is fitted inside fit_predict
            det.fit(X)
        return self

    def predict(self, X, threshold=0.5):
        # Collect one binary vote (1.0 = anomaly) per detector
        predictions = []
        for name, det in self.detectors.items():
            if name == 'lof':
                # LOF re-fits on X, so X should be the data passed to fit()
                pred = det.fit_predict(X)
            else:
                pred = det.predict(X)
            predictions.append(pred.astype(float))

        # Fraction of detectors voting "anomaly"; > 0.5 means a majority
        avg_score = np.mean(predictions, axis=0)
        return avg_score > threshold

ensemble = EnsembleDetector().fit(X)
ensemble_labels = ensemble.predict(X)
print(f"Ensemble detected {ensemble_labels.sum()} anomalies")

ℹ️ Ensemble Robustness

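Ensemble methods improve robustness by combining detection strategies whose errors are only partly correlated. How the votes are combined sets the trade-off: the majority vote used above suppresses false positives raised by a single detector, while flagging a point whenever any detector fires would instead catch anomalies that only one method detects, at the cost of more false alarms.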
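
The EnsembleDetector above averages binary votes. To average continuous scores as in the ensemble score formula, the scores first need a common scale, because z-scores, LOF values, and Isolation Forest scores have different ranges. One simple option, sketched here under the assumption that higher always means more anomalous, is rank normalization; the three score lists are hypothetical.

```python
import numpy as np
from scipy.stats import rankdata

def rank_normalize(scores):
    """Map scores to (0, 1] by rank so detectors on different scales are comparable."""
    scores = np.asarray(scores, dtype=float)
    return rankdata(scores) / len(scores)

def average_scores(score_lists):
    """Mean of rank-normalized scores; higher = more anomalous."""
    return np.mean([rank_normalize(s) for s in score_lists], axis=0)

# Hypothetical outputs of three detectors for the same five points
s1 = [0.1, 0.2, 0.15, 0.9, 0.12]      # e.g. a rescaled max |z|
s2 = [1.0, 1.1, 0.9, 4.0, 1.05]       # e.g. LOF values
s3 = [0.40, 0.45, 0.42, 0.75, 0.41]   # e.g. Isolation Forest scores

combined = average_scores([s1, s2, s3])
print(np.round(combined, 3), "most anomalous index:", int(np.argmax(combined)))
```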


8. Evaluation Metrics

Supervised Evaluation

Precision and Recall

$$\text{Precision} = \frac{TP}{TP + FP} \qquad \text{Recall} = \frac{TP}{TP + FN}$$

Here,

  • $TP$ = True Positives (correctly detected anomalies)
  • $FP$ = False Positives (normal points flagged as anomalies)
  • $FN$ = False Negatives (missed anomalies)

F1 Score

$$F_1 = 2 \cdot \frac{\text{Precision} \cdot \text{Recall}}{\text{Precision} + \text{Recall}}$$

Here,

  • Precision = Fraction of flagged points that are actual anomalies
  • Recall = Fraction of actual anomalies that are flagged
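
A short worked example of these formulas, using hypothetical counts (10 true anomalies, 8 of them found, 4 false alarms):

```python
def precision_recall_f1(tp, fp, fn):
    """Compute precision, recall and F1 from confusion-matrix counts."""
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return precision, recall, f1

p, r, f1 = precision_recall_f1(tp=8, fp=4, fn=2)
print(f"precision={p:.3f} recall={r:.3f} F1={f1:.3f}")
# prints: precision=0.667 recall=0.800 F1=0.727
```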

Unsupervised Evaluation

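Even when the detector is trained without labels, score-based metrics such as AUC-ROC and AUC-PR still need ground-truth labels at evaluation time: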

from sklearn.metrics import roc_auc_score, precision_recall_fscore_support, average_precision_score

def evaluate_anomaly_detection(y_true, y_scores, y_pred):
    metrics = {
        'AUC-ROC': roc_auc_score(y_true, y_scores),
        'AUC-PR': average_precision_score(y_true, y_scores),
    }
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average='binary'
    )
    metrics['Precision'] = precision
    metrics['Recall'] = recall
    metrics['F1'] = f1
    return metrics

# Example evaluation
y_true = np.array([0]*200 + [1]*15)  # 200 normal, 15 anomalous (matches X from Section 3)
y_scores = scores.numpy()
y_pred = anomaly_mask.numpy()

metrics = evaluate_anomaly_detection(y_true, y_scores, y_pred)
for name, value in metrics.items():
    print(f"{name}: {value:.4f}")

💡 Metric Selection for Imbalanced Data

Standard accuracy is misleading for anomaly detection because anomalies are rare. Use AUC-PR (Area Under Precision-Recall Curve) when the positive class (anomalies) is rare. AUC-ROC can be optimistic with high class imbalance.
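
The effect is easy to reproduce. The sketch below assumes a hypothetical 1% anomaly rate and evaluates two useless baselines: a detector that never flags anything and a scorer that gives every point the same score.

```python
import numpy as np
from sklearn.metrics import (accuracy_score, average_precision_score,
                             recall_score, roc_auc_score)

y_true = np.array([0] * 990 + [1] * 10)  # hypothetical 1% anomaly rate

# A "detector" that never flags anything
y_pred_none = np.zeros_like(y_true)
acc = accuracy_score(y_true, y_pred_none)
rec = recall_score(y_true, y_pred_none, zero_division=0)

# An uninformative scorer: identical score for every point
constant_scores = np.zeros(len(y_true))
auc_roc = roc_auc_score(y_true, constant_scores)
auc_pr = average_precision_score(y_true, constant_scores)

print(f"accuracy={acc:.2f}, recall={rec:.2f}")
print(f"AUC-ROC baseline={auc_roc:.2f}, AUC-PR baseline={auc_pr:.2f}")
```

Accuracy reaches 0.99 while recall is 0. The uninformative scorer gets an AUC-ROC of 0.5 regardless of imbalance, whereas its AUC-PR equals the anomaly rate (0.01), so AUC-PR values should be read against that baseline.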


9. Practical Considerations

Feature Engineering for Anomalies

  • Temporal features: Time since last event, frequency
  • Statistical features: Rolling mean, std, percentiles
  • Domain-specific features: Business metrics, ratios
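
As an illustration of a rolling statistical feature, the sketch below scores each point against the mean and standard deviation of the preceding window. The request-rate series, the window of 5, and the |z| > 3 rule are hypothetical.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def rolling_zscore(series, window=5):
    """Z-score of each point against the mean/std of the preceding `window` points."""
    series = np.asarray(series, dtype=float)
    # Drop the last window so that window i ends just before point i + window
    windows = sliding_window_view(series, window)[:-1]
    mean = windows.mean(axis=1)
    std = windows.std(axis=1)
    z = np.full(series.shape, np.nan)  # the first `window` points have no history
    z[window:] = (series[window:] - mean) / (std + 1e-8)
    return z

# Hypothetical request-rate series with one spike at index 8
requests = np.array([100, 102, 98, 101, 99, 100, 103, 97, 180, 101, 99])
z = rolling_zscore(requests, window=5)
flagged = np.flatnonzero(np.abs(z) > 3)
print(np.round(z, 2))
print("flagged indices:", flagged)
```

Using only past values keeps the feature usable in streaming settings, but note that the spike inflates the window statistics for the points that follow it.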

Handling Imbalanced Data

  • Resampling: SMOTE for minority class
  • Cost-sensitive learning: Weight anomalies higher
  • Threshold tuning: Optimize F1 instead of accuracy
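
Threshold tuning for F1 can be sketched with scikit-learn's precision_recall_curve, assuming a small labeled validation set is available. The labels and scores below are hypothetical.

```python
import numpy as np
from sklearn.metrics import precision_recall_curve

def best_f1_threshold(y_true, y_scores):
    """Return the score threshold that maximizes F1 on labeled validation data."""
    precision, recall, thresholds = precision_recall_curve(y_true, y_scores)
    # The final precision/recall pair has no associated threshold, so drop it
    precision, recall = precision[:-1], recall[:-1]
    f1 = 2 * precision * recall / np.maximum(precision + recall, 1e-12)
    best = int(np.argmax(f1))
    return thresholds[best], f1[best]

# Hypothetical validation labels (1 = anomaly) and anomaly scores
y_val = np.array([0, 0, 0, 0, 0, 0, 1, 0, 1, 1])
s_val = np.array([0.1, 0.2, 0.15, 0.3, 0.25, 0.4, 0.45, 0.6, 0.7, 0.9])

thr, best_f1 = best_f1_threshold(y_val, s_val)
print(f"threshold={thr:.2f}, F1={best_f1:.3f}")  # flag a point when score >= threshold
```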

Real-Time Detection

  • Online algorithms: Process streaming data
  • Approximate methods: Random projections, sketching
  • GPU acceleration: Batch processing for deep models
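
A minimal sketch of an online detector is a streaming z-score that updates its mean and variance one value at a time with Welford's algorithm. The class name, the warm-up length, and the policy of excluding flagged points from the baseline are assumptions made for this example.

```python
import math

class StreamingZScore:
    """Online z-score detector using Welford's running mean/variance update."""

    def __init__(self, threshold=3.0, warmup=10):
        self.threshold = threshold
        self.warmup = warmup   # minimum history before any point can be flagged
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0          # running sum of squared deviations from the mean

    def update(self, x):
        """Score x against the history so far, then fold it into the statistics."""
        is_anomaly = False
        if self.n >= self.warmup:
            std = math.sqrt(self.m2 / self.n)
            is_anomaly = abs(x - self.mean) > self.threshold * (std + 1e-8)
        if not is_anomaly:     # assumed policy: keep flagged points out of the baseline
            self.n += 1
            delta = x - self.mean
            self.mean += delta / self.n
            self.m2 += delta * (x - self.mean)
        return is_anomaly

detector = StreamingZScore(threshold=3.0, warmup=10)
stream = [10.1, 9.8, 10.0, 10.2, 9.9, 10.0, 10.1, 9.7, 10.3, 10.0, 10.1, 25.0, 9.9]
flags = [detector.update(v) for v in stream]
flagged = [i for i, f in enumerate(flags) if f]
print("flagged indices:", flagged)
```

Each update costs constant time and memory, which is what makes this style of detector suitable for streams; handling drift would additionally require forgetting old data, for example with an exponentially weighted variant.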

10. Key Takeaways

📋 Summary: Anomaly Detection

  • Statistical methods (Z-score, Mahalanobis) work when data follows known distributions
  • Isolation Forest is efficient and handles high-dimensional data well
  • LOF detects local anomalies in varying density regions
  • DBSCAN finds noise points as anomalies without requiring the number of clusters, but its density parameters (eps, min_samples) must be tuned
  • Autoencoders learn normal patterns; reconstruction error flags anomalies
  • Ensemble methods combine multiple detectors for robust performance
  • Evaluate with AUC-ROC, Precision-Recall, and F1 score
  • Domain knowledge is crucial for feature engineering and threshold setting
  • The choice of method depends on data characteristics (dimensionality, density variation, labeled data availability)
  • Robust statistics (median, MAD) are preferred over mean/std when contamination is high

11. Practice Exercises

Exercise 1: Credit Card Fraud Detection

# TODO: Load credit card fraud dataset (e.g., from Kaggle)
# Compare: Isolation Forest, LOF, Autoencoder
# Handle class imbalance (fraud ~0.17%)
# Target: Recall > 0.8, Precision > 0.7

Exercise 2: Network Intrusion Detection

# TODO: Use KDD Cup 1999 or NSL-KDD dataset
# Build ensemble of Isolation Forest + DBSCAN
# Feature engineering: connection duration, packet counts
# Evaluate on binary (normal vs attack) and multi-class

Exercise 3: Time Series Anomaly Detection

# TODO: Detect anomalies in server metrics (CPU, memory, requests)
# Use: Rolling statistics + Isolation Forest
# Compare with Autoencoder approach
# Handle concept drift (patterns change over time)

Exercise 4: Compare Detection Methods

# TODO: Generate synthetic data with known anomalies
# Test: Z-score, Isolation Forest, LOF, DBSCAN, Autoencoder
# Compare: precision, recall, speed, scalability
# Find: which method works best for each anomaly type
