Anomaly Detection with GenAI
Autoencoder-Based Detection
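An autoencoder trained only on normal data learns to compress and reconstruct that data. Anomalies then produce a high reconstruction error because the model has never learned their patterns, so the per-sample reconstruction error can be used directly as an anomaly score.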
import numpy as np
import torch
import torch.nn as nn
from typing import Tuple
class AutoencoderAnomalyDetector:
    """Flag samples whose autoencoder reconstruction error is unusually high.

    The detector owns an encoder/decoder pair of ``nn.Sequential`` networks.
    It is deliberately *not* an ``nn.Module`` subclass: its public ``train``
    method takes training data, which would clash with ``nn.Module.train(mode)``
    (and therefore with ``eval()``).  Optimizer access goes through
    :meth:`parameters` instead.

    Args:
        input_dim: Number of features per sample.
        latent_dim: Size of the bottleneck representation.

    Attributes:
        threshold: Reconstruction-error cut-off set by :meth:`train`;
            ``None`` until the detector has been trained.
    """

    def __init__(self, input_dim: int, latent_dim: int = 32):
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, latent_dim),
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Linear(128, input_dim),
        )
        self.threshold = None

    def parameters(self):
        """Return every trainable parameter of the encoder and the decoder."""
        return list(self.encoder.parameters()) + list(self.decoder.parameters())

    def _set_training(self, mode: bool) -> None:
        """Switch both sub-networks between training and evaluation mode."""
        self.encoder.train(mode)
        self.decoder.train(mode)

    def forward(self, x):
        """Encode ``x`` to the latent space and decode it back."""
        return self.decoder(self.encoder(x))

    def _reconstruction_errors(self, data):
        """Return the per-sample mean squared reconstruction error.

        Runs in evaluation mode so BatchNorm uses its running statistics;
        otherwise a sample's score would depend on the rest of its batch and
        single-sample batches would be rejected by BatchNorm.
        """
        self._set_training(False)
        with torch.no_grad():
            reconstructed = self.forward(data)
            return torch.mean((data - reconstructed) ** 2, dim=1)

    def train(self, normal_data, epochs=50, lr=0.001):
        """Fit the autoencoder on normal samples and calibrate the threshold.

        Args:
            normal_data: 2-D float tensor ``(n_samples, input_dim)`` holding
                only normal samples.
            epochs: Number of full-batch optimisation steps.
            lr: Adam learning rate.

        Raises:
            ValueError: If ``normal_data`` is not 2-D or has fewer than two
                rows (BatchNorm cannot train on a single sample).
        """
        if normal_data.ndim != 2 or normal_data.shape[0] < 2:
            raise ValueError(
                "normal_data must be a 2-D tensor with at least two samples"
            )

        optimizer = torch.optim.Adam(self.parameters(), lr=lr)
        criterion = nn.MSELoss()

        self._set_training(True)
        for _ in range(epochs):
            reconstructed = self.forward(normal_data)
            loss = criterion(reconstructed, normal_data)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Calibrate on the training data: mean + 2 standard deviations of the
        # reconstruction error, measured in evaluation mode so that it matches
        # how detect() scores new data.
        errors = self._reconstruction_errors(normal_data)
        self.threshold = errors.mean() + 2 * errors.std()

    def detect(self, data) -> Tuple[np.ndarray, np.ndarray]:
        """Score samples and flag those above the calibrated threshold.

        Args:
            data: 2-D float tensor ``(n_samples, input_dim)``.

        Returns:
            ``(is_anomaly, errors)``: a boolean array and the per-sample
            reconstruction errors, both of length ``n_samples``.

        Raises:
            RuntimeError: If called before :meth:`train`.
        """
        if self.threshold is None:
            raise RuntimeError("train() must be called before detect()")

        errors = self._reconstruction_errors(data)
        anomalies = errors > self.threshold
        # .cpu() keeps the conversion working when the data lives on a GPU.
        return anomalies.cpu().numpy(), errors.cpu().numpy()
# Example usage of the autoencoder detector.
# `normal_training_data` and `test_data` are not defined in this snippet;
# presumably 2-D float tensors with 10 features per row — TODO confirm.
detector = AutoencoderAnomalyDetector(input_dim=10)
# Fits on normal samples and sets `detector.threshold`.
detector.train(normal_training_data)
# `is_anomaly` holds the per-sample flags, `scores` the reconstruction errors.
is_anomaly, scores = detector.detect(test_data)
VAE for Anomaly Detection
class VAEAnomalyDetector(nn.Module):
    """Variational autoencoder that scores samples by reconstruction error + KL.

    Args:
        input_dim: Number of features per sample.
        latent_dim: Dimensionality of the latent Gaussian.

    Attributes:
        threshold: Score cut-off set by :meth:`fit`; ``None`` until then.
    """

    def __init__(self, input_dim: int, latent_dim: int = 32):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
        )
        self.mu_layer = nn.Linear(64, latent_dim)
        self.logvar_layer = nn.Linear(64, latent_dim)
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, input_dim),
        )
        self.threshold = None

    def encode(self, x):
        """Return the mean and log-variance of the approximate posterior."""
        h = self.encoder(x)
        return self.mu_layer(h), self.logvar_layer(h)

    def reparameterize(self, mu, logvar):
        """Draw ``z = mu + eps * std`` with ``eps ~ N(0, I)``."""
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        """Map a latent vector back to input space."""
        return self.decoder(z)

    def forward(self, x):
        """Return ``(reconstruction, mu, logvar)`` using a sampled latent."""
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

    def anomaly_score(self, x, sample=True):
        """Return the per-sample score: reconstruction MSE + 0.1 * KL divergence.

        Args:
            x: 2-D float tensor ``(n_samples, input_dim)``.
            sample: When true (the default), decode a latent drawn through
                :meth:`reparameterize`, so repeated calls differ.  When false,
                decode the posterior mean, which makes the score deterministic.

        Returns:
            1-D tensor of length ``n_samples``; gradients are tracked so the
            mean score can double as a training loss.
        """
        if sample:
            reconstructed, mu, logvar = self.forward(x)
        else:
            mu, logvar = self.encode(x)
            reconstructed = self.decode(mu)
        recon_error = torch.mean((x - reconstructed) ** 2, dim=1)
        kl_div = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp(), dim=1)
        return recon_error + 0.1 * kl_div

    def fit(self, normal_data, epochs=50, lr=0.001):
        """Train on normal samples and calibrate ``self.threshold``.

        The loss is the mean sampled anomaly score, i.e. the same
        reconstruction + KL objective that is later used for scoring.  The
        threshold is the mean plus two standard deviations of the
        deterministic scores on ``normal_data``.

        Named ``fit`` because ``nn.Module.train`` already means "switch to
        training mode".

        Raises:
            ValueError: If ``normal_data`` is not 2-D or has fewer than two rows.
        """
        if normal_data.ndim != 2 or normal_data.shape[0] < 2:
            raise ValueError(
                "normal_data must be a 2-D tensor with at least two samples"
            )

        optimizer = torch.optim.Adam(self.parameters(), lr=lr)
        for _ in range(epochs):
            loss = self.anomaly_score(normal_data).mean()
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        with torch.no_grad():
            scores = self.anomaly_score(normal_data, sample=False)
        self.threshold = float(scores.mean() + 2 * scores.std())

    def detect(self, data, threshold=None):
        """Score samples deterministically and flag those above the threshold.

        Provides the ``detect(data) -> (flags, scores)`` interface that
        ``EnsembleAnomalyDetector`` expects from its members.

        Args:
            data: 2-D float tensor ``(n_samples, input_dim)``.
            threshold: Optional cut-off overriding ``self.threshold``.

        Returns:
            ``(is_anomaly, scores)`` as NumPy arrays of length ``n_samples``.

        Raises:
            RuntimeError: If no threshold is given and :meth:`fit` has not run.
        """
        limit = self.threshold if threshold is None else threshold
        if limit is None:
            raise RuntimeError("call fit() or pass a threshold before detect()")

        with torch.no_grad():
            scores = self.anomaly_score(data, sample=False)
        return (scores > limit).cpu().numpy(), scores.cpu().numpy()
# Example usage of the VAE detector.
# `test_data` and `threshold` are not defined in this snippet; the cut-off is
# presumably chosen from scores on normal data — TODO confirm.
# NOTE(review): no training step appears between construction and scoring here.
vae = VAEAnomalyDetector(input_dim=10)
# Per-sample score: reconstruction error plus a weighted KL term.
scores = vae.anomaly_score(test_data)
# Higher scores are treated as anomalous.
is_anomaly = scores > threshold
GAN-Based Detection
class GANDiscriminator(nn.Module):
    """MLP that maps a sample to a single probability-like value in (0, 1).

    Two hidden stages (128 then 64 units), each a linear layer followed by
    LeakyReLU(0.2) and Dropout(0.3), feed a one-unit sigmoid head.

    Args:
        input_dim: Number of features per sample.
    """

    def __init__(self, input_dim: int):
        super().__init__()
        widths = (input_dim, 128, 64)
        stages = []
        # Hidden stages are created in input-to-output order.
        for fan_in, fan_out in zip(widths, widths[1:]):
            stages.append(nn.Linear(fan_in, fan_out))
            stages.append(nn.LeakyReLU(0.2))
            stages.append(nn.Dropout(0.3))
        # Output head: one sigmoid-activated unit.
        stages.append(nn.Linear(64, 1))
        stages.append(nn.Sigmoid())
        self.model = nn.Sequential(*stages)

    def forward(self, x):
        """Return the discriminator output for ``x``, one value per row."""
        output = self.model(x)
        return output
class GANAnomalyDetector:
    """Anomaly detector built on an adversarially trained discriminator.

    A small generator produces synthetic samples so that the discriminator
    sees negative examples.  Training the discriminator on real samples alone
    with an all-ones target gives it no reason to output anything but 1, so
    its scores would carry no information.

    Samples whose discriminator output falls below the calibrated threshold
    are flagged: a LOWER score means more anomalous.

    Args:
        input_dim: Number of features per sample.
        latent_dim: Size of the generator's noise input.

    Attributes:
        threshold: Score cut-off set by :meth:`train`; ``None`` until then.
    """

    def __init__(self, input_dim: int, latent_dim: int = 16):
        self.latent_dim = latent_dim
        self.discriminator = GANDiscriminator(input_dim)
        self.generator = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.LeakyReLU(0.2),
            nn.Linear(64, 128),
            nn.LeakyReLU(0.2),
            nn.Linear(128, input_dim),
        )
        self.threshold = None

    def _score(self, data):
        """Return discriminator outputs as a 1-D tensor, with dropout disabled."""
        self.discriminator.eval()
        with torch.no_grad():
            # squeeze(-1) keeps a length-1 vector for single-sample batches.
            return self.discriminator(data).squeeze(-1)

    def train(self, normal_data, epochs=100):
        """Train generator and discriminator, then calibrate the threshold.

        Args:
            normal_data: 2-D float tensor ``(n_samples, input_dim)`` holding
                only normal samples.
            epochs: Number of full-batch adversarial steps.

        Raises:
            ValueError: If ``normal_data`` is not 2-D or is empty.
        """
        if normal_data.ndim != 2 or normal_data.shape[0] == 0:
            raise ValueError("normal_data must be a non-empty 2-D tensor")

        d_optimizer = torch.optim.Adam(self.discriminator.parameters(), lr=0.0002)
        g_optimizer = torch.optim.Adam(self.generator.parameters(), lr=0.0002)
        criterion = nn.BCELoss()
        n_samples = normal_data.shape[0]

        self.discriminator.train()
        self.generator.train()
        for _ in range(epochs):
            noise = torch.randn(
                n_samples,
                self.latent_dim,
                device=normal_data.device,
                dtype=normal_data.dtype,
            )
            fake_data = self.generator(noise)

            # Discriminator step: real -> 1, generated -> 0.  detach() keeps
            # this loss from updating the generator.
            real_output = self.discriminator(normal_data)
            fake_output = self.discriminator(fake_data.detach())
            d_loss = criterion(real_output, torch.ones_like(real_output)) + criterion(
                fake_output, torch.zeros_like(fake_output)
            )
            d_optimizer.zero_grad()
            d_loss.backward()
            d_optimizer.step()

            # Generator step: try to make the discriminator answer 1 on fakes.
            # Gradients that leak into the discriminator here are cleared by
            # the next d_optimizer.zero_grad().
            gen_output = self.discriminator(fake_data)
            g_loss = criterion(gen_output, torch.ones_like(gen_output))
            g_optimizer.zero_grad()
            g_loss.backward()
            g_optimizer.step()

        # Calibrate in evaluation mode (dropout off) so the threshold matches
        # how detect() scores new data: mean - 2 standard deviations.
        scores = self._score(normal_data)
        # std() of a single value is NaN; treat the spread as zero instead.
        spread = scores.std() if scores.numel() > 1 else torch.zeros_like(scores.mean())
        self.threshold = scores.mean() - 2 * spread

    def detect(self, data):
        """Score samples and flag those below the calibrated threshold.

        Args:
            data: 2-D float tensor ``(n_samples, input_dim)``.

        Returns:
            ``(is_anomaly, scores)`` as NumPy arrays of length ``n_samples``;
            ``scores`` are the raw discriminator outputs.

        Raises:
            RuntimeError: If called before :meth:`train`.
        """
        if self.threshold is None:
            raise RuntimeError("train() must be called before detect()")

        scores = self._score(data)
        anomalies = scores < self.threshold
        # .cpu() keeps the conversion working when the data lives on a GPU.
        return anomalies.cpu().numpy(), scores.cpu().numpy()
# Example usage of the GAN-based detector.
# `normal_data` and `test_data` are not defined in this snippet; presumably
# 2-D float tensors with 10 features per row — TODO confirm.
gan_detector = GANAnomalyDetector(input_dim=10)
# Trains on normal samples and sets `gan_detector.threshold`.
gan_detector.train(normal_data)
# Samples scoring BELOW the threshold are flagged; `scores` are the
# discriminator outputs.
is_anomaly, scores = gan_detector.detect(test_data)
Ensemble Methods
class EnsembleAnomalyDetector:
    """Combine several detectors by averaging their weighted z-scored scores.

    Each member must expose ``detect(data)`` returning ``(flags, scores)``;
    only the scores are used.  The cut-off is a percentile of the combined
    scores of the batch being scored, so roughly ``100 - percentile`` percent
    of every batch is flagged regardless of its content.

    Args:
        percentile: Percentile of the ensemble scores used as the cut-off.

    Raises:
        ValueError: If ``percentile`` is outside ``[0, 100]``.
    """

    def __init__(self, percentile: float = 95.0):
        if not 0.0 <= percentile <= 100.0:
            raise ValueError("percentile must be between 0 and 100")
        self.percentile = percentile
        self.detectors = []
        self.weights = []
        # +1.0 / -1.0 per detector, parallel to self.detectors.
        self._signs = []

    def add_detector(self, detector, weight=1.0, higher_is_anomalous=True):
        """Register a detector.

        Args:
            detector: Object with ``detect(data) -> (flags, scores)``.
            weight: Multiplier applied to this detector's normalised scores.
            higher_is_anomalous: Pass ``False`` for detectors whose scores get
                LOWER for anomalies (for example a discriminator output), so
                that their scores are negated before being combined.
        """
        self.detectors.append(detector)
        self.weights.append(weight)
        self._signs.append(1.0 if higher_is_anomalous else -1.0)

    def detect(self, data):
        """Score ``data`` with every detector and flag the top-scoring samples.

        Returns:
            ``(is_anomaly, ensemble_scores)``: a boolean array and the combined
            scores, where higher means more anomalous.

        Raises:
            ValueError: If no detector has been added.
        """
        if not self.detectors:
            raise ValueError("add at least one detector before calling detect()")

        # Detectors appended directly to self.detectors have no recorded
        # direction; treat them as higher-is-anomalous, as before.
        missing = len(self.detectors) - len(self._signs)
        signs = self._signs + [1.0] * max(0, missing)

        all_scores = []
        for detector, weight, sign in zip(self.detectors, self.weights, signs):
            _, scores = detector.detect(data)
            oriented = sign * np.asarray(scores)
            # z-score so detectors with different score scales are comparable;
            # the epsilon avoids division by zero for constant scores.
            normalized_scores = (oriented - oriented.mean()) / (oriented.std() + 1e-8)
            all_scores.append(normalized_scores * weight)

        ensemble_scores = np.mean(all_scores, axis=0)
        threshold = np.percentile(ensemble_scores, self.percentile)
        return ensemble_scores > threshold, ensemble_scores
# Example usage of the ensemble.
# `ae_detector`, `vae_detector` and `test_data` are not defined in this
# snippet; each detector must expose `detect(data)` returning (flags, scores).
ensemble = EnsembleAnomalyDetector()
ensemble.add_detector(ae_detector, weight=1.0)
ensemble.add_detector(vae_detector, weight=0.8)
# NOTE(review): GANAnomalyDetector flags scores BELOW its threshold, while the
# autoencoder flags scores ABOVE its threshold — confirm the score direction
# is handled before the two are combined.
ensemble.add_detector(gan_detector, weight=0.6)
# `scores` are the combined per-sample ensemble scores.
is_anomaly, scores = ensemble.detect(test_data)
Best Practices
- Train only on normal data for unsupervised detection
- Tune thresholds based on false positive tolerance
- Use ensemble methods for robust detection
- Monitor concept drift in production
- Implement explainability for detected anomalies
- Validate with domain experts