Apache Hudi Operations in PySpark
Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────┐
│                         HUDI TABLE ARCHITECTURE                         │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌──────────────┐     ┌──────────────┐     ┌──────────────┐             │
│  │     Hudi     │────▶│   Metadata   │────▶│   Timeline   │             │
│  │    Client    │     │    Table     │     │   Service    │             │
│  └──────────────┘     └──────┬───────┘     └──────┬───────┘             │
│                              │                    │                     │
│                              ▼                    ▼                     │
│                     ┌──────────────────┐  ┌──────────────────┐          │
│                     │ .commit Files    │  │ .inflight Files  │          │
│                     │ (JSON metadata)  │  │ (Pending ops)    │          │
│                     └────────┬─────────┘  └───────┬──────────┘          │
│                              │                    │                     │
│                              ▼                    ▼                     │
│                     ┌──────────────────┐  ┌──────────────────┐          │
│                     │ .deltacommit     │  │ .compaction      │          │
│                     │ (MOR logs)       │  │ (Merge logs)     │          │
│                     └────────┬─────────┘  └───────┬──────────┘          │
│                              │                    │                     │
│                              ▼                    ▼                     │
│                     ┌──────────────────┐  ┌──────────────────┐          │
│                     │ Parquet Files    │  │ Log Files        │          │
│                     │ (Base data)      │  │ (.log format)    │          │
│                     └──────────────────┘  └──────────────────┘          │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│                       COW vs MOR TABLE COMPARISON                       │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  COW (Copy on Write)                   MOR (Merge on Read)              │
│  ┌───────────────────────┐             ┌───────────────────────┐        │
│  │    Write Operation    │             │    Write Operation    │        │
│  │                       │             │                       │        │
│  │   ┌───┐ ┌───┐ ┌───┐   │             │   ┌───┐ ┌───┐ ┌───┐   │        │
│  │   │ A │ │ B │ │ C │   │             │   │ A │ │ B │ │ C │   │        │
│  │   └───┘ └─┬─┘ └───┘   │             │   └───┘ └─┬─┘ └───┘   │        │
│  │           ▼           │             │           ▼           │        │
│  │ ┌───────────────────┐ │             │ ┌───────────────────┐ │        │
│  │ │ Rewrite affected  │ │             │ │ Append to log     │ │        │
│  │ │ Parquet files     │ │             │ │ (.log file)       │ │        │
│  │ └─────────┬─────────┘ │             │ └─────────┬─────────┘ │        │
│  │           ▼           │             │           ▼           │        │
│  │ ┌───────────────────┐ │             │ ┌───────────────────┐ │        │
│  │ │ New Parquet       │ │             │ │ Base + Log        │ │        │
│  │ │ version (clean)   │ │             │ │ (merged at read)  │ │        │
│  │ └───────────────────┘ │             │ └───────────────────┘ │        │
│  └───────────────────────┘             └───────────────────────┘        │
│                                                                         │
│  Write: Slow (full rewrite)            Write: Fast (append only)        │
│  Read: Fast (no merge)                 Read: Slow (merge at read)       │
│  Storage: Higher (full copy)           Storage: Lower (delta log)       │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│                     INCREMENTAL PROCESSING PIPELINE                     │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│     Batch 1               Batch 2               Batch 3                 │
│      (t=0)                 (t=1)                 (t=2)                  │
│     ┌─────┐               ┌─────┐               ┌─────┐                 │
│     │ Δt0 │──────────────▶│ Δt1 │──────────────▶│ Δt2 │                 │
│     └──┬──┘               └──┬──┘               └──┬──┘                 │
│        │                     │                     │                    │
│        ▼                     ▼                     ▼                    │
│   ┌────────────────────────────────────────────────────────────────┐    │
│   │                         HUDI TIMELINE                          │    │
│   │  ┌──────┬──────┬──────┬──────┬──────┬──────┬──────┐            │    │
│   │  │Commit│Commit│Commit│Commit│Commit│Commit│Commit│            │    │
│   │  │  0   │  1   │  2   │  3   │  4   │  5   │  6   │            │    │
│   │  └──────┴──────┴──────┴──────┴──────┴──────┴──────┘            │    │
│   │                                                                │    │
│   │  Begin at commit 2: read only changes committed after it       │    │
│   │  (incremental query with begin and end instant times)          │    │
│   └────────────────────────────────────────────────────────────────┘    │
│                                                                         │
│   ┌────────────────────────────────────────────────────────────────┐    │
│   │                     INCREMENTAL QUERY FLOW                     │    │
│   │                                                                │    │
│   │  Input: (beginInstantTime, endInstantTime)                     │    │
│   │                                                                │    │
│   │  Timeline Filter ─────▶ File Filter ─────▶ Record Filter       │    │
│   │         │                    │                   │             │    │
│   │         ▼                    ▼                   ▼             │    │
│   │  Select commits       Select affected     Apply remaining      │    │
│   │     in range            file slices         predicates         │    │
│   └────────────────────────────────────────────────────────────────┘    │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
Detailed Explanation
Apache Hudi (Hadoop Upserts Deletes and Incremental processing) is an open-source data lake platform that provides record-level upserts, incremental processing, and table services. Unlike traditional data lake formats that only support append operations, Hudi enables efficient update and delete operations at the record level, making it ideal for use cases requiring near-real-time data ingestion with strict latency requirements.
The fundamental innovation in Hudi is its file layout management system. Every Hudi table consists of base files (Parquet by default) and, for MOR tables, log files. An index maps each record key to the file group that holds it. The writer can therefore update only the affected files rather than rewriting entire partitions. Depending on the configured index type, this lookup uses one of three mechanisms: Bloom filters and key ranges stored in base files, a hash-based bucket assignment, or a record-level index kept in the metadata table.
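A minimal in-memory sketch of index "tagging" makes this concrete. It assumes a plain Python dict as a stand-in for the index. Hudi's real index types look keys up in storage, not in memory. The function name `tag_records` and file-group IDs such as `fg-1` are hypothetical. Incoming keys that are already indexed become updates, grouped by file group. Unknown keys become inserts.

```python
from collections import defaultdict

def tag_records(index: dict, incoming_keys: list):
    """Split incoming keys into updates (grouped by file group) and inserts.

    `index` maps record key -> file group ID; it is a toy stand-in for a Hudi index.
    """
    updates_by_group = defaultdict(list)
    inserts = []
    for key in incoming_keys:
        file_group = index.get(key)
        if file_group is None:
            inserts.append(key)                        # unknown key: routed as an insert
        else:
            updates_by_group[file_group].append(key)   # known key: update that file group
    return dict(updates_by_group), inserts

index = {1: "fg-1", 2: "fg-1", 3: "fg-2", 4: "fg-3"}
updates, inserts = tag_records(index, [1, 5])
print(updates)  # {'fg-1': [1]}
print(inserts)  # [5]
```

Only `fg-1` is touched by this batch. Under COW its base file is rewritten, and under MOR a log block is appended. `fg-2` and `fg-3` are left alone.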
The Copy-on-Write (COW) table type performs data updates by rewriting the entire base file containing the affected records. When an update arrives, Hudi identifies the base file containing the record, reads the entire file, applies the update, and writes a new version of the file. This approach provides excellent read performance since there is no merge overhead, but write operations are expensive due to the full file rewrite. COW is ideal for read-heavy workloads where write latency is less critical.
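The cost asymmetry can be sketched with dicts standing in for files. This is an assumption for illustration, and `cow_upsert` is a hypothetical helper, not a Hudi API. Changing a single record still produces a full new version of the file group's base file. The old version stays readable until the cleaner removes it.

```python
def cow_upsert(base_file: dict, updates: dict):
    """Copy-on-write: return a new base-file version and the number of records written.

    base_file and updates map record key -> record. The old version is not
    modified, so snapshots that still point at it remain readable.
    """
    new_version = dict(base_file)   # read the whole existing file
    new_version.update(updates)     # apply updates/inserts routed to this file group
    return new_version, len(new_version)  # every record is written again

v1 = {key: {"id": key, "value": 1} for key in range(1000)}
v2, records_written = cow_upsert(v1, {7: {"id": 7, "value": 2}})
print(records_written)                 # 1000: one changed record rewrites all 1000
print(v1[7]["value"], v2[7]["value"])  # 1 2
```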
The Merge-on-Read (MOR) table type takes a fundamentally different approach. Instead of rewriting base files, updates are appended to log files. When a read operation occurs, Hudi merges the base files with the log files on-the-fly to produce the current view. This approach dramatically reduces write latency since only the log file is updated, but read performance suffers due to the merge overhead. MOR is ideal for write-heavy workloads with near-real-time ingestion requirements.
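A MOR snapshot read can be sketched the same way: start from the base file and replay the log blocks from oldest to newest. This sketch assumes a "larger precombine value wins" merge rule and ignores deletes. In Hudi the exact rule depends on the configured payload or record merger. `mor_snapshot_read` is a hypothetical name, and the sample records are illustrative.

```python
def mor_snapshot_read(base_file: dict, log_blocks: list, precombine: str) -> dict:
    """Merge a base file with its log blocks at query time (deletes ignored).

    Log blocks are applied oldest first; for each key the record with the
    larger (or equal, i.e. later-arriving) precombine value wins.
    """
    view = dict(base_file)                 # the base file itself is never modified
    for block in log_blocks:               # oldest to newest
        for key, record in block.items():
            current = view.get(key)
            if current is None or record[precombine] >= current[precombine]:
                view[key] = record
    return view

base = {"TXN001": {"amount": 100.0, "event_time": "2024-01-15 08:00:00"}}
logs = [
    {"TXN001": {"amount": 120.0, "event_time": "2024-01-15 10:00:00"}},  # update
    {"TXN002": {"amount": 250.0, "event_time": "2024-01-15 08:30:00"}},  # insert
]
view = mor_snapshot_read(base, logs, precombine="event_time")
print(view["TXN001"]["amount"], len(view))  # 120.0 2
```

Compaction amounts to running this merge once and saving the result as the new base file. After that, reads no longer pay for it.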
Hudi's timeline is the central coordination mechanism that tracks all operations performed on the dataset. Each commit, compaction, or cleaning operation creates an entry in the timeline with a timestamp. This enables incremental processing: downstream consumers can query the timeline to identify exactly which files changed since their last read, and process only those changes. This is fundamentally different from full-refresh patterns where the entire dataset is reprocessed on each batch.
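The incremental pull reduces to filtering the timeline. The sketch below models each instant as a time, an action, a state and the file groups it touched. The `Instant` class and the `incremental_files` helper are hypothetical. The sketch follows Hudi's incremental query bounds (begin exclusive, end inclusive) and skips instants that have not completed. Comparing instant times as strings works because they are fixed-width digit strings.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Instant:
    time: str      # instant time, e.g. "20240115080000000" (yyyyMMddHHmmssSSS)
    action: str    # "commit", "deltacommit", "compaction", "clean", ...
    state: str     # "REQUESTED", "INFLIGHT" or "COMPLETED"
    files: tuple   # file groups written by this instant (toy metadata)

def incremental_files(timeline, begin, end):
    """File groups written by completed instants with begin < time <= end."""
    touched = set()
    for instant in timeline:
        if instant.state == "COMPLETED" and begin < instant.time <= end:
            touched.update(instant.files)
    return touched

timeline = [
    Instant("20240115080000000", "commit", "COMPLETED", ("fg-1", "fg-2")),
    Instant("20240115090000000", "commit", "COMPLETED", ("fg-2",)),
    Instant("20240115100000000", "commit", "COMPLETED", ("fg-3",)),
    Instant("20240115110000000", "commit", "INFLIGHT", ("fg-1",)),
]
# A consumer that last processed the 08:00 commit only needs fg-2 and fg-3;
# the in-flight 11:00 commit is not visible yet.
print(sorted(incremental_files(timeline, "20240115080000000", "20240115110000000")))
# ['fg-2', 'fg-3']
```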
The compaction service is critical for MOR tables. It periodically merges log files into base files to maintain read performance. Compaction can run inline (as part of a write), asynchronously (alongside ingestion), or on demand. A trigger decides when a compaction plan is scheduled, for example after a number of delta commits or after some time has elapsed since the last compaction. The compaction strategy then decides which file groups go into the plan, based on factors like log file count and log file size. This balance between write performance and read performance is a key tuning parameter for Hudi deployments.
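The selection step can be sketched as a filter followed by a greedy pick within an I/O budget. The sketch is loosely modeled on the idea behind Hudi's log-file-size-based strategy and the `hoodie.compaction.target.io` budget. The thresholds, the I/O estimate (read base and logs, write a new base) and the helper name are assumptions for illustration. Sizes are in MB.

```python
def select_for_compaction(file_groups, min_log_files, min_log_mb, io_budget_mb):
    """Pick file groups to compact, largest log backlog first, within an I/O budget.

    file_groups: list of dicts with keys 'id', 'base_mb', 'log_mb', 'log_files'.
    """
    # 1. Only groups with enough log data (by count or by size) are worth compacting.
    eligible = [
        fg for fg in file_groups
        if fg["log_files"] >= min_log_files or fg["log_mb"] >= min_log_mb
    ]
    # 2. Prefer groups with the most log data: they slow reads down the most.
    eligible.sort(key=lambda fg: fg["log_mb"], reverse=True)

    selected, io_used = [], 0
    for fg in eligible:
        # Rough I/O estimate: read base + logs, then write a new base of similar size.
        io = 2 * fg["base_mb"] + fg["log_mb"]
        if io_used + io > io_budget_mb:
            break  # budget exhausted; remaining groups wait for the next run
        selected.append(fg["id"])
        io_used += io
    return selected

groups = [
    {"id": "fg-1", "base_mb": 100, "log_mb": 60, "log_files": 4},
    {"id": "fg-2", "base_mb": 100, "log_mb": 5, "log_files": 1},
    {"id": "fg-3", "base_mb": 120, "log_mb": 80, "log_files": 6},
]
print(select_for_compaction(groups, min_log_files=3, min_log_mb=50, io_budget_mb=600))  # ['fg-3', 'fg-1']
print(select_for_compaction(groups, min_log_files=3, min_log_mb=50, io_budget_mb=400))  # ['fg-3']
```

A smaller budget keeps each compaction run short, but log files pile up in the groups that were left out.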
Mathematical Foundations
Definition: Hudi Timeline
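The Hudi timeline is a chronologically ordered set of actions $\mathcal{T} = \{a_1, a_2, \dots, a_n\}$. Each action $a_i = (o_i, s_i, t_i)$ has three components: an operation type $o_i \in \{\text{commit}, \text{deltacommit}, \text{compaction}, \text{clean}, \text{rollback}, \dots\}$, a state $s_i \in \{\text{REQUESTED}, \text{INFLIGHT}, \text{COMPLETED}\}$, and a timestamp (instant time) $t_i$, with $t_1 < t_2 < \dots < t_n$.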
Copy-on-Write Merge
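For a COW table, an upsert of record set $U$ into file group $F$ with current base file $B_v$ produces a new base file version:
$$B_{v+1} = \{\, r \in B_v : k(r) \notin K(U_F) \,\} \cup U_F$$
Here $U_F \subseteq U$ is the subset of records routed to $F$, $k(r)$ is a record's key, and $K(U_F)$ is the set of keys in $U_F$. The entire file is rewritten, so the write costs $O(|B_v| + |U_F|)$ I/O no matter how small $U_F$ is.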
MOR Read Consistency Theorem
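A MOR snapshot read at time $t$ works per file group. It takes the base file $B$ of the latest file slice visible at $t$, whose base instant is $t_B \le t$. It merges $B$ with the log files $L_1, \dots, L_m$ whose commits satisfy $t_B < t_{L_j} \le t$:
$$V_t = \operatorname{merge}(B, L_1, \dots, L_m)$$
For each record key, the latest version wins. This is a consistency property rather than a performance one. Compaction produces $B' = \operatorname{merge}(B, L_1, \dots, L_m)$, so a read returns the same records before and after compaction; only the merge work moves from read time to compaction time. Read latency therefore drops after each compaction and rises again as new log files accumulate.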
Compaction Write Amplification
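Consider MOR compaction merging base file $B$ with log files $L_1, \dots, L_m$ into a new base file $B'$. Its write amplification is the number of bytes compaction writes per byte of logged change:
$$W_A = \frac{|B'|}{\sum_{j=1}^{m} |L_j|}$$
Target: keep $W_A$ below a chosen threshold $\tau$ to keep compaction overhead bounded, for example by waiting until enough log data has accumulated before compacting.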
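As a worked number, the sketch below computes write amplification as MB written by compaction divided by MB of logged change. It assumes the new base file stays about the size of the old one, because updates replace rows rather than add them. The function name is hypothetical.

```python
def compaction_write_amplification(new_base_mb: float, log_mb: list) -> float:
    """W_A = |B'| / sum_j |L_j|: MB written by compaction per MB of logged change."""
    total_log_mb = sum(log_mb)
    if total_log_mb <= 0:
        raise ValueError("no log data to compact")
    return new_base_mb / total_log_mb

# Same 128 MB base file, compacted after 4 MB vs. after 32 MB of log data.
print(compaction_write_amplification(128, [1, 1, 2]))     # 32.0
print(compaction_write_amplification(128, [8, 8, 8, 8]))  # 4.0
```

Waiting for 32 MB of log data instead of 4 MB cuts write amplification from 32 to 4. The price is longer merges on reads in the meantime.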
File Grouping Efficiency
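Optimal file sizing minimizes the total number of file groups. For a table holding $D$ bytes with a maximum file size $S_{\max}$, the number of file groups is bounded below:
$$N_{FG} \ge \left\lceil \frac{D}{S_{\max}} \right\rceil$$
Sizing files close to $S_{\max}$ keeps $N_{FG}$ near this bound.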
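The bound, and one simple way to approach it, can be sketched in a few lines. The packing uses first-fit decreasing, a generic bin-packing heuristic chosen for illustration; it is not Hudi's own small-file handling. Sizes are in MB. The 128 MB target matches the `hoodie.parquet.max.file.size` value (134217728 bytes) used in the MOR example.

```python
import math

def min_file_groups(total_mb: int, max_file_mb: int) -> int:
    """Lower bound on the number of file groups: ceil(D / S_max)."""
    return math.ceil(total_mb / max_file_mb)

def pack_first_fit_decreasing(chunk_sizes_mb, max_file_mb):
    """Greedy bin packing: place each chunk (largest first) into the first
    file group with room, opening a new group only when none fits."""
    groups = []  # each entry: [used_mb, [chunk sizes]]
    for size in sorted(chunk_sizes_mb, reverse=True):
        for group in groups:
            if group[0] + size <= max_file_mb:
                group[0] += size
                group[1].append(size)
                break
        else:
            groups.append([size, [size]])
    return groups

chunks = [100, 60, 60, 28]  # 248 MB of data in four chunks
print(min_file_groups(sum(chunks), 128))                       # 2
print([g[1] for g in pack_first_fit_decreasing(chunks, 128)])  # [[100, 28], [60, 60]]
```

Here the heuristic reaches the lower bound. In general, greedy packing can need more groups than $\lceil D / S_{\max} \rceil$.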
Key Insight
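Hudi's file group abstraction confines each write to the file groups that hold the affected record keys, so updates never rewrite whole partitions. MOR goes further by appending changes to log files instead of rewriting base files. That makes it ideal for write-heavy workloads where COW's per-file rewrite cost becomes prohibitive.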
Summary
Hudi's timeline provides chronological ordering. COW achieves read-optimized merges at write cost, while MOR optimizes writes with deferred compaction. The write amplification metric guides compaction scheduling to balance read and write performance.
Key Concepts Table
| Concept | Description | When to Use |
|---|---|---|
| COW Table | Copy-on-Write: rewrites files on update | Read-heavy, batch workloads |
| MOR Table | Merge-on-Read: appends logs on update | Write-heavy, near-real-time ingestion |
| Timeline | Chronological list of all commits | Incremental processing, audit trail |
| File Group | Set of base files + associated log files | Parallel processing unit |
| Record Index | Record-level index (record key to file group) stored in the metadata table (Hudi 0.14+) | Efficient record lookup for updates |
| Bucket Index | Hash-based file assignment | High-volume upsert workloads |
| Global Index | Index across all partitions | Global uniqueness enforcement |
| Compaction | Merges log files into base files | MOR read performance optimization |
| Clustering | Groups small files into larger ones | File size optimization |
| Cleaner | Removes old file versions | Storage reclamation |
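The Bucket Index row can be illustrated with a hash-and-modulo assignment. This is a sketch under stated assumptions. `zlib.crc32` is a stand-in hash; Hudi uses its own hashing of the configured key fields. `NUM_BUCKETS` plays the role of `hoodie.bucket.index.num.buckets`. The point is that finding a record's file group needs no index lookup.

```python
import zlib

def bucket_for_key(record_key: str, num_buckets: int) -> int:
    """Assign a record key to one of `num_buckets` buckets (file groups).

    crc32 is a stand-in hash chosen only because it is stable across runs;
    it is not the hash function Hudi's bucket index uses.
    """
    return zlib.crc32(record_key.encode("utf-8")) % num_buckets

NUM_BUCKETS = 4  # fixed bucket count, analogous to hoodie.bucket.index.num.buckets
keys = ["CUST001", "CUST002", "CUST003", "CUST001"]
assignment = {key: bucket_for_key(key, NUM_BUCKETS) for key in keys}
# The same key always lands in the same bucket, so an upsert can locate its
# file group by computing the hash instead of probing an index.
print(assignment)
```

The trade-off is that the bucket count has to be chosen up front. Skewed keys or strong data growth can therefore leave buckets unevenly sized.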
Code Examples
Setting Up Hudi with PySpark
from pyspark.sql import SparkSession
# Hudi needs its Spark bundle, Kryo serialization, the Hudi SQL extension
# (for CALL procedures and Hudi DDL) and the Hudi catalog (not Delta's).
spark = SparkSession.builder \
.appName("HudiOperations") \
.config("spark.jars.packages", "org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") \
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
.getOrCreate()
# Define Hudi write configuration
hudi_options = {
'hoodie.table.name': 'customers',
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.recordkey.field': 'customer_id',
# On duplicate keys, the record with the larger update_timestamp wins
'hoodie.datasource.write.precombine.field': 'update_timestamp',
'hoodie.datasource.write.partitionpath.field': 'region',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.upsert.shuffle.parallelism': '200',
'hoodie.insert.shuffle.parallelism': '200',
}
# Create initial dataset
customers_data = [
(1, "Alice", "North", "2024-01-15 10:00:00", "alice@email.com"),
(2, "Bob", "South", "2024-01-15 10:00:00", "bob@email.com"),
(3, "Charlie", "East", "2024-01-15 10:00:00", "charlie@email.com"),
(4, "Diana", "West", "2024-01-15 10:00:00", "diana@email.com"),
]
df = spark.createDataFrame(customers_data, [
"customer_id", "name", "region", "update_timestamp", "email"
])
df.write.format("hudi") \
.options(**hudi_options) \
.mode("overwrite") \
.save("/hudi/customers")
# Upsert - update existing records
updates_data = [
(1, "Alice Smith", "North", "2024-01-16 10:00:00", "alice.smith@email.com"),
(5, "Eve", "South", "2024-01-16 10:00:00", "eve@email.com"),
]
updates_df = spark.createDataFrame(updates_data, [
"customer_id", "name", "region", "update_timestamp", "email"
])
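updates_df.write.format("hudi") \
.options(**hudi_options) \
.mode("append") \
.save("/hudi/customers")
# Register the path-based table so the SQL examples can refer to hudi.customers
spark.sql("CREATE DATABASE IF NOT EXISTS hudi")
spark.sql("CREATE TABLE IF NOT EXISTS hudi.customers USING hudi LOCATION '/hudi/customers'")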
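The precombine field decides which record survives when one upsert batch contains the same key more than once. Hudi's default upsert path combines such duplicates before writing. The sketch below reproduces the "largest value wins" rule in plain Python on a hypothetical batch with a duplicate `customer_id`. `precombine` is a hypothetical helper, not a Hudi function. String comparison is safe here only because the timestamps use the sortable `YYYY-MM-DD HH:MM:SS` format.

```python
def precombine(records, key_field, precombine_field):
    """Keep one record per key: the one with the largest precombine value."""
    latest = {}
    for record in records:
        key = record[key_field]
        if key not in latest or record[precombine_field] > latest[key][precombine_field]:
            latest[key] = record
    return list(latest.values())

batch = [  # hypothetical batch containing two versions of customer 1
    {"customer_id": 1, "name": "Alice", "update_timestamp": "2024-01-15 10:00:00"},
    {"customer_id": 1, "name": "Alice Smith", "update_timestamp": "2024-01-16 10:00:00"},
    {"customer_id": 5, "name": "Eve", "update_timestamp": "2024-01-16 10:00:00"},
]
deduped = precombine(batch, "customer_id", "update_timestamp")
print([r["name"] for r in deduped])  # ['Alice Smith', 'Eve']
```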
MOR Table with Incremental Queries
# Create MOR table for high-frequency updates
mor_options = {
'hoodie.table.name': 'transactions',
'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field': 'txn_id',
'hoodie.datasource.write.precombine.field': 'event_time',
'hoodie.datasource.write.partitionpath.field': 'date',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.compact.inline': 'false',  # compaction runs as a separate step, not during writes
'hoodie.logfile.max.size': '1073741824',
'hoodie.logfile.to.parquet.compression.ratio': '0.5',
'hoodie.parquet.max.file.size': '134217728',
}
# Write transactions
txn_data = [
("TXN001", "CUST001", "2024-01-15 08:00:00", 100.00, "2024-01-15"),
("TXN002", "CUST002", "2024-01-15 08:30:00", 250.00, "2024-01-15"),
("TXN003", "CUST001", "2024-01-15 09:00:00", 75.00, "2024-01-15"),
]
txn_df = spark.createDataFrame(txn_data, [
"txn_id", "cust_id", "event_time", "amount", "date"
])
txn_df.write.format("hudi") \
.options(**mor_options) \
.mode("overwrite") \
.save("/hudi/transactions")
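# Register the path-based table so SQL can refer to hudi.transactions
spark.sql("CREATE DATABASE IF NOT EXISTS hudi")
spark.sql("CREATE TABLE IF NOT EXISTS hudi.transactions USING hudi LOCATION '/hudi/transactions'")
# Incremental read: begin/end are commit instant times (yyyyMMddHHmmssSSS)
# taken from _hoodie_commit_time, not event timestamps. Begin is exclusive and
# end is inclusive; "000" starts from the beginning of the timeline.
commit_times = sorted(
    row["_hoodie_commit_time"]
    for row in spark.read.format("hudi").load("/hudi/transactions")
    .select("_hoodie_commit_time").distinct().collect()
)
incremental_df = spark.read \
.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", "000") \
.option("hoodie.datasource.read.end.instanttime", commit_times[-1]) \
.load("/hudi/transactions")
incremental_df.show(truncate=False)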
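Incremental reads take Hudi commit instant times, not event timestamps. The sketch below converts a Python `datetime` into a comparable instant string. It assumes the millisecond-granularity `yyyyMMddHHmmssSSS` instant format used by recent Hudi releases. It also assumes the datetime is in the same timezone as the table's timeline. `to_instant_time` is a hypothetical helper. The writer assigns instant times at commit, so a wall-clock bound only approximates which commits fall in range. Reading `_hoodie_commit_time` values is exact.

```python
from datetime import datetime

def to_instant_time(ts: datetime) -> str:
    """Format a datetime as a Hudi-style instant time string (yyyyMMddHHmmssSSS)."""
    millis = ts.microsecond // 1000
    return ts.strftime("%Y%m%d%H%M%S") + f"{millis:03d}"

print(to_instant_time(datetime(2024, 1, 15, 8, 0, 0)))          # 20240115080000000
print(to_instant_time(datetime(2024, 1, 15, 9, 0, 0, 250000)))  # 20240115090000250
```

The resulting strings can be passed as `hoodie.datasource.read.begin.instanttime` or `hoodie.datasource.read.end.instanttime` values.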
Compaction and Clustering
# Compaction (MOR only): schedule a compaction plan, then execute it.
# Running these steps from a job separate from the ingestion writer is one
# way to compact asynchronously. The per-run I/O budget is controlled by the
# write config 'hoodie.compaction.target.io' (in MB).
spark.sql("CALL run_compaction(op => 'schedule', table => 'hudi.transactions')")
spark.sql("CALL run_compaction(op => 'run', table => 'hudi.transactions')")
# Clustering: rewrite small files into larger ones, sorted by customer_id
# (applies to COW and MOR tables)
spark.sql("CALL run_clustering(table => 'hudi.customers', order => 'customer_id')")
# Show the timeline (most recent commits)
spark.sql("CALL show_commits(table => 'hudi.transactions', limit => 10)").show(truncate=False)
# Show compaction plans and their state
spark.sql("CALL show_compaction(table => 'hudi.transactions')").show(truncate=False)
Schema Evolution and Partition Evolution
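# Schema evolution: add nullable columns; existing rows read them as NULL
spark.sql("""
ALTER TABLE hudi.customers ADD COLUMNS (
phone_number STRING,
loyalty_tier STRING
)
""")
# Partition "evolution": Hudi has no in-place procedure for changing the
# partition column, so one approach is to rewrite the data into a new table
# partitioned by the new column. 'country' is hypothetical here; the
# placeholder value must be replaced with real data.
from pyspark.sql import functions as F
meta_cols = ["_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
"_hoodie_partition_path", "_hoodie_file_name"]
customers_df = spark.read.format("hudi").load("/hudi/customers") \
.drop(*meta_cols) \
.withColumn("country", F.lit("UNKNOWN"))
customers_df.write.format("hudi") \
.options(**hudi_options) \
.option("hoodie.table.name", "customers_by_country") \
.option("hoodie.datasource.write.partitionpath.field", "country") \
.option("hoodie.datasource.write.operation", "bulk_insert") \
.mode("overwrite") \
.save("/hudi/customers_by_country")
# Show table properties
spark.sql("DESCRIBE EXTENDED hudi.customers").show(truncate=False)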
Performance Metrics
| Metric | COW Table | MOR Table | Optimized COW | Optimized MOR |
|---|---|---|---|---|
| Upsert Latency (1K records) | 12-15 seconds | 2-4 seconds | 8-10 seconds | 1-2 seconds |
| Upsert Throughput | 500-800 records/sec | 2000-5000 records/sec | 800-1200 records/sec | 5000-10000 records/sec |
| Read Latency (point query) | 50-100 ms | 150-300 ms (with merge) | 30-60 ms | 100-200 ms |
| Read Latency (full scan) | 10-20 seconds | 15-30 seconds | 8-15 seconds | 12-25 seconds |
| Storage Overhead | ~10-15% | ~5-8% | ~8-12% | ~4-6% |
| File Count | Lower (compacted) | Higher (logs) | Lowest | Moderate |
| Compaction Cost | N/A | CPU-intensive | N/A | Moderate |
| Slowdown with Concurrent Writers | 2-3x | 1.5x | 1.5x | 1.2x |
| Time Travel Latency | < 1 second | < 2 seconds | < 1 second | < 1.5 seconds |
| Incremental Query | Supported | Supported | Faster | Faster |
Best Practices
- Choose COW for read-heavy workloads where write latency is acceptable and read performance is critical
- Choose MOR for write-heavy workloads where near-real-time ingestion is required and some read latency can be traded for faster writes
- Configure `hoodie.compaction.target.io` (in MB) to bound the I/O each compaction run spends, balancing compaction aggressiveness against write performance
- Use `hoodie.datasource.write.operation=bulk_insert` for initial loads to skip the index lookup and deduplication overhead of upserts
- Set `hoodie.cleaner.policy=KEEP_LATEST_COMMITS` with an appropriate `hoodie.cleaner.commits.retained` to cover your time travel requirements
- Tune `hoodie.logfile.max.size` to control when MOR log files roll over, and `hoodie.compact.inline.max.delta.commits` to control how many delta commits accumulate before compaction is scheduled
- Use bucket indexing (`hoodie.index.type=BUCKET`) for high-volume upsert workloads with uniform key distribution
- Schedule clustering for COW tables to consolidate small files and improve scan performance
- Monitor the timeline with `CALL show_commits(...)` to identify compaction lag and write latency issues
- Set `hoodie.datasource.write.recordkey.field` to a field (or combination of fields) that uniquely identifies each record, so upserts can locate existing records
- Set `hoodie.parquet.max.file.size` to your target file size and keep `hoodie.parquet.block.size` (the Parquet row-group size) consistent with it; both default to 120 MB (125829120 bytes)
- Keep `hoodie.metadata.enable=true` (enabled by default on the writer since Hudi 0.11) for large datasets, so file listings are served from the metadata table instead of expensive storage listings
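The following simplified sketch shows what `KEEP_LATEST_COMMITS` with `hoodie.cleaner.commits.retained` protects. For every file group it keeps the newest base-file version. It also keeps whichever version a reader as of each of the last N commits would see. Everything else becomes cleanable. The sketch ignores log files, savepoints and pending table services, all of which the real cleaner accounts for. `slices_to_keep` is a hypothetical helper, and commit times are shortened to three digits for readability.

```python
def slices_to_keep(file_slices, completed_commits, commits_retained):
    """Simplified KEEP_LATEST_COMMITS: keep every base-file version that a
    query as of any of the last `commits_retained` commits could read.

    file_slices: iterable of (file_group_id, commit_time) base-file versions.
    completed_commits: completed commit times, sorted oldest to newest.
    """
    retained = completed_commits[-commits_retained:] if commits_retained > 0 else []
    versions = {}
    for file_group, commit_time in file_slices:
        versions.setdefault(file_group, []).append(commit_time)

    keep = set()
    for file_group, times in versions.items():
        times.sort()
        keep.add((file_group, times[-1]))  # the newest version is never cleaned
        for commit in retained:
            visible = [t for t in times if t <= commit]
            if visible:
                keep.add((file_group, visible[-1]))  # version a reader at `commit` sees
    return keep

slices = [("fg-1", "001"), ("fg-1", "003"), ("fg-1", "005"), ("fg-2", "002")]
commits = ["001", "002", "003", "004", "005"]
print(sorted(slices_to_keep(slices, commits, commits_retained=2)))
# [('fg-1', '003'), ('fg-1', '005'), ('fg-2', '002')]  ('fg-1', '001') can be cleaned
```

Raising `commits_retained` keeps older versions around for time travel, at the cost of more storage.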
See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)