Schema Evolution in Parquet, Avro, and Iceberg

Databricks & Netflix Interview

Managing schema changes in data lakes and warehouses

Interview Question

"Your upstream team adds a new column 'discount_amount' to the orders table. The existing Parquet files don't have this column. Explain: (1) How do different file formats handle this? (2) What happens to existing queries? (3) How do you safely migrate without downtime? (4) What are the tradeoffs?"

Difficulty: Medium-Hard | Frequently asked at Databricks, Netflix, Uber, Airbnb


Theoretical Foundation

What is Schema Evolution?

Schema evolution is the ability to change the schema of a dataset over time while maintaining compatibility with existing data and queries.

Common schema changes:

  • Adding a column
  • Removing a column
  • Renaming a column
  • Changing data type
  • Reordering columns
  • Changing nullability
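
To make these change types concrete, the sketch below diffs two schema versions and labels each difference. It is a minimal illustration in plain Python: the dict-based schema representation (column name mapped to a type name and a nullable flag) and the function name diff_schemas are assumptions made for this example, not part of any file format's API. A rename cannot be detected from names alone, so it shows up as a remove plus an add.

```python
def diff_schemas(old, new):
    """Return a sorted list of (change, column) pairs between two schemas.

    Each schema is a dict: column name -> (type name, nullable flag).
    """
    changes = []
    for name in old:
        if name not in new:
            changes.append(("removed", name))
    for name, (new_type, new_nullable) in new.items():
        if name not in old:
            changes.append(("added", name))
            continue
        old_type, old_nullable = old[name]
        if old_type != new_type:
            changes.append(("type_changed", name))
        if old_nullable != new_nullable:
            changes.append(("nullability_changed", name))
    # Reordering: the surviving columns appear in a different relative order
    common_old = [n for n in old if n in new]
    common_new = [n for n in new if n in old]
    if common_old != common_new:
        changes.append(("reordered", "*"))
    return sorted(changes)


orders_v1 = {"order_id": ("int", False), "name": ("string", True)}
orders_v2 = {
    "order_id": ("long", False),
    "customer_name": ("string", True),
    "discount_amount": ("double", True),
}
for change in diff_schemas(orders_v1, orders_v2):
    print(change)
```

Running a diff like this in CI before a deploy is one way to force every schema change to be named and reviewed rather than discovered by a failing query.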

Schema Evolution Strategies

1. Schema-on-Read (Data Lakes)

Architecture Diagram
Raw Data:                    Schema Applied at Read:
┌─────────────────────┐      ┌─────────────────────┐
│ {"id": 1,           │      │ id: 1               │
│  "name": "John",    │ ───▶ │ name: "John"        │
│  "email": "..."}    │      │ email: "..."        │
└─────────────────────┘      │ age: NULL (new col) │
                             └─────────────────────┘

Pros: Flexible, no write-time overhead
Cons: No validation, potential read-time errors
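
A minimal sketch of schema-on-read, assuming newline-delimited JSON as the raw format. The column list READ_SCHEMA and the helper read_with_schema are hypothetical names for this example. The raw data is stored untouched, and the reader's column list alone decides what comes back.

```python
import json

# The reader's view of the data; "age" was added after the first record was written
READ_SCHEMA = ["id", "name", "email", "age"]


def read_with_schema(raw_lines, columns):
    """Project raw JSON records onto the reader's columns, filling gaps with None."""
    rows = []
    for line in raw_lines:
        record = json.loads(line)
        rows.append({col: record.get(col) for col in columns})
    return rows


raw = [
    '{"id": 1, "name": "John", "email": "john@example.com"}',
    '{"id": 2, "name": "Jane", "email": "jane@example.com", "age": 31}',
]
for row in read_with_schema(raw, READ_SCHEMA):
    print(row)
```

Nothing here checks types, which is the trade-off listed above: a record with "id": "abc" would be stored and returned without complaint, and the error would only surface in whatever code consumes it.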

2. Schema-on-Write (Data Warehouses)

Architecture Diagram
Schema Definition:              Data Written:
┌────────────────────────┐      ┌─────────────────────┐
│ id: INTEGER (NOT NULL) │      │ id: 1               │
│ name: VARCHAR(100)     │ ───▶ │ name: "John"        │
│ email: VARCHAR(200)    │      │ email: "..."        │
└────────────────────────┘      └─────────────────────┘
                                (Rejects if schema doesn't match)

Pros: Guaranteed schema, early validation
Cons: Inflexible, requires migration
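
For contrast, here is a small sketch of schema-on-write in plain Python. The SCHEMA dict, the SchemaMismatch exception, and validate_on_write are hypothetical names for this example; a real warehouse performs the equivalent checks inside its storage engine.

```python
# Declared up front: column name -> (Python type, nullable)
SCHEMA = {
    "id": (int, False),
    "name": (str, True),
    "email": (str, True),
}


class SchemaMismatch(ValueError):
    """Raised when a record does not fit the declared schema."""


def validate_on_write(record, schema):
    """Return the record unchanged, or raise SchemaMismatch if it does not fit."""
    unknown = set(record) - set(schema)
    if unknown:
        raise SchemaMismatch(f"unknown columns: {sorted(unknown)}")
    for name, (py_type, nullable) in schema.items():
        value = record.get(name)
        if value is None:
            if not nullable:
                raise SchemaMismatch(f"{name} must not be NULL")
        elif not isinstance(value, py_type):
            raise SchemaMismatch(f"{name} expects {py_type.__name__}")
    return record


table = []
table.append(validate_on_write({"id": 1, "name": "John", "email": "..."}, SCHEMA))
try:
    # "discount" is not in the schema, so the write is rejected
    table.append(validate_on_write({"id": 2, "discount": 5.0}, SCHEMA))
except SchemaMismatch as err:
    print(f"rejected: {err}")
```

The rejected write is the point: adding discount requires changing SCHEMA first, which is the migration cost listed above.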

3. Evolutionary Schema (Delta Lake/Iceberg)

Architecture Diagram
Version 1:                   Version 2 (after evolution):
┌─────────────────────┐      ┌─────────────────────┐
│ id: INTEGER         │      │ id: INTEGER         │
│ name: VARCHAR       │ ───▶ │ name: VARCHAR       │
│ email: VARCHAR      │      │ email: VARCHAR      │
└─────────────────────┘      │ discount: DECIMAL   │
                             └─────────────────────┘
                             (New column added, old files preserved)

Pros: Flexible, backward compatible, transactional
Cons: Requires specific table format
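
The idea behind table formats can be sketched as an append-only version log: every commit records the schema and the file list together, so a schema change is just another version and old files are never touched. The TableLog class below is a toy model under that assumption. It is not how Delta Lake or Iceberg lay out their metadata, and all names in it are hypothetical.

```python
class TableLog:
    """Append-only version history: each version pins a schema and a file list."""

    def __init__(self, columns):
        self.versions = [{"schema": list(columns), "files": []}]

    def commit(self, add_files=(), add_columns=()):
        """Record a new version; earlier versions and files are never modified."""
        current = self.versions[-1]
        self.versions.append({
            "schema": current["schema"] + list(add_columns),
            "files": current["files"] + list(add_files),
        })
        return len(self.versions) - 1  # the new version number

    def read(self, version=-1):
        """Read every file of a version, projected onto that version's schema."""
        snapshot = self.versions[version]
        return [
            {col: row.get(col) for col in snapshot["schema"]}
            for data_file in snapshot["files"]
            for row in data_file
        ]


log = TableLog(["id", "name", "email"])
v1 = log.commit(add_files=[[{"id": 1, "name": "John", "email": "j@example.com"}]])
v2 = log.commit(add_columns=["discount"])
v3 = log.commit(
    add_files=[[{"id": 2, "name": "Jane", "email": "x@example.com", "discount": 5.0}]]
)

print(log.read())    # latest version: the old row reads discount as None
print(log.read(v1))  # version 1: the schema before the column was added
```

Because a reader always works from one complete version, it never sees a half-applied schema change, which is what "transactional" means in the list above.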

Parquet Schema Evolution

Parquet stores schema in file metadata. Each file has its own schema.

File 1 (2024-01-01)          File 2 (2024-01-02)
  Row Group                    Row Group
    id: INT                      id: INT
    name: STRING                 name: STRING
    email: STRING                email: STRING
                                 discount: DECIMAL  ← NEW

Each file can have its own schema → merge at read time

Evolution modes:

  1. Merge schemas: Combine schemas from all files
  2. Use latest schema: Apply latest schema to all files
  3. Fail on mismatch: Reject if schemas don't match
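
# PySpark Parquet schema evolution: mergeSchema is a read-side option
df = spark.read \
    .option("mergeSchema", "true") \
    .parquet("s3://data-lake/orders/")

# Appending needs no special option. Writes go through a DataFrame
# (there is no spark.write); new_orders_df stands for any DataFrame
# that includes the new column. Each file keeps its own schema.
new_orders_df.write \
    .mode("append") \
    .parquet("s3://data-lake/orders/")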
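
Conceptually, the "merge schemas" and "fail on mismatch" modes come down to a union of per-file column lists with a conflict check. The sketch below shows that logic in plain Python. It is an illustration of the idea, not Spark's implementation, and merge_schemas is a hypothetical helper.

```python
def merge_schemas(file_schemas):
    """Union the columns of several per-file schemas, keeping first-seen order.

    Each schema is a dict: column name -> type name. A column that appears
    with two different types is treated as a conflict.
    """
    merged = {}
    for schema in file_schemas:
        for name, type_name in schema.items():
            if name in merged and merged[name] != type_name:
                raise TypeError(
                    f"column {name!r}: {merged[name]} vs {type_name}"
                )
            merged.setdefault(name, type_name)
    return merged


file_1 = {"id": "int", "name": "string", "email": "string"}
file_2 = {"id": "int", "name": "string", "email": "string", "discount": "decimal"}

print(merge_schemas([file_1, file_2]))
```

Merging has a cost in practice because the reader has to open the footer of every file to collect its schema, which is one reason table formats keep the schema in central metadata instead.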

Avro Schema Evolution

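Avro stores the writer's schema alongside the data (in the header of an Avro data file, or as a schema ID when a schema registry is used) and resolves it against the reader's schema at read time.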

// Schema v1
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"}
  ]
}

// Schema v2 (added field with default)
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "discount", "type": "double", "default": 0.0}
  ]
}

Avro evolution rules:

  • New fields must have default values
  • Removed fields must have had defaults
  • Field types must be compatible
  • Field names are case-sensitive
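
The first two rules follow directly from how a reader resolves a record against its own schema. Below is a simplified sketch of that resolution step in plain Python, working on already-decoded dicts. The resolve function is hypothetical and ignores type promotion, aliases, unions, and nested records, all of which real Avro libraries handle.

```python
def resolve(record, reader_schema):
    """Shape a decoded record to the reader's schema (simplified Avro rules)."""
    resolved = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            resolved[name] = record[name]      # present in the written data
        elif "default" in field:
            resolved[name] = field["default"]  # missing: use the reader's default
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    return resolved  # writer fields unknown to the reader are dropped


reader_v1 = {"fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
]}
reader_v2 = {"fields": reader_v1["fields"] + [
    {"name": "discount", "type": "double", "default": 0.0},
]}

old_record = {"id": 1, "name": "John", "email": "john@example.com"}
new_record = {**old_record, "id": 2, "discount": 7.5}

print(resolve(old_record, reader_v2))  # new reader, old data: default applied
print(resolve(new_record, reader_v1))  # old reader, new data: discount dropped
```

If discount had been added without a default, the first call would raise, which is exactly why new fields need defaults.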

Iceberg Schema Evolution

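Iceberg tracks every column by a unique field ID rather than by name or position, so adding, dropping, renaming, and reordering columns, as well as compatible type promotions, are metadata-only changes that do not rewrite data files.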

-- Add column
ALTER TABLE catalog.db.orders ADD COLUMN discount DECIMAL(10,2);

-- Rename column
ALTER TABLE catalog.db.orders RENAME COLUMN name TO customer_name;

-- Change column type (if compatible)
ALTER TABLE catalog.db.orders ALTER COLUMN amount TYPE DECIMAL(12,2);

-- Drop column
ALTER TABLE catalog.db.orders DROP COLUMN obsolete_field;

Iceberg advantages:

  • No data rewriting
  • Atomic schema changes
  • Partition evolution
  • Hidden partitioning

Delta Lake Schema Evolution

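Delta Lake enforces the table schema on write by default. Schema evolution is opt-in: the mergeSchema option on writes (or automatic schema merging for MERGE) adds new columns, and column mapping allows columns to be renamed and dropped without rewriting data.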

# Schema evolution with Delta Lake
new_data = spark.read.json("s3://data-lake/new_orders/")

# Write with schema evolution
new_data.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("s3://delta-lake/orders/")

# Schema enforcement (reject mismatched schemas)
new_data.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "false") \
    .save("s3://delta-lake/orders/")

Schema Compatibility Matrix

Change          | Parquet               | Avro                 | Iceberg         | Delta Lake
Add column      | ✅ (merge)            | ✅ (with default)    | ✅              | ✅ (mergeSchema)
Remove column   | ⚠️ (may break)        | ✅ (if had default)  | ✅              | ⚠️ (may break)
Rename column   | ❌                    | ⚠️ (reader aliases)  | ✅              | ✅ (column mapping)
Change type     | ❌                    | ⚠️ (compatible only) | ✅ (compatible) | ⚠️ (compatible)
Reorder columns | ⚠️ (reader-dependent) | ✅ (matched by name) | ✅              | ✅

Backward vs Forward Compatibility

Backward compatibility: A reader using the new schema can read data written with the old schema.
Forward compatibility: A reader using the old schema can read data written with the new schema.

Architecture Diagram
Backward Compatible:
New Reader + Old Data = ✅ (uses defaults for missing columns)

Forward Compatible:
Old Reader + New Data = ✅ (ignores new columns)

Both Compatible (Full):
Old Reader + New Data = ✅
New Reader + Old Data = ✅
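
Both directions can be checked mechanically with the same question: can this reader fill every field it expects? The sketch below assumes a simplified schema representation (field name mapped to a dict that may carry a "default") and ignores type changes. The names can_read and classify are hypothetical.

```python
def can_read(reader_fields, writer_fields):
    """True if every reader field exists in the written data or has a default."""
    return all(
        name in writer_fields or "default" in spec
        for name, spec in reader_fields.items()
    )


def classify(old, new):
    """Classify a schema change as 'full', 'backward', 'forward', or 'none'."""
    backward = can_read(new, old)  # new reader, old data
    forward = can_read(old, new)   # old reader, new data
    if backward and forward:
        return "full"
    if backward:
        return "backward"
    if forward:
        return "forward"
    return "none"


v1 = {"id": {}, "name": {}}
added_with_default = {**v1, "discount": {"default": 0.0}}
added_required = {**v1, "discount": {}}
dropped_required = {"id": {}}

print(classify(v1, added_with_default))  # full
print(classify(v1, added_required))      # forward
print(classify(v1, dropped_required))    # backward
```

Adding a field with a default is the only change of the three that is safe in both directions, which is why it is the usual recommendation when producers and consumers are deployed independently.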

Code Implementation

Parquet Schema Evolution

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

spark = SparkSession.builder \
    .appName("ParquetSchemaEvolution") \
    .getOrCreate()

# ============================================================
# STEP 1: Create initial data with schema v1
# ============================================================

schema_v1 = StructType([
    StructField("order_id", IntegerType(), False),
    StructField("customer_name", StringType(), True),
    StructField("amount", DoubleType(), True),
])

data_v1 = [
    (1, "John Doe", 100.0),
    (2, "Jane Smith", 200.0),
    (3, "Bob Johnson", 150.0),
]

df_v1 = spark.createDataFrame(data_v1, schema_v1)
df_v1.write.mode("overwrite").parquet("s3://data-lake/orders/")

# Verify schema
df_read = spark.read.parquet("s3://data-lake/orders/")
print("Schema v1:")
df_read.printSchema()

# ============================================================
# STEP 2: Add new column (schema v2)
# ============================================================

schema_v2 = StructType([
    StructField("order_id", IntegerType(), False),
    StructField("customer_name", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("discount", DoubleType(), True),  # NEW COLUMN
])

data_v2 = [
    (4, "Alice Brown", 300.0, 10.0),
    (5, "Charlie Wilson", 250.0, 15.0),
]

df_v2 = spark.createDataFrame(data_v2, schema_v2)

# Append the new files. Parquet needs no write option for this
# (mergeSchema only affects reads); each file keeps its own schema.
df_v2.write \
    .mode("append") \
    .parquet("s3://data-lake/orders/")

# Read all data (old + new)
df_merged = spark.read \
    .option("mergeSchema", "true") \
    .parquet("s3://data-lake/orders/")

print("Schema v2 (merged):")
df_merged.printSchema()
df_merged.show()

# ============================================================
# STEP 3: Handle schema conflicts
# ============================================================

# Plain Parquet has no write-time schema enforcement: this append succeeds
# even though "amount" is a string here. The conflict only surfaces when
# the files are read back together.
bad_data = [(6, "David Lee", "not_a_number")]
df_bad = spark.createDataFrame(bad_data, ["order_id", "customer_name", "amount"])
df_bad.write.mode("append").parquet("s3://data-lake/orders/")

try:
    spark.read \
        .option("mergeSchema", "true") \
        .parquet("s3://data-lake/orders/") \
        .show()
except Exception as e:
    print(f"Schema conflict detected at read time: {e}")

Avro Schema Evolution

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, AvroConsumer
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient

# ============================================================
# SCHEMA REGISTRY SETUP
# ============================================================

schema_registry_url = 'http://schema-registry:8081'
schema_registry = CachedSchemaRegistryClient({'url': schema_registry_url})

# Schema v1
schema_v1 = avro.loads('''
{
    "type": "record",
    "name": "Order",
    "namespace": "com.company.orders",
    "fields": [
        {"name": "order_id", "type": "long"},
        {"name": "customer_name", "type": "string"},
        {"name": "amount", "type": "double"}
    ]
}
''')

# Schema v2 (added field with default)
schema_v2 = avro.loads('''
{
    "type": "record",
    "name": "Order",
    "namespace": "com.company.orders",
    "fields": [
        {"name": "order_id", "type": "long"},
        {"name": "customer_name", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "discount", "type": ["null", "double"], "default": null}
    ]
}
''')

# ============================================================
# PRODUCER WITH SCHEMA EVOLUTION
# ============================================================

producer = AvroProducer({
    'bootstrap.servers': 'kafka:9092',
    'schema.registry.url': schema_registry_url,
})

# Produce with schema v1
for i in range(100):
    order = {
        'order_id': i,
        'customer_name': f'Customer {i}',
        'amount': float(i * 10)
    }
    producer.produce(topic='orders', value=order, value_schema=schema_v1)
producer.flush()

# Produce with schema v2 (new field)
for i in range(100, 200):
    order = {
        'order_id': i,
        'customer_name': f'Customer {i}',
        'amount': float(i * 10),
        'discount': float(i * 0.5)  # NEW FIELD
    }
    producer.produce(topic='orders', value=order, value_schema=schema_v2)
producer.flush()

# ============================================================
# CONSUMER WITH SCHEMA EVOLUTION
# ============================================================

consumer = AvroConsumer({
    'bootstrap.servers': 'kafka:9092',
    'schema.registry.url': schema_registry_url,
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest',
})

consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue

        # Message will be deserialized with the schema it was written with
        order = msg.value()
        print(f"Order: {order}")

        # Old messages have no 'discount' key; v2 messages may carry null
        discount = order.get('discount') or 0.0
finally:
    # Leave the consumer group cleanly when the loop is interrupted
    consumer.close()

Iceberg Schema Evolution

# ============================================================
# ICEBERG SCHEMA EVOLUTION
# ============================================================

# Create table with initial schema
spark.sql("""
    CREATE TABLE catalog.db.orders (
        order_id BIGINT,
        customer_name STRING,
        amount DECIMAL(10,2)
    ) USING iceberg
    PARTITIONED BY (bucket(16, order_id))  -- days() needs a date/timestamp column
""")

# Insert initial data
spark.sql("""
    INSERT INTO catalog.db.orders VALUES
    (1, 'John Doe', 100.00),
    (2, 'Jane Smith', 200.00)
""")

# ============================================================
# ADD COLUMN
# ============================================================

spark.sql("""
    ALTER TABLE catalog.db.orders 
    ADD COLUMN discount DECIMAL(10,2)
""")

# Insert new data (old data will have NULL for discount)
spark.sql("""
    INSERT INTO catalog.db.orders VALUES
    (3, 'Bob Johnson', 150.00, 10.00),
    (4, 'Alice Brown', 300.00, 15.00)
""")

# Query all data
spark.sql("SELECT * FROM catalog.db.orders").show()

# ============================================================
# RENAME COLUMN
# ============================================================

spark.sql("""
    ALTER TABLE catalog.db.orders 
    RENAME COLUMN customer_name TO customer_full_name
""")

# ============================================================
# CHANGE COLUMN TYPE (compatible only)
# ============================================================

# Widening decimal precision is allowed; changing the scale is not
spark.sql("""
    ALTER TABLE catalog.db.orders 
    ALTER COLUMN amount TYPE DECIMAL(12,2)
""")

# ============================================================
# DROP COLUMN
# ============================================================

# Drop the column added above (the column must exist in the table)
spark.sql("""
    ALTER TABLE catalog.db.orders 
    DROP COLUMN discount
""")

# ============================================================
# PARTITION EVOLUTION
# ============================================================

# Change partitioning without rewriting data
# (requires the Iceberg Spark SQL extensions to be enabled)
spark.sql("""
    ALTER TABLE catalog.db.orders 
    DROP PARTITION FIELD bucket(16, order_id)
""")

spark.sql("""
    ALTER TABLE catalog.db.orders 
    ADD PARTITION FIELD customer_full_name
""")

Delta Lake Schema Evolution

# ============================================================
# DELTA LAKE SCHEMA EVOLUTION
# ============================================================

# Create Delta table
spark.sql("""
    CREATE TABLE delta.`/delta-lake/orders/` (
        order_id BIGINT,
        customer_name STRING,
        amount DECIMAL(10,2)
    ) USING delta
""")

# Insert initial data
spark.sql("""
    INSERT INTO delta.`/delta-lake/orders/` VALUES
    (1, 'John Doe', 100.00),
    (2, 'Jane Smith', 200.00)
""")

# ============================================================
# SCHEMA EVOLUTION WITH MERGE
# ============================================================

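# New data with additional column. Cast to the table's DECIMAL(10,2) so the
# append is not rejected for a double/decimal type mismatch.
from pyspark.sql import functions as F

new_data = spark.createDataFrame([
    (3, 'Bob Johnson', 150.00, 10.00),
    (4, 'Alice Brown', 300.00, 15.00),
], ["order_id", "customer_name", "amount", "discount"]) \
    .withColumn("amount", F.col("amount").cast("decimal(10,2)")) \
    .withColumn("discount", F.col("discount").cast("decimal(10,2)"))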

# Write with schema evolution
new_data.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/delta-lake/orders/")

# Read with schema evolution
df = spark.read.format("delta").load("/delta-lake/orders/")
df.printSchema()

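# ============================================================
# COLUMN MAPPING (Delta Lake 2.0+)
# ============================================================

# Enable column mapping. This upgrades the table protocol, so older
# readers and writers can no longer access the table.
spark.sql("""
    ALTER TABLE delta.`/delta-lake/orders/` 
    SET TBLPROPERTIES (
        'delta.minReaderVersion' = '2',
        'delta.minWriterVersion' = '5',
        'delta.columnMapping.mode' = 'name'
    )
""")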

# Now you can rename and drop columns
spark.sql("""
    ALTER TABLE delta.`/delta-lake/orders/` 
    RENAME COLUMN customer_name TO name
""")

spark.sql("""
    ALTER TABLE delta.`/delta-lake/orders/` 
    DROP COLUMN discount
""")

Safe Migration Strategy

# ============================================================
# SAFE MIGRATION STRATEGY
# ============================================================

def safe_schema_migration(spark, table_path, new_data, target_schema):
    """
    Safely migrate schema with zero downtime.

    Strategy:
    1. Validate schema compatibility
    2. Write new data to a staging location
    3. Validate data quality
    4. Merge into the main table atomically
    5. Clean up staging

    target_schema is taken to be the current schema of the table at
    table_path; new_data carries the evolved schema.
    """
    from delta.tables import DeltaTable

    # Step 1: Validate that the incoming schema is compatible with the table's
    if not validate_schema_compatibility(target_schema, new_data.schema):
        raise ValueError("Schema incompatible with target")

    # Step 2: Write to staging area
    staging_path = f"{table_path}_staging"
    new_data.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .save(staging_path)

    # Step 3: Validate data quality
    staging_df = spark.read.format("delta").load(staging_path)
    if not validate_data_quality(staging_df):
        raise ValueError("Data quality check failed")

    # Step 4: Merge into main table. MERGE only adds new source columns to
    # the target when automatic schema evolution is enabled.
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

    delta_table = DeltaTable.forPath(spark, table_path)

    # Delta Lake MERGE commits as a single atomic transaction
    delta_table.alias("target").merge(
        staging_df.alias("source"),
        "target.order_id = source.order_id"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()

    # Step 5: Clean up staging. shutil only works for local filesystem paths;
    # for object stores such as S3, delete the prefix with the store's own tooling.
    import shutil
    shutil.rmtree(staging_path)

    print("Schema migration completed successfully")

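# Widening conversions treated as safe in this sketch (an assumption;
# adjust to what your table format actually allows)
SAFE_WIDENINGS = {
    ("int", "bigint"),
    ("float", "double"),
}

def is_type_compatible(old_type, new_type):
    """Return True if the type is unchanged or safely widened"""
    old_name, new_name = old_type.simpleString(), new_type.simpleString()
    return old_name == new_name or (old_name, new_name) in SAFE_WIDENINGS

def validate_schema_compatibility(old_schema, new_schema):
    """Validate that new schema is compatible"""
    
    old_fields = {f.name: f for f in old_schema.fields}
    new_fields = {f.name: f for f in new_schema.fields}
    
    # Check for removed required fields
    for name, field in old_fields.items():
        if not field.nullable and name not in new_fields:
            return False
    
    # Check for type compatibility
    for name, field in new_fields.items():
        if name in old_fields:
            if not is_type_compatible(old_fields[name].dataType, field.dataType):
                return False
    
    return True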

def validate_data_quality(df):
    """Validate data quality"""
    # Add your quality checks here
    return df.count() > 0

Schema Registry Integration

# ============================================================
# SCHEMA REGISTRY INTEGRATION
# ============================================================

from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

# Schema Registry client
schema_registry = SchemaRegistryClient({
    'url': 'http://schema-registry:8081'
})

# Candidate schema definition
schema_str = '''
{
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "long"},
        {"name": "amount", "type": "double"}
    ]
}
'''

# The client methods take a Schema object, not a raw string
schema = Schema(schema_str, schema_type='AVRO')

# Check compatibility against the latest registered version
# (the subject must already exist, otherwise the registry returns an error)
compatibility = schema_registry.test_compatibility('orders-value', schema)
print(f"Schema compatible: {compatibility}")

# Register new schema version
schema_id = schema_registry.register_schema('orders-value', schema)
print(f"Registered schema ID: {schema_id}")

# Get schema versions
versions = schema_registry.get_versions('orders-value')
print(f"Schema versions: {versions}")

💡

Production Tip: Always use a Schema Registry for Avro data in Kafka. It ensures schema compatibility, provides version history, and enables schema evolution without breaking consumers.


Common Follow-Up Questions

Q1: How do you handle schema evolution in real-time pipelines?

# Real-time schema evolution with Kafka + Schema Registry
from confluent_kafka.avro import AvroConsumer

# Consumer with schema evolution
consumer = AvroConsumer({
    'bootstrap.servers': 'kafka:9092',
    'schema.registry.url': 'http://schema-registry:8081',
    'group.id': 'real-time-processor',
})

# Each message carries the ID of the schema it was written with, so
# old messages are decoded with the old schema and new messages with the new one

Q2: What's the impact of schema evolution on downstream systems?

  • Data warehouses: May need to ALTER TABLE
  • BI tools: May need to refresh metadata
  • ML models: May need retraining if features change
  • APIs: May need versioning

Q3: How do you test schema evolution?

# Test schema evolution
def test_schema_evolution():
    path = "/tmp/test_schema/"

    # Create initial data (overwrite so the test can be rerun)
    df_v1 = spark.createDataFrame([(1, "John")], ["id", "name"])
    df_v1.write.mode("overwrite").parquet(path)
    
    # Add column
    df_v2 = spark.createDataFrame([(2, "Jane", 25)], ["id", "name", "age"])
    df_v2.write.mode("append").parquet(path)
    
    # Read with mergeSchema
    df_merged = spark.read.option("mergeSchema", "true").parquet(path)
    
    # Verify schema, row count, and that the old row reads the new column as NULL
    assert "age" in df_merged.columns
    assert df_merged.count() == 2
    assert df_merged.filter("id = 1").first()["age"] is None
    
    print("Schema evolution test passed!")

Q4: How do you rollback schema changes?

# Delta Lake: Time travel to previous schema
df_old_schema = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("/delta-lake/orders/")

# Iceberg: Time travel
df_old_schema = spark.sql("""
    SELECT * FROM catalog.db.orders
    TIMESTAMP AS OF '2024-01-01 00:00:00'
""")

⚠️

Critical Consideration: Schema evolution can break downstream systems. Always: (1) communicate changes, (2) provide migration guides, (3) support old schemas for a transition period, and (4) monitor for errors.


Company-Specific Tips

Databricks Interview Tips

  • Discuss Delta Lake column mapping
  • Explain Time Travel for schema rollback
  • Mention Auto Loader for schema inference
  • Talk about schema evolution in Structured Streaming

Netflix Interview Tips

  • Focus on content metadata schema changes
  • Discuss A/B testing schema evolution
  • Mention multi-format support (Parquet, Avro)
  • Talk about backward compatibility requirements

Uber Interview Tips

  • Discuss ride data schema evolution
  • Explain geospatial schema changes
  • Mention real-time schema validation
  • Talk about schema registry best practices

ℹ️

Final Takeaway: Schema evolution is essential for long-lived data systems. Choose the right table format (Iceberg, Delta Lake) for your use case, use Schema Registry for Kafka, and always validate compatibility before deploying changes.
