Topic: DataFrame API Mastery

PySpark Advanced Interview Series

Module 03: DataFrame API - Declarative Data Processing

Companies: Amazon, Uber | Difficulty: Hard

Interview Question

"At Amazon, our data pipelines process billions of transactions daily using DataFrames. Walk us through the complete DataFrame API, from reading raw data to complex aggregations. How does the Catalyst optimizer transform your declarative queries, and what pitfalls should we avoid when chaining transformations?" - Amazon Data Engineer Interview

"At Uber, we build real-time analytics pipelines using DataFrames. Explain how you would implement a complex transformation that involves multiple joins, window functions, and conditional logic. How do you ensure performance when joining a 100GB rides table with a 1GB driver dimension table?" - Uber Senior Data Engineer Interview


DataFrame Fundamentals

A DataFrame is a distributed collection of rows organized into named columns, conceptually equivalent to a table in a relational database. Under the hood it still executes as RDD operations, but because it carries schema information, the Catalyst optimizer can rewrite the query plan before anything runs.

Creating DataFrames

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import col, lit, when, coalesce, expr

spark = SparkSession.builder.appName("DataFrameInterview").getOrCreate()

# From a list of tuples
data = [
    ("Alice", 30, "Engineering"),
    ("Bob", 25, "Marketing"),
    ("Charlie", 35, "Engineering"),
    ("Diana", 28, "Marketing")
]
df = spark.createDataFrame(data, ["name", "age", "department"])

# From a list of dictionaries
data_dicts = [
    {"name": "Alice", "age": 30, "department": "Engineering"},
    {"name": "Bob", "age": 25, "department": "Marketing"}
]
df = spark.createDataFrame(data_dicts)

# With explicit schema
schema = StructType([
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), False),
    StructField("department", StringType(), False)
])
df = spark.createDataFrame(data, schema)

# From a CSV file
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("s3a://bucket/data.csv")

# From JSON
df = spark.read.json("s3a://bucket/data.json")

# From Parquet (most common in production)
df = spark.read.parquet("s3a://bucket/data.parquet")

Selection and Column Operations

Basic Selection

# Column selection
df.select("name", "age")  # By string
df.select(col("name"), col("age"))  # By Column object
df.select(df.name, df.age)  # By attribute access

# Column aliasing
df.select(
    col("name").alias("employee_name"),
    col("age").alias("employee_age")
)

# Adding columns
df = df.withColumn("age_plus_10", col("age") + 10)
df = df.withColumn("country", lit("USA"))

# Dropping columns
df = df.drop("age_plus_10")

# Renaming columns
df = df.withColumnRenamed("name", "employee_name")

# Select all columns
df.select("*")

# Select with expressions
df.selectExpr("name", "age + 10 AS age_plus_10", "department")

Column Expressions

from pyspark.sql.functions import (
    col, when, coalesce, expr, upper, lower,
    concat, substring, length, trim, regexp_replace,
    date_format, to_date, current_timestamp
)

# Conditional column creation
df = df.withColumn(
    "seniority",
    when(col("age") >= 35, "Senior")
    .when(col("age") >= 28, "Mid")
    .otherwise("Junior")
)

# Complex expressions with expr
df = df.withColumn(
    "name_length",
    expr("length(name)")
)

# String operations
df = df.withColumn("upper_name", upper(col("name")))
df = df.withColumn("name_trimmed", trim(col("name")))  # trim() strips leading/trailing spaces only

# Date operations
df = df.withColumn("processed_at", current_timestamp())  # a timestamp, not a date

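💡 Amazon Pro Tip

Prefer col() over df.column_name in production code. Attribute access (df.name) cannot express column names that contain spaces or special characters, and when a column shares its name with a DataFrame method or attribute (for example count), df.count silently returns the method rather than the column. col("column name") and df["column name"] work in both cases.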
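
One reason attribute access is fragile is Python's lookup order: PySpark resolves df.<name> to a column through __getattr__, which Python only calls after normal attribute lookup has failed. The sketch below is a plain-Python model of that lookup order, not PySpark itself; MiniFrame and its string "columns" are hypothetical stand-ins.

```python
class MiniFrame:
    """Hypothetical stand-in for a DataFrame; models attribute lookup only."""

    def __init__(self, columns):
        self.columns = list(columns)

    def count(self):
        # A real method, playing the role of DataFrame.count()
        return 0

    def __getattr__(self, name):
        # Python calls this only when normal attribute lookup fails
        if name in self.columns:
            return f"Column<{name}>"
        raise AttributeError(name)

    def __getitem__(self, name):
        # Bracket access always consults the column list
        if name in self.columns:
            return f"Column<{name}>"
        raise KeyError(name)


frame = MiniFrame(["name", "count", "total amount"])

print(frame.name)              # Column<name>
print(callable(frame.count))   # True: the method wins over the "count" column
print(frame["count"])          # Column<count>
print(frame["total amount"])   # Column<total amount>
```

The last line has no attribute-style equivalent at all, since a name containing a space cannot be written after a dot.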


Filtering Data

# Basic filtering
df.filter(col("age") > 30)
df.where(col("age") > 30)  # Alias for filter

# Multiple conditions
df.filter((col("age") > 25) & (col("department") == "Engineering"))
df.filter((col("age") > 25) | (col("department") == "Marketing"))

# IN clause
df.filter(col("department").isin(["Engineering", "Marketing"]))

# IS NULL / IS NOT NULL
df.filter(col("name").isNull())
df.filter(col("name").isNotNull())

# String matching
df.filter(col("name").startswith("A"))
df.filter(col("name").contains("li"))
df.filter(col("name").rlike("^A.*e$"))  # Regex

# Between
df.filter(col("age").between(25, 35))

# NOT conditions
df.filter(~col("department").isin(["Engineering"]))
df.filter(~col("name").isNull())

# SQL expression filtering
df.filter("age > 30 AND department = 'Engineering'")
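
The parentheses around each condition in the multi-condition filters are required, not stylistic: Python's & and | bind more tightly than comparison operators. The sketch below shows the same precedence rule with plain integers (dept_code is a made-up stand-in for a column value), which makes it easy to see how an unparenthesized expression is actually grouped.

```python
age, dept_code = 30, 7

# Without parentheses Python evaluates 25 & dept_code first, then chains the
# comparisons:  age > (25 & dept_code) == 7  ->  (30 > 1) and (1 == 7)
without_parens = age > 25 & dept_code == 7

# With parentheses each comparison is evaluated before & combines them
with_parens = (age > 25) & (dept_code == 7)

print(25 & dept_code)    # 1
print(without_parens)    # False
print(with_parens)       # True
```

With Column objects the unparenthesized form usually surfaces as an error about converting a Column to bool rather than as a wrong answer, but the cause is the same grouping.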

Grouping and Aggregation

# Note: importing sum, min, and max this way shadows Python's built-ins in this module
from pyspark.sql.functions import (
    count, countDistinct, sum, avg, min, max,
    first, last, collect_list, collect_set,
    stddev, variance, kurtosis, skewness
)

# Basic groupBy
df.groupBy("department").count()

# Multiple aggregations
df.groupBy("department").agg(
    count("*").alias("total_employees"),
    avg("age").alias("avg_age"),
    min("age").alias("min_age"),
    max("age").alias("max_age"),
    countDistinct("name").alias("unique_names")
)

# Naming aggregations with alias(); without it the output column is named e.g. avg(age)
df.groupBy("department").agg(
    count("*").alias("count"),
    avg("age").alias("avg_age")
)

# Collect as list (use sparingly: a very large group can exhaust executor memory)
df.groupBy("department").agg(
    collect_list("name").alias("all_names")
)

# Multiple groupBy columns
df.groupBy("department", "age").count()

# Filter after aggregation
df.groupBy("department").agg(
    count("*").alias("count")
).filter(col("count") > 1)

# Sorting after aggregation
df.groupBy("department").agg(
    avg("age").alias("avg_age")
).orderBy(col("avg_age").desc())

Handling Null Values in Aggregation

from pyspark.sql.functions import col, count, sum, when

# Count only non-null values
df.groupBy("department").agg(
    count("age").alias("non_null_count"),
    count("*").alias("total_count"),
    (count("*") - count("age")).alias("null_count")
)

# Conditional sum: adds 1 for each non-null age (same result as count("age"))
df.groupBy("department").agg(
    sum(when(col("age").isNotNull(), 1).otherwise(0)).alias("valid_count")
)
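
The difference between count("*") and count("age") comes down to one rule: Spark SQL aggregates skip NULL inputs, while count("*") counts rows. A plain-Python model of that rule, using None for NULL and made-up ages:

```python
ages = [30, None, 35, None, 28]

total_count = len(ages)                          # like count("*"): every row
non_null = [a for a in ages if a is not None]
non_null_count = len(non_null)                   # like count("age"): NULLs skipped
null_count = total_count - non_null_count
avg_age = sum(non_null) / non_null_count         # like avg("age"): NULLs skipped

print(total_count, non_null_count, null_count)   # 5 3 2
print(avg_age)                                   # 31.0
```

Note that the average is taken over 3 values, not 5, so a column with many NULLs can report a mean that describes only a small share of the rows.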

Real-World Scenario: Uber Ride Analytics Pipeline

Problem Statement

Build a complete analytics pipeline that reads ride data, driver data, and payment data. Compute ride metrics, driver performance, and revenue analytics with complex aggregations.

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lit, when, coalesce, expr,
    count, countDistinct, sum, avg, min, max,
    datediff, current_timestamp, date_format, to_date, hour,
    round as _round, percentile_approx
)

spark = SparkSession.builder \
    .appName("UberRideAnalytics") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Read raw data
rides_df = spark.read.parquet("s3a://uber-data/rides/")
drivers_df = spark.read.parquet("s3a://uber-data/drivers/")
payments_df = spark.read.parquet("s3a://uber-data/payments/")

# Step 1: Clean and standardize rides
clean_rides = rides_df \
    .filter(col("ride_status") == "completed") \
    .filter(col("pickup_time").isNotNull()) \
    .withColumn(
        "ride_duration_minutes",
        _round((col("dropoff_time").cast("long") - col("pickup_time").cast("long")) / 60, 2)
    ) \
    .withColumn(
        "ride_date",
        to_date(col("pickup_time"))
    ) \
    .withColumn(
        "hour_of_day",
        hour(col("pickup_time"))
    ) \
    .withColumn(
        "day_of_week",
        date_format(col("pickup_time"), "EEEE")
    )

# Step 2: Join with drivers
enriched_rides = clean_rides \
    .join(drivers_df, clean_rides.driver_id == drivers_df.driver_id, "left") \
    .select(
        clean_rides["*"],
        drivers_df.driver_name,
        drivers_df.vehicle_type,
        drivers_df.rating
    )

# Step 3: Join with payments
full_rides = enriched_rides \
    .join(payments_df, enriched_rides.ride_id == payments_df.ride_id, "left") \
    .withColumn(
        "fare_amount",
        coalesce(payments_df.fare, lit(0))
    ) \
    .withColumn(
        "tip_amount",
        coalesce(payments_df.tip, lit(0))
    ) \
    .withColumn(
        "total_revenue",
        col("fare_amount") + col("tip_amount")
    )

# Step 4: Complex aggregations
# Daily ride metrics
daily_metrics = full_rides \
    .groupBy("ride_date") \
    .agg(
        count("*").alias("total_rides"),
        countDistinct("driver_id").alias("active_drivers"),
        _round(avg("ride_duration_minutes"), 2).alias("avg_duration"),
        _round(avg("fare_amount"), 2).alias("avg_fare"),
        _round(sum("total_revenue"), 2).alias("total_revenue"),
        _round(percentile_approx("fare_amount", 0.95), 2).alias("p95_fare")
    )

# Driver performance
driver_performance = full_rides \
    .groupBy("driver_id", "driver_name") \
    .agg(
        count("*").alias("total_rides"),
        _round(avg("ride_duration_minutes"), 2).alias("avg_duration"),
        _round(avg("fare_amount"), 2).alias("avg_fare"),
        _round(sum("total_revenue"), 2).alias("total_revenue"),
        _round(avg("tip_amount"), 2).alias("avg_tip"),
        _round(avg("rating"), 2).alias("avg_rating")
    ) \
    .withColumn(
        "performance_score",
        _round(
            (col("total_rides") * 0.3) +
            (col("avg_rating") * 20 * 0.3) +
            (col("avg_fare") * 0.2) +
            (col("avg_tip") * 0.2), 2
        )
    ) \
    .orderBy(col("performance_score").desc())

# Hourly demand pattern
hourly_demand = full_rides \
    .groupBy("hour_of_day", "day_of_week") \
    .agg(
        count("*").alias("ride_count"),
        _round(avg("fare_amount"), 2).alias("avg_fare")
    ) \
    .orderBy("day_of_week", "hour_of_day")  # day names sort alphabetically, not chronologically

# Show results
daily_metrics.show(10, truncate=False)
driver_performance.show(10, truncate=False)
hourly_demand.show(20, truncate=False)

spark.stop()
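
To make the performance_score weighting concrete, here is the same weighted sum as a plain-Python function. The two drivers are invented for illustration and are not taken from any dataset.

```python
def performance_score(total_rides, avg_rating, avg_fare, avg_tip):
    """Same weighted sum as the performance_score column in the pipeline."""
    return round(
        total_rides * 0.3
        + avg_rating * 20 * 0.3
        + avg_fare * 0.2
        + avg_tip * 0.2,
        2,
    )


# Hypothetical drivers: same fares and tips, different volume and rating
steady = performance_score(total_rides=100, avg_rating=4.8, avg_fare=18.5, avg_tip=2.25)
busy = performance_score(total_rides=400, avg_rating=3.9, avg_fare=18.5, avg_tip=2.25)

print(steady)   # 62.95
print(busy)     # 147.55
```

Because total_rides is unbounded while a rating scaled by 20 tops out at 100, ride volume dominates the score as written. If that is not the intent, normalizing total_rides before weighting is one option.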

Complex Transformations

Conditional Logic

from pyspark.sql.functions import col, when, coalesce, expr

# Multi-condition transformation
df = df.withColumn(
    "fare_category",
    when(col("fare_amount") < 10, "Budget")
    .when(col("fare_amount") < 25, "Standard")
    .when(col("fare_amount") < 50, "Premium")
    .otherwise("Luxury")
)

# Coalesce for null handling
df = df.withColumn(
    "display_name",
    coalesce(col("nickname"), col("first_name"), col("name"))
)

# Complex expression with expr
df = df.withColumn(
    "adjusted_fare",
    expr("""
        CASE 
            WHEN is_peak_hour THEN fare_amount * 1.5
            WHEN is_rainy THEN fare_amount * 1.2
            ELSE fare_amount
        END
    """)
)

Pivoting and Unpivoting

# Pivot: rows to columns
monthly_sales = df.groupBy("product_category").pivot("month").sum("revenue")

# Unpivot: columns to rows (Spark 3.4+)
unpivoted = monthly_sales.unpivot(
    ["product_category"],
    ["January", "February", "March"],
    "month", "revenue"
)

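# Manual unpivot for older Spark versions (stack is used as a SQL expression, so no import is needed)
# Assumes a wide DataFrame with one column per month: Jan, Feb, Mar, ...
df_unpivoted = df.selectExpr(
    "product_category",
    # Three months shown; the first argument must equal the number of (label, column) pairs
    "stack(3, 'Jan', Jan, 'Feb', Feb, 'Mar', Mar) as (month, revenue)"
)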
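
Pivot and unpivot are easiest to reason about as reshaping between a long and a wide layout. The sketch below models both steps in plain Python with invented rows; it is a conceptual model only and does not reproduce Spark's handling of missing cells.

```python
from collections import defaultdict

# Long format: one row per (category, month); values invented for illustration
long_rows = [
    ("Books", "January", 100),
    ("Books", "February", 150),
    ("Toys", "January", 80),
    ("Toys", "January", 20),
]

# Pivot: group by category, one entry per month, summing revenue
wide = defaultdict(dict)
for category, month, revenue in long_rows:
    wide[category][month] = wide[category].get(month, 0) + revenue

# Unpivot: emit one row per (category, month) cell that has a value
unpivoted = [
    (category, month, revenue)
    for category, months in wide.items()
    for month, revenue in months.items()
]

print(dict(wide))
print(unpivoted)
```

The round trip returns three rows, not the original four: pivot aggregates, so the two Toys/January rows are merged and unpivot cannot separate them again.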

Complex Joins

# Multi-condition join
result = df_a.join(
    df_b,
    (df_a.id == df_b.id) & (df_a.date == df_b.date),
    "inner"
)

# Self-join
employees = df.alias("e1").join(
    df.alias("e2"),
    col("e1.manager_id") == col("e2.employee_id"),
    "left"
)

# Anti-join (find records in A not in B)
missing = df_a.join(df_b, df_a.id == df_b.id, "left_anti")

# Semi-join (find records in A that exist in B)
existing = df_a.join(df_b, df_a.id == df_b.id, "left_semi")

Edge Cases and Pitfalls

1. Column Ambiguity in Joins

# PROBLEM: Both DataFrames have an "id" column
result = df_a.join(df_b, df_a.id == df_b.id)
result.select("id")
# AnalysisException: the join itself succeeds, but referencing "id" afterwards is ambiguous

# SOLUTION: Alias DataFrames
result = df_a.alias("a").join(
    df_b.alias("b"),
    col("a.id") == col("b.id"),
    "inner"
).select(
    col("a.id").alias("a_id"),
    col("b.id").alias("b_id"),
    col("a.value")
)

2. Null Handling in Joins

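from pyspark.sql.functions import coalesce, lit

# NULLs don't match in joins: NULL = NULL evaluates to NULL, not true
df_a = spark.createDataFrame([(1, "a"), (2, None)], ["id", "val"])
df_b = spark.createDataFrame([(3, "a"), (4, None)], ["id", "val"])

# Join on val: only the "a" rows match; the NULL rows are excluded
result = df_a.join(df_b, df_a.val == df_b.val, "inner")

# Null-safe join: eqNullSafe treats two NULLs as equal
result = df_a.join(df_b, df_a.val.eqNullSafe(df_b.val), "inner")

# Alternative: coalesce to a sentinel (only safe if the sentinel never occurs in the data)
result = df_a.join(
    df_b,
    coalesce(df_a.val, lit("__NULL__")) == coalesce(df_b.val, lit("__NULL__")),
    "left"
)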
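
The join behavior follows from SQL's three-valued logic: comparing anything with NULL yields NULL, and a join keeps a pair only when the condition is true. The following plain-Python model (None stands in for NULL; the function names are illustrative, not Spark APIs) contrasts ordinary equality with null-safe equality.

```python
def sql_equals(a, b):
    """SQL '=': the result is NULL (None) when either side is NULL."""
    if a is None or b is None:
        return None
    return a == b


def null_safe_equals(a, b):
    """SQL '<=>': two NULLs compare as equal, and the result is never NULL."""
    if a is None and b is None:
        return True
    if a is None or b is None:
        return False
    return a == b


def inner_join(left, right, predicate):
    # A join keeps a pair only when the predicate is exactly true;
    # a NULL (None) result is treated like false.
    return [(l, r) for l in left for r in right if predicate(l, r) is True]


left_vals = ["a", None]
right_vals = ["a", None]

print(inner_join(left_vals, right_vals, sql_equals))        # [('a', 'a')]
print(inner_join(left_vals, right_vals, null_safe_equals))  # [('a', 'a'), (None, None)]
```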

3. Data Skew in GroupBy

from pyspark.sql.functions import rand, sum

# PROBLEM: One key has 100x more data
df.groupBy("skewed_key").count()
# One partition takes 100x longer (straggler)

# SOLUTION 1: Salting (two-stage aggregation)
salted = df.withColumn(
    "salt",
    (rand() * 10).cast("int")
)

# Stage 1 counts per (key, salt); stage 2 adds up the partial counts.
# Grouping on both columns avoids building and re-splitting a concatenated key,
# which breaks when the key itself contains the separator.
salted.groupBy("skewed_key", "salt").count() \
    .groupBy("skewed_key").agg(sum("count").alias("total_count"))

# SOLUTION 2: Adaptive query execution (Spark 3.0+)
# Note: this setting splits skewed partitions in sort-merge joins; it does not rebalance a groupBy
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
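
Salting works because a count can be computed in two stages without changing the result. The simulation below is a plain-Python sketch with an invented key distribution (one hot key, two small ones); it checks that the two-stage totals match a direct count and shows how much smaller the largest group becomes.

```python
import random
from collections import Counter

random.seed(0)
NUM_SALTS = 10

# Hypothetical skewed input: one hot key and two small keys
keys = ["hot"] * 10_000 + ["cold_a"] * 100 + ["cold_b"] * 100

# Stage 1: count per (key, salt), spreading the hot key over up to NUM_SALTS groups
partial = Counter((key, random.randrange(NUM_SALTS)) for key in keys)

# Stage 2: drop the salt and add up the partial counts
totals = Counter()
for (key, _salt), n in partial.items():
    totals[key] += n

largest_group_unsalted = max(Counter(keys).values())
largest_group_salted = max(partial.values())

print(totals == Counter(keys))                        # True
print(largest_group_unsalted, largest_group_salted)
```

The same decomposition holds for sums, minimums, and maximums, but not directly for averages or distinct counts, which need the partial results combined differently.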

⚠️Amazon Interview Gotcha

At Amazon, interviewers expect you to know that DataFrames are NOT type-safe in PySpark. Unlike Scala's Dataset, PySpark DataFrames don't catch type errors at compile time. Always add schema validation and data quality checks in production.
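
A minimal sketch of such a runtime schema check, written in plain Python over the (column, type) pairs that df.dtypes returns. The function name, the EXPECTED mapping, and the observed list are all hypothetical; a real pipeline would pass df.dtypes instead of the literal list.

```python
def schema_problems(actual_dtypes, expected):
    """Compare (column, type) pairs, as returned by df.dtypes, with an expected mapping."""
    actual = dict(actual_dtypes)
    problems = []
    for column, expected_type in expected.items():
        if column not in actual:
            problems.append(f"missing column: {column}")
        elif actual[column] != expected_type:
            problems.append(
                f"{column}: expected {expected_type}, got {actual[column]}"
            )
    return problems


# Hypothetical expectation for the employee example
EXPECTED = {"name": "string", "age": "int", "department": "string"}

# Literal stand-in for df.dtypes; schema inference from Python ints yields bigint
observed = [("name", "string"), ("age", "bigint")]

for problem in schema_problems(observed, EXPECTED):
    print(problem)
```

Running the check right after the read and failing fast on a non-empty result gives roughly the safety net that compile-time typing would have provided, at the cost of discovering the problem only at run time.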


Performance Best Practices

Practice                  | Impact                | Example
Predicate pushdown        | Reduces data read     | filter() before join()
Column pruning            | Reduces I/O           | select() only needed columns
Broadcast joins           | Eliminates shuffle    | broadcast(df_small)
Repartition by join key   | Reduces shuffle skew  | repartition("key")
Cache repeated DataFrames | Avoids recomputation  | df.cache()
Use Parquet/ORC           | Columnar compression  | spark.read.parquet()
AQE enabled               | Auto-optimization     | spark.sql.adaptive.enabled=true

# Optimized pipeline example
from pyspark.sql.functions import broadcast, col, sum  # sum is Spark's aggregate, not the built-in

result = large_df \
    .filter(col("date") >= "2024-01-01") \
    .select("id", "value", "date") \
    .join(broadcast(small_dim_df), "id") \
    .groupBy("category") \
    .agg(sum("value").alias("total")) \
    .cache()

result.count()  # Materialize cache
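
The reason a broadcast join avoids a shuffle can be shown with a small plain-Python model: every partition of the large table gets a full copy of the small table as a hash map, so no rows of the large table need to move. The tables and values below are invented for illustration.

```python
# Small dimension table: fits in memory, so every worker can hold a full copy
small_dim = [(1, "electronics"), (2, "books")]

# Large fact table, modelled as two partitions that never exchange rows
fact_partitions = [
    [(1, 10.0), (2, 5.0), (3, 7.5)],
    [(2, 2.5), (1, 1.0)],
]

# "Broadcast": build the lookup once; each partition uses its own copy
lookup = dict(small_dim)


def join_partition(rows, dim_lookup):
    # Inner join: rows whose id is absent from the dimension are dropped
    return [
        (row_id, value, dim_lookup[row_id])
        for row_id, value in rows
        if row_id in dim_lookup
    ]


joined = [join_partition(part, lookup) for part in fact_partitions]

# Aggregate by category, as the pipeline does after the join
totals = {}
for part in joined:
    for _row_id, value, category in part:
        totals[category] = totals.get(category, 0.0) + value

print(totals)   # {'electronics': 11.0, 'books': 7.5}
```

Only the join is shuffle-free; the groupBy that follows still exchanges data. For the 100GB rides and 1GB driver scenario, Spark's default automatic broadcast threshold is 10MB, so a table of that size is only broadcast with an explicit broadcast() hint or a raised threshold, and whether that is practical depends on driver and executor memory.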

Summary

DataFrame API mastery is essential for data engineering roles at Amazon and Uber. Understanding declarative transformations, Catalyst optimization hints, and common pitfalls like column ambiguity and data skew separates strong candidates from average ones. Always think about data movement (shuffles) when designing transformations.
