
Real-Time Streaming Dashboards (Kafka + Flink + Grafana)



Real-Time Streaming Dashboards

Kafka + Flink + Prometheus + Grafana for Live Monitoring

ℹ️

Project Difficulty: Advanced | Duration: 2-3 weeks | Cloud: AWS/GCP

Build real-time operational dashboards that update every second, enabling instant visibility into system health, business metrics, and user behavior.

Project Overview

Problem Statement

Traditional dashboards rely on batch data, creating visibility gaps that delay incident response and business decisions. Operations teams need real-time visibility into system performance, while business teams need instant access to live metrics.

Objectives

  1. Achieve sub-second dashboard refresh rates
  2. Process 1M+ metrics per second
  3. Support complex windowed aggregations
  4. Enable drill-down from summary to detail
  5. Maintain 99.99% dashboard availability
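
To make the availability objective concrete, the short calculation below converts 99.99% into an allowed-downtime budget. It is a minimal sketch: the function name `downtime_budget_minutes` and the 30-day month are assumptions for illustration, not part of the project code.

```python
def downtime_budget_minutes(availability: float, period_days: float) -> float:
    """Minutes of downtime allowed over a period at the given availability."""
    return (1.0 - availability) * period_days * 24 * 60


# 99.99% availability over a 30-day month and a 365-day year.
monthly = downtime_budget_minutes(0.9999, 30)
yearly = downtime_budget_minutes(0.9999, 365)
print(f"{monthly:.2f} min/month, {yearly:.2f} min/year")
```

At this target the dashboards can be unavailable for only a few minutes per month, which is why the later sections lean on multi-AZ deployment and replicated Prometheus.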

Tech Stack

Component | Technology | Purpose
Event Streaming | Apache Kafka | Metric ingestion
Stream Processing | Apache Flink | Real-time aggregation
Time-Series DB | Prometheus + Thanos | Metric storage
Visualization | Grafana | Dashboards and alerts
Alerting | AlertManager | Incident management

Architecture Diagram

METRIC SOURCES: Applications (StatsD), Infrastructure (Node), Business Events (Custom), External APIs (Webhooks)
APACHE KAFKA: metrics-raw topic, metrics-agg topic, alerts topic
APACHE FLINK: Real-time Aggregation (1s), Windowed Aggregation (5m/1h), Anomaly Detection (ML)
STORAGE LAYERS: Prometheus (Real-time), Thanos (Long-term), Grafana (Query)
VISUALIZATION: Ops Dashboard (Grafana), Business Dashboard, Executive Dashboard, Alert Manager

Data Source Setup and Schema

Metric Schema Definitions

# schemas/metrics.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List
from enum import Enum

class MetricType(Enum):
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"
    SUMMARY = "summary"

@dataclass
class MetricDefinition:
    name: str
    metric_type: MetricType
    description: str
    labels: List[str] = field(default_factory=list)
    retention_days: int = 30

@dataclass
class MetricPoint:
    metric_name: str
    value: float
    timestamp: datetime
    labels: Dict[str, str] = field(default_factory=dict)

    def to_prometheus_text(self) -> str:
        label_str = ",".join(f'{k}="{v}"' for k, v in self.labels.items())
        if label_str:
            label_str = "{" + label_str + "}"
        return f"{self.metric_name}{label_str} {self.value} {int(self.timestamp.timestamp() * 1000)}"

METRIC_DEFINITIONS = {
    "http_requests_total": MetricDefinition(
        name="http_requests_total",
        metric_type=MetricType.COUNTER,
        description="Total HTTP requests",
        labels=["method", "path", "status", "instance"]
    ),
    "http_request_duration_seconds": MetricDefinition(
        name="http_request_duration_seconds",
        metric_type=MetricType.HISTOGRAM,
        description="HTTP request duration",
        labels=["method", "path", "instance"]
    ),
    "cpu_usage_percent": MetricDefinition(
        name="cpu_usage_percent",
        metric_type=MetricType.GAUGE,
        description="CPU usage percentage",
        labels=["instance", "job"]
    ),
    "memory_usage_bytes": MetricDefinition(
        name="memory_usage_bytes",
        metric_type=MetricType.GAUGE,
        description="Memory usage in bytes",
        labels=["instance", "job"]
    ),
    "orders_per_minute": MetricDefinition(
        name="orders_per_minute",
        metric_type=MetricType.COUNTER,
        description="Orders processed per minute",
        labels=["region", "product_line"]
    ),
    "revenue_per_hour": MetricDefinition(
        name="revenue_per_hour",
        metric_type=MetricType.COUNTER,
        description="Revenue generated per hour",
        labels=["region", "payment_method"]
    ),
    "user_sessions_active": MetricDefinition(
        name="user_sessions_active",
        metric_type=MetricType.GAUGE,
        description="Active user sessions",
        labels=["platform", "country"]
    )
}
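
The test suite later in this project imports a `DashboardMetric` class from `schemas/metrics.py`, but the listing above does not define one. The sketch below is one possible shape for it, inferred only from how the test calls it (`name`, `query`, `unit`, `thresholds`, and `to_grafana_panel()`); the `panel_type` field, the default unit, and the color mapping are assumptions.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class DashboardMetric:
    """Hypothetical helper that turns a PromQL query into a Grafana panel dict."""
    name: str
    query: str
    unit: str = "short"
    thresholds: Dict[str, float] = field(default_factory=dict)
    panel_type: str = "timeseries"  # assumed default, matching the test's expectation

    def to_grafana_panel(self) -> Dict[str, Any]:
        # The base step has no lower bound, like the "value": null steps in the dashboard JSON.
        steps = [{"color": "green", "value": None}]
        if "warning" in self.thresholds:
            steps.append({"color": "yellow", "value": self.thresholds["warning"]})
        if "critical" in self.thresholds:
            steps.append({"color": "red", "value": self.thresholds["critical"]})
        return {
            "title": self.name,
            "type": self.panel_type,
            "targets": [{"expr": self.query, "refId": "A"}],
            "fieldConfig": {
                "defaults": {"unit": self.unit, "thresholds": {"steps": steps}}
            },
        }


panel = DashboardMetric(
    name="Request Rate",
    query="sum(rate(http_requests_total[1m]))",
    unit="reqps",
    thresholds={"warning": 1000, "critical": 5000},
).to_grafana_panel()
print(panel["title"], panel["type"], len(panel["targets"]))
```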

Step-by-Step Implementation Guide

Step 1: Kafka Metric Producer

# producers/metric_producer.py
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext
from typing import Dict
import time, random, logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class MetricProducer:
    def __init__(self, config: Dict):
        self.producer = Producer({
            'bootstrap.servers': config['kafka_brokers'],
            'client.id': 'metric-producer',
            'acks': 'all',
            'retries': 5,
            'compression.type': 'snappy'
        })
        self.schema_registry = SchemaRegistryClient({'url': config['schema_registry_url']})
        self.metric_schema = """
        {
            "type": "record", "name": "Metric",
            "namespace": "com.company.metrics",
            "fields": [
                {"name": "metric_name", "type": "string"},
                {"name": "value", "type": "double"},
                {"name": "timestamp", "type": "long"},
                {"name": "labels", "type": {"type": "map", "values": "string"}},
                {"name": "source", "type": "string"}
            ]
        }
        """
        self.serializer = AvroSerializer(self.schema_registry, self.metric_schema,
            conf={'auto.register.schemas': True})

    def produce_metric(self, metric_name: str, value: float,
                      labels: Dict[str, str], source: str = "app"):
        metric = {
            "metric_name": metric_name, "value": value,
            "timestamp": int(time.time() * 1000),
            "labels": labels, "source": source
        }
        # Keying by metric and instance keeps each series on a single partition.
        key = f"{metric_name}:{labels.get('instance', 'default')}"
        try:
            # AvroSerializer expects a SerializationContext (topic + field), not a dict;
            # the default naming strategy derives the subject "metrics-raw-value" from it.
            ctx = SerializationContext("metrics-raw", MessageField.VALUE)
            self.producer.produce(topic="metrics-raw", key=key.encode('utf-8'),
                value=self.serializer(metric, ctx),
                callback=lambda err, msg: logger.error(f"Delivery failed: {err}") if err else None)
            self.producer.poll(0)
        except Exception as e:
            logger.error(f"Failed to produce metric: {e}")

    def produce_http_metrics(self, endpoint: str, method: str,
                            status_code: int, duration_ms: float, instance: str):
        self.produce_metric("http_requests_total", 1,
            {"method": method, "path": endpoint, "status": str(status_code), "instance": instance})
        self.produce_metric("http_request_duration_seconds", duration_ms / 1000,
            {"method": method, "path": endpoint, "instance": instance})

    def produce_system_metrics(self, instance: str, cpu_percent: float,
                              memory_bytes: int, disk_usage_percent: float):
        self.produce_metric("cpu_usage_percent", cpu_percent, {"instance": instance, "job": "system"})
        self.produce_metric("memory_usage_bytes", memory_bytes, {"instance": instance, "job": "system"})
        self.produce_metric("disk_usage_percent", disk_usage_percent, {"instance": instance, "job": "system"})

    def produce_business_metrics(self, region: str, orders: int,
                                revenue: float, active_sessions: int):
        self.produce_metric("orders_per_minute", orders, {"region": region, "product_line": "all"})
        self.produce_metric("revenue_per_hour", revenue, {"region": region, "payment_method": "all"})
        self.produce_metric("user_sessions_active", active_sessions, {"platform": "web", "country": "US"})

    def generate_synthetic_metrics(self, duration_seconds: int = 60,
                                  metrics_per_second: int = 1000):
        instances = [f"instance-{i}" for i in range(10)]
        regions = ["us-east-1", "us-west-2", "eu-west-1"]
        end_time = time.time() + duration_seconds
        while time.time() < end_time:
            for _ in range(metrics_per_second):
                instance = random.choice(instances)
                self.produce_http_metrics(
                    endpoint=random.choice(["/api/users", "/api/orders", "/api/products"]),
                    method=random.choice(["GET", "POST", "PUT"]),
                    status_code=random.choice([200, 200, 200, 200, 400, 404, 500]),
                    duration_ms=random.expovariate(1/100), instance=instance)
                self.produce_system_metrics(instance=instance,
                    cpu_percent=random.gauss(50, 20),
                    memory_bytes=random.randint(1_000_000_000, 4_000_000_000),
                    disk_usage_percent=random.gauss(60, 15))
                if random.random() < 0.1:
                    region = random.choice(regions)
                    self.produce_business_metrics(region=region, orders=random.randint(1, 10),
                        revenue=random.uniform(100, 10000), active_sessions=random.randint(100, 1000))
            time.sleep(1)

    def close(self):
        self.producer.flush()
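
Note that `produce_system_metrics` emits `disk_usage_percent`, which has no entry in `METRIC_DEFINITIONS`. A small pre-flight check can catch that kind of drift before a metric reaches Kafka. The sketch below is an illustration only: `EXPECTED_LABELS` and `validate_metric` are hypothetical names, and the label lists are a subset copied from the schema definitions above.

```python
from typing import Dict, List

# Expected label names per metric (subset of METRIC_DEFINITIONS, copied for self-containment).
EXPECTED_LABELS: Dict[str, List[str]] = {
    "http_requests_total": ["method", "path", "status", "instance"],
    "cpu_usage_percent": ["instance", "job"],
    "memory_usage_bytes": ["instance", "job"],
}


def validate_metric(metric_name: str, labels: Dict[str, str]) -> List[str]:
    """Return a list of problems; an empty list means the metric matches its definition."""
    if metric_name not in EXPECTED_LABELS:
        return [f"unknown metric: {metric_name}"]
    expected = set(EXPECTED_LABELS[metric_name])
    actual = set(labels)
    problems = []
    missing = sorted(expected - actual)
    extra = sorted(actual - expected)
    if missing:
        problems.append(f"missing labels: {missing}")
    if extra:
        problems.append(f"unexpected labels: {extra}")
    return problems


print(validate_metric("cpu_usage_percent", {"instance": "instance-1", "job": "system"}))
print(validate_metric("disk_usage_percent", {"instance": "instance-1", "job": "system"}))
```

Calling such a check at the top of `produce_metric` would be one way to keep producers and schema definitions in sync; whether to drop or merely log invalid metrics is a design choice left open here.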

Step 2: Flink Stream Processing

# flink/stream_processor.py
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.table.expressions import col
from pyflink.table.window import Tumble

class StreamingMetricsProcessor:
    def __init__(self):
        self.env = StreamExecutionEnvironment.get_execution_environment()
        self.env.set_parallelism(4)
        self.t_env = StreamTableEnvironment.create(self.env)

    def setup_kafka_source(self):
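        # `value`, `timestamp` and `source` are reserved words in Flink SQL and must be
        # backtick-quoted. The producer writes Confluent Avro, so the source reads
        # 'avro-confluent' rather than 'json'; the registry URL below is a placeholder.
        self.t_env.execute_sql("""
            CREATE TABLE metrics_raw (
                metric_name STRING, `value` DOUBLE, `timestamp` BIGINT,
                labels MAP<STRING, STRING>, `source` STRING,
                event_time AS TO_TIMESTAMP_LTZ(`timestamp`, 3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
            ) WITH (
                'connector' = 'kafka', 'topic' = 'metrics-raw',
                'properties.bootstrap.servers' = 'kafka:9092',
                'properties.group.id' = 'flink-metrics-processor',
                'scan.startup.mode' = 'latest-offset',
                'format' = 'avro-confluent',
                'avro-confluent.url' = 'http://schema-registry:8081'
            )
        """)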

    def setup_prometheus_sink(self):
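        # NOTE: the standard Flink distribution does not ship a 'prometheus' SQL connector
        # with these options; this DDL assumes a custom or third-party Pushgateway
        # connector is available on the classpath.
        self.t_env.execute_sql("""
            CREATE TABLE prometheus_sink (
                metric_name STRING, labels STRING, `value` DOUBLE, `timestamp` BIGINT
            ) WITH (
                'connector' = 'prometheus',
                'pushgateway.url' = 'prometheus-pushgateway:9091',
                'delete.after.write' = 'true'
            )
        """)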

    def create_real_time_aggregation(self):
        self.t_env.execute_sql("""
            CREATE TABLE real_time_metrics AS
            SELECT
                metric_name,
                labels,
                TUMBLE_START(event_time, INTERVAL '1' SECOND) AS window_start,
                TUMBLE_END(event_time, INTERVAL '1' SECOND) AS window_end,
                SUM(`value`) AS total_value,
                COUNT(*) AS event_count,
                AVG(`value`) AS avg_value,
                MAX(`value`) AS max_value,
                MIN(`value`) AS min_value
            FROM metrics_raw
            GROUP BY
                TUMBLE(event_time, INTERVAL '1' SECOND),
                metric_name,
                labels
        """)

    def create_windowed_aggregation(self):
        self.t_env.execute_sql("""
            CREATE TABLE windowed_metrics AS
            SELECT
                metric_name,
                TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
                TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end,
                SUM(`value`) AS total_value,
                COUNT(*) AS sample_count,
                AVG(`value`) AS avg_value,
                STDDEV_POP(`value`) AS stddev_value
            FROM metrics_raw
            GROUP BY
                TUMBLE(event_time, INTERVAL '5' MINUTE),
                metric_name
        """)

    def create_anomaly_detection(self):
        self.t_env.execute_sql("""
            CREATE TABLE anomaly_alerts AS
            SELECT
                metric_name,
                labels,
                event_time,
                `value`,
                AVG(`value`) OVER (
                    PARTITION BY metric_name
                    ORDER BY event_time
                    ROWS BETWEEN 100 PRECEDING AND CURRENT ROW
                ) AS moving_avg,
                STDDEV_POP(`value`) OVER (
                    PARTITION BY metric_name
                    ORDER BY event_time
                    ROWS BETWEEN 100 PRECEDING AND CURRENT ROW
                ) AS moving_stddev,
                CASE
                    WHEN ABS(`value` - AVG(`value`) OVER (
                        PARTITION BY metric_name
                        ORDER BY event_time
                        ROWS BETWEEN 100 PRECEDING AND CURRENT ROW
                    )) > 3 * STDDEV_POP(`value`) OVER (
                        PARTITION BY metric_name
                        ORDER BY event_time
                        ROWS BETWEEN 100 PRECEDING AND CURRENT ROW
                    ) THEN true
                    ELSE false
                END AS is_anomaly
            FROM metrics_raw
        """)

    def run(self):
        self.setup_kafka_source()
        self.setup_prometheus_sink()
        self.create_real_time_aggregation()
        self.create_windowed_aggregation()
        self.create_anomaly_detection()
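        # Each execute_sql() call above that contains a CREATE TABLE ... AS SELECT submits
        # its own job. TableEnvironment.execute() was deprecated in Flink 1.11 and is
        # absent from recent releases, so no separate execute call is made here.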

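The anomaly query above flags a value when it deviates more than three standard deviations from a moving average over the current row and the 100 preceding rows. The plain-Python sketch below (not PyFlink) mirrors that logic for a single metric so the behavior can be checked locally; `rolling_anomalies` and its parameters are hypothetical names.

```python
import math
from collections import deque
from typing import Deque, Iterable, List, Tuple


def rolling_anomalies(values: Iterable[float], window_rows: int = 101,
                      sigmas: float = 3.0) -> List[Tuple[float, bool]]:
    """Flag values further than `sigmas` population stddevs from the rolling mean.

    The window holds the current row plus up to `window_rows - 1` preceding rows,
    which corresponds to ROWS BETWEEN 100 PRECEDING AND CURRENT ROW for 101 rows.
    """
    window: Deque[float] = deque(maxlen=window_rows)
    results = []
    for v in values:
        window.append(v)
        mean = sum(window) / len(window)
        stddev = math.sqrt(sum((x - mean) ** 2 for x in window) / len(window))
        results.append((v, abs(v - mean) > sigmas * stddev))
    return results


# Twenty steady readings followed by a spike: only the spike is flagged.
flags = [is_anomaly for _, is_anomaly in rolling_anomalies([10.0] * 20 + [100.0])]
print(flags.count(True), flags[-1])
```

Because the current row is part of its own window, a spike inflates the standard deviation it is compared against. With identical baseline values, a single outlier can only exceed three sigmas once the window holds more than ten rows, so the first few events of a series are effectively never flagged.
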
Step 3: Grafana Dashboard Configuration

{
  "dashboard": {
    "title": "Real-Time Operations Dashboard",
    "tags": ["streaming", "real-time", "operations"],
    "timezone": "browser",
    "refresh": "1s",
    "time": {"from": "now-1h", "to": "now"},
    "panels": [
      {
        "title": "Request Rate (per second)",
        "type": "timeseries",
        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 0},
        "targets": [
          {
            "expr": "sum(rate(http_requests_total[1m])) by (instance)",
            "legendFormat": "{{instance}}",
            "refId": "A"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "reqps",
            "thresholds": {
              "steps": [
                {"color": "green", "value": null},
                {"color": "yellow", "value": 1000},
                {"color": "red", "value": 5000}
              ]
            }
          }
        }
      },
      {
        "title": "Response Latency (P99)",
        "type": "timeseries",
        "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0},
        "targets": [
          {
            "expr": "histogram_quantile(0.99, sum(rate(http_request_duration_seconds_bucket[5m])) by (le, instance))",
            "legendFormat": "P99 {{instance}}",
            "refId": "A"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "s",
            "thresholds": {
              "steps": [
                {"color": "green", "value": null},
                {"color": "yellow", "value": 0.5},
                {"color": "red", "value": 1.0}
              ]
            }
          }
        }
      },
      {
        "title": "CPU Usage by Instance",
        "type": "gauge",
        "gridPos": {"h": 8, "w": 8, "x": 0, "y": 8},
        "targets": [
          {
            "expr": "avg(cpu_usage_percent) by (instance)",
            "legendFormat": "{{instance}}",
            "refId": "A"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "percent",
            "min": 0, "max": 100,
            "thresholds": {
              "steps": [
                {"color": "green", "value": null},
                {"color": "yellow", "value": 70},
                {"color": "red", "value": 90}
              ]
            }
          }
        }
      },
      {
        "title": "Active Sessions",
        "type": "stat",
        "gridPos": {"h": 8, "w": 8, "x": 8, "y": 8},
        "targets": [
          {
            "expr": "sum(user_sessions_active)",
            "legendFormat": "Total Active",
            "refId": "A"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "thresholds": {
              "steps": [
                {"color": "blue", "value": null},
                {"color": "green", "value": 100},
                {"color": "yellow", "value": 500}
              ]
            }
          }
        }
      },
      {
        "title": "Revenue per Hour",
        "type": "bargauge",
        "gridPos": {"h": 8, "w": 8, "x": 16, "y": 8},
        "targets": [
          {
            "expr": "sum(increase(revenue_per_hour[1h])) by (region)",
            "legendFormat": "{{region}}",
            "refId": "A"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "currencyUSD"
          }
        }
      },
      {
        "title": "Anomaly Alerts",
        "type": "table",
        "gridPos": {"h": 10, "w": 24, "x": 0, "y": 16},
        "targets": [
          {
            "expr": "anomaly_alerts{is_anomaly=\"true\"}",
            "format": "table",
            "instant": true,
            "refId": "A"
          }
        ]
      }
    ],
    "templating": {
      "list": [
        {
          "name": "instance",
          "type": "query",
          "query": "label_values(cpu_usage_percent, instance)",
          "refresh": 2,
          "multi": true,
          "includeAll": true
        }
      ]
    },
    "alerting": {
      "rules": [
        {
          "alert": "HighErrorRate",
          "expr": "sum(rate(http_requests_total{status=~\"5..\"}[5m])) / sum(rate(http_requests_total[5m])) > 0.05",
          "for": "2m",
          "labels": {"severity": "critical"},
          "annotations": {
            "summary": "High error rate detected (>5%)"
          }
        },
        {
          "alert": "HighLatency",
          "expr": "histogram_quantile(0.99, sum(rate(http_request_duration_seconds_bucket[5m])) by (le)) > 1",
          "for": "5m",
          "labels": {"severity": "warning"},
          "annotations": {
            "summary": "P99 latency above 1 second"
          }
        }
      ]
    }
  }
}
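
Grafana positions panels on a 24-column grid through `gridPos` (`x`, `y`, `w`, `h`). Before importing a hand-written dashboard, it can help to check that no panel runs past the grid or overlaps another. The sketch below does that for the six panels above; `layout_problems` is a hypothetical helper, and the panel list is reduced to titles and positions.

```python
from typing import Dict, List

GRID_COLUMNS = 24  # width of Grafana's dashboard grid

# Titles and positions copied from the dashboard JSON above.
PANELS: List[Dict] = [
    {"title": "Request Rate (per second)", "gridPos": {"h": 8, "w": 12, "x": 0, "y": 0}},
    {"title": "Response Latency (P99)", "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0}},
    {"title": "CPU Usage by Instance", "gridPos": {"h": 8, "w": 8, "x": 0, "y": 8}},
    {"title": "Active Sessions", "gridPos": {"h": 8, "w": 8, "x": 8, "y": 8}},
    {"title": "Revenue per Hour", "gridPos": {"h": 8, "w": 8, "x": 16, "y": 8}},
    {"title": "Anomaly Alerts", "gridPos": {"h": 10, "w": 24, "x": 0, "y": 16}},
]


def layout_problems(panels: List[Dict]) -> List[str]:
    """Report panels that exceed the grid width or overlap another panel."""
    problems = []
    for p in panels:
        g = p["gridPos"]
        if g["x"] + g["w"] > GRID_COLUMNS:
            problems.append(f"{p['title']}: exceeds {GRID_COLUMNS} columns")
    for i, a in enumerate(panels):
        for b in panels[i + 1:]:
            ga, gb = a["gridPos"], b["gridPos"]
            overlap_x = ga["x"] < gb["x"] + gb["w"] and gb["x"] < ga["x"] + ga["w"]
            overlap_y = ga["y"] < gb["y"] + gb["h"] and gb["y"] < ga["y"] + ga["h"]
            if overlap_x and overlap_y:
                problems.append(f"{a['title']} overlaps {b['title']}")
    return problems


print(layout_problems(PANELS))
```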

Infrastructure Setup (Terraform)

# infrastructure/streaming_dashboards.tf
variable "environment" {
  default = "production"
}

resource "aws_msk_cluster" "metrics_kafka" {
  cluster_name           = "metrics-streaming-${var.environment}"
  kafka_version          = "3.5.1"
  number_of_broker_nodes = 3

  broker_node_group_info {
    instance_type   = "kafka.m5.large"
    client_subnets  = var.private_subnets
    security_groups = [aws_security_group.kafka.id]
    storage_info {
      ebs_storage_info { volume_size = 100 }
    }
  }

  encryption_info {
    encryption_in_transit {
      client_broker = "TLS"
      in_cluster    = true
    }
    encryption_at_rest_kms_key_arn = aws_kms_key.kafka.arn
  }
}

resource "aws_ecs_cluster" "flink_cluster" {
  name = "flink-streaming-${var.environment}"
  setting {
    name  = "containerInsights"
    value = "enabled"
  }
}

resource "aws_ecs_task_definition" "flink_jobmanager" {
  family                   = "flink-jobmanager"
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = "2048"
  memory                   = "4096"
  container_definitions = jsonencode([{
    name  = "jobmanager"
    image = "${var.ecr_url}/flink-jobmanager:latest"
    portMappings = [{ containerPort = 8081, hostPort = 8081 }]
    command = ["jobmanager"]
    environment = [
      { name = "FLINK_PROPERTIES", value = "jobmanager.rpc.address: jobmanager\nstate.backend: rocksdb\nstate.checkpoints.dir: s3://${var.checkpoint_bucket}/checkpoints" }
    ]
    logConfiguration = {
      logDriver = "awslogs"
      options = { "awslogs-group" = aws_cloudwatch_log_group.flink.name, "awslogs-region" = var.region }
    }
  }])
  execution_role_arn = aws_iam_role.ecs_execution.arn
  task_role_arn      = aws_iam_role.flink_task.arn
}

resource "aws_ecs_task_definition" "flink_taskmanager" {
  family                   = "flink-taskmanager"
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = "4096"
  memory                   = "8192"
  container_definitions = jsonencode([{
    name  = "taskmanager"
    image = "${var.ecr_url}/flink-taskmanager:latest"
    command = ["taskmanager"]
    environment = [
      { name = "FLINK_PROPERTIES", value = "jobmanager.rpc.address: jobmanager\ntaskmanager.numberOfTaskSlots: 4\nstate.backend: rocksdb" }
    ]
    logConfiguration = {
      logDriver = "awslogs"
      options = { "awslogs-group" = aws_cloudwatch_log_group.flink.name, "awslogs-region" = var.region }
    }
  }])
  execution_role_arn = aws_iam_role.ecs_execution.arn
  task_role_arn      = aws_iam_role.flink_task.arn
}

resource "aws_ecs_service" "jobmanager" {
  name            = "flink-jobmanager"
  cluster         = aws_ecs_cluster.flink_cluster.id
  task_definition = aws_ecs_task_definition.flink_jobmanager.arn
  desired_count   = 1
  network_configuration {
    subnets          = var.private_subnets
    security_groups  = [aws_security_group.flink.id]
    assign_public_ip = false
  }
}

resource "aws_ecs_service" "taskmanager" {
  name            = "flink-taskmanager"
  cluster         = aws_ecs_cluster.flink_cluster.id
  task_definition = aws_ecs_task_definition.flink_taskmanager.arn
  desired_count   = 3
  network_configuration {
    subnets          = var.private_subnets
    security_groups  = [aws_security_group.flink.id]
    assign_public_ip = false
  }
}

resource "helm_release" "prometheus" {
  name       = "prometheus"
  repository = "https://prometheus-community.github.io/helm-charts"
  chart      = "kube-prometheus-stack"
  version    = "55.0.0"
  namespace  = "monitoring"
  create_namespace = true

  values = [yamlencode({
    prometheus = {
      prometheusSpec = {
        retention = "30d"
        storageSpec = {
          volumeClaimTemplate = { spec = { storageClassName = "gp3", resources = { requests = { storage = "100Gi" } } } }
        }
      }
    }
    grafana = {
      adminPassword = var.grafana_password
      dashboards = { default = { "real-time-ops" = { json = file("dashboards/real-time-ops.json") } } }
    }
  })]
}

resource "aws_cloudwatch_log_group" "flink" {
  name              = "/ecs/flink-streaming"
  retention_in_days = 30
}

resource "aws_s3_bucket" "flink_checkpoints" {
  bucket = "flink-checkpoints-${var.environment}"
}

output "grafana_endpoint" {
  value = "http://grafana.monitoring.svc.cluster.local:3000"
}

output "prometheus_endpoint" {
  value = "http://prometheus.monitoring.svc.cluster.local:9090"
}

Testing and Validation

# tests/test_streaming_dashboards.py
import pytest
import time
import json
from datetime import datetime
from unittest.mock import Mock, patch

class TestStreamingDashboards:
    @pytest.fixture
    def metric_producer(self):
        from producers.metric_producer import MetricProducer
        return MetricProducer({'kafka_brokers': 'localhost:9092', 'schema_registry_url': 'http://localhost:8081'})

    @pytest.fixture
    def sample_metrics(self):
        return [
            {"metric_name": "http_requests_total", "value": 1, "labels": {"method": "GET", "status": "200"}, "timestamp": int(time.time() * 1000)},
            {"metric_name": "cpu_usage_percent", "value": 75.5, "labels": {"instance": "web-1"}, "timestamp": int(time.time() * 1000)},
            {"metric_name": "orders_per_minute", "value": 42, "labels": {"region": "us-east-1"}, "timestamp": int(time.time() * 1000)}
        ]

    def test_metric_schema_compliance(self, sample_metrics):
        for metric in sample_metrics:
            assert "metric_name" in metric
            assert "value" in metric
            assert "labels" in metric
            assert "timestamp" in metric
            assert isinstance(metric["value"], (int, float))
            assert isinstance(metric["labels"], dict)

    def test_prometheus_format_conversion(self):
        from schemas.metrics import MetricPoint
        point = MetricPoint(
            metric_name="http_requests_total",
            value=100.0,
            timestamp=datetime.now(),
            labels={"method": "GET", "status": "200"}
        )
        text = point.to_prometheus_text()
        assert "http_requests_total" in text
        assert 'method="GET"' in text
        assert "100.0" in text

    def test_grafana_panel_generation(self):
        from schemas.metrics import DashboardMetric
        panel = DashboardMetric(
            name="Request Rate",
            query="sum(rate(http_requests_total[1m]))",
            unit="reqps",
            thresholds={"warning": 1000, "critical": 5000}
        )
        grafana_panel = panel.to_grafana_panel()
        assert grafana_panel["title"] == "Request Rate"
        assert grafana_panel["type"] == "timeseries"
        assert len(grafana_panel["targets"]) == 1

    def test_alerting_rule_configuration(self):
        rules = [
            {"alert": "HighErrorRate", "expr": "error_rate > 0.05", "for": "2m", "severity": "critical"},
            {"alert": "HighLatency", "expr": "p99_latency > 1", "for": "5m", "severity": "warning"}
        ]
        for rule in rules:
            assert "alert" in rule
            assert "expr" in rule
            assert "for" in rule
            assert "severity" in rule

    def test_dashboard_refresh_rate(self):
        dashboard_config = {"refresh": "1s", "time": {"from": "now-1h", "to": "now"}}
        assert dashboard_config["refresh"] == "1s"

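    # Patch the names where metric_producer looks them up, so the test needs
    # neither a Kafka broker nor a schema registry.
    @patch('producers.metric_producer.AvroSerializer')
    @patch('producers.metric_producer.SchemaRegistryClient')
    @patch('producers.metric_producer.Producer')
    def test_kafka_metric_production(self, mock_producer_class, mock_registry_class, mock_serializer_class):
        mock_producer = Mock()
        mock_producer_class.return_value = mock_producer
        mock_producer.produce.return_value = None
        mock_serializer_class.return_value = Mock(return_value=b"serialized")
        from producers.metric_producer import MetricProducer
        producer = MetricProducer({'kafka_brokers': 'localhost:9092', 'schema_registry_url': 'http://localhost:8081'})
        producer.produce_metric("test_metric", 42.0, {"instance": "test"})
        mock_producer.produce.assert_called()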

    def test_anomaly_detection_threshold(self):
        import math
        baseline = [10, 12, 11, 13, 10, 11, 12]
        spike = 100
        mean = sum(baseline) / len(baseline)
        stddev = math.sqrt(sum((x - mean) ** 2 for x in baseline) / len(baseline))
        threshold = 3 * stddev
        # The spike is judged against a baseline that excludes it; a normal value is not flagged.
        assert abs(spike - mean) > threshold
        assert not abs(baseline[-1] - mean) > threshold

    def test_windowed_aggregation_logic(self):
        events = [
            {"timestamp": 1000, "value": 10},
            {"timestamp": 2000, "value": 20},
            {"timestamp": 3000, "value": 15},
            {"timestamp": 4000, "value": 25}
        ]
        window_size_ms = 2500
        windows = {}
        for event in events:
            window_key = event["timestamp"] // window_size_ms
            if window_key not in windows:
                windows[window_key] = []
            windows[window_key].append(event["value"])
        for window_key, values in windows.items():
            avg = sum(values) / len(values)
            assert avg > 0
            assert len(values) > 0

    def test_metric_retention_policy(self):
        retention_config = {
            "real_time": {"retention": "1h", "resolution": "1s"},
            "short_term": {"retention": "24h", "resolution": "1m"},
            "long_term": {"retention": "30d", "resolution": "5m"}
        }
        assert len(retention_config) == 3
        for key, config in retention_config.items():
            assert "retention" in config
            assert "resolution" in config

Cost Analysis

Monthly Cost Breakdown (Production)

Component | Specification | Monthly Cost
MSK (Kafka) | 3x kafka.m5.large | $450
ECS Flink | JobManager + 3 TaskManagers | $800
Prometheus | Managed, 100GB storage | $200
Grafana Cloud | Pro tier | $50
S3 (Checkpoints) | 500GB | $12
Data Transfer | Cross-AZ | $75
Total | | $1,587

Cost Optimization Strategies

💡

Tip: Reduce streaming dashboard costs by 30-40%:

  1. Flink Checkpointing: Optimize checkpoint intervals to reduce S3 costs
  2. Prometheus Downsampling: Downsample old metrics automatically
  3. Kafka Retention: Set appropriate retention periods per topic
  4. Grafana Caching: Enable query caching to reduce Prometheus load
  5. Auto-scaling: Scale Flink TaskManagers based on lag
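
As a quick sanity check on the figures above, the snippet below sums the monthly line items and applies the quoted 30-40% savings range. The dollar amounts are taken from the cost table; the variable names are illustrative.

```python
# Monthly line items from the cost breakdown table (USD).
monthly_costs = {
    "MSK (Kafka)": 450,
    "ECS Flink": 800,
    "Prometheus": 200,
    "Grafana Cloud": 50,
    "S3 (Checkpoints)": 12,
    "Data Transfer": 75,
}

total = sum(monthly_costs.values())

# Resulting monthly bill if the optimizations deliver 40% or 30% savings.
best_case = round(total * (1 - 0.40), 2)
worst_case = round(total * (1 - 0.30), 2)
print(f"total=${total}, after optimization=${best_case} to ${worst_case}")
```

Flink on ECS is roughly half of the bill, so auto-scaling TaskManagers is likely where most of any saving would come from.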

Performance Metrics

Metric | Batch Dashboards | Streaming Dashboards | Improvement
Data Freshness | 5-15 minutes | < 1 second | 300-900x faster
Alert Latency | 10-30 minutes | < 5 seconds | 120-360x faster
Dashboard Load | 3-5 seconds | < 1 second | 3-5x faster
Query Performance | 2-10 seconds | < 100ms | 20-100x faster

Interview Talking Points

Architecture Decisions

ℹ️

Best Practice: Focus on these streaming concepts in interviews:

  1. Why Flink over Spark Streaming?

    • True streaming (not micro-batch)
    • Lower latency (< 100ms vs seconds)
    • Exactly-once state consistency through checkpointing
    • Better state management
  2. Why Prometheus over InfluxDB?

    • Pull-based model reduces complexity
    • Rich query language (PromQL)
    • Excellent Kubernetes integration
    • Large ecosystem of exporters
  3. Why Thanos for long-term storage?

    • Unlimited retention on object storage
    • Multi-cluster support
    • Global view across Prometheus instances
    • Cost-effective long-term storage

Common Interview Questions

Q: "How do you handle late-arriving metrics?"

late_data_strategies = {
    "Watermarks": "Allow 5-second lateness with Flink watermarks",
    "Windows": "Use sliding windows instead of tumbling",
    "Grace Period": "Configurable grace period for late events",
    "Side Outputs": "Route late data to separate stream"
}
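
To illustrate the "Watermarks" and "Side Outputs" strategies together, the sketch below classifies events as on-time or late using a watermark that trails the highest timestamp seen by 5 seconds, the same bound as the `WATERMARK` clause in the Flink source table. It is a simplified plain-Python model, not Flink code: it advances the watermark on every event and ignores Flink's periodic emission and 1 ms boundary conventions.

```python
from typing import Iterable, List, Tuple


def split_late_events(timestamps_ms: Iterable[int], window_ms: int = 1000,
                      max_out_of_orderness_ms: int = 5000) -> Tuple[List[int], List[int]]:
    """Split events into (on_time, late) for tumbling windows of `window_ms`.

    An event is late when the watermark has already passed the end of its window,
    meaning the window result would have been emitted before the event arrived.
    """
    on_time: List[int] = []
    late: List[int] = []
    max_seen = None
    for ts in timestamps_ms:
        max_seen = ts if max_seen is None else max(max_seen, ts)
        watermark = max_seen - max_out_of_orderness_ms
        window_end = (ts // window_ms + 1) * window_ms
        if window_end <= watermark:
            late.append(ts)      # candidate for a side output or a dead-letter topic
        else:
            on_time.append(ts)
    return on_time, late


# The event at 3000 ms arrives after one at 10000 ms, i.e. 7 seconds out of order.
on_time, late = split_late_events([1000, 2000, 10000, 3000, 9500])
print(on_time, late)
```

The event at 9500 ms is also out of order but still within the 5-second bound, so it lands in its window normally.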

Q: "How do you ensure dashboard availability during failures?"

availability_strategy = {
    "Multi-AZ": "Deploy across 3 availability zones",
    "Replication": "Prometheus with 2 replicas + Thanos",
    "Fallback": "Grafana shows stale data instead of errors",
    "Circuit Breaker": "Rate limiting prevents cascade failures"
}
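
The "Fallback" strategy, serving stale data instead of an error, can be sketched as a thin wrapper around whatever function queries the metrics backend. This illustrates the pattern only and is not a Grafana API; `StaleOnErrorCache` and the `fetch` callable are hypothetical.

```python
from typing import Any, Callable, Dict, Tuple


class StaleOnErrorCache:
    """Return the last good result for a query when the backend call fails."""

    def __init__(self, fetch: Callable[[str], Any]):
        self._fetch = fetch
        self._last_good: Dict[str, Any] = {}

    def get(self, query: str) -> Tuple[Any, bool]:
        """Return (result, is_stale). Re-raises if there is nothing cached to fall back on."""
        try:
            result = self._fetch(query)
        except Exception:
            if query in self._last_good:
                return self._last_good[query], True
            raise
        self._last_good[query] = result
        return result, False


# Simulated backend that fails on its second call.
calls = {"n": 0}

def flaky_fetch(query: str) -> float:
    calls["n"] += 1
    if calls["n"] == 2:
        raise ConnectionError("backend unavailable")
    return 42.0

cache = StaleOnErrorCache(flaky_fetch)
print(cache.get("sum(user_sessions_active)"))  # fresh result
print(cache.get("sum(user_sessions_active)"))  # stale result served during the failure
```

Surfacing the `is_stale` flag matters: a dashboard that silently shows old numbers during an outage can be worse than one that shows an error.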

Q: "How do you optimize Prometheus query performance?"

optimization_techniques = {
    "Recording Rules": "Pre-compute expensive queries",
    "Label Optimization": "Minimize high-cardinality labels",
    "Sharding": "Shard by metric type or label",
    "Caching": "Grafana query caching + Thanos query frontend"
}
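
"Label Optimization" is easier to reason about with numbers: the upper bound on time series for one metric is the product of the distinct values of each label. The sketch below applies that to `http_requests_total` using the value counts from the synthetic generator in Step 1; the `user_id` label and its count are a hypothetical example of a high-cardinality label.

```python
from math import prod
from typing import Dict


def max_series(label_value_counts: Dict[str, int]) -> int:
    """Upper bound on time series for one metric: product of distinct values per label."""
    return prod(label_value_counts.values())


# Distinct values produced by generate_synthetic_metrics for http_requests_total.
http_labels = {"method": 3, "path": 3, "status": 4, "instance": 10}
baseline = max_series(http_labels)

# Hypothetical: adding a per-user label with 100,000 distinct values.
with_user_id = max_series({**http_labels, "user_id": 100_000})
print(baseline, with_user_id)
```

A single unbounded label multiplies the series count by its number of values, which is why identifiers such as user or request IDs are usually kept out of Prometheus labels.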

Deployment Checklist

  • Deploy Kafka cluster for metric ingestion
  • Set up Flink cluster with job and task managers
  • Configure Prometheus with retention policies
  • Deploy Grafana with pre-built dashboards
  • Set up AlertManager with notification channels
  • Configure recording rules for common queries
  • Test end-to-end metric flow
  • Validate alerting thresholds
  • Set up monitoring for the monitoring stack
  • Document runbooks for common issues

⚠️

Warning: Always test alerting rules in staging first. False positives erode trust in the monitoring system. Start with warning-level alerts and escalate to critical after tuning.


This project demonstrates real-time monitoring and observability skills and is highly relevant for SRE and data engineering interviews at companies with large-scale distributed systems.
