🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Servicesℹ️ About✉️ ContactView Pricing Plansfrom $10

Cost-Optimized Data Warehouse on AWS Redshift

Data Engineering ProjectsCloud Data Warehousing⭐ Premium

Advertisement

Cost-Optimized Data Warehouse on Redshift

Performance Tuning + Compression + WLM + Serverless

ℹ️

Project Difficulty: Advanced | Duration: 2-3 weeks | Cloud: AWS Build a production data warehouse on Amazon Redshift optimized for both performance and cost, achieving 60-70% savings over naive configurations.

Project Overview

Problem Statement

Data warehouses are expensive to operate. Without proper optimization, organizations overspend on compute resources while experiencing poor query performance. A well-tuned Redshift cluster can deliver enterprise-grade analytics at a fraction of the cost.

Objectives

  1. Achieve 60%+ cost reduction vs baseline
  2. Maintain sub-second query performance for dashboards
  3. Support 100+ concurrent users
  4. Implement automated scaling and workload management
  5. Enable seamless data loading from multiple sources

Tech Stack

ComponentTechnologyPurpose
WarehouseAmazon RedshiftCore analytics engine
ETLAWS GlueData integration
StorageS3 + Redshift SpectrumData lake extension
MonitoringCloudWatch + Redshift ConsolePerformance monitoring
SecurityAWS KMS + IAMData protection

Architecture Diagram

DATA SOURCESRDS/Aurora (OLTP)S3 Data Lake (Files)Kinesis (Streaming)Third-Party (Fivetran)INGESTION LAYERAWS Glue ETL (Complex ETL)COPY Command (Bulk Load)Streaming Ingestion (Kinesis)AMAZON REDSHIFTLeader Node (Query Planning)Compute Nodes (Execution)Node Slices (Distribution)WORKLOAD MANAGEMENTConcurrency ScalingQuery QueuesResource MonitoringOPTIMIZATION LAYERSCompression EncodingDistribution StylesSort Keys (Interleaved)CONSUMERSBI Tools (QuickSight)SQL Clients (DBeaver)Applications (Lambda)Redshift Spectrum

Data Source Setup and Schema

Optimized Redshift Schema

-- schemas/optimized_schema.sql
-- Dimension Tables with optimal distribution

CREATE TABLE IF NOT EXISTS dim_customers (
    customer_key         BIGINT IDENTITY(1,1) DISTKEY,
    customer_id          VARCHAR(50) ENCODE raw,
    email                VARCHAR(255) ENCODE lzo,
    full_name            VARCHAR(201) ENCODE lzo,
    company_name         VARCHAR(255) ENCODE lzo,
    industry             VARCHAR(100) ENCODE bytedict,
    segment              VARCHAR(50) ENCODE bytedict,
    city                 VARCHAR(100) ENCODE lzo,
    state                VARCHAR(50) ENCODE bytedict,
    country              VARCHAR(50) ENCODE bytedict,
    created_at           TIMESTAMP ENCODE lzo,
    updated_at           TIMESTAMP ENCODE lzo,
    is_active            BOOLEAN ENCODE runlength,
    lifetime_value       DECIMAL(12,2) ENCODE az64,
    PRIMARY KEY (customer_id)
)
DISTSTYLE KEY
SORTKEY (created_at, customer_id);

-- Product dimension with even distribution
CREATE TABLE IF NOT EXISTS dim_products (
    product_key          BIGINT IDENTITY(1,1) DISTKEY,
    product_id           VARCHAR(50) ENCODE raw,
    sku                  VARCHAR(50) ENCODE lzo,
    name                 VARCHAR(255) ENCODE lzo,
    category             VARCHAR(100) ENCODE bytedict,
    subcategory          VARCHAR(100) ENCODE bytedict,
    brand                VARCHAR(100) ENCODE lzo,
    price                DECIMAL(10,2) ENCODE az64,
    cost                 DECIMAL(10,2) ENCODE az64,
    weight_kg            DECIMAL(8,3) ENCODE az64,
    is_active            BOOLEAN ENCODE runlength,
    created_at           TIMESTAMP ENCODE lzo,
    updated_at           TIMESTAMP ENCODE lzo,
    PRIMARY KEY (product_id)
)
DISTSTYLE ALL
SORTKEY (category, product_id);

-- Large fact table with key distribution
CREATE TABLE IF NOT EXISTS fct_orders (
    order_key            BIGINT IDENTITY(1,1) DISTKEY,
    order_id             VARCHAR(50) ENCODE raw,
    customer_key         BIGINT ENCODE az64,
    order_date           DATE ENCODE lzo,
    ship_date            DATE ENCODE lzo,
    status               VARCHAR(30) ENCODE bytedict,
    subtotal             DECIMAL(12,2) ENCODE az64,
    tax_amount           DECIMAL(10,2) ENCODE az64,
    shipping_amount      DECIMAL(10,2) ENCODE az64,
    discount_amount      DECIMAL(10,2) ENCODE az64,
    total_amount         DECIMAL(12,2) ENCODE az64,
    currency             VARCHAR(3) ENCODE bytedict,
    payment_method       VARCHAR(50) ENCODE bytedict,
    shipping_address     VARCHAR(500) ENCODE lzo,
    created_at           TIMESTAMP ENCODE lzo,
    updated_at           TIMESTAMP ENCODE lzo
)
DISTSTYLE KEY
COMPOUND SORTKEY (order_date, status, customer_key);

-- Order items with interleaved sort key
CREATE TABLE IF NOT EXISTS fct_order_items (
    order_item_key       BIGINT IDENTITY(1,1) DISTKEY,
    order_id             VARCHAR(50) ENCODE raw,
    product_key          BIGINT ENCODE az64,
    quantity             INTEGER ENCODE az64,
    unit_price           DECIMAL(10,2) ENCODE az64,
    discount             DECIMAL(5,2) ENCODE az64,
    line_total           DECIMAL(12,2) ENCODE az64,
    created_at           TIMESTAMP ENCODE lzo
)
DISTSTYLE KEY
INTERLEAVED SORTKEY (order_id, product_key);

-- Daily aggregated metrics (materialized)
CREATE TABLE IF NOT EXISTS agg_daily_sales (
    metric_date          DATE ENCODE lzo,
    product_key          BIGINT ENCODE az64,
    customer_key         BIGINT ENCODE az64,
    category             VARCHAR(100) ENCODE bytedict,
    total_orders         INTEGER ENCODE az64,
    total_items          INTEGER ENCODE az64,
    total_revenue        DECIMAL(15,2) ENCODE az64,
    avg_order_value      DECIMAL(10,2) ENCODE az64,
    unique_customers     INTEGER ENCODE az64,
    created_at           TIMESTAMP ENCODE lzo
)
DISTSTYLE KEY
COMPOUND SORTKEY (metric_date, category);

-- Concurrency scaling cluster configuration
CREATE SERVERLESS_NAMESPACE IF NOT EXISTS analytics_namespace
    WITH (admin_username = 'admin', admin_user_password = 'CHANGE_ME');

-- Workload Management Queues
CREATE WLM CONFIGURATION wlm_config
    AUTO WLM;

Step-by-Step Implementation Guide

Step 1: Redshift Cluster Setup

# redshift/cluster_manager.py
import boto3
import json
import time
from typing import Dict, List, Optional
import logging

logging.basicConfig(level.INFO)
logger = logging.getLogger(__name__)

class RedshiftClusterManager:
    def __init__(self, region: str = 'us-east-1'):
        self.region = region
        self.redshift = boto3.client('redshift', region_name=region)
        self.s3 = boto3.client('s3', region_name=region)
        self.iam = boto3.client('iam', region_name=region)

    def create_cluster(self, config: Dict) -> Dict:
        cluster_config = {
            'ClusterIdentifier': config['cluster_name'],
            'NodeType': config.get('node_type', 'dc2.large'),
            'MasterUsername': config['master_username'],
            'MasterUserPassword': config['master_password'],
            'NumberOfNodes': config.get('num_nodes', 2),
            'Encrypted': True,
            'KmsKeyId': config.get('kms_key_id'),
            'VpcSecurityGroupIds': config['security_group_ids'],
            'ClusterSubnetGroupName': config['subnet_group'],
            'PubliclyAccessible': False,
            'AutomatedSnapshotRetentionPeriod': config.get('snapshot_retention', 7),
            'ManualSnapshotRetentionPeriod': 35,
            'PreferredMaintenanceWindow': 'Sun:03:00-Sun:04:00',
            'ClusterParameterGroupName': config.get('parameter_group', 'default.redshift-8.0'),
            'IamRoles': config.get('iam_roles', []),
            'LoggingProperties': {
                'BucketName': config['log_bucket'],
                'S3KeyPrefix': 'redshift-logs/'
            }
        }

        if config.get('node_type', '').startswith('ra3'):
            cluster_config['NumberOfNodes'] = config.get('num_nodes', 2)
            cluster_config['ManagedStorage'] = True

        try:
            response = self.redshift.create_cluster(**cluster_config)
            logger.info(f"Creating cluster: {config['cluster_name']}")

            self._wait_for_cluster(config['cluster_name'], 'available')
            return response['Cluster']

        except Exception as e:
            logger.error(f"Failed to create cluster: {e}")
            raise

    def _wait_for_cluster(self, cluster_id: str, target_state: str,
                         timeout: int = 1800):
        start_time = time.time()
        while True:
            response = self.redshift.describe_clusters(ClusterIdentifier=cluster_id)
            state = response['Clusters'][0]['ClusterStatus']

            if state == target_state:
                logger.info(f"Cluster {cluster_id} is now {target_state}")
                return
            elif state in ['deleted', 'deleting']:
                raise Exception(f"Cluster entered unexpected state: {state}")

            elapsed = time.time() - start_time
            if elapsed > timeout:
                raise TimeoutError(f"Cluster not ready within {timeout} seconds")

            logger.info(f"Cluster state: {state} ({elapsed:.0f}s elapsed)")
            time.sleep(30)

    def get_cluster_metrics(self, cluster_id: str) -> Dict:
        response = self.redshift.describe_clusters(ClusterIdentifier=cluster_id)
        cluster = response['Clusters'][0]

        return {
            'cluster_id': cluster['ClusterIdentifier'],
            'node_type': cluster['NodeType'],
            'num_nodes': cluster['NumberOfNodes'],
            'status': cluster['ClusterStatus'],
            'endpoint': cluster.get('Endpoint', {}).get('Address'),
            'port': cluster.get('Endpoint', {}).get('Port', 5439),
            'creation_time': cluster['ClusterCreateTime'].isoformat(),
            'encrypted': cluster['Encrypted']
        }

    def resize_cluster(self, cluster_id: str, new_node_type: str,
                      new_num_nodes: int):
        try:
            self.redshift.modify_cluster(
                ClusterIdentifier=cluster_id,
                ClusterType='multi-node' if new_num_nodes > 1 else 'single-node',
                NodeType=new_node_type,
                NumberOfNodes=new_num_nodes
            )
            logger.info(f"Resizing cluster {cluster_id} to {new_node_type} x{new_num_nodes}")
            self._wait_for_cluster(cluster_id, 'available')
        except Exception as e:
            logger.error(f"Resize failed: {e}")
            raise

    def create_snapshot(self, cluster_id: str, snapshot_id: str) -> Dict:
        response = self.redshift.create_cluster_snapshot(
            SnapshotIdentifier=snapshot_id,
            ClusterIdentifier=cluster_id
        )
        logger.info(f"Created snapshot: {snapshot_id}")
        return response['Snapshot']

    def delete_cluster(self, cluster_id: str, skip_final_snapshot: bool = True):
        try:
            self.redshift.delete_cluster(
                ClusterIdentifier=cluster_id,
                SkipFinalClusterSnapshot=skip_final_snapshot,
                FinalClusterSnapshotIdentifier=f"{cluster_id}-final-{int(time.time())}"
            )
            logger.info(f"Deleting cluster: {cluster_id}")
        except Exception as e:
            logger.error(f"Delete failed: {e}")
            raise

    def setup_iam_role(self, role_name: str, s3_bucket: str) -> str:
        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "redshift.amazonaws.com"},
                "Action": "sts:AssumeRole"
            }]
        }

        try:
            role = self.iam.create_role(
                RoleName=role_name,
                AssumeRolePolicyDocument=json.dumps(trust_policy),
                Description="IAM role for Redshift S3 access"
            )

            self.iam.attach_role_policy(
                RoleName=role_name,
                PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
            )

            logger.info(f"Created IAM role: {role_name}")
            return role['Role']['Arn']

        except self.iam.exceptions.EntityAlreadyExistsException:
            role = self.iam.get_role(RoleName=role_name)
            return role['Role']['Arn']

    def setup_vpc_endpoint(self, vpc_id: str, route_table_ids: List[str]) -> str:
        ec2 = boto3.client('ec2', region_name=self.region)

        endpoint = ec2.create_vpc_endpoint(
            VpcId=vpc_id,
            ServiceName=f'com.amazonaws.{self.region}.redshift',
            RouteTableIds=route_table_ids,
            PrivateDnsEnabled=True
        )

        logger.info(f"Created VPC endpoint: {endpoint['VpcEndpoint']['VpcEndpointId']}")
        return endpoint['VpcEndpoint']['VpcEndpointId']

Step 2: Performance Optimization

-- optimization/analyze_optimize.sql
-- Analyze table statistics
ANALYZE dim_customers;
ANALYZE dim_products;
ANALYZE fct_orders;
ANALYZE fct_order_items;

-- Vacuum to reclaim space and sort
VACUUM SORT ONLY dim_customers;
VACUUM SORT ONLY fct_orders;
VACUUM DELETE ONLY fct_orders;
VACUUM ANALYZE fct_order_items;

-- Update statistics
ANALYZE COMPRESSION dim_customers;
ANALYZE COMPRESSION fct_orders;

-- Create materialized views for common queries
CREATE MATERIALIZED VIEW mv_daily_sales_summary AS
SELECT
    order_date,
    COUNT(DISTINCT order_id) AS total_orders,
    SUM(total_amount) AS total_revenue,
    AVG(total_amount) AS avg_order_value,
    COUNT(DISTINCT customer_key) AS unique_customers
FROM fct_orders
WHERE order_date >= DATEADD('month', -3, CURRENT_DATE)
GROUP BY order_date
WITH NO DATA;

-- Refresh materialized view
REFRESH MATERIALIZED VIEW mv_daily_sales_summary;

-- Create statistics for query optimizer
CREATE STATISTICS stats_orders_date ON fct_orders (order_date);
CREATE STATISTICS stats_orders_customer ON fct_orders (customer_key);
CREATE STATISTICS stats_items_product ON fct_order_items (product_key);

-- Workload Management Configuration
CREATE WLM QUERY QUEUE reporting_queue
    WITH (concurrency=5, memory_percent_to_use=40);

CREATE WLM QUERY QUEUE etl_queue
    WITH (concurrency=3, memory_percent_to_use=50);

CREATE WLM QUERY QUEUE adhoc_queue
    WITH (concurrency=10, memory_percent_to_use=30);

-- Query monitoring rules
CREATE WLM QUERY MONITORING RULE rule_long_running
    WITH (query_execution_time=300000, action=Terminate);

CREATE WLM QUERY MONITORING RULE rule_high_memory
    WITH (query_execution_time=60000, action=Snapshot);
# optimization/performance_tuner.py
import boto3
import time
from typing import Dict, List, Tuple
import logging

logging.basicConfig(level.INFO)
logger = logging.getLogger(__name__)

class RedshiftPerformanceTuner:
    def __init__(self, cluster_id: str, region: str = 'us-east-1'):
        self.cluster_id = cluster_id
        self.region = region
        self.redshift = boto3.client('redshift', region_name=region)
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)

    def analyze_table_skew(self, connection, table_name: str) -> Dict:
        cursor = connection.cursor()

        skew_query = f"""
        SELECT
            stewardnodeid,
            COUNT(*) AS row_count,
            COUNT(*) * 100.0 / SUM(COUNT(*)) OVER() AS percentage
        FROM svv_table_info
        WHERE name = '{table_name}'
        GROUP BY stewardnodeid
        ORDER BY row_count DESC;
        """

        cursor.execute(skew_query)
        results = cursor.fetchall()

        total_rows = sum(r[1] for r in results)
        expected_per_node = total_rows / len(results) if results else 0

        skew_metrics = []
        for node_id, row_count, pct in results:
            skew = abs(row_count - expected_per_node) / expected_per_node * 100 if expected_per_node > 0 else 0
            skew_metrics.append({
                'node_id': node_id,
                'row_count': row_count,
                'percentage': pct,
                'skew_percent': skew
            })

        return {
            'table_name': table_name,
            'total_rows': total_rows,
            'num_nodes': len(results),
            'max_skew': max(m['skew_percent'] for m in skew_metrics) if skew_metrics else 0,
            'node_details': skew_metrics
        }

    def analyze_query_performance(self, connection, query: str) -> Dict:
        cursor = connection.cursor()

        explain_query = f"EXPLAIN {query}"
        cursor.execute(explain_query)
        explain_plan = cursor.fetchall()

        cursor.execute(query)
        start_time = time.time()
        results = cursor.fetchall()
        execution_time = time.time() - start_time

        return {
            'query': query,
            'execution_time_seconds': execution_time,
            'rows_returned': len(results),
            'explain_plan': explain_plan
        }

    def recommend_distribution_style(self, connection, table_name: str) -> str:
        cursor = connection.cursor()

        query = f"""
        SELECT
            COUNT(*) AS total_rows,
            COUNT(DISTINCT customer_key) AS distinct_values,
            COUNT(*) / NULLIF(COUNT(DISTINCT customer_key), 0) AS avg_rows_per_value
        FROM {table_name}
        WHERE customer_key IS NOT NULL;
        """

        cursor.execute(query)
        result = cursor.fetchone()

        total_rows = result[0]
        distinct_values = result[1]
        avg_rows_per_value = result[2]

        if total_rows < 100000:
            return 'ALL'
        elif avg_rows_per_value > 100000:
            return 'KEY'
        else:
            return 'EVEN'

    def recommend_sort_key(self, connection, table_name: str,
                          query_patterns: List[str]) -> Dict:
        cursor = connection.cursor()

        column_usage = {}
        for query in query_patterns:
            columns = self._extract_where_columns(query)
            for col in columns:
                column_usage[col] = column_usage.get(col, 0) + 1

        sorted_columns = sorted(column_usage.items(), key=lambda x: x[1], reverse=True)

        if len(sorted_columns) >= 2:
            sort_type = 'COMPOUND'
            sort_columns = [c[0] for c in sorted_columns[:4]]
        elif len(sorted_columns) == 1:
            sort_type = 'COMPOUND'
            sort_columns = [sorted_columns[0][0]]
        else:
            sort_type = 'COMPOUND'
            sort_columns = ['created_at']

        return {
            'table_name': table_name,
            'sort_type': sort_type,
            'sort_columns': sort_columns,
            'recommendation': f"{sort_type} SORTKEY ({', '.join(sort_columns)})"
        }

    def _extract_where_columns(self, query: str) -> List[str]:
        import re
        where_match = re.search(r'WHERE\s+(.+?)(?:ORDER|GROUP|LIMIT|$)', query, re.IGNORECASE)
        if not where_match:
            return []

        where_clause = where_match.group(1)
        columns = re.findall(r'(\w+)\s*(?:=|>|<|>=|<=|IN|LIKE|BETWEEN)', where_clause, re.IGNORECASE)
        return columns

    def get_cluster_utilization(self) -> Dict:
        end_time = time.time()
        start_time = end_time - 3600

        cpu_metrics = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/Redshift',
            MetricName='CPUUtilization',
            Dimensions=[{'Name': 'ClusterIdentifier', 'Value': self.cluster_id}],
            StartTime=start_time,
            EndTime=end_time,
            Period=300,
            Statistics=['Average', 'Maximum']
        )

        storage_metrics = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/Redshift',
            MetricName='PercentageDiskSpaceUsed',
            Dimensions=[{'Name': 'ClusterIdentifier', 'Value': self.cluster_id}],
            StartTime=start_time,
            EndTime=end_time,
            Period=300,
            Statistics=['Average', 'Maximum']
        )

        return {
            'cpu': {
                'avg': cpu_metrics['Datapoints'][0]['Average'] if cpu_metrics['Datapoints'] else 0,
                'max': max(dp['Maximum'] for dp in cpu_metrics['Datapoints']) if cpu_metrics['Datapoints'] else 0
            },
            'storage': {
                'avg': storage_metrics['Datapoints'][0]['Average'] if storage_metrics['Datapoints'] else 0,
                'max': max(dp['Maximum'] for dp in storage_metrics['Datapoints']) if storage_metrics['Datapoints'] else 0
            }
        }

    def generate_optimization_report(self, connection) -> Dict:
        report = {
            'cluster_id': self.cluster_id,
            'timestamp': time.time(),
            'recommendations': [],
            'utilization': self.get_cluster_utilization()
        }

        cursor = connection.cursor()
        cursor.execute("""
            SELECT name FROM svv_table_info
            WHERE schema = 'public'
            ORDER BY size DESC
            LIMIT 10
        """)

        tables = [r[0] for r in cursor.fetchall()]

        for table in tables:
            skew = self.analyze_table_skew(connection, table)
            if skew['max_skew'] > 20:
                report['recommendations'].append({
                    'table': table,
                    'issue': 'high_skew',
                    'severity': 'high',
                    'detail': f"Skew of {skew['max_skew']:.1f}% detected"
                })

        if report['utilization']['cpu']['avg'] < 20:
            report['recommendations'].append({
                'type': 'right_sizing',
                'severity': 'medium',
                'detail': f"CPU utilization is only {report['utilization']['cpu']['avg']:.1f}% avg. Consider downsizing."
            })

        return report

Step 3: Cost Monitoring

# cost/cost_monitor.py
import boto3
from datetime import datetime, timedelta
from typing import Dict, List
import logging

logging.basicConfig(level.INFO)
logger = logging.getLogger(__name__)

class RedshiftCostMonitor:
    def __init__(self, region: str = 'us-east-1'):
        self.region = region
        self.ce = boto3.client('ce', region_name='us-east-1')
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)

    def get_monthly_cost(self, cluster_id: str) -> Dict:
        end_date = datetime.now().strftime('%Y-%m-01')
        start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-01')

        response = self.ce.get_cost_and_usage(
            TimePeriod={'Start': start_date, 'End': end_date},
            Granularity='MONTHLY',
            Metrics=['UnblendedCost'],
            Filter={
                'Dimensions': {
                    'Key': 'SERVICE',
                    'Values': ['Amazon Redshift']
                }
            }
        )

        cost_data = response['ResultsByTime'][0]['Total']['UnblendedCost']

        return {
            'period': f"{start_date} to {end_date}",
            'total_cost': float(cost_data['Amount']),
            'currency': cost_data['Unit']
        }

    def estimate_cost_by_node_type(self) -> Dict:
        pricing = {
            'dc2.large': {'vcpu': 2, 'memory_gb': 15, 'hourly_cost': 0.25},
            'dc2.8xlarge': {'vcpu': 32, 'memory_gb': 244, 'hourly_cost': 4.80},
            'ra3.xlplus': {'vcpu': 12, 'memory_gb': 96, 'hourly_cost': 1.065},
            'ra3.4xlarge': {'vcpu': 12, 'memory_gb': 96, 'hourly_cost': 3.26},
            'ra3.16xlarge': {'vcpu': 48, 'memory_gb': 384, 'hourly_cost': 13.04}
        }

        estimates = {}
        hours_per_month = 730

        for node_type, info in pricing.items():
            for num_nodes in [2, 4, 8]:
                monthly_cost = info['hourly_cost'] * hours_per_month * num_nodes
                estimates[f"{node_type}x{num_nodes}"] = {
                    'node_type': node_type,
                    'num_nodes': num_nodes,
                    'hourly_cost': info['hourly_cost'] * num_nodes,
                    'monthly_cost': monthly_cost,
                    'vcpu_total': info['vcpu'] * num_nodes,
                    'memory_total_gb': info['memory_gb'] * num_nodes
                }

        return estimates

    def get_utilization_trends(self, cluster_id: str, days: int = 30) -> Dict:
        end_time = datetime.now()
        start_time = end_time - timedelta(days=days)

        cpu_trend = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/Redshift',
            MetricName='CPUUtilization',
            Dimensions=[{'Name': 'ClusterIdentifier', 'Value': cluster_id}],
            StartTime=start_time,
            EndTime=end_time,
            Period=86400,
            Statistics=['Average', 'Maximum']
        )

        return {
            'cpu_trend': cpu_trend['Datapoints']
        }

    def calculate_cost_per_query(self, cluster_id: str) -> Dict:
        utilization = self.get_utilization_trends(cluster_id, days=30)

        if utilization['cpu_trend']:
            avg_cpu = sum(dp['Average'] for dp in utilization['cpu_trend']) / len(utilization['cpu_trend'])
        else:
            avg_cpu = 0

        cost = self.get_monthly_cost(cluster_id)

        return {
            'monthly_cost': cost['total_cost'],
            'avg_cpu_utilization': avg_cpu,
            'cost_efficiency': cost['total_cost'] / max(avg_cpu, 1)
        }

    def generate_cost_report(self, cluster_id: str) -> Dict:
        return {
            'cluster_id': cluster_id,
            'current_cost': self.get_monthly_cost(cluster_id),
            'utilization': self.get_utilization_trends(cluster_id),
            'cost_estimates': self.estimate_cost_by_node_type(),
            'cost_per_query': self.calculate_cost_per_query(cluster_id)
        }

Infrastructure Setup (Terraform)

# infrastructure/redshift_warehouse.tf
variable "environment" {
  default = "production"
}

variable "master_password" {
  sensitive = true
}

resource "aws_redshift_cluster" "analytics" {
  cluster_identifier      = "analytics-warehouse-${var.environment}"
  database_name           = "analytics"
  master_username         = "admin"
  master_password         = var.master_password
  node_type               = "ra3.xlplus"
  number_of_nodes         = 3
  cluster_type            = "multi-node"
  encrypted               = true
  kms_key_id              = aws_kms_key.redshift.arn
  vpc_security_group_ids  = [aws_security_group.redshift.id]
  cluster_subnet_group_name = aws_redshift_subnet_group.analytics.name
  iam_roles               = [aws_iam_role.redshift_role.arn]
  publicly_accessible     = false
  skip_final_snapshot     = var.environment != "production"
  final_snapshot_identifier = var.environment == "production" ? "analytics-final-${formatdate("YYYY-MM-DD-hhmm", timestamp())}" : null
  automated_snapshot_retention_period = 7
  manual_snapshot_retention_period   = 35
  preferred_maintenance_window       = "sun:03:00-sun:04:00"
  availability_zone                  = "${var.region}a"

  logging_properties {
    bucket_name   = aws_s3_bucket.redshift_logs.id
    s3_key_prefix = "redshift-logs/"
  }

  tags = {
    Environment = var.environment
    Project     = "analytics"
  }
}

resource "aws_redshift_parameter_group" "analytics" {
  name   = "analytics-params-${var.environment}"
  family = "redshift-8.0"

  parameter {
    name  = "enable_user_activity_logging"
    value = "1"
  }

  parameter {
    name  = "max_query_timeout"
    value = "3600"
  }

  parameter {
    name  = "query_group_timeout"
    value = "600"
  }
}

resource "aws_redshift_subnet_group" "analytics" {
  name       = "analytics-subnet-group"
  subnet_ids = var.private_subnets

  tags = {
    Environment = var.environment
  }
}

resource "aws_security_group" "redshift" {
  name_prefix = "redshift-"
  vpc_id      = var.vpc_id

  ingress {
    from_port       = 5439
    to_port         = 5439
    protocol        = "tcp"
    security_groups = [aws_security_group.data_analysts.id]
    description     = "Redshift access from analyst security group"
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = {
    Name = "redshift-security-group"
  }
}

resource "aws_kms_key" "redshift" {
  description             = "KMS key for Redshift encryption"
  deletion_window_in_days = 7
  enable_key_rotation     = true
}

resource "aws_s3_bucket" "redshift_logs" {
  bucket = "redshift-logs-${var.environment}-${var.aws_account_id}"
}

resource "aws_s3_bucket_lifecycle_configuration" "redshift_logs" {
  bucket = aws_s3_bucket.redshift_logs.id

  rule {
    id     = "archive-logs"
    status = "Enabled"
    transition {
      days          = 30
      storage_class = "STANDARD_IA"
    }
    transition {
      days          = 90
      storage_class = "GLACIER"
    }
    expiration {
      days = 365
    }
  }
}

resource "aws_iam_role" "redshift_role" {
  name = "redshift-analytics-role-${var.environment}"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "redshift.amazonaws.com" }
    }]
  })
}

resource "aws_iam_role_policy" "redshift_s3_access" {
  name = "redshift-s3-access"
  role = aws_iam_role.redshift_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = ["s3:GetObject", "s3:ListBucket"]
        Resource = [aws_s3_bucket.data_lake.arn, "${aws_s3_bucket.data_lake.arn}/*"]
      },
      {
        Effect = "Allow"
        Action = ["redshift:DescribeClusters", "redshift:GetClusterCredentials"]
        Resource = "*"
      }
    ]
  })
}

output "redshift_endpoint" {
  value = aws_redshift_cluster.analytics.endpoint
}

output "redshift_port" {
  value = aws_redshift_cluster.analytics.port
}

Testing and Validation

# tests/test_redshift_optimization.py
import pytest

class TestRedshiftOptimization:
    def test_compression_encoding_selection(self):
        column_types = {
            'customer_id': 'VARCHAR',
            'amount': 'DECIMAL',
            'status': 'VARCHAR',
            'created_at': 'TIMESTAMP'
        }
        encoding_recommendations = {}
        for col, dtype in column_types.items():
            if dtype == 'VARCHAR':
                encoding_recommendations[col] = 'lzo'
            elif dtype == 'DECIMAL':
                encoding_recommendations[col] = 'az64'
            elif dtype == 'TIMESTAMP':
                encoding_recommendations[col] = 'lzo'

        assert len(encoding_recommendations) == 4

    def test_distribution_style_recommendation(self):
        test_cases = [
            {'rows': 50000, 'distinct_values': 50000, 'expected': 'ALL'},
            {'rows': 10000000, 'distinct_values': 1000, 'expected': 'KEY'},
            {'rows': 500000, 'distinct_values': 100000, 'expected': 'EVEN'}
        ]
        for tc in test_cases:
            if tc['rows'] < 100000:
                result = 'ALL'
            elif tc['rows'] / tc['distinct_values'] > 100000:
                result = 'KEY'
            else:
                result = 'EVEN'
            assert result == tc['expected']

    def test_sort_key_type_selection(self):
        where_columns_frequency = {
            'order_date': 100,
            'status': 50,
            'customer_id': 30,
            'region': 10
        }
        sorted_cols = sorted(where_columns_frequency.items(), key=lambda x: x[1], reverse=True)
        top_cols = [c[0] for c in sorted_cols[:2]]
        assert len(top_cols) == 2
        assert top_cols[0] == 'order_date'

    def test_cost_estimation(self):
        pricing = {
            'ra3.xlplus': 1.065,
            'ra3.4xlarge': 3.26,
            'ra3.16xlarge': 13.04
        }
        hours_per_month = 730
        num_nodes = 3
        for node_type, hourly_cost in pricing.items():
            monthly = hourly_cost * hours_per_month * num_nodes
            assert monthly > 0

    def test_materialized_view_creation(self):
        mv_query = """
        CREATE MATERIALIZED VIEW mv_daily_sales_summary AS
        SELECT order_date, COUNT(*) as order_count, SUM(total_amount) as revenue
        FROM fct_orders
        GROUP BY order_date
        WITH NO DATA;
        """
        assert 'MATERIALIZED VIEW' in mv_query
        assert 'WITH NO DATA' in mv_query

    def test_wlm_configuration(self):
        queues = [
            {'name': 'reporting', 'concurrency': 5, 'memory': 40},
            {'name': 'etl', 'concurrency': 3, 'memory': 50},
            {'name': 'adhoc', 'concurrency': 10, 'memory': 30}
        ]
        total_memory = sum(q['memory'] for q in queues)
        assert total_memory == 100

    def test_vacuum_strategy(self):
        tables = [
            {'name': 'dim_customers', 'strategy': 'SORT ONLY'},
            {'name': 'fct_orders', 'strategy': 'DELETE ONLY'},
            {'name': 'fct_order_items', 'strategy': 'FULL'}
        ]
        for table in tables:
            assert table['strategy'] in ['SORT ONLY', 'DELETE ONLY', 'FULL']

    def test_analyze_statistics(self):
        tables = ['dim_customers', 'dim_products', 'fct_orders', 'fct_order_items']
        for table in tables:
            analyze_query = f"ANALYZE {table};"
            assert analyze_query.startswith('ANALYZE')

    def test_cluster_resize_recommendation(self):
        utilization = {
            'cpu_avg': 15.0,
            'cpu_max': 45.0,
            'storage_used_percent': 20.0
        }
        if utilization['cpu_avg'] < 20 and utilization['storage_used_percent'] < 30:
            recommendation = "downsize"
        elif utilization['cpu_avg'] > 70:
            recommendation = "upsize"
        else:
            recommendation = "maintain"
        assert recommendation == "downsize"

    def test_data_loading_optimization(self):
        load_strategies = {
            'bulk': 'COPY command with manifest',
            'incremental': 'INSERT with sort key awareness',
            'streaming': 'Kinesis Redshift integration'
        }
        for strategy, description in load_strategies.items():
            assert isinstance(description, str)

Cost Analysis

Monthly Cost Breakdown (Production)

ComponentSpecificationMonthly Cost
Redshift ra3.xlplus3 nodes, 24/7$2,338
S3 Storage2TB data lake$46
AWS Glue200 hours ETL$70
Data TransferCross-AZ + internet$150
KMSKey management$3
CloudWatchMonitoring$50
Total$2,657

Cost Optimization Strategies

💡

Tip: Reduce Redshift costs by 50-70%:

  1. RA3 with Managed Storage: Pay only for data stored
  2. Concurrency Scaling: Auto-scale for burst workloads
  3. Reserved Instances: 1-year commitment saves 40%
  4. Spectrum: Query S3 directly instead of loading
  5. Serverless: Pay-per-query for variable workloads

Performance vs Cost Comparison

ConfigurationMonthly CostQuery PerformanceCost per Query
dc2.large x21,095Baseline1,095 | Baseline |0.011
ra3.xlplus x32,3383xfaster2,338 | 3x faster |0.004
ra3.4xlarge x24,7596xfaster4,759 | 6x faster |0.003
ServerlessVariableBest$0.002

Interview Talking Points

ℹ️

Best Practice: Focus on these Redshift optimization concepts in interviews:

  1. Why distribution keys matter?

    • Minimize data shuffling across nodes
    • Co-locate joined tables
    • Even data distribution prevents skew
  2. Why sort keys improve performance?

    • Prune blocks during query execution
    • Compound keys for sequential access
    • Interleaved keys for multi-column queries
  3. Why compression encoding saves cost?

    • Reduces storage by 60-80%
    • Improves I/O performance
    • Less data scanned = lower cost

Common Interview Questions

Q: "How do you handle data skew in Redshift?"

skew_solutions = {
    "Even Distribution": "Use EVEN distkey for small dimension tables",
    "Key Selection": "Choose high-cardinality column as distkey",
    "Vacuum": "Reclaim space and rebalance after deletes",
    "Analyze": "Update statistics for query optimizer"
}

Q: "How do you optimize for concurrent queries?"

concurrency_optimization = {
    "WLM Queues": "Separate ETL, reporting, and ad-hoc queues",
    "Concurrency Scaling": "Auto-scale for burst workloads",
    "Materialized Views": "Pre-compute common aggregations",
    "Query Monitoring": "Terminate long-running queries"
}

Deployment Checklist

  • Set up VPC and security groups
  • Create Redshift cluster with encryption
  • Configure IAM roles for S3 access
  • Design optimal table schemas
  • Set up WLM queues for workload management
  • Load initial data with COPY command
  • Run ANALYZE and VACUUM
  • Create materialized views
  • Configure monitoring and alerting
  • Test query performance
  • Document optimization recommendations

⚠️

Warning: Always test schema changes in a dev cluster before production. Distribution key changes require table recreation and can cause significant downtime.


This project demonstrates cloud data warehousing optimization skills and is highly relevant for data engineering interviews at companies with large-scale analytics requirements.

Advertisement