Topic: Spark SQL and Temporary Views


PySpark Advanced Interview Series

Module 04: Spark SQL - The Power of SQL on Distributed Data

Companies: Google, Microsoft | Difficulty: Hard

Interview Question

"At Google, BigQuery and Spark SQL share many concepts. Walk us through how Spark SQL processes a complex query with subqueries, CTEs, and window functions. How does the Catalyst optimizer transform this query, and what control do you have over execution plans?" - Google Data Engineer Interview

"At Microsoft, we use Spark SQL extensively for Azure Synapse Analytics. Explain the difference between global temporary views, local temporary views, and managed tables. How do you register a UDF that works in both the DataFrame API and the SQL context?" - Microsoft Senior Data Engineer Interview


Spark SQL Architecture

Spark SQL is Spark's module for structured data processing. It provides:

  1. SQL Interface: Execute SQL queries on DataFrames and tables
  2. Catalyst Optimizer: Automatic query optimization
  3. Tungsten Execution Engine: Efficient code generation
  4. Catalog API: Metadata management

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SparkSQLInterview") \
    .enableHiveSupport() \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .getOrCreate()

Temporary Views

Local Temporary View

Scoped to the SparkSession that created it and dropped when that session ends.

# Create DataFrame
df = spark.read.parquet("s3a://bucket/employees/")

# Register as local temporary view
df.createOrReplaceTempView("employees")

# Also works with createTempView (throws error if exists)
df.createTempView("employees_v2")

# Query using SQL
result = spark.sql("SELECT * FROM employees WHERE age > 30")
result.show()

Global Temporary View

Shared across SparkSessions within the same Spark application. Stored in the global_temp database.

# Register as global temporary view
df.createOrReplaceGlobalTempView("global_employees")

# Query using global_temp database prefix
spark.sql("SELECT * FROM global_temp.global_employees WHERE age > 30").show()

# Access from a different SparkSession in the same application.
# newSession() is required here: builder.getOrCreate() would return the existing session.
spark2 = spark.newSession()
spark2.sql("SELECT * FROM global_temp.global_employees").show()

⚠️ Common Pitfall

Global temporary views are NOT persistent. They're stored in the in-memory catalog and lost when the Spark application stops. For persistent storage, use managed or external tables.

Managed vs External Tables

# Managed table: Spark manages both metadata and data
spark.sql("CREATE TABLE managed_employees (name STRING, age INT)")
# Data stored in spark.sql.warehouse.dir

# External table: Spark manages metadata, you manage data
spark.sql("""
    CREATE EXTERNAL TABLE external_employees (
        name STRING, 
        age INT
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LOCATION 's3a://bucket/external-employees/'
""")

# Insert data (select only the columns the target table defines)
spark.sql("INSERT INTO managed_employees SELECT name, age FROM employees")

SQL Query Fundamentals

Basic Queries

# Simple SELECT
spark.sql("SELECT name, age FROM employees").show()

# WHERE clause
spark.sql("""
    SELECT name, age, department 
    FROM employees 
    WHERE age > 30 AND department = 'Engineering'
""").show()

# ORDER BY
spark.sql("""
    SELECT name, age 
    FROM employees 
    ORDER BY age DESC
""").show()

# GROUP BY
spark.sql("""
    SELECT department, COUNT(*) as count, AVG(age) as avg_age
    FROM employees
    GROUP BY department
    ORDER BY count DESC
""").show()

Subqueries

# Scalar subquery
spark.sql("""
    SELECT name, age,
           (SELECT AVG(age) FROM employees) as company_avg_age
    FROM employees
""").show()

# IN subquery
spark.sql("""
    SELECT name, department
    FROM employees
    WHERE department IN (
        SELECT department FROM employees WHERE age > 35
    )
""").show()

# Correlated subquery
spark.sql("""
    SELECT e.name, e.age, e.department
    FROM employees e
    WHERE e.age > (
        SELECT AVG(e2.age) 
        FROM employees e2 
        WHERE e2.department = e.department
    )
""").show()

Common Table Expressions (CTEs)

# Single CTE
spark.sql("""
    WITH department_stats AS (
        SELECT department, COUNT(*) as count, AVG(age) as avg_age
        FROM employees
        GROUP BY department
    )
    SELECT d.name, d.age, d.department, ds.avg_age
    FROM employees d
    JOIN department_stats ds ON d.department = ds.department
    WHERE d.age > ds.avg_age
""").show()

# Multiple CTEs
spark.sql("""
    WITH 
    senior_employees AS (
        SELECT * FROM employees WHERE age >= 35
    ),
    dept_counts AS (
        SELECT department, COUNT(*) as senior_count
        FROM senior_employees
        GROUP BY department
    )
    SELECT e.name, e.age, e.department, dc.senior_count
    FROM employees e
    JOIN dept_counts dc ON e.department = dc.department
    ORDER BY dc.senior_count DESC
""").show()

Window Functions in SQL

spark.sql("""
    SELECT 
        name,
        age,
        department,
        ROW_NUMBER() OVER (PARTITION BY department ORDER BY age DESC) as row_num,
        LAG(age, 1) OVER (PARTITION BY department ORDER BY age) as prev_age,
        LEAD(age, 1) OVER (PARTITION BY department ORDER BY age) as next_age,
        AVG(age) OVER (PARTITION BY department) as dept_avg_age,
        SUM(age) OVER (PARTITION BY department ORDER BY age ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_total
    FROM employees
""").show()

Registering UDFs for SQL

from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import StringType, IntegerType
import pandas as pd

# Simple UDF for SQL
@udf(returnType=StringType())
def categorize_age(age):
    if age is None:
        return "Unknown"
    elif age < 25:
        return "Junior"
    elif age < 35:
        return "Mid-Level"
    else:
        return "Senior"

# Register UDF for SQL
spark.udf.register("categorize_age", categorize_age)

# Use in SQL
spark.sql("""
    SELECT name, age, categorize_age(age) as category
    FROM employees
""").show()

# Pandas UDF for SQL (Spark 3.0+)
@pandas_udf(StringType())
def upper_name(name: pd.Series) -> pd.Series:
    return name.str.upper()

spark.udf.register("upper_name", upper_name)

spark.sql("SELECT upper_name(name) FROM employees").show()

ℹ️ Google Interview Insight

At Google, interviewers expect you to understand that UDFs are opaque to the Catalyst optimizer. Catalyst cannot see inside a UDF, so it cannot push a predicate that calls one down to the data source or simplify the expression, and a Python UDF also pays for moving rows between the JVM and Python workers. Always prefer built-in SQL functions when possible.


Catalog Operations

# List databases
spark.sql("SHOW DATABASES").show()

# Use database
spark.sql("USE my_database")

# List tables
spark.sql("SHOW TABLES").show()

# Describe table
spark.sql("DESCRIBE employees").show()

# Detailed table info
spark.sql("DESCRIBE EXTENDED employees").show()

# Show create table statement (works for catalog tables, not temporary views)
spark.sql("SHOW CREATE TABLE managed_employees").show(truncate=False)

# Drop table
spark.sql("DROP TABLE IF EXISTS temp_employees")

# Show partitions
spark.sql("SHOW PARTITIONS logs").show()

Real-World Scenario: Google Analytics Pipeline

Problem Statement

Build a complex SQL pipeline that analyzes user behavior across multiple sessions, computes cohort retention, and generates marketing insights using Spark SQL.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GoogleAnalyticsPipeline") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Register raw data as temporary views
page_views = spark.read.parquet("s3a://analytics/page-views/")
page_views.createOrReplaceTempView("page_views")

sessions = spark.read.parquet("s3a://analytics/sessions/")
sessions.createOrReplaceTempView("sessions")

users = spark.read.parquet("s3a://analytics/users/")
users.createOrReplaceTempView("users")

# Complex CTE-based analytics
cohort_analysis = spark.sql("""
    WITH 
    -- Define cohorts by signup month
    user_cohorts AS (
        SELECT 
            user_id,
            DATE_FORMAT(signup_date, 'yyyy-MM') as cohort_month,
            signup_date
        FROM users
    ),
    
    -- Monthly activity per user
    monthly_activity AS (
        SELECT 
            pv.user_id,
            DATE_FORMAT(pv.event_date, 'yyyy-MM') as activity_month,
            COUNT(*) as page_views,
            COUNT(DISTINCT pv.session_id) as sessions
        FROM page_views pv
        GROUP BY pv.user_id, DATE_FORMAT(pv.event_date, 'yyyy-MM')
    ),
    
    -- Cohort retention matrix
    cohort_retention AS (
        SELECT 
            uc.cohort_month,
            ma.activity_month,
            COUNT(DISTINCT ma.user_id) as active_users,
            CAST(MONTHS_BETWEEN(
                TO_DATE(ma.activity_month, 'yyyy-MM'),
                TO_DATE(uc.cohort_month, 'yyyy-MM')
            ) AS INT) as months_since_signup
        FROM user_cohorts uc
        JOIN monthly_activity ma ON uc.user_id = ma.user_id
        GROUP BY uc.cohort_month, ma.activity_month
    ),
    
    -- Cohort sizes
    cohort_sizes AS (
        SELECT 
            cohort_month,
            COUNT(DISTINCT user_id) as cohort_size
        FROM user_cohorts
        GROUP BY cohort_month
    )
    
    -- Final retention calculation
    SELECT 
        cr.cohort_month,
        cr.activity_month,
        cr.months_since_signup,
        cr.active_users,
        cs.cohort_size,
        ROUND(cr.active_users / cs.cohort_size * 100, 2) as retention_rate
    FROM cohort_retention cr
    JOIN cohort_sizes cs ON cr.cohort_month = cs.cohort_month
    WHERE cr.months_since_signup >= 0
    ORDER BY cr.cohort_month, cr.months_since_signup
""")

cohort_analysis.show(50, truncate=False)

# Session-level funnel analysis
funnel_analysis = spark.sql("""
    WITH 
    sessions_with_events AS (
        SELECT 
            s.session_id,
            s.user_id,
            s.start_time,
            MAX(CASE WHEN pv.page = '/home' THEN 1 ELSE 0 END) as visited_home,
            MAX(CASE WHEN pv.page = '/products' THEN 1 ELSE 0 END) as visited_products,
            MAX(CASE WHEN pv.page = '/cart' THEN 1 ELSE 0 END) as visited_cart,
            MAX(CASE WHEN pv.page = '/checkout' THEN 1 ELSE 0 END) as visited_checkout,
            MAX(CASE WHEN pv.page = '/confirmation' THEN 1 ELSE 0 END) as completed_purchase
        FROM sessions s
        LEFT JOIN page_views pv ON s.session_id = pv.session_id
        GROUP BY s.session_id, s.user_id, s.start_time
    )
    SELECT 
        COUNT(DISTINCT session_id) as total_sessions,
        SUM(visited_home) as home_visits,
        SUM(visited_products) as product_views,
        SUM(visited_cart) as cart_adds,
        SUM(visited_checkout) as checkout_starts,
        SUM(completed_purchase) as purchases,
        ROUND(SUM(completed_purchase) / COUNT(DISTINCT session_id) * 100, 2) as conversion_rate,
        ROUND(SUM(visited_cart) / NULLIF(SUM(visited_products), 0) * 100, 2) as product_to_cart_rate,
        ROUND(SUM(completed_purchase) / NULLIF(SUM(visited_cart), 0) * 100, 2) as cart_to_purchase_rate
    FROM sessions_with_events
""")

funnel_analysis.show(truncate=False)

spark.stop()

SQL Performance Optimization

Predicate Pushdown

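# Trivial nesting is harmless: Catalyst collapses the subquery and pushes
# the filter down to the scan, so this plans the same as the flat query below
spark.sql("""
    SELECT * FROM (
        SELECT * FROM logs
    ) filtered
    WHERE date = '2024-01-01'
""")

# Equivalent flat form
spark.sql("""
    SELECT * FROM logs
    WHERE date = '2024-01-01'
""")

# What does block pushdown: predicates that call a UDF, because Catalyst
# cannot translate an opaque function into a data source filter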

# Verify pushdown with EXPLAIN
spark.sql("EXPLAIN SELECT * FROM logs WHERE date = '2024-01-01'").show(truncate=False)

Join Optimization Hints

# Hints go immediately after SELECT and reference the table alias when one is used

# Broadcast hint
spark.sql("""
    SELECT /*+ BROADCAST(d) */ l.*, d.category
    FROM large_fact l
    JOIN small_dim d ON l.dim_id = d.id
""")

# Shuffle hash join hint
spark.sql("""
    SELECT /*+ SHUFFLE_HASH(a) */ *
    FROM a JOIN b ON a.id = b.id
""")

# Sort-merge join hint
spark.sql("""
    SELECT /*+ SHUFFLE_MERGE(a, b) */ *
    FROM a JOIN b ON a.id = b.id
""")

AQE Integration

spark.conf.set("spark.sql.adaptive.enabled", "true")

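# AQE (enabled by default since Spark 3.2) re-optimizes at shuffle boundaries:
# 1. Coalesces small partitions after a shuffle
# 2. Converts a sort-merge join to a broadcast join when one side turns out to be small
# 3. Splits skewed partitions in sort-merge joins

# Inspect the plan. EXPLAIN shows the initial plan (isFinalPlan=false);
# runtime changes appear only after the query has executed, e.g. in the Spark UI
spark.sql("EXPLAIN FORMATTED SELECT ...").show(truncate=False)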

💡 Microsoft Pro Tip

In Azure Synapse Analytics, Spark pools work alongside dedicated and serverless SQL pools. Use CTAS (CREATE TABLE AS SELECT) to materialize query results. The ROUND_ROBIN, HASH, and REPLICATE distribution options belong to dedicated SQL pool tables; within Spark SQL itself, control data layout with partitioning hints such as REPARTITION and COALESCE.


Dynamic SQL and Parameterized Queries

# Parameterized queries
table_name = "employees"
min_age = 30

# Using an f-string (NOT recommended for production: SQL injection risk)
spark.sql(f"SELECT * FROM {table_name} WHERE age > {min_age}")

# str.format() is the same string interpolation and carries the same risk
spark.sql("SELECT * FROM {} WHERE age > {}".format(table_name, min_age))

# Note: ${...} substitution (spark.sql.variable.substitute) resolves Spark
# configuration values, not Python variables, so it is not a parameter mechanism

# Named parameter markers bind values safely (Python values in args: Spark 3.5+)
spark.sql("SELECT * FROM employees WHERE age > :min_age", args={"min_age": min_age})

# Using Column objects (most Pythonic); validate table_name if it comes from user input
from pyspark.sql.functions import col
df = spark.table(table_name).filter(col("age") > min_age)

Edge Cases

1. Case Sensitivity

# Spark SQL is case-insensitive for table/column names by default
spark.sql("SELECT Name FROM EMPLOYEES")  # Works

# Make it case-sensitive (identifiers must then match the stored case exactly)
spark.conf.set("spark.sql.caseSensitive", "true")

# Backticks quote identifiers that contain special characters or reserved words;
# they do not change case handling
spark.sql("SELECT `name` FROM `employees` WHERE `name` = 'Alice'")

2. Complex Types in SQL

# Array functions
spark.sql("""
    SELECT 
        name,
        skills[0] as first_skill,
        SIZE(skills) as skill_count,
        ARRAY_CONTAINS(skills, 'Python') as knows_python
    FROM employees
""")

# Map functions
spark.sql("""
    SELECT 
        name,
        attributes['department'] as dept,
        MAP_KEYS(attributes) as attr_keys
    FROM employees
""")

# Struct functions
spark.sql("""
    SELECT 
        name,
        address.city,
        address.zipcode
    FROM employees
""")

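3. Partition Pruning (Static and Dynamic)

# Static pruning: a literal filter on the partition column is resolved at planning time
# Ensure your partition column is used directly
spark.sql("""
    SELECT * FROM logs 
    WHERE date = '2024-01-01'  -- Prunes to single partition
""")

# Dynamic partition pruning (Spark 3.0+, spark.sql.optimizer.dynamicPartitionPruning.enabled)
# applies to joins: a selective filter on the other side of the join prunes
# partitions of the partitioned table at runtime

# Verify pruning in explain plan (look for PartitionFilters)
spark.sql("EXPLAIN SELECT * FROM logs WHERE date = '2024-01-01'").show(truncate=False)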

DataFrame API vs SQL

Feature            | DataFrame API     | Spark SQL
Type Safety        | Partial (runtime) | None
Readability        | Code-based        | Declarative
Optimization       | Same (Catalyst)   | Same (Catalyst)
UDF Support        | Yes               | Yes (registered)
IDE Support        | Better            | Limited
Complex Logic      | More verbose      | More concise
Team Collaboration | Better for devs   | Better for analysts

ℹ️ Interview Tip

At Google and Microsoft, interviewers often ask you to solve the same problem using both DataFrame API and SQL. Practice doing both fluently. SQL is often more concise for complex aggregations; DataFrame API is better for programmatic transformations.


Best Practices

💡 Production SQL Best Practices

  • Use CTEs for readability instead of nested subqueries
  • Always alias tables in joins to avoid ambiguity
  • Use EXPLAIN to verify query plans before production deployment
  • Register UDFs only when built-in functions don't suffice
  • Prefer parameterized queries to prevent SQL injection
  • Use broadcast hints for small dimension tables
  • Enable AQE for automatic optimization
  • Cache results for repeated queries

Summary

Spark SQL provides a powerful SQL interface to Spark's distributed computing engine. Mastering temporary views, CTEs, window functions, and UDF registration is essential for Google and Microsoft data engineering roles. Understanding how Catalyst optimizes SQL queries helps you write performant code and debug slow queries effectively.
