🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Services · ℹ️ About · ✉️ Contact · View Pricing Plans from $10

Anomaly Detection in Streaming Data

⭐ Premium

Advertisement

Anomaly Detection in Streaming Data

Data streams arrive continuously – IoT sensors, network traffic, financial transactions. Batch processing can't keep up. Streaming algorithms process data in real-time with bounded memory, detecting anomalies as they occur.

Why Streaming Anomaly Detection Matters

A DDoS attack generates millions of requests per second. A fraudulent transaction pattern emerges in milliseconds. Processing the data in hourly batches is too slow; streaming detection catches threats as they happen.

import hashlib
import warnings
from collections import defaultdict, deque

import numpy as np
import pandas as pd
from scipy import stats
warnings.filterwarnings('ignore')

Streaming Data Fundamentals

class DataStream:
    """Iterator producing synthetic readings whose distribution shifts mid-stream.

    Readings with ``timestamp < drift_point`` are drawn from N(50, 5) and
    labelled ``'normal'``. From ``drift_point`` onwards each reading is, with
    probability 0.1, an ``'anomaly'`` drawn from N(80, 10); otherwise it is a
    ``'normal'`` reading drawn from the shifted distribution N(60, 8).

    The object is its own iterator and is exhausted after ``n_points``
    readings. Randomness comes from numpy's global generator.
    """

    def __init__(self, n_points=10000, drift_point=7000):
        self.n_points = n_points
        self.drift_point = drift_point
        self.current = 0  # timestamp of the next reading to emit

    def __iter__(self):
        return self

    def __next__(self):
        timestamp = self.current
        if timestamp >= self.n_points:
            raise StopIteration
        self.current = timestamp + 1

        if timestamp < self.drift_point:
            # Pre-drift regime
            value, label = np.random.normal(50, 5), 'normal'
        elif np.random.random() < 0.1:
            # Post-drift outlier
            value, label = np.random.normal(80, 10), 'anomaly'
        else:
            # Post-drift baseline with a shifted mean
            value, label = np.random.normal(60, 8), 'normal'

        return {'timestamp': timestamp, 'value': value, 'label': label}

# Smoke-test the stream: pull the first 1000 readings and summarise them.
stream = DataStream()
values = []
for _ in range(1000):
    reading = next(stream)
    values.append(reading['value'])
print(f"Stream values: mean={np.mean(values):.2f}, std={np.std(values):.2f}")

Exponential Moving Average (EMA) Detector

class EMADetector:
    """EMA-based anomaly detection for streaming data.

    Keeps an exponentially weighted mean (``ema``) and variance (``ema_var``)
    and flags a sample whose distance from the mean, measured in standard
    deviations of the baseline *before* that sample was absorbed, exceeds
    ``threshold``.

    Args:
        alpha: Smoothing factor; larger values forget old samples faster.
        threshold: Z-score above which a sample is reported as anomalous.
        warmup: Samples with ``count < warmup`` are never reported, but they
            still update the running statistics.
    """

    def __init__(self, alpha=0.1, threshold=3.0, warmup=100):
        self.alpha = alpha
        self.threshold = threshold
        self.warmup = warmup
        self.ema = None
        self.ema_var = None
        self.count = 0

    def update(self, value):
        """Absorb ``value`` and return True when it is anomalous.

        The first sample only seeds the statistics and always returns False.
        """
        self.count += 1

        if self.ema is None:
            self.ema = value
            self.ema_var = 0
            return False

        # Score against the baseline that existed before this sample arrived.
        # Scoring after the update lets the sample inflate its own variance,
        # which caps the z-score at sqrt((1 - alpha) / alpha) -- exactly 3.0
        # for alpha=0.1, so the default threshold could never be exceeded.
        delta = value - self.ema
        z_score = abs(delta) / (np.sqrt(self.ema_var) + 1e-10)

        # Incremental EMA / EW-variance update (anomalies are absorbed too,
        # so the baseline can follow a genuine level shift).
        self.ema += self.alpha * delta
        self.ema_var = (1 - self.alpha) * (self.ema_var + self.alpha * delta ** 2)

        if self.count < self.warmup:
            return False

        return bool(z_score > self.threshold)

# Test EMA detector: run it over one full simulated stream and report
# how many readings it flags.
ema_detector = EMADetector(alpha=0.1, threshold=3.0)
stream = DataStream()

anomalies = [point for point in stream if ema_detector.update(point['value'])]

print(f"EMA detected {len(anomalies)} anomalies")
# Use the stream's own length so the rate stays right if n_points changes.
print(f"Anomaly rate: {len(anomalies) / max(stream.n_points, 1):.2%}")

Sliding Window Statistics

class SlidingWindowDetector:
    """Detect anomalies using sliding window statistics.

    The last ``window_size`` values are kept in a bounded deque. Each new
    value is appended first and then scored against the mean and (population)
    standard deviation of the window, which therefore includes the value
    itself.

    Args:
        window_size: Maximum number of recent values retained.
        threshold: Z-score above which a value is reported as anomalous.
    """

    def __init__(self, window_size=100, threshold=3.0):
        self.window_size = window_size
        self.threshold = threshold
        self.window = deque(maxlen=window_size)

    def update(self, value):
        """Append ``value`` and return True when it is anomalous.

        Nothing is reported until the window holds at least
        ``window_size // 2`` values.
        """
        self.window.append(value)

        # An empty window (possible only with window_size=0) has no
        # statistics; bail out instead of computing NaNs.
        if not self.window or len(self.window) < self.window_size // 2:
            return False

        arr = np.array(self.window)
        mean = arr.mean()
        std = arr.std() + 1e-10  # epsilon avoids division by zero on flat data

        return bool(abs(value - mean) / std > self.threshold)

    def get_statistics(self):
        """Return mean, std, min and max of the current window.

        Raises:
            ValueError: If no value has been observed yet.
        """
        if not self.window:
            raise ValueError("window is empty; call update() before get_statistics()")

        arr = np.array(self.window)
        return {
            'mean': np.mean(arr),
            'std': np.std(arr),
            'min': np.min(arr),
            'max': np.max(arr)
        }

# Test: count how many of 10,000 simulated readings the window detector flags.
window_detector = SlidingWindowDetector(window_size=100, threshold=3.0)
stream = DataStream()
anomalies = 0
for _ in range(10000):
    sample = next(stream)
    if window_detector.update(sample['value']):
        anomalies += 1
print(f"Window detector: {anomalies} anomalies")

Count-Min Sketch

Probabilistic data structure for frequency estimation in streams.

class CountMinSketch:
    """Count-Min Sketch for frequency estimation.

    Items are identified by ``str(item)``. Each of the ``depth`` rows uses its
    own salted BLAKE2b hash, so a collision in one row does not imply a
    collision in the others, and results are reproducible across processes
    (the builtin ``hash`` is salted per interpreter run).

    Args:
        width: Number of counters per row.
        depth: Number of rows (hash functions).
        max_candidates: Upper bound on the number of item keys remembered for
            ``get_heavy_hitters``. Use 0 to disable tracking.

    Raises:
        ValueError: If ``width`` or ``depth`` is smaller than 1.
    """

    def __init__(self, width=1000, depth=5, max_candidates=1000):
        if width < 1 or depth < 1:
            raise ValueError("width and depth must both be at least 1")
        self.width = width
        self.depth = depth
        self.table = np.zeros((depth, width), dtype=int)
        # Per-row seeds; each is turned into the salt of that row's hash.
        self.hashes = list(range(depth))
        self._salts = [seed.to_bytes(8, "little") for seed in self.hashes]
        self.max_candidates = max_candidates
        # str(item) -> estimate recorded when the item was last added
        self._candidates = {}

    def _columns(self, item):
        """Return the counter index of ``item`` in every row."""
        key = str(item).encode("utf-8", "surrogatepass")
        return [
            int.from_bytes(
                hashlib.blake2b(key, digest_size=8, salt=salt).digest(), "little"
            ) % self.width
            for salt in self._salts
        ]

    def _remember(self, key, estimate):
        """Record ``key`` as a heavy-hitter candidate, pruning when oversized."""
        self._candidates[key] = estimate
        # Prune lazily (at 2x the cap) so the sort cost is amortised.
        if len(self._candidates) > 2 * self.max_candidates:
            ranked = sorted(
                self._candidates.items(), key=lambda kv: kv[1], reverse=True
            )
            self._candidates = dict(ranked[: self.max_candidates])

    def add(self, item, count=1):
        """Increase the counters of ``item`` by ``count``."""
        cols = self._columns(item)
        for row, col in enumerate(cols):
            self.table[row, col] += count
        if self.max_candidates:
            current = min(int(self.table[row, col]) for row, col in enumerate(cols))
            self._remember(str(item), current)

    def estimate(self, item):
        """Return the estimated count of ``item``.

        The minimum over the rows is the least collision-inflated counter;
        with non-negative increments it never undercounts.
        """
        return min(
            int(self.table[row, col]) for row, col in enumerate(self._columns(item))
        )

    def get_heavy_hitters(self, threshold=100):
        """Find tracked items whose estimated count is at least ``threshold``.

        A sketch cannot enumerate the items it has seen, so only the bounded
        candidate set collected by ``add`` is considered.

        Returns:
            List of ``(str(item), estimate)`` pairs, largest estimate first.
        """
        heavy = []
        for key in self._candidates:
            est = self.estimate(key)
            if est >= threshold:
                heavy.append((key, est))
        heavy.sort(key=lambda pair: -pair[1])
        return heavy

# Test: feed a skewed categorical stream into the sketch and print estimates.
cms = CountMinSketch(width=10000, depth=5)

# Simulate stream
labels = ['item_A', 'item_B', 'item_C', 'item_D']
weights = [0.5, 0.3, 0.15, 0.05]
stream_items = np.random.choice(labels, 10000, p=weights)

for observed in stream_items:
    cms.add(observed)

print("Frequency estimates:")
# 'item_E' never occurs in the stream, so its estimate shows collision noise.
for name in labels + ['item_E']:
    print(f"  {name}: ~{cms.estimate(name)}")

Count-Min Sketch for Anomaly Detection

class CMSAnomalyDetector:
    """Flag items that occur far more often than a uniform spread would predict.

    Item counts come from a Count-Min Sketch (width 5000, depth 5) that covers
    everything seen so far. The number of distinct items is taken from a deque
    holding the last ``window_size`` items.
    """

    def __init__(self, window_size=1000, threshold=3.0):
        self.window_size = window_size
        self.threshold = threshold
        self.cms = CountMinSketch(width=5000, depth=5)
        self.total = 0
        self.window_items = deque(maxlen=window_size)

    def update(self, item):
        """Record ``item`` and return ``(is_anomaly, freq)``.

        ``freq`` is the sketch estimate taken before this occurrence is added.
        Nothing is flagged until ``window_size // 2`` items have been seen.
        """
        # Look up the count first so it excludes the current occurrence.
        seen_before = self.cms.estimate(item)

        self.cms.add(item)
        self.total += 1
        self.window_items.append(item)

        warmed_up = self.total >= self.window_size // 2
        if not warmed_up:
            return False, seen_before

        # Baseline: every distinct item in the window is equally frequent.
        distinct = max(len(set(self.window_items)), 1)
        expected_freq = self.total / distinct

        if not expected_freq > 0:
            return False, seen_before

        # Deviation from the baseline, scaled by sqrt(expected count).
        z = (seen_before - expected_freq) / (np.sqrt(expected_freq) + 1e-10)
        return z > self.threshold, seen_before

# Test: 900 'normal' events with 100 'burst_item' events shuffled in.
cms_detector = CMSAnomalyDetector()
items = ['normal'] * 900 + ['burst_item'] * 100
np.random.shuffle(items)

flags = [cms_detector.update(item)[0] for item in items]
anomalies = sum(1 for flag in flags if flag)

print(f"CMS anomaly detector: {anomalies} anomalies detected")

Sketching for Distinct Counting

class HyperLogLog:
    """HyperLogLog for approximate distinct counting.

    Items are identified by ``str(item)`` and hashed to 64 bits with BLAKE2b,
    which gives an unsigned value that is stable across processes (the
    builtin ``hash`` is signed and salted per interpreter run). The low
    ``precision`` bits select one of ``m = 2 ** precision`` registers; each
    register keeps the largest rank seen in the remaining high bits.

    Args:
        precision: Number of index bits.

    Raises:
        ValueError: If ``precision`` is outside ``[0, 63]``.
    """

    _HASH_BITS = 64

    def __init__(self, precision=10):
        if not 0 <= precision < self._HASH_BITS:
            raise ValueError("precision must be between 0 and 63")
        self.precision = precision
        self.m = 1 << precision
        self.registers = np.zeros(self.m, dtype=int)

    def add(self, item):
        """Fold ``item`` into the sketch."""
        digest = hashlib.blake2b(
            str(item).encode("utf-8", "surrogatepass"), digest_size=8
        ).digest()
        h = int.from_bytes(digest, "little")

        idx = h & (self.m - 1)
        remaining = h >> self.precision

        # Rank = 1-based position of the first set bit in the remaining
        # (64 - precision)-bit word. An all-zero word gets the maximum rank,
        # not 1.
        rank = (self._HASH_BITS - self.precision) - remaining.bit_length() + 1
        if rank > self.registers[idx]:
            self.registers[idx] = rank

    def count(self):
        """Estimate number of distinct elements."""
        m = self.m
        # Published constants for small m; closed-form approximation otherwise.
        alpha = {16: 0.673, 32: 0.697, 64: 0.709}.get(m, 0.7213 / (1 + 1.079 / m))

        # Work in floats: numpy rejects integer ** negative integer.
        harmonic = np.sum(np.exp2(-self.registers.astype(np.float64)))
        raw = alpha * m ** 2 / harmonic

        # Small range correction: fall back to linear counting while empty
        # registers remain.
        if raw <= 5 * m / 2:
            zeros = int(np.count_nonzero(self.registers == 0))
            if zeros > 0:
                return float(m * np.log(m / zeros))

        return float(raw)

# Test: insert every key twice; duplicates must not inflate the estimate.
n_distinct = 5000
hll = HyperLogLog(precision=10)
for i in range(2 * n_distinct):
    hll.add(f"item_{i % n_distinct}")

print(f"HyperLogLog estimated distinct: {hll.count():.0f}")
# Report the true cardinality from the same constant that drives the loop.
print(f"Actual distinct: {n_distinct}")

Reservoir Sampling

class ReservoirSampling:
    """Keep a uniform random sample of at most ``k`` items from a stream.

    The first ``k`` items fill the reservoir. After that, the n-th item draws
    a slot uniformly from ``[0, n)`` and overwrites that slot only when it
    falls inside the reservoir.
    """

    def __init__(self, k=100):
        self.k = k
        self.reservoir = []
        self.count = 0  # number of items offered so far

    def update(self, item):
        """Offer ``item`` to the reservoir."""
        self.count += 1

        still_filling = len(self.reservoir) < self.k
        if still_filling:
            self.reservoir.append(item)
            return

        slot = np.random.randint(0, self.count)
        if slot < self.k:
            self.reservoir[slot] = item

    def get_sample(self):
        """Return the reservoir list itself (not a copy)."""
        return self.reservoir

# Test: sample 100 values out of the integers 0..9999.
reservoir = ReservoirSampling(k=100)
for number in range(10000):
    reservoir.update(number)

sample = reservoir.get_sample()
smallest, largest = min(sample), max(sample)
print(f"Reservoir sample: {len(sample)} items")
print(f"Sample range: {smallest} to {largest}")

Z-Score Streaming Detector

class StreamingZScore:
    """Welford's online algorithm for streaming z-score detection.

    Mean and variance cover only the most recent ``window`` values: when the
    window is full, the oldest value is removed from the running statistics
    (the Welford update in reverse) before the new one is added. With
    ``window=None`` nothing is ever evicted and the statistics are all-time.

    Attributes:
        n: Total number of values ever passed to ``update``.
        mean: Mean of the values currently in the window.
        M2: Sum of squared deviations from ``mean`` over the window.
        values: The windowed values themselves, oldest first.

    Args:
        threshold: Z-score above which a value is reported as anomalous.
        window: Number of recent values the statistics cover.

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """

    def __init__(self, threshold=3.0, window=1000):
        if window is not None and window < 1:
            raise ValueError("window must be at least 1")
        self.threshold = threshold
        self.window = window
        self.n = 0
        self.mean = 0
        self.M2 = 0
        self.values = deque(maxlen=window)

    def _forget(self, old):
        """Remove ``old`` (currently in the window) from mean and M2."""
        k = len(self.values)
        if k <= 1:
            self.mean = 0
            self.M2 = 0
            return
        mean_without = (k * self.mean - old) / (k - 1)
        self.M2 -= (old - self.mean) * (old - mean_without)
        self.mean = mean_without

    def update(self, value):
        """Absorb ``value`` and return True when it is anomalous.

        The value is scored against the window statistics after it has been
        included. Returns False while fewer than two values are available or
        while the window has zero variance.
        """
        self.n += 1

        # Drop the oldest value from the statistics before the deque evicts it.
        maxlen = self.values.maxlen
        if maxlen is not None and len(self.values) == maxlen:
            self._forget(self.values[0])
        self.values.append(value)
        k = len(self.values)

        # Welford's online algorithm
        delta = value - self.mean
        self.mean += delta / k
        self.M2 += delta * (value - self.mean)

        if k < 2:
            return False

        # Repeated add/remove can leave M2 a hair below zero; clamp it.
        variance = max(self.M2, 0.0) / (k - 1)
        std = np.sqrt(variance)

        if std == 0:
            return False

        return bool(abs(value - self.mean) / std > self.threshold)

# Test: count how many of 10,000 simulated readings the z-score detector flags.
z_detector = StreamingZScore(threshold=3.0)
stream = DataStream()

flagged = [z_detector.update(next(stream)['value']) for _ in range(10000)]
anomalies = sum(1 for hit in flagged if hit)

print(f"Z-score detector: {anomalies} anomalies")

Best Practices

  1. Choose window size carefully – too small misses patterns, too large is slow to adapt
  2. Use EMA for concept drift – adapts to changing distributions
  3. Count-Min Sketch for frequencies – O(1) updates, bounded memory
  4. HyperLogLog for distinct counts – roughly 1.04/√m relative error (about 3% with the 1,024 registers used above) with a tiny memory footprint
  5. Combine detectors – ensemble for robustness
  6. Set thresholds on validation data – don't guess thresholds

Summary

Streaming anomaly detection requires algorithms with bounded memory and O(1) updates. EMA and sliding windows detect point anomalies, Count-Min Sketch tracks frequencies, and HyperLogLog counts distinct elements. Master these techniques for real-time monitoring and threat detection.

Advertisement