Data Migration Strategies: Zero-Downtime & Schema Evolution

Difficulty: Senior Level | Companies: Netflix, Uber, Airbnb, Stripe, Databricks

1. Migration Patterns

Common Migration Scenarios
├── Platform Migration (Redshift → Snowflake)
├── Format Migration (Parquet → Delta Lake)
├── Schema Evolution (Adding/removing columns)
├── Storage Migration (On-prem → Cloud)
├── Database Migration (MySQL → PostgreSQL)
└── Pipeline Migration (Airflow v1 → v2)

Strangler Fig Pattern (Gradual Migration)

Strangler Fig Pattern
Phase 1: Dual-write (Source → Legacy and New)
Phase 2: Shadow-read (Source → Legacy ← New)
Phase 3: Cutover (Source → New only)

Gradual Migration Strategy
Phase 1: Write to both systems → Phase 2: Read from both for validation → Phase 3: Cut over to the new system.
The result is a zero-downtime migration with rollback capability.
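
The three phases can be sketched as a small router that changes behavior based on the current phase. This is a minimal illustration, not a production design: `Phase`, `InMemoryStore`, and `StranglerRouter` are hypothetical names, and the in-memory store stands in for real storage clients.

```python
from enum import Enum


class Phase(Enum):
    DUAL_WRITE = 1   # write to both, read from legacy
    SHADOW_READ = 2  # write to both, read from both and compare
    CUTOVER = 3      # read and write the new store only


class InMemoryStore:
    """Hypothetical stand-in for a real storage client."""

    def __init__(self):
        self.data = {}

    def write(self, key, value):
        self.data[key] = value

    def read(self, key):
        return self.data.get(key)


class StranglerRouter:
    def __init__(self, legacy, new, phase=Phase.DUAL_WRITE):
        self.legacy = legacy
        self.new = new
        self.phase = phase
        self.mismatches = []  # keys whose shadow read disagreed

    def write(self, key, value):
        if self.phase is not Phase.CUTOVER:
            self.legacy.write(key, value)
        self.new.write(key, value)

    def read(self, key):
        if self.phase is Phase.CUTOVER:
            return self.new.read(key)
        value = self.legacy.read(key)
        if self.phase is Phase.SHADOW_READ and self.new.read(key) != value:
            self.mismatches.append(key)
        return value  # legacy stays authoritative until cutover


legacy, new = InMemoryStore(), InMemoryStore()
router = StranglerRouter(legacy, new)
router.write("order:1", {"amount": 10})
router.phase = Phase.SHADOW_READ
shadow_value = router.read("order:1")
router.phase = Phase.CUTOVER
router.write("order:2", {"amount": 20})
```

Note that in this sketch the legacy store stops receiving writes at cutover, so rolling back from Phase 3 would require replaying those writes. Keeping dual-writes running for a while after cutover is one way to keep rollback cheap.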

2. Zero-Downtime Migration Implementation

Dual-Write Pattern

import logging
import threading
from typing import Any

logger = logging.getLogger(__name__)

class DualWriter:
    def __init__(self, legacy_store, new_store):
        self.legacy = legacy_store
        self.new = new_store
        self.lock = threading.Lock()
    
    def write(self, key: str, value: Any):
        """Write to both stores; legacy must succeed, new is best-effort (not atomic)"""
        with self.lock:
            # Write to legacy first (it's the source of truth)
            self.legacy.write(key, value)
            
            # Write to new store
            try:
                self.new.write(key, value)
            except Exception:
                # Log but don't fail: legacy is the source of truth. Failed keys
                # must be repaired later, or the two stores drift apart.
                logger.exception("Dual-write to new store failed for key %s", key)
    
    def read(self, key: str, source: str = "legacy"):
        """Read from specified source"""
        if source == "legacy":
            return self.legacy.read(key)
        else:
            return self.new.read(key)
    
    def verify_consistency(self, key: str) -> bool:
        """Verify both stores have same data"""
        legacy_val = self.legacy.read(key)
        new_val = self.new.read(key)
        return legacy_val == new_val

import random

class MigrationConsistencyError(Exception):
    """Raised when sampled consistency falls below the threshold"""

class MigrationVerifier:
    def __init__(self, dual_writer: DualWriter):
        self.writer = dual_writer
    
    def verify_batch(self, keys: list, sample_size: int = 1000):
        """Verify consistency for a random sample of keys"""
        # Sample randomly; taking the first N keys would bias the check
        sample = random.sample(keys, min(sample_size, len(keys)))
        results = {"match": 0, "mismatch": 0, "errors": []}
        if not sample:
            return results  # nothing to verify, and avoids dividing by zero
        
        for key in sample:
            try:
                if self.writer.verify_consistency(key):
                    results["match"] += 1
                else:
                    results["mismatch"] += 1
                    results["errors"].append(key)
            except Exception as e:
                # Read failures count against the consistency rate
                results["errors"].append(f"{key}: {e}")
        
        consistency_rate = results["match"] / len(sample)
        print(f"Consistency rate: {consistency_rate:.2%}")
        
        if consistency_rate < 0.99:
            raise MigrationConsistencyError(
                f"Consistency rate {consistency_rate:.2%} below threshold"
            )
        
        return results
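
Because the dual-writer only logs failed writes to the new store, mismatches found during verification need a repair step. The sketch below shows one possible approach, assuming the legacy store remains the source of truth; `DictStore` and `repair_mismatches` are hypothetical names that mirror the `read`/`write` interface used above.

```python
class DictStore:
    """Hypothetical in-memory store exposing read/write."""

    def __init__(self, data=None):
        self.data = dict(data or {})

    def read(self, key):
        return self.data.get(key)

    def write(self, key, value):
        self.data[key] = value


def repair_mismatches(legacy, new, keys):
    """Copy the legacy value over the new store wherever the two disagree."""
    repaired = []
    for key in keys:
        legacy_val = legacy.read(key)
        if new.read(key) != legacy_val:
            new.write(key, legacy_val)
            repaired.append(key)
    return repaired


legacy = DictStore({"a": 1, "b": 2, "c": 3})
new = DictStore({"a": 1, "b": 99})  # "b" drifted, "c" was never written
repaired = repair_mismatches(legacy, new, ["a", "b", "c"])
second_pass = repair_mismatches(legacy, new, ["a", "b", "c"])
```

A real repair job would also need to guard against racing with live writes, for example by re-reading the legacy value before overwriting or by taking the same lock as the writer.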

3. Schema Evolution Strategies

Additive Schema Changes (Safe)

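from pyspark.sql import functions as F

class CannotDropColumnError(Exception):
    """Raised when a column still has downstream readers"""

class SchemaEvolver:
    """Handle schema changes without downtime"""
    
    @staticmethod
    def add_column_with_default(df, column_name: str, default_value, column_type: str):
        """Add column with default (backward compatible)"""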
        return df.withColumn(
            column_name,
            F.lit(default_value).cast(column_type)
        )
    
    @staticmethod
    def rename_column_backwards_compatible(df, old_name: str, new_name: str):
        """Rename with backward compatibility"""
        # Add new column, keep old column
        return df.withColumn(new_name, F.col(old_name))
    
    @staticmethod
    def safely_drop_column(df, column_name: str, downstream_tables: list):
        """Drop column only after all consumers have migrated"""
        # Check if any downstream tables still reference this column
        for table in downstream_tables:
            if column_name in table.referenced_columns:
                raise CannotDropColumnError(
                    f"Column {column_name} still referenced by {table.name}"
                )
        return df.drop(column_name)
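
Whether a change is safe can be checked mechanically before it ships. The following sketch compares two schemas expressed as plain `{column: type}` dictionaries and flags anything other than a pure addition as breaking. The function name and the dictionary representation are assumptions made for illustration; they are not part of Spark.

```python
def classify_schema_change(old: dict, new: dict) -> dict:
    """Compare two {column: type} mappings and report what changed."""
    added = sorted(set(new) - set(old))
    removed = sorted(set(old) - set(new))
    retyped = sorted(c for c in set(old) & set(new) if old[c] != new[c])
    return {
        "added": added,
        "removed": removed,
        "retyped": retyped,
        # Only pure additions keep existing readers working unchanged.
        "breaking": bool(removed or retyped),
    }


v1 = {"order_id": "string", "user_id": "string", "amount": "decimal(12,2)"}
v2 = {**v1, "currency": "string"}
v3 = {"order_id": "string", "user_id": "string", "total_amount": "decimal(12,2)"}

additive = classify_schema_change(v1, v2)
breaking = classify_schema_change(v1, v3)
```

A rename shows up here as one removal plus one addition, which is why the rename helper above keeps the old column alongside the new one.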

Big Schema Changes (Breaking)

class BreakingMigration:
    """Handle breaking schema changes with a migration plan"""
    
    def migrate_orders_v1_to_v2(self, spark):
        """
        V1: {order_id, user_id, amount, timestamp}
        V2: {order_id, user_id, items[Array], total_amount, currency,
             created_at, updated_at}
        """
        
        # Step 1: Create new table with V2 schema
        spark.sql("""
            CREATE TABLE orders_v2 (
                order_id STRING,
                user_id STRING,
                items ARRAY<STRUCT<product_id STRING, quantity INT, price DECIMAL(12,2)>>,
                total_amount DECIMAL(12,2),
                currency STRING DEFAULT 'USD',
                created_at TIMESTAMP,
                updated_at TIMESTAMP
            )
        """)
        
        # Step 2: Migrate existing data
        # V1 has no product_id, so each order becomes a single line item with a
        # NULL product_id that carries the full order amount.
        spark.sql("""
            INSERT INTO orders_v2
            SELECT
                order_id,
                user_id,
                ARRAY(NAMED_STRUCT(
                    'product_id', CAST(NULL AS STRING),
                    'quantity', 1,
                    'price', CAST(amount AS DECIMAL(12,2))
                )) AS items,
                CAST(amount AS DECIMAL(12,2)) AS total_amount,
                'USD' AS currency,
                timestamp AS created_at,
                CURRENT_TIMESTAMP() AS updated_at
            FROM orders_v1
        """)
        
        # Step 3: Create view for backward compatibility
        spark.sql("""
            CREATE VIEW orders_compat AS
            SELECT
                order_id,
                user_id,
                total_amount AS amount,
                created_at AS timestamp
            FROM orders_v2
        """)
        
        # Step 4: Redirect traffic to new table
        # Step 5: Drop old table after verification period
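
Before redirecting traffic, it helps to confirm that the compatibility view returns exactly what V1 readers expect. The sketch below mirrors the two SQL mappings as plain Python functions and checks that a V1 row survives the round trip. The function names are hypothetical, `updated_at` is left out because it is set at load time, and `product_id` is set to `None` on the assumption that V1 rows carry no product information, as the V1 schema in the docstring suggests.

```python
from decimal import Decimal


def v1_to_v2(row: dict) -> dict:
    """Mirror of the INSERT ... SELECT mapping, applied to one row."""
    return {
        "order_id": row["order_id"],
        "user_id": row["user_id"],
        # V1 has no line items, so synthesize one item carrying the full amount.
        "items": [{"product_id": None, "quantity": 1, "price": row["amount"]}],
        "total_amount": row["amount"],
        "currency": "USD",
        "created_at": row["timestamp"],
    }


def v2_to_compat(row: dict) -> dict:
    """Mirror of the orders_compat view projection."""
    return {
        "order_id": row["order_id"],
        "user_id": row["user_id"],
        "amount": row["total_amount"],
        "timestamp": row["created_at"],
    }


v1_row = {
    "order_id": "o-1",
    "user_id": "u-7",
    "amount": Decimal("19.99"),
    "timestamp": "2024-01-01T00:00:00Z",
}
round_tripped = v2_to_compat(v1_to_v2(v1_row))
```

If `round_tripped` equals the original row for a representative sample, existing V1 consumers can be pointed at the view without code changes.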

4. Data Platform Migration

Redshift → Snowflake

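class PlatformMigration:
    def __init__(self, source, target):
        self.source = source
        self.target = target
        self.migration_log = []
        # table name -> last migrated timestamp (in memory for illustration;
        # persist this in a real migration)
        self.watermarks = {}
    
    def get_watermark(self, table_name: str):
        """Return the last migrated timestamp, or None before the first run"""
        return self.watermarks.get(table_name)
    
    def update_watermark(self, table_name: str, watermark):
        self.watermarks[table_name] = watermark
    
    def migrate_table(self, table_name: str, strategy: str = "full"):
        """Migrate a single table"""
        
        if strategy == "full":
            # Full dump and load
            data = self.source.export_table(table_name)
            self.target.import_table(table_name, data)
        
        elif strategy == "incremental":
            # Incremental migration with watermarks
            last_watermark = self.get_watermark(table_name)
            data = self.source.export_incremental(table_name, last_watermark)
            self.target.import_incremental(table_name, data)
            self.update_watermark(table_name, data.max_timestamp())
        
        elif strategy == "ctas":
            # CREATE TABLE AS SELECT; this only works if the target can query
            # the source directly (e.g. through an external table or share)
            self.target.execute(f"""
                CREATE TABLE {table_name} AS
                SELECT * FROM {self.source.fully_qualified(table_name)}
            """)
        
        else:
            raise ValueError(f"Unknown migration strategy: {strategy}")
        
        self.migration_log.append((table_name, strategy))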
    
    def verify_migration(self, table_name: str) -> dict:
        """Verify row counts, checksums, schema"""
        source_info = self.source.get_table_info(table_name)
        target_info = self.target.get_table_info(table_name)
        
        return {
            "row_count_match": source_info.row_count == target_info.row_count,
            "source_rows": source_info.row_count,
            "target_rows": target_info.row_count,
            "schema_match": source_info.columns == target_info.columns,
            "checksum_match": source_info.checksum == target_info.checksum,
        }
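
The incremental strategy depends on advancing the watermark correctly. Here is a minimal, self-contained sketch of that loop using plain lists and dictionaries. The `updated_at` field, the integer timestamps, and both function names are assumptions made for illustration.

```python
def export_incremental(rows, watermark):
    """Return rows strictly newer than the watermark, oldest first."""
    newer = (r for r in rows if r["updated_at"] > watermark)
    return sorted(newer, key=lambda r: r["updated_at"])


def run_incremental_sync(source_rows, target, watermark):
    """Upsert new rows into target (a dict keyed by id); return the new watermark."""
    batch = export_incremental(source_rows, watermark)
    for row in batch:
        target[row["id"]] = row
    # Advance only after the batch is applied; an empty batch leaves it unchanged.
    return batch[-1]["updated_at"] if batch else watermark


source = [
    {"id": 1, "updated_at": 100},
    {"id": 2, "updated_at": 105},
    {"id": 1, "updated_at": 110},  # later update to the same record
]
target = {}
wm_first = run_incremental_sync(source, target, 0)
wm_idle = run_incremental_sync(source, target, wm_first)
source.append({"id": 3, "updated_at": 120})
wm_next = run_incremental_sync(source, target, wm_idle)
```

Two details matter in practice: an empty batch must not reset the watermark, and a strict `>` comparison can miss rows that share the watermark's timestamp but arrive later, so some pipelines re-read a small overlap window and rely on idempotent upserts.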

5. Migration Checklist

Pre-Migration:
[ ] Inventory all tables and dependencies
[ ] Document current schema and SLAs
[ ] Identify downstream consumers
[ ] Create rollback plan
[ ] Set up monitoring and alerting

Migration:
[ ] Dual-write to both systems
[ ] Run consistency verification
[ ] Shadow-read from new system
[ ] Compare query results
[ ] Monitor performance metrics

Post-Migration:
[ ] Cut over traffic to new system
[ ] Keep legacy system as backup (30 days)
[ ] Update documentation
[ ] Decommission old infrastructure
[ ] Notify all stakeholders

Common Interview Trap: Don't forget about the rollback plan. Always have a way to revert if the migration fails; interviewers look for this before anything else.

Best Practice: Migrate the easiest tables first to build confidence, then tackle the complex ones. Use data validation checksums to prove consistency.
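
One way to compute such a validation checksum is to hash each row and combine the hashes so that row order does not matter, since two platforms rarely return rows in the same order. The sketch below is an illustration under stated assumptions: rows are plain dictionaries, the function names are hypothetical, and values are assumed to be normalized to the same Python types on both sides before hashing.

```python
import hashlib


def row_digest(row: dict) -> int:
    """Hash one row's canonical form (columns sorted by name) to an integer."""
    canonical = "|".join(f"{k}={row[k]!r}" for k in sorted(row))
    return int.from_bytes(hashlib.sha256(canonical.encode("utf-8")).digest(), "big")


def table_checksum(rows) -> tuple:
    """Return (row_count, checksum); the checksum ignores row order."""
    count, total = 0, 0
    for row in rows:
        count += 1
        # Modular addition is commutative, so order does not affect the result,
        # and unlike XOR it does not cancel out duplicated rows.
        total = (total + row_digest(row)) % (1 << 256)
    return count, total


source_rows = [{"id": 1, "amount": "10.00"}, {"id": 2, "amount": "25.50"}]
target_rows = [{"id": 2, "amount": "25.50"}, {"id": 1, "amount": "10.00"}]
corrupted_rows = [{"id": 2, "amount": "25.50"}, {"id": 1, "amount": "10.01"}]

source_sum = table_checksum(source_rows)
target_sum = table_checksum(target_rows)
corrupted_sum = table_checksum(corrupted_rows)
```

Here `source_sum` and `target_sum` agree despite the different row order, while `corrupted_sum` differs. On large tables the same idea is usually pushed down into SQL per partition so that a mismatch can be narrowed to a small range of rows.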

Follow-Up Questions

  1. How would you migrate a 5PB data lake from on-prem to S3?
  2. Design a migration plan for a real-time streaming pipeline.
  3. How do you handle foreign key constraints during migration?
  4. Design a dual-write system for a globally distributed database.
  5. How would you migrate a data warehouse while it's serving 1000+ dashboards?