
Real-Time Data Warehouse with CDC (Debezium + Kafka + Snowflake)




Debezium + Kafka + Snowflake for Instant Analytics


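Project Difficulty: Advanced | Duration: 3-4 weeks | Cloud: AWS/GCP

Build a real-time data warehouse that uses Change Data Capture to replicate transactional data into Snowflake. Changes are captured from the source database in under a second; end-to-end freshness in Snowflake is then governed by the sink connector's buffering settings.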

Project Overview

Problem Statement

Traditional batch ETL processes leave data freshness gaps of hours or days, which rules out real-time analytics and decision-making. Organizations need to replicate transactional databases into data warehouses in real time while keeping the data consistent.

Objectives

  1. Capture changes from PostgreSQL in real time (< 1 second latency)
  2. Transform and route events through Kafka
  3. Load data into Snowflake with exactly-once semantics
  4. Support schema evolution and data quality checks
  5. Enable real-time dashboards and analytics

Tech Stack

| Component | Technology | Purpose |
|---|---|---|
| CDC Tool | Debezium | Database change capture |
| Message Broker | Apache Kafka | Event streaming |
| Streaming | Kafka Connect | Data integration |
| Warehouse | Snowflake | Cloud data warehouse |
| Monitoring | Datadog + PagerDuty | Observability |

Architecture Diagram

Source databases: PostgreSQL (OLTP), MySQL (OLTP), MongoDB (document store)
→ Debezium CDC connectors: PostgreSQL connector (WAL-based), MySQL connector (binlog), MongoDB connector (oplog)
→ Apache Kafka topics: dbserver1.public.users, dbserver2.public.orders, dbserver3.catalog.products
→ Kafka Connect sinks: Snowflake sink (structured), S3 sink (raw events), Elasticsearch sink (index)
→ Snowflake data warehouse: RAW layer (ingestion), STAGING layer (transform), ANALYTICS layer (views)
→ Consumers: BI tools (Looker), ML pipelines (SageMaker), data apps (APIs), real-time dashboards
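Each Debezium connector in the diagram publishes to topics named `<topic.prefix>.<schema>.<table>` (for example `dbserver1.public.users`), and the Debezium configuration in Step 1 reroutes them with a RegexRouter to `cdc.<table>`. The helper below is a minimal sketch of that naming scheme, assuming three dot-free segments; the function names and the upper-case RAW table mapping are illustrative, not part of Debezium or Kafka Connect.

```python
import re
from typing import NamedTuple


class ChangeTopic(NamedTuple):
    """Parts of a Debezium change-event topic name: <prefix>.<schema>.<table>."""
    prefix: str
    schema: str
    table: str


# Same pattern as the RegexRouter in the connector config: three dot-free segments.
TOPIC_PATTERN = re.compile(r"([^.]+)\.([^.]+)\.([^.]+)")


def parse_change_topic(topic: str) -> ChangeTopic:
    """Split a topic name into prefix, schema (or database), and table."""
    match = TOPIC_PATTERN.fullmatch(topic)
    if match is None:
        raise ValueError(f"not a <prefix>.<schema>.<table> topic: {topic!r}")
    return ChangeTopic(*match.groups())


def reroute(topic: str) -> str:
    """Mimic RegexRouter with replacement 'cdc.$3'.

    RegexRouter requires the whole topic name to match; topics that do not
    match (e.g. a two-segment heartbeat topic) pass through unchanged.
    """
    match = TOPIC_PATTERN.fullmatch(topic)
    return f"cdc.{match.group(3)}" if match else topic


def raw_table_for(topic: str) -> str:
    """Hypothetical mapping from a change topic to its Snowflake RAW table."""
    return f"RAW.{parse_change_topic(topic).table.upper()}"
```

Keeping the table name as the last segment is what lets three source servers collapse into a single `cdc.*` topic namespace, at the cost of assuming table names are unique across servers.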

Data Source Setup and Schema

Source Database Schema (PostgreSQL)

-- source_database/001_schema.sql
-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS "pgcrypto";

-- Users table
CREATE TABLE public.users (
    user_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    email VARCHAR(255) UNIQUE NOT NULL,
    first_name VARCHAR(100) NOT NULL,
    last_name VARCHAR(100) NOT NULL,
    phone VARCHAR(20),
    address JSONB,
    status VARCHAR(20) DEFAULT 'active',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    deleted_at TIMESTAMP WITH TIME ZONE
);

-- Index for timestamp-based incremental queries (log-based CDC via Debezium does not need it)
CREATE INDEX idx_users_updated_at ON public.users(updated_at);
CREATE INDEX idx_users_email ON public.users(email);

-- Products table
CREATE TABLE public.products (
    product_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    sku VARCHAR(50) UNIQUE NOT NULL,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    category VARCHAR(100),
    subcategory VARCHAR(100),
    price DECIMAL(10, 2) NOT NULL,
    cost DECIMAL(10, 2),
    stock_quantity INTEGER DEFAULT 0,
    attributes JSONB,
    is_active BOOLEAN DEFAULT true,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Orders table
CREATE TABLE public.orders (
    order_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    user_id UUID REFERENCES public.users(user_id),
    order_number VARCHAR(50) UNIQUE NOT NULL,
    status VARCHAR(30) NOT NULL,
    total_amount DECIMAL(12, 2) NOT NULL,
    tax_amount DECIMAL(10, 2) DEFAULT 0,
    shipping_amount DECIMAL(10, 2) DEFAULT 0,
    discount_amount DECIMAL(10, 2) DEFAULT 0,
    currency VARCHAR(3) DEFAULT 'USD',
    shipping_address JSONB,
    billing_address JSONB,
    notes TEXT,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    shipped_at TIMESTAMP WITH TIME ZONE,
    delivered_at TIMESTAMP WITH TIME ZONE
);

-- Order items table
CREATE TABLE public.order_items (
    order_item_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    order_id UUID REFERENCES public.orders(order_id) ON DELETE CASCADE,
    product_id UUID REFERENCES public.products(product_id),
    quantity INTEGER NOT NULL,
    unit_price DECIMAL(10, 2) NOT NULL,
    discount DECIMAL(5, 2) DEFAULT 0,
    total_price DECIMAL(12, 2) NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Inventory movements table
CREATE TABLE public.inventory_movements (
    movement_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    product_id UUID REFERENCES public.products(product_id),
    warehouse_id VARCHAR(50) NOT NULL,
    movement_type VARCHAR(20) NOT NULL, -- 'inbound', 'outbound', 'adjustment'
    quantity INTEGER NOT NULL,
    reference_type VARCHAR(50),
    reference_id UUID,
    notes TEXT,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Audit log table
CREATE TABLE public.audit_log (
    audit_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    table_name VARCHAR(100) NOT NULL,
    record_id UUID NOT NULL,
    action VARCHAR(10) NOT NULL, -- 'INSERT', 'UPDATE', 'DELETE'
    old_data JSONB,
    new_data JSONB,
    changed_by VARCHAR(100),
    changed_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Create audit trigger function.
-- Each trigger passes its table's primary-key column name as an argument
-- (TG_ARGV[0]), so the same function works for users (user_id) and orders (order_id).
CREATE OR REPLACE FUNCTION audit_trigger_func()
RETURNS TRIGGER AS $$
DECLARE
    pk_column TEXT := TG_ARGV[0];
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO public.audit_log (table_name, record_id, action, new_data, changed_by)
        VALUES (TG_TABLE_NAME, (to_jsonb(NEW) ->> pk_column)::UUID, 'INSERT', to_jsonb(NEW), current_user);
        RETURN NEW;
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO public.audit_log (table_name, record_id, action, old_data, new_data, changed_by)
        VALUES (TG_TABLE_NAME, (to_jsonb(NEW) ->> pk_column)::UUID, 'UPDATE', to_jsonb(OLD), to_jsonb(NEW), current_user);
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO public.audit_log (table_name, record_id, action, old_data, changed_by)
        VALUES (TG_TABLE_NAME, (to_jsonb(OLD) ->> pk_column)::UUID, 'DELETE', to_jsonb(OLD), current_user);
        RETURN OLD;
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Apply audit triggers, passing each table's primary-key column
CREATE TRIGGER users_audit_trigger
    AFTER INSERT OR UPDATE OR DELETE ON public.users
    FOR EACH ROW EXECUTE FUNCTION audit_trigger_func('user_id');

CREATE TRIGGER orders_audit_trigger
    AFTER INSERT OR UPDATE OR DELETE ON public.orders
    FOR EACH ROW EXECUTE FUNCTION audit_trigger_func('order_id');

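-- Enable logical replication (requires superuser privileges; ALTER SYSTEM
-- cannot run inside a transaction block, and these settings only take
-- effect after a PostgreSQL restart)
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 10;
ALTER SYSTEM SET max_wal_senders = 10;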

-- Create replication user (replace the placeholder password and keep the real one in a secret store)
CREATE USER replicator WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT CONNECT ON DATABASE ecommerce TO replicator;
GRANT USAGE ON SCHEMA public TO replicator;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replicator;

-- Create publication for CDC
CREATE PUBLICATION dbz_publication FOR TABLE 
    public.users, 
    public.products, 
    public.orders, 
    public.order_items,
    public.inventory_movements;
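Because `ALTER SYSTEM` only writes `postgresql.auto.conf`, the running server keeps its old `wal_level`, `max_replication_slots`, and `max_wal_senders` until it is restarted, so it is worth checking the live values before starting Debezium. A minimal sketch, assuming the values have already been fetched as strings (for example with `SELECT name, setting FROM pg_settings WHERE name IN (...)`); the function name and the minimums (taken from the statements above) are illustrative choices.

```python
from typing import Dict, List

# Minimums taken from the ALTER SYSTEM statements above (an assumption, not PostgreSQL defaults).
REQUIRED_INT_SETTINGS = {
    "max_replication_slots": 10,
    "max_wal_senders": 10,
}


def replication_problems(settings: Dict[str, str]) -> List[str]:
    """Return human-readable problems with a server's logical-replication settings.

    `settings` maps setting names to the string values reported by pg_settings.
    An empty list means the server is ready for a logical replication slot.
    """
    problems = []
    wal_level = settings.get("wal_level")
    if wal_level != "logical":
        problems.append(f"wal_level is {wal_level!r}, expected 'logical'")
    for name, minimum in REQUIRED_INT_SETTINGS.items():
        raw = settings.get(name)
        if raw is None or not raw.isdigit():
            problems.append(f"{name} is missing or not an integer: {raw!r}")
        elif int(raw) < minimum:
            problems.append(f"{name} is {raw}, expected at least {minimum}")
    return problems
```

A non-empty result right after running the script usually just means the restart has not happened yet.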

Snowflake Target Schema

-- snowflake/001_warehouse_schema.sql
-- Create database and schemas
CREATE DATABASE IF NOT EXISTS ECOMMERCE_DW;

USE DATABASE ECOMMERCE_DW;

-- Layers: RAW (ingestion), STAGING (cleaned and validated), ANALYTICS (business-ready)
CREATE SCHEMA IF NOT EXISTS RAW;
CREATE SCHEMA IF NOT EXISTS STAGING;
CREATE SCHEMA IF NOT EXISTS ANALYTICS;

-- Raw tables (exact copy from source)
CREATE OR REPLACE TABLE RAW.USERS (
    user_id VARCHAR(36),
    email VARCHAR(255),
    first_name VARCHAR(100),
    last_name VARCHAR(100),
    phone VARCHAR(20),
    address VARIANT,
    status VARCHAR(20),
    created_at TIMESTAMP_NTZ,
    updated_at TIMESTAMP_NTZ,
    deleted_at TIMESTAMP_NTZ,
    _cdc_operation VARCHAR(10),
    _cdc_timestamp TIMESTAMP_NTZ,
    _kafka_offset NUMBER,
    _kafka_partition NUMBER,
    _ingestion_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

CREATE OR REPLACE TABLE RAW.PRODUCTS (
    product_id VARCHAR(36),
    sku VARCHAR(50),
    name VARCHAR(255),
    description TEXT,
    category VARCHAR(100),
    subcategory VARCHAR(100),
    price DECIMAL(10, 2),
    cost DECIMAL(10, 2),
    stock_quantity INTEGER,
    attributes VARIANT,
    is_active BOOLEAN,
    created_at TIMESTAMP_NTZ,
    updated_at TIMESTAMP_NTZ,
    _cdc_operation VARCHAR(10),
    _cdc_timestamp TIMESTAMP_NTZ,
    _kafka_offset NUMBER,
    _kafka_partition NUMBER,
    _ingestion_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

CREATE OR REPLACE TABLE RAW.ORDERS (
    order_id VARCHAR(36),
    user_id VARCHAR(36),
    order_number VARCHAR(50),
    status VARCHAR(30),
    total_amount DECIMAL(12, 2),
    tax_amount DECIMAL(10, 2),
    shipping_amount DECIMAL(10, 2),
    discount_amount DECIMAL(10, 2),
    currency VARCHAR(3),
    shipping_address VARIANT,
    billing_address VARIANT,
    notes TEXT,
    created_at TIMESTAMP_NTZ,
    updated_at TIMESTAMP_NTZ,
    shipped_at TIMESTAMP_NTZ,
    delivered_at TIMESTAMP_NTZ,
    _cdc_operation VARCHAR(10),
    _cdc_timestamp TIMESTAMP_NTZ,
    _kafka_offset NUMBER,
    _kafka_partition NUMBER,
    _ingestion_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
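);

CREATE OR REPLACE TABLE RAW.ORDER_ITEMS (
    order_item_id VARCHAR(36),
    order_id VARCHAR(36),
    product_id VARCHAR(36),
    quantity INTEGER,
    unit_price DECIMAL(10, 2),
    discount DECIMAL(5, 2),
    total_price DECIMAL(12, 2),
    created_at TIMESTAMP_NTZ,
    _cdc_operation VARCHAR(10),
    _cdc_timestamp TIMESTAMP_NTZ,
    _kafka_offset NUMBER,
    _kafka_partition NUMBER,
    _ingestion_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);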

-- Staging tables (cleaned and validated)
CREATE OR REPLACE TABLE STAGING.USERS (
    user_id VARCHAR(36) PRIMARY KEY,
    email VARCHAR(255),
    full_name VARCHAR(201),
    phone VARCHAR(20),
    city VARCHAR(100),
    state VARCHAR(100),
    country VARCHAR(100),
    status VARCHAR(20),
    created_at TIMESTAMP_NTZ,
    updated_at TIMESTAMP_NTZ,
    is_current BOOLEAN DEFAULT TRUE,
    valid_from TIMESTAMP_NTZ,
    valid_to TIMESTAMP_NTZ
);

CREATE OR REPLACE TABLE STAGING.PRODUCTS (
    product_id VARCHAR(36) PRIMARY KEY,
    sku VARCHAR(50),
    name VARCHAR(255),
    category VARCHAR(100),
    subcategory VARCHAR(100),
    price DECIMAL(10, 2),
    cost DECIMAL(10, 2),
    stock_quantity INTEGER,
    is_active BOOLEAN,
    created_at TIMESTAMP_NTZ,
    updated_at TIMESTAMP_NTZ
);

CREATE OR REPLACE TABLE STAGING.ORDERS (
    order_id VARCHAR(36) PRIMARY KEY,
    user_id VARCHAR(36),
    order_number VARCHAR(50),
    status VARCHAR(30),
    total_amount DECIMAL(12, 2),
    item_count INTEGER,
    shipping_city VARCHAR(100),
    shipping_state VARCHAR(100),
    shipping_country VARCHAR(100),
    created_at TIMESTAMP_NTZ,
    updated_at TIMESTAMP_NTZ,
    shipped_at TIMESTAMP_NTZ,
    delivered_at TIMESTAMP_NTZ,
    delivery_days INTEGER
);

-- Analytics tables (business-ready)
CREATE OR REPLACE TABLE ANALYTICS.DAILY_SALES (
    sale_date DATE,
    product_id VARCHAR(36),
    category VARCHAR(100),
    total_revenue DECIMAL(15, 2),
    total_orders INTEGER,
    total_items INTEGER,
    avg_order_value DECIMAL(10, 2),
    unique_customers INTEGER,
    _computed_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

CREATE OR REPLACE TABLE ANALYTICS.CUSTOMER_SEGMENTS (
    user_id VARCHAR(36),
    segment VARCHAR(50),
    lifetime_value DECIMAL(12, 2),
    order_count INTEGER,
    avg_order_value DECIMAL(10, 2),
    last_order_date DATE,
    _computed_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Create streams for change tracking (created first so the task's
-- WHEN condition refers to an existing stream)
CREATE OR REPLACE STREAM RAW.USERS_STREAM 
    ON TABLE RAW.USERS 
    APPEND_ONLY = FALSE;

CREATE OR REPLACE STREAM RAW.ORDERS_STREAM 
    ON TABLE RAW.ORDERS 
    APPEND_ONLY = FALSE;

CREATE OR REPLACE STREAM RAW.PRODUCTS_STREAM 
    ON TABLE RAW.PRODUCTS 
    APPEND_ONLY = FALSE;

-- Create tasks for incremental processing
CREATE OR REPLACE TASK STAGING.USERS_TASK
    WAREHOUSE = COMPUTE_WH
    SCHEDULE = '1 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('RAW.USERS_STREAM')
AS
    INSERT INTO STAGING.USERS (
        user_id, email, full_name, phone, city, state, country,
        status, created_at, updated_at, is_current, valid_from, valid_to
    )
    SELECT 
        user_id,
        email,
        CONCAT(first_name, ' ', last_name) as full_name,
        phone,
        address:city::STRING as city,
        address:state::STRING as state,
        address:country::STRING as country,
        status,
        created_at,
        updated_at,
        TRUE as is_current,
        _cdc_timestamp as valid_from,
        NULL as valid_to
    FROM RAW.USERS_STREAM
    -- A stream records an update as a DELETE/INSERT pair (both with
    -- METADATA$ISUPDATE = TRUE); only the INSERT row carries the new values
    WHERE METADATA$ACTION = 'INSERT';

-- Tasks are created suspended; resume the task to start its schedule
ALTER TASK STAGING.USERS_TASK RESUME;
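STAGING.USERS carries `is_current`, `valid_from`, and `valid_to`, the columns of a Type 2 slowly changing dimension: each change closes the previous version of a row and opens a new one. The in-memory sketch below illustrates those semantics on a simplified row that tracks only `email`; the class and function names are hypothetical, and the sketch shows the column contract rather than how the SQL above loads the table.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional


@dataclass
class UserVersion:
    """One row of a Type 2 history: a user's attributes valid over a time range."""
    user_id: str
    email: str
    valid_from: datetime
    valid_to: Optional[datetime] = None
    is_current: bool = True


def apply_scd2(history: List[UserVersion], event: Dict) -> None:
    """Apply one Debezium-style CDC event to an in-memory SCD Type 2 history.

    Any current version of the key is closed at the event time. Snapshot
    reads ('r'), creates ('c') and updates ('u') then append a new current
    version; deletes ('d') leave the key with no current version.
    """
    ts = event["ts"]
    for row in history:
        if row.user_id == event["user_id"] and row.is_current:
            row.valid_to = ts
            row.is_current = False
    if event["op"] in ("r", "c", "u"):
        history.append(UserVersion(event["user_id"], event["email"], valid_from=ts))
```

Queries for "the user as of time T" then filter on `valid_from <= T AND (valid_to IS NULL OR valid_to > T)`.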

Step-by-Step Implementation Guide

Step 1: Debezium Configuration

// debezium/connectors/postgres-connector.json
{
  "name": "postgres-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "source-db.internal",
    "database.port": "5432",
    "database.user": "replicator",
    "database.password": "${secrets:db-replicator-password}",
    "database.dbname": "ecommerce",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "dbz_publication",
    
    "topic.prefix": "dbserver1",
    "schema.include.list": "public",
    "table.include.list": "public.users,public.products,public.orders,public.order_items",
    
    "transforms": "route,unwrap,addMetadata",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "cdc.$3",
    
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms",
    
    "transforms.addMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addMetadata.timestamp.field": "event_timestamp",
    
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "none",
    "snapshot.fetch.size": "10000",
    
    "heartbeat.interval.ms": "10000",
    "tombstones.on.delete": "true",
    
    "producer.override.batch.size": "16384",
    "producer.override.buffer.memory": "33554432",
    "producer.override.linger.ms": "50",
    "producer.override.compression.type": "snappy",
    
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.tolerance": "none"
  }
}
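The `unwrap` transform (Debezium's ExtractNewRecordState) replaces the before/after change-event envelope with a flat row. The plain-Python sketch below approximates what the settings above produce for one record value, which helps when designing the RAW tables; it is a simplified model for illustration (the real SMT operates on Kafka Connect structs and supports more options), and the function name is hypothetical.

```python
from typing import Any, Dict, Optional


def unwrap_change_event(envelope: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Flatten a Debezium change-event value as configured by the `unwrap` SMT.

    - snapshot reads, creates and updates keep the `after` row state
    - deletes keep the `before` row state, marked __deleted="true"
      (delete.handling.mode = rewrite; other events get __deleted="false")
    - add.fields "op,table,lsn,source.ts_ms" become __op, __table,
      __lsn and __source_ts_ms
    Tombstones (None values) pass through, since drop.tombstones is false.
    """
    if envelope is None:
        return None
    op = envelope["op"]
    source = envelope.get("source", {})
    row = dict(envelope["before"] if op == "d" else envelope["after"])
    row["__deleted"] = "true" if op == "d" else "false"
    row["__op"] = op
    row["__table"] = source.get("table")
    row["__lsn"] = source.get("lsn")
    row["__source_ts_ms"] = source.get("ts_ms")
    return row
```

Keeping the `before` image on deletes is what lets a downstream MERGE identify which key to close or remove.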

Step 2: Kafka Connect Snowflake Sink

// kafka-connect/snowflake-sink.json
{
  "name": "snowflake-sink-connector",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "topics": "cdc.users,cdc.products,cdc.orders,cdc.order_items",
    
    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
    "snowflake.enable.schematization": "true",
    "buffer.count.records": "10000",
    "buffer.flush.time.sec": "60",
    "buffer.size.bytes": "5000000",
    
    "snowflake.database.name": "ECOMMERCE_DW",
    "snowflake.schema.name": "RAW",
    "snowflake.topic2table.map": "cdc.users:USERS,cdc.products:PRODUCTS,cdc.orders:ORDERS,cdc.order_items:ORDER_ITEMS",
    
    "snowflake.url.name": "${env:SNOWFLAKE_URL}",
    "snowflake.user.name": "${env:SNOWFLAKE_USER}",
    "snowflake.private.key": "${env:SNOWFLAKE_PRIVATE_KEY}",
    "snowflake.private.key.passphrase": "${env:SNOWFLAKE_PASSPHRASE}",
    "snowflake.role.name": "KAFKA_CONNECT_ROLE",
    
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "behavior.on.null.values": "IGNORE",
    
    "transforms": "renameCdcFields,toTimestamp,addKafkaCoordinates",
    "transforms.renameCdcFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.renameCdcFields.renames": "__op:_cdc_operation,__source_ts_ms:_cdc_timestamp",
    "transforms.toTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.toTimestamp.field": "_cdc_timestamp",
    "transforms.toTimestamp.target.type": "Timestamp",
    "transforms.addKafkaCoordinates.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addKafkaCoordinates.offset.field": "_kafka_offset",
    "transforms.addKafkaCoordinates.partition.field": "_kafka_partition",
    
    "errors.log.enable": "true",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq-snowflake",
    "errors.deadletterqueue.topic.replication.factor": "3"
  }
}
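The three `buffer.*` settings bound how long records wait in the sink before being handed to Snowflake: the buffer is flushed as soon as any one threshold is reached. The toy model below shows that rule; it is an illustrative sketch (class and method names are hypothetical), not the connector's actual implementation, and it treats time as a value passed in by the caller so `should_flush` can be polled.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SinkBuffer:
    """Toy model of a count/size/time-bounded sink buffer.

    Defaults mirror buffer.count.records, buffer.size.bytes and
    buffer.flush.time.sec from the sink config; whichever limit is
    reached first triggers a flush.
    """
    max_records: int = 10_000
    max_bytes: int = 5_000_000
    max_age_sec: float = 60.0
    records: List[bytes] = field(default_factory=list)
    size_bytes: int = 0
    opened_at: float = 0.0

    def add(self, record: bytes, now: float) -> bool:
        """Buffer one record at time `now`; return True if a flush is due."""
        if not self.records:
            self.opened_at = now  # age is measured from the first buffered record
        self.records.append(record)
        self.size_bytes += len(record)
        return self.should_flush(now)

    def should_flush(self, now: float) -> bool:
        return (
            len(self.records) >= self.max_records
            or self.size_bytes >= self.max_bytes
            or (bool(self.records) and now - self.opened_at >= self.max_age_sec)
        )

    def flush(self) -> List[bytes]:
        """Hand off the buffered records and reset the buffer."""
        batch, self.records, self.size_bytes = self.records, [], 0
        return batch
```

In this model a low-traffic topic never hits the count or size limit, so with the values above a change can wait up to 60 seconds in the sink; lowering `buffer.flush.time.sec` trades more, smaller writes for fresher data.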

Step 3: Snowflake Transformation Pipeline

# snowflake/transformations.py
import snowflake.connector
from snowflake.connector import DictCursor
import json
from datetime import datetime
from typing import Dict, List, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SnowflakeCDCManager:
    def __init__(self, config: Dict):
        self.config = config
        self.conn = None
    
    def connect(self):
        """Establish connection to Snowflake."""
        self.conn = snowflake.connector.connect(
            user=self.config['user'],
            password=self.config['password'],
            account=self.config['account'],
            warehouse=self.config['warehouse'],
            database=self.config['database'],
            schema=self.config['schema'],
            role=self.config.get('role', 'SYSADMIN')
        )
        logger.info("Connected to Snowflake")
    
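    def process_cdc_events(self):
        """Upsert the latest CDC event per key from RAW into STAGING.

        Snowflake does not enforce PRIMARY KEY constraints, so MERGE is used
        instead of INSERT to avoid duplicate rows on every change or rerun.
        Debezium op codes: 'r' = snapshot read, 'c' = create, 'u' = update,
        'd' = delete.
        """
        cursor = self.conn.cursor(DictCursor)
        
        try:
            # Process user changes: apply only the newest event per user_id;
            # deletes close the current record instead of removing it
            cursor.execute("""
                MERGE INTO STAGING.USERS s
                USING (
                    SELECT *
                    FROM RAW.USERS
                    WHERE _cdc_operation IN ('r', 'c', 'u', 'd')
                    QUALIFY ROW_NUMBER() OVER (
                        PARTITION BY user_id
                        ORDER BY _cdc_timestamp DESC, _kafka_offset DESC
                    ) = 1
                ) r
                ON s.user_id = r.user_id
                WHEN MATCHED AND r._cdc_operation = 'd' THEN UPDATE SET
                    s.is_current = FALSE,
                    s.valid_to = r._cdc_timestamp
                WHEN MATCHED THEN UPDATE SET
                    s.email = r.email,
                    s.full_name = CONCAT(r.first_name, ' ', r.last_name),
                    s.phone = r.phone,
                    s.city = r.address:city::STRING,
                    s.state = r.address:state::STRING,
                    s.country = r.address:country::STRING,
                    s.status = r.status,
                    s.updated_at = r.updated_at,
                    s.is_current = TRUE,
                    s.valid_from = r._cdc_timestamp,
                    s.valid_to = NULL
                WHEN NOT MATCHED AND r._cdc_operation <> 'd' THEN INSERT (
                    user_id, email, full_name, phone, city, state, country,
                    status, created_at, updated_at, is_current, valid_from, valid_to
                ) VALUES (
                    r.user_id, r.email, CONCAT(r.first_name, ' ', r.last_name), r.phone,
                    r.address:city::STRING, r.address:state::STRING,
                    r.address:country::STRING, r.status, r.created_at, r.updated_at,
                    TRUE, r._cdc_timestamp, NULL
                )
            """)
            
            # Process order changes: newest event per order_id, with the count
            # of live items taken from each item's latest event.
            # DATEDIFF returns NULL while delivered_at is NULL.
            cursor.execute("""
                MERGE INTO STAGING.ORDERS s
                USING (
                    SELECT
                        r.*,
                        COALESCE(ic.item_count, 0) AS item_count
                    FROM RAW.ORDERS r
                    LEFT JOIN (
                        SELECT order_id, COUNT(*) AS item_count
                        FROM (
                            SELECT order_id, _cdc_operation
                            FROM RAW.ORDER_ITEMS
                            QUALIFY ROW_NUMBER() OVER (
                                PARTITION BY order_item_id
                                ORDER BY _cdc_timestamp DESC, _kafka_offset DESC
                            ) = 1
                        ) latest_items
                        WHERE _cdc_operation <> 'd'
                        GROUP BY order_id
                    ) ic ON r.order_id = ic.order_id
                    WHERE r._cdc_operation IN ('r', 'c', 'u', 'd')
                    QUALIFY ROW_NUMBER() OVER (
                        PARTITION BY r.order_id
                        ORDER BY r._cdc_timestamp DESC, r._kafka_offset DESC
                    ) = 1
                ) r
                ON s.order_id = r.order_id
                WHEN MATCHED AND r._cdc_operation = 'd' THEN DELETE
                WHEN MATCHED THEN UPDATE SET
                    s.user_id = r.user_id,
                    s.order_number = r.order_number,
                    s.status = r.status,
                    s.total_amount = r.total_amount,
                    s.item_count = r.item_count,
                    s.shipping_city = r.shipping_address:city::STRING,
                    s.shipping_state = r.shipping_address:state::STRING,
                    s.shipping_country = r.shipping_address:country::STRING,
                    s.updated_at = r.updated_at,
                    s.shipped_at = r.shipped_at,
                    s.delivered_at = r.delivered_at,
                    s.delivery_days = DATEDIFF('day', r.created_at, r.delivered_at)
                WHEN NOT MATCHED AND r._cdc_operation <> 'd' THEN INSERT (
                    order_id, user_id, order_number, status, total_amount,
                    item_count, shipping_city, shipping_state, shipping_country,
                    created_at, updated_at, shipped_at, delivered_at, delivery_days
                ) VALUES (
                    r.order_id, r.user_id, r.order_number, r.status, r.total_amount,
                    r.item_count, r.shipping_address:city::STRING,
                    r.shipping_address:state::STRING, r.shipping_address:country::STRING,
                    r.created_at, r.updated_at, r.shipped_at, r.delivered_at,
                    DATEDIFF('day', r.created_at, r.delivered_at)
                )
            """)
            
            logger.info("CDC events processed successfully")
            
        except Exception as e:
            logger.error(f"Error processing CDC events: {e}")
            raise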
    
    def compute_analytics(self):
        """Compute analytics metrics from staged data."""
        cursor = self.conn.cursor(DictCursor)
        
        try:
            # Daily sales metrics
            cursor.execute("""
                MERGE INTO ANALYTICS.DAILY_SALES t
                USING (
                    SELECT 
                        DATE(o.created_at) as sale_date,
                        oi.product_id,
                        p.category,
                        SUM(oi.total_price) as total_revenue,
                        COUNT(DISTINCT o.order_id) as total_orders,
                        SUM(oi.quantity) as total_items,
                        AVG(o.total_amount) as avg_order_value,
                        COUNT(DISTINCT o.user_id) as unique_customers
                    FROM STAGING.ORDERS o
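                    JOIN (
                        -- RAW keeps every change event; use only each
                        -- item's latest version so revenue is not double-counted
                        SELECT *
                        FROM RAW.ORDER_ITEMS
                        QUALIFY ROW_NUMBER() OVER (
                            PARTITION BY order_item_id
                            ORDER BY _cdc_timestamp DESC, _kafka_offset DESC
                        ) = 1
                    ) oi ON o.order_id = oi.order_id AND oi._cdc_operation <> 'd'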
                    JOIN STAGING.PRODUCTS p ON oi.product_id = p.product_id
                    WHERE DATE(o.created_at) >= CURRENT_DATE() - 1
                    GROUP BY 1, 2, 3
                ) s
                ON t.sale_date = s.sale_date 
                    AND t.product_id = s.product_id
                WHEN MATCHED THEN UPDATE SET
                    t.total_revenue = s.total_revenue,
                    t.total_orders = s.total_orders,
                    t.total_items = s.total_items,
                    t.avg_order_value = s.avg_order_value,
                    t.unique_customers = s.unique_customers,
                    t._computed_at = CURRENT_TIMESTAMP()
                WHEN NOT MATCHED THEN INSERT (
                    sale_date, product_id, category, total_revenue,
                    total_orders, total_items, avg_order_value,
                    unique_customers, _computed_at
                ) VALUES (
                    s.sale_date, s.product_id, s.category, s.total_revenue,
                    s.total_orders, s.total_items, s.avg_order_value,
                    s.unique_customers, CURRENT_TIMESTAMP()
                )
            """)
            
            # Customer segments
            cursor.execute("""
                MERGE INTO ANALYTICS.CUSTOMER_SEGMENTS t
                USING (
                    SELECT 
                        u.user_id,
                        CASE 
                            WHEN COUNT(o.order_id) >= 10 THEN 'VIP'
                            WHEN COUNT(o.order_id) >= 5 THEN 'Loyal'
                            WHEN COUNT(o.order_id) >= 2 THEN 'Repeat'
                            ELSE 'One-time'
                        END as segment,
                        SUM(o.total_amount) as lifetime_value,
                        COUNT(o.order_id) as order_count,
                        AVG(o.total_amount) as avg_order_value,
                        MAX(DATE(o.created_at)) as last_order_date
                    FROM STAGING.USERS u
                    LEFT JOIN STAGING.ORDERS o ON u.user_id = o.user_id
                    WHERE u.is_current = TRUE
                    GROUP BY u.user_id
                ) s
                ON t.user_id = s.user_id
                WHEN MATCHED THEN UPDATE SET
                    t.segment = s.segment,
                    t.lifetime_value = s.lifetime_value,
                    t.order_count = s.order_count,
                    t.avg_order_value = s.avg_order_value,
                    t.last_order_date = s.last_order_date,
                    t._computed_at = CURRENT_TIMESTAMP()
                WHEN NOT MATCHED THEN INSERT (
                    user_id, segment, lifetime_value, order_count,
                    avg_order_value, last_order_date, _computed_at
                ) VALUES (
                    s.user_id, s.segment, s.lifetime_value, s.order_count,
                    s.avg_order_value, s.last_order_date, CURRENT_TIMESTAMP()
                )
            """)
            
            logger.info("Analytics computed successfully")
            
        except Exception as e:
            logger.error(f"Error computing analytics: {e}")
            raise
    
    def monitor_cdc_lag(self) -> Dict:
        """Monitor CDC replication lag for each RAW table (today's ingestions)."""
        cursor = self.conn.cursor(DictCursor)
        
        try:
            # RAW tables have no table_name column, so query each table and label it
            per_table = [
                f"""
                SELECT
                    '{table}' AS table_name,
                    MAX(_cdc_timestamp) AS latest_cdc_timestamp,
                    DATEDIFF('second', MAX(_cdc_timestamp), CURRENT_TIMESTAMP()) AS lag_seconds,
                    COUNT(*) AS total_records,
                    COUNT(CASE WHEN _cdc_operation = 'c' THEN 1 END) AS inserts,
                    COUNT(CASE WHEN _cdc_operation = 'u' THEN 1 END) AS updates,
                    COUNT(CASE WHEN _cdc_operation = 'd' THEN 1 END) AS deletes
                FROM RAW.{table}
                WHERE _ingestion_timestamp >= CURRENT_DATE()
                """
                for table in ('USERS', 'PRODUCTS', 'ORDERS')
            ]
            cursor.execute(" UNION ALL ".join(per_table))
            results = cursor.fetchall()
            
            # Snowflake returns unquoted column names in upper case (LAG_SECONDS).
            # A table with no events today has a NULL lag and counts as lagging.
            healthy = bool(results) and all(
                r['LAG_SECONDS'] is not None and r['LAG_SECONDS'] < 300 for r in results
            )
            
            return {
                'tables': results,
                'monitoring_timestamp': datetime.now().isoformat(),
                'status': 'healthy' if healthy else 'lagging'
            }
            
        except Exception as e:
            logger.error(f"Error monitoring CDC lag: {e}")
            raise
    
    def handle_schema_evolution(self, table_name: str, new_columns: List[Dict]):
        """Handle schema evolution for CDC tables."""
        cursor = self.conn.cursor()
        
        try:
            # Get current table schema; bind the table name instead of
            # interpolating it into the SQL string
            cursor.execute("""
                SELECT column_name, data_type
                FROM information_schema.columns
                WHERE table_schema = 'RAW' 
                  AND table_name = %s
                ORDER BY ordinal_position
            """, (table_name.upper(),))
            
            # information_schema stores unquoted identifiers in upper case
            current_columns = {row[0].upper(): row[1] for row in cursor.fetchall()}
            
            # Add new columns (identifiers cannot be bound as parameters, so
            # table_name and the column definitions must come from a trusted source)
            for col in new_columns:
                if col['name'].upper() not in current_columns:
                    cursor.execute(f"""
                        ALTER TABLE RAW.{table_name}
                        ADD COLUMN {col['name']} {col['type']}
                    """)
                    logger.info(f"Added column {col['name']} to {table_name}")
            
            # Update downstream schemas if needed
            if table_name.upper() == 'USERS':
                self._evolve_users_schema(new_columns)
            elif table_name.upper() == 'ORDERS':
                self._evolve_orders_schema(new_columns)
            
            logger.info(f"Schema evolution completed for {table_name}")
            
        except Exception as e:
            logger.error(f"Error evolving schema: {e}")
            raise
    
    def _evolve_users_schema(self, new_columns: List[Dict]):
        """Evolve users staging schema."""
        cursor = self.conn.cursor()
        
        for col in new_columns:
            try:
                cursor.execute(f"""
                    ALTER TABLE STAGING.USERS
                    ADD COLUMN {col['name']} {col['type']}
                """)
            except Exception as e:
                if "already exists" not in str(e):
                    raise
    
    def _evolve_orders_schema(self, new_columns: List[Dict]):
        """Evolve orders staging schema."""
        cursor = self.conn.cursor()
        
        for col in new_columns:
            try:
                cursor.execute(f"""
                    ALTER TABLE STAGING.ORDERS
                    ADD COLUMN {col['name']} {col['type']}
                """)
            except Exception as e:
                if "already exists" not in str(e):
                    raise
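The customer segmentation in `compute_analytics` is a plain threshold rule on order count. The pure-Python sketch below mirrors that CASE expression so the thresholds can be unit-tested outside Snowflake; the function names are hypothetical. Unlike SQL `SUM`, which yields NULL for a user with no orders, this sketch returns a lifetime value of 0.

```python
from typing import Iterable, Optional, Tuple


def classify_segment(order_count: int) -> str:
    """Mirror the CASE expression in compute_analytics (thresholds 10 / 5 / 2)."""
    if order_count >= 10:
        return "VIP"
    if order_count >= 5:
        return "Loyal"
    if order_count >= 2:
        return "Repeat"
    return "One-time"


def customer_summary(order_totals: Iterable[float]) -> Tuple[str, float, int, Optional[float]]:
    """Return (segment, lifetime_value, order_count, avg_order_value) for one user."""
    totals = list(order_totals)
    count = len(totals)
    lifetime = sum(totals)
    avg = lifetime / count if count else None  # SQL AVG over no rows is NULL too
    return classify_segment(count), lifetime, count, avg
```

As the CASE is written, a user with no orders also falls into "One-time", because the LEFT JOIN yields an order count of 0.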

Infrastructure Setup (Terraform)

# infrastructure/snowflake_cdc.tf
terraform {
  required_version = ">= 1.5.0"
  
  required_providers {
    snowflake = {
      source  = "Snowflake-Labs/snowflake"
      version = "~> 0.89"
    }
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
    confluent = {
      source  = "confluentinc/confluent"
      version = "~> 1.50.0"
    }
  }
}

# Variables
variable "snowflake_account" {
  description = "Snowflake account identifier"
  type        = string
}

variable "snowflake_user" {
  description = "Snowflake user"
  type        = string
  sensitive   = true
}

variable "snowflake_password" {
  description = "Snowflake password"
  type        = string
  sensitive   = true
}

variable "kafka_cluster_id" {
  description = "Confluent Kafka cluster ID"
  type        = string
}

# Snowflake Provider
provider "snowflake" {
  account  = var.snowflake_account
  user     = var.snowflake_user
  password = var.snowflake_password
  role     = "SYSADMIN"
}

# Snowflake Database
resource "snowflake_database" "ecommerce_dw" {
  name    = "ECOMMERCE_DW"
  comment = "E-commerce data warehouse with CDC"
}

# Snowflake Schemas
resource "snowflake_schema" "raw" {
  database = snowflake_database.ecommerce_dw.name
  name     = "RAW"
  comment  = "Raw CDC data layer"
}

resource "snowflake_schema" "staging" {
  database = snowflake_database.ecommerce_dw.name
  name     = "STAGING"
  comment  = "Staged and cleaned data"
}

resource "snowflake_schema" "analytics" {
  database = snowflake_database.ecommerce_dw.name
  name     = "ANALYTICS"
  comment  = "Business analytics layer"
}

# Snowflake Warehouse
resource "snowflake_warehouse" "compute" {
  name           = "COMPUTE_WH"
  comment        = "Compute warehouse for CDC processing"
  warehouse_size = "medium"
  auto_suspend   = 60
  auto_resume    = true
  
  scaling_policy = "ECONOMY"
  
  min_cluster_count = 1
  max_cluster_count = 3
  
  resource_monitor = snowflake_resource_monitor.cdc_monitor.name
}

# Resource Monitor
resource "snowflake_resource_monitor" "cdc_monitor" {
  name             = "CDC_RESOURCE_MONITOR"
  credit_quota     = 100
  frequency        = "MONTHLY"
  start_timestamp  = "IMMEDIATELY"
  
  notify_triggers  = [80, 90, 100]
  
  suspend_triggers = [100]
  suspend_immediate_triggers = [110]
}

# Snowflake Roles
resource "snowflake_role" "kafka_connect" {
  name    = "KAFKA_CONNECT_ROLE"
  comment = "Role for Kafka Connect Snowflake Sink"
}

# Grant permissions
# Schema-level privileges for the Snowflake Kafka connector role. INSERT, UPDATE
# and DELETE are table-level privileges and cannot be granted on a schema; the
# role also needs USAGE on the database.
resource "snowflake_grant_privileges_to_role" "kafka_connect_raw" {
  role_name = snowflake_role.kafka_connect.name
  
  privileges = ["USAGE", "CREATE TABLE", "CREATE STAGE", "CREATE PIPE"]
  
  on_schema {
    schema_name = "\"${snowflake_database.ecommerce_dw.name}\".\"${snowflake_schema.raw.name}\""
  }
}

resource "snowflake_grant_privileges_to_role" "kafka_connect_staging" {
  role_name = snowflake_role.kafka_connect.name
  
  privileges = ["USAGE", "CREATE TABLE"]
  
  on_schema {
    schema_name = "\"${snowflake_database.ecommerce_dw.name}\".\"${snowflake_schema.staging.name}\""
  }
}

# API Integration for external functions
resource "snowflake_api_integration" "cdc_integration" {
  name                = "CDC_API_INTEGRATION"
  api_provider        = "AWS_API_GATEWAY"
  api_allowed_prefixes = ["https://*.execute-api.us-east-1.amazonaws.com/"]
  enabled             = true
}

# External Functions for CDC monitoring
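resource "snowflake_external_function" "cdc_health_check" {
  name            = "CDC_HEALTH_CHECK"
  database        = snowflake_database.ecommerce_dw.name
  schema          = snowflake_schema.raw.name
  return_type     = "VARIANT"
  return_behavior = "VOLATILE"  # health results change between calls
  api_integration = snowflake_api_integration.cdc_integration.name

  # Assumed variable: API Gateway URL that serves the health check
  url_of_proxy_and_resource = var.cdc_health_check_url

  arg {
    name = "TABLE_NAME"
    type = "VARCHAR"
  }

  max_batch_rows = 1
  comment        = "CDC health check function"
}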

# Confluent Kafka Cluster
resource "confluent_kafka_cluster" "cdc_cluster" {
  display_name = "cdc-kafka-cluster"
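  # Basic clusters are single-zone only; multi-zone availability needs a
  # Standard (or higher) cluster, which is what the cost breakdown assumes
  availability = "MULTI_ZONE"
  cloud        = "AWS"
  region       = "us-east-1"
  
  standard {}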
  
  environment {
    id = confluent_environment.cdc_environment.id
  }
}

resource "confluent_environment" "cdc_environment" {
  display_name = "cdc-environment"
  description  = "Environment for CDC pipeline"
}

# Kafka Topics for CDC
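resource "confluent_kafka_topic" "cdc_topics" {
  for_each = toset(["users", "products", "orders", "order_items"])

  kafka_cluster {
    id = confluent_kafka_cluster.cdc_cluster.id
  }

  topic_name       = "cdc.${each.key}"
  partitions_count = 6
  # Confluent Cloud fixes the replication factor at 3, so it is not set here
  rest_endpoint    = confluent_kafka_cluster.cdc_cluster.rest_endpoint

  config = {
    "cleanup.policy"        = "compact"
    "delete.retention.ms"   = "86400000"  # keep tombstones for 1 day
    "min.compaction.lag.ms" = "60000"
    # retention.ms only takes effect if cleanup.policy also includes "delete"
    "retention.ms"          = "604800000"
  }

  # Topic management uses the service account's cluster API key
  credentials {
    key    = confluent_api_key.debezium.id
    secret = confluent_api_key.debezium.secret
  }
}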

# Service Account for Debezium
resource "confluent_service_account" "debezium" {
  display_name = "debezium-service-account"
  description  = "Service account for Debezium CDC connector"
}

resource "confluent_role_binding" "debezium" {
  principal   = "User:${confluent_service_account.debezium.id}"
  # CloudClusterAdmin grants admin rights on this Kafka cluster
  role_name   = "CloudClusterAdmin"
  crn_pattern = confluent_kafka_cluster.cdc_cluster.rbac_crn
}

# API Key for Debezium
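resource "confluent_api_key" "debezium" {
  display_name = "debezium-api-key"
  description  = "API Key for Debezium connector"

  # Service account that owns the key
  owner {
    id          = confluent_service_account.debezium.id
    api_version = confluent_service_account.debezium.api_version
    kind        = confluent_service_account.debezium.kind
  }

  managed_resource {
    id          = confluent_kafka_cluster.cdc_cluster.id
    api_version = confluent_kafka_cluster.cdc_cluster.api_version
    kind        = confluent_kafka_cluster.cdc_cluster.kind
    environment {
      id = confluent_environment.cdc_environment.id
    }
  }

  depends_on = [
    confluent_role_binding.debezium
  ]
}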

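# Snowflake Sink Connector
# Property names follow the Snowflake Kafka connector's documented settings;
# a fully managed Confluent Cloud connector (connector.class = "SnowflakeSink")
# uses a different property set, so check them against your deployment.
resource "confluent_connector" "snowflake_sink" {
  environment {
    id = confluent_environment.cdc_environment.id
  }

  kafka_cluster {
    id = confluent_kafka_cluster.cdc_cluster.id
  }

  # Connector settings are a map of Kafka Connect property names, not a block
  config_nonsensitive = {
    "connector.class" = "com.snowflake.kafka.connect.SnowflakeSinkConnector"
    "topics" = join(",", [
      for topic in confluent_kafka_topic.cdc_topics : topic.topic_name
    ])

    "snowflake.ingestion.method" = "SNOWPIPE"
    "snowflake.url.name"         = "${var.snowflake_account}.snowflakecomputing.com:443"
    "snowflake.user.name"        = var.snowflake_user  # assumed variable
    "snowflake.role.name"        = snowflake_role.kafka_connect.name
    "snowflake.database.name"    = snowflake_database.ecommerce_dw.name
    "snowflake.schema.name"      = snowflake_schema.raw.name

    # Snowflake's Avro converter, backed by a schema registry
    "key.converter"                       = "com.snowflake.kafka.connect.records.SnowflakeAvroConverter"
    "value.converter"                     = "com.snowflake.kafka.connect.records.SnowflakeAvroConverter"
    "key.converter.schema.registry.url"   = var.schema_registry_url  # assumed variable
    "value.converter.schema.registry.url" = var.schema_registry_url

    # Flush per partition at 10,000 records or 60 seconds, whichever comes first
    "buffer.count.records" = "10000"
    "buffer.flush.time"    = "60"

    # Log failures and route bad records to a dead letter queue
    "errors.log.enable"                 = "true"
    "errors.tolerance"                  = "all"
    "errors.deadletterqueue.topic.name" = "dlq-snowflake"
  }

  config_sensitive = {
    # Assumed variable: private key for the connector user's key-pair auth
    "snowflake.private.key" = var.snowflake_private_key
  }

  depends_on = [
    confluent_kafka_topic.cdc_topics
  ]
}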

# Outputs
output "snowflake_database" {
  description = "Snowflake database name"
  value       = snowflake_database.ecommerce_dw.name
}

output "kafka_cluster_id" {
  description = "Kafka cluster ID"
  value       = confluent_kafka_cluster.cdc_cluster.id
}

output "cdc_topics" {
  description = "CDC topic names"
  value       = [for topic in confluent_kafka_topic.cdc_topics : topic.topic_name]
}
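The CDC topics above use `cleanup.policy = compact`: Kafka eventually keeps only the newest record for each key, and a tombstone (a record with a null value, which Debezium emits after each delete by default) removes the key once `delete.retention.ms` has passed. The sketch below models only that end state on an in-memory log; it ignores `min.compaction.lag.ms`, segment boundaries, and the broker's log cleaner, and the `compact` helper is hypothetical.

```python
from typing import Dict, List, Optional, Tuple

# (key, value) pairs; a value of None is a tombstone
Record = Tuple[str, Optional[dict]]


def compact(log: List[Record], drop_tombstones: bool = True) -> List[Record]:
    """Return the end state of compaction: the newest record per key, in log order."""
    latest_index: Dict[str, int] = {}
    for index, (key, _value) in enumerate(log):
        latest_index[key] = index  # later records for a key replace earlier ones
    survivors = [log[i] for i in sorted(latest_index.values())]
    if drop_tombstones:
        # Models the state once delete.retention.ms has expired for the tombstones
        survivors = [(key, value) for key, value in survivors if value is not None]
    return survivors


log = [
    ("user-1", {"email": "a@example.com"}),
    ("user-2", {"email": "b@example.com"}),
    ("user-1", {"email": "a2@example.com"}),  # update to user-1
    ("user-2", None),                          # user-2 deleted: tombstone
]
print(compact(log))  # [('user-1', {'email': 'a2@example.com'})]
```

Before the tombstone expires (`drop_tombstones=False`), a consumer reading from the start still sees `("user-2", None)` and can delete the row downstream, which is why `delete.retention.ms` should exceed the longest time a consumer may be offline.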

Testing and Validation

# tests/test_cdc_pipeline.py
import pytest
from datetime import datetime
from unittest.mock import Mock, patch

import snowflake.connector

class TestCDCPipeline:
    @pytest.fixture
    def kafka_config(self):
        return {
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'test-cdc-group',
            'auto.offset.reset': 'earliest'
        }
    
    @pytest.fixture
    def snowflake_config(self):
        return {
            'account': 'test-account',
            'user': 'test-user',
            'password': 'test-password',
            'database': 'ECOMMERCE_DW',
            'schema': 'RAW'
        }
    
    def test_debezium_connector_config(self):
        """Test Debezium connector configuration validity."""
        config = {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": "localhost",
            "database.port": "5432",
            "database.dbname": "testdb",
            "plugin.name": "pgoutput",
            "slot.name": "test_slot",
            "publication.name": "test_publication",
            # Debezium 2.x replaced database.server.name with topic.prefix
            "topic.prefix": "test",
            "table.include.list": "public.users"
        }
        
        # Validate required fields
        required_fields = [
            'connector.class', 'database.hostname', 'database.port',
            'database.dbname', 'topic.prefix', 'plugin.name'
        ]
        
        for field in required_fields:
            assert field in config, f"Missing required field: {field}"
        
        # Validate connector class
        assert config['connector.class'] == 'io.debezium.connector.postgresql.PostgresConnector'
        
        # Validate plugin
        valid_plugins = ['pgoutput', 'wal2json', 'decoderbufs']
        assert config['plugin.name'] in valid_plugins
    
    def test_kafka_topic_creation(self):
        """Test Kafka topic configuration for CDC."""
        # NewTopic comes from confluent_kafka.admin (kafka.admin is kafka-python)
        from confluent_kafka.admin import NewTopic
        
        # Topic specs mirroring the Terraform definitions
        topics = [
            NewTopic('cdc.users', num_partitions=6, replication_factor=3),
            NewTopic('cdc.products', num_partitions=6, replication_factor=3),
            NewTopic('cdc.orders', num_partitions=6, replication_factor=3)
        ]
        
        # Validate topic configs
        for topic in topics:
            assert topic.num_partitions == 6
            assert topic.replication_factor == 3
    
    def test_cdc_event_structure(self):
        """Test the flattened CDC event shape (JSON converter with schemas
        enabled, after Debezium's ExtractNewRecordState unwrap transform)."""
        cdc_event = {
            "schema": {
                "type": "struct",
                "fields": [
                    {"type": "string", "field": "user_id", "optional": False},
                    {"type": "string", "field": "email", "optional": False},
                    {"type": "string", "field": "first_name", "optional": True},
                    {"type": "string", "field": "last_name", "optional": True},
                    {"type": "int64", "field": "updated_at", "optional": False}
                ]
            },
            "payload": {
                "user_id": "test-user-123",
                "email": "test@example.com",
                "first_name": "Test",
                "last_name": "User",
                "updated_at": int(datetime.now().timestamp() * 1000)
            }
        }
        
        # Validate schema structure
        assert cdc_event['schema']['type'] == 'struct'
        assert len(cdc_event['schema']['fields']) > 0
        
        # Validate payload matches schema
        for field in cdc_event['schema']['fields']:
            assert field['field'] in cdc_event['payload']
    
    @patch('snowflake.connector.connect')
    def test_snowflake_raw_insert(self, mock_connect, snowflake_config):
        """Test inserting CDC events into Snowflake raw table."""
        mock_conn = Mock()
        mock_cursor = Mock()
        mock_connect.return_value = mock_conn
        mock_conn.cursor.return_value = mock_cursor
        
        # Six bind parameters; _ingestion_timestamp comes from CURRENT_TIMESTAMP()
        insert_sql = """
            INSERT INTO RAW.USERS (
                user_id, email, first_name, last_name,
                _cdc_operation, _cdc_timestamp, _ingestion_timestamp
            ) VALUES (%s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP())
        """
        
        test_data = (
            "user-123", "test@example.com", "Test", "User",
            "c", datetime.now()
        )
        
        # Go through the (mocked) connector the way production code would
        conn = snowflake.connector.connect(**snowflake_config)
        conn.cursor().execute(insert_sql, test_data)
        
        mock_connect.assert_called_once_with(**snowflake_config)
        mock_cursor.execute.assert_called_once_with(insert_sql, test_data)
        # Bind values must match the %s placeholders
        assert insert_sql.count('%s') == len(test_data)
    
    def test_snowpipe_loading(self):
        """Test Snowpipe configuration for CDC loading."""
        snowpipe_config = {
            "PIPE_NAME": "CDC_USERS_PIPE",
            "PIPE_TABLE": "RAW.USERS",
            # A pipe's COPY reads from a stage; RAW.CDC_STAGE is an assumed
            # external stage pointing at s3://cdc-bucket/
            "PIPE_SOURCE": "@RAW.CDC_STAGE/users/",
            "FILE_FORMAT": "CSV",
            "ON_ERROR": "CONTINUE"
        }
        
        # Validate Snowpipe SQL
        pipe_sql = f"""
            CREATE OR REPLACE PIPE {snowpipe_config['PIPE_NAME']}
            AUTO_INGEST = TRUE
            AS
            COPY INTO {snowpipe_config['PIPE_TABLE']}
            FROM {snowpipe_config['PIPE_SOURCE']}
            FILE_FORMAT = (TYPE = {snowpipe_config['FILE_FORMAT']})
            ON_ERROR = {snowpipe_config['ON_ERROR']}
        """
        
        assert "CREATE OR REPLACE PIPE" in pipe_sql
        assert "AUTO_INGEST = TRUE" in pipe_sql
        assert "FROM @" in pipe_sql
    
    def test_data_quality_checks(self):
        """Test CDC data quality validation."""
        quality_checks = [
            {
                "check_name": "null_check",
                "table": "RAW.USERS",
                "column": "user_id",
                "sql": "SELECT COUNT(*) FROM RAW.USERS WHERE user_id IS NULL",
                "expected": 0
            },
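            {
                # RAW is an append-only change log (one row per change event), so
                # uniqueness is checked on the deduplicated staging table instead
                # (STAGING.USERS is assumed to hold the current row per user)
                "check_name": "duplicate_check",
                "table": "STAGING.USERS",
                "column": "user_id",
                "sql": "SELECT COUNT(*) - COUNT(DISTINCT user_id) FROM STAGING.USERS",
                "expected": 0
            },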
            {
                "check_name": "freshness_check",
                "table": "RAW.USERS",
                "column": "_cdc_timestamp",
                "sql": "SELECT DATEDIFF('minute', MAX(_cdc_timestamp), CURRENT_TIMESTAMP()) FROM RAW.USERS",
                "expected_max": 5
            }
        ]
        
        for check in quality_checks:
            assert 'check_name' in check
            assert 'sql' in check
            assert 'expected' in check or 'expected_max' in check
    
    def test_schema_evolution(self):
        """Test schema evolution handling."""
        # Current schema
        current_schema = {
            "user_id": "VARCHAR(36)",
            "email": "VARCHAR(255)",
            "first_name": "VARCHAR(100)",
            "last_name": "VARCHAR(100)"
        }
        
        # New schema with added column
        new_schema = {
            "user_id": "VARCHAR(36)",
            "email": "VARCHAR(255)",
            "first_name": "VARCHAR(100)",
            "last_name": "VARCHAR(100)",
            "phone": "VARCHAR(20)"  # New column
        }
        
        # Find new columns
        new_columns = set(new_schema.keys()) - set(current_schema.keys())
        
        assert len(new_columns) == 1
        assert "phone" in new_columns
        
        # Generate ALTER TABLE statement
        for col in new_columns:
            alter_sql = f"ALTER TABLE RAW.USERS ADD COLUMN {col} {new_schema[col]}"
            assert "ALTER TABLE" in alter_sql
            assert col in alter_sql
    
    def test_cdc_lag_monitoring(self):
        """Test CDC lag monitoring and alerting."""
        # Simulate lag metrics
        lag_metrics = {
            "users_table": {
                "lag_seconds": 30,
                "records_per_minute": 1500,
                "last_update": datetime.now().isoformat()
            },
            "orders_table": {
                "lag_seconds": 45,
                "records_per_minute": 2500,
                "last_update": datetime.now().isoformat()
            }
        }
        
        # Define alert thresholds
        thresholds = {
            "lag_seconds_warning": 60,
            "lag_seconds_critical": 300,
            "records_per_minute_min": 100
        }
        
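        # Check for alerts
        alerts = []
        for table, metrics in lag_metrics.items():
            if metrics['lag_seconds'] > thresholds['lag_seconds_critical']:
                alerts.append(f"CRITICAL: {table} lag exceeds {thresholds['lag_seconds_critical']}s")
            elif metrics['lag_seconds'] > thresholds['lag_seconds_warning']:
                alerts.append(f"WARNING: {table} lag exceeds {thresholds['lag_seconds_warning']}s")
            # Low throughput can indicate a stalled connector even when lag looks fine
            if metrics['records_per_minute'] < thresholds['records_per_minute_min']:
                alerts.append(f"WARNING: {table} throughput below {thresholds['records_per_minute_min']} records/min")
        
        # Verify alert logic: both tables are within all thresholds
        assert alerts == []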
    
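    def test_exactly_once_semantics(self):
        """Test idempotent processing, the consumer-side half of exactly-once."""
        # Offsets are unique only within a topic partition, so dedupe on all three.
        # In production this set must be persisted atomically with the writes.
        processed = set()
        
        def process_event(topic, partition, offset):
            key = (topic, partition, offset)
            if key in processed:
                return False  # Redelivered event: skip it
            # Process event here
            processed.add(key)
            return True
        
        # Test idempotency
        assert process_event("cdc.users", 0, 100) is True
        assert process_event("cdc.users", 0, 100) is False  # Redelivery is ignored
        assert process_event("cdc.users", 1, 100) is True   # Same offset, other partition
        assert process_event("cdc.users", 0, 101) is True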
    
    def test_dead_letter_queue(self):
        """Test dead letter queue handling."""
        # Simulate DLQ processing
        dlq_events = [
            {
                "original_topic": "cdc.users",
                "error": "Schema validation failed",
                "event": {"user_id": "test", "email": None},
                "timestamp": datetime.now().isoformat()
            }
        ]
        
        # Validate DLQ structure
        for event in dlq_events:
            assert 'original_topic' in event
            assert 'error' in event
            assert 'event' in event
            assert 'timestamp' in event
        
        # Process DLQ events
        processed_count = 0
        for event in dlq_events:
            if event['error'] == 'Schema validation failed':
                # Fix and reprocess
                processed_count += 1
        
        assert processed_count == 1
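`test_cdc_event_structure` checks a flat payload. Raw Debezium change events instead wrap each row in an envelope with `before`, `after`, `op`, `source`, and `ts_ms` fields, and Debezium's `ExtractNewRecordState` transform is the usual way to flatten them. The hypothetical `unwrap_change_event` below is a plain-Python sketch of that flattening step, adding the `_cdc_operation` and `_cdc_timestamp` columns used in `test_snowflake_raw_insert`; it is not the transform's implementation and does not cover its options.

```python
from typing import Any, Dict, Optional


def unwrap_change_event(envelope: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Flatten a Debezium change-event envelope into one row plus CDC metadata.

    Returns None for tombstones (null values), which carry no row data.
    """
    if envelope is None:
        return None
    op = envelope["op"]  # "c" create, "u" update, "d" delete, "r" snapshot read
    # Deletes carry only the old row image in "before"; with PostgreSQL's default
    # REPLICA IDENTITY that image holds just the primary key columns
    row = envelope["before"] if op == "d" else envelope["after"]
    flat = dict(row)
    flat["_cdc_operation"] = op
    # source.ts_ms is when the change happened in the database; the top-level
    # ts_ms is when Debezium processed it
    flat["_cdc_timestamp"] = envelope["source"]["ts_ms"]
    return flat
```

Using the source timestamp rather than the processing time keeps `_cdc_timestamp` meaningful for ordering and freshness checks even when the connector falls behind.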

Cost Analysis

Monthly Cost Breakdown (Production)

Component           | Specification          | Monthly Cost
Snowflake           | Medium warehouse, 24/7 | $2,000
Confluent Cloud     | Standard cluster       | $500
Debezium            | Self-hosted on EC2     | $200
EC2 (Kafka Connect) | 3x m5.xlarge           | $450
S3 Storage          | 1TB CDC logs           | $23
Data Transfer       | Cross-region           | $100
Monitoring          | Datadog                | $150
Total               |                        | $3,423
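The line items add up to the $3,423 total. The Snowflake figure depends mostly on how many hours the warehouse actually runs: warehouses are billed in credits, and a Medium warehouse uses 4 credits per hour. Below is a rough estimator, assuming a per-credit price you supply (it varies by edition, cloud, region, and contract); `warehouse_monthly_cost` and the constants are illustrative, not an official pricing API.

```python
# Credits per hour for standard warehouse sizes (each size doubles the previous)
CREDITS_PER_HOUR = {"XSMALL": 1, "SMALL": 2, "MEDIUM": 4, "LARGE": 8, "XLARGE": 16}

# Monthly line items from the table above, in USD
MONTHLY_LINE_ITEMS = {
    "Snowflake": 2000,
    "Confluent Cloud": 500,
    "Debezium (EC2)": 200,
    "EC2 (Kafka Connect)": 450,
    "S3 Storage": 23,
    "Data Transfer": 100,
    "Monitoring": 150,
}


def warehouse_monthly_cost(size: str, active_hours_per_day: float,
                           price_per_credit: float, days: int = 30,
                           clusters: int = 1) -> float:
    """Estimate monthly warehouse spend; price_per_credit is an input assumption."""
    credits = CREDITS_PER_HOUR[size] * active_hours_per_day * days * clusters
    return credits * price_per_credit


print(sum(MONTHLY_LINE_ITEMS.values()))   # 3423
print(100 / CREDITS_PER_HOUR["MEDIUM"])   # 25.0 hours of MEDIUM per 100 credits
```

For scale, the 100-credit quota on `CDC_RESOURCE_MONITOR` covers about 25 hours of Medium runtime per month, so auto-suspend matters as much as warehouse size.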

Cost Optimization Strategies

💡

Tip: Optimize CDC costs with these strategies:

  1. Snowflake Auto-Suspend: Suspend warehouse when idle
  2. Snowpipe Batching: Batch small files for efficient loading
  3. Compression: Use Avro/Parquet for 70% size reduction
  4. Partition Pruning: Query only relevant partitions
  5. Resource Monitors: Set spending limits
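Batching in this pipeline is governed by the sink connector's `buffer.count.records` (10,000) and `buffer.flush.time` (60 seconds) settings: a partition's buffer is flushed when either threshold is reached, whichever comes first. Larger batches mean fewer, bigger loads at the cost of freshness. The hypothetical `PartitionBuffer` class below sketches that policy; unlike the connector, it checks the clock only when a record arrives and ignores the byte-size threshold (`buffer.size.bytes`).

```python
import time
from typing import Callable, List, Optional


class PartitionBuffer:
    """Flush a partition's records on a count or time threshold, whichever is hit first."""

    def __init__(self, max_records: int = 10_000, flush_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_records = max_records      # mirrors buffer.count.records
        self.flush_seconds = flush_seconds  # mirrors buffer.flush.time
        self.clock = clock
        self.records: List[dict] = []
        self.last_flush = clock()

    def add(self, record: dict) -> Optional[List[dict]]:
        """Buffer one record; return the flushed batch if a threshold was reached."""
        self.records.append(record)
        count_hit = len(self.records) >= self.max_records
        time_hit = self.clock() - self.last_flush >= self.flush_seconds
        return self.flush() if (count_hit or time_hit) else None

    def flush(self) -> List[dict]:
        batch, self.records = self.records, []
        self.last_flush = self.clock()
        return batch


now = [0.0]  # fake clock so the example is deterministic
buf = PartitionBuffer(max_records=3, flush_seconds=60, clock=lambda: now[0])
buf.add({"id": 1})
buf.add({"id": 2})
print(buf.add({"id": 3}))  # [{'id': 1}, {'id': 2}, {'id': 3}]  (count threshold)
now[0] = 61.0
print(buf.add({"id": 4}))  # [{'id': 4}]  (time threshold)
```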

Performance Metrics

Metric              | Before CDC  | After CDC  | Improvement
Data Freshness      | 4 hours     | < 1 minute | 240x faster
Query Latency       | 30 seconds  | 2 seconds  | 15x faster
ETL Maintenance     | 20 hrs/week | 2 hrs/week | 90% reduction
Data Quality Issues | 50/month    | 5/month    | 90% reduction

Interview Talking Points

Architecture Decisions

ℹ️

Best Practice: Focus on these CDC concepts in interviews:

  1. Why Debezium over custom CDC?

    • Battle-tested with production workloads
    • Supports multiple databases (PG, MySQL, MongoDB)
    • Emits updated schemas in its change events when source tables change
    • Active community and support
  2. Why Snowpipe over COPY?

    • Automatic micro-batching
    • Better for streaming workloads
    • Reduced latency (< 1 minute)
    • Serverless: no warehouse has to stay running for frequent small loads
  3. Why exactly-once semantics?

    • Critical for financial data
    • Prevents duplicate transactions
    • Maintains data consistency
    • Requires idempotent processing
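Idempotent processing means that applying the same change event twice leaves the target unchanged. One common approach, sketched below under simplifying assumptions, keys the target by primary key and stores the source position of the last applied change (for PostgreSQL, the WAL position Debezium reports as `source.lsn`), ignoring any event at or below it. `apply_change` and the in-memory table are hypothetical; in Snowflake the same version check would typically live in a MERGE condition.

```python
from typing import Any, Dict, Optional

# Target table model: primary key -> {"row": current row or None, "lsn": last applied position}
Table = Dict[str, Dict[str, Any]]


def apply_change(table: Table, key: str, op: str, row: Optional[dict], lsn: int) -> bool:
    """Apply one change event; return False if it is a replay or older than the stored state."""
    current = table.get(key)
    if current is not None and lsn <= current["lsn"]:
        return False  # already applied, or stale: no-op
    if op == "d":
        # Keep a deleted marker with its LSN so a replayed older insert cannot resurrect the row
        table[key] = {"row": None, "lsn": lsn}
    else:
        table[key] = {"row": row, "lsn": lsn}
    return True
```

Because the check compares positions instead of relying on delivery order, the same logic also absorbs redelivered batches after a connector restart.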

Common Interview Questions

Q: "How do you handle out-of-order events in CDC?"

out_of_order_handling = {
    "Event Time": "Use source timestamp, not ingestion time",
    "Watermarking": "Track maximum timestamp with allowed lateness",
    "Deduplication": "Use event ID for idempotent processing",
    "Partitioning": "Partition by source key for ordering"
}
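The watermarking entry can be sketched as follows: track the largest source timestamp seen so far and treat any event older than that maximum minus the allowed lateness as late. `WatermarkTracker` and the `source_ts_ms` field are hypothetical; what to do with late events (a side table, the DLQ, or a corrective MERGE) is a separate design choice.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class WatermarkTracker:
    """Classify events as on time or late using a max-event-time watermark."""
    allowed_lateness_ms: int
    max_event_time_ms: int = -1
    late_events: List[dict] = field(default_factory=list)

    @property
    def watermark_ms(self) -> int:
        return self.max_event_time_ms - self.allowed_lateness_ms

    def observe(self, event: dict) -> bool:
        """Return True if the event is on time, False if it arrived behind the watermark."""
        event_time = event["source_ts_ms"]  # source commit time, not ingestion time
        if self.max_event_time_ms >= 0 and event_time < self.watermark_ms:
            self.late_events.append(event)
            return False
        self.max_event_time_ms = max(self.max_event_time_ms, event_time)
        return True


tracker = WatermarkTracker(allowed_lateness_ms=5_000)
tracker.observe({"id": 1, "source_ts_ms": 10_000})
tracker.observe({"id": 2, "source_ts_ms": 20_000})
tracker.observe({"id": 3, "source_ts_ms": 16_000})  # out of order, but within lateness
tracker.observe({"id": 4, "source_ts_ms": 12_000})  # behind the 15,000 ms watermark
```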

Q: "How do you handle schema changes in source database?"

schema_evolution_strategy = {
    "Forward Compatibility": "Old schema can read new data",
    "Backward Compatibility": "New schema can read old data",
    "Full Compatibility": "Both directions work",
    "Breaking Changes": "Requires connector restart and snapshot"
}
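In Avro terms (as used by Confluent Schema Registry), backward compatibility requires every field added in the new schema to have a default, and forward compatibility requires every field removed from the old schema to have had one. The sketch below checks only those two rules on a simplified schema model (field name mapped to whether it has a default); it ignores type changes, aliases, and transitive compatibility modes, and the helper names are hypothetical.

```python
from typing import Dict

# Simplified schema model: field name -> whether the field declares a default
Schema = Dict[str, bool]


def is_backward_compatible(old: Schema, new: Schema) -> bool:
    """New reader, old data: every added field needs a default."""
    added = new.keys() - old.keys()
    return all(new[name] for name in added)


def is_forward_compatible(old: Schema, new: Schema) -> bool:
    """Old reader, new data: every removed field needs a default in the old schema."""
    removed = old.keys() - new.keys()
    return all(old[name] for name in removed)


def is_fully_compatible(old: Schema, new: Schema) -> bool:
    return is_backward_compatible(old, new) and is_forward_compatible(old, new)


v1 = {"user_id": False, "email": False}
v2 = {"user_id": False, "email": False, "phone": True}  # adds phone with a default
print(is_fully_compatible(v1, v2))  # True
```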

Q: "How do you monitor CDC pipeline health?"

monitoring_metrics = {
    "Lag Metrics": "Replication lag per table",
    "Throughput": "Events processed per second",
    "Error Rate": "Failed events percentage",
    "Data Quality": "Null rates, duplicates, freshness"
}
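Replication lag per table is often approximated by the sink's Kafka consumer lag: for each partition, the log end offset minus the connector's committed offset. The sketch assumes you have already fetched both offset maps (for example through the Kafka admin API or your monitoring agent); `consumer_lag` is a hypothetical helper that sums the lag per topic.

```python
from typing import Dict, Tuple

TopicPartition = Tuple[str, int]


def consumer_lag(end_offsets: Dict[TopicPartition, int],
                 committed: Dict[TopicPartition, int]) -> Dict[str, int]:
    """Sum per-partition lag (end offset - committed offset) for each topic."""
    lag_by_topic: Dict[str, int] = {}
    for (topic, partition), end in end_offsets.items():
        # No committed offset means nothing consumed yet
        # (assumes the partition log starts at offset 0)
        done = committed.get((topic, partition), 0)
        lag_by_topic[topic] = lag_by_topic.get(topic, 0) + max(end - done, 0)
    return lag_by_topic


end = {("cdc.users", 0): 1000, ("cdc.users", 1): 800, ("cdc.orders", 0): 500}
committed = {("cdc.users", 0): 990, ("cdc.users", 1): 800}
print(consumer_lag(end, committed))  # {'cdc.users': 10, 'cdc.orders': 500}
```

Record lag says how far behind the sink is in messages; dividing it by the observed throughput gives a rough lag in seconds that can feed thresholds like those in `test_cdc_lag_monitoring`.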

Deployment Checklist

  • Configure PostgreSQL for logical replication
  • Deploy Debezium connector with proper configuration
  • Set up Kafka Connect cluster
  • Configure Snowflake sink connector
  • Create Snowflake schemas and tables
  • Set up Snowpipe for automated loading
  • Implement data quality checks
  • Configure monitoring and alerting
  • Test failover and recovery procedures
  • Document operational runbooks
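For the first checklist item, Debezium's PostgreSQL connector needs `wal_level = logical` (changing it requires a server restart) plus available replication slots and WAL senders. Below is a hypothetical pre-flight check over the rows returned by `PREFLIGHT_QUERY`; running the query with your database driver is left out, and the minimum of one slot and one sender is an assumption (other replication consumers need their own).

```python
from typing import Dict, Iterable, List, Tuple

PREFLIGHT_QUERY = (
    "SELECT name, setting FROM pg_settings "
    "WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders')"
)


def check_logical_replication(rows: Iterable[Tuple[str, str]],
                              slots_needed: int = 1) -> List[str]:
    """Return a list of problems; an empty list means the settings look ready."""
    settings: Dict[str, str] = dict(rows)  # pg_settings.setting is text
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append(f"wal_level is {settings.get('wal_level')!r}, expected 'logical'")
    for name in ("max_replication_slots", "max_wal_senders"):
        value = int(settings.get(name, "0"))
        if value < slots_needed:
            problems.append(f"{name} is {value}, need at least {slots_needed}")
    return problems


rows = [("wal_level", "replica"), ("max_replication_slots", "10"), ("max_wal_senders", "10")]
print(check_logical_replication(rows))  # ["wal_level is 'replica', expected 'logical'"]
```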

⚠️

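Warning: Always test CDC configuration in staging before production. Incorrect configuration can cause data loss or performance issues; for example, a replication slot that no consumer reads keeps PostgreSQL from recycling WAL and can eventually fill the disk.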


This project demonstrates real-time data warehousing skills and is highly relevant for data engineering interviews at companies with real-time analytics requirements.
