🧊 Apache Iceberg Integration in PySpark
Architecture Diagram
ICEBERG TABLE ARCHITECTURE
- Catalog (Hive/HDFS) → Metadata Layer → Snapshot Registry
- Metadata Layer → Manifest Files (Avro) → Data Files (Parquet/ORC)
- Snapshot Registry → Snapshot Log (append-only)
TIME TRAVEL QUERY FLOW
SQL Query: SELECT * FROM table TIMESTAMP AS OF '2024-01-15'
- Flow: Query Parser → Snapshot Lookup → Manifest List → Data Files
- Query Parser: resolve the snapshot ID or time
- Snapshot Lookup: find the latest snapshot committed at or before the requested time
- Manifest List: filter by partition specs
- Data Files: read Parquet chunks
SNAPSHOT EVOLUTION & CLEANUP
- Time → Snapshot 1 (S1) → Snapshot 2 (S2) → Snapshot 3 (S3) → Snapshot 4 (S4)
- S1: manifests A, B, C
- S2: manifests D, E, F
- S3: manifests G, H, I
- S4: manifests J, K, L
- retention.policy: snapshots older than S2 are candidates for cleanup
- expire_snapshots() removes unreferenced snapshots and data files
Detailed Explanation
Apache Iceberg is an open table format designed for huge analytic datasets. It brings the reliability and simplicity of SQL tables to big data while making it possible for engines like Spark, Trino, Flink, and Hive to safely work with the same tables concurrently. Iceberg was developed at Netflix to address fundamental limitations of Hive tables, particularly the lack of safe schema evolution, partition evolution, and hidden partitioning.
The core innovation of Iceberg lies in its metadata layer, which records the complete state of the table at every commit. Each commit creates a new snapshot, an immutable record of the table's state. The metadata tree has the following layers: the catalog (which stores the pointer to the current metadata file), the metadata file (which contains the schema, partition specs, table-level settings, and the list of snapshots), the manifest list of each snapshot (which enumerates that snapshot's manifest files), and the manifest files (which list data files with their partition values and column statistics). This layered approach enables efficient query planning without scanning the entire table.
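To make the layering concrete, here is a minimal pure-Python sketch of walking the metadata tree during query planning. The class and function names (`DataFile`, `Manifest`, `Snapshot`, `TableMetadata`, `plan_files`) are hypothetical and heavily simplified; they are not Iceberg's actual classes or file formats, and the manifest list is folded into the snapshot for brevity.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class DataFile:
    path: str
    region: str        # partition value recorded in the manifest
    record_count: int


@dataclass(frozen=True)
class Manifest:
    path: str
    files: tuple       # tuple of DataFile

    def regions(self):
        """Partition values covered by this manifest."""
        return {f.region for f in self.files}


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    manifests: tuple   # tuple of Manifest


@dataclass
class TableMetadata:
    snapshots: dict = field(default_factory=dict)  # snapshot_id -> Snapshot
    current_snapshot_id: int = -1


def plan_files(metadata, region):
    """Return the data file paths a scan must read for `region = <region>`."""
    snapshot = metadata.snapshots[metadata.current_snapshot_id]
    planned = []
    for manifest in snapshot.manifests:
        if region not in manifest.regions():
            continue  # the whole manifest is skipped without opening data files
        planned.extend(f.path for f in manifest.files if f.region == region)
    return planned


m1 = Manifest("m1.avro", (DataFile("a.parquet", "North", 100),
                          DataFile("b.parquet", "South", 80)))
m2 = Manifest("m2.avro", (DataFile("c.parquet", "East", 50),))
metadata = TableMetadata({1: Snapshot(1, (m1, m2))}, current_snapshot_id=1)
catalog = {"sales": metadata}  # the catalog maps a table name to its current metadata

print(plan_files(catalog["sales"], "North"))  # ['a.parquet']
```

Planning touches only metadata objects; no data file is opened until the list of matching paths is known.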
Time travel in Iceberg is implemented through snapshot IDs or timestamps. When you query SELECT * FROM table TIMESTAMP AS OF '2024-01-15', the engine resolves the latest snapshot committed at or before that timestamp, then reads only the data files referenced by that snapshot. This does not read a separate copy of old data: it reads the same immutable files that made up the table at that moment, including rows that later snapshots have deleted.
Partition evolution is another distinguishing feature. Unlike Hive, where changing the partitioning requires rewriting the entire table, Iceberg lets you add new partition specs without touching existing data. Old data stays in its original partition spec, and new data uses the new spec. During query planning the engine handles both specs, applying partition pruning to each file according to the spec it was written with.
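As an illustration of pruning across more than one spec, the sketch below keeps a spec ID next to each file and evaluates the query against the spec that file was written with. It is a simplified model under assumed names (`SPECS`, `FILES`, `files_for_day` are hypothetical), and it uses a month-to-day evolution rather than any particular real table.

```python
from datetime import date

# Each partition spec maps a row's date to the partition value stored for a file.
SPECS = {
    0: lambda d: (d.year, d.month),         # original spec: month(transaction_date)
    1: lambda d: (d.year, d.month, d.day),  # evolved spec: day(transaction_date)
}

# (path, spec_id, partition_value) as a manifest might record them
FILES = [
    ("old-2024-01.parquet", 0, (2024, 1)),
    ("old-2024-02.parquet", 0, (2024, 2)),
    ("new-2024-03-01.parquet", 1, (2024, 3, 1)),
    ("new-2024-03-02.parquet", 1, (2024, 3, 2)),
]


def files_for_day(files, day):
    """Keep files whose partition value, under the file's own spec, can contain `day`."""
    return [path for path, spec_id, value in files if SPECS[spec_id](day) == value]


print(files_for_day(FILES, date(2024, 1, 15)))  # ['old-2024-01.parquet']
print(files_for_day(FILES, date(2024, 3, 2)))   # ['new-2024-03-02.parquet']
```

Old files are pruned at month granularity and new files at day granularity, with no rewrite of the old files.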
Hidden partitioning means users never need to reference partition columns in their queries. Iceberg translates predicates on source columns into partition filters using the table's partition spec. This removes a common Hive pitfall, where a query that filters on the source column but not on the derived partition column silently scans every partition.
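A small sketch of that translation, assuming a table partitioned by a `days` transform: the user's predicate on the date column is rewritten into a comparison on the stored partition value. The helper names (`days_transform`, `prune`) and the file names are hypothetical.

```python
from datetime import date

EPOCH = date(1970, 1, 1)


def days_transform(d):
    """Days-style transform: whole days since 1970-01-01."""
    return (d - EPOCH).days


# partition value (day ordinal) -> data files, as tracked in manifests
partitions = {
    days_transform(date(2024, 1, 15)): ["f1.parquet"],
    days_transform(date(2024, 1, 18)): ["f2.parquet"],
    days_transform(date(2024, 1, 19)): ["f3.parquet"],
}


def prune(partitions, lower_bound):
    """Rewrite `transaction_date >= lower_bound` as a filter on the partition value."""
    threshold = days_transform(lower_bound)
    return sorted(
        f for day, files in partitions.items() if day >= threshold for f in files
    )


print(prune(partitions, date(2024, 1, 18)))  # ['f2.parquet', 'f3.parquet']
```

The query author only ever writes the predicate on `transaction_date`; the partition column itself never appears in the query.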
Mathematical Foundations
Definition: Iceberg Snapshot
An Iceberg snapshot is an immutable, point-in-time representation of table state, defined as a tuple $S = (M, D, t)$ where $M$ is the set of manifest files, $D$ is the set of data files referenced by those manifests, and $t$ is the commit timestamp. Snapshots form a directed acyclic graph (DAG) where each node points to its parent snapshot.
Time Travel Resolution
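Given a query timestamp $t_q$ and snapshots $S_i$ with commit timestamps $t_i$, the resolved snapshot is:

$$S^* = \arg\max_{S_i \,:\, t_i \le t_q} t_i$$

This finds the latest snapshot committed no later than the requested time.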
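The resolution rule can be sketched as a binary search over a snapshot log sorted by commit time. The function name `resolve_snapshot` and the log layout are assumptions made for illustration, not Iceberg's API.

```python
from bisect import bisect_right


def resolve_snapshot(log, query_ts_ms):
    """Return the ID of the latest snapshot committed at or before `query_ts_ms`.

    `log` is a list of (commit_ts_ms, snapshot_id) pairs sorted by commit time.
    """
    commit_times = [ts for ts, _ in log]
    idx = bisect_right(commit_times, query_ts_ms)
    if idx == 0:
        raise ValueError("no snapshot committed at or before the requested time")
    return log[idx - 1][1]


log = [(1000, 101), (2000, 102), (3000, 103)]
print(resolve_snapshot(log, 2500))  # 102
print(resolve_snapshot(log, 2000))  # 102 (a commit exactly at the query time counts)
```

A query time earlier than the first commit has no valid answer, so the sketch raises instead of falling back to the oldest snapshot.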
Snapshot Isolation Theorem
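Under Iceberg's snapshot isolation model, any read operation sees a consistent view of the table as of snapshot $S_r$, regardless of concurrent write operations. This is guaranteed because the files referenced by $S_r$ are immutable and a writer publishes its changes only by atomically committing a new snapshot; a reader that has resolved $S_r$ therefore never observes a partially applied write.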
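The following toy model sketches why a pinned reader is unaffected by concurrent commits: snapshots are immutable sets of files, and a commit succeeds only if it swaps the current pointer from the snapshot it was based on. The `Table` class is hypothetical and stands in for the catalog's atomic pointer swap; it is not how any real catalog is implemented.

```python
import threading


class Table:
    """Toy table: immutable snapshots plus an atomically swapped current pointer."""

    def __init__(self):
        self._lock = threading.Lock()
        self.snapshots = {0: frozenset()}  # snapshot_id -> immutable set of data files
        self.current = 0

    def commit(self, base_id, new_files):
        """Compare-and-swap: succeed only if `base_id` is still the current snapshot."""
        with self._lock:
            if self.current != base_id:
                return False  # another writer committed first; the caller must retry
            new_id = base_id + 1
            self.snapshots[new_id] = self.snapshots[base_id] | frozenset(new_files)
            self.current = new_id
            return True


table = Table()
table.commit(0, ["a.parquet"])
pinned = table.current                   # a reader pins snapshot 1
table.commit(1, ["b.parquet"])           # a concurrent writer creates snapshot 2
print(sorted(table.snapshots[pinned]))   # ['a.parquet']
print(table.commit(1, ["c.parquet"]))    # False: the commit was based on a stale snapshot
```

The reader's view never changes after pinning, and the losing writer is told to retry rather than overwriting the winner.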
Snapshot Retention Cost
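Storage cost for $n$ retained snapshots with average data file size $\bar{s}_d$:

$$C_{\text{storage}} = \left|\bigcup_{i=1}^{n} D_i\right| \cdot \bar{s}_d + m \cdot \bar{s}_m$$

where $D_i$ is the set of data files referenced by snapshot $S_i$, $m$ is the number of manifest files, and $\bar{s}_m$ is average manifest size. Data files shared by several snapshots are counted once.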
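A worked example of the retention cost, as a sketch that assumes data files shared between snapshots are stored (and counted) once. The function name and all figures are hypothetical.

```python
def retained_storage_bytes(snapshot_files, avg_data_file_bytes,
                           manifest_count, avg_manifest_bytes):
    """Bytes held by retained snapshots: unique data files plus manifests."""
    unique_files = set().union(*snapshot_files)
    return len(unique_files) * avg_data_file_bytes + manifest_count * avg_manifest_bytes


# Three retained snapshots that share most of their data files
snapshots = [{"a", "b", "c"}, {"a", "b", "c", "d"}, {"c", "d", "e"}]

cost = retained_storage_bytes(
    snapshots,
    avg_data_file_bytes=128 * 1024**2,  # 128 MiB per data file
    manifest_count=6,
    avg_manifest_bytes=8 * 1024,        # 8 KiB per manifest
)
print(cost)  # 671137792
```

Five unique data files dominate the total; the six manifests add only 48 KiB.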
Partition Pruning Effectiveness
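With partition pruning, scanned data reduces from total table size $T$ to:

$$T_{\text{scan}} = T \cdot \prod_{j=1}^{k} \sigma_j$$

where $k$ is the number of partition dimensions and $\sigma_j \in (0, 1]$ is the fraction of partitions along dimension $j$ that match the query predicate, assuming the dimensions are filtered independently and data is spread evenly across partitions.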
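As a worked example of the multiplicative reduction, the sketch below multiplies per-dimension selectivities, assuming the dimensions prune independently and data is spread evenly. The table size and partition counts are made up for illustration.

```python
from fractions import Fraction
from math import prod


def scanned_size(total_size, selectivities):
    """Size left to scan after pruning each partition dimension independently."""
    return total_size * prod(selectivities)


# Hypothetical table: 1460 GB, partitioned by day (365 partitions) and region (4 partitions).
# The query matches 7 of the days and 1 of the regions.
selectivities = [Fraction(7, 365), Fraction(1, 4)]
scanned_gb = scanned_size(1460, selectivities)
print(scanned_gb)  # 7
```

Each additional partition dimension that the predicate constrains multiplies the reduction, which is why a query on one week and one region reads 7 GB of the 1460 GB here.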
Key Insight
Iceberg's hidden partitioning removes the user error of filtering on a column that is not the physical partition column. The engine automatically translates predicates into partition filters using the partition spec's transform functions.
Summary
Iceberg snapshots make time travel resolution a lookup in the snapshot log whose cost is independent of table size, snapshot isolation eliminates read-write conflicts, and partition pruning achieves multiplicative data reduction across partition dimensions. These properties make Iceberg suitable for concurrent analytical workloads at petabyte scale.
Key Concepts Table
| Concept | Description | Performance Impact |
|---|---|---|
| Snapshot | Immutable point-in-time view of the table | Enables time travel with zero copy overhead |
| Manifest File | Lists data files with partition values and column stats | Enables partition pruning and file-level filtering |
| Partition Spec | Defines how data is partitioned (can evolve over time) | Supports partition evolution without rewrite |
| Catalog | Stores the pointer to the table's current metadata file | Atomic commits via an atomic swap of that pointer |
| Snapshot ID | Unique identifier for each table commit | Enables precise time travel by snapshot ID |
| Sequence Number | Monotonically increasing ID per snapshot | Resolves ordering in concurrent writes |
| Equality Delete | Marks rows as deleted by column values | Soft delete without rewriting data files |
| Position Delete | Marks rows as deleted by file position | Enables row-level updates in merge operations |
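To illustrate the two delete styles in the table, here is a minimal sketch of how a reader could apply them at scan time without rewriting data files. The structures are hypothetical and the sketch ignores sequence numbers, which in a real table decide which delete files apply to which data files.

```python
data_files = {
    "f1.parquet": [{"transaction_id": "TXN001", "quantity": 5},
                   {"transaction_id": "TXN002", "quantity": 3}],
    "f2.parquet": [{"transaction_id": "TXN003", "quantity": 10}],
}
position_deletes = {("f1.parquet", 0)}             # (file path, row position)
equality_deletes = [{"transaction_id": "TXN003"}]  # rows matching these column values


def read_live_rows(data_files, position_deletes, equality_deletes):
    """Return rows that survive both kinds of delete; data files stay untouched."""
    live = []
    for path, rows in data_files.items():
        for pos, row in enumerate(rows):
            if (path, pos) in position_deletes:
                continue  # deleted by file position
            if any(all(row.get(k) == v for k, v in d.items()) for d in equality_deletes):
                continue  # deleted by column values
            live.append(row)
    return live


print(read_live_rows(data_files, position_deletes, equality_deletes))
# [{'transaction_id': 'TXN002', 'quantity': 3}]
```

Position deletes need the writer to know where a row lives, while equality deletes can be written without reading the data first, at the cost of more work at read time.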
Code Examples
Setting Up Iceberg with Spark
from pyspark.sql import SparkSession
# Initialize Spark with Iceberg support.
# The SQL extensions are needed for CALL procedures and ALTER TABLE ... PARTITION FIELD.
spark = SparkSession.builder \
    .appName("IcebergIntegration") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "hadoop") \
    .config("spark.sql.catalog.iceberg.warehouse", "hdfs://cluster/iceberg-warehouse") \
    .getOrCreate()
# Create an Iceberg table with partitioning
spark.sql("""
CREATE TABLE iceberg.sales (
transaction_id STRING,
customer_id STRING,
product_id STRING,
quantity INT,
price DECIMAL(10, 2),
transaction_date DATE,
region STRING
)
USING iceberg
PARTITIONED BY (days(transaction_date), region)
TBLPROPERTIES (
'format-version' = '2',
'write.parquet.compression-codec' = 'zstd',
'write.distribution-mode' = 'hash'
)
""")
# Insert sample data
sales_data = [
("TXN001", "CUST001", "PROD001", 5, 29.99, "2024-01-15", "North"),
("TXN002", "CUST002", "PROD002", 3, 49.99, "2024-01-16", "South"),
("TXN003", "CUST001", "PROD003", 10, 19.99, "2024-01-17", "East"),
("TXN004", "CUST003", "PROD001", 2, 29.99, "2024-01-18", "West"),
("TXN005", "CUST004", "PROD004", 7, 39.99, "2024-01-19", "North"),
]
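df = spark.createDataFrame(sales_data, [
    "transaction_id", "customer_id", "product_id",
    "quantity", "price", "transaction_date", "region"
])
# Cast to the table's column types before writing
# (for example, string dates are not implicitly converted to DATE)
df = df.selectExpr(
    "transaction_id", "customer_id", "product_id",
    "CAST(quantity AS INT) AS quantity",
    "CAST(price AS DECIMAL(10, 2)) AS price",
    "CAST(transaction_date AS DATE) AS transaction_date",
    "region",
)
df.writeTo("iceberg.sales").append()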
# Second batch - creates new snapshot
sales_data_2 = [
("TXN006", "CUST005", "PROD001", 4, 29.99, "2024-01-20", "South"),
("TXN007", "CUST006", "PROD002", 1, 49.99, "2024-01-21", "East"),
]
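df2 = spark.createDataFrame(sales_data_2, [
    "transaction_id", "customer_id", "product_id",
    "quantity", "price", "transaction_date", "region"
])
# Cast to the table's column types before writing
df2 = df2.selectExpr(
    "transaction_id", "customer_id", "product_id",
    "CAST(quantity AS INT) AS quantity",
    "CAST(price AS DECIMAL(10, 2)) AS price",
    "CAST(transaction_date AS DATE) AS transaction_date",
    "region",
)
df2.writeTo("iceberg.sales").append()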
Time Travel Queries
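# Query current data
df_current = spark.sql("""
SELECT * FROM iceberg.sales
WHERE transaction_date >= '2024-01-18'
""")
# List committed snapshots, oldest first
snapshots = spark.sql(
    "SELECT * FROM iceberg.sales.history ORDER BY made_current_at"
).collect()
first_snapshot_id = snapshots[0]['snapshot_id']
first_commit_ts = snapshots[0]['made_current_at']
# Query by timestamp. TIMESTAMP AS OF is matched against snapshot commit time,
# not against transaction_date, so it must be at or after the first commit.
# Assumes the Python process and the Spark session use the same time zone.
df_historical = spark.sql(f"""
SELECT * FROM iceberg.sales
TIMESTAMP AS OF '{first_commit_ts}'
""")
# Query by snapshot ID
df_at_snapshot = spark.sql(f"""
SELECT * FROM iceberg.sales VERSION AS OF {first_snapshot_id}
""")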
# Compare data between snapshots
df_diff = spark.sql(f"""
SELECT t1.transaction_id, t1.quantity as old_qty, t2.quantity as new_qty
FROM (
SELECT * FROM iceberg.sales VERSION AS OF {first_snapshot_id}
) t1
INNER JOIN iceberg.sales t2
ON t1.transaction_id = t2.transaction_id
WHERE t1.quantity != t2.quantity
""")
Schema Evolution and Partition Evolution
# Add a new column (schema evolution)
spark.sql("""
ALTER TABLE iceberg.sales ADD COLUMNS (
discount DECIMAL(5, 2) AFTER price,
payment_method STRING AFTER region
)
""")
# Drop a column
spark.sql("ALTER TABLE iceberg.sales DROP COLUMN payment_method")
# Rename a column
spark.sql("ALTER TABLE iceberg.sales RENAME COLUMN discount TO discount_pct")
# Partition evolution - change partitioning without rewriting data
spark.sql("""
ALTER TABLE iceberg.sales DROP PARTITION FIELD days(transaction_date)
""")
spark.sql("""
ALTER TABLE iceberg.sales ADD PARTITION FIELD bucket(16, customer_id)
""")
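# New data will use the new partition spec. The table now has a discount_pct
# column, so each row carries eight values; an explicit schema is needed
# because the type of an all-None column cannot be inferred.
new_data = [
    ("TXN008", "CUST007", "PROD005", 6, 59.99, None, "2024-01-22", "North"),
]
df_new = spark.createDataFrame(
    new_data,
    "transaction_id STRING, customer_id STRING, product_id STRING, "
    "quantity INT, price DOUBLE, discount_pct DOUBLE, "
    "transaction_date STRING, region STRING",
).selectExpr(
    "transaction_id", "customer_id", "product_id", "quantity",
    "CAST(price AS DECIMAL(10, 2)) AS price",
    "CAST(discount_pct AS DECIMAL(5, 2)) AS discount_pct",
    "CAST(transaction_date AS DATE) AS transaction_date",
    "region",
)
df_new.writeTo("iceberg.sales").append()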
Snapshot Management and Cleanup
# View all snapshots
spark.sql("SELECT * FROM iceberg.sales.snapshots").show(truncate=False)
# View snapshot history with operation details
spark.sql("""
SELECT
committed_at,
snapshot_id,
operation,
summary
FROM iceberg.sales.snapshots
ORDER BY committed_at
""").show(truncate=False)
# Expire snapshots older than the default cutoff (5 days), always keeping at least the last 3
spark.sql("""
CALL iceberg.system.expire_snapshots(
table => 'iceberg.sales',
retain_last => 3
)
""")
# Manually compact data files ('rewrite-all' forces every file to be rewritten, not only small ones)
spark.sql("""
CALL iceberg.system.rewrite_data_files(
table => 'iceberg.sales',
options => map(
'min-input-files', '5',
'max-concurrent-file-group-rewrites', '9',
'rewrite-all', 'true'
)
)
""")
# Rewrite manifests for better pruning
spark.sql("""
CALL iceberg.system.rewrite_manifests(
table => 'iceberg.sales'
)
""")
Performance Metrics
| Metric | Hive Tables | Iceberg Tables | Improvement |
|---|---|---|---|
| Time Travel Query Latency | Full table scan required | Snapshot pointer + manifest scan | 10-50x faster |
| Schema Evolution | Requires table rebuild | Metadata-only operation | Instant (milliseconds) |
| Partition Evolution | Requires INSERT OVERWRITE | Metadata-only operation | No data rewrite needed |
| Concurrent Write Conflicts | High (partition-level locking) | Low (snapshot-level conflicts) | 5-10x fewer conflicts |
| Small File Compaction | Manual process (INSERT OVERWRITE) | Built-in rewrite_data_files | Automated compaction |
| Data Skipping (Partition Pruning) | Partition-level only | Partition + column statistics | 2-3x better pruning |
| Metadata Overhead | Minimal | ~1KB per manifest file | Negligible for most workloads |
| Storage Efficiency | No built-in dedup | Equality/position deletes | Better update semantics |
Best Practices
- Sort or Z-order data on frequently filtered columns beyond the partition keys (for example with `rewrite_data_files`) so that the column statistics in manifest files can skip more data files
- Enable `format-version=2` to unlock row-level deletes (position and equality delete files), which merge-on-read MERGE, UPDATE, and DELETE operations rely on
- Schedule regular compaction using `rewrite_data_files` to merge small files and improve read performance
- Set `write.distribution-mode=hash` for write-heavy tables so rows are clustered by partition before writing, which avoids many small files per partition
- Use `rewrite_manifests` periodically to consolidate small manifests and improve pruning efficiency
- Never hardcode snapshot IDs in production queries; use timestamp-based time travel for reproducibility
- Configure `write.target-file-size-bytes` to control the target size of the data files that writes and compaction produce
- Leverage partition evolution when query patterns change, rather than rewriting the entire table
- Monitor snapshot count and set `history.expire.max-snapshot-age-ms` to prevent unbounded growth
- Query the `partitions` metadata table (for example `SELECT * FROM iceberg.sales.partitions`) to inspect partition metadata and validate that partition pruning is working correctly
- Implement a data retention policy with automated `expire_snapshots` calls to balance storage costs and time travel requirements
- Always validate the schema before writes using `spark.sql("DESCRIBE EXTENDED table")` to catch column type mismatches early
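The retention advice above can be reasoned about with a small model of what an expiry pass decides. This is a simplified sketch, not the behavior of the real `expire_snapshots` procedure in every detail: the function name, the tuple layout, and the timestamps are hypothetical, and the snapshot ancestry rules are ignored.

```python
def expire_snapshots(snapshots, older_than, retain_last=1):
    """Decide which snapshots to keep and which data files become removable.

    `snapshots` is a list of (commit_ts, snapshot_id, set_of_files), oldest first.
    A snapshot is expired only if it is older than `older_than` AND it is not
    among the `retain_last` most recent snapshots.
    """
    keep_from = max(len(snapshots) - retain_last, 0)
    kept, expired = [], []
    for i, snap in enumerate(snapshots):
        if i >= keep_from or snap[0] >= older_than:
            kept.append(snap)
        else:
            expired.append(snap)
    live_files = set().union(*(s[2] for s in kept))
    # Only files that no retained snapshot references may be deleted
    removable = set().union(*(s[2] for s in expired)) - live_files
    return [s[1] for s in kept], sorted(removable)


snapshots = [
    (100, "S1", {"a", "b"}),
    (200, "S2", {"b", "c"}),
    (300, "S3", {"c", "d"}),
    (400, "S4", {"d", "e"}),
]
kept, removable = expire_snapshots(snapshots, older_than=350, retain_last=2)
print(kept)       # ['S3', 'S4']
print(removable)  # ['a', 'b']
```

File `c` survives even though expired snapshot S2 referenced it, because retained snapshot S3 still needs it. Expiring a snapshot also removes the ability to time travel to it, which is the trade-off a retention policy has to balance.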
See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)