πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Topic: Delta Lake and Modern Data Lakehouse

PySpark AdvancedDelta Lake⭐ Premium

Advertisement

PySpark Advanced Interview Series

Module 19: Delta Lake β€” The Lakehouse Architecture

MetaNetflixDifficulty: Hard

Interview Question

"At Meta, we use Delta Lake for our data lakehouse. Walk us through how ACID transactions work in Delta Lake, how time travel enables data recovery, and how you would implement a production merge (upsert) pattern." β€” Meta Data Engineer Interview

"At Netflix, we need data versioning for our recommendation systems. Explain Delta Lake's transaction log, how schema evolution works, and how you would optimize a Delta table that has accumulated many small files." β€” Netflix Senior Data Engineer Interview


Delta Lake Architecture

Delta Lake = Parquet files + Transaction Log (JSON)

delta-table/_delta_log/00000000000000000000.json00000000000000000001.json00000000000000000002.jsonpart-00000-...-00000.parquetpart-00001-...-00001.parquetpart-00002-...-00002.parquet# Commit 0# Commit 1# Commit 2

ACID Transactions

Write Operations

from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName("DeltaLakeInterview") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Create Delta table
df = spark.createDataFrame([
    (1, "Alice", 100),
    (2, "Bob", 200),
    (3, "Charlie", 300)
], ["id", "name", "amount"])

df.write.format("delta").mode("overwrite").save("s3a://bucket/delta-table/")

# Read Delta table
delta_df = spark.read.format("delta").load("s3a://bucket/delta-table/")

# Append
new_df = spark.createDataFrame([
    (4, "Diana", 400),
    (5, "Eve", 500)
], ["id", "name", "amount"])

new_df.write.format("delta").mode("append").save("s3a://bucket/delta-table/")

# Overwrite specific partition
df.write.format("delta") \
    .mode("overwrite") \
    .option("replaceWhere", "date = '2024-01-01'") \
    .save("s3a://bucket/delta-table/")

Merge (Upsert)

# Load Delta table
delta_table = DeltaTable.forPath(spark, "s3a://bucket/delta-table/")

# Merge operation (upsert)
updates_df = spark.createDataFrame([
    (1, "Alice Updated", 150),  # Update existing
    (6, "Frank", 600)           # Insert new
], ["id", "name", "amount"])

delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# Conditional merge
delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(
    condition="source.amount > target.amount",
    set={
        "name": "source.name",
        "amount": "source.amount"
    }
).whenNotMatchedInsertAll() \
 .execute()

# Delete operation
delta_table.delete(col("amount") < 100)

# Update operation
delta_table.update(
    condition=col("name") == "Alice",
    set={"amount": col("amount") * 1.1}
)

Time Travel

# Read previous version by version number
v0_df = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("s3a://bucket/delta-table/")

# Read previous version by timestamp
ts_df = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-01") \
    .load("s3a://bucket/delta-table/")

# View table history
delta_table = DeltaTable.forPath(spark, "s3a://bucket/delta-table/")
history = delta_table.history()
history.show()

# Restore to previous version
delta_table.restoreToVersion(0)

# RESTORE to specific timestamp
delta_table.restoreToTimestamp("2024-01-01")

Schema Evolution

# Add new column
new_schema_df = spark.createDataFrame([
    (1, "Alice", 100, "US"),
    (2, "Bob", 200, "UK")
], ["id", "name", "amount", "country"])

new_schema_df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("s3a://bucket/delta-table/")

# Rename column (Delta Lake 2.0+)
delta_table = DeltaTable.forPath(spark, "s3a://bucket/delta-table/")
delta_table.alias("t").withColumnRenamed("amount", "revenue")

# Change column type (requires rewrite)
# Read with new schema, write back
spark.read.format("delta").load("s3a://bucket/delta-table/") \
    .withColumn("amount", col("amount").cast("double")) \
    .write.format("delta") \
    .mode("overwrite") \
    .save("s3a://bucket/delta-table/")

Real-World Scenario: Netflix Data Lakehouse

Problem Statement

Build a production data lakehouse pattern with Delta Lake for Netflix's viewing analytics, including merge operations, time travel for recovery, and performance optimization.

from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName("NetflixDeltaLake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.databricks.delta.optimizeWrite.enabled", "true") \
    .config("spark.databricks.delta.autoCompact.enabled", "true") \
    .getOrCreate()

# === CREATE DELTA TABLE ===
viewing_schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("content_id", StringType(), False),
    StructField("view_date", DateType(), False),
    StructField("watch_duration_seconds", IntegerType(), True),
    StructField("completion_rate", DoubleType(), True),
    StructField("device_type", StringType(), True)
])

# Write initial data
initial_data = spark.read.parquet("s3a://netflix-data/viewing-initial/")
initial_data.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("view_date") \
    .save("s3a://netflix-delta/viewing-analytics/")

# === INCREMENTAL MERGE (Daily ETL) ===
def daily_merge(spark, source_df, target_path):
    """Merge daily viewing data into Delta table"""
    delta_table = DeltaTable.forPath(spark, target_path)
    
    delta_table.alias("target").merge(
        source_df.alias("source"),
        """
        target.user_id = source.user_id AND
        target.content_id = source.content_id AND
        target.view_date = source.view_date
        """
    ).whenMatchedUpdate(
        condition="source.watch_duration_seconds > target.watch_duration_seconds",
        set={
            "watch_duration_seconds": "source.watch_duration_seconds",
            "completion_rate": "source.completion_rate"
        }
    ).whenNotMatchedInsertAll() \
     .execute()

# Process daily batch
daily_source = spark.read.parquet("s3a://netflix-data/viewing-daily/2024-01-15/")
daily_merge(spark, daily_source, "s3a://netflix-delta/viewing-analytics/")

# === TIME TRAVEL FOR DATA RECOVERY ===
# accidental bad write
bad_df = spark.createDataFrame([], viewing_schema)
bad_df.write.format("delta") \
    .mode("overwrite") \
    .save("s3a://netflix-delta/viewing-analytics/")

# Recover from accident
delta_table = DeltaTable.forPath(spark, "s3a://netflix-delta/viewing-analytics/")
delta_table.restoreToVersion(5)  # Restore to version before bad write

# === PERFORMANCE OPTIMIZATION ===
# Optimize small files (compaction)
spark.sql("""
    OPTIMIZE delta.`s3a://netflix-delta/viewing-analytics/`
""")

# Z-order for fast lookups
spark.sql("""
    OPTIMIZE delta.`s3a://netflix-delta/viewing-analytics/`
    ZORDER BY (user_id, content_id)
""")

# Clean up old versions
spark.sql("""
    VACUUM delta.`s3a://netflix-delta/viewing-analytics/` RETAIN 168 HOURS
""")

# === CHANGE DATA FEED ===
# Get changes since version 0
changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .load("s3a://netflix-delta/viewing-analytics/")

changes.show()

# === MONITORING ===
# View table history
delta_table = DeltaTable.forPath(spark, "s3a://netflix-delta/viewing-analytics/")
delta_table.history().show(truncate=False)

# Get table details
detail = spark.sql("DESCRIBE DETAIL delta.`s3a://netflix-delta/viewing-analytics/`")
detail.show(truncate=False)

spark.stop()

Delta Lake Features

1. Constraints

# Add constraints to enforce data quality
spark.sql("""
    ALTER TABLE delta.`s3a://bucket/table/`
    ADD CONSTRAINT valid_amount CHECK (amount >= 0)
""")

spark.sql("""
    ALTER TABLE delta.`s3a://bucket/table/`
    ADD CONSTRAINT valid_email CHECK (email LIKE '%@%')
""")

2. Liquid Clustering

# Replace partitioning with liquid clustering
spark.sql("""
    CREATE TABLE delta.`s3a://bucket/table/`
    USING DELTA
    CLUSTER BY (user_id, content_id)
""")

# Or convert existing table
spark.sql("""
    ALTER TABLE delta.`s3a://bucket/table/`
    CLUSTER BY (user_id, content_id)
""")

3. Change Data Feed

# Enable change data feed
spark.sql("""
    ALTER TABLE delta.`s3a://bucket/table/`
    SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
""")

# Read changes
changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .load("s3a://bucket/table/")

Edge Cases

1. Concurrent Writes

# Delta Lake uses optimistic concurrency control
# Conflicts are detected and resolved automatically

# Write 1
df1.write.format("delta").mode("append").save(path)

# Write 2 (concurrent)
df2.write.format("delta").mode("append").save(path)

# One write may fail with ConcurrentAppendException
# Retry logic handles this

2. Schema Incompatibility

# Incompatible schema changes fail on merge
# Use mergeSchema option for additive changes
df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save(path)

3. Large File Compaction

# Many small files degrade performance
# Use OPTIMIZE to compact
spark.sql("OPTIMIZE delta.`s3a://bucket/table/`")

# Or set auto-compaction
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

Performance Comparison

OperationRaw ParquetDelta LakeImprovement
UpsertManual (slow)MERGE (fast)10-100x
Read (latest)Full scanSnapshot2-10x
Schema evolutionRewrite allmergeSchemaNo rewrite
Time travelNot supportedNativeN/A
Concurrent writesNot supportedOptimistic lockingN/A

πŸ’‘Production Recommendation

Delta Lake is the standard for production data lakehouses. Use it when you need ACID transactions, time travel, schema evolution, or concurrent access. The overhead of the transaction log is minimal compared to the benefits.


Summary

Delta Lake brings database reliability to data lakes. ACID transactions prevent data corruption, time travel enables recovery, and schema evolution simplifies maintenance. At Meta and Netflix, Delta Lake is the foundation of production data pipelines, enabling reliable, performant, and maintainable data platforms.

Advertisement