πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Anomaly Detection with GenAI

🟒 Free Lesson

Advertisement

Anomaly Detection with GenAI

Anomaly Detection with GenAIInput DataFeature ExtractAnomaly ScoreThresholdAnomaly AlertDetection Methodsβ€’ Autoencoders (Reconstruction Error)β€’ VAE-based Anomaly Detectionβ€’ GAN-based Detectorsβ€’ Transformer Anomaly DetectionApplicationsβ€’ Fraud Detectionβ€’ Network Intrusion Detectionβ€’ Manufacturing Defectsβ€’ Medical Diagnosis

Autoencoder-Based Detection

Autoencoders learn to compress and reconstruct normal data. Anomalies produce high reconstruction error because the model hasn't learned their patterns.

import numpy as np
import torch
import torch.nn as nn
from typing import Tuple

class AutoencoderAnomalyDetector:
    def __init__(self, input_dim: int, latent_dim: int = 32):
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, latent_dim)
        )
        
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Linear(128, input_dim)
        )
        
        self.threshold = None
    
    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded
    
    def train(self, normal_data, epochs=50, lr=0.001):
        optimizer = torch.optim.Adam(self.parameters(), lr=lr)
        criterion = nn.MSELoss()
        
        for epoch in range(epochs):
            reconstructed = self.forward(normal_data)
            loss = criterion(reconstructed, normal_data)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        with torch.no_grad():
            reconstructed = self.forward(normal_data)
            errors = torch.mean((normal_data - reconstructed) ** 2, dim=1)
            self.threshold = errors.mean() + 2 * errors.std()
    
    def detect(self, data) -> Tuple[np.ndarray, np.ndarray]:
        with torch.no_grad():
            reconstructed = self.forward(data)
            errors = torch.mean((data - reconstructed) ** 2, dim=1)
        
        anomalies = errors > self.threshold
        return anomalies.numpy(), errors.numpy()

detector = AutoencoderAnomalyDetector(input_dim=10)
detector.train(normal_training_data)
is_anomaly, scores = detector.detect(test_data)

VAE for Anomaly Detection

class VAEAnomalyDetector(nn.Module):
    def __init__(self, input_dim: int, latent_dim: int = 32):
        super().__init__()
        
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU()
        )
        
        self.mu_layer = nn.Linear(64, latent_dim)
        self.logvar_layer = nn.Linear(64, latent_dim)
        
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, input_dim)
        )
    
    def encode(self, x):
        h = self.encoder(x)
        return self.mu_layer(h), self.logvar_layer(h)
    
    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std
    
    def decode(self, z):
        return self.decoder(z)
    
    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar
    
    def anomaly_score(self, x):
        reconstructed, mu, logvar = self.forward(x)
        
        recon_error = torch.mean((x - reconstructed) ** 2, dim=1)
        kl_div = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp(), dim=1)
        
        return recon_error + 0.1 * kl_div

vae = VAEAnomalyDetector(input_dim=10)
scores = vae.anomaly_score(test_data)
is_anomaly = scores > threshold

GAN-Based Detection

class GANDiscriminator(nn.Module):
    def __init__(self, input_dim: int):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(128, 64),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )
    
    def forward(self, x):
        return self.model(x)

class GANAnomalyDetector:
    def __init__(self, input_dim: int):
        self.discriminator = GANDiscriminator(input_dim)
        self.threshold = None
    
    def train(self, normal_data, epochs=100):
        optimizer = torch.optim.Adam(self.discriminator.parameters(), lr=0.0002)
        criterion = nn.BCELoss()
        
        for epoch in range(epochs):
            real_output = self.discriminator(normal_data)
            loss = criterion(real_output, torch.ones_like(real_output))
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        with torch.no_grad():
            scores = self.discriminator(normal_data).squeeze()
            self.threshold = scores.mean() - 2 * scores.std()
    
    def detect(self, data):
        with torch.no_grad():
            scores = self.discriminator(data).squeeze()
        return scores < self.threshold, scores.numpy()

gan_detector = GANAnomalyDetector(input_dim=10)
gan_detector.train(normal_data)
is_anomaly, scores = gan_detector.detect(test_data)

Ensemble Methods

class EnsembleAnomalyDetector:
    def __init__(self):
        self.detectors = []
        self.weights = []
    
    def add_detector(self, detector, weight=1.0):
        self.detectors.append(detector)
        self.weights.append(weight)
    
    def detect(self, data):
        all_scores = []
        
        for detector, weight in zip(self.detectors, self.weights):
            _, scores = detector.detect(data)
            normalized_scores = (scores - scores.mean()) / (scores.std() + 1e-8)
            all_scores.append(normalized_scores * weight)
        
        ensemble_scores = np.mean(all_scores, axis=0)
        threshold = np.percentile(ensemble_scores, 95)
        
        return ensemble_scores > threshold, ensemble_scores

ensemble = EnsembleAnomalyDetector()
ensemble.add_detector(ae_detector, weight=1.0)
ensemble.add_detector(vae_detector, weight=0.8)
ensemble.add_detector(gan_detector, weight=0.6)
is_anomaly, scores = ensemble.detect(test_data)

Best Practices

  • Train only on normal data for unsupervised detection
  • Tune thresholds based on false positive tolerance
  • Use ensemble methods for robust detection
  • Monitor concept drift in production
  • Implement explainability for detected anomalies
  • Validate with domain experts
⭐

Premium Content

Anomaly Detection with GenAI

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
πŸ’ΌInterview Prep
πŸ“œCertificates
🀝Community Access

Already a member? Log in

Need Expert Generative AI Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement