πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Delta Lake Integration

Difficulty: Senior Level | Companies: Databricks, Netflix, Uber, Apple, Airbnb

Delta Lake Fundamentals

Delta Lake adds ACID transactions, schema enforcement, and time travel to Spark's data lake capabilities. It's built on top of Parquet files with a transaction log.

Creating Delta Tables

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder \
    .appName("DeltaLake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Write DataFrame as Delta table
df = spark.read.parquet("hdfs://data/sales")

df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .save("hdfs://delta/sales")

# Or save as managed table
df.write \
    .format("delta") \
    .saveAsTable("sales_table")

# Read Delta table
delta_df = spark.read.format("delta").load("hdfs://delta/sales")

ℹ️

Interview Insight: Delta Lake stores data as Parquet files with a JSON transaction log. The log tracks all changes, enabling ACID transactions and time travel.
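
To make the role of the log concrete, here is a toy sketch in plain Python of how replaying "add" and "remove" actions yields the set of live data files. The log contents and the `live_files` helper are hypothetical. Real commit files live under `_delta_log/` and carry many more fields, so treat this as a mental model rather than Delta's implementation.

```python
import json

# Hypothetical, heavily simplified commit files: version -> JSON lines.
TOY_LOG = {
    0: ['{"add": {"path": "part-000.parquet"}}',
        '{"add": {"path": "part-001.parquet"}}'],
    1: ['{"remove": {"path": "part-000.parquet"}}',
        '{"add": {"path": "part-002.parquet"}}'],
    2: ['{"add": {"path": "part-003.parquet"}}'],
}

def live_files(log, as_of_version=None):
    """Replay commits in order and return the files visible at a version."""
    files = set()
    for version in sorted(log):
        if as_of_version is not None and version > as_of_version:
            break
        for line in log[version]:
            action = json.loads(line)
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                files.discard(action["remove"]["path"])
    return files

print(sorted(live_files(TOY_LOG)))                   # latest snapshot
print(sorted(live_files(TOY_LOG, as_of_version=0)))  # snapshot at version 0
```

Stopping the replay at an earlier version is, conceptually, all that time travel needs, as long as the old Parquet files are still on disk.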

ACID Transactions

# Delta provides ACID transactions for data lake operations
from delta.tables import DeltaTable

# Load existing Delta table
delta_table = DeltaTable.forPath(spark, "hdfs://delta/sales")

# Upsert operation (MERGE)
new_data = spark.read.parquet("hdfs://data/new-sales")

delta_table.alias("target") \
    .merge(
        new_data.alias("source"),
        "target.sale_id = source.sale_id"
    ) \
    .whenMatchedUpdate(set={
        "amount": "source.amount",
        "updated_at": "source.timestamp"
    }) \
    .whenNotMatchedInsert(values={
        "sale_id": "source.sale_id",
        "amount": "source.amount",
        "created_at": "source.timestamp"
    }) \
    .execute()

# The MERGE becomes visible only when its commit is written to the transaction log
# If the job fails before that, readers never see partial results
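
One way to picture the atomic commit step is as a race to create the next numbered log file, where only one writer can succeed. The sketch below imitates that with an exclusive file create on a local temp directory. The `try_commit` helper is hypothetical, and Delta's actual commit protocol depends on the storage system, so this only illustrates the mutual-exclusion idea.

```python
import json
import os
import tempfile

def try_commit(log_dir, version, actions):
    """Try to publish a commit; return False if that version already exists."""
    path = os.path.join(log_dir, f"{version:020d}.json")
    try:
        # Mode "x" fails if the file exists, so only one writer wins a version.
        with open(path, "x") as f:
            for action in actions:
                f.write(json.dumps(action) + "\n")
        return True
    except FileExistsError:
        return False

log_dir = tempfile.mkdtemp()
first = try_commit(log_dir, 0, [{"add": {"path": "part-000.parquet"}}])
second = try_commit(log_dir, 0, [{"add": {"path": "part-999.parquet"}}])
# The losing writer would re-read the log, check for conflicts, and retry.
retry = try_commit(log_dir, 1, [{"add": {"path": "part-999.parquet"}}])
print(first, second, retry)
```

The second writer loses version 0 and succeeds only after retrying at version 1, which is the essence of optimistic concurrency control.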

Time Travel

# Query historical versions of data
# By version number
df_version_5 = spark.read.format("delta") \
    .option("versionAsOf", 5) \
    .load("hdfs://delta/sales")

# By timestamp
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2024-06-28") \
    .load("hdfs://delta/sales")

# View table history
delta_table = DeltaTable.forPath(spark, "hdfs://delta/sales")
history = delta_table.history()
history.show(truncate=False)

# Rollback to previous version
delta_table.restoreToVersion(5)

# Compare versions
current = spark.read.format("delta").load("hdfs://delta/sales")
previous = spark.read.format("delta").option("versionAsOf", 4).load("hdfs://delta/sales")

diff = current.join(previous, "sale_id", "left_anti")
print(f"New rows: {diff.count()}")
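
As a rough illustration of how a timestamp-based read maps onto a version, the sketch below picks the latest commit at or before the requested time. The `HISTORY` list and `version_as_of` helper are made up for the example and are not part of the Delta API.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical commit history: (version, commit timestamp), sorted by version.
HISTORY = [
    (0, datetime(2024, 6, 25, 9, 0)),
    (1, datetime(2024, 6, 27, 14, 30)),
    (2, datetime(2024, 6, 28, 8, 15)),
]

def version_as_of(history, ts):
    """Return the latest version committed at or before ts."""
    timestamps = [commit_ts for _, commit_ts in history]
    idx = bisect_right(timestamps, ts)
    if idx == 0:
        raise ValueError("timestamp is earlier than the first commit")
    return history[idx - 1][0]

# A date-only value such as "2024-06-28" means midnight at the start of that day.
print(version_as_of(HISTORY, datetime(2024, 6, 28)))
```

Because a date-only timestamp resolves to midnight, the commit made later on 2024-06-28 is not included in the result.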

⚠️

Warning: Time travel retains all historical data by default. Configure delta.logRetentionDuration and delta.deletedFileRetentionDuration to manage storage.
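
Both properties are table properties, so one way to set them is an ALTER TABLE statement. The sketch below only builds the SQL string. The `retention_sql` helper is hypothetical, and the 30-day and 7-day values are just the defaults restated for illustration.

```python
def retention_sql(table_path, log_days, deleted_file_days):
    """Build an ALTER TABLE statement that sets both retention properties."""
    return (
        f"ALTER TABLE delta.`{table_path}` SET TBLPROPERTIES ("
        f"'delta.logRetentionDuration' = 'interval {log_days} days', "
        f"'delta.deletedFileRetentionDuration' = 'interval {deleted_file_days} days')"
    )

stmt = retention_sql("hdfs://delta/sales", log_days=30, deleted_file_days=7)
print(stmt)
# With an active SparkSession configured for Delta: spark.sql(stmt)
```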

Schema Evolution

# Delta supports schema evolution without rewriting data
df = spark.read.parquet("hdfs://data/sales")

# Add new column
df_with_new_col = df.withColumn("tax", F.col("amount") * 0.08)

# Write with mergeSchema option
df_with_new_col.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("hdfs://delta/sales")

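# Overwrite with new schema (assumes the source data has a region column)
# Select from df_with_new_col, since df itself has no tax column
df_new_schema = df_with_new_col.select("sale_id", "amount", "tax", "region")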

df_new_schema.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("hdfs://delta/sales")

# Check schema evolution
delta_table = DeltaTable.forPath(spark, "hdfs://delta/sales")
print(delta_table.toDF().schema)
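
The difference between schema enforcement and schema evolution can be sketched as a toy model in plain Python. The `resolve_schema` function is hypothetical and ignores details such as nested types and nullability. It only shows why an append with an extra column is rejected unless schema merging is requested.

```python
def resolve_schema(table_schema, incoming_schema, merge_schema=False):
    """Return the table schema after an append, or raise on a mismatch."""
    result = dict(table_schema)
    for name, dtype in incoming_schema.items():
        if name not in result:
            if not merge_schema:
                raise ValueError(f"unexpected column: {name}")
            result[name] = dtype  # new columns are added to the end of the schema
        elif result[name] != dtype:
            raise ValueError(f"type mismatch for column: {name}")
    return result

table = {"sale_id": "bigint", "amount": "double"}
incoming = {"sale_id": "bigint", "amount": "double", "tax": "double"}

try:
    resolve_schema(table, incoming)  # enforcement: extra column is rejected
except ValueError as err:
    print(f"rejected: {err}")

print(resolve_schema(table, incoming, merge_schema=True))  # evolution: column added
```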

Z-Ordering for Query Optimization

# Z-ordering co-locates related data for faster queries
delta_table = DeltaTable.forPath(spark, "hdfs://delta/sales")

# Z-order by frequently queried columns
delta_table.optimize().executeZOrderBy("customer_id", "product_id")

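# Liquid clustering (Delta Lake 3.1+; depends on the Delta version, not the Spark version)
# An alternative to partitioning and Z-ordering; it cannot be combined with either
# Clustering columns are declared when the table is created
# Table name and column types below are illustrative
spark.sql("""
    CREATE TABLE sales_clustered (
        sale_id BIGINT, customer_id BIGINT, product_id BIGINT, amount DOUBLE
    )
    USING DELTA
    CLUSTER BY (customer_id, product_id)
""")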

# Optimize existing table
spark.sql("""
    OPTIMIZE delta.`hdfs://delta/sales`
    ZORDER BY (customer_id, product_id)
""")

# Check file count and total size after optimizing
spark.sql("DESCRIBE DETAIL delta.`hdfs://delta/sales`") \
    .select("numFiles", "sizeInBytes") \
    .show()

ℹ️

Pro Tip: Z-order on columns used in WHERE clauses and JOIN conditions. This enables data skipping and reduces I/O by reading only relevant files.
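
To see why Z-ordering helps filters on either column, the toy sketch below interleaves the bits of two integer columns (a Morton code), sorts rows by that value, and then uses per-file min/max ranges to skip files. The 4x4 grid, the four-row "files", and both helpers are assumptions for illustration, not Delta's implementation.

```python
def z_value(x, y, bits=8):
    """Interleave the low `bits` bits of x and y into one Morton code."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)       # bits of x go to even positions
        z |= ((y >> i) & 1) << (2 * i + 1)   # bits of y go to odd positions
    return z

# Hypothetical (customer_id, product_id) pairs covering a 4x4 grid.
rows = [(c, p) for c in range(4) for p in range(4)]
z_sorted = sorted(rows, key=lambda r: z_value(*r))

# Split the sorted rows into "files" of four rows each.
files = [z_sorted[i:i + 4] for i in range(0, len(z_sorted), 4)]

def files_to_read(files, column, value):
    """Keep files whose min/max range for the column could contain the value."""
    return [f for f in files
            if min(r[column] for r in f) <= value <= max(r[column] for r in f)]

print(len(files_to_read(files, column=0, value=0)))  # filter on customer_id
print(len(files_to_read(files, column=1, value=3)))  # filter on product_id
```

In this toy layout each filter touches two of the four files. Sorting by customer_id alone would cut a customer_id filter to one file but force a product_id filter to read all four.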

Data Compaction and Cleanup

# Compact small files for better performance
delta_table = DeltaTable.forPath(spark, "hdfs://delta/sales")

# Optimize: Compacts small files
delta_table.optimize().executeCompaction()

# Vacuum: Remove old files (after retention period)
# Default retention: 7 days
delta_table.vacuum(168)  # 168 hours = 7 days

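# Vacuuming below the 7-day default requires disabling Delta's safety check
# Risky: this can break time travel and any reader or writer still using older files
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
delta_table.vacuum(24)  # 24 hours (aggressive)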

# Check file stats (DESCRIBE DETAIL reports size in bytes)
spark.sql("DESCRIBE DETAIL delta.`hdfs://delta/sales`") \
    .select(
        F.col("numFiles").alias("file_count"),
        (F.col("sizeInBytes") / (1024 * 1024)).alias("total_size_mb")
    ) \
    .show()

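The retention rule behind VACUUM can be sketched in a few lines: a file that was logically removed from the table becomes eligible for physical deletion once its removal is older than the retention window. The `vacuum_candidates` helper and the timestamps are hypothetical, and the real command considers more cases, such as files that were never referenced by the log.

```python
from datetime import datetime, timedelta

def vacuum_candidates(removed_files, now, retention_hours=168):
    """Return paths whose removal timestamp is older than the retention window."""
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(path for path, removed_at in removed_files.items()
                  if removed_at < cutoff)

now = datetime(2024, 7, 1, 12, 0)
removed = {
    "part-000.parquet": datetime(2024, 6, 20, 12, 0),  # removed 11 days ago
    "part-001.parquet": datetime(2024, 6, 30, 12, 0),  # removed 1 day ago
}

print(vacuum_candidates(removed, now))                      # default 7-day window
print(vacuum_candidates(removed, now, retention_hours=12))  # aggressive window
```

With the aggressive window both files qualify, which means no earlier version that references them can be read afterwards.
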
Delta Lake with Streaming

# Delta supports both batch and streaming reads/writes
# Write streaming data to Delta
events_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "events") \
    .load()

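# Schema of the JSON payload (illustrative; adjust to the actual event format)
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

schema = StructType([
    StructField("event_id", StringType()),
    StructField("timestamp", TimestampType()),
])

parsed = events_stream \
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
    .select("data.*")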

# Write to Delta table
query = parsed \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "hdfs://checkpoints/events-delta") \
    .start("hdfs://delta/events")

# Read Delta as stream
streaming_df = spark.readStream \
    .format("delta") \
    .load("hdfs://delta/events")

# Process streaming data
processed = streaming_df \
    .groupBy(F.window("timestamp", "5 minutes")) \
    .agg(F.count("*").alias("count"))

query = processed \
    .writeStream \
    .format("console") \
    .outputMode("update") \
    .start()
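
The 5-minute window in the aggregation above assigns each event to a fixed bucket. A minimal sketch of that bucketing in plain Python, assuming windows aligned to the Unix epoch with no start offset, looks like this. The `tumbling_window` helper is hypothetical.

```python
from datetime import datetime, timedelta, timezone

def tumbling_window(ts, minutes=5):
    """Return the [start, end) window containing ts, aligned to the Unix epoch."""
    size = minutes * 60
    start_epoch = int(ts.timestamp()) // size * size
    start = datetime.fromtimestamp(start_epoch, tz=timezone.utc)
    return start, start + timedelta(seconds=size)

event_time = datetime(2024, 6, 28, 10, 7, 42, tzinfo=timezone.utc)
start, end = tumbling_window(event_time)
print(start.isoformat(), end.isoformat())
```

Every event between 10:05:00 (inclusive) and 10:10:00 (exclusive) lands in the same bucket, so the count for that window keeps updating as new events arrive.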

Performance Tuning

# Enable optimized writes and auto compaction for the whole session (Delta Lake 3.1+)
spark = SparkSession.builder \
    .appName("DeltaPerformance") \
    .config("spark.databricks.delta.optimizeWrite.enabled", "true") \
    .config("spark.databricks.delta.autoCompact.enabled", "true") \
    .getOrCreate()

# Or enable them per table through table properties
spark.sql("""
    ALTER TABLE delta.`hdfs://delta/events` SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")

# Later appends to this table pick up the table properties
df.write \
    .format("delta") \
    .mode("append") \
    .save("hdfs://delta/events")

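# Manual optimization for selected partitions
# The WHERE clause may only reference partition columns (here, date is assumed to be one)
spark.sql("""
    OPTIMIZE delta.`hdfs://delta/events`
    WHERE date >= '2024-06-01'
""")

# Check optimization statistics (operationMetrics lists files added and removed)
spark.sql("DESCRIBE HISTORY delta.`hdfs://delta/events`").show(100, truncate=False)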

ℹ️

Key Takeaway: Delta Lake brings database capabilities to data lakes. Use ACID transactions for data consistency, time travel for auditing, Z-ordering for query performance, and auto-optimize for small file management.

Follow-Up Questions

  • How does Delta Lake achieve ACID transactions on object storage?
  • Explain the difference between Z-ordering and liquid clustering.
  • How does time travel impact storage costs and query performance?
  • Describe strategies for migrating from Parquet to Delta Lake.
  • How does Delta Lake handle concurrent writes?