Big Data: Spark Fundamentals

Overview

Apache Spark is a unified analytics engine for large-scale data processing. It provides an interface for programming entire clusters with implicit data parallelism and fault tolerance. Spark is widely treated as the de facto standard for big data processing; the project reports speedups of up to 100x over Hadoop MapReduce for in-memory workloads, though the actual gain depends heavily on the job. Its key innovation is the Resilient Distributed Dataset (RDD) abstraction, which enables fault-tolerant parallel computation.


Why Spark?

The Big Data Problem

Data Size Growth:
                                                          
1 TB   |--|                                               
10 TB  |---|                                              
100 TB |------|                                           
1 PB   |-----------|                                      
10 PB  |-------------------|                              
       +-------------------------> Time
       
Traditional tools fail at scale:
- Single machine: Limited by RAM (typically 16-64 GB)
- Hadoop MapReduce: Disk I/O bottleneck, slow iteration
- Pandas: Single-node, memory-bound

Spark vs Alternatives

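Feature      | Pandas      | Hadoop MR   | Spark
-------------+-------------+-------------+----------------------
Speed        | Medium      | Slow        | Fast
Scalability  | Single node | Distributed | Distributed
Ease of use  | High        | Low         | Medium
Memory       | In-memory   | Disk-based  | In-memory
Iteration    | Easy        | Hard        | Easy
Language     | Python      | Java        | Python, Scala, R, SQL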

Spark Architecture

Architecture Diagram
                    +------------------+
                    |    Driver        |
                    |    Program       |
                    +--------+---------+
                             |
                    +--------v---------+
                    |    SparkContext   |
                    |    / Session     |
                    +--------+---------+
                             |
              +--------------+--------------+
              |              |              |
     +--------v----+  +-----v------+  +----v--------+
     |   Executor  |  |  Executor  |  |  Executor   |
     |   (Worker)  |  |  (Worker)  |  |  (Worker)   |
     +------+------+  +------+-----+  +------+------+
            |               |               |
     +------v------+  +-----v-------+  +----v--------+
     |   Cache     |  |   Cache     |  |   Cache     |
     |   (Memory)  |  |   (Memory)  |  |   (Memory)  |
     +-------------+  +-------------+  +-------------+

RDDs (Resilient Distributed Datasets)

Core Concept

Definition: RDD (Resilient Distributed Dataset)

An RDD is an immutable, partitioned collection of elements that can be operated on in parallel. RDDs are the fundamental abstraction in Spark: they are fault-tolerant (lost partitions can be recomputed from lineage), distributed (split across nodes), and lazy (transformations are deferred until an action is called).
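To make the lineage idea concrete, here is a minimal plain-Python sketch (not Spark code). The names `source_partitions`, `lineage`, and `compute_partition` are invented for illustration; real Spark tracks lineage per RDD and schedules the recomputation itself.

```python
# Toy model of lineage-based recovery. All names here are hypothetical.
source_partitions = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]  # durable input data
lineage = [lambda x: x * x, lambda x: x + 1]                # recorded transformations


def compute_partition(index):
    """Rebuild one partition by replaying the lineage over its source data."""
    rows = source_partitions[index]
    for fn in lineage:
        rows = [fn(x) for x in rows]
    return rows


# Materialize every partition once.
materialized = {i: compute_partition(i) for i in range(len(source_partitions))}
expected = materialized[1]

# Simulate losing the node that held partition 1, then recover it.
del materialized[1]
materialized[1] = compute_partition(1)  # only the lost partition is recomputed

print(materialized[1])
```

Because the input is immutable and the transformations are deterministic, replaying the lineage yields the same partition without keeping replicas of intermediate data.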

RDD Operations:

Transformations (Lazy):
  map()        -> Apply function to each element
  filter()     -> Keep elements matching predicate
  flatMap()    -> Map then flatten
  groupByKey() -> Group values by key
  reduceByKey() -> Combine values by key
  join()       -> Join two RDDs by key

Actions (Trigger computation):
  count()      -> Number of elements
  collect()    -> Return all elements
  reduce()     -> Aggregate elements
  first()      -> First element
  take(n)      -> First n elements
  saveAsTextFile() -> Write to disk

â„šī¸ Lazy Evaluation

Spark transformations are lazy — they build a DAG (Directed Acyclic Graph) of operations but don't execute until an action is called. This allows Spark to optimize the execution plan before running any computation.
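The deferred-execution behavior can be imitated in a few lines of plain Python. The `LazyDataset` class below is a hypothetical toy, not part of PySpark: it only records transformations and applies them when `collect()` is called.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them only on an action."""

    def __init__(self, source, ops=()):
        self._source = source    # underlying data (any iterable)
        self._ops = tuple(ops)   # recorded transformations, in order

    def map(self, fn):
        # Transformation: return a new dataset with one more recorded step.
        return LazyDataset(self._source, self._ops + (("map", fn),))

    def filter(self, predicate):
        # Transformation: also just recorded, nothing is evaluated here.
        return LazyDataset(self._source, self._ops + (("filter", predicate),))

    def collect(self):
        # Action: only now is the recorded plan applied to the data.
        out = []
        for item in self._source:
            keep = True
            for kind, fn in self._ops:
                if kind == "map":
                    item = fn(item)
                elif not fn(item):
                    keep = False
                    break
            if keep:
                out.append(item)
        return out


calls = []


def square(x):
    calls.append(x)  # record every real invocation
    return x * x


plan = LazyDataset(range(1, 11)).map(square).filter(lambda x: x % 2 == 0)
calls_before_action = len(calls)  # still 0: nothing has run yet
result = plan.collect()           # the action triggers the work
print(calls_before_action, result)
```

Since the whole plan is known before anything runs, an engine has the chance to reorder or fuse steps; this toy simply replays them in order.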

RDD API Examples

from pyspark import SparkContext, SparkConf

# Initialize Spark
conf = SparkConf().setAppName("RDD_Example").setMaster("local[*]")
sc = SparkContext(conf=conf)

# Create RDD from list
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data)

# Transformations (lazy)
squared = rdd.map(lambda x: x ** 2)
evens = rdd.filter(lambda x: x % 2 == 0)

# Actions (trigger computation)
print(f"Original: {rdd.collect()}")
print(f"Squared: {squared.collect()}")
print(f"Evens: {evens.collect()}")
print(f"Sum: {rdd.reduce(lambda a, b: a + b)}")
print(f"Count: {rdd.count()}")

# Word count example
text_rdd = sc.parallelize([
    "hello world",
    "hello spark",
    "spark is fast",
    "hello fast world"
])

word_counts = (text_rdd
    .flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda x: x[1], ascending=False))

print("\nWord Counts:")
for word, count in word_counts.collect():
    print(f"  {word}: {count}")

Spark DataFrames

Why DataFrames?

RDD vs DataFrame:

RDD:
  - Low-level control
  - No schema optimization
  - No query optimization
  - Functional programming style

DataFrame:
  - High-level API (like Pandas)
  - Schema enforcement
  - Catalyst query optimizer
  - Tungsten execution engine
  - SQL support

💡 Prefer DataFrames over RDDs

For most use cases, DataFrames are preferred because Spark's Catalyst optimizer can optimize the query plan (predicate pushdown, column pruning, join reordering) in ways that are impossible with low-level RDD operations.

DataFrame Operations

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("DataFrame_Example") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Create DataFrame from list
data = [
    ("Alice", 25, "Engineering", 85000),
    ("Bob", 30, "Marketing", 72000),
    ("Charlie", 35, "Engineering", 95000),
    ("Diana", 28, "Sales", 68000),
    ("Eve", 32, "Marketing", 78000),
    ("Frank", 40, "Engineering", 110000),
    ("Grace", 27, "Sales", 71000),
]

columns = ["name", "age", "department", "salary"]
df = spark.createDataFrame(data, columns)

# Basic operations
print("Schema:")
df.printSchema()

print("\nFirst 5 rows:")
df.show(5)

print(f"\nRow count: {df.count()}")
print(f"Column names: {df.columns}")

# Transformations
result = df \
    .filter(F.col("salary") > 75000) \
    .groupBy("department") \
    .agg(
        F.count("*").alias("employee_count"),
        F.avg("salary").alias("avg_salary"),
        F.max("salary").alias("max_salary"),
        F.stddev("salary").alias("salary_stddev")
    ) \
    .orderBy(F.col("avg_salary").desc())

print("\nDepartment Statistics:")
result.show()

# Window functions
from pyspark.sql.window import Window

window_spec = Window.partitionBy("department").orderBy(F.col("salary").desc())

df_with_rank = df.withColumn(
    "salary_rank", F.rank().over(window_spec)
).withColumn(
    "salary_pct", F.percent_rank().over(window_spec)
)

print("\nSalary Rankings by Department:")
df_with_rank.show()
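
As a worked check of the two window functions in the listing above, the plain-Python sketch below reproduces `rank` and `percent_rank` by hand for the Engineering partition. It assumes the standard definitions (rank is one plus the number of rows ordered ahead; percent rank is (rank - 1) / (rows in partition - 1)) and is not Spark code.

```python
# Engineering rows from the employee listing (name -> salary).
salaries = {"Alice": 85000, "Charlie": 95000, "Frank": 110000}

# The window orders each department by salary, highest first.
ordered = sorted(salaries.items(), key=lambda kv: kv[1], reverse=True)
n = len(ordered)

rows = []
for name, salary in ordered:
    # rank: 1 + number of rows with a strictly higher salary (ties share a rank)
    rank = 1 + sum(1 for s in salaries.values() if s > salary)
    # percent_rank: (rank - 1) / (n - 1), defined as 0.0 for a single-row partition
    pct = (rank - 1) / (n - 1) if n > 1 else 0.0
    rows.append((name, salary, rank, pct))

for row in rows:
    print(row)
```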

Complex DataFrame Operations

# Create sales data
sales_data = [
    (2024, 1, "Electronics", "Laptop", 1200, 150),
    (2024, 1, "Electronics", "Phone", 800, 300),
    (2024, 2, "Electronics", "Laptop", 1250, 160),
    (2024, 2, "Clothing", "Shirt", 50, 500),
    (2024, 3, "Clothing", "Shirt", 55, 480),
    (2024, 3, "Electronics", "Phone", 820, 290),
]

sales_df = spark.createDataFrame(sales_data, 
    ["year", "month", "category", "product", "price", "quantity"])

# Complex analysis
analysis = sales_df \
    .withColumn("revenue", F.col("price") * F.col("quantity")) \
    .withColumn("month_date", F.make_date(F.col("year"), F.col("month"), F.lit(1))) \
    .groupBy("category", "product") \
    .agg(
        F.sum("revenue").alias("total_revenue"),
        F.sum("quantity").alias("total_units"),
        F.avg("price").alias("avg_price"),
        F.countDistinct("month").alias("months_active")
    ) \
    .withColumn("revenue_per_unit", F.col("total_revenue") / F.col("total_units")) \
    .orderBy(F.col("total_revenue").desc())

print("\nSales Analysis:")
analysis.show()

# Window functions for time series (order by year and month so the window stays correct across years)
ts_window = Window.partitionBy("product").orderBy("year", "month")

monthly_trend = sales_df \
    .withColumn("revenue", F.col("price") * F.col("quantity")) \
    .withColumn("prev_month_revenue", F.lag("revenue").over(ts_window)) \
    .withColumn("mom_growth", 
        (F.col("revenue") - F.col("prev_month_revenue")) / F.col("prev_month_revenue") * 100
    )

print("\nMonth-over-Month Growth:")
monthly_trend.show()
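
The month-over-month arithmetic can be verified by hand. The plain-Python sketch below (not Spark code) recomputes the same growth figures from the sales rows. One detail it makes visible: `lag` looks at the previous row for the product, not the previous calendar month, so Phone's month 3 is compared with month 1 because there is no month 2 row.

```python
from collections import defaultdict

# (month, product, price, quantity) taken from the sales_data listing.
sales = [
    (1, "Laptop", 1200, 150), (1, "Phone", 800, 300),
    (2, "Laptop", 1250, 160), (2, "Shirt", 50, 500),
    (3, "Shirt", 55, 480), (3, "Phone", 820, 290),
]

# Group revenue per product, ordered by month (the window's partition and order).
by_product = defaultdict(list)
for month, product, price, qty in sorted(sales):
    by_product[product].append((month, price * qty))

growth = {}
for product, series in by_product.items():
    prev = None
    for month, revenue in series:
        # The first row per product has no predecessor, mirroring lag() returning null.
        pct = None if prev is None else round((revenue - prev) / prev * 100, 2)
        growth[(product, month)] = pct
        prev = revenue

for key in sorted(growth):
    print(key, growth[key])
```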

Spark SQL

SQL Interface

# Register DataFrame as temporary view
df.createOrReplaceTempView("employees")

# Run SQL queries
sql_result = spark.sql("""
    SELECT 
        department,
        COUNT(*) as headcount,
        AVG(salary) as avg_salary,
        MAX(salary) - MIN(salary) as salary_range
    FROM employees
    GROUP BY department
    HAVING COUNT(*) > 1
    ORDER BY avg_salary DESC
""")

sql_result.show()

# Persist as a managed table in the metastore (overwrite so the example can be rerun)
df.write.mode("overwrite").saveAsTable("employee_data")

# Query with complex SQL
spark.sql("""
    WITH dept_stats AS (
        SELECT 
            department,
            AVG(salary) as avg_salary,
            STDDEV(salary) as salary_std
        FROM employees
        GROUP BY department
    )
    SELECT 
        e.name,
        e.department,
        e.salary,
        (e.salary - ds.avg_salary) / ds.salary_std as z_score
    FROM employees e
    JOIN dept_stats ds ON e.department = ds.department
    WHERE ABS((e.salary - ds.avg_salary) / ds.salary_std) > 1
""").show()

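To see what the z-score query should return, the following plain-Python sketch recomputes it from the seven employee rows. It assumes, as in Spark SQL, that `STDDEV` is the sample standard deviation, which is what `statistics.stdev` computes. This is a hand check, not Spark code.

```python
from statistics import mean, stdev  # stdev is the sample standard deviation

# (name, department, salary) from the employees listing.
employees = [
    ("Alice", "Engineering", 85000), ("Bob", "Marketing", 72000),
    ("Charlie", "Engineering", 95000), ("Diana", "Sales", 68000),
    ("Eve", "Marketing", 78000), ("Frank", "Engineering", 110000),
    ("Grace", "Sales", 71000),
]

# Equivalent of the dept_stats CTE: average and standard deviation per department.
by_dept = {}
for _, dept, salary in employees:
    by_dept.setdefault(dept, []).append(salary)
stats = {dept: (mean(vals), stdev(vals)) for dept, vals in by_dept.items()}

# Equivalent of the join plus the WHERE ABS(z_score) > 1 filter.
outliers = []
for name, dept, salary in employees:
    avg, std = stats[dept]
    z = (salary - avg) / std
    if abs(z) > 1:
        outliers.append((name, round(z, 2)))

print(outliers)
```

With only two employees each, Marketing and Sales always produce z-scores of about ±0.71, so on this data the filter can only ever match rows from Engineering.
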
Distributed Computing Concepts

Partitioning

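Definition: Partitioning

Partitioning is the process of dividing data into chunks that are distributed across nodes for parallel processing. The number of partitions caps the degree of parallelism, because at most one task runs per partition at a time. The Spark tuning guide suggests roughly 2-3 tasks per CPU core in the cluster, so that cores stay busy even when partitions are uneven.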

Data Partitioning:

Original Data: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Partition 0: [1, 2, 3, 4]
Partition 1: [5, 6, 7, 8]
Partition 2: [9, 10]

Parallel Processing:
  Partition 0 -> Core 0: sum = 10
  Partition 1 -> Core 1: sum = 26
  Partition 2 -> Core 2: sum = 19
  
  Final sum = 10 + 26 + 19 = 55
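
The diagram above can be reproduced with a short plain-Python sketch. Threads stand in for executor cores here, and `partition_size` is picked only to match the diagram's split; Spark decides partition boundaries itself.

```python
from concurrent.futures import ThreadPoolExecutor

data = list(range(1, 11))
partition_size = 4  # chosen to reproduce the diagram's split

# Split the data into contiguous partitions.
partitions = [data[i:i + partition_size] for i in range(0, len(data), partition_size)]

# Sum each partition independently, one worker per partition.
with ThreadPoolExecutor(max_workers=len(partitions)) as pool:
    partial_sums = list(pool.map(sum, partitions))

# Combine the partial results into the final answer.
total = sum(partial_sums)
print(partitions, partial_sums, total)
```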

Shuffle Operations

Shuffle (expensive operation):

Before Shuffle:              After Shuffle:
Partition 0:                 Partition 0:
  (A, 1)                       (A, 1)
  (B, 2)                       (A, 3)
  (A, 3)                       (A, 5)
                               
Partition 1:                 Partition 1:
  (B, 4)                       (B, 2)
  (A, 5)                       (B, 4)
  (B, 6)                       (B, 6)

Data must be redistributed across network!

âš ī¸ Shuffle Operations

Shuffles are the most expensive operation in Spark. They require data to be redistributed across the network, involving disk I/O, serialization, and network transfer. Minimize shuffles by partitioning wisely, using map-side aggregations (reduceByKey vs groupByKey), and broadcasting small tables.
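
The benefit of map-side aggregation can be counted directly. The plain-Python sketch below (not Spark code, with hypothetical function names) sums values per key over the same two partitions in two ways and reports how many records would have to cross the network in each case.

```python
from collections import defaultdict

# The two partitions from the "Before Shuffle" side of the diagram.
partitions = [
    [("A", 1), ("B", 2), ("A", 3)],
    [("B", 4), ("A", 5), ("B", 6)],
]


def shuffle_all(parts):
    """groupByKey-style: every record crosses the shuffle, then gets summed."""
    shuffled = [rec for part in parts for rec in part]
    totals = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals), len(shuffled)


def combine_then_shuffle(parts):
    """reduceByKey-style: sum within each partition first, shuffle only partial sums."""
    shuffled = []
    for part in parts:
        local = defaultdict(int)
        for key, value in part:
            local[key] += value
        shuffled.extend(local.items())
    totals = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals), len(shuffled)


print(shuffle_all(partitions))
print(combine_then_shuffle(partitions))
```

Both approaches give the same totals, but the combined version moves one record per key per partition instead of one per input row. The gap widens as the number of rows per key grows.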

Caching and Persistence

from pyspark import StorageLevel

# Cache with the default storage level (lazy: materialized by the first action)
df.cache()
df.count()
print(f"Is cached: {df.is_cached}")

# Release the cached data before choosing a different storage level
df.unpersist()

# Persist to disk only
df.persist(StorageLevel.DISK_ONLY)

# Unpersist when done
df.unpersist()

MLlib for Machine Learning

ML Pipeline

from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    VectorAssembler, StringIndexer, OneHotEncoder,
    StandardScaler
)
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Create sample dataset
data = [
    (25, "Engineer", "Male", 85000, 1),
    (30, "Manager", "Female", 95000, 1),
    (35, "Engineer", "Male", 110000, 0),
    (28, "Analyst", "Female", 72000, 1),
    (40, "Manager", "Male", 120000, 0),
    (32, "Analyst", "Male", 78000, 1),
    (27, "Engineer", "Female", 92000, 1),
    (45, "Manager", "Female", 135000, 0),
    (29, "Analyst", "Male", 75000, 1),
    (38, "Engineer", "Female", 105000, 0),
] * 100  # Replicate for more data

columns = ["age", "job", "gender", "salary", "promoted"]
df = spark.createDataFrame(data, columns)

# Feature Engineering Pipeline
# 1. Index categorical columns
job_indexer = StringIndexer(inputCol="job", outputCol="job_index")
gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_index")

# 2. One-hot encode
job_encoder = OneHotEncoder(inputCol="job_index", outputCol="job_vec")
gender_encoder = OneHotEncoder(inputCol="gender_index", outputCol="gender_vec")

# 3. Assemble features
assembler = VectorAssembler(
    inputCols=["age", "salary", "job_vec", "gender_vec"],
    outputCol="features_raw"
)

# 4. Scale features
scaler = StandardScaler(inputCol="features_raw", outputCol="features")

# 5. Define classifier
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="promoted",
    numTrees=100,
    maxDepth=5,
    seed=42
)

# Build pipeline
pipeline = Pipeline(stages=[
    job_indexer, gender_indexer,
    job_encoder, gender_encoder,
    assembler, scaler, rf
])

# Split data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Train model
model = pipeline.fit(train_df)

# Make predictions
predictions = model.transform(test_df)

# Evaluate
evaluator = BinaryClassificationEvaluator(
    labelCol="promoted",
    metricName="areaUnderROC"
)

auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc:.4f}")

# Show predictions
predictions.select("age", "job", "salary", "promoted", "probability", "prediction").show(5)

# Cross-validation
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100, 200]) \
    .addGrid(rf.maxDepth, [3, 5, 7]) \
    .build()

crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=42
)

cv_model = crossval.fit(train_df)
# Score the best model found by cross-validation on the held-out test set
cv_auc = evaluator.evaluate(cv_model.transform(test_df))
print(f"Test AUC (best CV model): {cv_auc:.4f}")
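
It helps to estimate how much work the cross-validation step above implies before running it. The plain-Python sketch below counts model fits for the same grid and fold count. The modulo-based fold assignment in `k_fold_indices` is a simplification chosen for illustration; Spark's CrossValidator assigns rows to folds randomly.

```python
from itertools import product

num_trees_grid = [50, 100, 200]
max_depth_grid = [3, 5, 7]
num_folds = 5

# Every combination of grid values is one candidate parameter map.
param_maps = list(product(num_trees_grid, max_depth_grid))


def k_fold_indices(n_rows, k):
    """Yield (train_idx, validation_idx) pairs; fold i validates rows where index % k == i."""
    for fold in range(k):
        val = [i for i in range(n_rows) if i % k == fold]
        train = [i for i in range(n_rows) if i % k != fold]
        yield train, val


# One fit per (parameter map, fold) pair.
fits = sum(1 for _ in param_maps for _ in k_fold_indices(10, num_folds))
total_fits = fits + 1  # plus one refit of the best combination on the full training set

print(len(param_maps), fits, total_fits)
```

With 9 parameter combinations and 5 folds, the whole pipeline is trained 45 times before the final refit, which is worth knowing when the grid includes 200-tree forests.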

Regression Example

from pyspark.ml.regression import LinearRegression, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Create housing data
housing_data = [
    (1500, 3, 2, 10, 350000),
    (2000, 4, 3, 5, 450000),
    (1200, 2, 1, 15, 280000),
    (1800, 3, 2, 8, 400000),
    (2500, 5, 4, 3, 550000),
    (1600, 3, 2, 12, 320000),
    (2200, 4, 3, 6, 480000),
    (1400, 2, 2, 20, 300000),
] * 50

housing_df = spark.createDataFrame(housing_data,
    ["sqft", "bedrooms", "bathrooms", "age", "price"])

# Feature assembly
assembler = VectorAssembler(
    inputCols=["sqft", "bedrooms", "bathrooms", "age"],
    outputCol="features"
)

# Scale
scaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Models
lr = LinearRegression(featuresCol="features_scaled", labelCol="price")
gbt = GBTRegressor(featuresCol="features_scaled", labelCol="price", maxIter=100)

# Pipeline
lr_pipeline = Pipeline(stages=[assembler, scaler, lr])
gbt_pipeline = Pipeline(stages=[assembler, scaler, gbt])

# Split and train
train, test = housing_df.randomSplit([0.8, 0.2], seed=42)

lr_model = lr_pipeline.fit(train)
gbt_model = gbt_pipeline.fit(train)

# Evaluate
evaluator = RegressionEvaluator(labelCol="price", metricName="rmse")
lr_pred = lr_model.transform(test)
gbt_pred = gbt_model.transform(test)

print(f"Linear Regression RMSE: {evaluator.evaluate(lr_pred):,.2f}")
print(f"GBT RMSE: {evaluator.evaluate(gbt_pred):,.2f}")

# Feature importance (GBT)
gbt_fitted = gbt_model.stages[-1]
importances = gbt_fitted.featureImportances.toArray()  # Vector -> NumPy array
print("\nFeature Importance (GBT):")
for feat, imp in zip(["sqft", "bedrooms", "bathrooms", "age"], importances):
    print(f"  {feat}: {imp:.4f}")

Python Implementation: Complete Spark Pipeline

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time

class SparkMLPipeline:
    """Complete Spark ML pipeline for classification."""
    
    def __init__(self, app_name="ML_Pipeline"):
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .config("spark.sql.shuffle.partitions", "200") \
            .config("spark.executor.memory", "4g") \
            .config("spark.driver.memory", "2g") \
            .getOrCreate()
        
        self.spark.sparkContext.setLogLevel("WARN")
    
    def load_data(self, path=None, data=None, schema=None):
        """Load data from path or create from list."""
        if data and schema:
            return self.spark.createDataFrame(data, schema)
        elif path:
            return self.spark.read.parquet(path)
        else:
            raise ValueError("Provide either path or data with schema")
    
    def explore_data(self, df, name="Dataset"):
        """Perform basic EDA."""
        print(f"\n{'='*60}")
        print(f"Exploration: {name}")
        print(f"{'='*60}")
        
        print(f"\nRows: {df.count():,}")
        print(f"Columns: {len(df.columns)}")
        
        print("\nSchema:")
        df.printSchema()
        
        print("\nStatistics:")
        df.describe().show()
        
        print("\nNull counts:")
        df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()
        
        return df
    
    def build_pipeline(self, categorical_cols, numerical_cols, label_col, 
                       n_trees=100, max_depth=5):
        """Build ML pipeline."""
        stages = []
        
        # Index categorical columns (this pipeline does not one-hot encode them)
        for col in categorical_cols:
            indexer = StringIndexer(inputCol=col, outputCol=f"{col}_index", handleInvalid="keep")
            stages.append(indexer)
        
        # Assemble features
        feature_cols = numerical_cols + [f"{c}_index" for c in categorical_cols]
        assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")
        stages.append(assembler)
        
        # Scale features
        scaler = StandardScaler(inputCol="features_raw", outputCol="features")
        stages.append(scaler)
        
        # Classifier
        rf = RandomForestClassifier(
            featuresCol="features",
            labelCol=label_col,
            numTrees=n_trees,
            maxDepth=max_depth,
            seed=42
        )
        stages.append(rf)
        
        return Pipeline(stages=stages)
    
    def train_and_evaluate(self, pipeline, train_df, test_df, label_col):
        """Train model and evaluate."""
        start_time = time.time()
        
        model = pipeline.fit(train_df)
        train_time = time.time() - start_time
        
        start_time = time.time()
        predictions = model.transform(test_df)
        predict_time = time.time() - start_time
        
        # Metrics
        evaluator_acc = MulticlassClassificationEvaluator(
            labelCol=label_col, metricName="accuracy")
        evaluator_f1 = MulticlassClassificationEvaluator(
            labelCol=label_col, metricName="f1")
        
        accuracy = evaluator_acc.evaluate(predictions)
        f1 = evaluator_f1.evaluate(predictions)
        
        print(f"\nTraining time: {train_time:.2f}s")
        print(f"Prediction time: {predict_time:.2f}s")
        print(f"Accuracy: {accuracy:.4f}")
        print(f"F1 Score: {f1:.4f}")
        
        return model, predictions
    
    def stop(self):
        """Stop Spark session."""
        self.spark.stop()

# Example usage
if __name__ == "__main__":
    # Initialize pipeline
    pipeline = SparkMLPipeline("CustomerChurn_Prediction")
    
    # Sample data
    data = [
        (25, "Male", "Yes", "No", 1),
        (30, "Female", "Yes", "Yes", 0),
        (35, "Male", "No", "Yes", 1),
        (28, "Female", "Yes", "No", 0),
        (40, "Male", "No", "No", 1),
    ] * 200
    
    schema = ["age", "gender", "partner", "churn", "label"]
    
    df = pipeline.load_data(data=data, schema=schema)
    pipeline.explore_data(df, "Customer Data")
    
    # Split data
    train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
    
    # Build and train
    ml_pipeline = pipeline.build_pipeline(
        categorical_cols=["gender", "partner"],
        numerical_cols=["age"],
        label_col="label"
    )
    
    model, predictions = pipeline.train_and_evaluate(
        ml_pipeline, train_df, test_df, "label"
    )
    
    # Show predictions
    predictions.select("age", "gender", "label", "probability", "prediction").show(5)
    
    pipeline.stop()

Performance Optimization

Partitioning Strategy

# Repartition by key
df = df.repartition(200, "department")

# Coalesce to reduce partitions
df = df.coalesce(10)

# Check partition count
print(f"Partitions: {df.rdd.getNumPartitions()}")

Broadcast Variables

# Broadcast small lookup table
lookup = {"US": "United States", "UK": "United Kingdom"}
broadcast_lookup = spark.sparkContext.broadcast(lookup)

# Use in transformation (assumes df has a country_code column)
to_country_name = F.udf(lambda code: broadcast_lookup.value.get(code, code))
df = df.withColumn("country_name", to_country_name(F.col("country_code")))

Join Strategies

# Broadcast join for small tables
from pyspark.sql.functions import broadcast

result = large_df.join(broadcast(small_df), "key")

# Sort-merge join (default for large tables)
result = large_df.join(medium_df, "key")

💡 Broadcast Joins

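Use broadcast joins when one side is small enough to fit comfortably in each executor's memory. Spark broadcasts a table automatically when its estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default). The small table is sent to all executors, so the large table does not need to be shuffled.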
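
The mechanics can be sketched in plain Python (not Spark code). The `large_partitions` data and the `join_partition` helper are hypothetical; the lookup dictionary is the one from the broadcast variable example.

```python
# Small table: the lookup from the broadcast variable example.
small_table = {"US": "United States", "UK": "United Kingdom"}

# Hypothetical large table, already split across two partitions: (row_id, country_code).
large_partitions = [
    [(1, "US"), (2, "UK")],
    [(3, "US"), (4, "FR")],
]


def join_partition(rows, lookup):
    """Inner-join one partition against its local copy of the small table."""
    return [(row_id, code, lookup[code]) for row_id, code in rows if code in lookup]


# Each "executor" gets a full copy of the small table, so rows of the
# large table never leave their partition.
joined = [join_partition(part, dict(small_table)) for part in large_partitions]

print(joined)
```

The cost is one copy of the small table per executor, which is why this only pays off when that table is small.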


Key Takeaways

📋 Summary: Spark Fundamentals

  1. Spark is a widely used engine for big data processing, with a Python API (PySpark); its RDD abstraction enables fault-tolerant distributed computation
  2. DataFrames > RDDs for most use cases: the Catalyst optimizer enables query optimizations that are not possible with opaque RDD functions
  3. Lazy evaluation: transformations build a DAG and actions trigger execution, which lets Spark optimize the whole plan before running it
  4. Shuffles are expensive: minimize them by partitioning wisely, using map-side aggregations, and broadcasting small tables
  5. MLlib provides scalable ML algorithms; pipelines enable reproducible feature engineering and model training
  6. Caching helps when reusing DataFrames: persist intermediate results for iterative algorithms
  7. Broadcast joins for small tables avoid shuffles and are one of the most common performance optimizations

Practice Exercises

Exercise 1: DataFrame Operations

Create a DataFrame with 1 million rows and perform:

  • GroupBy with multiple aggregations
  • Window functions for running totals
  • Complex SQL queries

Exercise 2: ETL Pipeline

Build a complete ETL pipeline that:

  1. Reads raw data
  2. Cleans and transforms
  3. Engineers features
  4. Writes to Parquet

Exercise 3: ML on Big Data

Train a classification model on a large dataset using MLlib with proper cross-validation.

Exercise 4: Performance Tuning

Optimize a slow Spark job by:

  • Repartitioning data
  • Using broadcast joins
  • Caching intermediate results

Discussion Questions

  1. When would you choose Spark over Pandas?
  2. How do you handle data skew in Spark?
  3. What are the cost implications of Spark in the cloud?
