16. Schema Evolution in PySpark
Definition: Schema Evolution
Schema evolution is the ability to modify a table's schema over time while maintaining backward and forward compatibility with existing data. Depending on the format and table layer, it can cover adding, removing, renaming, and modifying columns, ideally without rewriting historical data.
Definition: Schema Compatibility
Schema compatibility defines whether data written with one schema can be read with another. There are three types: backward compatible (new readers can read old data), forward compatible (old readers can read new data), and fully compatible (both directions).
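Schema Evolution Cost
$$C_{\text{total}} = \sum_{i=1}^{N} \left( C_{\text{read},i} + C_{\text{transform},i} + C_{\text{write},i} \right)$$
Here,
- $C_{\text{total}}$ = Total cost of the schema evolution operation
- $N$ = Number of Parquet files affected
- $C_{\text{read},i}$ = Cost of reading existing file $i$ with the old schema
- $C_{\text{transform},i}$ = Cost of applying the schema transformation to file $i$
- $C_{\text{write},i}$ = Cost of rewriting file $i$ with the new schema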
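The cost model sums the read, transform, and rewrite cost over the N affected files. The sketch below is a rough worked example that plugs in made-up per-file timings in seconds. `FileCost` and `total_evolution_cost` are hypothetical helpers for illustration, not Spark APIs. Real costs depend on file size, cluster, and format.

```python
from dataclasses import dataclass


@dataclass
class FileCost:
    """Hypothetical per-file costs (in seconds) for one affected Parquet file."""
    read: float       # C_read,i: read the file with the old schema
    transform: float  # C_transform,i: apply the schema transformation
    write: float      # C_write,i: rewrite the file with the new schema


def total_evolution_cost(files):
    """Sum read + transform + write cost over the N affected files."""
    return sum(f.read + f.transform + f.write for f in files)


# Illustrative numbers only: 200 files that each need a full rewrite.
rewrite = [FileCost(read=0.5, transform=0.25, write=1.25)] * 200
print(total_evolution_cost(rewrite))  # 400.0

# An additive change that only writes new files touches no existing files (N = 0).
print(total_evolution_cost([]))       # 0
```

The second call shows why additive changes are cheap. They avoid the rewrite term entirely, whereas renames or type changes that force a rewrite pay it for every affected file.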
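Delta Lake supports schema evolution through the mergeSchema write option. When it is enabled, columns that exist in the incoming data but not in the table are added to the table schema, and existing rows read them as null. Column renames and type widening are also supported, but they are separate features rather than part of the additive mergeSchema behavior. Renames require column mapping to be enabled on the table. Type widening requires a recent Delta Lake version with the type widening table feature enabled.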
Use overwriteSchema (together with mode("overwrite")) instead of mergeSchema when you want to completely replace the schema. mergeSchema is additive only: it cannot remove or rename columns without rewriting data.
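Here is a toy model in plain Python that makes the additive-versus-replace distinction concrete. It is not Delta Lake's implementation. Schemas are simplified to lists of (name, type) pairs, and `merge_schema` / `overwrite_schema` are hypothetical names. The toy rejects every type difference, which is stricter than Delta Lake, where mergeSchema accepts a few safe upcasts.

```python
def merge_schema(table_schema, incoming_schema):
    """Toy additive merge: keep every existing column, append unseen ones.

    Schemas are lists of (column_name, type_name) pairs. Columns absent from the
    incoming data stay in the table; the incoming rows simply get null for them.
    """
    merged = list(table_schema)
    existing = dict(table_schema)
    for name, dtype in incoming_schema:
        if name not in existing:
            merged.append((name, dtype))  # new column, added at the end
        elif existing[name] != dtype:
            # Stricter than Delta Lake, which allows a few safe upcasts.
            raise ValueError(f"type conflict on {name!r}: {existing[name]} vs {dtype}")
    return merged


def overwrite_schema(table_schema, incoming_schema):
    """Toy schema replacement: the incoming schema wins outright."""
    return list(incoming_schema)


v1 = [("id", "bigint"), ("name", "string"), ("email", "string")]
v2 = [("id", "bigint"), ("name", "string"), ("salary", "double")]

print(merge_schema(v1, v2))
# [('id', 'bigint'), ('name', 'string'), ('email', 'string'), ('salary', 'double')]
print(overwrite_schema(v1, v2))
# [('id', 'bigint'), ('name', 'string'), ('salary', 'double')]
```

The merge can never drop `email`, which is why removing or renaming a column needs an overwrite (and a data rewrite) instead.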
Theorem: Parquet Schema Resolution
Parquet stores the schema as metadata in each file's footer. When Spark reads Parquet files with a different schema, it resolves columns by name: missing columns return null, extra columns are ignored, and type mismatches cause errors unless a compatible conversion is supported.
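The resolution rules above can be modeled in plain Python to show what a reader sees for one stored row. This is a simplified sketch, not Spark's Parquet reader. Schemas are dicts of column name to type name. `SAFE_CONVERSIONS` is an assumed set of accepted widenings, since the conversions a real reader supports depend on the Spark version.

```python
# Assumed set of (stored type, requested type) conversions this toy reader allows.
SAFE_CONVERSIONS = {("int", "bigint"), ("float", "double")}


def resolve_row(file_schema, row, read_schema):
    """Project one stored row onto the reader's schema, matching columns by name.

    file_schema and read_schema map column name -> type name;
    row maps column name -> stored value.
    """
    result = {}
    for name, wanted in read_schema.items():
        if name not in file_schema:
            result[name] = None  # column missing from the file -> null
            continue
        stored = file_schema[name]
        if stored != wanted and (stored, wanted) not in SAFE_CONVERSIONS:
            raise TypeError(f"cannot read {name!r} stored as {stored} as {wanted}")
        result[name] = row[name]
    # Columns present in the file but absent from read_schema are never copied,
    # i.e. extra columns are ignored.
    return result


file_schema = {"id": "int", "name": "string", "email": "string"}
stored_row = {"id": 1, "name": "Alice", "email": "alice@example.com"}
reader_schema = {"id": "bigint", "name": "string", "salary": "double"}

print(resolve_row(file_schema, stored_row, reader_schema))
# {'id': 1, 'name': 'Alice', 'salary': None}
```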
- Schema evolution modifies table schema over time without rewriting historical data
- Compatibility: backward (new reads old), forward (old reads new), full (both)
- Delta Lake: mergeSchema (additive), overwriteSchema (replace)
- Parquet: column-name-based resolution; each file stores its own schema
Schema Evolution Architecture
Version 1 (Initial Schema)
- Schema V1: id INT, name STR, email STR, age INT
Version 2 (Schema Change: added column)
- Schema V2: id INT, name STR, email STR, age INT, salary DOUBLE
- Backward compatible: V2 readers can read V1 data (salary is null for V1 rows)
- Forward compatible: V1 readers can read V2 data (the extra salary column is ignored)
Version 3 (Breaking Change: renamed column)
- Schema V3: id INT, name STR, email_id STR, age INT, salary DOUBLE
- Breaking change: V1 readers find no email column in V3 data, and V3 readers find no email_id column in V1 data
- Requires a migration or a compatibility layer
Compatibility Matrix
| Change Type | Backward | Forward | Impact |
|---|---|---|---|
| Add column (nullable or with default) | Yes | Yes | Low |
| Remove column | Yes | No (unless the column was nullable or had a default) | Medium |
| Rename column | No | No | High |
| Change type | Depends | Depends | High |
| Add nested field | Yes | Yes | Low |
Schema Compatibility Modes
Backward Compatibility
- Schema V1 (old): id, name, email, age
- Schema V2 (new): id, name, email, age, salary
- V2 readers can read V1 data (the missing salary column is read as null or its default)
Forward Compatibility
- Old schema: id, name, email, age, salary
- New schema: id, name, email (age and salary removed)
- Old readers can read new data if age and salary are nullable or have defaults (they are read as null or the default value)
- Without such defaults, old readers may fail on the missing columns
Full Compatibility
Both backward AND forward compatible:
- Add nullable columns (with defaults)
- Remove only columns that have defaults
- Never rename columns
- Never change column types (widening such as INT to BIGINT is backward compatible only)
Ideal for: multi-version readers, data lakes
No Compatibility (NONE)
No compatibility guarantees:
- Any schema change allowed
- Readers must match the exact schema
- Use only when all readers are updated simultaneously
Risk: readers may fail if the schema doesn't match
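The compatibility modes can be checked mechanically. The sketch below uses a simplified rule set in the spirit of Avro's schema resolution:

- A reader can fill a missing column only if that column is nullable or has a default.
- A reader ignores columns it does not know.
- A reader accepts only the widenings listed in `WIDENING`.

The helper names and the widening list are assumptions for illustration, not a schema-registry API.

```python
# Assumed promotions a reader accepts: (type written, type the reader expects).
WIDENING = {("int", "bigint"), ("float", "double")}


def can_read(reader, writer):
    """True if data written with `writer` can be read with `reader`.

    Schemas map column name -> (type_name, optional), where optional means the
    column is nullable or has a default.
    """
    for name, (rtype, optional) in reader.items():
        if name not in writer:
            if not optional:
                return False  # required column missing from the data
        else:
            wtype = writer[name][0]
            if wtype != rtype and (wtype, rtype) not in WIDENING:
                return False  # incompatible type change
    return True  # columns only the writer has are ignored


def compatibility(old, new):
    backward = can_read(reader=new, writer=old)  # new readers, old data
    forward = can_read(reader=old, writer=new)   # old readers, new data
    if backward and forward:
        return "FULL"
    if backward:
        return "BACKWARD"
    if forward:
        return "FORWARD"
    return "NONE"


v1 = {"id": ("int", False), "name": ("string", False), "email": ("string", False)}
v2 = {**v1, "salary": ("double", True)}  # add a nullable column
v3 = {"id": ("int", False), "name": ("string", False),
      "email_id": ("string", False), "salary": ("double", True)}  # rename email
v4 = {**v1, "id": ("bigint", False)}  # widen int -> bigint

print(compatibility(v1, v2))  # FULL
print(compatibility(v2, v3))  # NONE
print(compatibility(v1, v4))  # BACKWARD
```

Adding a nullable column comes out FULL and the email rename comes out NONE. Widening id from int to bigint is BACKWARD only, because old readers expecting int cannot safely read bigint values.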
Schema Evolution Strategies
Strategy 1: Additive Evolution
Only add new columns (never remove or rename).
- V1: [id, name, email]
- V2: [id, name, email, salary], added salary
- V3: [id, name, email, salary, department], added department
Pros:
- Simple to implement
- Backward compatible by default
- No data migration needed
Cons:
- Schema grows indefinitely
- May contain unused columns
- Naming conflicts possible
Strategy 2: Versioned Schema
Maintain multiple schema versions with mappings between them.
Schema registry:
| Version | Schema |
|---|---|
| V1 | `{id: int, name: string, email: string}` |
| V2 | `{id: int, name: string, email: string, salary: double}` |
| V3 | `{id: int, name: string, email: string, salary: double, dept: string}` |
Mapping:
- V1 → V2: add salary with default 0.0
- V2 → V3: add dept with default "Unknown"
- V3 → V2: drop dept column
- V2 → V1: drop salary column
Strategy 3: Schema-on-Read
Apply the schema at read time, not write time.
Raw data (schema-free):
- `{"id": 1, "name": "Alice", "extra": "data"}`
- `{"id": 2, "name": "Bob", "salary": 50000}`
Readers:
- Reader 1 (Schema V1): [id, name]
- Reader 2 (Schema V2): [id, name, salary]
- Reader 3 (Schema V3): [id, name, email, age]
Best for: data lakes, schema-flexible systems
Detailed Explanation
Schema evolution is a critical concept in data management that enables systems to handle changes in data structure over time without breaking existing applications or losing data. In PySpark, schema evolution is particularly important for data lakes where data is stored for long periods and accessed by multiple consumers with different schema requirements.
The fundamental challenge of schema evolution is maintaining compatibility between different versions of data. Backward compatibility ensures that newer readers can read older data, while forward compatibility ensures that older readers can read newer data. Understanding these compatibility modes is essential for designing robust data systems.
Additive evolution is the simplest and most common strategy. By only adding new columns and never removing or renaming existing ones, systems maintain backward compatibility automatically. New columns should be nullable or have default values so that newer readers can fill them in when reading older data that lacks them; older readers simply ignore columns they do not know. This strategy works well for many use cases but can lead to schema bloat over time.
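A write-time gate for the additive policy can be as small as the hypothetical `check_additive` helper below. It flags three kinds of violation:

- Removed or renamed columns.
- Type changes.
- New columns that are not nullable.

It assumes schemas are lists of (name, type, nullable) tuples. In PySpark, such tuples could be built from `df.schema.fields` using each field's `name`, `dataType.simpleString()`, and `nullable`.

```python
def check_additive(old, new):
    """Return violations of the additive-only policy (an empty list means OK).

    Schemas are lists of (name, type_name, nullable) tuples.
    """
    problems = []
    new_types = {name: dtype for name, dtype, _ in new}
    old_names = {name for name, _, _ in old}
    for name, dtype, _ in old:
        if name not in new_types:
            problems.append(f"column {name!r} was removed or renamed")
        elif new_types[name] != dtype:
            problems.append(f"column {name!r} changed type {dtype} -> {new_types[name]}")
    for name, _, nullable in new:
        if name not in old_names and not nullable:
            problems.append(f"new column {name!r} must be nullable")
    return problems


v1 = [("id", "bigint", False), ("name", "string", True), ("email", "string", True)]
v2 = v1 + [("salary", "double", True)]
renamed = [("id", "bigint", False), ("name", "string", True), ("email_id", "string", False)]

print(check_additive(v1, v2))  # []
print(check_additive(v1, renamed))
# ["column 'email' was removed or renamed", "new column 'email_id' must be nullable"]
```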
Versioned schema management provides more flexibility by maintaining explicit mappings between schema versions. This approach allows for more complex changes like column renames and type changes, but requires a schema registry or similar mechanism to manage the mappings. It's ideal for systems that need to support multiple schema versions simultaneously.
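A versioned schema with explicit mappings can be sketched as a registry plus a step-by-step converter. The registry contents and defaults below mirror the V1/V2/V3 example in Strategy 2 (salary defaults to 0.0, dept to "Unknown"). `REGISTRY`, `DEFAULTS`, and `convert` are hypothetical names; a real system would keep this information in a schema registry. Renames or type changes would need additional per-step mapping functions, which this sketch leaves out.

```python
# Hypothetical registry mirroring the Strategy 2 example: version -> ordered columns.
REGISTRY = {
    1: ["id", "name", "email"],
    2: ["id", "name", "email", "salary"],
    3: ["id", "name", "email", "salary", "dept"],
}

# Defaults used when upgrading a record to a version that added a column.
DEFAULTS = {"salary": 0.0, "dept": "Unknown"}


def convert(record, from_version, to_version):
    """Walk the record one version at a time, adding or dropping columns."""
    step = 1 if to_version > from_version else -1
    out = dict(record)
    for v in range(from_version, to_version, step):
        target = REGISTRY[v + step]
        for col in target:
            if col not in out:
                out[col] = DEFAULTS[col]  # upgrade: add column with its default
        out = {c: out[c] for c in target}  # downgrade: drop columns not in target
    return out


v1_record = {"id": 1, "name": "Alice", "email": "alice@example.com"}
v3_record = convert(v1_record, 1, 3)
print(v3_record)
# {'id': 1, 'name': 'Alice', 'email': 'alice@example.com', 'salary': 0.0, 'dept': 'Unknown'}
print(convert(v3_record, 3, 1) == v1_record)  # True
```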
Schema-on-read applies the schema at read time rather than write time. This approach stores data in a loosely structured format (like JSON or CSV) and lets each reader apply its own schema when reading. It provides maximum flexibility but can hurt performance and data quality, since parsing and validation happen at read time.
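The following plain-Python sketch shows schema-on-read in miniature. The raw records are stored as JSON lines with no enforced schema, and each reader projects them onto its own column list at read time. The sketch uses the records from the Strategy 3 example, and `read_with_schema` is a hypothetical helper. In PySpark, the analogous step is passing a schema to `spark.read.schema(...)` before `.json(path)`. The sketch does no type validation, which is the data-quality trade-off described above.

```python
import json

# Raw records stored as JSON lines, with no schema enforced at write time.
RAW = """\
{"id": 1, "name": "Alice", "extra": "data"}
{"id": 2, "name": "Bob", "salary": 50000}
"""


def read_with_schema(raw_text, columns):
    """Apply a reader-specific schema at read time.

    Each record is projected onto `columns`; absent fields become None and
    fields the reader did not ask for are dropped. No type checks are done.
    """
    rows = []
    for line in raw_text.splitlines():
        if line.strip():
            record = json.loads(line)
            rows.append({c: record.get(c) for c in columns})
    return rows


reader_1 = read_with_schema(RAW, ["id", "name"])
reader_2 = read_with_schema(RAW, ["id", "name", "salary"])
reader_3 = read_with_schema(RAW, ["id", "name", "email", "age"])

print(reader_2)
# [{'id': 1, 'name': 'Alice', 'salary': None}, {'id': 2, 'name': 'Bob', 'salary': 50000}]
```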
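The choice of format significantly impacts schema evolution capabilities. Parquet and Avro both support schema evolution but have different characteristics. Parquet is columnar, optimized for analytical queries, and stores the schema in each file's footer. Avro is row-based and stores the writer's schema in the file header. It also defines explicit resolution rules between a writer schema and a reader schema (including field defaults), which makes it well suited to record-level schema evolution. Because both embed the schema in the data file, the data is self-describing.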
Type evolution is a complex aspect of schema evolution. Widening changes (like int to long) are generally safe and backward compatible. Narrowing changes (like long to int) can cause data loss and are not backward compatible. Type changes that change semantics (like string to date) require careful handling and often need explicit mapping logic.
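The three categories of type change can be expressed as a small classifier. The `WIDENS_TO` table is an assumption for illustration that loosely follows common promotions such as int to bigint and float to double. Which promotions a given engine or format accepts varies, so treat this as a sketch rather than a reference list.

```python
# Assumed widening table: type -> types it can be widened to without losing values.
WIDENS_TO = {
    "tinyint": {"smallint", "int", "bigint"},
    "smallint": {"int", "bigint"},
    "int": {"bigint", "double"},
    "float": {"double"},
}


def classify_type_change(old, new):
    """Label a column type change as none, widening, narrowing, or semantic."""
    if old == new:
        return "none"
    if new in WIDENS_TO.get(old, set()):
        return "widening"   # every old value fits in the new type
    if old in WIDENS_TO.get(new, set()):
        return "narrowing"  # reverse of a widening: values may overflow or lose precision
    return "semantic"       # e.g. string -> date: needs explicit mapping logic


for old, new in [("int", "bigint"), ("bigint", "int"), ("string", "date")]:
    print(old, "->", new, ":", classify_type_change(old, new))
# int -> bigint : widening
# bigint -> int : narrowing
# string -> date : semantic
```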
Best practices for schema evolution include: using additive evolution when possible, maintaining a schema registry for complex changes, testing compatibility thoroughly, documenting schema changes, and implementing schema validation at both write and read times. It's also important to monitor schema usage and clean up unused columns periodically.
Advanced techniques include schema inference (automatically detecting schema from data), schema classification (categorizing schema changes by impact), and schema evolution automation (automatically applying compatible changes). These techniques help manage schema evolution at scale.
Key Concepts Table
| Concept | Description | Use Case |
|---|---|---|
| Backward Compatibility | New readers can read old data | Reading historical data after an upgrade |
| Forward Compatibility | Old readers can read new data | Legacy consumers that upgrade later |
| Full Compatibility | Both backward and forward | Data lakes, shared datasets |
| Schema Registry | Centralized schema management | Complex schema evolution |
| Schema-on-Read | Apply schema at read time | Flexible data storage |
| Type Evolution | Changing column data types | Data model changes |
Code Examples
Basic Schema Evolution with Parquet
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("SchemaEvolution") \
    .config("spark.sql.parquet.mergeSchema", "true") \
    .getOrCreate()
# Write initial data (Schema V1)
df_v1 = spark.createDataFrame([
    (1, "Alice", "alice@example.com", 25),
    (2, "Bob", "bob@example.com", 30)
], ["id", "name", "email", "age"])
df_v1.write.mode("overwrite").parquet("/path/to/data")
# Read data (Schema V1)
df_read_v1 = spark.read.parquet("/path/to/data")
df_read_v1.show()
# Write new data with an additional column (Schema V2)
df_v2 = spark.createDataFrame([
    (3, "Charlie", "charlie@example.com", 35, 75000.0),
    (4, "Diana", "diana@example.com", 28, 85000.0)
], ["id", "name", "email", "age", "salary"])
# Append the V2 files. Parquet writes do not validate against existing files;
# mergeSchema is a read-side option for Parquet, so it is not set here.
df_v2.write.mode("append").parquet("/path/to/data")
# Read all data, merging the schemas stored in every file footer.
# (The session config already enables merging; the option makes it explicit.)
# Rows that came from V1 files show null for salary.
df_read_all = spark.read.option("mergeSchema", "true").parquet("/path/to/data")
df_read_all.show()
# Verify schema evolution
print("Schema after evolution:")
df_read_all.printSchema()
Schema Evolution with Delta Lake
from pyspark.sql import SparkSession
# configure_spark_with_delta_pip comes from the delta-spark pip package
# (pip install delta-spark); it configures the session to load the matching Delta Lake package.
from delta import configure_spark_with_delta_pip
builder = SparkSession.builder \
    .appName("DeltaSchemaEvolution") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
# Write initial data (Schema V1)
df_v1 = spark.createDataFrame([
    (1, "Alice", "alice@example.com", 25),
    (2, "Bob", "bob@example.com", 30)
], ["id", "name", "email", "age"])
df_v1.write.format("delta").mode("overwrite").save("/path/to/delta/data")
# Read data
spark.read.format("delta").load("/path/to/delta/data").show()
# Write new data with an additional column (Schema V2)
df_v2 = spark.createDataFrame([
    (3, "Charlie", "charlie@example.com", 35, 75000.0),
    (4, "Diana", "diana@example.com", 28, 85000.0)
], ["id", "name", "email", "age", "salary"])
# Append with schema evolution. Without mergeSchema, Delta's schema enforcement
# rejects this write because salary is not part of the table schema.
df_v2.write.format("delta").mode("append").option("mergeSchema", "true").save("/path/to/delta/data")
# Read all data (V1 rows have null salary)
spark.read.format("delta").load("/path/to/delta/data").show()
# Verify schema evolution
print("Delta schema after evolution:")
spark.read.format("delta").load("/path/to/delta/data").printSchema()
Schema Evolution with Avro
from pyspark.sql import SparkSession
# Requires the external spark-avro module, e.g.
# spark-submit --packages org.apache.spark:spark-avro_<scala-version>:<spark-version>
spark = SparkSession.builder \
    .appName("AvroSchemaEvolution") \
    .config("spark.sql.avro.compression.codec", "snappy") \
    .getOrCreate()
# Define Schema V1. Python ints are inferred as LongType, so the Avro fields use "long".
schema_v1 = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "age", "type": "long"}
  ]
}
"""
# Write initial data with Schema V1
df_v1 = spark.createDataFrame([
    (1, "Alice", "alice@example.com", 25),
    (2, "Bob", "bob@example.com", 30)
], ["id", "name", "email", "age"])
df_v1.write.mode("overwrite").format("avro").option("avroSchema", schema_v1).save("/path/to/avro/data")
# Define Schema V2 (added nullable salary column with a null default)
schema_v2 = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "age", "type": "long"},
    {"name": "salary", "type": ["null", "double"], "default": null}
  ]
}
"""
# Write new data with Schema V2
df_v2 = spark.createDataFrame([
    (3, "Charlie", "charlie@example.com", 35, 75000.0),
    (4, "Diana", "diana@example.com", 28, 85000.0)
], ["id", "name", "email", "age", "salary"])
df_v2.write.mode("append").format("avro").option("avroSchema", schema_v2).save("/path/to/avro/data")
# Read all data with Schema V2 as the reader schema; V1 files get the default (null) salary
df_all = spark.read.format("avro").option("avroSchema", schema_v2).load("/path/to/avro/data")
df_all.show()
# Verify schema evolution
print("Avro schema after evolution:")
df_all.printSchema()
Schema Evolution with Type Changes
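from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType
spark = SparkSession.builder \
    .appName("TypeEvolution") \
    .getOrCreate()
# Write initial data with IntegerType. Python ints are inferred as LongType,
# so the schema is declared explicitly.
schema_v1 = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("age", IntegerType())
])
df_v1 = spark.createDataFrame([
    (1, "Alice", 25),
    (2, "Bob", 30)
], schema_v1)
df_v1.write.mode("overwrite").parquet("/path/to/type_evolution/v1")
# Read and verify initial schema (age: integer)
print("Initial schema:")
spark.read.parquet("/path/to/type_evolution/v1").printSchema()
# Widen age from IntegerType to LongType. Parquet schema merging may refuse to
# merge IntegerType and LongType for the same column, and mixed physical types
# are handled differently across Spark versions, so rewrite the existing data
# with the wider type instead of appending mixed-type files to one directory.
df_widened = spark.read.parquet("/path/to/type_evolution/v1") \
    .withColumn("age", col("age").cast(LongType()))
df_widened.write.mode("overwrite").parquet("/path/to/type_evolution/v2")
# Append new data using the widened schema (id: int, name: string, age: bigint)
df_v2 = spark.createDataFrame([
    (3, "Charlie", 35),
    (4, "Diana", 28)
], df_widened.schema)
df_v2.write.mode("append").parquet("/path/to/type_evolution/v2")
# Read all data
print("Schema after type evolution:")
spark.read.parquet("/path/to/type_evolution/v2").printSchema()
# Show data
spark.read.parquet("/path/to/type_evolution/v2").show()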
Performance Metrics
| Metric | Impact | Mitigation |
|---|---|---|
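| Schema Merge Time | Increases with the number of files and schema complexity (Parquet schema merging reads every file footer) | Enable mergeSchema only when needed, compact small files, use a schema registry |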
| File Size | May increase with schema evolution | Regular compaction, optimize file sizes |
| Read Performance | May degrade with complex schemas | Use columnar formats, projection pushdown |
| Write Performance | Minimal impact for additive changes | Batch writes, use appropriate formats |
| Storage Overhead | Increases with schema versions | Clean up old versions, use compression |
Best Practices
- Use additive evolution when possible - Only add columns, never remove or rename
- Maintain a schema registry - Track schema versions and compatibility
- Use appropriate formats - Parquet for analytics, Avro for schema evolution
- Test compatibility thoroughly - Validate backward and forward compatibility
- Document schema changes - Maintain clear documentation of evolution
- Implement schema validation - Validate data against schema at write time
- Monitor schema usage - Track which columns are actually used
- Clean up unused columns - Periodically remove columns that are no longer needed
- Use default values - Provide defaults for new columns to maintain compatibility
- Plan for type evolution - Consider future type changes when designing schemas
Related Topics
- 15-data-quality.mdx: Schema validation and data quality
- 14-merge-upsert.mdx: Schema evolution during merge operations
- 20-monitoring-metrics.mdx: Monitoring schema changes
- 17-cluster-management.mdx: Cluster configuration for schema operations
See Also
- Kafka Streams (kafka/03): Schema evolution in Kafka message formats
- Data Engineering Streaming (data-engineering/022): Schema management in streaming data lakes