๐ŸŽ‰ 75% of content is free forever โ€” Unlock Premium from $10/mo โ†’
CW
Search coursesโ€ฆ
๐Ÿ’ผ Servicesโ„น๏ธ Aboutโœ‰๏ธ ContactView Pricing Plansfrom $10

Design a Unified Data Platform: Ingestion to Analytics

Cloud ArchitectureData Platform Architectureโญ Premium

Advertisement

Design a Unified Data Platform: Ingestion to Analytics

Difficulty: Staff Level | Companies: Netflix, Uber, Airbnb, Databricks, Snowflake

Interview Question

"Design a unified data platform that handles batch and streaming data for analytics, machine learning, and real-time dashboards. How do you ensure data quality, governance, and cost efficiency?"

โ„น๏ธKey Concepts

This question tests your understanding of data architecture patterns, modern data stack, and lakehouse architecture.

Complete Data Platform Architecture

Architecture Overview

Architecture Diagram
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    UNIFIED DATA PLATFORM ARCHITECTURE                    โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                          โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ DATA SOURCES โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                   โ”‚
โ”‚  โ”‚  Databases โ”‚ APIs โ”‚ Files โ”‚ IoT โ”‚ Logs โ”‚ Events โ”‚                   โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜                   โ”‚
โ”‚                         โ”‚                                               โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ INGESTION LAYER โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                   โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”‚              โ”‚
โ”‚  โ”‚  โ”‚           Batch Ingestion                    โ”‚    โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  (Airflow, Glue, EMR)                       โ”‚    โ”‚              โ”‚
โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ”‚              โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”‚              โ”‚
โ”‚  โ”‚  โ”‚           Stream Ingestion                   โ”‚    โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  (Kinesis, Kafka, Flink)                    โ”‚    โ”‚              โ”‚
โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ”‚              โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜              โ”‚
โ”‚                         โ”‚                                               โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ STORAGE LAYER โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                 โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”          โ”‚              โ”‚
โ”‚  โ”‚  โ”‚   Data   โ”‚  โ”‚  Data    โ”‚  โ”‚   Data   โ”‚          โ”‚              โ”‚
โ”‚  โ”‚  โ”‚   Lake   โ”‚  โ”‚  Warehouseโ”‚  โ”‚   Lakehouse         โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  (S3)    โ”‚  โ”‚ (Redshift)โ”‚  โ”‚  (Delta) โ”‚          โ”‚              โ”‚
โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜          โ”‚              โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜              โ”‚
โ”‚                         โ”‚                                               โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ PROCESSING LAYER โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                  โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”          โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  Spark   โ”‚  โ”‚  Flink   โ”‚  โ”‚  dbt     โ”‚          โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  (Batch) โ”‚  โ”‚ (Stream) โ”‚  โ”‚(Transform)โ”‚          โ”‚              โ”‚
โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜          โ”‚              โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜              โ”‚
โ”‚                         โ”‚                                               โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ SERVING LAYER โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                 โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”          โ”‚              โ”‚
โ”‚  โ”‚  โ”‚Analytics โ”‚  โ”‚   ML     โ”‚  โ”‚Real-Time โ”‚          โ”‚              โ”‚
โ”‚  โ”‚  โ”‚  BI      โ”‚  โ”‚ Platform โ”‚  โ”‚Dashboardsโ”‚          โ”‚              โ”‚
โ”‚  โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜          โ”‚              โ”‚
โ”‚  โ”‚                                                       โ”‚              โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜              โ”‚
โ”‚                                                                          โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Mathematical Foundation: Data Volume

Data Ingestion Rate:

  • Batch data: B = 1TB/day
  • Streaming data: S = 100GB/hour = 2.4TB/day
  • Total daily ingestion: T = B + S = 3.4TB/day
  • Monthly ingestion: M = T ร— 30 = 102TB/month
  • Yearly ingestion: Y = T ร— 365 = 1,241TB/year

Storage Requirements:

  • Raw data: R = Y = 1,241TB
  • Processed data: P = Y ร— 1.5 = 1,861.5TB (1.5x for transformations)
  • Total storage: S_total = R + P = 3,102.5TB

Processing Capacity:

  • Batch processing: B_proc = 1TB/day รท 24hours = 41.7GB/hour
  • Stream processing: S_proc = 100GB/hour
  • Total processing: T_proc = B_proc + S_proc = 141.7GB/hour

AWS Data Platform Implementation

# S3 Data Lake
resource "aws_s3_bucket" "data_lake" {
  bucket = "company-data-lake-${var.environment}"

  tags = {
    Environment = var.environment
    ManagedBy   = "Terraform"
  }
}

resource "aws_s3_bucket_lifecycle_configuration" "data_lake" {
  bucket = aws_s3_bucket.data_lake.id

  rule {
    id     = "archive-old-data"
    status = "Enabled"

    transition {
      days          = 30
      storage_class = "STANDARD_IA"
    }

    transition {
      days          = 90
      storage_class = "GLACIER"
    }

    transition {
      days          = 365
      storage_class = "DEEP_ARCHIVE"
    }

    expiration {
      days = 2555  # 7 years
    }
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "data_lake" {
  bucket = aws_s3_bucket.data_lake.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm     = "aws:kms"
      kms_master_key_id = aws_kms_key.data_lake.arn
    }
  }
}

# Glue Data Catalog
resource "aws_glue_catalog_database" "data_lake" {
  name = "data_lake_${var.environment}"
}

resource "aws_glue_catalog_table" "raw_events" {
  name          = "raw_events"
  database_name = aws_glue_catalog_database.data_lake.name

  storage_descriptor {
    location      = "s3://${aws_s3_bucket.data_lake.id}/raw/events/"
    input_format  = "org.apache.hadoop.mapred.TextInputFormat"
    output_format = "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"

    ser_de_info {
      serialization_library = "org.openx.data.jsonserde.JsonSerDe"
    }

    columns {
      name = "event_id"
      type = "string"
    }

    columns {
      name = "event_type"
      type = "string"
    }

    columns {
      name = "timestamp"
      type = "bigint"
    }

    columns {
      name = "user_id"
      type = "string"
    }

    columns {
      name = "payload"
      type = "string"
    }
  }

  table_type = "EXTERNAL_TABLE"

  parameters = {
    "classification" = "json"
  }
}

# Kinesis Data Stream for real-time ingestion
resource "aws_kinesis_stream" "events" {
  name             = "events-stream-${var.environment}"
  shard_count      = 10
  retention_period = 24

  stream_mode_details {
    stream_mode = "PROVISIONED"
  }

  encryption_type = "KMS"
  kms_key_id      = aws_kms_key.kinesis.arn

  tags = {
    Environment = var.environment
  }
}

# Redshift Data Warehouse
resource "aws_redshift_cluster" "analytics" {
  cluster_identifier = "analytics-cluster-${var.environment}"
  database_name      = "analytics"
  master_username    = var.redshift_username
  master_password    = var.redshift_password
  node_type          = "dc2.large"
  cluster_type       = "single-node"

  encrypted = true
  kms_key_id = aws_kms_key.redshift.arn

  vpc_security_group_ids = [aws_security_group.redshift.id]
  subnet_group_name      = aws_redshift_subnet_group.analytics.name

  skip_final_snapshot = var.environment != "production"

  tags = {
    Environment = var.environment
  }
}

# EMR Cluster for Spark processing
resource "aws_emr_cluster" "spark" {
  name          = "spark-cluster-${var.environment}"
  release_label = "emr-6.10.0"
  applications  = ["Spark", "Hive", "Hue"]
  service_role  = aws_iam_role.emr_service.arn

  master_instance_group {
    instance_type  = "m5.xlarge"
    instance_count = 1
  }

  core_instance_group {
    instance_type  = "m5.2xlarge"
    instance_count = 3

    ebs_config {
      size                 = 100
      type                 = "gp3"
      volumes_per_instance = 2
    }
  }

  ec2_attributes {
    instance_profile = aws_iam_instance_profile.emr.arn
    subnet_id        = aws_subnet.private[0].id
  }

  tags = {
    Environment = var.environment
  }
}

Data Ingestion with Apache Kafka

# Kafka producer for data ingestion
from kafka import KafkaProducer
from typing import Dict, Any, List
import json
from datetime import datetime
import uuid

class DataIngestionProducer:
    """Kafka producer for data ingestion"""

    def __init__(self, bootstrap_servers: List[str]):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',
            retries=3
        )

    def ingest_event(self, topic: str, event: Dict[str, Any], 
                    partition_key: str = None):
        """Ingest single event"""
        event['ingestion_timestamp'] = datetime.utcnow().isoformat()
        event['event_id'] = str(uuid.uuid4())

        if partition_key is None:
            partition_key = event.get('user_id', event['event_id'])

        future = self.producer.send(
            topic=topic,
            key=partition_key,
            value=event
        )

        return future.get(timeout=10)

    def ingest_batch(self, topic: str, events: List[Dict[str, Any]]):
        """Ingest batch of events"""
        futures = []
        for event in events:
            future = self.producer.send(
                topic=topic,
                key=event.get('user_id'),
                value=event
            )
            futures.append(future)

        # Wait for all to complete
        for future in futures:
            future.get(timeout=10)

    def flush(self):
        self.producer.flush()

    def close(self):
        self.producer.close()

# Kafka consumer for stream processing
from kafka import KafkaConsumer
from typing import Callable

class DataIngestionConsumer:
    """Kafka consumer for stream processing"""

    def __init__(self, topics: List[str], bootstrap_servers: List[str],
                 group_id: str):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

    def consume(self, handler: Callable):
        """Consume messages"""
        try:
            for message in self.consumer:
                try:
                    handler(message.value)
                    self.consumer.commit()
                except Exception as e:
                    print(f"Error processing message: {e}")
        except KeyboardInterrupt:
            pass
        finally:
            self.consumer.close()

Data Transformation with dbt

-- dbt model: stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('raw', 'orders') }}
),

transformed AS (
    SELECT
        order_id,
        user_id,
        order_date,
        status,
        total_amount,
        currency,
        created_at,
        updated_at,
        -- Data quality checks
        CASE 
            WHEN total_amount < 0 THEN 0
            ELSE total_amount
        END AS clean_amount,
        -- Derived columns
        DATE_TRUNC('day', order_date) AS order_day,
        DATE_TRUNC('week', order_date) AS order_week,
        DATE_TRUNC('month', order_date) AS order_month
    FROM source
    WHERE order_id IS NOT NULL
)

SELECT * FROM transformed
-- dbt model: fct_daily_revenue.sql
WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

daily_metrics AS (
    SELECT
        order_day,
        COUNT(DISTINCT order_id) AS total_orders,
        COUNT(DISTINCT user_id) AS unique_users,
        SUM(clean_amount) AS total_revenue,
        AVG(clean_amount) AS avg_order_value,
        MIN(clean_amount) AS min_order_value,
        MAX(clean_amount) AS max_order_value,
        -- Percentiles
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY clean_amount) AS median_order_value,
        PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY clean_amount) AS p95_order_value
    FROM orders
    GROUP BY order_day
)

SELECT 
    *,
    -- Growth metrics
    LAG(total_revenue, 1) OVER (ORDER BY order_day) AS prev_day_revenue,
    (total_revenue - LAG(total_revenue, 1) OVER (ORDER BY order_day)) / 
        NULLIF(LAG(total_revenue, 1) OVER (ORDER BY order_day), 0) AS revenue_growth_rate
FROM daily_metrics

Real-Time Analytics with Apache Flink

# Flink stream processing
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table.expressions import col, lit

def create_stream_processing_job():
    """Create Flink stream processing job"""
    # Create Flink table environment
    env_settings = EnvironmentSettings.in_streaming_mode()
    t_env = StreamTableEnvironment.create(environment_settings=env_settings)

    # Define source table (Kafka)
    t_env.execute_sql("""
        CREATE TABLE kafka_orders (
            order_id STRING,
            user_id STRING,
            amount DOUBLE,
            event_time TIMESTAMP(3),
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'orders',
            'properties.bootstrap.servers' = 'kafka:9092',
            'properties.group.id' = 'flink-processor',
            'format' = 'json',
            'scan.startup.mode' = 'latest-offset'
        )
    """)

    # Define sink table (Redis for real-time dashboard)
    t_env.execute_sql("""
        CREATE TABLE redis_metrics (
            window_start TIMESTAMP(3),
            window_end TIMESTAMP(3),
            order_count BIGINT,
            total_amount DOUBLE,
            avg_amount DOUBLE,
            PRIMARY KEY (window_start) NOT ENFORCED
        ) WITH (
            'connector' = 'redis',
            'host' = 'redis',
            'port' = '6379'
        )
    """)

    # Windowed aggregation
    t_env.execute_sql("""
        INSERT INTO redis_metrics
        SELECT
            TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
            TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end,
            COUNT(*) AS order_count,
            SUM(amount) AS total_amount,
            AVG(amount) AS avg_amount
        FROM kafka_orders
        GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE)
    """)

# Data quality checks
class DataQualityChecker:
    """Data quality validation"""

    def __init__(self):
        self.rules = []

    def add_rule(self, name: str, check_func, severity: str = 'error'):
        self.rules.append({
            'name': name,
            'check': check_func,
            'severity': severity
        })

    def validate(self, data) -> Dict[str, Any]:
        """Validate data against rules"""
        results = {
            'passed': True,
            'violations': []
        }

        for rule in self.rules:
            try:
                if not rule['check'](data):
                    results['passed'] = False
                    results['violations'].append({
                        'rule': rule['name'],
                        'severity': rule['severity']
                    })
            except Exception as e:
                results['passed'] = False
                results['violations'].append({
                    'rule': rule['name'],
                    'error': str(e),
                    'severity': 'error'
                })

        return results

# Usage
checker = DataQualityChecker()
checker.add_rule('not_null', lambda x: x is not None)
checker.add_rule('positive_amount', lambda x: x.get('amount', 0) > 0)
checker.add_rule('valid_email', lambda x: '@' in x.get('email', ''))

โš ๏ธData Quality

Implement data quality checks at ingestion, transformation, and serving layers. Use data contracts to ensure schema compatibility between producers and consumers.

Data Governance

# Data catalog and governance
from typing import Dict, Any, List
from dataclasses import dataclass
from datetime import datetime

@dataclass
class DataAsset:
    asset_id: str
    name: str
    description: str
    owner: str
    classification: str
    pii_fields: List[str]
    retention_days: int
    created_at: datetime
    tags: Dict[str, str]

class DataCatalog:
    """Data catalog for governance"""

    def __init__(self):
        self.assets: Dict[str, DataAsset] = {}

    def register_asset(self, asset: DataAsset):
        """Register data asset"""
        self.assets[asset.asset_id] = asset

    def get_asset(self, asset_id: str) -> DataAsset:
        """Get data asset"""
        return self.assets.get(asset_id)

    def search_assets(self, query: str) -> List[DataAsset]:
        """Search data assets"""
        results = []
        for asset in self.assets.values():
            if (query.lower() in asset.name.lower() or
                query.lower() in asset.description.lower()):
                results.append(asset)
        return results

    def get_assets_by_classification(self, classification: str) -> List[DataAsset]:
        """Get assets by classification"""
        return [a for a in self.assets.values() 
                if a.classification == classification]

class DataRetentionManager:
    """Data retention management"""

    def __init__(self):
        self.retention_policies = {}

    def add_policy(self, asset_type: str, retention_days: int):
        """Add retention policy"""
        self.retention_policies[asset_type] = retention_days

    def enforce_retention(self, s3_client, bucket: str, prefix: str):
        """Enforce data retention"""
        response = s3_client.list_objects_v2(
            Bucket=bucket,
            Prefix=prefix
        )

        for obj in response.get('Contents', []):
            asset = self._get_asset_type(obj['Key'])
            retention_days = self.retention_policies.get(asset, 365)

            age_days = (datetime.utcnow() - obj['LastModified']).days
            if age_days > retention_days:
                self._delete_object(s3_client, bucket, obj['Key'])

โœ…Data Platform Benefits

A unified data platform enables data-driven decision making, ML model training, and real-time analytics. Use the medallion architecture (bronze/silver/gold) for data quality.

Summary

LayerPurposeAWS Services
IngestionData collectionKinesis, MSK, Glue
StorageData lake/warehouseS3, Redshift, Delta Lake
ProcessingBatch/StreamEMR, Flink, dbt
ServingAnalytics/MLQuickSight, SageMaker
GovernanceQuality/SecurityGlue Catalog, Lake Formation

Advertisement