
Topic: File Formats and Storage Optimization



PySpark Advanced Interview Series

Module 14: File Formats: Choosing the Right Storage

Companies: Uber, Google | Difficulty: Hard

Interview Question

"At Uber, we store petabytes of ride data. Walk us through the trade-offs between Parquet, ORC, Avro, and Delta Lake. When would you choose each, and how do compression codecs affect query performance?" (Uber Data Engineer Interview)

"At Google, we optimize storage costs while maintaining query performance. Explain how columnar storage works, the impact of file sizing on performance, and how you would optimize a Parquet dataset for both analytics and point lookups." (Google Senior Data Engineer Interview)


File Format Comparison

| Format | Storage | Compression | Schema Evolution | Columnar | Row-based | Best For |
|---|---|---|---|---|---|---|
| Parquet | Columnar | Excellent | Yes | Yes | No | Analytics, Hadoop |
| ORC | Columnar | Excellent | Yes | Yes | No | Hive, ACID transactions |
| Avro | Row-based | Good | Yes | No | Yes | Streaming, write-heavy, schema evolution |
| Delta Lake | Parquet + log | Excellent | Yes | Yes | No | ACID, time travel |
| JSON | Row-based | Poor | Yes | No | Yes | APIs, semi-structured |
| CSV | Row-based | Poor | No | No | Yes | Simple exports |

Parquet Deep Dive

Columnar Storage

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("FileFormatInterview").getOrCreate()

# Create sample data
data = [(i, f"user_{i}", i * 100, "2024-01-01") for i in range(1000000)]
df = spark.createDataFrame(data, ["id", "name", "value", "date"])

# Write as Parquet
df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet("s3a://bucket/data-parquet/")

# Write as CSV for comparison
df.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("s3a://bucket/data-csv/")

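# Sanity check: both copies should contain the same number of rows
parquet_rows = spark.read.parquet("s3a://bucket/data-parquet/").count()
csv_rows = spark.read.csv("s3a://bucket/data-csv/", header=True).count()

# Compare on-disk sizes through the Hadoop FileSystem API (count() only returns rows)
def dir_size_bytes(path):
    hadoop_path = spark._jvm.org.apache.hadoop.fs.Path(path)
    fs = hadoop_path.getFileSystem(spark._jsc.hadoopConfiguration())
    return fs.getContentSummary(hadoop_path).getLength()

parquet_size = dir_size_bytes("s3a://bucket/data-parquet/")
csv_size = dir_size_bytes("s3a://bucket/data-csv/")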
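To see why a columnar layout helps analytics, the following is a toy model in plain Python, not Parquet's actual encoding. It stores the same sample rows in a row layout and a column layout and counts how many field values must be visited to sum a single column. The function names are made up for this illustration.

```python
# Toy model of row-oriented vs column-oriented layouts (not Parquet's real encoding).
rows = [(i, f"user_{i}", i * 100, "2024-01-01") for i in range(1000)]

# Row layout: one tuple per record, as in CSV or Avro.
row_store = rows

# Column layout: one list per column, as in Parquet or ORC.
names = ["id", "name", "value", "date"]
column_store = {name: [r[idx] for r in rows] for idx, name in enumerate(names)}


def sum_value_row_layout(store):
    """Sum 'value' from the row layout; every field of every record is visited."""
    fields_touched = 0
    total = 0
    for record in store:
        fields_touched += len(record)  # the whole record has to be decoded
        total += record[2]
    return total, fields_touched


def sum_value_column_layout(store):
    """Sum 'value' from the column layout; only that column is visited."""
    values = store["value"]
    return sum(values), len(values)


row_total, row_fields = sum_value_row_layout(row_store)
col_total, col_fields = sum_value_column_layout(column_store)
print(row_total == col_total, row_fields, col_fields)
```

Both layouts give the same answer, but the column layout touches a quarter of the values here because the table has four columns. Real columnar formats add encoding and compression per column on top of this effect.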

Parquet Configuration

from pyspark.sql.functions import col

# Write with tuned settings (parentheses allow inline comments inside the chain)
(
    df.write
    .mode("overwrite")
    .option("compression", "snappy")
    .option("parquet.block.size", 128 * 1024 * 1024)  # 128MB row group
    .option("parquet.page.size", 1 * 1024 * 1024)  # 1MB page
    .option("parquet.enable.dictionary", "true")
    .option("parquet.dictionary.page.size", 1 * 1024 * 1024)
    .parquet("s3a://bucket/data-optimized/")
)

# Read with predicate pushdown
result = spark.read.parquet("s3a://bucket/data-optimized/") \
    .filter(col("id") > 500000) \
    .select("id", "name")
# Row groups whose min/max statistics rule out id > 500000 are skipped,
# and only the id and name columns are read (column pruning)
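The row-group skipping behind predicate pushdown can be sketched in plain Python. This is a simplified model, assuming each row group keeps only a min and max for one column; the helper names are hypothetical and real Parquet readers also use page-level statistics and encodings.

```python
def build_row_groups(values, group_size):
    """Split values into row groups and record min/max statistics for each."""
    groups = []
    for start in range(0, len(values), group_size):
        chunk = values[start:start + group_size]
        groups.append({"min": min(chunk), "max": max(chunk), "rows": chunk})
    return groups


def scan_greater_than(groups, threshold):
    """Return matching values and the number of row groups actually read."""
    matches = []
    groups_read = 0
    for group in groups:
        if group["max"] <= threshold:
            continue  # statistics prove no row can match, so skip the group
        groups_read += 1
        matches.extend(v for v in group["rows"] if v > threshold)
    return matches, groups_read


ids = list(range(1_000_000))
row_groups = build_row_groups(ids, group_size=100_000)
matches, groups_read = scan_greater_than(row_groups, 500_000)
print(len(row_groups), groups_read, len(matches))
```

With sorted ids, half of the ten row groups are skipped. If the ids were shuffled, every group's min/max range would cover the threshold and nothing could be skipped, which is why sort order matters for pushdown.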

Parquet Compression Codecs

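| Codec | Compression Ratio | Speed | Splittable (inside Parquet) | Use Case |
|---|---|---|---|---|
| Snappy | Good | Fast | Yes | Default, balanced |
| Gzip | Better | Slow | Yes | Archival |
| LZ4 | Good | Fastest | Yes | Speed-critical |
| Zstd | Best | Medium | Yes | Best compression |
| None | None | Fastest | Yes | Already compressed |

Parquet compresses individual pages inside each row group, so a Parquet file stays splittable with any codec. Gzip is only non-splittable when applied to a whole text file such as CSV or JSON.

# Write with different codecs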
df.write.option("compression", "snappy").parquet("s3a://bucket/snappy/")
df.write.option("compression", "gzip").parquet("s3a://bucket/gzip/")
df.write.option("compression", "zstd").parquet("s3a://bucket/zstd/")
df.write.option("compression", "lz4").parquet("s3a://bucket/lz4/")
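The ratio versus speed trade-off can be felt without a cluster. The sketch below uses `zlib` from the Python standard library at three levels as a stand-in, since Snappy, LZ4, and Zstd are not in the standard library; it is an analogy for the trade-off, not a benchmark of the Parquet codecs themselves.

```python
import time
import zlib

# Synthetic CSV-like payload resembling the sample data used above.
payload = "\n".join(
    f"{i},user_{i},{i * 100},2024-01-01" for i in range(50_000)
).encode("utf-8")


def measure(level):
    """Compress the payload at the given zlib level; return (size, seconds)."""
    start = time.perf_counter()
    compressed = zlib.compress(payload, level)
    elapsed = time.perf_counter() - start
    assert zlib.decompress(compressed) == payload  # round trip must be lossless
    return len(compressed), elapsed


results = {level: measure(level) for level in (1, 6, 9)}
for level, (size, seconds) in results.items():
    ratio = len(payload) / size
    print(f"zlib level {level}: {size} bytes, ratio {ratio:.1f}x, {seconds * 1000:.1f} ms")
```

Higher levels usually spend more CPU time for smaller output, which mirrors the choice between a fast codec such as Snappy and a denser one such as Zstd or Gzip. Timings vary by machine, so treat the numbers as relative.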

ORC Format

ORC Features

# ORC is optimized for Hive workloads
(
    df.write
    .mode("overwrite")
    .option("orc.compress", "zlib")
    .option("orc.stripe.size", "67108864")  # 64MB stripe
    .option("orc.row.index.stride", "10000")
    .orc("s3a://bucket/data-orc/")
)

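# ORC with ACID support (Hive 3.0+)
# Note: transactional tables are managed by Hive. The session needs Hive support enabled,
# and Spark cannot read or write Hive ACID tables natively (a connector is required).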
spark.sql("""
    CREATE TABLE orc_table (
        id INT,
        name STRING,
        value INT
    )
    STORED AS ORC
    TBLPROPERTIES ('transactional' = 'true')
""")

ORC vs Parquet

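| Feature | ORC | Parquet |
|---|---|---|
| ACID Support | Native (Hive 3.0+) | Via Delta Lake |
| Index | Min/max statistics plus optional Bloom filters | Min/max statistics (Bloom filters optional in newer versions) |
| Compression | Zlib (better ratio) | Snappy (faster) |
| Ecosystem | Hive-native | Broader support |
| Performance | Better for Hive | Better for Spark |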
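The Bloom filter index listed for ORC answers one question cheaply: "could this key be in this chunk of data?" Below is a minimal conceptual sketch in plain Python. The class, its sizes, and the hashing scheme are assumptions made for illustration and do not reflect ORC's on-disk implementation.

```python
import hashlib


class BloomFilter:
    """Tiny Bloom filter: no false negatives, occasional false positives."""

    def __init__(self, num_bits=8192, num_hashes=4):
        self.num_bits = num_bits  # must be a multiple of 8
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits // 8)

    def _positions(self, key):
        # Derive several bit positions from salted SHA-256 digests of the key.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        return all(
            self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(key)
        )


# One filter per stripe: a lookup only reads stripes whose filter says "maybe".
stripe_filter = BloomFilter()
stripe_keys = [f"ride_{i}" for i in range(500)]
for key in stripe_keys:
    stripe_filter.add(key)

print(stripe_filter.might_contain("ride_42"))
```

A "no" from the filter is definitive, so the reader can skip the stripe entirely. A "yes" only means the stripe must be read and checked.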

Avro Format

Row-Based Storage

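# Avro is row-based, which suits write-heavy workloads
# Requires the external spark-avro module (org.apache.spark:spark-avro) on the classpath
df.write \
    .format("avro") \
    .mode("overwrite") \
    .save("s3a://bucket/data-avro/")

# Read Avro
avro_df = spark.read.format("avro").load("s3a://bucket/data-avro/")

# Schema evolution with Avro
# Each Avro file stores the writer schema in its header, so readers can
# resolve old records against a newer reader schema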

Avro Use Cases

# 1. Streaming (Kafka) with schema evolution
# 2. Write-heavy workloads
# 3. Cross-platform compatibility
# 4. When you need row-level access
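Schema evolution in Avro rests on resolving a record written with one schema against the schema the reader expects. The sketch below models that idea with plain dictionaries: fields missing from old data take the reader's default, and fields the reader no longer declares are dropped. The `resolve` helper and the field names are hypothetical; the real Avro resolution rules also cover type promotion and aliases.

```python
MISSING = object()  # sentinel: the reader field declares no default


def resolve(record, reader_fields):
    """Project a record written with an older schema onto the reader schema."""
    resolved = {}
    for name, default in reader_fields:
        if name in record:
            resolved[name] = record[name]
        elif default is not MISSING:
            resolved[name] = default  # new field: fall back to its default
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    return resolved  # fields absent from the reader schema are ignored


# Record written with schema v1, read with schema v2 (adds tip_amount, drops legacy_flag).
old_record = {"ride_id": 1, "fare_amount": 12.5, "legacy_flag": True}
reader_schema_v2 = [
    ("ride_id", MISSING),
    ("fare_amount", MISSING),
    ("tip_amount", 0.0),
]

new_view = resolve(old_record, reader_schema_v2)
print(new_view)
```

This is why adding a field with a default is a backward-compatible change, while adding a required field without a default breaks readers of old data.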

Delta Lake

ACID Transactions

# Delta Lake = Parquet + Transaction Log
# Provides ACID transactions, schema enforcement, time travel

# Write Delta table
df.write \
    .format("delta") \
    .mode("overwrite") \
    .save("s3a://bucket/data-delta/")

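# ACID merge (upsert)
from delta.tables import DeltaTable
from pyspark.sql.functions import col

# Illustrative batch of changed rows; in practice this comes from an upstream source
updates_df = df.filter(col("id") < 100).withColumn("value", col("value") + 1)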

delta_table = DeltaTable.forPath(spark, "s3a://bucket/data-delta/")

delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# Time travel
# Read previous version
previous_df = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("s3a://bucket/data-delta/")

# View history
delta_table.history().show()
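Time travel follows from the "Parquet + Transaction Log" design: each commit records which data files were added and removed, so any past version can be rebuilt by replaying the log up to that point. The following is a simplified sketch of that replay in plain Python. The log structure and file names are invented for illustration and omit most of what the real Delta log records (metadata, checkpoints, statistics).

```python
def snapshot(log, version):
    """Replay commits 0..version and return the set of live data files."""
    files = set()
    for commit in log[: version + 1]:
        files -= set(commit.get("remove", []))
        files |= set(commit.get("add", []))
    return files


# Each list entry is one commit; its index is the table version.
log = [
    {"add": ["part-000.parquet", "part-001.parquet"]},  # version 0: initial write
    {"add": ["part-002.parquet"]},  # version 1: append
    {"remove": ["part-000.parquet"], "add": ["part-003.parquet"]},  # version 2: rewrite
]

print(sorted(snapshot(log, 0)))
print(sorted(snapshot(log, 2)))
```

Reading `versionAsOf = 0` corresponds to `snapshot(log, 0)` in this model. It also shows why VACUUM limits time travel: once `part-000.parquet` is physically deleted, version 0 can no longer be reconstructed.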

Delta Lake Features

# 1. Schema evolution: add a column with DDL
# (alternatively, append new data with .option("mergeSchema", "true"))
spark.sql("""
    ALTER TABLE delta.`s3a://bucket/data-delta/`
    ADD COLUMNS (new_col STRING)
""")

# 2. Data skipping (Z-ordering)
spark.sql("""
    OPTIMIZE delta.`s3a://bucket/data-delta/`
    ZORDER BY (id, date)
""")

# 3. VACUUM (clean old versions)
spark.sql("""
    VACUUM delta.`s3a://bucket/data-delta/` RETAIN 168 HOURS
""")

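# 4. Change data feed
# Requires the table property delta.enableChangeDataFeed = true;
# only changes committed after it was enabled can be read
changes_df = spark.read \
    .format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .load("s3a://bucket/data-delta/")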
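Z-ordering clusters rows so that values close together in several columns end up in the same files, which tightens the per-file min/max ranges used for data skipping. The core idea is a Z-order (Morton) curve built by interleaving the bits of the column values. Here is a conceptual sketch for two small non-negative integers; Delta Lake's actual implementation differs in how it maps column values to sortable keys.

```python
def interleave_bits(x, y, bits=16):
    """Interleave the low `bits` bits of x and y into one Z-order key."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)  # x fills the even bit positions
        z |= ((y >> i) & 1) << (2 * i + 1)  # y fills the odd bit positions
    return z


points = [(1, 1), (0, 1), (1, 0), (0, 0), (3, 0), (0, 3)]
ordered = sorted(points, key=lambda p: interleave_bits(*p))
print(ordered)
```

Sorting by the interleaved key keeps points that are near each other in both dimensions close together in the output order, so a filter on either column can skip more files than a sort on a single column would allow.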

Real-World Scenario: Uber Storage Optimization

Problem Statement

Optimize a 10TB ride dataset for multiple access patterns: analytics queries, point lookups, and streaming ingestion. Choose the right format and configuration.

import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, month, year

spark = SparkSession.builder \
    .appName("UberStorageOptimization") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Read raw ride data
raw_rides = spark.read.json("s3a://uber-data/rides-raw/")

# === FORMAT SELECTION STRATEGY ===

# 1. Analytics workload (historical queries)
# Use Parquet with Snappy compression
# year and month are derived from pickup_time so they exist as partition columns
analytics_rides = raw_rides \
    .select("ride_id", "driver_id", "rider_id", "pickup_time", 
            "dropoff_time", "fare_amount", "status", "city") \
    .withColumn("year", year(col("pickup_time"))) \
    .withColumn("month", month(col("pickup_time")))

analytics_rides.write \
    .mode("overwrite") \
    .partitionBy("city", "year", "month") \
    .option("compression", "snappy") \
    .parquet("s3a://uber-storage/rides-analytics/")

# 2. Real-time ingestion (Kafka source)
# Use Delta Lake for ACID and schema evolution
realtime_rides = raw_rides \
    .withColumn("ingestion_time", current_timestamp())

realtime_rides.write \
    .format("delta") \
    .mode("append") \
    .partitionBy("city") \
    .save("s3a://uber-storage/rides-realtime/")

# 3. Point lookups (API serving)
# Z-ordering is a Delta Lake feature, so write this copy as a Delta table
# (Parquet files plus a transaction log) and then cluster it by ride_id
point_lookup_rides = raw_rides \
    .select("ride_id", "rider_id", "driver_id", "status", "fare_amount")

point_lookup_rides.write \
    .format("delta") \
    .mode("overwrite") \
    .save("s3a://uber-storage/rides-pointlookup/")

# Optimize for point lookups
spark.sql("""
    OPTIMIZE delta.`s3a://uber-storage/rides-pointlookup/`
    ZORDER BY (ride_id)
""")

# === COMPRESSION COMPARISON ===

# Test different codecs
codecs = ["snappy", "gzip", "zstd", "lz4"]
for codec in codecs:
    start = time.time()
    analytics_rides.write \
        .mode("overwrite") \
        .option("compression", codec) \
        .parquet(f"s3a://uber-storage/rides-{codec}/")
    write_time = time.time() - start
    
    # Measure read performance
    start = time.time()
    read_df = spark.read.parquet(f"s3a://uber-storage/rides-{codec}/")
    read_df.filter(col("city") == "NYC").count()
    read_time = time.time() - start
    
    print(f"{codec}: Write={write_time:.2f}s, Read={read_time:.2f}s")

# === FILE SIZE OPTIMIZATION ===

# Bad: Too many small files
analytics_rides.repartition(10000) \
    .write.mode("overwrite") \
    .parquet("s3a://uber-storage/rides-small-files/")

# Good: Target roughly 256MB per file
# Assumes about 200 bytes per row; measure your own data to refine the estimate
estimated_bytes = analytics_rides.count() * 200
num_files = int(estimated_bytes / (1024 * 1024 * 256)) or 1  # never repartition to 0
analytics_rides.repartition(num_files) \
    .write.mode("overwrite") \
    .parquet("s3a://uber-storage/rides-optimal/")

# Alternative: coalesce reduces the partition count without a full shuffle,
# but the fixed value of 100 still has to be sized for the data
analytics_rides.coalesce(100) \
    .write.mode("overwrite") \
    .parquet("s3a://uber-storage/rides-coalesced/")

spark.stop()

File Sizing Best Practices

# Optimal file size: 128MB-256MB (compressed)
# Too small: many small files -> metadata and task-scheduling overhead
# Too large: fewer files -> less parallelism

# Calculate optimal partition count
data_size_bytes = 10 * 1024 * 1024 * 1024 * 1024  # 10TB
target_file_size_bytes = 256 * 1024 * 1024  # 256MB
optimal_files = data_size_bytes // target_file_size_bytes
print(f"Optimal files: {optimal_files}")  # 40960 files
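The same arithmetic can be wrapped in a small helper that rounds up and never returns zero, which is convenient when passing the result to `repartition`. This is a sketch; the function name and the 256MB default are choices made for this example.

```python
def target_file_count(total_bytes, target_file_bytes=256 * 1024**2):
    """Number of files needed so that each is at most target_file_bytes."""
    if total_bytes <= 0:
        return 1  # still write one (possibly empty) file
    # Integer ceiling division avoids float rounding on very large sizes.
    return (total_bytes + target_file_bytes - 1) // target_file_bytes


ten_tib = 10 * 1024**4
print(target_file_count(ten_tib))
print(target_file_count(ten_tib, target_file_bytes=128 * 1024**2))
```

The size passed in should be the expected compressed size on disk. When only an uncompressed estimate is available, the real files will come out smaller than the target.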

# Compact small files
spark.sql("""
    OPTIMIZE delta.`s3a://bucket/data/`
    ZORDER BY (key_column)
""")

Edge Cases

1. Schema Evolution

# Parquet schema evolution
df.write.mode("overwrite").parquet("s3a://bucket/data/")

# Add new column
from pyspark.sql.functions import lit

df_new = df.withColumn("new_col", lit("value"))
df_new.write.mode("append").parquet("s3a://bucket/data/")
# The append succeeds, but the directory now holds files with two schemas,
# and a default read may not expose new_col

# Enable schema merging on read
merged_df = spark.read \
    .option("mergeSchema", "true") \
    .parquet("s3a://bucket/data/")
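Conceptually, schema merging takes the union of the fields found across files and fills columns that a given file lacks with nulls. The sketch below models that with dictionaries; the helper names and type strings are illustrative assumptions, and Spark's own merge logic handles nested types and compatible type widening that this toy version does not.

```python
def merge_schemas(*schemas):
    """Union field definitions across schemas; conflicting types are an error."""
    merged = {}
    for schema in schemas:
        for name, dtype in schema.items():
            if name in merged and merged[name] != dtype:
                raise TypeError(f"conflicting types for {name!r}: {merged[name]} vs {dtype}")
            merged.setdefault(name, dtype)
    return merged


def read_with_schema(rows, schema):
    """Project rows onto the merged schema, using None for missing columns."""
    return [{name: row.get(name) for name in schema} for row in rows]


old_schema = {"id": "bigint", "name": "string", "value": "bigint", "date": "string"}
new_schema = {**old_schema, "new_col": "string"}

merged = merge_schemas(old_schema, new_schema)
old_rows = [{"id": 1, "name": "user_1", "value": 100, "date": "2024-01-01"}]
print(list(merged))
print(read_with_schema(old_rows, merged))
```

Rows from the older files come back with `new_col` set to `None`, which matches what a merged read returns for files written before the column existed.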

2. Corrupt Files

# Handle corrupt files gracefully
# Parquet has no per-record PERMISSIVE mode (that option applies to JSON and CSV);
# instead, tell Spark to skip files it cannot read
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
clean_df = spark.read.parquet("s3a://bucket/data/")
# Skipped files are dropped silently, so compare row counts or check the logs afterwards

3. Predicate Pushdown

# Parquet: min/max statistics enable predicate pushdown
# ORC: min/max statistics plus optional Bloom filters for point lookups
# Delta: Data skipping on per-file statistics, improved by Z-ordering

# Verify pushdown in explain plan
df.filter(col("date") == "2024-01-01").explain(True)
# Look for PushedFilters (data columns) or PartitionFilters (partition columns) in the physical plan
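Partition filters are the coarsest form of pushdown: with a layout such as `city=.../year=.../month=...`, whole directories can be excluded from the file listing before any file is opened. The following is a minimal sketch of that pruning step in plain Python; the paths and helper names are made up for illustration.

```python
def parse_partitions(path):
    """Extract key=value partition segments from a Hive-style path."""
    parts = {}
    for segment in path.split("/"):
        if "=" in segment:
            key, value = segment.split("=", 1)
            parts[key] = value
    return parts


def prune(paths, **filters):
    """Keep only files whose partition values match every filter."""
    return [
        p for p in paths
        if all(parse_partitions(p).get(k) == v for k, v in filters.items())
    ]


paths = [
    "rides/city=NYC/year=2024/month=1/part-0.parquet",
    "rides/city=NYC/year=2023/month=12/part-0.parquet",
    "rides/city=SF/year=2024/month=1/part-0.parquet",
]

print(prune(paths, city="NYC", year="2024"))
```

Only the files that survive this step are then subject to row-group statistics, so partition columns should be the ones most queries filter on, with cardinality low enough to avoid creating many tiny files.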

Performance Comparison

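| Format | Read Speed | Write Speed | Compression | Column Pruning | Predicate Pushdown |
|---|---|---|---|---|---|
| Parquet | Fast | Medium | Excellent | Yes | Yes (min/max) |
| ORC | Fast | Medium | Excellent | Yes | Yes (bloom) |
| Avro | Medium | Fast | Good | No | No |
| Delta | Fast | Medium | Excellent | Yes | Yes (Z-order) |
| CSV | Slow | Fast | Poor | No | No |
| JSON | Slow | Slow | Poor | No | No |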

Production Recommendation

For most Spark workloads, Parquet with Snappy compression is the default choice. Use Delta Lake when you need ACID transactions, schema evolution, or time travel. Use Avro for streaming workloads with schema evolution.


Summary

Choosing the right file format impacts query performance, storage costs, and operational complexity. Parquet is the standard for columnar analytics; Delta Lake adds ACID and time travel; Avro is best for streaming and write-heavy workloads. Understanding compression trade-offs and file sizing is critical for optimizing petabyte-scale storage at Uber and Google.
