
Building a Data Lakehouse with Apache Iceberg on AWS S3



Data Lakehouse with Apache Iceberg

ACID Transactions + Time Travel + Schema Evolution on S3


Project Difficulty: Advanced | Duration: 2-3 weeks | Cloud: AWS

Build a production data lakehouse that combines data lake flexibility with warehouse reliability using Apache Iceberg, enabling time travel queries and schema evolution.

Project Overview

Problem Statement

Traditional data lakes suffer from data quality issues, lack of ACID transactions, and poor query performance. Organizations need a solution that combines the cost-effectiveness of data lakes with the reliability of data warehouses.

Objectives

  1. Implement ACID transactions on S3 data lake
  2. Enable time travel and data versioning
  3. Support schema evolution without data rewriting
  4. Keep interactive queries fast on large datasets through partition pruning and file compaction
  5. Maintain data governance and audit trails
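
To make objectives 1 and 2 concrete, here is a toy, in-memory sketch of the snapshot idea behind atomic commits and time travel. It is not Iceberg's implementation: `ToyTable`, `Snapshot`, and the integer commit times are hypothetical names chosen for illustration only.

```python
from bisect import bisect_right
from dataclasses import dataclass, field
from typing import List, Sequence, Tuple


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    committed_at: int             # commit time (hypothetical integer clock)
    data_files: Tuple[str, ...]   # immutable set of files visible to readers


@dataclass
class ToyTable:
    snapshots: List[Snapshot] = field(default_factory=list)

    def commit(self, committed_at: int, added: Sequence[str],
               removed: Sequence[str] = ()) -> Snapshot:
        """Create a new snapshot; earlier snapshots are never modified."""
        current = self.snapshots[-1].data_files if self.snapshots else ()
        files = tuple(f for f in current if f not in removed) + tuple(added)
        snap = Snapshot(len(self.snapshots) + 1, committed_at, files)
        self.snapshots.append(snap)  # stands in for an atomic pointer swap
        return snap

    def as_of(self, timestamp: int) -> Snapshot:
        """Return the latest snapshot committed at or before `timestamp`."""
        times = [s.committed_at for s in self.snapshots]
        idx = bisect_right(times, timestamp)
        if idx == 0:
            raise ValueError("no snapshot at or before the requested time")
        return self.snapshots[idx - 1]


table = ToyTable()
table.commit(100, ["a.parquet"])
table.commit(200, ["b.parquet"])
table.commit(300, ["c.parquet"], removed=["a.parquet"])

print(table.as_of(250).data_files)  # ('a.parquet', 'b.parquet')
print(table.as_of(300).data_files)  # ('b.parquet', 'c.parquet')
```

Because every commit produces a new immutable file list, a reader pinned to an older snapshot keeps seeing a consistent view even while newer commits land.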

Tech Stack

Component | Technology | Purpose
Table Format | Apache Iceberg | ACID, time travel, schema evolution
Storage | AWS S3 | Scalable, durable object storage
Compute | AWS Glue + EMR | Serverless and cluster processing
Query Engine | Athena + Spark SQL | Interactive and batch queries
Catalog | AWS Glue Data Catalog | Metadata management
Orchestration | AWS Step Functions | Pipeline orchestration

Architecture Diagram

DATA SOURCES: JDBC (RDBMS), APIs (REST), Files (CSV/JSON), Streams (Kafka)
INGESTION LAYER: AWS Glue ETL (Serverless), EMR Spark (Cluster), Kinesis Data Firehose
APACHE ICEBERG TABLES:
  Raw Zone (Bronze): Append-only · Schema-on-read
  Curated Zone (Silver): Deduplicated · Validated · Enriched
  Aggregated Zone (Gold): Business metrics · Pre-aggregated
QUERY LAYER: Amazon Athena (Interactive), Spark SQL (Batch), Presto / Trino (Federated)
CONSUMERS: BI Tools (QuickSight), ML Pipelines (SageMaker), Data Apps (APIs), Analytics (Notebooks)
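
The three zones map onto the table names and S3 prefixes used in the schema code in this project. A minimal sketch of that naming convention follows, assuming the `ecommerce-lakehouse` bucket from the table definitions. `LAYERS` and `resolve_table` are hypothetical helpers for illustration, not part of the project code.

```python
from typing import Tuple

BUCKET = "ecommerce-lakehouse"  # bucket name taken from the table LOCATION clauses

# Zone -> (table name prefix, S3 path prefix), following the project's naming
LAYERS = {
    "bronze": ("raw", "raw"),
    "silver": ("curated", "curated"),
    "gold": ("agg", "aggregated"),
}


def resolve_table(layer: str, entity: str) -> Tuple[str, str]:
    """Return (table_name, s3_location) for an entity in a given zone."""
    if layer not in LAYERS:
        raise ValueError(f"unknown layer: {layer!r}")
    table_prefix, path_prefix = LAYERS[layer]
    return f"{table_prefix}_{entity}", f"s3://{BUCKET}/{path_prefix}/{entity}/"


print(resolve_table("bronze", "customers"))
print(resolve_table("gold", "daily_metrics"))
```

Keeping the convention in one place makes it harder for a job to write Silver data under a Bronze prefix by accident.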

Data Source Setup and Schema

Iceberg Table Schemas

# schemas/iceberg_tables.py
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType,
    FloatType, TimestampType, BooleanType, ArrayType,
    MapType, DecimalType
)

class IcebergSchemaManager:
    def __init__(self, spark: SparkSession, catalog_name: str = "glue_catalog"):  # matches the Spark catalog name configured for the cluster
        self.spark = spark
        self.catalog = catalog_name
        self.database = "ecommerce_lakehouse"
    
    def create_database(self):
        """Create the database if it doesn't exist."""
        self.spark.sql(f"""
            CREATE DATABASE IF NOT EXISTS {self.catalog}.{self.database}
            COMMENT 'E-commerce data lakehouse database'
            LOCATION 's3://ecommerce-lakehouse/data/{self.database}/'
        """)
    
    def create_raw_customers(self):
        """Create raw customers table (Bronze layer)."""
        # The DDL below is the single source of truth for this table's schema.
        
        self.spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {self.catalog}.{self.database}.raw_customers (
                customer_id STRING,
                email STRING,
                first_name STRING,
                last_name STRING,
                phone STRING,
                address MAP<STRING, STRING>,
                created_at STRING,
                updated_at STRING,
                source_system STRING,
                _ingestion_timestamp TIMESTAMP
            )
            USING iceberg
            PARTITIONED BY (days(_ingestion_timestamp))
            LOCATION 's3://ecommerce-lakehouse/raw/customers/'
            TBLPROPERTIES (
                'write.format.default' = 'parquet',
                'write.parquet.compression-codec' = 'snappy',
                'write.target-file-size-bytes' = '134217728',
                'history.expire.max-snapshot-age-ms' = '604800000',
                'write.metadata.delete-after-commit.enabled' = 'true',
                'write.metadata.previous-versions-max' = '100'
            )
        """)
    
    def create_raw_products(self):
        """Create raw products table (Bronze layer)."""
        self.spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {self.catalog}.{self.database}.raw_products (
                product_id STRING,
                sku STRING,
                name STRING,
                description STRING,
                category STRING,
                subcategory STRING,
                price DECIMAL(10, 2),
                cost DECIMAL(10, 2),
                stock_quantity INT,
                attributes MAP<STRING, STRING>,
                created_at TIMESTAMP,
                updated_at TIMESTAMP,
                _ingestion_timestamp TIMESTAMP
            )
            USING iceberg
            PARTITIONED BY (category, days(_ingestion_timestamp))
            LOCATION 's3://ecommerce-lakehouse/raw/products/'
            TBLPROPERTIES (
                'write.format.default' = 'parquet',
                'write.parquet.compression-codec' = 'zstd'
            )
        """)
    
    def create_raw_orders(self):
        """Create raw orders table (Bronze layer)."""
        self.spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {self.catalog}.{self.database}.raw_orders (
                order_id STRING,
                customer_id STRING,
                order_status STRING,
                order_date TIMESTAMP,
                required_date TIMESTAMP,
                shipped_date TIMESTAMP,
                items ARRAY<STRUCT<
                    product_id: STRING,
                    quantity: INT,
                    unit_price: DECIMAL(10, 2),
                    discount: DECIMAL(5, 2)
                >>,
                shipping_address MAP<STRING, STRING>,
                total_amount DECIMAL(12, 2),
                _ingestion_timestamp TIMESTAMP
            )
            USING iceberg
            PARTITIONED BY (days(order_date), order_status)
            LOCATION 's3://ecommerce-lakehouse/raw/orders/'
            TBLPROPERTIES (
                'write.format.default' = 'parquet',
                'write.parquet.compression-codec' = 'snappy',
                'write.distribution-mode' = 'hash'
            )
        """)
    
    def create_curated_customers(self):
        """Create curated customers table (Silver layer)."""
        self.spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {self.catalog}.{self.database}.curated_customers (
                customer_id STRING,
                email STRING,
                full_name STRING,
                phone STRING,
                city STRING,
                state STRING,
                country STRING,
                customer_segment STRING,
                lifetime_value DECIMAL(12, 2),
                first_order_date TIMESTAMP,
                last_order_date TIMESTAMP,
                total_orders INT,
                is_active BOOLEAN,
                _valid_from TIMESTAMP,
                _valid_to TIMESTAMP,
                _is_current BOOLEAN
            )
            USING iceberg
            PARTITIONED BY (customer_segment, country)
            LOCATION 's3://ecommerce-lakehouse/curated/customers/'
            TBLPROPERTIES (
                'write.format.default' = 'parquet',
                'write.parquet.compression-codec' = 'zstd',
                'write.upsert.enabled' = 'true'
            )
        """)
    
    def create_curated_orders(self):
        """Create curated orders table (Silver layer)."""
        self.spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {self.catalog}.{self.database}.curated_orders (
                order_id STRING,
                customer_id STRING,
                customer_name STRING,
                customer_email STRING,
                order_status STRING,
                order_date TIMESTAMP,
                shipped_date TIMESTAMP,
                delivery_date TIMESTAMP,
                item_count INT,
                total_amount DECIMAL(12, 2),
                discount_amount DECIMAL(12, 2),
                tax_amount DECIMAL(12, 2),
                shipping_cost DECIMAL(10, 2),
                net_amount DECIMAL(12, 2),
                shipping_city STRING,
                shipping_state STRING,
                shipping_country STRING,
                payment_method STRING,
                _processing_timestamp TIMESTAMP
            )
            USING iceberg
            PARTITIONED BY (days(order_date), shipping_country)
            LOCATION 's3://ecommerce-lakehouse/curated/orders/'
        """)
    
    def create_aggregated_daily_metrics(self):
        """Create daily metrics aggregation (Gold layer)."""
        self.spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {self.catalog}.{self.database}.agg_daily_metrics (
                metric_date DATE,
                metric_type STRING,
                dimension_key STRING,
                dimension_value STRING,
                metric_value DECIMAL(15, 2),
                metric_count BIGINT,
                _computed_at TIMESTAMP
            )
            USING iceberg
            PARTITIONED BY (metric_type, metric_date)
            LOCATION 's3://ecommerce-lakehouse/aggregated/daily_metrics/'
            TBLPROPERTIES (
                'write.format.default' = 'parquet',
                'write.parquet.compression-codec' = 'zstd',
                'write.distribution-mode' = 'range'
            )
        """)
    
    def create_all_tables(self):
        """Create all tables in the lakehouse."""
        self.create_database()
        self.create_raw_customers()
        self.create_raw_products()
        self.create_raw_orders()
        self.create_curated_customers()
        self.create_curated_orders()
        self.create_aggregated_daily_metrics()
        
        print("All Iceberg tables created successfully!")
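
Several tables above partition on `days(<timestamp column>)`. As a rough illustration of what that transform computes, the sketch below derives the whole number of days since 1970-01-01 for a timestamp. `days_partition` is a hypothetical helper, and treating naive timestamps as UTC is an assumption made here for simplicity.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def days_partition(ts: datetime) -> int:
    """Whole days since 1970-01-01, mirroring a days() partition transform."""
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)  # assumption: naive values are UTC
    return (ts - EPOCH).days


morning = datetime(2024, 1, 1, 0, 5)
evening = datetime(2024, 1, 1, 23, 59)
next_day = datetime(2024, 1, 2, 0, 0)

# Rows ingested on the same calendar day share one partition value
print(days_partition(morning), days_partition(evening), days_partition(next_day))
```

A filter such as `_ingestion_timestamp >= '2024-01-01'` can then be answered by skipping every partition whose day value falls outside the requested range.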

Sample Data Generation

# data/generate_sample_data.py
import random
import uuid
from datetime import datetime, timedelta
from decimal import Decimal
import json

class SampleDataGenerator:
    def __init__(self):
        self.first_names = [
            "James", "Mary", "Robert", "Patricia", "John", "Jennifer",
            "Michael", "Linda", "David", "Elizabeth", "William", "Barbara",
            "Richard", "Susan", "Joseph", "Jessica", "Thomas", "Sarah"
        ]
        self.last_names = [
            "Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia",
            "Miller", "Davis", "Rodriguez", "Martinez", "Hernandez", "Lopez"
        ]
        self.categories = [
            "Electronics", "Clothing", "Home & Garden", "Sports",
            "Books", "Toys", "Health", "Automotive"
        ]
        self.subcategories = {
            "Electronics": ["Phones", "Laptops", "Tablets", "Audio"],
            "Clothing": ["Men", "Women", "Kids", "Accessories"],
            "Home & Garden": ["Furniture", "Decor", "Kitchen", "Outdoor"],
            "Sports": ["Fitness", "Outdoor", "Team Sports", "Water Sports"]
        }
        self.cities = [
            ("New York", "NY"), ("Los Angeles", "CA"), ("Chicago", "IL"),
            ("Houston", "TX"), ("Phoenix", "AZ"), ("Philadelphia", "PA")
        ]
    
    def generate_customers(self, count: int = 1000) -> list:
        """Generate sample customer records."""
        customers = []
        
        for i in range(count):
            first_name = random.choice(self.first_names)
            last_name = random.choice(self.last_names)
            city, state = random.choice(self.cities)
            
            customer = {
                "customer_id": f"CUST-{uuid.uuid4().hex[:8].upper()}",
                "email": f"{first_name.lower()}.{last_name.lower()}{i}@email.com",
                "first_name": first_name,
                "last_name": last_name,
                "phone": f"+1-{random.randint(200, 999)}-{random.randint(100, 999)}-{random.randint(1000, 9999)}",
                "address": {
                    "street": f"{random.randint(100, 9999)} Main Street",
                    "city": city,
                    "state": state,
                    "zip": f"{random.randint(10000, 99999)}",
                    "country": "USA"
                },
                "created_at": (datetime.now() - timedelta(days=random.randint(1, 365))).isoformat(),
                "updated_at": datetime.now().isoformat(),
                "source_system": random.choice(["web", "mobile", "store"]),
                "_ingestion_timestamp": datetime.now()
            }
            customers.append(customer)
        
        return customers
    
    def generate_products(self, count: int = 500) -> list:
        """Generate sample product records."""
        products = []
        
        for i in range(count):
            category = random.choice(self.categories)
            subcategory = random.choice(self.subcategories.get(category, ["General"]))
            
            base_price = random.uniform(9.99, 999.99)
            cost = base_price * random.uniform(0.3, 0.7)
            
            product = {
                "product_id": f"PROD-{uuid.uuid4().hex[:8].upper()}",
                "sku": f"SKU-{category[:3].upper()}-{i:04d}",
                "name": f"{category} {subcategory} Product {i}",
                "description": f"High quality {category.lower()} product",
                "category": category,
                "subcategory": subcategory,
                "price": Decimal(str(round(base_price, 2))),
                "cost": Decimal(str(round(cost, 2))),
                "stock_quantity": random.randint(0, 1000),
                "attributes": {
                    "weight": f"{random.uniform(0.1, 50.0):.2f} kg",
                    "color": random.choice(["Black", "White", "Red", "Blue"]),
                    "warranty": f"{random.choice([12, 24, 36])} months"
                },
                "created_at": datetime.now() - timedelta(days=random.randint(1, 365)),
                "updated_at": datetime.now(),
                "_ingestion_timestamp": datetime.now()
            }
            products.append(product)
        
        return products
    
    def generate_orders(self, customers: list, products: list, count: int = 5000) -> list:
        """Generate sample order records."""
        orders = []
        statuses = ["pending", "processing", "shipped", "delivered", "cancelled"]
        
        for i in range(count):
            customer = random.choice(customers)
            num_items = random.randint(1, 5)
            items = []
            total = Decimal('0.00')
            
            for _ in range(num_items):
                product = random.choice(products)
                quantity = random.randint(1, 3)
                unit_price = product["price"]
                discount = Decimal(str(round(random.uniform(0, 0.2), 2)))
                
                item_total = unit_price * quantity * (1 - discount)
                total += item_total
                
                items.append({
                    "product_id": product["product_id"],
                    "quantity": quantity,
                    "unit_price": unit_price,
                    "discount": discount
                })
            
            order_date = datetime.now() - timedelta(
                days=random.randint(0, 90),
                hours=random.randint(0, 23),
                minutes=random.randint(0, 59)
            )
            
            status = random.choice(statuses)
            shipped_date = None
            if status in ["shipped", "delivered"]:
                shipped_date = order_date + timedelta(days=random.randint(1, 5))
            
            order = {
                "order_id": f"ORD-{uuid.uuid4().hex[:8].upper()}",
                "customer_id": customer["customer_id"],
                "order_status": status,
                "order_date": order_date,
                "required_date": order_date + timedelta(days=random.randint(3, 10)),
                "shipped_date": shipped_date,
                "items": items,
                "shipping_address": customer["address"],
                # Round to two decimal places to fit the DECIMAL(12, 2) column
                "total_amount": total.quantize(Decimal("0.01")),
                "_ingestion_timestamp": datetime.now()
            }
            orders.append(order)
        
        return orders
    
    def generate_all_data(self):
        """Generate complete sample dataset."""
        print("Generating customers...")
        customers = self.generate_customers(1000)
        
        print("Generating products...")
        products = self.generate_products(500)
        
        print("Generating orders...")
        orders = self.generate_orders(customers, products, 5000)
        
        return {
            "customers": customers,
            "products": products,
            "orders": orders
        }
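
The generated records contain `Decimal` and `datetime` values, which the standard `json` module cannot serialize by default. One possible way to stage them as newline-delimited JSON before ingestion is sketched below. `to_json_lines` and `_encode` are hypothetical helpers, and writing decimals as strings is a choice made here to avoid float rounding.

```python
import json
from datetime import date, datetime
from decimal import Decimal
from typing import Iterable


def _encode(value):
    """Fallback encoder for types the json module does not handle."""
    if isinstance(value, Decimal):
        return str(value)            # keep exact decimal digits
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    raise TypeError(f"Cannot serialize {type(value).__name__}")


def to_json_lines(records: Iterable[dict]) -> str:
    """Serialize records as newline-delimited JSON, one record per line."""
    return "\n".join(json.dumps(record, default=_encode) for record in records)


sample = [
    {"product_id": "PROD-0001", "price": Decimal("19.99"),
     "created_at": datetime(2024, 1, 1, 12, 0)},
    {"product_id": "PROD-0002", "price": Decimal("5.00"),
     "created_at": datetime(2024, 1, 2, 8, 30)},
]
print(to_json_lines(sample))
```

The same function works on the lists returned by `generate_customers`, `generate_products`, and `generate_orders`, since nested dictionaries and lists pass through `json.dumps` unchanged.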

Step-by-Step Implementation Guide

Step 1: EMR Cluster Setup with Iceberg

# cluster/emr_setup.py
import boto3
import json
import time

class EMRIcebergSetup:
    def __init__(self, region: str = 'us-east-1'):
        self.region = region
        self.emr_client = boto3.client('emr', region_name=region)
        self.s3_client = boto3.client('s3', region_name=region)
    
    def create_iceberg_cluster(self, config: dict) -> str:
        """Create EMR cluster with Iceberg support."""
        cluster_name = config.get('cluster_name', 'iceberg-cluster')
        
        response = self.emr_client.run_job_flow(
            Name=cluster_name,
            ReleaseLabel='emr-6.15.0',
            Applications=[
                {'Name': 'Spark'},
                {'Name': 'JupyterEnterpriseGateway'},
                {'Name': 'Livy'}
            ],
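            Configurations=[
                {
                    # Enables the Iceberg libraries bundled with the EMR release
                    'Classification': 'iceberg-defaults',
                    'Properties': {'iceberg.enabled': 'true'}
                },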
                {
                    'Classification': 'spark-defaults',
                    'Properties': {
                        'spark.sql.catalog.glue_catalog': 'org.apache.iceberg.spark.SparkCatalog',
                        'spark.sql.catalog.glue_catalog.catalog-impl': 'org.apache.iceberg.aws.glue.GlueCatalog',
                        'spark.sql.catalog.glue_catalog.io-impl': 'org.apache.iceberg.aws.s3.S3FileIO',
                        'spark.sql.catalog.glue_catalog.warehouse': config['warehouse_path'],
                        'spark.sql.extensions': 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions',
                        'spark.sql.catalog.glue_catalog.spark.sql.events.enabled': 'true',
                        'spark.hadoop.hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.HiveMetastoreClientFactory',
                        'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
                        'spark.sql.sources.partitionOverwriteMode': 'dynamic'
                    }
                },
                {
                    'Classification': 'spark-env',
                    'Properties': {},
                    'Configurations': [
                        {
                            'Classification': 'export',
                            'Properties': {
                                'SPARK_DRIVER_MEMORY': '4g',
                                'SPARK_EXECUTOR_MEMORY': '8g'
                            }
                        }
                    ]
                },
                {
                    'Classification': 'hive-site',
                    'Properties': {
                        'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.HiveMetastoreClientFactory'
                    }
                }
            ],
            Instances={
                'MasterInstanceType': 'm5.xlarge',
                'SlaveInstanceType': 'r5.2xlarge',
                'InstanceCount': config.get('instance_count', 3),
                'Ec2KeyName': config.get('key_pair_name'),
                'Ec2SubnetId': config.get('subnet_id'),
                'KeepJobFlowAliveWhenNoSteps': True,
                'TerminationProtected': True
            },
            JobFlowRole='EMR_EC2_DefaultRole',
            ServiceRole='EMR_DefaultRole',
            VisibleToAllUsers=True,
            LogUri=f"s3://{config['log_bucket']}/emr-logs/",
            Tags=[
                {'Key': 'Environment', 'Value': config.get('environment', 'production')},
                {'Key': 'Project', 'Value': 'iceberg-lakehouse'}
            ]
        )
        
        cluster_id = response['JobFlowId']
        print(f"Created EMR cluster: {cluster_id}")
        
        # Wait for cluster to be ready
        self._wait_for_cluster(cluster_id)
        
        return cluster_id
    
    def _wait_for_cluster(self, cluster_id: str, timeout: int = 1800):
        """Wait for EMR cluster to reach WAITING state."""
        start_time = time.time()
        
        while True:
            response = self.emr_client.describe_cluster(ClusterId=cluster_id)
            state = response['Cluster']['Status']['State']
            
            if state == 'WAITING':
                print(f"Cluster {cluster_id} is ready")
                return
            elif state in ['TERMINATED', 'TERMINATED_WITH_ERRORS']:
                raise Exception(f"Cluster failed to start: {state}")
            
            elapsed = time.time() - start_time
            if elapsed > timeout:
                raise TimeoutError(f"Cluster not ready within {timeout} seconds")
            
            print(f"Cluster state: {state} ({elapsed:.0f}s elapsed)")
            time.sleep(30)
    
    def add_iceberg_step(self, cluster_id: str, script_path: str, args: list = None):
        """Add a Spark step to run Iceberg operations."""
        step = {
            'Name': 'Iceberg Processing Step',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': [
                    'spark-submit',
                    '--deploy-mode', 'cluster',
                    '--conf', 'spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog',
                    '--conf', 'spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog',
                    '--conf', 'spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO',
                    script_path
                ] + (args or [])
            }
        }
        
        response = self.emr_client.add_job_flow_steps(
            JobFlowId=cluster_id,
            Steps=[step]
        )
        
        return response['StepIds'][0]
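
`create_iceberg_cluster` reads `warehouse_path` and `log_bucket` from the config with direct indexing, so a missing key only surfaces as a `KeyError` partway through the request. A small pre-flight check along the following lines may help. `validate_cluster_config` is a hypothetical helper, and the default values simply repeat the `.get()` defaults used above.

```python
REQUIRED_KEYS = ("warehouse_path", "log_bucket")


def validate_cluster_config(config: dict) -> dict:
    """Check required keys and fill in the defaults the setup code assumes."""
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    if missing:
        raise ValueError(f"Missing required config keys: {', '.join(missing)}")

    if not config["warehouse_path"].startswith("s3://"):
        raise ValueError("warehouse_path must be an s3:// URI")
    if config["log_bucket"].startswith("s3://"):
        # The setup code builds the URI itself: s3://{log_bucket}/emr-logs/
        raise ValueError("log_bucket must be a bare bucket name")

    validated = dict(config)  # leave the caller's dict untouched
    validated.setdefault("cluster_name", "iceberg-cluster")
    validated.setdefault("instance_count", 3)
    validated.setdefault("environment", "production")
    return validated


example = validate_cluster_config({
    "warehouse_path": "s3://ecommerce-lakehouse/data/",
    "log_bucket": "ecommerce-lakehouse-logs",  # hypothetical bucket name
})
print(example["cluster_name"], example["instance_count"])
```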

Step 2: Iceberg Operations Library

# iceberg/operations.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # namespaced import avoids shadowing built-in sum, max, and min
from datetime import datetime, timedelta
from typing import Optional, List, Dict
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class IcebergOperations:
    def __init__(self, spark: SparkSession, catalog: str = "glue_catalog", 
                 database: str = "ecommerce_lakehouse"):
        self.spark = spark
        self.catalog = catalog
        self.database = database
        self.table_prefix = f"{catalog}.{database}"
    
    def list_tables(self) -> List[str]:
        """List all tables in the database."""
        tables_df = self.spark.sql(f"SHOW TABLES IN {self.table_prefix}")
        return [row['tableName'] for row in tables_df.collect()]
    
    def get_table_info(self, table_name: str) -> Dict:
        """Get detailed information about a table."""
        full_table = f"{self.table_prefix}.{table_name}"
        
        # Get table properties
        props_df = self.spark.sql(f"DESCRIBE TABLE EXTENDED {full_table}")
        properties = {row['col_name']: row['data_type'] for row in props_df.collect() 
                     if row['col_name'] and not row['col_name'].startswith('#')}
        
        # Get snapshot history
        history_df = self.spark.sql(f"""
            SELECT * FROM {full_table}.snapshots
            ORDER BY committed_at DESC
            LIMIT 10
        """)
        
        return {
            'properties': properties,
            'recent_snapshots': history_df.collect()
        }
    
    def merge_upsert(self, source_df, target_table: str, 
                     join_keys: List[str], stage: str = "curated"):
        """Perform merge/upsert operation on Iceberg table."""
        full_table = f"{self.table_prefix}.{stage}_{target_table}"
        
        # Create temp view for source data
        source_df.createOrReplaceTempView("source_updates")
        
        merge_sql = f"""
            MERGE INTO {full_table} AS target
            USING source_updates AS source
            ON {' AND '.join([f'target.{k} = source.{k}' for k in join_keys])}
            
            WHEN MATCHED THEN UPDATE SET *
            
            WHEN NOT MATCHED THEN INSERT *
        """
        
        logger.info(f"Executing merge into {full_table}")
        self.spark.sql(merge_sql)
        logger.info(f"Merge completed for {full_table}")
    
    def expire_old_snapshots(self, table_name: str, days_to_keep: int = 30):
        """Expire old snapshots to manage storage."""
        full_table = f"{self.table_prefix}.{table_name}"
        
        cutoff_date = datetime.now() - timedelta(days=days_to_keep)
        cutoff = cutoff_date.strftime('%Y-%m-%d %H:%M:%S')
        
        # Count expiry candidates once; retain_last may keep some of them
        candidate_count = self.spark.sql(f"""
            SELECT snapshot_id
            FROM {full_table}.snapshots
            WHERE committed_at < TIMESTAMP '{cutoff}'
        """).count()
        
        if candidate_count > 0:
            self.spark.sql(f"""
                CALL {self.catalog}.system.expire_snapshots(
                    table => '{self.database}.{table_name}',
                    older_than => TIMESTAMP '{cutoff}',
                    retain_last => 10
                )
            """)
            
            logger.info(
                f"Found {candidate_count} snapshots older than {cutoff} in "
                f"{table_name}; expired all but the retained ones"
            )
    
    def rewrite_data_files(self, table_name: str, 
                          target_file_size_mb: int = 128):
        """Rewrite small files to optimize read performance."""
        full_table = f"{self.table_prefix}.{table_name}"
        
        logger.info(f"Starting data file rewrite for {full_table}")
        
        self.spark.sql(f"""
            CALL {self.catalog}.system.rewrite_data_files(
                table => '{self.database}.{table_name}',
                options => map(
                    'target-file-size-bytes', '{target_file_size_mb * 1024 * 1024}',
                    'min-input-files', '5'
                )
            )
        """)
        
        logger.info(f"Completed data file rewrite for {full_table}")
    
    def rewrite_manifests(self, table_name: str):
        """Rewrite manifests to optimize metadata."""
        full_table = f"{self.table_prefix}.{table_name}"
        
        self.spark.sql(f"""
            CALL {self.catalog}.system.rewrite_manifests(
                table => '{self.database}.{table_name}'
            )
        """)
        
        logger.info(f"Manifests rewritten for {full_table}")
    
    def time_travel_query(self, table_name: str, 
                         timestamp: Optional[str] = None,
                         snapshot_id: Optional[int] = None):
        """Query table at a specific point in time."""
        full_table = f"{self.table_prefix}.{table_name}"
        
        if timestamp:
            query = f"SELECT * FROM {full_table} TIMESTAMP AS OF '{timestamp}'"
        elif snapshot_id is not None:
            query = f"SELECT * FROM {full_table} VERSION AS OF {snapshot_id}"
        else:
            raise ValueError("Must specify timestamp or snapshot_id")
        
        return self.spark.sql(query)
    
    def compare_snapshots(self, table_name: str, 
                         snapshot_id_1: int, snapshot_id_2: int,
                         key_column: str):
        """Compare two snapshots, matching rows on key_column (e.g. 'order_id')."""
        full_table = f"{self.table_prefix}.{table_name}"
        
        # Get data at each snapshot
        df1 = self.spark.sql(f"""
            SELECT * FROM {full_table} VERSION AS OF {snapshot_id_1}
        """)
        
        df2 = self.spark.sql(f"""
            SELECT * FROM {full_table} VERSION AS OF {snapshot_id_2}
        """)
        
        # Rows whose key appears only in the second snapshot
        added = df2.join(df1, on=key_column, how="left_anti")
        
        # Rows whose key appears only in the first snapshot
        removed = df1.join(df2, on=key_column, how="left_anti")
        
        return {
            'added': added,
            'removed': removed,
            'snapshot_1_count': df1.count(),
            'snapshot_2_count': df2.count()
        }
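
The MERGE statement is easier to review and unit-test when it is built by a pure function that needs no Spark session. The sketch below shows one way to do that, using the `UPDATE SET *` and `INSERT *` shorthand forms. `build_merge_sql` is a hypothetical helper, not part of `IcebergOperations`.

```python
from typing import List


def build_merge_sql(target_table: str, source_view: str,
                    join_keys: List[str]) -> str:
    """Build a MERGE INTO statement that upserts source rows into the target."""
    if not join_keys:
        raise ValueError("join_keys must not be empty")

    on_clause = " AND ".join(f"target.{key} = source.{key}" for key in join_keys)
    return (
        f"MERGE INTO {target_table} AS target\n"
        f"USING {source_view} AS source\n"
        f"ON {on_clause}\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED THEN INSERT *"
    )


sql = build_merge_sql(
    "glue_catalog.ecommerce_lakehouse.curated_orders",
    "source_updates",
    ["order_id"],
)
print(sql)
```

Both shorthand forms require the source view to expose the same column names as the target table, so the source DataFrame should be projected to the target schema before the merge runs.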

Step 3: Glue ETL Jobs

# etl/glue_etl_job.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.functions import col  # col() is used unqualified in the transforms below
from pyspark.sql.types import *

# Initialize Glue context
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'database', 'table', 's3_output_path'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

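# Configure Iceberg catalog (the job also needs the --datalake-formats iceberg
# job parameter so that Glue loads the Iceberg libraries)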
spark.conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")

def read_from_glue_catalog(database: str, table_name: str):
    """Read data from Glue Data Catalog."""
    dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
        database=database,
        table_name=table_name,
        transformation_ctx="source_data"
    )
    return dynamic_frame

def transform_customers(dynamic_frame):
    """Transform and clean customer data."""
    df = dynamic_frame.toDF()
    
    # Clean and standardize
    transformed_df = df \
        .withColumn("email", F.lower(F.trim(col("email")))) \
        .withColumn("full_name", F.concat_ws(" ", col("first_name"), col("last_name"))) \
        .withColumn("phone", F.regexp_replace(col("phone"), r"[^0-9+]", "")) \
        .withColumn("city", col("address")["city"]) \
        .withColumn("state", col("address")["state"]) \
        .withColumn("country", col("address")["country"]) \
        .withColumn("_processing_timestamp", F.current_timestamp()) \
        .drop("address")
    
    return transformed_df

def transform_orders(dynamic_frame):
    """Transform and enrich order data."""
    df = dynamic_frame.toDF()
    
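    # Aggregate over the items array to compute per-order metrics.
    # The accumulator starts as DOUBLE so its type matches the lambda result;
    # cast the results back to DECIMAL before writing to the curated table.
    transformed_df = df \
        .withColumn("item_count", F.size(col("items"))) \
        .withColumn("subtotal", F.expr("""
            aggregate(items, CAST(0 AS DOUBLE), (acc, item) -> 
                acc + CAST(item.quantity * item.unit_price * (1 - item.discount) AS DOUBLE))
        """)) \
        .withColumn("discount_amount", F.expr("""
            aggregate(items, CAST(0 AS DOUBLE), (acc, item) -> 
                acc + CAST(item.quantity * item.unit_price * item.discount AS DOUBLE))
        """)) \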
        .withColumn("tax_amount", col("subtotal") * 0.08) \
        .withColumn("shipping_cost", 
            F.when(col("subtotal") > 100, 0).otherwise(9.99)
        ) \
        .withColumn("net_amount", 
            col("subtotal") + col("tax_amount") + col("shipping_cost")
        ) \
        .withColumn("shipping_city", col("shipping_address")["city"]) \
        .withColumn("shipping_state", col("shipping_address")["state"]) \
        .withColumn("shipping_country", col("shipping_address")["country"]) \
        .withColumn("payment_method", F.lit("credit_card")) \
        .withColumn("_processing_timestamp", F.current_timestamp())
    
    return transformed_df

def calculate_daily_metrics(spark):
    """Calculate daily business metrics from curated tables."""
    # Read curated tables
    customers_df = spark.read.format("iceberg").load(
        "s3://ecommerce-lakehouse/curated/customers/"
    )
    
    orders_df = spark.read.format("iceberg").load(
        "s3://ecommerce-lakehouse/curated/orders/"
    )
    
    # Calculate daily revenue metrics
    daily_revenue = orders_df \
        .withColumn("metric_date", F.to_date(col("order_date"))) \
        .groupBy("metric_date") \
        .agg(
            F.sum("net_amount").alias("metric_value"),
            F.count("order_id").alias("metric_count")
        ) \
        .withColumn("metric_type", F.lit("daily_revenue")) \
        .withColumn("dimension_key", F.lit("all")) \
        .withColumn("dimension_value", F.lit("all")) \
        .withColumn("_computed_at", F.current_timestamp())
    
    # Calculate customer segment metrics
    segment_metrics = orders_df \
        .join(customers_df, "customer_id") \
        .withColumn("metric_date", F.to_date(col("order_date"))) \
        .groupBy("metric_date", col("customer_segment")) \
        .agg(
            F.sum("net_amount").alias("metric_value"),
            F.count("order_id").alias("metric_count")
        ) \
        .withColumn("metric_type", F.lit("segment_revenue")) \
        .withColumn("dimension_key", F.lit("customer_segment")) \
        .withColumn("dimension_value", col("customer_segment")) \
        .withColumn("_computed_at", F.current_timestamp())
    
    # Calculate category metrics
    category_metrics = orders_df \
        .withColumn("metric_date", F.to_date(col("order_date"))) \
        .groupBy("metric_date", "category") \
        .agg(
            F.sum("net_amount").alias("metric_value"),
            F.count("order_id").alias("metric_count")
        ) \
        .withColumn("metric_type", F.lit("category_revenue")) \
        .withColumn("dimension_key", F.lit("category")) \
        .withColumn("dimension_value", col("category")) \
        .withColumn("_computed_at", F.current_timestamp())
    
    # Union all metrics
    all_metrics = daily_revenue \
        .unionByName(segment_metrics, allowMissingColumns=True) \
        .unionByName(category_metrics, allowMissingColumns=True)
    
    return all_metrics

# Main ETL execution
try:
    logger.info(f"Starting ETL job: {args['JOB_NAME']}")
    
    if args['table'] == 'customers':
        # Process customers
        source_df = read_from_glue_catalog(args['database'], 'raw_customers')
        transformed_df = transform_customers(source_df)
        
        # Write to Iceberg
        transformed_df.write \
            .format("iceberg") \
            .mode("append") \
            .save("s3://ecommerce-lakehouse/curated/customers/")
    
    elif args['table'] == 'orders':
        # Process orders
        source_df = read_from_glue_catalog(args['database'], 'raw_orders')
        transformed_df = transform_orders(source_df)
        
        # Write to Iceberg
        transformed_df.write \
            .format("iceberg") \
            .mode("append") \
            .save("s3://ecommerce-lakehouse/curated/orders/")
    
    elif args['table'] == 'metrics':
        # Calculate and write metrics
        metrics_df = calculate_daily_metrics(spark)
        
        metrics_df.write \
            .format("iceberg") \
            .mode("append") \
            .save("s3://ecommerce-lakehouse/aggregated/daily_metrics/")
    
    logger.info(f"ETL job completed: {args['JOB_NAME']}")
    
except Exception as e:
    logger.error(f"ETL job failed: {str(e)}")
    raise

# Commit job
job.commit()
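
The order arithmetic in transform_orders is easier to check outside Spark. The following is a minimal plain-Python sketch of the same formulas. The function name order_totals and the sample items are hypothetical; the 8% tax rate, the free-shipping threshold of 100, and the 9.99 shipping fee are taken from the job above.

```python
# Plain-Python mirror of the derived order columns in transform_orders.
# order_totals and sample_items are illustrative names, not part of the Glue job.

def order_totals(items):
    """Return the derived order columns for a list of item dicts."""
    subtotal = sum(
        i["quantity"] * i["unit_price"] * (1 - i["discount"]) for i in items
    )
    discount_amount = sum(
        i["quantity"] * i["unit_price"] * i["discount"] for i in items
    )
    tax_amount = subtotal * 0.08                      # flat 8% tax, as in the job
    shipping_cost = 0.0 if subtotal > 100 else 9.99   # free shipping above 100
    return {
        "item_count": len(items),
        "subtotal": subtotal,
        "discount_amount": discount_amount,
        "tax_amount": tax_amount,
        "shipping_cost": shipping_cost,
        "net_amount": subtotal + tax_amount + shipping_cost,
    }


sample_items = [
    {"quantity": 2, "unit_price": 50.0, "discount": 0.1},
    {"quantity": 1, "unit_price": 20.0, "discount": 0.0},
]
totals = order_totals(sample_items)
print({name: round(value, 2) for name, value in totals.items()})
```

Note that subtotal is already net of discounts, so discount_amount is informational and is not subtracted again in net_amount.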

Infrastructure Setup (Terraform)

# infrastructure/s3_lakehouse.tf
resource "aws_s3_bucket" "lakehouse" {
  bucket = "${var.project_name}-lakehouse-${var.environment}"
  
  tags = {
    Name        = "${var.project_name}-lakehouse"
    Environment = var.environment
    ManagedBy   = "terraform"
  }
}

resource "aws_s3_bucket_versioning" "lakehouse" {
  bucket = aws_s3_bucket.lakehouse.id
  
  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "lakehouse" {
  bucket = aws_s3_bucket.lakehouse.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm     = "aws:kms"
      kms_master_key_id = aws_kms_key.lakehouse.arn
    }
    bucket_key_enabled = true
  }
}

resource "aws_s3_bucket_lifecycle_configuration" "lakehouse" {
  bucket = aws_s3_bucket.lakehouse.id

  # Raw zone lifecycle
  rule {
    id     = "raw-data-lifecycle"
    status = "Enabled"
    
    filter {
      prefix = "raw/"
    }
    
    transition {
      days          = 90
      storage_class = "STANDARD_IA"
    }
    
    transition {
      days          = 365
      storage_class = "GLACIER"
    }
    
    noncurrent_version_expiration {
      noncurrent_days = 90
    }
  }
  
  # Curated zone lifecycle
  rule {
    id     = "curated-data-lifecycle"
    status = "Enabled"
    
    filter {
      prefix = "curated/"
    }
    
    transition {
      days          = 180
      storage_class = "STANDARD_IA"
    }
    
    noncurrent_version_expiration {
      noncurrent_days = 30
    }
  }
  
  # Temporary files cleanup
  rule {
    id     = "cleanup-temp"
    status = "Enabled"
    
    filter {
      prefix = "_temporary/"
    }
    
    expiration {
      days = 1
    }
  }
}

resource "aws_s3_bucket_policy" "lakehouse" {
  bucket = aws_s3_bucket.lakehouse.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid       = "EnforceSSLOnly"
        Effect    = "Deny"
        Principal = "*"
        Action    = "s3:*"
        Resource = [
          aws_s3_bucket.lakehouse.arn,
          "${aws_s3_bucket.lakehouse.arn}/*"
        ]
        Condition = {
          Bool = {
            "aws:SecureTransport" = "false"
          }
        }
      },
      {
        Sid       = "DenyOutdatedTLS"
        Effect    = "Deny"
        Principal = "*"
        Action    = "s3:*"
        Resource = [
          aws_s3_bucket.lakehouse.arn,
          "${aws_s3_bucket.lakehouse.arn}/*"
        ]
        Condition = {
          NumericLessThan = {
            "s3:TlsVersion" = "1.2"
          }
        }
      }
    ]
  })
}

resource "aws_kms_key" "lakehouse" {
  description             = "KMS key for lakehouse encryption"
  deletion_window_in_days = 7
  enable_key_rotation     = true
  
  tags = {
    Name = "${var.project_name}-lakehouse-key"
  }
}

resource "aws_kms_alias" "lakehouse" {
  name          = "alias/${var.project_name}-lakehouse"
  target_key_id = aws_kms_key.lakehouse.key_id
}

# Glue Catalog Database
resource "aws_glue_catalog_database" "lakehouse" {
  name = "${var.project_name}_lakehouse_${var.environment}"
  
  create_table_default_permissions {
    permissions = ["ALL"]
    
    principals {
      data_lake_principal_identifier = "IAM_ALLOWED_PRINCIPALS"
    }
  }
  
  tags = {
    Environment = var.environment
  }
}

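# Glue Crawler for the lakehouse zones.
# Note: s3_target registers plain S3 tables; crawling existing Iceberg tables
# requires an iceberg_target block instead.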
resource "aws_glue_crawler" "lakehouse_crawler" {
  database_name = aws_glue_catalog_database.lakehouse.name
  name          = "${var.project_name}-iceberg-crawler"
  role          = aws_iam_role.glue_crawler.arn
  
  s3_target {
    path = "s3://${aws_s3_bucket.lakehouse.bucket}/raw/"
  }
  
  s3_target {
    path = "s3://${aws_s3_bucket.lakehouse.bucket}/curated/"
  }
  
  s3_target {
    path = "s3://${aws_s3_bucket.lakehouse.bucket}/aggregated/"
  }
  
  schema_change_policy {
    delete_behavior = "LOG"
    update_behavior = "UPDATE_IN_DATABASE"
  }
  
  configuration = jsonencode({
    Version = 1.0
    Grouping = {
      TableGroupingPolicy = "CombineCompatibleSchemas"
    }
  })
}

# IAM Role for Glue Crawler
resource "aws_iam_role" "glue_crawler" {
  name = "${var.project_name}-glue-crawler-role"
  
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "glue.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy" "glue_crawler" {
  name = "${var.project_name}-glue-crawler-policy"
  role = aws_iam_role.glue_crawler.id
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:PutObject",
          "s3:DeleteObject",
          "s3:ListBucket"
        ]
        Resource = [
          aws_s3_bucket.lakehouse.arn,
          "${aws_s3_bucket.lakehouse.arn}/*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "glue:*"
        ]
        Resource = "*"
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      },
      {
        Effect = "Allow"
        Action = [
          "kms:Decrypt",
          "kms:GenerateDataKey"
        ]
        Resource = aws_kms_key.lakehouse.arn
      }
    ]
  })
}

# Athena Workgroup for Iceberg queries
resource "aws_athena_workgroup" "iceberg_queries" {
  name = "${var.project_name}-iceberg-workgroup"
  
  configuration {
    enforce_workgroup_configuration    = true
    publish_cloudwatch_metrics_enabled = true
    
    result_configuration {
      output_location = "s3://${aws_s3_bucket.lakehouse.bucket}/athena-results/"
      
      encryption_configuration {
        encryption_option = "SSE_KMS"
        kms_key_arn       = aws_kms_key.lakehouse.arn
      }
    }
    
    engine_version {
      selected_engine_version = "Athena engine version 3"
    }
  }
  
  tags = {
    Environment = var.environment
  }
}

# Outputs
output "lakehouse_bucket" {
  description = "S3 bucket for lakehouse data"
  value       = aws_s3_bucket.lakehouse.bucket
}

output "glue_database" {
  description = "Glue catalog database name"
  value       = aws_glue_catalog_database.lakehouse.name
}

output "athena_workgroup" {
  description = "Athena workgroup name"
  value       = aws_athena_workgroup.iceberg_queries.name
}

Testing and Validation

# tests/test_iceberg_operations.py
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from datetime import datetime, timedelta

class TestIcebergOperations:
    @pytest.fixture(scope="session")
    def spark(self):
        """Create a local Spark session with Iceberg support."""
        return SparkSession.builder \
            .appName("TestIcebergLakehouse") \
            .master("local[*]") \
            .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
            .config("spark.sql.catalog.local.type", "hadoop") \
            .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse") \
            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
            .getOrCreate()
    
    @pytest.fixture
    def sample_data(self):
        """Generate sample test data."""
        schema = StructType([
            StructField("id", IntegerType(), False),
            StructField("name", StringType(), False),
            StructField("category", StringType(), False),
            StructField("value", IntegerType(), True)
        ])
        
        data = [
            (1, "Product A", "Electronics", 100),
            (2, "Product B", "Clothing", 50),
            (3, "Product C", "Electronics", 200),
            (4, "Product D", "Home", 75),
            (5, "Product E", "Clothing", 30)
        ]
        
        return schema, data
    
    def test_create_table(self, spark, sample_data):
        """Test Iceberg table creation."""
        schema, data = sample_data
        df = spark.createDataFrame(data, schema)

        # Create (or replace) the table in the local Hadoop catalog.
        # write.format("iceberg").save(path) only works for tables that already exist.
        df.writeTo("local.default.test_table").createOrReplace()

        # Verify table was created
        read_df = spark.table("local.default.test_table")
        assert read_df.count() == 5
        assert set(read_df.columns) == {"id", "name", "category", "value"}
    
    def test_upsert_operations(self, spark, sample_data):
        """Test merge/upsert operations."""
        schema, data = sample_data
        df = spark.createDataFrame(data, schema)

        # Create initial table
        df.writeTo("local.default.test_upsert").createOrReplace()

        # Create update data
        update_data = [
            (1, "Product A Updated", "Electronics", 150),  # Update
            (6, "Product F", "Sports", 90)  # Insert
        ]
        update_df = spark.createDataFrame(update_data, schema)
        update_df.createOrReplaceTempView("upsert_updates")

        # Perform upsert with MERGE INTO; a plain append would add a
        # second row for id = 1 instead of updating it
        spark.sql("""
            MERGE INTO local.default.test_upsert AS t
            USING upsert_updates AS s
            ON t.id = s.id
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
        """)

        # Verify results
        result_df = spark.table("local.default.test_upsert")
        assert result_df.count() == 6  # 5 original + 1 new

        # Verify update was applied
        product_a = result_df.filter("id = 1").collect()[0]
        assert product_a["name"] == "Product A Updated"
        assert product_a["value"] == 150
    
    def test_time_travel(self, spark, sample_data):
        """Test time travel query capabilities."""
        schema, data = sample_data
        df = spark.createDataFrame(data, schema)
        table = "local.default.test_time_travel"

        # Create initial version
        df.writeTo(table).createOrReplace()

        # Get initial count and remember the snapshot that holds it
        v1_count = spark.table(table).count()
        v1_snapshot_id = spark.sql(
            f"SELECT snapshot_id FROM {table}.snapshots ORDER BY committed_at DESC LIMIT 1"
        ).collect()[0]["snapshot_id"]

        # Add more data
        new_data = [(6, "Product F", "Sports", 90)]
        new_df = spark.createDataFrame(new_data, schema)
        new_df.writeTo(table).append()

        # Verify new count
        v2_count = spark.table(table).count()
        assert v2_count == v1_count + 1

        # Query historical version using snapshot ID
        v1_df = spark.read.format("iceberg") \
            .option("snapshot-id", v1_snapshot_id) \
            .load(table)
        assert v1_df.count() == v1_count

        # Both commits should appear in the snapshots metadata table
        snapshots = spark.sql(f"SELECT * FROM {table}.snapshots").collect()
        assert len(snapshots) >= 2
    
    def test_schema_evolution(self, spark):
        """Test schema evolution capabilities."""
        table = "local.default.test_schema_evolution"

        # Create initial table
        schema1 = StructType([
            StructField("id", IntegerType(), False),
            StructField("name", StringType(), False)
        ])
        data1 = [(1, "Product A"), (2, "Product B")]
        df1 = spark.createDataFrame(data1, schema1)
        df1.writeTo(table).createOrReplace()

        # Add new column (metadata-only change; existing data files are not rewritten)
        spark.sql(f"ALTER TABLE {table} ADD COLUMN category STRING")

        # Append a row that populates the new column
        schema2 = StructType([
            StructField("id", IntegerType(), False),
            StructField("name", StringType(), False),
            StructField("category", StringType(), True)
        ])
        data2 = [(3, "Product C", "Electronics")]
        df2 = spark.createDataFrame(data2, schema2)
        df2.writeTo(table).append()

        # Read with schema evolution
        result_df = spark.table(table)
        assert "category" in result_df.columns
        assert result_df.count() == 3

        # Verify original rows have NULL for new column
        original = result_df.filter("id IN (1, 2)").collect()
        for row in original:
            assert row["category"] is None
    
    def test_partition_evolution(self, spark):
        """Test partition evolution capabilities."""
        from pyspark.sql.functions import col

        table = "local.default.test_partition_evolution"
        schema = StructType([
            StructField("id", IntegerType(), False),
            StructField("date", StringType(), False),
            StructField("value", IntegerType(), True)
        ])

        data = [
            (1, "2024-01-01", 100),
            (2, "2024-01-02", 150)
        ]

        df = spark.createDataFrame(data, schema)

        # Create table partitioned by date
        df.writeTo(table).partitionedBy(col("date")).createOrReplace()

        # Verify initial write
        assert spark.table(table).count() == 2

        # Evolve the partition spec; existing files keep the old layout
        spark.sql(f"ALTER TABLE {table} ADD PARTITION FIELD bucket(4, id)")

        # Write new data under the evolved spec
        new_data = [(3, "2024-01-03", 200)]
        new_df = spark.createDataFrame(new_data, schema)
        new_df.writeTo(table).append()

        # Verify rows written under both specs are readable together
        final_df = spark.table(table)
        assert final_df.count() == 3
    
    def test_snapshot_management(self, spark, sample_data):
        """Test snapshot expiration and management."""
        schema, data = sample_data
        df = spark.createDataFrame(data, schema)

        # Create multiple snapshots: one create plus four appends
        df.writeTo("local.default.test_snapshots").createOrReplace()
        for _ in range(4):
            df.writeTo("local.default.test_snapshots").append()
        
        # Count snapshots
        history_df = spark.sql("SELECT * FROM local.default.test_snapshots.snapshots")
        initial_snapshots = history_df.count()
        assert initial_snapshots >= 5
        
        # Expire old snapshots
        spark.sql("""
            CALL local.system.expire_snapshots(
                table => 'default.test_snapshots',
                older_than => TIMESTAMP '2099-01-01T00:00:00',
                retain_last => 3
            )
        """)
        
        # Verify snapshots were expired
        history_df_after = spark.sql("SELECT * FROM local.default.test_snapshots.snapshots")
        assert history_df_after.count() <= 3
    
    def test_data_compaction(self, spark, sample_data):
        """Test data file compaction for performance."""
        schema, data = sample_data
        table = "local.default.test_compaction"

        # Create table with many small files (one single-row commit each);
        # cycle through the five sample rows so that no write is empty
        spark.createDataFrame(data[:1], schema).writeTo(table).createOrReplace()
        for i in range(1, 10):
            small_df = spark.createDataFrame([data[i % len(data)]], schema)
            small_df.writeTo(table).append()
        
        # Count files before compaction
        files_before = spark.sql("""
            SELECT COUNT(*) as file_count 
            FROM local.default.test_compaction.files
        """).collect()[0]["file_count"]
        
        # Perform compaction
        spark.sql("""
            CALL local.system.rewrite_data_files(
                table => 'default.test_compaction',
                options => map('target-file-size-bytes', '134217728')
            )
        """)
        
        # Count files after compaction
        files_after = spark.sql("""
            SELECT COUNT(*) as file_count 
            FROM local.default.test_compaction.files
        """).collect()[0]["file_count"]
        
        assert files_after < files_before
    
    def test_query_performance(self, spark):
        """Test query performance optimization."""
        import time
        from pyspark.sql.functions import col

        table = "local.default.test_performance"

        # Create a larger dataset
        data = [(i, f"Product {i}", "Category", i * 10) 
                for i in range(10000)]
        schema = StructType([
            StructField("id", IntegerType(), False),
            StructField("name", StringType(), False),
            StructField("category", StringType(), False),
            StructField("value", IntegerType(), True)
        ])

        df = spark.createDataFrame(data, schema)

        # Write the table partitioned by category
        df.writeTo(table) \
            .partitionedBy(col("category")) \
            .createOrReplace()

        # Test read performance (wall-clock thresholds depend on the test machine)
        start_time = time.time()
        result = spark.table(table)
        result.cache()
        count = result.count()
        read_time = time.time() - start_time
        
        assert count == 10000
        assert read_time < 5.0  # Should read quickly
        
        # Test aggregation performance
        start_time = time.time()
        agg_result = result.groupBy("category").agg({"value": "sum"})
        agg_result.show()
        agg_time = time.time() - start_time
        
        assert agg_time < 2.0  # Aggregation should be fast

Cost Analysis

Monthly Cost Breakdown (Production)

| Component | Specification | Monthly Cost |
|---|---|---|
| S3 Storage | 10TB raw + 5TB curated | $230 |
| EMR Cluster | 3x r5.2xlarge (intermittent) | $1,200 |
| Glue ETL | 100 hours processing | $35 |
| Athena Queries | 1TB scanned monthly | $500 |
| Glue Catalog | Metadata storage | $1 |
| KMS Keys | Encryption | $3 |
| Data Transfer | Cross-region | $50 |
| Total | | $2,019 |
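
As a quick check on the breakdown, the sketch below sums the line items and shows each component's share of the bill. The figures are copied from the table above; the variable names are illustrative.

```python
# Monthly cost figures in USD, copied from the cost breakdown table.
monthly_costs = {
    "S3 Storage": 230,
    "EMR Cluster": 1200,
    "Glue ETL": 35,
    "Athena Queries": 500,
    "Glue Catalog": 1,
    "KMS Keys": 3,
    "Data Transfer": 50,
}

total = sum(monthly_costs.values())
shares = {name: cost / total for name, cost in monthly_costs.items()}
largest = max(monthly_costs, key=monthly_costs.get)

# Print components from most to least expensive with their share of the total.
for name, cost in sorted(monthly_costs.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{name:<15} ${cost:>6,} {shares[name]:6.1%}")
print(f"{'Total':<15} ${total:>6,}")
```

Compute (EMR) and query scanning (Athena) dominate the bill, so those are the components where the optimizations in the next section matter most.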

Cost Optimization Strategies

💡

Tip: Iceberg's features enable significant cost savings:

  1. Partition Pruning: 70% reduction in data scanned
  2. File Compaction: 50% improvement in query performance
  3. Snapshot Expiration: 30% reduction in storage costs
  4. Columnar Format: 90% compression vs raw data
  5. Predicate Pushdown: 60% less data processing
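
To make the partition pruning item concrete, here is a toy sketch in plain Python rather than Iceberg's actual planner. It assumes a hypothetical table with one 128 MB data file per daily partition over ten days; the layout is chosen so that a three-day filter reproduces the 70% figure quoted above. Real savings depend on the table layout and the query.

```python
from datetime import date

# Hypothetical file listing: one data file per day partition (sizes in MB).
data_files = [
    {
        "path": f"orders/order_date=2024-01-{day:02d}/part-0.parquet",
        "partition": date(2024, 1, day),
        "size_mb": 128,
    }
    for day in range(1, 11)
]


def prune(files, start, end):
    """Keep only files whose partition value can satisfy start <= order_date <= end."""
    return [f for f in files if start <= f["partition"] <= end]


# Query filter: the last three days of the ten-day range.
selected = prune(data_files, date(2024, 1, 8), date(2024, 1, 10))

scanned_mb = sum(f["size_mb"] for f in selected)
total_mb = sum(f["size_mb"] for f in data_files)
reduction = 1 - scanned_mb / total_mb
print(f"scanned {scanned_mb} of {total_mb} MB ({reduction:.0%} less data)")
```

Because the filter is evaluated against partition metadata, the skipped files are never opened, which is what lowers the bytes scanned and therefore the Athena charge.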

Performance Comparison

| Operation | Traditional Lake | Iceberg Lakehouse | Improvement |
|---|---|---|---|
| Daily ETL | 2 hours | 45 minutes | 2.7x faster |
| Ad-hoc Query | 15 minutes | 2 minutes | 7.5x faster |
| Data Refresh | 4 hours | 30 minutes | 8x faster |
| Storage Cost | $500/TB | $150/TB | 70% cheaper |
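
The improvement column follows directly from the before and after figures. A short sketch of the arithmetic, with durations converted to minutes and values taken from the comparison above:

```python
# (before, after) durations in minutes, from the performance comparison.
timings = {
    "Daily ETL": (120, 45),
    "Ad-hoc Query": (15, 2),
    "Data Refresh": (240, 30),
}

# Speedup is the old duration divided by the new one.
speedups = {op: before / after for op, (before, after) in timings.items()}

# Storage saving is the relative drop in cost per TB (500 -> 150).
storage_saving = (500 - 150) / 500

for op, factor in speedups.items():
    print(f"{op}: {factor:.1f}x faster")
print(f"Storage Cost: {storage_saving:.0%} cheaper")
```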

Interview Talking Points

Architecture Decisions

ℹ️

Best Practice: Highlight these Iceberg advantages in interviews:

  1. ACID Transactions: Optimistic concurrency with atomic commits, snapshot isolation
  2. Time Travel: Historical queries, data auditing, rollback
  3. Schema Evolution: Add columns without rewriting data
  4. Partition Evolution: Change partitioning without data migration
  5. Hidden Partitioning: Users don't need to know partition columns
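
When explaining time travel in an interview, a small mental model helps. The sketch below is a toy in-memory snapshot log, not Iceberg's implementation: the class name SnapshotLog and its methods are hypothetical, and real Iceberg snapshots reference shared data files through manifests instead of copying rows.

```python
from bisect import bisect_right


class SnapshotLog:
    """Toy append-only snapshot log: every commit stores an immutable view of the rows."""

    def __init__(self):
        self._timestamps = []  # commit times, strictly increasing
        self._snapshots = []   # rows visible as of each commit

    def commit(self, ts, rows):
        """Record a new snapshot; earlier snapshots are never modified."""
        if self._timestamps and ts <= self._timestamps[-1]:
            raise ValueError("commit timestamps must increase")
        self._timestamps.append(ts)
        self._snapshots.append(tuple(rows))

    def current(self):
        """Rows visible in the latest snapshot."""
        return self._snapshots[-1]

    def as_of(self, ts):
        """Rows visible at time ts: the latest snapshot committed at or before ts."""
        idx = bisect_right(self._timestamps, ts) - 1
        if idx < 0:
            raise LookupError("no snapshot at or before that time")
        return self._snapshots[idx]


log = SnapshotLog()
log.commit(100, ["order-1", "order-2"])
log.commit(200, ["order-1", "order-2", "order-3"])

print(log.as_of(150))   # view before the second commit
print(log.current())    # latest view
```

Readers always see one complete snapshot, which is the property that gives both consistent reads during writes and the ability to query or roll back to an earlier state.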

Common Interview Questions

Q: "How does Iceberg differ from Delta Lake or Hudi?"

comparison = {
    "Iceberg": {
        "Partitioning": "Hidden partitioning, partition evolution",
        "Catalog": "Pluggable (Hive, Glue, REST)",
        "Time Travel": "Snapshot-based, precise",
        "Schema Evolution": "Full support, no data rewrite"
    },
    "Delta Lake": {
        "Partitioning": "Explicit partitioning",
        "Catalog": "Hive Metastore, Unity Catalog",
        "Time Travel": "Version-based",
        "Schema Evolution": "Limited support"
    },
    "Hudi": {
        "Partitioning": "Explicit partitioning",
        "Catalog": "Hive Metastore, AWS Glue (via sync tools)",
        "Time Travel": "Incremental queries",
        "Schema Evolution": "Partial support"
    }
}

Q: "How do you handle small file problems?"

solutions = {
    "Write Distribution": "Use distribution modes (hash, range)",
    "File Compaction": "Scheduled rewrite_data_files calls",
    "Target File Size": "Configure write.target-file-size-bytes",
    "Sorted Compaction": "Run rewrite_data_files with the sort strategy"
}
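
To illustrate the compaction idea, the following sketch groups small files into bins close to a target size using a greedy first-fit-decreasing heuristic. It is a simplified illustration, not the planner behind rewrite_data_files; the function name plan_compaction, the file sizes, and the 128 MB target are assumptions.

```python
def plan_compaction(file_sizes_mb, target_mb=128):
    """Group file sizes into bins of at most target_mb (first-fit decreasing)."""
    bins = []
    for size in sorted(file_sizes_mb, reverse=True):
        # Place the file in the first group that still has room for it.
        for group in bins:
            if sum(group) + size <= target_mb:
                group.append(size)
                break
        else:
            # No existing group can take it, so start a new one.
            bins.append([size])
    return bins


small_files = [8, 16, 4, 64, 32, 100, 12, 20]  # sizes in MB
groups = plan_compaction(small_files)

print(f"{len(small_files)} input files -> {len(groups)} output files")
for group in groups:
    print(group, "=", sum(group), "MB")
```

Each group would be rewritten as one larger file, so the reader opens fewer files and carries less per-file metadata overhead.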

Q: "How do you ensure data quality?"

quality_strategies = {
    "Schema Validation": "Use Iceberg schema enforcement",
    "Data Contracts": "Implement with column constraints",
    "Testing": "Great Expectations integration",
    "Monitoring": "Track file counts, row counts, data drift"
}
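
A minimal sketch of the kind of batch check a pipeline could run before committing data is shown below. It is plain Python for illustration only; the function name check_batch, the column names, and the sample rows are hypothetical, and a production setup would more likely use a framework such as Great Expectations.

```python
from collections import Counter


def check_batch(rows, required, key, min_rows=1):
    """Return a list of human-readable violations for a batch of dict rows."""
    problems = []

    # Row-count check: an unexpectedly small batch often signals an upstream failure.
    if len(rows) < min_rows:
        problems.append(f"row count {len(rows)} below minimum {min_rows}")

    # Null check on required columns.
    for idx, row in enumerate(rows):
        for column in required:
            if row.get(column) is None:
                problems.append(f"row {idx}: required column '{column}' is null")

    # Uniqueness check on the business key.
    counts = Counter(row.get(key) for row in rows)
    for value, count in counts.items():
        if value is not None and count > 1:
            problems.append(f"duplicate key {key}={value} ({count} rows)")

    return problems


batch = [
    {"order_id": 1, "customer_id": "c1", "net_amount": 10.0},
    {"order_id": 2, "customer_id": None, "net_amount": 5.0},
    {"order_id": 2, "customer_id": "c3", "net_amount": 7.5},
]
violations = check_batch(batch, required=["order_id", "customer_id"], key="order_id")
for violation in violations:
    print(violation)
```

Running checks like these before the write means a bad batch never becomes a committed snapshot, which is simpler than rolling back afterwards.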

Deployment Checklist

  • Configure S3 bucket with lifecycle policies
  • Set up Glue Catalog and crawlers
  • Deploy EMR cluster with Iceberg configuration
  • Create Iceberg tables with proper partitioning
  • Implement ETL jobs with Glue or EMR
  • Set up Athena workgroup for queries
  • Configure monitoring and alerting
  • Test time travel and snapshot management
  • Implement data quality checks
  • Document operational procedures

⚠️

Warning: Always test schema changes in non-production environments first. Iceberg supports schema evolution, but proper planning prevents data quality issues.
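
One way to support that planning is a pre-deployment check that flags risky schema changes for review. The sketch below is deliberately conservative and hypothetical: the function name unsafe_changes and the dictionary schema format are assumptions, and it flags every type change and dropped column even though Iceberg itself permits some of them (for example, widening int to long).

```python
def unsafe_changes(old_schema, new_schema):
    """Compare {column: (type, required)} mappings and list changes that need review."""
    issues = []

    # Changes to columns that already exist.
    for name, (old_type, _) in old_schema.items():
        if name not in new_schema:
            issues.append(f"column '{name}' was dropped")
        elif new_schema[name][0] != old_type:
            issues.append(
                f"column '{name}' changed type {old_type} -> {new_schema[name][0]}"
            )

    # New columns must be optional, because existing rows have no value for them.
    for name, (_, required) in new_schema.items():
        if name not in old_schema and required:
            issues.append(f"new column '{name}' is required but old rows have no value")

    return issues


current = {"id": ("int", True), "name": ("string", True)}
proposed_ok = {**current, "category": ("string", False)}
proposed_bad = {"id": ("string", True), "category": ("string", True)}

print(unsafe_changes(current, proposed_ok))
for issue in unsafe_changes(current, proposed_bad):
    print(issue)
```

Adding an optional column passes cleanly, while the second proposal is flagged three times, which gives reviewers a concrete list to discuss before the change reaches production.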


This project demonstrates modern data lakehouse architecture skills and is highly relevant for data engineering interviews at companies like Netflix, Uber, and LinkedIn.
