Schema Evolution in Parquet, Avro, and Iceberg
Managing schema changes in data lakes and warehouses
Interview Question
"Your upstream team adds a new column 'discount_amount' to the orders table. The existing Parquet files don't have this column. Explain: (1) How do different file formats handle this? (2) What happens to existing queries? (3) How do you safely migrate without downtime? (4) What are the tradeoffs?"
Difficulty: Medium-Hard | Frequently asked at Databricks, Netflix, Uber, Airbnb
Theoretical Foundation
What is Schema Evolution?
Schema evolution is the ability to change the schema of a dataset over time while maintaining compatibility with existing data and queries.
Common schema changes:
- Adding a column
- Removing a column
- Renaming a column
- Changing data type
- Reordering columns
- Changing nullability
Schema Evolution Strategies
1. Schema-on-Read (Data Lakes)
Raw Data:                    Schema Applied at Read:
┌─────────────────────┐      ┌─────────────────────┐
│ {"id": 1,           │      │ id: 1               │
│  "name": "John",    │ ──▶  │ name: "John"        │
│  "email": "..."}    │      │ email: "..."        │
└─────────────────────┘      │ age: NULL (new col) │
                             └─────────────────────┘
Pros: Flexible, no write-time overhead
Cons: No validation, potential read-time errors
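Schema-on-read can be modelled in a few lines of plain Python. The raw records stay exactly as written, and the reader's schema is projected onto them at query time. This is a minimal sketch that assumes a hypothetical apply_read_schema helper and uses a dict as a stand-in for the reader schema:

```python
import json

# Hypothetical reader-side schema: column name -> expected Python type
READ_SCHEMA = {"id": int, "name": str, "email": str, "age": int}


def apply_read_schema(raw_line, schema):
    """Project one raw JSON record onto the reader's schema.

    Columns missing from the record become None (NULL), columns not in the
    schema are ignored, and type errors only surface now, at read time.
    """
    record = json.loads(raw_line)
    row = {}
    for column, expected_type in schema.items():
        value = record.get(column)
        if value is not None and not isinstance(value, expected_type):
            raise TypeError(f"{column!r}: expected {expected_type.__name__}, got {value!r}")
        row[column] = value
    return row


raw_data = [
    '{"id": 1, "name": "John", "email": "john@example.com"}',  # written before 'age' existed
    '{"id": 2, "name": "Jane", "email": "jane@example.com", "age": 34, "vip": true}',
]
rows = [apply_read_schema(line, READ_SCHEMA) for line in raw_data]
for row in rows:
    print(row)
```

This shows the tradeoff named in the Cons line. Nothing prevented a malformed record from being stored, and the TypeError appears only when someone reads that record.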
2. Schema-on-Write (Data Warehouses)
Schema Definition:              Data Written:
┌────────────────────────┐      ┌────────────────────────┐
│ id: INTEGER (NOT NULL) │      │ id: 1                  │
│ name: VARCHAR(100)     │ ──▶  │ name: "John"           │
│ email: VARCHAR(200)    │      │ email: "..."           │
└────────────────────────┘      └────────────────────────┘
(Rejects if schema doesn't match)
Pros: Guaranteed schema, early validation
Cons: Inflexible, requires migration
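Schema-on-write moves that check to ingestion time. The sketch below is built on stated assumptions: the Column spec, ORDERS_SCHEMA, and validate_on_write are hypothetical stand-ins for the DDL in the diagram. Rows that do not match are rejected before they are stored. This is also why adding a column requires migrating the table first.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Column:
    name: str
    py_type: type
    nullable: bool = True
    max_length: Optional[int] = None  # stands in for VARCHAR(n)


# Hypothetical DDL for the orders table, written as Python column specs
ORDERS_SCHEMA = [
    Column("id", int, nullable=False),
    Column("name", str, max_length=100),
    Column("email", str, max_length=200),
]


class SchemaViolation(ValueError):
    """Raised when a row does not match the declared schema."""


def validate_on_write(row, schema):
    """Check a row against the schema before storing it; return the normalized row."""
    allowed = {col.name for col in schema}
    extra = sorted(set(row) - allowed)
    if extra:
        raise SchemaViolation(f"unknown columns: {extra}")
    for col in schema:
        value = row.get(col.name)
        if value is None:
            if not col.nullable:
                raise SchemaViolation(f"{col.name} is NOT NULL")
            continue
        if not isinstance(value, col.py_type):
            raise SchemaViolation(f"{col.name}: expected {col.py_type.__name__}")
        if col.max_length is not None and len(value) > col.max_length:
            raise SchemaViolation(f"{col.name}: longer than {col.max_length} characters")
    return {col.name: row.get(col.name) for col in schema}


table = [validate_on_write({"id": 1, "name": "John", "email": "john@example.com"}, ORDERS_SCHEMA)]
try:
    # The new 'discount' column is rejected until the table itself is migrated
    table.append(validate_on_write(
        {"id": 2, "name": "Jane", "email": "jane@example.com", "discount": 5.0}, ORDERS_SCHEMA))
except SchemaViolation as err:
    print(f"Rejected: {err}")
```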
3. Evolutionary Schema (Delta Lake/Iceberg)
Version 1:                   Version 2 (after evolution):
┌─────────────────────┐      ┌─────────────────────┐
│ id: INTEGER         │      │ id: INTEGER         │
│ name: VARCHAR       │ ──▶  │ name: VARCHAR       │
│ email: VARCHAR      │      │ email: VARCHAR      │
└─────────────────────┘      │ discount: DECIMAL   │
                             └─────────────────────┘
(New column added, old files preserved)
Pros: Flexible, backward compatible, transactional
Cons: Requires specific table format
Parquet Schema Evolution
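Parquet stores the schema in each file's footer metadata. Every file has its own schema, and there is no table-level schema.
Evolution modes (applied by the reader):
- Merge schemas: Union the schemas of all files (Spark: mergeSchema, off by default because reading every footer is expensive)
- Use one schema: Apply a single schema (user-supplied, or taken from one file) to all files; columns a file lacks read as NULL
- Fail on mismatch: Reject files whose schema doesn't match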
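# PySpark: Parquet schema evolution is resolved at read time
df = spark.read \
    .option("mergeSchema", "true") \
    .parquet("s3://data-lake/orders/")
# Write: plain Parquet has no table-level schema, so appending a DataFrame
# with a new column just adds files with the wider schema. mergeSchema is a
# read option for Parquet and has no effect on writes.
# (new_orders_df is a placeholder for a DataFrame that includes the new column)
new_orders_df.write \
    .mode("append") \
    .parquet("s3://data-lake/orders/")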
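Conceptually, mergeSchema unions the footer schemas of all files and returns NULL for any column a file lacks. The plain-Python model below uses hypothetical dict-based footers. It is not Spark's implementation, which also reconciles nullability and nested structs.

```python
def merge_schemas(file_schemas):
    """Union the columns of all file schemas (first-seen order); conflicting types fail."""
    merged = {}
    for schema in file_schemas:
        for name, type_name in schema.items():
            if name in merged and merged[name] != type_name:
                raise ValueError(f"cannot merge {name!r}: {merged[name]} vs {type_name}")
            merged.setdefault(name, type_name)
    return merged


def read_with_merged_schema(files):
    """files: list of (footer_schema, rows). Columns a file lacks read as None."""
    merged = merge_schemas([schema for schema, _ in files])
    rows = [
        {name: row.get(name) for name in merged}
        for _, file_rows in files
        for row in file_rows
    ]
    return merged, rows


# Hypothetical footers of two files under s3://data-lake/orders/
old_file = ({"order_id": "int", "amount": "double"},
            [{"order_id": 1, "amount": 100.0}])
new_file = ({"order_id": "int", "amount": "double", "discount": "double"},
            [{"order_id": 4, "amount": 300.0, "discount": 10.0}])

schema, rows = read_with_merged_schema([old_file, new_file])
print(schema)
print(rows)
```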
Avro Schema Evolution
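Avro stores the writer's schema with the data. It is kept in the container-file header, or carried as a schema ID in each Kafka message when a Schema Registry is used. At read time, Avro resolves the writer's schema against the reader's schema.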
// Schema v1
{
"type": "record",
"name": "Order",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"}
]
}
// Schema v2 (added field with default)
{
"type": "record",
"name": "Order",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"},
{"name": "discount", "type": "double", "default": 0.0}
]
}
Avro evolution rules:
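- New fields must have default values (so new readers can read old data)
- Removed fields must have had defaults (so old readers can read new data)
- Field types must match or be promotable (int → long → float → double, string ↔ bytes)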
- Field names are case-sensitive
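The reader/writer resolution behind these rules can be sketched as follows. This is a simplified model of the schema resolution described in the Avro specification, not a real Avro library. It handles flat records with primitive types only, and resolve_record and the dict schemas are hypothetical.

```python
# Writer -> reader numeric promotions allowed by the Avro specification
# (the spec also allows string <-> bytes, omitted here for brevity)
PROMOTIONS = {
    "int": {"long", "float", "double"},
    "long": {"float", "double"},
    "float": {"double"},
}
_NO_DEFAULT = object()


def resolve_record(datum, writer_schema, reader_schema):
    """Resolve a decoded record from the writer's schema to the reader's schema.

    Simplified: flat records with primitive field types only; record-name
    matching, aliases, and unions are not modelled.
    """
    writer_fields = {f["name"]: f for f in writer_schema["fields"]}
    result = {}
    for field in reader_schema["fields"]:
        name, r_type = field["name"], field["type"]
        if name in writer_fields:
            w_type = writer_fields[name]["type"]
            if w_type != r_type and r_type not in PROMOTIONS.get(w_type, set()):
                raise TypeError(f"{name}: cannot promote {w_type} to {r_type}")
            value = datum[name]
            result[name] = float(value) if r_type in ("float", "double") else value
        elif field.get("default", _NO_DEFAULT) is not _NO_DEFAULT:
            result[name] = field["default"]  # field added after this data was written
        else:
            raise ValueError(f"{name}: not in writer schema and has no default")
    # Fields that exist only in the writer schema are skipped
    return result


order_v1 = {"type": "record", "name": "Order", "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
]}
order_v2 = {"type": "record", "name": "Order", "fields": [
    *order_v1["fields"],
    {"name": "discount", "type": "double", "default": 0.0},
]}

old_record = {"id": 1, "name": "John", "email": "john@example.com"}
new_record = {**old_record, "discount": 5.0}
print(resolve_record(old_record, order_v1, order_v2))  # new reader, old data: default used
print(resolve_record(new_record, order_v2, order_v1))  # old reader, new data: discount skipped
```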
Iceberg Schema Evolution
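Iceberg tracks columns by unique field IDs rather than by name or position. As a result, adding, dropping, renaming, reordering, and widening columns are metadata-only changes that do not rewrite data files.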
-- Add column
ALTER TABLE catalog.db.orders ADD COLUMN discount DECIMAL(10,2);
-- Rename column
ALTER TABLE catalog.db.orders RENAME COLUMN name TO customer_name;
-- Change column type (if compatible)
ALTER TABLE catalog.db.orders ALTER COLUMN amount TYPE DECIMAL(12,2);
-- Drop column
ALTER TABLE catalog.db.orders DROP COLUMN obsolete_field;
Iceberg advantages:
- No data rewriting
- Atomic schema changes
- Partition evolution
- Hidden partitioning
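This works because every column has a unique field ID that is never reused, and data files are read by ID rather than by name. The toy model below is plain Python. The ToyIcebergTable class is hypothetical and ignores types, partitions, and snapshots. It shows why a rename is metadata-only, and why a column that is dropped and then re-added does not bring back its old values.

```python
from dataclasses import dataclass, field


@dataclass
class ToyIcebergTable:
    """Columns are tracked by a never-reused field ID; names are only metadata."""
    columns: dict = field(default_factory=dict)  # field_id -> current column name
    next_id: int = 1
    data_files: list = field(default_factory=list)  # each file maps field_id -> value

    def field_id(self, name):
        for fid, col_name in self.columns.items():
            if col_name == name:
                return fid
        raise KeyError(name)

    def add_column(self, name):
        self.columns[self.next_id] = name
        self.next_id += 1  # IDs are never reused, even after a drop

    def rename_column(self, old, new):
        self.columns[self.field_id(old)] = new  # metadata only; data files untouched

    def drop_column(self, name):
        del self.columns[self.field_id(name)]  # values stay in old files but are unreachable

    def write(self, row):
        self.data_files.append({self.field_id(n): v for n, v in row.items()})

    def scan(self):
        # Resolve by ID: IDs a file lacks read as None; dropped IDs are ignored
        return [{name: f.get(fid) for fid, name in self.columns.items()}
                for f in self.data_files]


t = ToyIcebergTable()
for col in ("order_id", "name", "obsolete_field"):
    t.add_column(col)
t.write({"order_id": 1, "name": "John", "obsolete_field": "x"})

t.rename_column("name", "customer_name")
t.drop_column("obsolete_field")
t.add_column("obsolete_field")  # same name, new ID: the old "x" does not come back
t.add_column("discount")
t.write({"order_id": 2, "customer_name": "Jane", "discount": 5})

for row in t.scan():
    print(row)
```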
Delta Lake Schema Evolution
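Delta Lake supports schema evolution in three ways: the mergeSchema write option, automatic schema merging in MERGE (spark.databricks.delta.schema.autoMerge.enabled), and column mapping for renames and drops. By default, it enforces the table schema on every write.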
# Schema evolution with Delta Lake
new_data = spark.read.json("s3://data-lake/new_orders/")
# Write with schema evolution
new_data.write \
.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save("s3://delta-lake/orders/")
# Schema enforcement (the default): a write with extra columns or changed types fails
new_data.write \
.format("delta") \
.mode("append") \
.option("mergeSchema", "false") \
.save("s3://delta-lake/orders/")
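Delta's documented write rules can be summarized as follows. The incoming data may omit table columns, which are then written as NULL. It may not contain extra columns unless mergeSchema is enabled, and it may not change a column's type. The hypothetical append_schema helper below models those rules. Real Delta also matches names case-insensitively and allows a few implicit upcasts, which this sketch leaves out.

```python
class SchemaMismatch(Exception):
    pass


def append_schema(table_schema, incoming_schema, merge_schema=False):
    """Return the table schema after an append, or raise SchemaMismatch.

    Schemas are dicts of column name -> type name. Simplified model of Delta's
    write rules: exact, case-sensitive name matching and no implicit upcasts.
    """
    for name, type_name in incoming_schema.items():
        if name in table_schema and table_schema[name] != type_name:
            raise SchemaMismatch(f"{name}: table has {table_schema[name]}, data has {type_name}")
    extra = [name for name in incoming_schema if name not in table_schema]
    if extra and not merge_schema:
        raise SchemaMismatch(f"columns {extra} not in table; use mergeSchema to add them")
    # Table columns missing from the incoming data are simply written as NULL;
    # with mergeSchema, new columns are appended to the end of the table schema
    return {**table_schema, **{name: incoming_schema[name] for name in extra}}


orders = {"order_id": "bigint", "customer_name": "string", "amount": "decimal(10,2)"}
with_discount = {**orders, "discount": "decimal(10,2)"}

try:
    append_schema(orders, with_discount)  # enforcement (the default)
except SchemaMismatch as err:
    print(f"Rejected: {err}")
print(append_schema(orders, with_discount, merge_schema=True))
print(append_schema(orders, {"order_id": "bigint"}))  # missing columns are fine
```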
Schema Compatibility Matrix
| Change | Parquet | Avro | Iceberg | Delta Lake |
|---|---|---|---|---|
| Add column | ✓ (merge on read) | ✓ (with default) | ✓ | ✓ (mergeSchema) |
| Remove column | ⚠️ (may break readers) | ✓ (if it had a default) | ✓ | ⚠️ (requires column mapping) |
| Rename column | ✗ | ⚠️ (via aliases) | ✓ | ✓ (column mapping) |
| Change type | ✗ | ⚠️ (promotions only) | ✓ (widening only) | ⚠️ (compatible only) |
| Reorder columns | ✓ | ✓ | ✓ | ✓ |
Backward vs Forward Compatibility
Backward compatibility: New schema can read old data
Forward compatibility: Old schema can read new data
Backward Compatible:
New Reader + Old Data = ✓
(uses defaults for missing columns)
Forward Compatible:
Old Reader + New Data = ✓
(ignores new columns)
Full (Both) Compatible:
Old Reader + New Data = ✓
New Reader + Old Data = ✓
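Both directions can be checked mechanically by asking whether a reader schema can consume data written with a writer schema. The sketch below is simplified and accepts exact type matches only. The can_read and compatibility helpers are hypothetical, and their return values borrow Schema Registry's compatibility-mode names.

```python
def can_read(reader_fields, writer_fields):
    """True if data written with writer_fields can be read with reader_fields.

    fields: dict of name -> {"type": ..., optional "default": ...}.
    Every reader field must exist in the writer schema with the same type,
    or carry a default. Writer-only fields are ignored.
    """
    for name, spec in reader_fields.items():
        if name in writer_fields:
            if writer_fields[name]["type"] != spec["type"]:
                return False
        elif "default" not in spec:
            return False
    return True


def compatibility(old, new):
    backward = can_read(reader_fields=new, writer_fields=old)  # new reader, old data
    forward = can_read(reader_fields=old, writer_fields=new)   # old reader, new data
    if backward and forward:
        return "FULL"
    if backward:
        return "BACKWARD"
    if forward:
        return "FORWARD"
    return "NONE"


v1 = {"id": {"type": "long"}, "name": {"type": "string"}}
print(compatibility(v1, {**v1, "discount": {"type": "double", "default": 0.0}}))  # added with default
print(compatibility(v1, {**v1, "discount": {"type": "double"}}))  # added without default
print(compatibility(v1, {"id": {"type": "long"}}))  # 'name' removed
```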
Code Implementation
Parquet Schema Evolution
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
spark = SparkSession.builder \
.appName("ParquetSchemaEvolution") \
.getOrCreate()
# ============================================================
# STEP 1: Create initial data with schema v1
# ============================================================
schema_v1 = StructType([
StructField("order_id", IntegerType(), False),
StructField("customer_name", StringType(), True),
StructField("amount", DoubleType(), True),
])
data_v1 = [
(1, "John Doe", 100.0),
(2, "Jane Smith", 200.0),
(3, "Bob Johnson", 150.0),
]
df_v1 = spark.createDataFrame(data_v1, schema_v1)
df_v1.write.mode("overwrite").parquet("s3://data-lake/orders/")
# Verify schema
df_read = spark.read.parquet("s3://data-lake/orders/")
print("Schema v1:")
df_read.printSchema()
# ============================================================
# STEP 2: Add new column (schema v2)
# ============================================================
schema_v2 = StructType([
StructField("order_id", IntegerType(), False),
StructField("customer_name", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("discount", DoubleType(), True), # NEW COLUMN
])
data_v2 = [
(4, "Alice Brown", 300.0, 10.0),
(5, "Charlie Wilson", 250.0, 15.0),
]
df_v2 = spark.createDataFrame(data_v2, schema_v2)
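# Append v2 rows. Plain Parquet has no table-level schema to update, so the
# new files simply carry the wider schema (mergeSchema is only a read option)
df_v2.write \
    .mode("append") \
    .parquet("s3://data-lake/orders/")
# Read all data (old + new); rows from v1 files show NULL for discount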
df_merged = spark.read \
.option("mergeSchema", "true") \
.parquet("s3://data-lake/orders/")
print("Schema v2 (merged):")
df_merged.printSchema()
df_merged.show()
# ============================================================
# STEP 3: Handle schema conflicts
# ============================================================
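# Plain Parquet does NOT enforce a schema on append: this write succeeds and
# adds a file whose 'amount' column is a string
bad_data = [(6, "David Lee", "not_a_number")]
df_bad = spark.createDataFrame(bad_data, ["order_id", "customer_name", "amount"])
df_bad.write.mode("append").parquet("s3://data-lake/orders/")
# The conflict only surfaces later, when readers try to merge the schemas.
# Table formats such as Delta Lake and Iceberg reject this at write time.
try:
    spark.read.option("mergeSchema", "true").parquet("s3://data-lake/orders/").show()
except Exception as e:
    print(f"Schema conflict detected: {e}")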
Avro Schema Evolution
# Uses the legacy confluent_kafka.avro API (AvroProducer/AvroConsumer),
# which is deprecated in favor of confluent_kafka.schema_registry
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, AvroConsumer
# ============================================================
# SCHEMA REGISTRY SETUP
# ============================================================
schema_registry_url = 'http://schema-registry:8081'
# Schema v1
schema_v1 = avro.loads('''
{
"type": "record",
"name": "Order",
"namespace": "com.company.orders",
"fields": [
{"name": "order_id", "type": "long"},
{"name": "customer_name", "type": "string"},
{"name": "amount", "type": "double"}
]
}
''')
# Schema v2 (added field with default)
schema_v2 = avro.loads('''
{
"type": "record",
"name": "Order",
"namespace": "com.company.orders",
"fields": [
{"name": "order_id", "type": "long"},
{"name": "customer_name", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "discount", "type": ["null", "double"], "default": null}
]
}
''')
# ============================================================
# PRODUCER WITH SCHEMA EVOLUTION
# ============================================================
producer = AvroProducer({
'bootstrap.servers': 'kafka:9092',
'schema.registry.url': schema_registry_url,
})
# Produce with schema v1
for i in range(100):
    order = {
        'order_id': i,
        'customer_name': f'Customer {i}',
        'amount': float(i * 10),
    }
    producer.produce(topic='orders', value=order, value_schema=schema_v1)
producer.flush()
# Produce with schema v2 (new field)
for i in range(100, 200):
    order = {
        'order_id': i,
        'customer_name': f'Customer {i}',
        'amount': float(i * 10),
        'discount': float(i * 0.5),  # NEW FIELD
    }
    producer.produce(topic='orders', value=order, value_schema=schema_v2)
producer.flush()
# ============================================================
# CONSUMER WITH SCHEMA EVOLUTION
# ============================================================
consumer = AvroConsumer({
'bootstrap.servers': 'kafka:9092',
'schema.registry.url': schema_registry_url,
'group.id': 'order-processor',
'auto.offset.reset': 'earliest',
})
consumer.subscribe(['orders'])
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"Error: {msg.error()}")
        continue
    # Without a reader schema, each message is decoded with the schema it was written with
    order = msg.value()
    print(f"Order: {order}")
    # Messages written with v1 have no 'discount' key, so supply a fallback
    discount = order.get('discount', 0.0)
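The consumer can decode each message with its own writer schema because of Confluent's wire format. Every message value starts with a magic byte 0, followed by the 4-byte big-endian schema ID and then the Avro-encoded body. Below is a minimal sketch of that framing using the standard struct module. The helper names are hypothetical, the payload is a placeholder, and the real serializers also handle schema lookup and caching.

```python
import struct

MAGIC_BYTE = 0
HEADER = struct.Struct(">BI")  # 1-byte magic + 4-byte big-endian schema ID


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix an Avro-encoded payload with the wire-format header."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + avro_payload


def parse_frame(message: bytes):
    """Split a message into (schema_id, avro_payload)."""
    if len(message) < HEADER.size:
        raise ValueError("message too short for wire-format header")
    magic, schema_id = HEADER.unpack(message[:HEADER.size])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte {magic}")
    return schema_id, message[HEADER.size:]


# A consumer looks up schema_id in the registry (caching it), then decodes
# the payload with that writer schema; old and new messages carry different IDs
message = frame(42, b"placeholder-avro-bytes")
print(parse_frame(message))
```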
Iceberg Schema Evolution
# ============================================================
# ICEBERG SCHEMA EVOLUTION
# ============================================================
# Create table with initial schema
# (days() needs a date/timestamp column, so this example buckets the BIGINT key)
spark.sql("""
CREATE TABLE catalog.db.orders (
order_id BIGINT,
customer_name STRING,
amount DECIMAL(10,2)
) USING iceberg
PARTITIONED BY (bucket(16, order_id))
""")
# Insert initial data
spark.sql("""
INSERT INTO catalog.db.orders VALUES
(1, 'John Doe', 100.00),
(2, 'Jane Smith', 200.00)
""")
# ============================================================
# ADD COLUMN
# ============================================================
spark.sql("""
ALTER TABLE catalog.db.orders
ADD COLUMN discount DECIMAL(10,2)
""")
# Insert new data (old data will have NULL for discount)
spark.sql("""
INSERT INTO catalog.db.orders VALUES
(3, 'Bob Johnson', 150.00, 10.00),
(4, 'Alice Brown', 300.00, 15.00)
""")
# Query all data
spark.sql("SELECT * FROM catalog.db.orders").show()
# ============================================================
# RENAME COLUMN
# ============================================================
spark.sql("""
ALTER TABLE catalog.db.orders
RENAME COLUMN customer_name TO customer_full_name
""")
# ============================================================
# CHANGE COLUMN TYPE (widening only, e.g. DECIMAL(10,2) -> DECIMAL(12,2))
# ============================================================
spark.sql("""
ALTER TABLE catalog.db.orders
ALTER COLUMN amount TYPE DECIMAL(12,2)
""")
# ============================================================
# DROP COLUMN
# ============================================================
spark.sql("""
ALTER TABLE catalog.db.orders
DROP COLUMN discount
""")
# ============================================================
# PARTITION EVOLUTION
# ============================================================
# Change partitioning without rewriting data: existing files keep the old
# spec and new writes use the new one. These statements require the
# Iceberg SQL extensions (spark.sql.extensions =
# org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions)
spark.sql("""
ALTER TABLE catalog.db.orders
DROP PARTITION FIELD bucket(16, order_id)
""")
spark.sql("""
ALTER TABLE catalog.db.orders
ADD PARTITION FIELD customer_full_name
""")
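Partition evolution works because each data file records the partition spec it was written with. During planning, a filter on the source column (here a timestamp) is projected through each file's own transform. The toy sketch below assumes two hypothetical specs, month(ts) followed by day(ts), and uses the Iceberg-style definitions of those transforms as months and days since 1970-01-01. The query never mentions the partition values, which is the idea behind hidden partitioning.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def months(ts):
    """Iceberg-style month transform: months since 1970-01."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


def days(ts):
    """Iceberg-style day transform: days since 1970-01-01."""
    return (ts - EPOCH).days


# Hypothetical spec history: spec 0 = month(ts), evolved to spec 1 = day(ts)
SPECS = {0: months, 1: days}
data_files = []  # each file remembers the spec it was written with


def write(spec_id, ts, row):
    data_files.append({"spec_id": spec_id, "partition": SPECS[spec_id](ts), "rows": [row]})


def plan_scan(lo, hi):
    """Rows in files that may hold timestamps in [lo, hi].

    The query filters on ts only; the filter is projected through each file's
    own transform, which is valid because both transforms are monotonic.
    """
    kept = []
    for f in data_files:
        transform = SPECS[f["spec_id"]]
        if transform(lo) <= f["partition"] <= transform(hi):
            kept.extend(f["rows"])
    return kept


utc = timezone.utc
write(0, datetime(2024, 1, 15, tzinfo=utc), "jan order")      # written under the monthly spec
write(0, datetime(2024, 2, 10, tzinfo=utc), "feb order")
write(1, datetime(2024, 3, 1, 9, tzinfo=utc), "mar 1 order")  # written after evolving to daily
write(1, datetime(2024, 3, 2, 9, tzinfo=utc), "mar 2 order")
print(plan_scan(datetime(2024, 2, 1, tzinfo=utc), datetime(2024, 3, 1, 23, 59, tzinfo=utc)))
```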
Delta Lake Schema Evolution
# ============================================================
# DELTA LAKE SCHEMA EVOLUTION
# ============================================================
# Create Delta table
spark.sql("""
CREATE TABLE delta.`/delta-lake/orders/` (
order_id BIGINT,
customer_name STRING,
amount DECIMAL(10,2)
) USING delta
""")
# Insert initial data
spark.sql("""
INSERT INTO delta.`/delta-lake/orders/` VALUES
(1, 'John Doe', 100.00),
(2, 'Jane Smith', 200.00)
""")
# ============================================================
# SCHEMA EVOLUTION WITH MERGE
# ============================================================
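# New data with an additional column. Types must match the table:
# amount is DECIMAL(10,2), so use Decimal values and an explicit schema
from decimal import Decimal
new_data = spark.createDataFrame([
    (3, 'Bob Johnson', Decimal('150.00'), Decimal('10.00')),
    (4, 'Alice Brown', Decimal('300.00'), Decimal('15.00')),
], "order_id BIGINT, customer_name STRING, amount DECIMAL(10,2), discount DECIMAL(10,2)")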
# Write with schema evolution
new_data.write \
.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save("/delta-lake/orders/")
# Read back: the table schema now includes discount (NULL for the earlier rows)
df = spark.read.format("delta").load("/delta-lake/orders/")
df.printSchema()
# ============================================================
# COLUMN MAPPING (Delta Lake 2.0+)
# ============================================================
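# Enable column mapping by name. This upgrades the table protocol, so older
# readers and writers can no longer access the table.
spark.sql("""
ALTER TABLE delta.`/delta-lake/orders/`
SET TBLPROPERTIES (
'delta.minReaderVersion' = '2',
'delta.minWriterVersion' = '5',
'delta.columnMapping.mode' = 'name'
)
""")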
# Now you can rename and drop columns
spark.sql("""
ALTER TABLE delta.`/delta-lake/orders/`
RENAME COLUMN customer_name TO name
""")
spark.sql("""
ALTER TABLE delta.`/delta-lake/orders/`
DROP COLUMN discount
""")
Safe Migration Strategy
# ============================================================
# SAFE MIGRATION STRATEGY
# ============================================================
from delta.tables import DeltaTable
from pyspark.sql.types import IntegerType, LongType, FloatType, DoubleType
def safe_schema_migration(spark, table_path, new_data, target_schema):
    """
    Safely evolve a Delta table's schema with zero downtime.
    Strategy:
    1. Validate schema compatibility against the target schema
    2. Write new data to a staging location
    3. Validate data quality on the staged copy
    4. MERGE into the main table (atomic: readers see the old or the new version)
    5. Clean up staging
    """
    # Step 1: Validate schema compatibility (old = target, new = incoming)
    if not validate_schema_compatibility(target_schema, new_data.schema):
        raise ValueError("Schema incompatible with target")
    # Step 2: Write to staging area
    staging_path = f"{table_path}_staging"
    new_data.write \
        .format("delta") \
        .mode("overwrite") \
        .save(staging_path)
    # Step 3: Validate data quality
    staging_df = spark.read.format("delta").load(staging_path)
    if not validate_data_quality(staging_df):
        raise ValueError("Data quality check failed")
    # Step 4: MERGE into the main table; autoMerge lets updateAll/insertAll add new columns
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
    delta_table = DeltaTable.forPath(spark, table_path)
    delta_table.alias("target").merge(
        staging_df.alias("source"),
        "target.order_id = source.order_id"
    ).whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
    # Step 5: Clean up staging via the Hadoop FileSystem API, which also works
    # for s3a:// and hdfs:// paths (shutil.rmtree only handles local paths)
    path = spark._jvm.org.apache.hadoop.fs.Path(staging_path)
    path.getFileSystem(spark._jsc.hadoopConfiguration()).delete(path, True)
    print("Schema migration completed successfully")
def validate_schema_compatibility(old_schema, new_schema):
    """Return True if data with new_schema can be written to a table with old_schema"""
    old_fields = {f.name: f for f in old_schema.fields}
    new_fields = {f.name: f for f in new_schema.fields}
    # Check for removed required fields
    for name, field in old_fields.items():
        if not field.nullable and name not in new_fields:
            return False
    # Check for type compatibility
    for name, field in new_fields.items():
        if name in old_fields:
            if not is_type_compatible(old_fields[name].dataType, field.dataType):
                return False
    return True
# Illustrative set of lossless widenings (extend for your own types)
SAFE_WIDENINGS = {(IntegerType(), LongType()), (FloatType(), DoubleType())}
def is_type_compatible(old_type, new_type):
    """Same type, or a widening that cannot lose data"""
    return old_type == new_type or (old_type, new_type) in SAFE_WIDENINGS
def validate_data_quality(df):
    """Validate data quality"""
    # Add your quality checks here
    return df.count() > 0
Schema Registry Integration
# ============================================================
# SCHEMA REGISTRY INTEGRATION
# ============================================================
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
# Schema Registry client
schema_registry = SchemaRegistryClient({
    'url': 'http://schema-registry:8081'
})
# Candidate schema (the client methods take a Schema object, not a raw string)
schema_str = '''
{
  "type": "record",
  "name": "Order",
  "namespace": "com.company.orders",
  "fields": [
    {"name": "order_id", "type": "long"},
    {"name": "amount", "type": "double"}
  ]
}
'''
schema = Schema(schema_str, schema_type="AVRO")
# Check compatibility against the latest registered version
compatibility = schema_registry.test_compatibility('orders-value', schema)
print(f"Schema compatible: {compatibility}")
# Register new schema version
schema_id = schema_registry.register_schema('orders-value', schema)
print(f"Registered schema ID: {schema_id}")
# Get schema versions
versions = schema_registry.get_versions('orders-value')
print(f"Schema versions: {versions}")
💡 Production Tip: Always use a Schema Registry for Avro data in Kafka. It ensures schema compatibility, provides version history, and enables schema evolution without breaking consumers.
Common Follow-Up Questions
Q1: How do you handle schema evolution in real-time pipelines?
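# Real-time schema evolution with Kafka + Schema Registry
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
schema_registry = SchemaRegistryClient({'url': 'http://schema-registry:8081'})
# Without schema_str, each message is decoded with its writer schema (looked up
# by the schema ID embedded in the message): old messages use the old schema,
# new messages the new one. Pass the latest schema as schema_str to resolve
# every message to that single reader schema instead.
avro_deserializer = AvroDeserializer(schema_registry_client=schema_registry)
consumer = DeserializingConsumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'real-time-processor',
    'value.deserializer': avro_deserializer,
})
consumer.subscribe(['orders'])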
Q2: What's the impact of schema evolution on downstream systems?
- Data warehouses: May need to ALTER TABLE
- BI tools: May need to refresh metadata
- ML models: May need retraining if features change
- APIs: May need versioning
Q3: How do you test schema evolution?
# Test schema evolution
def test_schema_evolution():
    path = "/tmp/test_schema/"
    # Create initial data (overwrite so the test can be re-run)
    df_v1 = spark.createDataFrame([(1, "John")], ["id", "name"])
    df_v1.write.mode("overwrite").parquet(path)
    # Add column
    df_v2 = spark.createDataFrame([(2, "Jane", 25)], ["id", "name", "age"])
    df_v2.write.mode("append").parquet(path)
    # Read with mergeSchema
    df_merged = spark.read.option("mergeSchema", "true").parquet(path)
    # Verify the schema, and that old rows read the new column as NULL
    assert "age" in df_merged.columns
    assert df_merged.count() == 2
    assert df_merged.filter("id = 1").first()["age"] is None
    print("Schema evolution test passed!")
Q4: How do you roll back schema changes?
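# Delta Lake: read an older version (time travel)...
df_old_schema = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("/delta-lake/orders/")
# ...or roll the table itself (data and schema) back to that version
spark.sql("RESTORE TABLE delta.`/delta-lake/orders/` TO VERSION AS OF 0")
# Iceberg: time travel for reads
df_old_schema = spark.sql("""
SELECT * FROM catalog.db.orders
TIMESTAMP AS OF '2024-01-01 00:00:00'
""")
# Iceberg schema changes are metadata-only; undo one with the inverse DDL,
# e.g. RENAME COLUMN customer_full_name TO customer_name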
⚠️ Critical Consideration: Schema evolution can break downstream systems. Always: (1) communicate changes, (2) provide migration guides, (3) support old schemas for a transition period, and (4) monitor for errors.
Company-Specific Tips
Databricks Interview Tips
- Discuss Delta Lake column mapping
- Explain Time Travel for schema rollback
- Mention Auto Loader for schema inference
- Talk about schema evolution in Structured Streaming
Netflix Interview Tips
- Focus on content metadata schema changes
- Discuss A/B testing schema evolution
- Mention multi-format support (Parquet, Avro)
- Talk about backward compatibility requirements
Uber Interview Tips
- Discuss ride data schema evolution
- Explain geospatial schema changes
- Mention real-time schema validation
- Talk about schema registry best practices
ℹ️ Final Takeaway: Schema evolution is essential for long-lived data systems. Choose the right table format (Iceberg, Delta Lake) for your use case, use Schema Registry for Kafka, and always validate compatibility before deploying changes.