Apache Hudi Operations in PySpark

Architecture Diagram

┌─────────────────────────────────────────────────────────────────────────┐
│                      HUDI TABLE ARCHITECTURE                            │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   ┌──────────────┐     ┌──────────────┐     ┌──────────────┐            │
│   │   Hudi       │────▶│  Metadata    │────▶│  Timeline    │            │
│   │   Client     │     │  Table       │     │  Service     │            │
│   └──────────────┘     └──────┬───────┘     └──────┬───────┘            │
│                               │                    │                    │
│                               ▼                    ▼                    │
│                    ┌──────────────────┐   ┌──────────────────┐          │
│                    │  .commit Files   │   │  .inflight Files │          │
│                    │  (JSON metadata) │   │  (Pending ops)   │          │
│                    └────────┬─────────┘   └────────┬─────────┘          │
│                             │                      │                    │
│                             ▼                      ▼                    │
│                    ┌──────────────────┐   ┌──────────────────┐          │
│                    │  .deltacommit    │   │  .compaction     │          │
│                    │  (MOR logs)      │   │  (Merge logs)    │          │
│                    └────────┬─────────┘   └────────┬─────────┘          │
│                             │                      │                    │
│                             ▼                      ▼                    │
│                    ┌──────────────────┐   ┌──────────────────┐          │
│                    │  Parquet Files   │   │  Log Files       │          │
│                    │  (Base data)     │   │  (.log format)   │          │
│                    └──────────────────┘   └──────────────────┘          │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────┐
│               COW vs MOR TABLE COMPARISON                               │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   COW (Copy on Write)           MOR (Merge on Read)                     │
│   ┌─────────────────────┐       ┌─────────────────────┐                 │
│   │   Write Operation   │       │   Write Operation   │                 │
│   │                     │       │                     │                 │
│   │  ┌───┐ ┌───┐ ┌───┐  │       │  ┌───┐ ┌───┐ ┌───┐  │                 │
│   │  │ A │ │ B │ │ C │  │       │  │ A │ │ B │ │ C │  │                 │
│   │  └───┘ └───┘ └───┘  │       │  └───┘ └───┘ └───┘  │                 │
│   │       │     │       │       │       │     │       │                 │
│   │       ▼     ▼       │       │       ▼     ▼       │                 │
│   │  ┌──────────────┐   │       │  ┌──────────────┐   │                 │
│   │  │ Rewrite only │   │       │  │  Append Log  │   │                 │
│   │  │ changed files│   │       │  │  (.log file) │   │                 │
│   │  └──────────────┘   │       │  └──────────────┘   │                 │
│   │       │             │       │       │             │                 │
│   │       ▼             │       │       ▼             │                 │
│   │  ┌──────────────┐   │       │  ┌──────────────┐   │                 │
│   │  │ New Parquet  │   │       │  │  Base + Log  │   │                 │
│   │  │  (clean)     │   │       │  │  (merged at  │   │                 │
│   │  └──────────────┘   │       │  │   read time) │   │                 │
│   │                     │       │  └──────────────┘   │                 │
│   └─────────────────────┘       └─────────────────────┘                 │
│                                                                         │
│   Write: Slow (file rewrite)    Write: Fast (append only)               │
│   Read:  Fast (no merge)        Read:  Slow (merge at read)             │
│   Storage: Higher (full copy)   Storage: Lower (delta log)              │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────┐
│                  INCREMENTAL PROCESSING PIPELINE                        │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   Batch 1            Batch 2            Batch 3                         │
│   (t=0)              (t=1)              (t=2)                           │
│   ┌─────┐            ┌─────┐            ┌─────┐                         │
│   │ Δt0 │───────────▶│ Δt1 │───────────▶│ Δt2 │                         │
│   └──┬──┘            └──┬──┘            └──┬──┘                         │
│      │                  │                  │                            │
│      ▼                  ▼                  ▼                            │
│   ┌───────────────────────────────────────────────────────────────┐     │
│   │                    HUDI TIMELINE                              │     │
│   │  ┌──────┬──────┬──────┬──────┬──────┬──────┬──────┐           │     │
│   │  │Commit│Commit│Commit│Commit│Commit│Commit│Commit│           │     │
│   │  │  0   │  1   │  2   │  3   │  4   │  5   │  6   │           │     │
│   │  └──────┴──────┴──────┴──────┴──────┴──────┴──────┘           │     │
│   │     │         │         │                                     │     │
│   │     ▼         ▼         ▼                                     │     │
│   │  Read from Commit 2: get only changes since t=2               │     │
│   │  (Incremental query with begin and end commit time)           │     │
│   └───────────────────────────────────────────────────────────────┘     │
│                                                                         │
│   ┌───────────────────────────────────────────────────────────────┐     │
│   │                    INCREMENTAL QUERY FLOW                     │     │
│   │                                                               │     │
│   │  Input: (beginInstantTime, endInstantTime)                    │     │
│   │                                                               │     │
│   │  Timeline Filter ──▶ File Filter ──▶ Record Filter            │     │
│   │       │                   │                │                  │     │
│   │       ▼                   ▼                ▼                  │     │
│   │  Select commits   Select affected    Apply remaining          │     │
│   │  in range         file slices        predicates               │     │
│   └───────────────────────────────────────────────────────────────┘     │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Detailed Explanation

Apache Hudi (Hadoop Upserts Deletes and Incrementals) is an open-source data lake platform that provides record-level upserts, incremental processing, and table services. Unlike traditional data lake formats that only support append operations, Hudi supports efficient updates and deletes at the record level, which suits use cases that need near-real-time ingestion under strict latency requirements.

The fundamental innovation in Hudi is its file layout management system. Every Hudi dataset consists of base files (Parquet format) and log files (for MOR tables). The metadata layer tracks which records exist in which files, enabling the system to surgically update only the affected files rather than rewriting entire partitions. This is achieved through a combination of record-level indexes, bloom filters, and file-level statistics.

The Copy-on-Write (COW) table type performs data updates by rewriting the entire base file containing the affected records. When an update arrives, Hudi identifies the base file containing the record, reads the entire file, applies the update, and writes a new version of the file. This approach provides excellent read performance since there is no merge overhead, but write operations are expensive due to the full file rewrite. COW is ideal for read-heavy workloads where write latency is less critical.

The Merge-on-Read (MOR) table type takes a fundamentally different approach. Instead of rewriting base files, updates are appended to log files. When a read operation occurs, Hudi merges the base files with the log files on-the-fly to produce the current view. This approach dramatically reduces write latency since only the log file is updated, but read performance suffers due to the merge overhead. MOR is ideal for write-heavy workloads with near-real-time ingestion requirements.

Hudi's timeline is the central coordination mechanism that tracks all operations performed on the dataset. Each commit, compaction, or cleaning operation creates an entry in the timeline with a timestamp. This enables incremental processing: downstream consumers can query the timeline to identify exactly which files changed since their last read, and process only those changes. This is fundamentally different from full-refresh patterns where the entire dataset is reprocessed on each batch.
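The incremental pattern described above can be sketched in plain Python, without Spark. The `Commit` record, the file names, and the `incremental_changes` helper are illustrative assumptions, not Hudi APIs. The sketch also assumes the begin instant is exclusive and the end instant inclusive.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Commit:
    instant: str   # commit time on the timeline, e.g. "20240115080000"
    files: tuple   # files written by this commit (hypothetical names)


def incremental_changes(timeline, begin, end):
    """Return files touched by commits with begin < instant <= end."""
    # Timeline filter: keep only commits inside the requested range.
    selected = [c for c in timeline if begin < c.instant <= end]
    # File filter: collect the files those commits wrote, without duplicates.
    files = []
    for commit in selected:
        for name in commit.files:
            if name not in files:
                files.append(name)
    return files


timeline = [
    Commit("20240115080000", ("f1.parquet", "f2.parquet")),
    Commit("20240115083000", ("f2.parquet",)),
    Commit("20240115090000", ("f3.parquet",)),
]

changed = incremental_changes(timeline, "20240115080000", "20240115090000")
print(changed)  # ['f2.parquet', 'f3.parquet']
```

A consumer that stores the last instant it processed can pass it as `begin` on the next run and read only what changed since then.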

The compaction service is critical for MOR tables. It periodically merges log files into base files to maintain read performance. The compaction strategy can be configured to run inline (during writes), asynchronously, or on-demand. The compaction selector determines which file groups to compact based on factors like log file count, log file size, and time since last compaction. This balancing act between write performance and read performance is a key tuning parameter for Hudi deployments.
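To make the selector idea concrete, here is a minimal sketch of one possible policy: compact the file groups with the largest log backlog first, and stop when an I/O budget is used up. The function name, the input layout, and the byte counts are hypothetical, and Hudi's built-in strategies may weigh these factors differently.

```python
def select_for_compaction(file_groups, io_budget_bytes, min_log_files=1):
    """Pick file groups to compact, largest log backlog first, within an I/O budget.

    file_groups maps a group id to (base_bytes, [log_bytes, ...]).
    """
    candidates = [
        (gid, base, logs)
        for gid, (base, logs) in file_groups.items()
        if len(logs) >= min_log_files
    ]
    # Prioritise the groups with the most accumulated log data.
    candidates.sort(key=lambda c: sum(c[2]), reverse=True)

    plan, used = [], 0
    for gid, base, logs in candidates:
        cost = base + sum(logs)  # bytes that must be read to compact this group
        if used + cost <= io_budget_bytes:
            plan.append(gid)
            used += cost
    return plan


groups = {
    "fg-1": (100, [40, 30]),  # 70 bytes of logs
    "fg-2": (100, [10]),      # 10 bytes of logs
    "fg-3": (100, []),        # nothing to compact
}
plan = select_for_compaction(groups, io_budget_bytes=200)
print(plan)  # ['fg-1']
```

A tighter budget keeps each compaction run short but leaves more log data for readers to merge, which is the write/read trade-off described above.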

Mathematical Foundations

Definition: Hudi Timeline

The Hudi timeline $\mathcal{T}$ is a chronologically ordered sequence of actions $\mathcal{T} = [a_1, a_2, \ldots, a_n]$, where each action $a_i = (o_i, s_i, t_i)$ has an operation type $o_i \in \{\text{CREATE, UPSERT, DELETE, COMPACT, CLUSTER}\}$, a state $s_i \in \{\text{INFLIGHT, COMMITTED, FAILED}\}$, and a timestamp $t_i$.

Copy-on-Write Merge

For a COW table, an upsert of record set $U$ into file group $F_j$ produces:

$$F_j' = (F_j \setminus \{r \in F_j : r.\text{key} \in U.\text{keys}\}) \cup U$$

The entire file is rewritten, which costs $O(|F_j| + |U|)$ I/O.
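The set expression above translates almost directly into code. The sketch below models a base file as a dictionary from record key to value, which is a simplification for illustration and not how Hudi stores data.

```python
def cow_upsert(base_file, updates):
    """Return a new base file: (F minus records whose key is in U) union U."""
    update_keys = set(updates)
    # Copy every record that is not being replaced (the full-file rewrite).
    rewritten = {k: v for k, v in base_file.items() if k not in update_keys}
    # Add the incoming records, both updates and new inserts.
    rewritten.update(updates)
    return rewritten


base = {1: "Alice", 2: "Bob"}
new_base = cow_upsert(base, {1: "Alice Smith", 5: "Eve"})
print(new_base)  # {2: 'Bob', 1: 'Alice Smith', 5: 'Eve'}
```

The original `base` is left untouched, mirroring how a COW write produces a new file version rather than modifying the old one in place.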

MOR Read Consistency Theorem

A MOR read at time $t$ merges the base file $F$ with the entries of log file $L$ written at or before $t$:

$$\text{Read}(F, L, t) = \text{merge}(F, \{l \in L : l.t \leq t\})$$

Because only log entries with $l.t \leq t$ take part in the merge, the read returns a consistent snapshot as of $t$. As compaction folds $L$ into $F$, fewer log entries remain to merge, so read latency falls.
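A small sketch of the read rule, under simplifying assumptions: the base file is a dictionary, each log entry is a `(time, key, value)` tuple, and a value of `None` marks a delete. None of these shapes come from Hudi itself.

```python
def mor_read(base_file, log_entries, t):
    """Merge base records with the log entries whose time is <= t."""
    view = dict(base_file)
    # Apply log entries in time order so later writes win.
    for entry_time, key, value in sorted(log_entries, key=lambda e: e[0]):
        if entry_time > t:
            continue
        if value is None:      # None marks a delete in this sketch
            view.pop(key, None)
        else:
            view[key] = value
    return view


base = {"TXN001": 100.0, "TXN002": 250.0}
log = [
    (1, "TXN001", 120.0),  # update
    (2, "TXN004", 60.0),   # insert
    (3, "TXN002", None),   # delete
]

print(mor_read(base, log, 2))  # {'TXN001': 120.0, 'TXN002': 250.0, 'TXN004': 60.0}
print(mor_read(base, log, 3))  # {'TXN001': 120.0, 'TXN004': 60.0}
```

Reading at an earlier `t` ignores later log entries, which is why the same files can serve different snapshots.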

Compaction Write Amplification

For MOR compaction merging base file $F$ with log files $\{L_1, \ldots, L_k\}$:

$$W_A = \frac{|F| + \sum_{i=1}^{k} |L_i|}{|F|} = 1 + \frac{\sum_{i=1}^{k} |L_i|}{|F|}$$

Target: $W_A < 2$ to keep compaction overhead bounded.
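As a worked example of the ratio, the sketch below computes $W_A$ for a 128 MiB base file with two log files of 16 MiB and 32 MiB. The sizes are made up for illustration.

```python
MIB = 1024 ** 2


def write_amplification(base_bytes, log_bytes):
    """W_A = (|F| + sum of |L_i|) / |F|."""
    if base_bytes <= 0:
        raise ValueError("base file size must be positive")
    return (base_bytes + sum(log_bytes)) / base_bytes


wa = write_amplification(128 * MIB, [16 * MIB, 32 * MIB])
print(wa)        # 1.375
print(wa < 2)    # True: the logs are still smaller than the base file
```

$W_A$ reaches 2 exactly when the accumulated log data equals the base file size, so the target amounts to compacting before the logs outgrow the base file.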

File Grouping Efficiency

Optimal file sizing minimizes the total number of file groups:

$$\min_g \sum_{j=1}^{g} |F_j| \quad \text{s.t.} \quad \forall j: S_{\min} \leq |F_j| \leq S_{\max}$$
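One way to read this objective is "use as few file groups as the size bounds allow". Under that reading, and assuming data can be split evenly, the smallest feasible count is the total size divided by $S_{\max}$, rounded up. The helper below is a hypothetical sketch of that arithmetic and not part of Hudi.

```python
import math

MIB = 1024 ** 2


def plan_file_groups(total_bytes, s_min, s_max):
    """Smallest number of equal-sized file groups that respects the size bounds."""
    g = max(1, math.ceil(total_bytes / s_max))
    per_file = total_bytes / g
    if per_file < s_min:
        raise ValueError("not enough data to reach S_min per file")
    return g, per_file


g, per_file = plan_file_groups(1000 * MIB, s_min=64 * MIB, s_max=128 * MIB)
print(g, per_file / MIB)  # 8 125.0
```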

Key Insight

Hudi's file group abstraction enables record-level versioning without full file rewrites. This makes MOR ideal for write-heavy workloads where COW's $O(n)$ rewrite cost becomes prohibitive.

Summary

Hudi's timeline provides chronological ordering; COW produces read-optimized files at the cost of slower writes; MOR speeds up writes by deferring the merge to read time and compaction. The write amplification metric $W_A$ guides compaction scheduling to balance read and write performance.

Key Concepts Table

| Concept | Description | When to Use |
|---|---|---|
| COW Table | Copy-on-Write: rewrites files on update | Read-heavy, batch workloads |
| MOR Table | Merge-on-Read: appends logs on update | Write-heavy, near-real-time ingestion |
| Timeline | Chronological list of all commits | Incremental processing, audit trail |
| File Group | Set of base files + associated log files | Parallel processing unit |
| Record Index | Bloom filter index on record keys | Efficient record lookup for updates |
| Bucket Index | Hash-based file assignment | High-volume upsert workloads |
| Global Index | Index across all partitions | Global uniqueness enforcement |
| Compaction | Merges log files into base files | MOR read performance optimization |
| Clustering | Groups small files into larger ones | File size optimization |
| Cleaner | Removes old file versions | Storage reclamation |
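The "hash-based file assignment" behind a bucket index can be illustrated with a stable hash taken modulo the bucket count. The sketch uses CRC32 purely as an assumption for the demonstration, so Hudi's actual hash function and bucket naming may differ.

```python
import zlib


def bucket_for(record_key, num_buckets):
    """Map a record key to a bucket using a stable hash (CRC32 in this sketch)."""
    return zlib.crc32(record_key.encode("utf-8")) % num_buckets


keys = ["CUST001", "CUST002", "CUST003", "CUST001"]
assignments = [bucket_for(k, 8) for k in keys]
# The same key always lands in the same bucket, so an upsert can go straight
# to the right file group without an index lookup.
print(assignments[0] == assignments[3])  # True
```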

Code Examples

Setting Up Hudi with PySpark

from pyspark.sql import SparkSession

# The Hudi bundle must match the Spark and Scala versions in use
# (here: Spark 3.4, Scala 2.12, Hudi 0.14.1).
spark = SparkSession.builder \
    .appName("HudiOperations") \
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .getOrCreate()

# Define Hudi write options for a Copy-on-Write table
hudi_options = {
    'hoodie.table.name': 'customers',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'customer_id',        # unique record key
    'hoodie.datasource.write.precombine.field': 'update_timestamp',  # latest value wins on duplicate keys
    'hoodie.datasource.write.partitionpath.field': 'region',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.upsert.shuffle.parallelism': '200',
    'hoodie.insert.shuffle.parallelism': '200',
}

# Create initial dataset
customers_data = [
    (1, "Alice", "North", "2024-01-15 10:00:00", "alice@email.com"),
    (2, "Bob", "South", "2024-01-15 10:00:00", "bob@email.com"),
    (3, "Charlie", "East", "2024-01-15 10:00:00", "charlie@email.com"),
    (4, "Diana", "West", "2024-01-15 10:00:00", "diana@email.com"),
]
df = spark.createDataFrame(customers_data, [
    "customer_id", "name", "region", "update_timestamp", "email"
])

df.write.format("hudi") \
    .options(**hudi_options) \
    .mode("overwrite") \
    .save("/hudi/customers")

# Upsert - update existing records
updates_data = [
    (1, "Alice Smith", "North", "2024-01-16 10:00:00", "alice.smith@email.com"),
    (5, "Eve", "South", "2024-01-16 10:00:00", "eve@email.com"),
]
updates_df = spark.createDataFrame(updates_data, [
    "customer_id", "name", "region", "update_timestamp", "email"
])

updates_df.write.format("hudi") \
    .options(**hudi_options) \
    .mode("append") \
    .save("/hudi/customers")
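The precombine field decides which record wins when a batch contains several rows with the same record key. The plain-Python sketch below illustrates that rule. The `precombine` helper is hypothetical, and its tie-breaking (first record wins on equal values) is an assumption of the sketch rather than documented Hudi behavior.

```python
def precombine(records, key_field, precombine_field):
    """Keep, per record key, the record with the largest precombine value."""
    latest = {}
    for rec in records:
        key = rec[key_field]
        if key not in latest or rec[precombine_field] > latest[key][precombine_field]:
            latest[key] = rec
    return list(latest.values())


batch = [
    {"customer_id": 1, "name": "Alice", "update_timestamp": "2024-01-15 10:00:00"},
    {"customer_id": 1, "name": "Alice Smith", "update_timestamp": "2024-01-16 10:00:00"},
    {"customer_id": 5, "name": "Eve", "update_timestamp": "2024-01-16 10:00:00"},
]
deduped = precombine(batch, "customer_id", "update_timestamp")
print([r["name"] for r in deduped])  # ['Alice Smith', 'Eve']
```

Timestamps stored as `YYYY-MM-DD HH:MM:SS` strings compare correctly as text, which is why the string column in the example above works as a precombine field.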

MOR Table with Incremental Queries

# Create MOR table for high-frequency updates
mor_options = {
    'hoodie.table.name': 'transactions',
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.datasource.write.recordkey.field': 'txn_id',
    'hoodie.datasource.write.precombine.field': 'event_time',
    'hoodie.datasource.write.partitionpath.field': 'date',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.compact.inline': 'false',                       # do not compact as part of each write
    'hoodie.logfile.max.size': '1073741824',                # 1 GiB
    'hoodie.logfile.to.parquet.compression.ratio': '0.5',
    'hoodie.parquet.max.file.size': '134217728',            # 128 MiB
}

# Write transactions
txn_data = [
    ("TXN001", "CUST001", "2024-01-15 08:00:00", 100.00, "2024-01-15"),
    ("TXN002", "CUST002", "2024-01-15 08:30:00", 250.00, "2024-01-15"),
    ("TXN003", "CUST001", "2024-01-15 09:00:00", 75.00, "2024-01-15"),
]
txn_df = spark.createDataFrame(txn_data, [
    "txn_id", "cust_id", "event_time", "amount", "date"
])

txn_df.write.format("hudi") \
    .options(**mor_options) \
    .mode("overwrite") \
    .save("/hudi/transactions")

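# Incremental read between two commit instants.
# Instant times are Hudi commit timestamps (yyyyMMddHHmmss), not event_time values;
# the two instants below are placeholders for real entries on the table's timeline.
incremental_df = spark.read \
    .format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", "20240115080000") \
    .option("hoodie.datasource.read.end.instanttime", "20240115090000") \
    .load("/hudi/transactions")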

incremental_df.show(truncate=False)
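Instant times on the Hudi timeline are compact timestamp strings of the form `yyyyMMddHHmmss` (newer releases append milliseconds). If a pipeline tracks its checkpoints as Python `datetime` values, a small helper can produce that format. The helper name is hypothetical.

```python
from datetime import datetime


def to_instant_time(ts):
    """Format a datetime as a yyyyMMddHHmmss instant string."""
    return ts.strftime("%Y%m%d%H%M%S")


begin = to_instant_time(datetime(2024, 1, 15, 8, 0, 0))
end = to_instant_time(datetime(2024, 1, 15, 9, 0, 0))
print(begin, end)  # 20240115080000 20240115090000
```

Because the format is fixed-width and ordered from year down to second, comparing two instant strings as text gives the same result as comparing the times.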

Compaction and Clustering

# Schedule a compaction plan for the MOR table.
# CALL procedures need HoodieSparkSessionExtension on the session; the table is
# assumed to be registered in the catalog as hudi.transactions.
spark.sql("""
    CALL run_compaction(
        op => 'schedule',
        table => 'hudi.transactions'
    )
""").show(truncate=False)

# Execute the pending compaction plan. Running this step from a separate job
# keeps compaction off the ingestion path (asynchronous compaction).
spark.sql("""
    CALL run_compaction(
        op => 'run',
        table => 'hudi.transactions'
    )
""").show(truncate=False)

# Clustering for COW tables: rewrite small files into larger ones, sorted by customer_id
spark.sql("""
    CALL run_clustering(
        table => 'hudi.customers',
        order => 'customer_id'
    )
""").show(truncate=False)

# Show timeline
spark.sql("CALL show_commits(table => 'hudi.transactions', limit => 10)").show(truncate=False)

# Show compaction plans
spark.sql("CALL show_compaction(table => 'hudi.transactions', limit => 10)").show(truncate=False)

Schema Evolution and Partition Evolution

# Schema evolution: add nullable columns (existing rows read back as NULL for them)
spark.sql("""
    ALTER TABLE hudi.customers ADD COLUMNS (
        phone_number STRING,
        loyalty_tier STRING
    )
""")

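# Partition evolution: Hudi has no built-in procedure that changes the partition
# field in place. One workaround is to rewrite the table to a new path with the
# new partition field. Assumes the data has a `country` column; the target path
# is a placeholder.
meta_cols = ["_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
             "_hoodie_partition_path", "_hoodie_file_name"]
spark.read.format("hudi").load("/hudi/customers").drop(*meta_cols) \
    .write.format("hudi") \
    .options(**{**hudi_options,
                'hoodie.datasource.write.partitionpath.field': 'country',
                'hoodie.datasource.write.operation': 'bulk_insert'}) \
    .mode("overwrite") \
    .save("/hudi/customers_by_country")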

# Show table properties
spark.sql("DESCRIBE EXTENDED hudi.customers").show(truncate=False)

Performance Metrics

| Metric | COW Table | MOR Table | Optimized COW | Optimized MOR |
|---|---|---|---|---|
| Upsert Latency (1K records) | 12-15 seconds | 2-4 seconds | 8-10 seconds | 1-2 seconds |
| Upsert Throughput | 500-800 records/sec | 2000-5000 records/sec | 800-1200 records/sec | 5000-10000 records/sec |
| Read Latency (point query) | 50-100 ms | 150-300 ms (with merge) | 30-60 ms | 100-200 ms |
| Read Latency (full scan) | 10-20 seconds | 15-30 seconds | 8-15 seconds | 12-25 seconds |
| Storage Overhead | ~10-15% | ~5-8% | ~8-12% | ~4-6% |
| File Count | Lower (compacted) | Higher (logs) | Lowest | Moderate |
| Compaction Cost | N/A | CPU-intensive | N/A | Moderate |
| Concurrent Writers | 2-3x slower | 1.5x slower | 1.5x slower | 1.2x slower |
| Time Travel Latency | < 1 second | < 2 seconds | < 1 second | < 1.5 seconds |
| Incremental Query | Supported | Supported | Faster | Faster |

Best Practices

  1. Choose COW for read-heavy workloads where read performance is critical and higher write latency is acceptable
  2. Choose MOR for write-heavy workloads that need near-real-time ingestion and can tolerate slower reads
  3. Configure hoodie.compaction.target.io to balance compaction aggressiveness against write performance
  4. Use hoodie.datasource.write.operation=bulk_insert for initial loads to skip the index lookup and merge work that upsert performs
  5. Set hoodie.cleaner.policy=KEEP_LATEST_COMMITS with a hoodie.cleaner.commits.retained value that covers your time travel window
  6. Tune hoodie.logfile.max.size to control log file rollover and compaction frequency for MOR tables
  7. Use bucket indexing (hoodie.index.type=BUCKET) for high-volume upsert workloads with uniform key distribution
  8. Schedule clustering for COW tables to consolidate small files and improve scan performance
  9. Monitor the timeline (for example with the show_commits procedure) to identify compaction lag and write latency issues
  10. Set hoodie.datasource.write.recordkey.field to a column that uniquely identifies each record so upserts can locate existing rows
  11. Configure hoodie.parquet.max.file.size for the target base file size (default 120 MB); hoodie.parquet.block.size controls the Parquet row-group size
  12. Keep the metadata table enabled (hoodie.metadata.enable=true) on large datasets so file listings come from metadata instead of storage listing calls
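Several of these practices are plain writer options, so they can be collected in one place. The sketch below is one possible arrangement: the option keys come from the list above, while the helper name and the retained-commits value are illustrative assumptions to adapt per table.

```python
BASE_OPTIONS = {
    "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
    "hoodie.cleaner.commits.retained": "10",  # illustrative value, size it to your time travel window
    "hoodie.index.type": "BUCKET",
    "hoodie.metadata.enable": "true",
}


def writer_options(initial_load):
    """Return write options: bulk_insert for the first load, upsert afterwards."""
    operation = "bulk_insert" if initial_load else "upsert"
    return {**BASE_OPTIONS, "hoodie.datasource.write.operation": operation}


print(writer_options(initial_load=True)["hoodie.datasource.write.operation"])   # bulk_insert
print(writer_options(initial_load=False)["hoodie.datasource.write.operation"])  # upsert
```

The resulting dictionary can be merged with the table-specific options (table name, record key, partition field) and passed to `.options(**...)` on the DataFrame writer.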

See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)
