Real-Time Data Warehouse with CDC
Debezium + Kafka + Snowflake for Instant Analytics
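Project Difficulty: Advanced | Duration: 3-4 weeks | Cloud: AWS/GCP
Build a real-time data warehouse that uses Change Data Capture (CDC) to replicate transactional data into Snowflake, capturing changes within a second of commit and making them queryable as soon as the sink's buffering interval allows.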
Project Overview
Problem Statement
Traditional batch ETL leaves warehouse data hours or days behind the source, which rules out real-time analytics and decision-making. Organizations need to replicate transactional databases into a data warehouse continuously while keeping the copy consistent with the source.
Objectives
- Capture changes from PostgreSQL in real time (under 1 second capture latency)
- Transform and route events through Kafka
- Load data into Snowflake with exactly-once semantics
- Support schema evolution and data quality checks
- Enable real-time dashboards and analytics
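The sub-second target applies to change capture. End-to-end freshness in Snowflake also includes the sink's flush interval and the RAW to STAGING task schedule configured later in this guide (60 seconds and 1 minute). The sketch below is a minimal worst-case freshness budget, assuming the stages simply add up; the 30-second Snowpipe load time is a placeholder assumption, not a measured figure.

```python
from dataclasses import dataclass


@dataclass
class FreshnessBudget:
    """Worst-case seconds each stage may add before a change is queryable."""
    capture_s: float           # Debezium reading the WAL (objective: under 1 s)
    sink_flush_s: float        # Snowflake sink buffer flush interval
    load_s: float              # Snowpipe load time (placeholder assumption)
    staging_interval_s: float  # schedule of the RAW -> STAGING task

    def raw_worst_case_s(self) -> float:
        # A change that just missed a flush waits the full flush interval
        return self.capture_s + self.sink_flush_s + self.load_s

    def staging_worst_case_s(self) -> float:
        # It may then wait up to one full task interval before STAGING sees it
        return self.raw_worst_case_s() + self.staging_interval_s


budget = FreshnessBudget(capture_s=1, sink_flush_s=60, load_s=30, staging_interval_s=60)
print(budget.raw_worst_case_s())      # 91
print(budget.staging_worst_case_s())  # 151
```

With these settings the dashboards are near-real-time (minutes), even though capture itself is sub-second; shrinking the flush interval or switching ingestion methods is where most of the latency can be recovered.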
Tech Stack
| Component | Technology | Purpose |
|---|---|---|
| CDC Tool | Debezium | Database change capture |
| Message Broker | Apache Kafka | Event streaming |
| Streaming | Kafka Connect | Data integration |
| Warehouse | Snowflake | Cloud data warehouse |
| Monitoring | Datadog + PagerDuty | Observability |
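Architecture Diagram
[Diagram: PostgreSQL (WAL, pgoutput) → Debezium source connector → Kafka topics cdc.* → Snowflake sink connector (Snowpipe) → RAW → STAGING → ANALYTICS → real-time dashboards]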
Data Source Setup and Schema
Source Database Schema (PostgreSQL)
-- source_database/001_schema.sql
-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
-- Users table
CREATE TABLE public.users (
user_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
email VARCHAR(255) UNIQUE NOT NULL,
first_name VARCHAR(100) NOT NULL,
last_name VARCHAR(100) NOT NULL,
phone VARCHAR(20),
address JSONB,
status VARCHAR(20) DEFAULT 'active',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
deleted_at TIMESTAMP WITH TIME ZONE
);
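-- Index supporting query-based incremental extracts on updated_at (Debezium's log-based CDC does not use it)
CREATE INDEX idx_users_updated_at ON public.users(updated_at);
-- email needs no extra index: its UNIQUE constraint already creates one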
-- Products table
CREATE TABLE public.products (
product_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
sku VARCHAR(50) UNIQUE NOT NULL,
name VARCHAR(255) NOT NULL,
description TEXT,
category VARCHAR(100),
subcategory VARCHAR(100),
price DECIMAL(10, 2) NOT NULL,
cost DECIMAL(10, 2),
stock_quantity INTEGER DEFAULT 0,
attributes JSONB,
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Orders table
CREATE TABLE public.orders (
order_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
user_id UUID REFERENCES public.users(user_id),
order_number VARCHAR(50) UNIQUE NOT NULL,
status VARCHAR(30) NOT NULL,
total_amount DECIMAL(12, 2) NOT NULL,
tax_amount DECIMAL(10, 2) DEFAULT 0,
shipping_amount DECIMAL(10, 2) DEFAULT 0,
discount_amount DECIMAL(10, 2) DEFAULT 0,
currency VARCHAR(3) DEFAULT 'USD',
shipping_address JSONB,
billing_address JSONB,
notes TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
shipped_at TIMESTAMP WITH TIME ZONE,
delivered_at TIMESTAMP WITH TIME ZONE
);
-- Order items table
CREATE TABLE public.order_items (
order_item_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
order_id UUID REFERENCES public.orders(order_id) ON DELETE CASCADE,
product_id UUID REFERENCES public.products(product_id),
quantity INTEGER NOT NULL,
unit_price DECIMAL(10, 2) NOT NULL,
discount DECIMAL(5, 2) DEFAULT 0,
total_price DECIMAL(12, 2) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Inventory movements table
CREATE TABLE public.inventory_movements (
movement_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
product_id UUID REFERENCES public.products(product_id),
warehouse_id VARCHAR(50) NOT NULL,
movement_type VARCHAR(20) NOT NULL, -- 'inbound', 'outbound', 'adjustment'
quantity INTEGER NOT NULL,
reference_type VARCHAR(50),
reference_id UUID,
notes TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Audit log table
CREATE TABLE public.audit_log (
audit_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
table_name VARCHAR(100) NOT NULL,
record_id UUID NOT NULL,
action VARCHAR(10) NOT NULL, -- 'INSERT', 'UPDATE', 'DELETE'
old_data JSONB,
new_data JSONB,
changed_by VARCHAR(100),
changed_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Create audit trigger function.
-- The primary-key column name is passed as the first trigger argument (TG_ARGV[0]),
-- so the same function records the right key for tables whose key is not user_id.
CREATE OR REPLACE FUNCTION audit_trigger_func()
RETURNS TRIGGER AS $$
DECLARE
pk_col TEXT := TG_ARGV[0];
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO public.audit_log (table_name, record_id, action, new_data, changed_by)
VALUES (TG_TABLE_NAME, (to_jsonb(NEW) ->> pk_col)::UUID, 'INSERT', to_jsonb(NEW), current_user);
RETURN NEW;
ELSIF TG_OP = 'UPDATE' THEN
INSERT INTO public.audit_log (table_name, record_id, action, old_data, new_data, changed_by)
VALUES (TG_TABLE_NAME, (to_jsonb(NEW) ->> pk_col)::UUID, 'UPDATE', to_jsonb(OLD), to_jsonb(NEW), current_user);
RETURN NEW;
ELSIF TG_OP = 'DELETE' THEN
INSERT INTO public.audit_log (table_name, record_id, action, old_data, changed_by)
VALUES (TG_TABLE_NAME, (to_jsonb(OLD) ->> pk_col)::UUID, 'DELETE', to_jsonb(OLD), current_user);
RETURN OLD;
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- Apply audit triggers, passing each table's primary-key column
CREATE TRIGGER users_audit_trigger
AFTER INSERT OR UPDATE OR DELETE ON public.users
FOR EACH ROW EXECUTE FUNCTION audit_trigger_func('user_id');
CREATE TRIGGER orders_audit_trigger
AFTER INSERT OR UPDATE OR DELETE ON public.orders
FOR EACH ROW EXECUTE FUNCTION audit_trigger_func('order_id');
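-- Enable logical replication (run as superuser, outside a transaction block;
-- these settings take effect only after a PostgreSQL restart)
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 10;
ALTER SYSTEM SET max_wal_senders = 10;
-- Create replication user (replace the placeholder password and keep real secrets out of source control)
CREATE USER replicator WITH REPLICATION LOGIN PASSWORD 'secure_password';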
GRANT CONNECT ON DATABASE ecommerce TO replicator;
GRANT USAGE ON SCHEMA public TO replicator;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replicator;
-- Create publication for CDC
CREATE PUBLICATION dbz_publication FOR TABLE
public.users,
public.products,
public.orders,
public.order_items,
public.inventory_movements;
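With plugin.name set to pgoutput, Debezium receives changes only for tables in the publication, so every table in the connector's table.include.list must also be published. A small consistency check, using the table names from this guide; the helper name is hypothetical.

```python
from typing import Set


def unpublished_tables(publication_tables: Set[str], include_list: str) -> Set[str]:
    """Return tables the connector is told to capture that the publication omits."""
    included = {t.strip() for t in include_list.split(",") if t.strip()}
    return included - publication_tables


publication = {
    "public.users", "public.products", "public.orders",
    "public.order_items", "public.inventory_movements",
}
include_list = "public.users,public.products,public.orders,public.order_items"

missing = unpublished_tables(publication, include_list)
print(missing)  # set()
```

Here the check passes. The reverse mismatch is harmless: public.inventory_movements is published but not in the include list, so its changes are simply not streamed.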
Snowflake Target Schema
-- snowflake/001_warehouse_schema.sql
-- Create database and schemas
CREATE DATABASE IF NOT EXISTS ECOMMERCE_DW;
USE DATABASE ECOMMERCE_DW;
-- Layered schemas: RAW (ingestion), STAGING (cleaned and validated), ANALYTICS (business-ready)
CREATE SCHEMA IF NOT EXISTS RAW;
CREATE SCHEMA IF NOT EXISTS STAGING;
CREATE SCHEMA IF NOT EXISTS ANALYTICS;
-- Raw tables (exact copy from source)
CREATE OR REPLACE TABLE RAW.USERS (
user_id VARCHAR(36),
email VARCHAR(255),
first_name VARCHAR(100),
last_name VARCHAR(100),
phone VARCHAR(20),
address VARIANT,
status VARCHAR(20),
created_at TIMESTAMP_NTZ,
updated_at TIMESTAMP_NTZ,
deleted_at TIMESTAMP_NTZ,
_cdc_operation VARCHAR(10),
_cdc_timestamp TIMESTAMP_NTZ,
_kafka_offset NUMBER,
_kafka_partition NUMBER,
_ingestion_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
CREATE OR REPLACE TABLE RAW.PRODUCTS (
product_id VARCHAR(36),
sku VARCHAR(50),
name VARCHAR(255),
description TEXT,
category VARCHAR(100),
subcategory VARCHAR(100),
price DECIMAL(10, 2),
cost DECIMAL(10, 2),
stock_quantity INTEGER,
attributes VARIANT,
is_active BOOLEAN,
created_at TIMESTAMP_NTZ,
updated_at TIMESTAMP_NTZ,
_cdc_operation VARCHAR(10),
_cdc_timestamp TIMESTAMP_NTZ,
_kafka_offset NUMBER,
_kafka_partition NUMBER,
_ingestion_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
CREATE OR REPLACE TABLE RAW.ORDERS (
order_id VARCHAR(36),
user_id VARCHAR(36),
order_number VARCHAR(50),
status VARCHAR(30),
total_amount DECIMAL(12, 2),
tax_amount DECIMAL(10, 2),
shipping_amount DECIMAL(10, 2),
discount_amount DECIMAL(10, 2),
currency VARCHAR(3),
shipping_address VARIANT,
billing_address VARIANT,
notes TEXT,
created_at TIMESTAMP_NTZ,
updated_at TIMESTAMP_NTZ,
shipped_at TIMESTAMP_NTZ,
delivered_at TIMESTAMP_NTZ,
_cdc_operation VARCHAR(10),
_cdc_timestamp TIMESTAMP_NTZ,
_kafka_offset NUMBER,
_kafka_partition NUMBER,
_ingestion_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Staging tables (cleaned and validated)
CREATE OR REPLACE TABLE STAGING.USERS (
user_id VARCHAR(36) PRIMARY KEY,
email VARCHAR(255),
full_name VARCHAR(201),
phone VARCHAR(20),
city VARCHAR(100),
state VARCHAR(100),
country VARCHAR(100),
status VARCHAR(20),
created_at TIMESTAMP_NTZ,
updated_at TIMESTAMP_NTZ,
is_current BOOLEAN DEFAULT TRUE,
valid_from TIMESTAMP_NTZ,
valid_to TIMESTAMP_NTZ
);
CREATE OR REPLACE TABLE STAGING.PRODUCTS (
product_id VARCHAR(36) PRIMARY KEY,
sku VARCHAR(50),
name VARCHAR(255),
category VARCHAR(100),
subcategory VARCHAR(100),
price DECIMAL(10, 2),
cost DECIMAL(10, 2),
stock_quantity INTEGER,
is_active BOOLEAN,
created_at TIMESTAMP_NTZ,
updated_at TIMESTAMP_NTZ
);
CREATE OR REPLACE TABLE STAGING.ORDERS (
order_id VARCHAR(36) PRIMARY KEY,
user_id VARCHAR(36),
order_number VARCHAR(50),
status VARCHAR(30),
total_amount DECIMAL(12, 2),
item_count INTEGER,
shipping_city VARCHAR(100),
shipping_state VARCHAR(100),
shipping_country VARCHAR(100),
created_at TIMESTAMP_NTZ,
updated_at TIMESTAMP_NTZ,
shipped_at TIMESTAMP_NTZ,
delivered_at TIMESTAMP_NTZ,
delivery_days INTEGER
);
-- Analytics tables (business-ready)
CREATE OR REPLACE TABLE ANALYTICS.DAILY_SALES (
sale_date DATE,
product_id VARCHAR(36),
category VARCHAR(100),
total_revenue DECIMAL(15, 2),
total_orders INTEGER,
total_items INTEGER,
avg_order_value DECIMAL(10, 2),
unique_customers INTEGER,
_computed_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
CREATE OR REPLACE TABLE ANALYTICS.CUSTOMER_SEGMENTS (
user_id VARCHAR(36),
segment VARCHAR(50),
lifetime_value DECIMAL(12, 2),
order_count INTEGER,
avg_order_value DECIMAL(10, 2),
last_order_date DATE,
_computed_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
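-- Create streams for change tracking (before the task that reads them)
CREATE OR REPLACE STREAM RAW.USERS_STREAM
ON TABLE RAW.USERS
APPEND_ONLY = FALSE;
CREATE OR REPLACE STREAM RAW.ORDERS_STREAM
ON TABLE RAW.ORDERS
APPEND_ONLY = FALSE;
CREATE OR REPLACE STREAM RAW.PRODUCTS_STREAM
ON TABLE RAW.PRODUCTS
APPEND_ONLY = FALSE;
-- Create task for incremental processing: upsert the latest change per user.
-- Only INSERT rows are read: for updates a standard stream also emits a DELETE row
-- carrying the old values, which must not be applied.
CREATE OR REPLACE TASK STAGING.USERS_TASK
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('RAW.USERS_STREAM')
AS
MERGE INTO STAGING.USERS t
USING (
SELECT *
FROM RAW.USERS_STREAM
WHERE METADATA$ACTION = 'INSERT'
-- Several changes to one user can arrive in one batch; keep the newest
QUALIFY ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY _cdc_timestamp DESC, _kafka_offset DESC) = 1
) s
ON t.user_id = s.user_id
WHEN MATCHED THEN UPDATE SET
t.email = s.email,
t.full_name = CONCAT(s.first_name, ' ', s.last_name),
t.phone = s.phone,
t.city = s.address:city::STRING,
t.state = s.address:state::STRING,
t.country = s.address:country::STRING,
t.status = s.status,
t.created_at = s.created_at,
t.updated_at = s.updated_at,
t.is_current = (s._cdc_operation <> 'd'),
t.valid_from = s._cdc_timestamp,
t.valid_to = IFF(s._cdc_operation = 'd', s._cdc_timestamp, NULL)
WHEN NOT MATCHED THEN INSERT (
user_id, email, full_name, phone, city, state, country,
status, created_at, updated_at, is_current, valid_from, valid_to
) VALUES (
s.user_id, s.email, CONCAT(s.first_name, ' ', s.last_name), s.phone,
s.address:city::STRING, s.address:state::STRING, s.address:country::STRING,
s.status, s.created_at, s.updated_at,
(s._cdc_operation <> 'd'), s._cdc_timestamp,
IFF(s._cdc_operation = 'd', s._cdc_timestamp, NULL)
);
-- Tasks are created suspended; resume the task to start its schedule
ALTER TASK STAGING.USERS_TASK RESUME;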
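Whatever consumes RAW.USERS_STREAM has to collapse several changes to the same key into one current row and treat a delete as ending that row. The following pure-Python model of that logic is only for reasoning about ordering; the event fields mirror the RAW columns, and the op codes assume Debezium's c/u/d/r values.

```python
from typing import Any, Dict, List


def apply_batch(staging: Dict[str, Dict[str, Any]],
                batch: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Apply one batch of CDC rows to a keyed staging snapshot (one row per key)."""
    # Equivalent of QUALIFY ROW_NUMBER() ... = 1: keep the newest event per key
    latest: Dict[str, Dict[str, Any]] = {}
    for event in batch:
        key = event["user_id"]
        if key not in latest or event["_cdc_timestamp"] > latest[key]["_cdc_timestamp"]:
            latest[key] = event

    result = dict(staging)
    for key, event in latest.items():
        deleted = event["_cdc_operation"] == "d"
        result[key] = {
            "email": event["email"],
            "is_current": not deleted,          # a delete ends the current row
            "valid_from": event["_cdc_timestamp"],
            "valid_to": event["_cdc_timestamp"] if deleted else None,
        }
    return result


batch = [
    {"user_id": "u1", "email": "a@x.com", "_cdc_operation": "r", "_cdc_timestamp": 1},
    {"user_id": "u1", "email": "b@x.com", "_cdc_operation": "u", "_cdc_timestamp": 3},
    {"user_id": "u2", "email": "c@x.com", "_cdc_operation": "c", "_cdc_timestamp": 2},
    {"user_id": "u2", "email": "c@x.com", "_cdc_operation": "d", "_cdc_timestamp": 4},
]
state = apply_batch({}, batch)
print(state["u1"]["email"], state["u2"]["is_current"])  # b@x.com False
```

Applying only the newest change per key is what keeps a plain INSERT from accumulating several "current" rows for the same user, since Snowflake does not enforce PRIMARY KEY constraints.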
Step-by-Step Implementation Guide
Step 1: Debezium Configuration
// debezium/connectors/postgres-connector.json
{
"name": "postgres-cdc-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "source-db.internal",
"database.port": "5432",
"database.user": "replicator",
"database.password": "${secrets:db-replicator-password}",
"database.dbname": "ecommerce",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "dbz_publication",
"topic.prefix": "dbserver1",
"schema.include.list": "public",
"table.include.list": "public.users,public.products,public.orders,public.order_items",
"transforms": "route,unwrap,addMetadata",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "cdc.$3",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms",
"transforms.addMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addMetadata.timestamp.field": "event_timestamp",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"snapshot.mode": "initial",
"snapshot.locking.mode": "none",
"snapshot.fetch.size": "10000",
"heartbeat.interval.ms": "10000",
"tombstones.on.delete": "true",
"producer.override.batch.size": "16384",
"producer.override.buffer.memory": "33554432",
"producer.override.linger.ms": "50",
"producer.override.compression.type": "snappy",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.tolerance": "none"
}
}
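The route and unwrap transforms above reshape every event before it reaches Kafka. A rough Python model of what they do, written only to make the topic names and flattened fields concrete; it is not Debezium's implementation, and it assumes the documented defaults (the "__" prefix for added fields and a string __deleted flag in rewrite mode).

```python
import re
from typing import Any, Dict, Optional

# transforms.route: RegexRouter requires the regex to match the whole topic name;
# Java's "$3" replacement corresponds to Python's "\3"
ROUTE_REGEX = re.compile(r"([^.]+)\.([^.]+)\.([^.]+)")


def route(topic: str) -> str:
    """Rename <prefix>.<schema>.<table> to cdc.<table>; leave other topics as-is."""
    match = ROUTE_REGEX.fullmatch(topic)
    return match.expand(r"cdc.\3") if match else topic


def unwrap(envelope: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Rough model of ExtractNewRecordState with delete.handling.mode=rewrite
    and add.fields=op,table,lsn,source.ts_ms."""
    if envelope is None:
        # Tombstones pass through unchanged because drop.tombstones=false
        return None
    op = envelope["op"]
    source = envelope["source"]
    # A delete has no "after" image, so rewrite mode emits the "before" image
    state = envelope["before"] if op == "d" else envelope["after"]
    flat = dict(state)
    flat["__op"] = op
    flat["__table"] = source["table"]
    flat["__lsn"] = source["lsn"]
    flat["__source_ts_ms"] = source["ts_ms"]
    flat["__deleted"] = "true" if op == "d" else "false"
    return flat


update = {
    "op": "u",
    "before": None,
    "after": {"user_id": "u1", "email": "new@example.com"},
    "source": {"table": "users", "lsn": 123456, "ts_ms": 1700000000000},
}
delete = {
    "op": "d",
    "before": {"user_id": "u1", "email": "new@example.com"},
    "after": None,
    "source": {"table": "users", "lsn": 123999, "ts_ms": 1700000005000},
}
print(route("dbserver1.public.users"))                  # cdc.users
print(unwrap(update)["email"], unwrap(update)["__op"])  # new@example.com u
print(unwrap(delete)["__deleted"])                      # true
```

The model shows why downstream SQL sees the operation as __op: the sink tables need either that column name or a rename step before the _cdc_operation filters used later will work.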
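Step 2: Kafka Connect Snowflake Sink
By default the Snowflake connector lands each Kafka message in two VARIANT columns, RECORD_METADATA and RECORD_CONTENT. Loading into the typed RAW tables defined above therefore needs the connector's schematization option (snowflake.enable.schematization; check which ingestion methods your connector version supports it for) or a step that parses RECORD_CONTENT. The Debezium unwrap transform also emits the operation as __op, so it has to be mapped to _cdc_operation along the way.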
// kafka-connect/snowflake-sink.json
{
"name": "snowflake-sink-connector",
"config": {
"connector.class": "com.snowflake.kafka.connect.SnowflakeSinkConnector",
"topics": "cdc.users,cdc.products,cdc.orders,cdc.order_items",
"snowflake.ingestion.method": "SNOWPIPE",
"buffer.count.records": "10000",
"buffer.flush.time": "60",
"buffer.size.bytes": "5000000",
"snowflake.database.name": "ECOMMERCE_DW",
"snowflake.schema.name": "RAW",
"snowflake.topic2table.map": "cdc.users:USERS,cdc.products:PRODUCTS,cdc.orders:ORDERS,cdc.order_items:ORDER_ITEMS",
"snowflake.url.name": "${env:SNOWFLAKE_ACCOUNT}.snowflakecomputing.com:443",
"snowflake.user.name": "${env:SNOWFLAKE_USER}",
"snowflake.private.key": "${env:SNOWFLAKE_PRIVATE_KEY}",
"snowflake.private.key.passphrase": "${env:SNOWFLAKE_PASSPHRASE}",
"snowflake.role.name": "KAFKA_CONNECT_ROLE",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms": "addMetadata",
"transforms.addMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addMetadata.timestamp.field": "_cdc_timestamp",
"transforms.addMetadata.offset.field": "_kafka_offset",
"transforms.addMetadata.partition.field": "_kafka_partition",
"errors.log.enable": "true",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-snowflake",
"errors.deadletterqueue.topic.replication.factor": "3"
}
}
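The sink's three buffer settings (record count, byte size, flush interval) act as thresholds: a partition's buffer is sent to Snowflake as soon as any one is reached. A toy model of one partition's buffer, assuming it is checked on every record (the real connector also flushes from a periodic timer), shows that on quiet tables the time threshold dominates.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class PartitionBuffer:
    """Toy model of one partition's buffer in the Snowflake sink.

    Defaults mirror the config above: 10000 records, 5000000 bytes, 60 seconds.
    """
    max_records: int = 10_000
    max_bytes: int = 5_000_000
    max_age_s: float = 60.0
    records: List[bytes] = field(default_factory=list)
    size_bytes: int = 0
    opened_at: Optional[float] = None

    def add(self, payload: bytes, now_s: float) -> bool:
        """Buffer one record and report whether any flush threshold is reached."""
        if self.opened_at is None:
            self.opened_at = now_s
        self.records.append(payload)
        self.size_bytes += len(payload)
        return (
            len(self.records) >= self.max_records
            or self.size_bytes >= self.max_bytes
            or now_s - self.opened_at >= self.max_age_s
        )

    def flush(self) -> int:
        """Empty the buffer (in the real connector, a file handed to Snowpipe)."""
        count = len(self.records)
        self.records.clear()
        self.size_bytes = 0
        self.opened_at = None
        return count


# A quiet table: one 200-byte change per second for two minutes
buffer = PartitionBuffer()
flushed = []
for second in range(120):
    if buffer.add(b"x" * 200, now_s=second):
        flushed.append(buffer.flush())
print(flushed)  # [61]
```

Lowering the flush interval cuts latency but produces more, smaller files, which typically raises Snowpipe overhead.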
Step 3: Snowflake Transformation Pipeline
# snowflake/transformations.py
import logging
from datetime import datetime, timezone
from typing import Dict, List

import snowflake.connector
from snowflake.connector import DictCursor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# RAW tables covered by lag monitoring (fixed list, so safe to interpolate into SQL)
MONITORED_TABLES = ("USERS", "PRODUCTS", "ORDERS")


class SnowflakeCDCManager:
    def __init__(self, config: Dict):
        self.config = config
        self.conn = None

    def connect(self):
        """Establish connection to Snowflake."""
        self.conn = snowflake.connector.connect(
            user=self.config['user'],
            password=self.config['password'],
            account=self.config['account'],
            warehouse=self.config['warehouse'],
            database=self.config['database'],
            schema=self.config['schema'],
            role=self.config.get('role', 'SYSADMIN')
        )
        logger.info("Connected to Snowflake")

    def process_cdc_events(self):
        """Upsert the latest CDC change per key from RAW into STAGING.

        MERGE keeps one row per key, so re-running is idempotent. This scans all
        of RAW; the stream-based task is the incremental alternative.
        Operation codes: 'r' snapshot read, 'c' create, 'u' update, 'd' delete.
        """
        cursor = self.conn.cursor(DictCursor)
        try:
            # Process user changes
            cursor.execute("""
                MERGE INTO STAGING.USERS t
                USING (
                    SELECT *
                    FROM RAW.USERS
                    WHERE _cdc_operation IN ('r', 'c', 'u', 'd')
                    QUALIFY ROW_NUMBER() OVER (
                        PARTITION BY user_id ORDER BY _cdc_timestamp DESC, _kafka_offset DESC
                    ) = 1
                ) s
                ON t.user_id = s.user_id
                WHEN MATCHED AND s._cdc_timestamp > t.valid_from THEN UPDATE SET
                    t.email = s.email,
                    t.full_name = CONCAT(s.first_name, ' ', s.last_name),
                    t.phone = s.phone,
                    t.city = s.address:city::STRING,
                    t.state = s.address:state::STRING,
                    t.country = s.address:country::STRING,
                    t.status = s.status,
                    t.updated_at = s.updated_at,
                    t.is_current = (s._cdc_operation <> 'd'),
                    t.valid_from = s._cdc_timestamp,
                    t.valid_to = IFF(s._cdc_operation = 'd', s._cdc_timestamp, NULL)
                WHEN NOT MATCHED THEN INSERT (
                    user_id, email, full_name, phone, city, state, country,
                    status, created_at, updated_at, is_current, valid_from, valid_to
                ) VALUES (
                    s.user_id, s.email, CONCAT(s.first_name, ' ', s.last_name), s.phone,
                    s.address:city::STRING, s.address:state::STRING, s.address:country::STRING,
                    s.status, s.created_at, s.updated_at,
                    (s._cdc_operation <> 'd'), s._cdc_timestamp,
                    IFF(s._cdc_operation = 'd', s._cdc_timestamp, NULL)
                )
            """)
            # Process order changes; item counts use only the latest state of each item
            cursor.execute("""
                MERGE INTO STAGING.ORDERS t
                USING (
                    SELECT
                        o.*,
                        COALESCE(ic.item_count, 0) AS item_count
                    FROM RAW.ORDERS o
                    LEFT JOIN (
                        SELECT order_id, COUNT(*) AS item_count
                        FROM (
                            SELECT order_id, order_item_id, _cdc_operation, _cdc_timestamp
                            FROM RAW.ORDER_ITEMS
                            QUALIFY ROW_NUMBER() OVER (
                                PARTITION BY order_item_id ORDER BY _cdc_timestamp DESC
                            ) = 1
                        )
                        WHERE _cdc_operation <> 'd'
                        GROUP BY order_id
                    ) ic ON o.order_id = ic.order_id
                    WHERE o._cdc_operation IN ('r', 'c', 'u', 'd')
                    QUALIFY ROW_NUMBER() OVER (
                        PARTITION BY o.order_id ORDER BY o._cdc_timestamp DESC, o._kafka_offset DESC
                    ) = 1
                ) s
                ON t.order_id = s.order_id
                WHEN MATCHED THEN UPDATE SET
                    t.user_id = s.user_id,
                    t.order_number = s.order_number,
                    t.status = s.status,
                    t.total_amount = s.total_amount,
                    t.item_count = s.item_count,
                    t.shipping_city = s.shipping_address:city::STRING,
                    t.shipping_state = s.shipping_address:state::STRING,
                    t.shipping_country = s.shipping_address:country::STRING,
                    t.created_at = s.created_at,
                    t.updated_at = s.updated_at,
                    t.shipped_at = s.shipped_at,
                    t.delivered_at = s.delivered_at,
                    -- NULL until the order is delivered
                    t.delivery_days = DATEDIFF('day', s.created_at, s.delivered_at)
                WHEN NOT MATCHED THEN INSERT (
                    order_id, user_id, order_number, status, total_amount,
                    item_count, shipping_city, shipping_state, shipping_country,
                    created_at, updated_at, shipped_at, delivered_at, delivery_days
                ) VALUES (
                    s.order_id, s.user_id, s.order_number, s.status, s.total_amount,
                    s.item_count, s.shipping_address:city::STRING,
                    s.shipping_address:state::STRING, s.shipping_address:country::STRING,
                    s.created_at, s.updated_at, s.shipped_at, s.delivered_at,
                    DATEDIFF('day', s.created_at, s.delivered_at)
                )
            """)
            logger.info("CDC events processed successfully")
        except Exception as e:
            logger.error(f"Error processing CDC events: {e}")
            raise
        finally:
            cursor.close()

    def compute_analytics(self):
        """Compute analytics metrics from staged data."""
        cursor = self.conn.cursor(DictCursor)
        try:
            # Daily sales metrics
            cursor.execute("""
                MERGE INTO ANALYTICS.DAILY_SALES t
                USING (
                    SELECT
                        DATE(o.created_at) as sale_date,
                        oi.product_id,
                        p.category,
                        SUM(oi.total_price) as total_revenue,
                        COUNT(DISTINCT o.order_id) as total_orders,
                        SUM(oi.quantity) as total_items,
                        AVG(o.total_amount) as avg_order_value,
                        COUNT(DISTINCT o.user_id) as unique_customers
                    FROM STAGING.ORDERS o
                    JOIN (
                        -- RAW keeps one row per change: use the latest state per item
                        SELECT *
                        FROM RAW.ORDER_ITEMS
                        QUALIFY ROW_NUMBER() OVER (
                            PARTITION BY order_item_id ORDER BY _cdc_timestamp DESC
                        ) = 1
                    ) oi ON o.order_id = oi.order_id
                    JOIN STAGING.PRODUCTS p ON oi.product_id = p.product_id
                    WHERE DATE(o.created_at) >= CURRENT_DATE() - 1
                      AND oi._cdc_operation <> 'd'
                    GROUP BY 1, 2, 3
                ) s
                ON t.sale_date = s.sale_date
                    AND t.product_id = s.product_id
                WHEN MATCHED THEN UPDATE SET
                    t.total_revenue = s.total_revenue,
                    t.total_orders = s.total_orders,
                    t.total_items = s.total_items,
                    t.avg_order_value = s.avg_order_value,
                    t.unique_customers = s.unique_customers,
                    t._computed_at = CURRENT_TIMESTAMP()
                WHEN NOT MATCHED THEN INSERT (
                    sale_date, product_id, category, total_revenue,
                    total_orders, total_items, avg_order_value,
                    unique_customers, _computed_at
                ) VALUES (
                    s.sale_date, s.product_id, s.category, s.total_revenue,
                    s.total_orders, s.total_items, s.avg_order_value,
                    s.unique_customers, CURRENT_TIMESTAMP()
                )
            """)
            # Customer segments (users without orders are not labeled 'One-time')
            cursor.execute("""
                MERGE INTO ANALYTICS.CUSTOMER_SEGMENTS t
                USING (
                    SELECT
                        u.user_id,
                        CASE
                            WHEN COUNT(o.order_id) >= 10 THEN 'VIP'
                            WHEN COUNT(o.order_id) >= 5 THEN 'Loyal'
                            WHEN COUNT(o.order_id) >= 2 THEN 'Repeat'
                            WHEN COUNT(o.order_id) = 1 THEN 'One-time'
                            ELSE 'No orders'
                        END as segment,
                        COALESCE(SUM(o.total_amount), 0) as lifetime_value,
                        COUNT(o.order_id) as order_count,
                        AVG(o.total_amount) as avg_order_value,
                        MAX(DATE(o.created_at)) as last_order_date
                    FROM STAGING.USERS u
                    LEFT JOIN STAGING.ORDERS o ON u.user_id = o.user_id
                    WHERE u.is_current = TRUE
                    GROUP BY u.user_id
                ) s
                ON t.user_id = s.user_id
                WHEN MATCHED THEN UPDATE SET
                    t.segment = s.segment,
                    t.lifetime_value = s.lifetime_value,
                    t.order_count = s.order_count,
                    t.avg_order_value = s.avg_order_value,
                    t.last_order_date = s.last_order_date,
                    t._computed_at = CURRENT_TIMESTAMP()
                WHEN NOT MATCHED THEN INSERT (
                    user_id, segment, lifetime_value, order_count,
                    avg_order_value, last_order_date, _computed_at
                ) VALUES (
                    s.user_id, s.segment, s.lifetime_value, s.order_count,
                    s.avg_order_value, s.last_order_date, CURRENT_TIMESTAMP()
                )
            """)
            logger.info("Analytics computed successfully")
        except Exception as e:
            logger.error(f"Error computing analytics: {e}")
            raise
        finally:
            cursor.close()

    def monitor_cdc_lag(self, max_lag_seconds: int = 300) -> Dict:
        """Monitor CDC replication lag for each monitored RAW table."""
        cursor = self.conn.cursor(DictCursor)
        try:
            results = []
            for table in MONITORED_TABLES:
                # Quoted aliases keep DictCursor keys lowercase (unquoted ones come back uppercase).
                # SYSDATE() is UTC TIMESTAMP_NTZ; this assumes _cdc_timestamp is stored in UTC.
                cursor.execute(f"""
                    SELECT
                        '{table}' AS "table_name",
                        MAX(_cdc_timestamp) AS "latest_cdc_timestamp",
                        DATEDIFF('second', MAX(_cdc_timestamp), SYSDATE()) AS "lag_seconds",
                        COUNT(*) AS "total_records",
                        COUNT(CASE WHEN _cdc_operation = 'c' THEN 1 END) AS "inserts",
                        COUNT(CASE WHEN _cdc_operation = 'u' THEN 1 END) AS "updates",
                        COUNT(CASE WHEN _cdc_operation = 'd' THEN 1 END) AS "deletes"
                    FROM RAW.{table}
                    WHERE _ingestion_timestamp >= CURRENT_DATE()
                """)
                results.extend(cursor.fetchall())
            lags = [r['lag_seconds'] for r in results]
            if any(lag is None for lag in lags):
                status = 'no_data'  # at least one table received nothing today
            elif all(lag < max_lag_seconds for lag in lags):
                status = 'healthy'
            else:
                status = 'lagging'
            return {
                'tables': results,
                'monitoring_timestamp': datetime.now(timezone.utc).isoformat(),
                'status': status
            }
        except Exception as e:
            logger.error(f"Error monitoring CDC lag: {e}")
            raise
        finally:
            cursor.close()

    def handle_schema_evolution(self, table_name: str, new_columns: List[Dict]):
        """Handle schema evolution for CDC tables.

        DDL cannot use bind parameters: table_name, col['name'] and col['type']
        must come from a trusted, validated source.
        """
        cursor = self.conn.cursor()
        try:
            # Get current table schema (bind the table name instead of formatting it in)
            cursor.execute("""
                SELECT column_name, data_type
                FROM information_schema.columns
                WHERE table_schema = 'RAW'
                  AND table_name = %s
                ORDER BY ordinal_position
            """, (table_name.upper(),))
            current_columns = {row[0]: row[1] for row in cursor.fetchall()}
            # Add new columns (information_schema reports unquoted names in uppercase)
            for col in new_columns:
                if col['name'].upper() not in current_columns:
                    cursor.execute(f"""
                        ALTER TABLE RAW.{table_name}
                        ADD COLUMN {col['name']} {col['type']}
                    """)
                    logger.info(f"Added column {col['name']} to {table_name}")
            # Update downstream schemas if needed
            if table_name.upper() == 'USERS':
                self._evolve_users_schema(new_columns)
            elif table_name.upper() == 'ORDERS':
                self._evolve_orders_schema(new_columns)
            logger.info(f"Schema evolution completed for {table_name}")
        except Exception as e:
            logger.error(f"Error evolving schema: {e}")
            raise

    def _evolve_users_schema(self, new_columns: List[Dict]):
        """Evolve users staging schema."""
        cursor = self.conn.cursor()
        for col in new_columns:
            try:
                cursor.execute(f"""
                    ALTER TABLE STAGING.USERS
                    ADD COLUMN {col['name']} {col['type']}
                """)
            except Exception as e:
                if "already exists" not in str(e):
                    raise

    def _evolve_orders_schema(self, new_columns: List[Dict]):
        """Evolve orders staging schema."""
        cursor = self.conn.cursor()
        for col in new_columns:
            try:
                cursor.execute(f"""
                    ALTER TABLE STAGING.ORDERS
                    ADD COLUMN {col['name']} {col['type']}
                """)
            except Exception as e:
                if "already exists" not in str(e):
                    raise
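handle_schema_evolution formats column names and types directly into ALTER TABLE statements, because DDL cannot take bind parameters. One way to make that safe is to build the statements with a pure function that validates identifiers and allow-lists types first. A minimal sketch; the function name, the type allow-list, and the loyalty_tier column are hypothetical.

```python
import re
from typing import Dict, List

# Hypothetical allow-list of column types; extend it for the types your pipeline uses
ALLOWED_TYPES = re.compile(
    r"VARCHAR(\(\d+\))?|NUMBER(\(\d+(,\s*\d+)?\))?|DECIMAL\(\d+,\s*\d+\)"
    r"|INTEGER|BOOLEAN|VARIANT|TIMESTAMP_NTZ|DATE|TEXT",
    re.IGNORECASE,
)
# Unquoted Snowflake identifier: letter or underscore, then letters, digits, _ or $
IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def build_add_column_ddl(table: str, existing_columns: Dict[str, str],
                         new_columns: List[Dict[str, str]]) -> List[str]:
    """Return ALTER TABLE statements for columns RAW.<table> does not have yet."""
    if not IDENTIFIER.fullmatch(table):
        raise ValueError(f"invalid table name: {table!r}")
    existing = {name.upper() for name in existing_columns}
    statements = []
    for col in new_columns:
        name, col_type = col["name"], col["type"].strip()
        if not IDENTIFIER.fullmatch(name):
            raise ValueError(f"invalid column name: {name!r}")
        if not ALLOWED_TYPES.fullmatch(col_type):
            raise ValueError(f"column type not allowed: {col_type!r}")
        if name.upper() in existing:
            continue  # Snowflake would reject a duplicate ADD COLUMN
        statements.append(
            f"ALTER TABLE RAW.{table.upper()} ADD COLUMN {name.upper()} {col_type.upper()}"
        )
        existing.add(name.upper())  # also guards against duplicates within new_columns
    return statements


ddl = build_add_column_ddl(
    "users",
    {"USER_ID": "TEXT", "EMAIL": "TEXT"},
    [{"name": "email", "type": "VARCHAR(255)"},
     {"name": "loyalty_tier", "type": "VARCHAR(20)"}],  # hypothetical new column
)
print(ddl)  # ['ALTER TABLE RAW.USERS ADD COLUMN LOYALTY_TIER VARCHAR(20)']
```

Keeping the statement builder free of database calls also makes it easy to unit-test without a Snowflake connection.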
Infrastructure Setup (Terraform)
# infrastructure/snowflake_cdc.tf
terraform {
required_version = ">= 1.5.0"
required_providers {
snowflake = {
source = "Snowflake-Labs/snowflake"
version = "~> 0.89"
}
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
confluent = {
source = "confluentinc/confluent"
version = "~> 1.50.0"
}
}
}
# Variables
variable "snowflake_account" {
description = "Snowflake account identifier"
type = string
}
variable "snowflake_user" {
description = "Snowflake user"
type = string
sensitive = true
}
variable "snowflake_password" {
description = "Snowflake password"
type = string
sensitive = true
}
variable "kafka_cluster_id" {
description = "Confluent Kafka cluster ID"
type = string
}
# Snowflake Provider
provider "snowflake" {
account = var.snowflake_account
user = var.snowflake_user
password = var.snowflake_password
role = "SYSADMIN"
}
# Snowflake Database
resource "snowflake_database" "ecommerce_dw" {
name = "ECOMMERCE_DW"
comment = "E-commerce data warehouse with CDC"
}
# Snowflake Schemas
resource "snowflake_schema" "raw" {
database = snowflake_database.ecommerce_dw.name
name = "RAW"
comment = "Raw CDC data layer"
}
resource "snowflake_schema" "staging" {
database = snowflake_database.ecommerce_dw.name
name = "STAGING"
comment = "Staged and cleaned data"
}
resource "snowflake_schema" "analytics" {
database = snowflake_database.ecommerce_dw.name
name = "ANALYTICS"
comment = "Business analytics layer"
}
# Snowflake Warehouse
resource "snowflake_warehouse" "compute" {
name = "COMPUTE_WH"
comment = "Compute warehouse for CDC processing"
warehouse_size = "medium"
auto_suspend = 60
auto_resume = true
scaling_policy = "ECONOMY"
min_cluster_count = 1
max_cluster_count = 3
resource_monitor = snowflake_resource_monitor.cdc_monitor.name
}
# Resource Monitor
resource "snowflake_resource_monitor" "cdc_monitor" {
name = "CDC_RESOURCE_MONITOR"
credit_quota = 100
frequency = "MONTHLY"
start_timestamp = "IMMEDIATELY"
notify_triggers = [80, 90, 100]
suspend_triggers = [100]
suspend_immediate_triggers = [110]
}
# Snowflake Roles
resource "snowflake_role" "kafka_connect" {
name = "KAFKA_CONNECT_ROLE"
comment = "Role for Kafka Connect Snowflake Sink"
}
# Grant permissions: the sink needs USAGE on the database and, on the target schema,
# USAGE plus CREATE TABLE, CREATE STAGE and CREATE PIPE. It writes only to RAW.
resource "snowflake_grant_privileges_to_role" "kafka_connect_db" {
role_name = snowflake_role.kafka_connect.name
privileges = ["USAGE"]
on_account_object {
object_type = "DATABASE"
object_name = snowflake_database.ecommerce_dw.name
}
}
resource "snowflake_grant_privileges_to_role" "kafka_connect_raw" {
role_name = snowflake_role.kafka_connect.name
privileges = ["USAGE", "CREATE TABLE", "CREATE STAGE", "CREATE PIPE"]
on_schema {
schema_name = "\"${snowflake_database.ecommerce_dw.name}\".\"${snowflake_schema.raw.name}\""
}
}
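# API Integration for external functions
# The IAM role ARN and the API Gateway URL are inputs you must supply
variable "api_gateway_role_arn" {
description = "IAM role ARN that Snowflake assumes to call API Gateway"
type = string
}
variable "cdc_health_check_url" {
description = "API Gateway resource URL backing CDC_HEALTH_CHECK"
type = string
}
resource "snowflake_api_integration" "cdc_integration" {
name = "CDC_API_INTEGRATION"
api_provider = "aws_api_gateway"
api_aws_role_arn = var.api_gateway_role_arn
api_allowed_prefixes = ["https://*.execute-api.us-east-1.amazonaws.com/"]
enabled = true
}
# External Functions for CDC monitoring
resource "snowflake_external_function" "cdc_health_check" {
name = "CDC_HEALTH_CHECK"
database = snowflake_database.ecommerce_dw.name
schema = snowflake_schema.raw.name
arg {
name = "TABLE_NAME"
type = "VARCHAR"
}
return_type = "VARIANT"
return_behavior = "VOLATILE"
api_integration = snowflake_api_integration.cdc_integration.name
url_of_proxy_and_resource = var.cdc_health_check_url
max_batch_rows = 1
comment = "CDC health check function"
}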
# Confluent Kafka Cluster (multi-zone "HIGH" availability requires a Standard or Dedicated
# cluster; Basic clusters are single-zone only)
resource "confluent_kafka_cluster" "cdc_cluster" {
display_name = "cdc-kafka-cluster"
availability = "HIGH"
cloud = "AWS"
region = "us-east-1"
standard {}
environment {
id = confluent_environment.cdc_environment.id
}
}
resource "confluent_environment" "cdc_environment" {
display_name = "cdc-environment"
description = "Environment for CDC pipeline"
}
# Kafka Topics for CDC (Confluent Cloud fixes the replication factor at 3)
resource "confluent_kafka_topic" "cdc_topics" {
for_each = toset(["users", "products", "orders", "order_items"])
kafka_cluster {
id = confluent_kafka_cluster.cdc_cluster.id
}
topic_name = "cdc.${each.key}"
partitions_count = 6
rest_endpoint = confluent_kafka_cluster.cdc_cluster.rest_endpoint
config = {
"cleanup.policy" = "compact"
"delete.retention.ms" = "86400000"
"min.compaction.lag.ms" = "60000"
"retention.ms" = "604800000"
}
credentials {
key = confluent_api_key.debezium.id
secret = confluent_api_key.debezium.secret
}
}
# Service Account for Debezium
resource "confluent_service_account" "debezium" {
display_name = "debezium-service-account"
description = "Service account for Debezium CDC connector"
}
resource "confluent_role_binding" "debezium" {
principal = "User:${confluent_service_account.debezium.id}"
role_name = "CloudClusterAdmin"
crn_pattern = confluent_kafka_cluster.cdc_cluster.rbac_crn
}
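# API Key for Debezium (a Kafka API key must name its owner and the cluster it manages)
resource "confluent_api_key" "debezium" {
display_name = "debezium-api-key"
description = "API Key for Debezium connector"
owner {
id = confluent_service_account.debezium.id
api_version = confluent_service_account.debezium.api_version
kind = confluent_service_account.debezium.kind
}
managed_resource {
id = confluent_kafka_cluster.cdc_cluster.id
api_version = confluent_kafka_cluster.cdc_cluster.api_version
kind = confluent_kafka_cluster.cdc_cluster.kind
environment {
id = confluent_environment.cdc_environment.id
}
}
depends_on = [
confluent_role_binding.debezium
]
}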
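# Snowflake Sink Connector (fully managed). config_nonsensitive and config_sensitive are
# string maps whose keys follow the Confluent Cloud Snowflake Sink connector, which differ
# from the self-managed JSON in Step 2; verify them against the connector documentation.
# Failed records go to the dead letter queue topic that Confluent Cloud creates for the connector.
variable "snowflake_private_key" {
description = "Private key of the Snowflake user used by the sink"
type = string
sensitive = true
}
resource "confluent_connector" "snowflake_sink" {
environment {
id = confluent_environment.cdc_environment.id
}
kafka_cluster {
id = confluent_kafka_cluster.cdc_cluster.id
}
config_sensitive = {
"snowflake.private.key" = var.snowflake_private_key
}
config_nonsensitive = {
"connector.class" = "SnowflakeSink"
"name" = "snowflake-sink-connector"
"kafka.auth.mode" = "SERVICE_ACCOUNT"
"kafka.service.account.id" = confluent_service_account.debezium.id
"topics" = join(",", [for topic in confluent_kafka_topic.cdc_topics : topic.topic_name])
"input.data.format" = "AVRO"
"snowflake.url.name" = "${var.snowflake_account}.snowflakecomputing.com:443"
"snowflake.user.name" = var.snowflake_user
"snowflake.database.name" = snowflake_database.ecommerce_dw.name
"snowflake.schema.name" = snowflake_schema.raw.name
"buffer.count.records" = "10000"
"buffer.flush.time" = "60"
"tasks.max" = "1"
}
depends_on = [
confluent_kafka_topic.cdc_topics
]
}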
# Outputs
output "snowflake_database" {
description = "Snowflake database name"
value = snowflake_database.ecommerce_dw.name
}
output "kafka_cluster_id" {
description = "Kafka cluster ID"
value = confluent_kafka_cluster.cdc_cluster.id
}
output "cdc_topics" {
description = "CDC topic names"
value = [for topic in confluent_kafka_topic.cdc_topics : topic.topic_name]
}
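The resource monitor and warehouse settings interact with the 1-minute task schedule. A Medium warehouse bills 4 credits per hour while running (X-Small is 1, and each size step doubles), and every running cluster of a multi-cluster warehouse bills separately. If the task fires every minute, auto_suspend = 60 may never get a chance to suspend COMPUTE_WH. A rough sketch of how long the 100-credit monthly quota lasts; the utilization figure is an assumption you supply.

```python
# Credits per hour for standard warehouses, doubling with each size step
CREDITS_PER_HOUR = {"XSMALL": 1, "SMALL": 2, "MEDIUM": 4, "LARGE": 8, "XLARGE": 16}


def hours_until_quota(size: str, quota_credits: float, clusters: int = 1,
                      utilization: float = 1.0) -> float:
    """Hours until a resource-monitor quota is spent.

    utilization: assumed fraction of wall-clock time the warehouse is running.
    Ignores cloud-services credits and the 60-second minimum billed per resume.
    """
    burn_per_hour = CREDITS_PER_HOUR[size.upper()] * clusters * utilization
    return quota_credits / burn_per_hour


print(hours_until_quota("medium", 100))              # 25.0
print(hours_until_quota("medium", 100, clusters=3))  # 8.333333333333334
```

At full utilization the monitor would suspend the warehouse after roughly a day of a month, so either the quota, the warehouse size, or the task schedule likely needs adjusting for an always-on pipeline.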
Testing and Validation
# tests/test_cdc_pipeline.py
import pytest
import json
import time
from datetime import datetime, timedelta
from unittest.mock import Mock, patch, MagicMock
from confluent_kafka import Producer, Consumer, KafkaError
import snowflake.connector
class TestCDCPipeline:
@pytest.fixture
def kafka_config(self):
return {
'bootstrap.servers': 'localhost:9092',
'group.id': 'test-cdc-group',
'auto.offset.reset': 'earliest'
}
@pytest.fixture
def snowflake_config(self):
return {
'account': 'test-account',
'user': 'test-user',
'password': 'test-password',
'database': 'ECOMMERCE_DW',
'schema': 'RAW'
}
def test_debezium_connector_config(self):
"""Test Debezium connector configuration validity."""
config = {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.dbname": "testdb",
"database.server.name": "testserver",
"plugin.name": "pgoutput",
"slot.name": "test_slot",
"publication.name": "test_publication",
"topic.prefix": "test",
"table.include.list": "public.users"
}
# Validate required fields
required_fields = [
'connector.class', 'database.hostname', 'database.port',
'database.dbname', 'database.server.name', 'plugin.name'
]
for field in required_fields:
assert field in config, f"Missing required field: {field}"
# Validate connector class
assert config['connector.class'] == 'io.debezium.connector.postgresql.PostgresConnector'
# Validate plugin
valid_plugins = ['pgoutput', 'wal2json', 'decoderbufs']
assert config['plugin.name'] in valid_plugins
@patch('confluent_kafka.Producer')
def test_kafka_topic_creation(self, mock_producer):
"""Test Kafka topic creation for CDC."""
from kafka.admin import AdminClient, NewTopic
admin_config = {'bootstrap.servers': 'localhost:9092'}
# Mock admin client
mock_admin = Mock()
mock_admin.create_topics.return_value = []
# Test topic configuration
topics = [
NewTopic('cdc.users', num_partitions=6, replication_factor=3),
NewTopic('cdc.products', num_partitions=6, replication_factor=3),
NewTopic('cdc.orders', num_partitions=6, replication_factor=3)
]
# Validate topic configs
for topic in topics:
assert topic.num_partitions == 6
assert topic.replication_factor == 3
def test_cdc_event_structure(self):
"""Test CDC event structure matches expected format."""
cdc_event = {
"schema": {
"type": "struct",
"fields": [
{"type": "string", "field": "user_id", "optional": False},
{"type": "string", "field": "email", "optional": False},
{"type": "string", "field": "first_name", "optional": True},
{"type": "string", "field": "last_name", "optional": True},
{"type": "int64", "field": "updated_at", "optional": False}
]
},
"payload": {
"user_id": "test-user-123",
"email": "test@example.com",
"first_name": "Test",
"last_name": "User",
"updated_at": int(datetime.now().timestamp() * 1000)
}
}
# Validate schema structure
assert cdc_event['schema']['type'] == 'struct'
assert len(cdc_event['schema']['fields']) > 0
# Validate payload matches schema
for field in cdc_event['schema']['fields']:
assert field['field'] in cdc_event['payload']
    @patch('snowflake.connector.connect')
    def test_snowflake_raw_insert(self, mock_connect):
        """Test inserting CDC events into Snowflake raw table."""
        mock_conn = Mock()
        mock_cursor = Mock()
        mock_connect.return_value = mock_conn
        mock_conn.cursor.return_value = mock_cursor
        # Six bound parameters; _ingestion_timestamp is filled in server-side
        insert_sql = """
            INSERT INTO RAW.USERS (
                user_id, email, first_name, last_name,
                _cdc_operation, _cdc_timestamp, _ingestion_timestamp
            ) VALUES (%s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP())
        """
        test_data = (
            "user-123", "test@example.com", "Test", "User",
            "c", datetime.now()
        )
        # Placeholder count must match the number of bound values
        assert insert_sql.count("%s") == len(test_data)
        mock_cursor.execute(insert_sql, test_data)
        # Verify execute was called with the statement and its parameters
        mock_cursor.execute.assert_called_once_with(insert_sql, test_data)
    def test_snowpipe_loading(self):
        """Test Snowpipe configuration for CDC loading."""
        snowpipe_config = {
            "PIPE_NAME": "CDC_USERS_PIPE",
            "PIPE_TABLE": "RAW.USERS",
            # A pipe's COPY reads from a stage; assumes an external stage
            # (hypothetical name) pointing at s3://cdc-bucket/users/
            "PIPE_SOURCE": "@RAW.CDC_USERS_STAGE",
            "FILE_FORMAT": "CSV",
            "ON_ERROR": "CONTINUE"
        }
        # Validate Snowpipe SQL
        pipe_sql = f"""
            CREATE OR REPLACE PIPE {snowpipe_config['PIPE_NAME']}
            AUTO_INGEST = TRUE
            AS
            COPY INTO {snowpipe_config['PIPE_TABLE']}
            FROM {snowpipe_config['PIPE_SOURCE']}
            FILE_FORMAT = (TYPE = {snowpipe_config['FILE_FORMAT']})
            ON_ERROR = {snowpipe_config['ON_ERROR']}
        """
        assert "CREATE OR REPLACE PIPE" in pipe_sql
        assert "AUTO_INGEST = TRUE" in pipe_sql
        assert f"FROM {snowpipe_config['PIPE_SOURCE']}" in pipe_sql
    def test_data_quality_checks(self):
        """Test CDC data quality validation."""
        quality_checks = [
            {
                "check_name": "null_check",
                "table": "RAW.USERS",
                "column": "user_id",
                "sql": "SELECT COUNT(*) FROM RAW.USERS WHERE user_id IS NULL",
                "expected": 0
            },
            {
                # A raw CDC table holds one row per change, so user_id repeats legitimately;
                # a true duplicate is the same change (key + commit time) loaded twice
                "check_name": "duplicate_check",
                "table": "RAW.USERS",
                "column": "user_id, _cdc_timestamp",
                "sql": "SELECT COUNT(*) - COUNT(DISTINCT user_id, _cdc_timestamp) FROM RAW.USERS",
                "expected": 0
            },
            {
                "check_name": "freshness_check",
                "table": "RAW.USERS",
                "column": "_cdc_timestamp",
                "sql": "SELECT DATEDIFF('minute', MAX(_cdc_timestamp), CURRENT_TIMESTAMP()) FROM RAW.USERS",
                "expected_max": 5
            }
        ]
        for check in quality_checks:
            assert 'check_name' in check
            assert 'sql' in check
            assert 'expected' in check or 'expected_max' in check
    def test_schema_evolution(self):
        """Test schema evolution handling."""
        # Current schema
        current_schema = {
            "user_id": "VARCHAR(36)",
            "email": "VARCHAR(255)",
            "first_name": "VARCHAR(100)",
            "last_name": "VARCHAR(100)"
        }
        # New schema with added column
        new_schema = {
            "user_id": "VARCHAR(36)",
            "email": "VARCHAR(255)",
            "first_name": "VARCHAR(100)",
            "last_name": "VARCHAR(100)",
            "phone": "VARCHAR(20)"  # New column
        }
        # Find new columns
        new_columns = set(new_schema.keys()) - set(current_schema.keys())
        assert len(new_columns) == 1
        assert "phone" in new_columns
        # Generate ALTER TABLE statement
        for col in new_columns:
            alter_sql = f"ALTER TABLE RAW.USERS ADD COLUMN {col} {new_schema[col]}"
            assert "ALTER TABLE" in alter_sql
            assert col in alter_sql
    def test_cdc_lag_monitoring(self):
        """Test CDC lag monitoring and alerting."""
        # Simulate lag metrics
        lag_metrics = {
            "users_table": {
                "lag_seconds": 30,
                "records_per_minute": 1500,
                "last_update": datetime.now().isoformat()
            },
            "orders_table": {
                "lag_seconds": 45,
                "records_per_minute": 2500,
                "last_update": datetime.now().isoformat()
            }
        }
        # Define alert thresholds
        thresholds = {
            "lag_seconds_warning": 60,
            "lag_seconds_critical": 300,
            "records_per_minute_min": 100
        }
        # Check for alerts
        alerts = []
        for table, metrics in lag_metrics.items():
            if metrics['lag_seconds'] > thresholds['lag_seconds_critical']:
                alerts.append(f"CRITICAL: {table} lag exceeds {thresholds['lag_seconds_critical']}s")
            elif metrics['lag_seconds'] > thresholds['lag_seconds_warning']:
                alerts.append(f"WARNING: {table} lag exceeds {thresholds['lag_seconds_warning']}s")
            # Also alert when throughput drops below the expected floor
            if metrics['records_per_minute'] < thresholds['records_per_minute_min']:
                alerts.append(f"WARNING: {table} throughput below {thresholds['records_per_minute_min']}/min")
        # Verify alert logic
        assert len(alerts) == 0  # Should be healthy
    def test_exactly_once_semantics(self):
        """Test idempotent processing, the consumer-side half of exactly-once."""
        # Kafka offsets are unique only within a partition,
        # so deduplicate on (topic, partition, offset)
        processed = set()
        def process_event(topic, partition, offset):
            key = (topic, partition, offset)
            if key in processed:
                return False  # Redelivered record: skip
            # Process event here, then record it as done
            processed.add(key)
            return True
        # Test idempotency
        assert process_event("cdc.users", 0, 100) is True
        assert process_event("cdc.users", 0, 100) is False  # Redelivery is a no-op
        assert process_event("cdc.users", 1, 100) is True  # Same offset, other partition
        assert process_event("cdc.users", 0, 101) is True
    def test_dead_letter_queue(self):
        """Test dead letter queue handling."""
        # Simulate DLQ processing
        dlq_events = [
            {
                "original_topic": "cdc.users",
                "error": "Schema validation failed",
                "event": {"user_id": "test", "email": None},
                "timestamp": datetime.now().isoformat()
            }
        ]
        # Validate DLQ structure
        for event in dlq_events:
            assert 'original_topic' in event
            assert 'error' in event
            assert 'event' in event
            assert 'timestamp' in event
        # Process DLQ events
        processed_count = 0
        for event in dlq_events:
            if event['error'] == 'Schema validation failed':
                # Fix and reprocess
                processed_count += 1
        assert processed_count == 1
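The raw-insert test binds values that are already flattened into columns. Without an unwrap transform, the `payload` of a Debezium change event is instead an envelope with `before`, `after`, `op`, and `source` fields. A minimal sketch of the flattening step, assuming that envelope and the `RAW.USERS` columns used in the tests; `flatten_debezium_event` is a hypothetical helper, not part of Debezium or the Snowflake connector:

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Tuple

# Business columns of RAW.USERS, in insert order (taken from the tests above).
USER_COLUMNS = ("user_id", "email", "first_name", "last_name")


def flatten_debezium_event(payload: Optional[Dict[str, Any]]) -> Optional[Tuple[Any, ...]]:
    """Map a Debezium envelope to (user_id, email, first_name, last_name,
    _cdc_operation, _cdc_timestamp). Returns None for tombstones."""
    if payload is None:
        # Tombstone record (null value) emitted after a delete for log compaction.
        return None
    op = payload["op"]  # 'c' = create, 'u' = update, 'd' = delete, 'r' = snapshot read
    # Deletes only carry a 'before' image; every other operation carries 'after'.
    row = payload["before"] if op == "d" else payload["after"]
    # source.ts_ms is when the change was committed in the source database (epoch millis).
    committed_at = datetime.fromtimestamp(payload["source"]["ts_ms"] / 1000, tz=timezone.utc)
    return tuple(row.get(col) for col in USER_COLUMNS) + (op, committed_at)


create_event = {
    "before": None,
    "after": {"user_id": "user-123", "email": "test@example.com",
              "first_name": "Test", "last_name": "User"},
    "op": "c",
    "source": {"ts_ms": 1700000000000},
}
print(flatten_debezium_event(create_event))
```

With PostgreSQL's default `REPLICA IDENTITY`, a delete's `before` image contains only the primary key, so the remaining columns of a flattened delete come back as `None`.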
Cost Analysis
Monthly Cost Breakdown (Production)
| Component | Specification | Monthly Cost |
|---|---|---|
| Snowflake | Medium warehouse, ~8 active hrs/day (auto-suspend) | $2,000 |
| Confluent Cloud | Standard cluster | $500 |
| Debezium | Self-hosted on EC2 | $200 |
| EC2 (Kafka Connect) | 3x m5.xlarge | $450 |
| S3 Storage | 1TB CDC logs | $23 |
| Data Transfer | Cross-region | $100 |
| Monitoring | Datadog | $150 |
| Total | | $3,423 |
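The Snowflake line dominates this budget, and warehouse compute scales linearly with warehouse size and the hours the warehouse is actually running. A rough estimator, assuming Snowflake's standard credits-per-hour ladder (X-Small = 1, doubling per size up to X-Large = 16) and a price per credit that you supply, since it varies by edition, region, and contract; the function name is illustrative:

```python
# Credits consumed per running hour by standard Snowflake warehouse sizes.
CREDITS_PER_HOUR = {"XS": 1, "S": 2, "M": 4, "L": 8, "XL": 16}


def warehouse_monthly_cost(size: str, active_hours_per_day: float,
                           price_per_credit: float, days: int = 30) -> float:
    """Estimate monthly compute cost (USD) for one warehouse.

    Ignores the 60-second minimum charged on each resume and cloud-services credits.
    """
    credits = CREDITS_PER_HOUR[size] * active_hours_per_day * days
    return credits * price_per_credit


# Line items from the table above; their sum should match the stated total.
monthly_costs = {
    "Snowflake": 2000, "Confluent Cloud": 500, "Debezium (EC2)": 200,
    "Kafka Connect (EC2)": 450, "S3": 23, "Data transfer": 100, "Datadog": 150,
}
print(sum(monthly_costs.values()))           # 3423
print(warehouse_monthly_cost("M", 24, 2.0))  # 5760.0
print(warehouse_monthly_cost("M", 8, 2.0))   # 1920.0
```

At an assumed $2 per credit, a Medium warehouse running around the clock costs three times what it does at 8 active hours per day, which is why auto-suspend matters so much for this budget.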
Cost Optimization Strategies
💡
Tip: Optimize CDC costs with these strategies:
- Snowflake Auto-Suspend: Suspend warehouse when idle
- Snowpipe Batching: Batch small files for efficient loading
- Compression: Use binary formats such as Avro or Parquet, which are typically much smaller than the equivalent JSON
- Partition Pruning: Query only relevant partitions
- Resource Monitors: Set spending limits
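Auto-suspend and resource monitors are both plain Snowflake DDL. A sketch that builds the statements in Python, in the same style as the Snowpipe test; the warehouse and monitor names, the 60-second idle timeout, the 1,000-credit quota, and the 80%/100% trigger levels are illustrative assumptions. Creating a resource monitor normally requires the ACCOUNTADMIN role.

```python
from typing import List


def cost_control_sql(warehouse: str, monitor: str, monthly_credit_quota: int,
                     auto_suspend_seconds: int = 60) -> List[str]:
    """Build Snowflake DDL for auto-suspend plus a monthly credit budget."""
    return [
        # Suspend after N idle seconds; resume automatically on the next query.
        f"ALTER WAREHOUSE {warehouse} SET AUTO_SUSPEND = {auto_suspend_seconds} AUTO_RESUME = TRUE",
        # Notify at 80% of the monthly quota; suspend the warehouse at 100%.
        f"CREATE OR REPLACE RESOURCE MONITOR {monitor} WITH CREDIT_QUOTA = {monthly_credit_quota} "
        "FREQUENCY = MONTHLY START_TIMESTAMP = IMMEDIATELY "
        "TRIGGERS ON 80 PERCENT DO NOTIFY ON 100 PERCENT DO SUSPEND",
        # Attach the monitor so the quota applies to this warehouse.
        f"ALTER WAREHOUSE {warehouse} SET RESOURCE_MONITOR = {monitor}",
    ]


for statement in cost_control_sql("CDC_WH", "CDC_MONTHLY_MONITOR", monthly_credit_quota=1000):
    print(statement + ";")
```

`DO SUSPEND` lets running queries finish before the warehouse stops; `DO SUSPEND_IMMEDIATE` cancels them instead.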
Performance Metrics
| Metric | Before CDC | After CDC | Improvement |
|---|---|---|---|
| Data Freshness | 4 hours | < 1 minute | 240x faster |
| Query Latency | 30 seconds | 2 seconds | 15x faster |
| ETL Maintenance | 20 hrs/week | 2 hrs/week | 90% reduction |
| Data Quality Issues | 50/month | 5/month | 90% reduction |
Interview Talking Points
Architecture Decisions
ℹ️
Best Practice: Focus on these CDC concepts in interviews:
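- Why Debezium over custom CDC?
  - Battle-tested with production workloads
  - Supports many databases (PostgreSQL, MySQL, MongoDB, SQL Server, and others)
  - Picks up source schema changes and reflects them in the schemas of its change events
  - Active community and support
- Why Snowpipe over scheduled COPY jobs?
  - Loads files continuously as they land in the stage (automatic micro-batching)
  - Better suited to streaming workloads
  - Lower latency than scheduled batches (often around a minute, not guaranteed)
  - Serverless billing, so no warehouse has to stay running for loads (a per-file overhead still favors moderately sized files over many tiny ones)
- Why exactly-once semantics?
  - Critical for financial data
  - Prevents duplicate transactions
  - Maintains data consistency
  - In practice achieved with idempotent writes on top of at-least-once delivery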
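Kafka Connect sink pipelines are at-least-once by default, so exactly-once results in the warehouse usually come from idempotent writes. A minimal in-memory sketch, assuming every event carries its primary key and Debezium's PostgreSQL WAL position (`source.lsn`), which increases for successive changes to the same row; `IdempotentSink` is a hypothetical stand-in for a MERGE into the target table:

```python
from typing import Any, Dict, List, Optional, Tuple


class IdempotentSink:
    """Applies CDC events keyed by primary key; replays and stale events are no-ops."""

    def __init__(self) -> None:
        self.rows: Dict[str, Dict[str, Any]] = {}
        self.applied_lsn: Dict[str, int] = {}

    def apply(self, key: str, lsn: int, op: str, row: Optional[Dict[str, Any]]) -> bool:
        # Skip anything at or before the last WAL position applied for this key.
        if lsn <= self.applied_lsn.get(key, -1):
            return False
        if op == "d":
            self.rows.pop(key, None)  # delete
        else:
            self.rows[key] = row  # create/update/snapshot read: upsert the full row image
        self.applied_lsn[key] = lsn
        return True


batch: List[Tuple[str, int, str, Optional[Dict[str, Any]]]] = [
    ("user-1", 100, "c", {"email": "a@example.com"}),
    ("user-1", 105, "u", {"email": "b@example.com"}),
    ("user-2", 110, "c", {"email": "c@example.com"}),
    ("user-2", 120, "d", None),
]
sink = IdempotentSink()
first_pass = [sink.apply(*event) for event in batch]
snapshot = dict(sink.rows)
replay = [sink.apply(*event) for event in batch]  # simulated redelivery after a crash
print(first_pass, replay, sink.rows == snapshot)
```

Because the position for a deleted key is kept, a redelivered create for that key is also skipped instead of resurrecting the row.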
Common Interview Questions
Q: "How do you handle out-of-order events in CDC?"
out_of_order_handling = {
"Event Time": "Use source timestamp, not ingestion time",
"Watermarking": "Track maximum timestamp with allowed lateness",
"Deduplication": "Use event ID for idempotent processing",
"Partitioning": "Partition by source key for ordering"
}
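The event-time and watermarking answers can be made concrete with a small tracker. A sketch, assuming event time is Debezium's `source.ts_ms` in milliseconds and a fixed allowed lateness; `WatermarkTracker` is hypothetical, and what to do with late events (side table, DLQ, or a key-based upsert that tolerates them) is left to the caller:

```python
from typing import Optional


class WatermarkTracker:
    """Event-time watermark: max event time seen minus a fixed allowed lateness."""

    def __init__(self, allowed_lateness_ms: int) -> None:
        self.allowed_lateness_ms = allowed_lateness_ms
        self.max_event_ms: Optional[int] = None

    def watermark(self) -> Optional[int]:
        if self.max_event_ms is None:
            return None
        return self.max_event_ms - self.allowed_lateness_ms

    def observe(self, event_ms: int) -> bool:
        """Record an event time; return False if it arrived behind the watermark."""
        wm = self.watermark()
        on_time = wm is None or event_ms >= wm
        if self.max_event_ms is None or event_ms > self.max_event_ms:
            self.max_event_ms = event_ms
        return on_time


tracker = WatermarkTracker(allowed_lateness_ms=5_000)
results = [tracker.observe(ts) for ts in (10_000, 20_000, 16_000, 14_000)]
print(results, tracker.watermark())  # [True, True, True, False] 15000
```

The 16,000 ms event is out of order but still inside the 5-second allowance; the 14,000 ms event falls behind the watermark and is flagged as late.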
Q: "How do you handle schema changes in source database?"
schema_evolution_strategy = {
"Forward Compatibility": "Old schema can read data written with the new schema",
"Backward Compatibility": "New schema can read data written with the old schema",
"Full Compatibility": "Both directions work",
"Breaking Changes": "Typically require a coordinated connector restart and re-snapshot"
}
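All three compatibility modes reduce to one question: can a reader schema decode data written with a writer schema? A simplified sketch of Avro-style resolution, assuming fields are matched by name, types must match exactly (no type promotion), and a reader field missing from the writer is acceptable only if it has a default; `Field`, `can_read`, and `compatibility` are illustrative names, not a schema-registry API:

```python
from typing import Dict, NamedTuple


class Field(NamedTuple):
    type: str
    has_default: bool = False


def can_read(reader: Dict[str, Field], writer: Dict[str, Field]) -> bool:
    """True if records written with `writer` can be decoded with `reader`."""
    for name, field in reader.items():
        if name not in writer:
            if not field.has_default:
                return False  # reader expects a value the writer never produced
        elif writer[name].type != field.type:
            return False  # type changed; this sketch allows no promotion
    return True  # fields only the writer has are ignored by the reader


def compatibility(old: Dict[str, Field], new: Dict[str, Field]) -> Dict[str, bool]:
    backward = can_read(reader=new, writer=old)  # new schema reads old data
    forward = can_read(reader=old, writer=new)   # old schema reads new data
    return {"backward": backward, "forward": forward, "full": backward and forward}


old = {"user_id": Field("string"), "email": Field("string")}
with_default = {**old, "phone": Field("string", has_default=True)}
without_default = {**old, "phone": Field("string")}
print(compatibility(old, with_default))     # all True
print(compatibility(old, without_default))  # backward False, forward True
```

Under these rules, adding a column such as `phone` with a default keeps the change fully compatible, while adding it without a default breaks backward compatibility and dropping a field without a default breaks forward compatibility.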
Q: "How do you monitor CDC pipeline health?"
monitoring_metrics = {
"Lag Metrics": "Replication lag per table",
"Throughput": "Events processed per second",
"Error Rate": "Failed events percentage",
"Data Quality": "Null rates, duplicates, freshness"
}
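Replication lag can be read off each Debezium event: `source.ts_ms` is the commit time in the source database and the top-level `ts_ms` is when the connector processed the change. A sketch that splits end-to-end lag into capture and delivery parts and applies the 60 s / 300 s thresholds from the lag-monitoring test; the function names are hypothetical, and the numbers are only as accurate as clock synchronization between the database, Connect, and consumer hosts:

```python
import time
from typing import Any, Dict, Optional


def event_lag_ms(envelope: Dict[str, Any], now_ms: Optional[int] = None) -> Dict[str, int]:
    """Break one Debezium event's lag into capture and delivery parts (milliseconds)."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    committed_ms = envelope["source"]["ts_ms"]  # commit time in PostgreSQL
    captured_ms = envelope["ts_ms"]             # time Debezium processed the change
    return {
        "capture_lag_ms": captured_ms - committed_ms,
        "delivery_lag_ms": now_ms - captured_ms,
        "end_to_end_lag_ms": now_ms - committed_ms,
    }


def lag_severity(lag_ms: int, warning_s: int = 60, critical_s: int = 300) -> str:
    """Classify lag with the same thresholds as test_cdc_lag_monitoring."""
    if lag_ms > critical_s * 1000:
        return "CRITICAL"
    if lag_ms > warning_s * 1000:
        return "WARNING"
    return "OK"


event = {"source": {"ts_ms": 1_000_000, "table": "users"}, "ts_ms": 1_000_400}
lag = event_lag_ms(event, now_ms=1_090_000)
print(lag, lag_severity(lag["end_to_end_lag_ms"]))
```

Separating the two components tells you where to look: high capture lag points at the connector or replication slot, high delivery lag at Kafka or the sink.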
Deployment Checklist
- Configure PostgreSQL for logical replication
- Deploy Debezium connector with proper configuration
- Set up Kafka Connect cluster
- Configure Snowflake sink connector
- Create Snowflake schemas and tables
- Set up Snowpipe for automated loading
- Implement data quality checks
- Configure monitoring and alerting
- Test failover and recovery procedures
- Document operational runbooks
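The first checklist item is easy to automate. Debezium's PostgreSQL connector needs `wal_level = logical`, and each connector holds one replication slot and one WAL sender connection (other replication clients consume these too). A sketch that validates those settings from the name/value pairs returned by `SELECT name, setting FROM pg_settings`; the function name and messages are illustrative, and on Amazon RDS the WAL level is controlled through the `rds.logical_replication` parameter instead:

```python
from typing import Dict, List


def check_logical_replication(settings: Dict[str, str], connectors: int = 1) -> List[str]:
    """Return problems found in PostgreSQL settings; an empty list means ready."""
    problems = []
    if settings.get("wal_level") != "logical":
        # Changing wal_level only takes effect after a server restart.
        problems.append("wal_level must be 'logical' (restart required)")
    for name in ("max_replication_slots", "max_wal_senders"):
        # pg_settings reports values as text, hence the int() conversion.
        if int(settings.get(name, "0")) < connectors:
            problems.append(f"{name} must be at least {connectors}")
    return problems


current = {"wal_level": "replica", "max_replication_slots": "10", "max_wal_senders": "10"}
print(check_logical_replication(current, connectors=3))
```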
⚠️
Warning: Always test CDC configuration in staging before production. Incorrect configuration can cause data loss or performance issues.
This project demonstrates real-time data warehousing skills and is highly relevant for data engineering interviews at companies with real-time analytics requirements.