Big Data: Spark Fundamentals
Overview
Apache Spark is a unified analytics engine for large-scale data processing. It provides an interface for programming entire clusters with implicit data parallelism and fault tolerance. Spark is widely regarded as the de facto standard for big data processing; the project reports speedups of up to 100x over Hadoop MapReduce for some in-memory workloads, though the actual gain depends heavily on the job. Its key innovation is the Resilient Distributed Dataset (RDD) abstraction, which enables fault-tolerant parallel computation.
Why Spark?
The Big Data Problem
Data Size Growth:
1 TB |--|
10 TB |---|
100 TB |------|
1 PB |-----------|
10 PB |-------------------|
+-------------------------> Time
Traditional tools fail at scale:
- Single machine: Limited by RAM (typically 16-64 GB)
- Hadoop MapReduce: Disk I/O bottleneck, slow iteration
- Pandas: Single-node, memory-bound
Spark vs Alternatives
| Feature | Pandas | Hadoop MR | Spark |
|---|---|---|---|
| Speed | Medium | Slow | Fast |
| Scalability | Single node | Distributed | Distributed |
| Ease of use | High | Low | Medium |
| Memory | In-memory | Disk-based | In-memory |
| Iteration | Easy | Hard | Easy |
| Language | Python | Java | Python, Scala, R, SQL |
Spark Architecture
              +------------------+
              |      Driver      |
              |     Program      |
              +--------+---------+
                       |
              +--------v---------+
              |   SparkContext   |
              |    / Session     |
              +--------+---------+
                       |
       +---------------+---------------+
       |               |               |
+------v------+ +------v------+ +------v------+
|  Executor   | |  Executor   | |  Executor   |
|  (Worker)   | |  (Worker)   | |  (Worker)   |
+------+------+ +------+------+ +------+------+
       |               |               |
+------v------+ +------v------+ +------v------+
|    Cache    | |    Cache    | |    Cache    |
|  (Memory)   | |  (Memory)   | |  (Memory)   |
+-------------+ +-------------+ +-------------+
RDDs (Resilient Distributed Datasets)
Core Concept
Definition: RDD (Resilient Distributed Dataset)
An RDD is an immutable, partitioned collection of elements that can be operated on in parallel. RDDs are the fundamental abstraction in Spark: they are fault-tolerant (lost partitions can be recomputed from lineage), distributed (split across nodes), and lazy (transformations are deferred until an action is called).
RDD Operations:
Transformations (Lazy):
map() -> Apply function to each element
filter() -> Keep elements matching predicate
flatMap() -> Map then flatten
groupByKey() -> Group values by key
reduceByKey() -> Combine values by key
join() -> Join two RDDs by key
Actions (Trigger computation):
count() -> Number of elements
collect() -> Return all elements
reduce() -> Aggregate elements
first() -> First element
take(n) -> First n elements
saveAsTextFile() -> Write to disk
Note: Lazy Evaluation
Spark transformations are lazy: they build a DAG (Directed Acyclic Graph) of operations but don't execute until an action is called. This allows Spark to optimize the execution plan before running any computation.
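The deferred-execution idea can be sketched in plain Python, without Spark. The `LazyDataset` class below is a hypothetical toy, not Spark's implementation: each transformation returns a new object that only records the operation, and nothing runs until the `collect()` action is called.

```python
class LazyDataset:
    """Toy stand-in for an RDD: transformations are recorded, not run."""

    def __init__(self, source, ops=()):
        self._source = list(source)
        self._ops = tuple(ops)  # recorded plan: ("map" | "filter", function) pairs

    def map(self, fn):
        # Return a new dataset with one more step in its plan (immutability).
        return LazyDataset(self._source, self._ops + (("map", fn),))

    def filter(self, pred):
        return LazyDataset(self._source, self._ops + (("filter", pred),))

    def collect(self):
        """Action: walk the recorded plan and materialize the result."""
        rows = self._source
        for kind, fn in self._ops:
            if kind == "map":
                rows = [fn(x) for x in rows]
            else:
                rows = [x for x in rows if fn(x)]
        return rows


calls = []  # records every element the map function actually touches


def traced_square(x):
    calls.append(x)
    return x * x


plan = LazyDataset(range(1, 6)).map(traced_square).filter(lambda x: x % 2 == 1)
calls_before_action = len(calls)  # still 0: building the plan ran nothing
result = plan.collect()           # the action triggers the work
print(calls_before_action, result, len(calls))
```

Real Spark goes further than this toy: because it sees the whole plan before running it, it can reorder or fuse steps rather than executing them one by one.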
RDD API Examples
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Initialize Spark
conf = SparkConf().setAppName("RDD_Example").setMaster("local[*]")
sc = SparkContext(conf=conf)
# Create RDD from list
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data)
# Transformations (lazy)
squared = rdd.map(lambda x: x ** 2)
evens = rdd.filter(lambda x: x % 2 == 0)
# Actions (trigger computation)
print(f"Original: {rdd.collect()}")
print(f"Squared: {squared.collect()}")
print(f"Evens: {evens.collect()}")
print(f"Sum: {rdd.reduce(lambda a, b: a + b)}")
print(f"Count: {rdd.count()}")
# Word count example
text_rdd = sc.parallelize([
    "hello world",
    "hello spark",
    "spark is fast",
    "hello fast world"
])
word_counts = (text_rdd
    .flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda x: x[1], ascending=False))
print("\nWord Counts:")
for word, count in word_counts.collect():
    print(f" {word}: {count}")
Spark DataFrames
Why DataFrames?
RDD vs DataFrame:
RDD:
- Low-level control
- No schema optimization
- No query optimization
- Functional programming style
DataFrame:
- High-level API (like Pandas)
- Schema enforcement
- Catalyst query optimizer
- Tungsten execution engine
- SQL support
Tip: Prefer DataFrames over RDDs
For most use cases, DataFrames are preferred because Spark's Catalyst optimizer can optimize the query plan (predicate pushdown, column pruning, join reordering) in ways that are impossible with low-level RDD operations.
DataFrame Operations
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
# Initialize Spark Session
spark = SparkSession.builder \
.appName("DataFrame_Example") \
.config("spark.sql.shuffle.partitions", "200") \
.getOrCreate()
# Create DataFrame from list
data = [
("Alice", 25, "Engineering", 85000),
("Bob", 30, "Marketing", 72000),
("Charlie", 35, "Engineering", 95000),
("Diana", 28, "Sales", 68000),
("Eve", 32, "Marketing", 78000),
("Frank", 40, "Engineering", 110000),
("Grace", 27, "Sales", 71000),
]
columns = ["name", "age", "department", "salary"]
df = spark.createDataFrame(data, columns)
# Basic operations
print("Schema:")
df.printSchema()
print("\nFirst 5 rows:")
df.show(5)
print(f"\nRow count: {df.count()}")
print(f"Column names: {df.columns}")
# Transformations
result = df \
.filter(F.col("salary") > 75000) \
.groupBy("department") \
.agg(
F.count("*").alias("employee_count"),
F.avg("salary").alias("avg_salary"),
F.max("salary").alias("max_salary"),
F.stddev("salary").alias("salary_stddev")
) \
.orderBy(F.col("avg_salary").desc())
print("\nDepartment Statistics:")
result.show()
# Window functions
from pyspark.sql.window import Window
window_spec = Window.partitionBy("department").orderBy(F.col("salary").desc())
df_with_rank = df.withColumn(
"salary_rank", F.rank().over(window_spec)
).withColumn(
"salary_pct", F.percent_rank().over(window_spec)
)
print("\nSalary Rankings by Department:")
df_with_rank.show()
Complex DataFrame Operations
# Create sales data
sales_data = [
(2024, 1, "Electronics", "Laptop", 1200, 150),
(2024, 1, "Electronics", "Phone", 800, 300),
(2024, 2, "Electronics", "Laptop", 1250, 160),
(2024, 2, "Clothing", "Shirt", 50, 500),
(2024, 3, "Clothing", "Shirt", 55, 480),
(2024, 3, "Electronics", "Phone", 820, 290),
]
sales_df = spark.createDataFrame(sales_data,
["year", "month", "category", "product", "price", "quantity"])
# Complex analysis
analysis = sales_df \
.withColumn("revenue", F.col("price") * F.col("quantity")) \
.withColumn("month_date", F.make_date(F.col("year"), F.col("month"), F.lit(1))) \
.groupBy("category", "product") \
.agg(
F.sum("revenue").alias("total_revenue"),
F.sum("quantity").alias("total_units"),
F.avg("price").alias("avg_price"),
F.countDistinct("month").alias("months_active")
) \
.withColumn("revenue_per_unit", F.col("total_revenue") / F.col("total_units")) \
.orderBy(F.col("total_revenue").desc())
print("\nSales Analysis:")
analysis.show()
# Window functions for time series
ts_window = Window.partitionBy("product").orderBy("month")
monthly_trend = sales_df \
.withColumn("revenue", F.col("price") * F.col("quantity")) \
.withColumn("prev_month_revenue", F.lag("revenue").over(ts_window)) \
.withColumn("mom_growth",
(F.col("revenue") - F.col("prev_month_revenue")) / F.col("prev_month_revenue") * 100
)
print("\nMonth-over-Month Growth:")
monthly_trend.show()
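To make the lag arithmetic concrete, the following plain-Python sketch recomputes the same month-over-month figures without Spark. It assumes the six rows from sales_data; the names `sales_rows`, `by_product`, and `growth` are illustrative only and not part of any Spark API.

```python
from collections import defaultdict

# Same rows as sales_data: (year, month, category, product, price, quantity)
sales_rows = [
    (2024, 1, "Electronics", "Laptop", 1200, 150),
    (2024, 1, "Electronics", "Phone", 800, 300),
    (2024, 2, "Electronics", "Laptop", 1250, 160),
    (2024, 2, "Clothing", "Shirt", 50, 500),
    (2024, 3, "Clothing", "Shirt", 55, 480),
    (2024, 3, "Electronics", "Phone", 820, 290),
]

# Group (month, revenue) pairs by product, like partitionBy("product").
by_product = defaultdict(list)
for _year, month, _category, product, price, quantity in sales_rows:
    by_product[product].append((month, price * quantity))

# (product, month) -> percent change vs. that product's previous row, or None.
growth = {}
for product, rows in by_product.items():
    previous = None
    for month, revenue in sorted(rows):  # like orderBy("month")
        growth[(product, month)] = (
            None if previous is None else (revenue - previous) / previous * 100
        )
        previous = revenue

for key in sorted(growth):
    print(key, growth[key])
```

The first row of each product has no predecessor, so its growth is None, which corresponds to the null that lag produces in Spark. Because lag looks at the previous row in the window rather than the previous calendar month, Phone's month-3 figure is compared with month 1; if gaps like that matter, the series would need one row per month first.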
Spark SQL
SQL Interface
# Register DataFrame as temporary view
df.createOrReplaceTempView("employees")
# Run SQL queries
sql_result = spark.sql("""
SELECT
department,
COUNT(*) as headcount,
AVG(salary) as avg_salary,
MAX(salary) - MIN(salary) as salary_range
FROM employees
GROUP BY department
HAVING COUNT(*) > 1
ORDER BY avg_salary DESC
""")
sql_result.show()
# Create a permanent table (overwrite so the example can be rerun)
df.write.mode("overwrite").saveAsTable("employee_data")
# Query with complex SQL
spark.sql("""
WITH dept_stats AS (
SELECT
department,
AVG(salary) as avg_salary,
STDDEV(salary) as salary_std
FROM employees
GROUP BY department
)
SELECT
e.name,
e.department,
e.salary,
(e.salary - ds.avg_salary) / ds.salary_std as z_score
FROM employees e
JOIN dept_stats ds ON e.department = ds.department
WHERE ABS((e.salary - ds.avg_salary) / ds.salary_std) > 1
""").show()
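As a cross-check on the z-score query, here is a plain-Python sketch of the same calculation on the seven employee rows used earlier (age omitted). It assumes Spark SQL's STDDEV is the sample standard deviation, which is what `statistics.stdev` computes; the variable names are illustrative.

```python
from collections import defaultdict
from statistics import mean, stdev

# (name, department, salary) taken from the employees example
employees = [
    ("Alice", "Engineering", 85000),
    ("Bob", "Marketing", 72000),
    ("Charlie", "Engineering", 95000),
    ("Diana", "Sales", 68000),
    ("Eve", "Marketing", 78000),
    ("Frank", "Engineering", 110000),
    ("Grace", "Sales", 71000),
]

# Per-department statistics, like the dept_stats CTE.
salaries = defaultdict(list)
for _name, dept, salary in employees:
    salaries[dept].append(salary)
stats = {dept: (mean(vals), stdev(vals)) for dept, vals in salaries.items()}

# Keep employees more than one standard deviation from their department mean.
outliers = []
for name, dept, salary in employees:
    avg, std = stats[dept]
    z = (salary - avg) / std
    if abs(z) > 1:
        outliers.append((name, round(z, 3)))

print(outliers)
```

On this data only Frank passes the filter. A department with exactly two employees can never pass it, since with a sample standard deviation both z-scores are about ±0.707.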
Distributed Computing Concepts
Partitioning
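Definition: Partitioning
Partitioning is the process of dividing data across multiple nodes for parallel processing. The number of partitions caps the degree of parallelism, because each partition is processed by one task. Aim for at least one partition per CPU core; Spark's tuning guide suggests 2-3 tasks per core.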
Data Partitioning:
Original Data: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Partition 0: [1, 2, 3, 4]
Partition 1: [5, 6, 7, 8]
Partition 2: [9, 10]
Parallel Processing:
Partition 0 -> Core 0: sum = 10
Partition 1 -> Core 1: sum = 26
Partition 2 -> Core 2: sum = 19
Final sum = 10 + 26 + 19 = 55
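The split-then-combine pattern in the diagram can be reproduced in plain Python. This is only a sketch: the chunk size of 4 is chosen to match the diagram, threads stand in for cores, and Spark's own `parallelize` may slice the data differently.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, size):
    """Slice data into consecutive chunks of at most `size` elements."""
    return [data[i:i + size] for i in range(0, len(data), size)]


data = list(range(1, 11))
partitions = split_into_partitions(data, 4)

# One worker per partition, mirroring "one task per partition".
with ThreadPoolExecutor(max_workers=len(partitions)) as pool:
    partial_sums = list(pool.map(sum, partitions))

# Combining the partial results is the only step that needs all partitions.
total = sum(partial_sums)
print(partial_sums, total)
```

Each partition's sum is independent of the others, which is why the work parallelizes cleanly; only the small list of partial sums has to be brought together at the end.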
Shuffle Operations
Shuffle (expensive operation):
Before Shuffle:      After Shuffle:
Partition 0:         Partition 0:
(A, 1)               (A, 1)
(B, 2)               (A, 3)
(A, 3)               (A, 5)
Partition 1:         Partition 1:
(B, 4)               (B, 2)
(A, 5)               (B, 4)
(B, 6)               (B, 6)
Data must be redistributed across network!
Warning: Shuffle Operations
Shuffles are among the most expensive operations in Spark. They redistribute data across the network, which involves disk I/O, serialization, and network transfer. Minimize shuffles by partitioning wisely, preferring operations with map-side aggregation (reduceByKey rather than groupByKey), and broadcasting small tables.
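Why map-side aggregation helps can be shown with a small plain-Python simulation. It is a simplified model, not Spark's shuffle implementation: it uses the two "before shuffle" partitions from the diagram and counts how many records would cross the network under each strategy. The helper names `shuffle` and `combine_locally` are hypothetical.

```python
from collections import defaultdict

partitions = [
    [("A", 1), ("B", 2), ("A", 3)],  # partition 0
    [("B", 4), ("A", 5), ("B", 6)],  # partition 1
]


def shuffle(records_per_partition):
    """Route every record to its key's group; return (grouped, records_moved)."""
    grouped = defaultdict(list)
    moved = 0
    for records in records_per_partition:
        for key, value in records:
            grouped[key].append(value)
            moved += 1
    return grouped, moved


def combine_locally(records):
    """Map-side combine: pre-sum values per key inside one partition."""
    sums = defaultdict(int)
    for key, value in records:
        sums[key] += value
    return list(sums.items())


# groupByKey style: ship every record, then sum after the shuffle.
grouped, moved_without_combine = shuffle(partitions)
totals_group = {k: sum(v) for k, v in grouped.items()}

# reduceByKey style: sum inside each partition first, then ship partial sums.
grouped, moved_with_combine = shuffle([combine_locally(p) for p in partitions])
totals_reduce = {k: sum(v) for k, v in grouped.items()}

print(totals_group, moved_without_combine)
print(totals_reduce, moved_with_combine)
```

Both strategies produce the same totals, but the pre-combined version moves four records instead of six. The saving grows with the number of repeated keys per partition.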
Caching and Persistence
from pyspark import StorageLevel
# Cache with the default storage level (memory first, spilling to disk)
df.cache()
# Check if cached (do this before unpersisting)
print(f"Is cached: {df.is_cached}")
# Release the cache before choosing a different storage level
df.unpersist()
# Persist to disk only
df.persist(StorageLevel.DISK_ONLY)
# Unpersist when done
df.unpersist()
MLlib for Machine Learning
ML Pipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
VectorAssembler, StringIndexer, OneHotEncoder,
StandardScaler, Imputer
)
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Create sample dataset
data = [
(25, "Engineer", "Male", 85000, 1),
(30, "Manager", "Female", 95000, 1),
(35, "Engineer", "Male", 110000, 0),
(28, "Analyst", "Female", 72000, 1),
(40, "Manager", "Male", 120000, 0),
(32, "Analyst", "Male", 78000, 1),
(27, "Engineer", "Female", 92000, 1),
(45, "Manager", "Female", 135000, 0),
(29, "Analyst", "Male", 75000, 1),
(38, "Engineer", "Female", 105000, 0),
] * 100 # Replicate for more data
columns = ["age", "job", "gender", "salary", "promoted"]
df = spark.createDataFrame(data, columns)
# Feature Engineering Pipeline
# 1. Index categorical columns
job_indexer = StringIndexer(inputCol="job", outputCol="job_index")
gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_index")
# 2. One-hot encode
job_encoder = OneHotEncoder(inputCol="job_index", outputCol="job_vec")
gender_encoder = OneHotEncoder(inputCol="gender_index", outputCol="gender_vec")
# 3. Assemble features
assembler = VectorAssembler(
inputCols=["age", "salary", "job_vec", "gender_vec"],
outputCol="features_raw"
)
# 4. Scale features
scaler = StandardScaler(inputCol="features_raw", outputCol="features")
# 5. Define classifier
rf = RandomForestClassifier(
featuresCol="features",
labelCol="promoted",
numTrees=100,
maxDepth=5,
seed=42
)
# Build pipeline
pipeline = Pipeline(stages=[
job_indexer, gender_indexer,
job_encoder, gender_encoder,
assembler, scaler, rf
])
# Split data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# Train model
model = pipeline.fit(train_df)
# Make predictions
predictions = model.transform(test_df)
# Evaluate
evaluator = BinaryClassificationEvaluator(
labelCol="promoted",
metricName="areaUnderROC"
)
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc:.4f}")
# Show predictions
predictions.select("age", "job", "salary", "promoted", "probability", "prediction").show(5)
# Cross-validation
paramGrid = ParamGridBuilder() \
.addGrid(rf.numTrees, [50, 100, 200]) \
.addGrid(rf.maxDepth, [3, 5, 7]) \
.build()
crossval = CrossValidator(
estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=evaluator,
numFolds=5,
seed=42
)
cv_model = crossval.fit(train_df)
cv_auc = evaluator.evaluate(cv_model.transform(test_df))
print(f"Test AUC (best cross-validated model): {cv_auc:.4f}")
Regression Example
from pyspark.ml.regression import LinearRegression, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
# Create housing data
housing_data = [
(1500, 3, 2, 10, 350000),
(2000, 4, 3, 5, 450000),
(1200, 2, 1, 15, 280000),
(1800, 3, 2, 8, 400000),
(2500, 5, 4, 3, 550000),
(1600, 3, 2, 12, 320000),
(2200, 4, 3, 6, 480000),
(1400, 2, 2, 20, 300000),
] * 50
housing_df = spark.createDataFrame(housing_data,
["sqft", "bedrooms", "bathrooms", "age", "price"])
# Feature assembly
assembler = VectorAssembler(
inputCols=["sqft", "bedrooms", "bathrooms", "age"],
outputCol="features"
)
# Scale
scaler = StandardScaler(inputCol="features", outputCol="features_scaled")
# Models
lr = LinearRegression(featuresCol="features_scaled", labelCol="price")
gbt = GBTRegressor(featuresCol="features_scaled", labelCol="price", maxIter=100)
# Pipeline
lr_pipeline = Pipeline(stages=[assembler, scaler, lr])
gbt_pipeline = Pipeline(stages=[assembler, scaler, gbt])
# Split and train
train, test = housing_df.randomSplit([0.8, 0.2], seed=42)
lr_model = lr_pipeline.fit(train)
gbt_model = gbt_pipeline.fit(train)
# Evaluate
evaluator = RegressionEvaluator(labelCol="price", metricName="rmse")
lr_pred = lr_model.transform(test)
gbt_pred = gbt_model.transform(test)
print(f"Linear Regression RMSE: {evaluator.evaluate(lr_pred):,.2f}")
print(f"GBT RMSE: {evaluator.evaluate(gbt_pred):,.2f}")
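# Feature importance (GBT): the fitted regressor is the last pipeline stage
gbt_fitted = gbt_model.stages[-1]
print("\nFeature Importance (GBT):")
# featureImportances is a Vector; convert to an array before iterating
importances = gbt_fitted.featureImportances.toArray()
for feat, imp in zip(["sqft", "bedrooms", "bathrooms", "age"], importances):
    print(f" {feat}: {imp:.4f}")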
Python Implementation: Complete Spark Pipeline
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time
class SparkMLPipeline:
    """Complete Spark ML pipeline for classification."""

    def __init__(self, app_name="ML_Pipeline"):
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .config("spark.sql.shuffle.partitions", "200") \
            .config("spark.executor.memory", "4g") \
            .config("spark.driver.memory", "2g") \
            .getOrCreate()
        self.spark.sparkContext.setLogLevel("WARN")

    def load_data(self, path=None, data=None, schema=None):
        """Load data from path or create from list."""
        if data and schema:
            return self.spark.createDataFrame(data, schema)
        elif path:
            return self.spark.read.parquet(path)
        else:
            raise ValueError("Provide either path or data with schema")

    def explore_data(self, df, name="Dataset"):
        """Perform basic EDA."""
        print(f"\n{'='*60}")
        print(f"Exploration: {name}")
        print(f"{'='*60}")
        print(f"\nRows: {df.count():,}")
        print(f"Columns: {len(df.columns)}")
        print("\nSchema:")
        df.printSchema()
        print("\nStatistics:")
        df.describe().show()
        print("\nNull counts:")
        df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()
        return df

    def build_pipeline(self, categorical_cols, numerical_cols, label_col,
                       n_trees=100, max_depth=5):
        """Build ML pipeline."""
        stages = []
        # Index categorical columns (no one-hot encoding in this pipeline)
        for col in categorical_cols:
            indexer = StringIndexer(inputCol=col, outputCol=f"{col}_index", handleInvalid="keep")
            stages.append(indexer)
        # Assemble features
        feature_cols = numerical_cols + [f"{c}_index" for c in categorical_cols]
        assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")
        stages.append(assembler)
        # Scale features
        scaler = StandardScaler(inputCol="features_raw", outputCol="features")
        stages.append(scaler)
        # Classifier
        rf = RandomForestClassifier(
            featuresCol="features",
            labelCol=label_col,
            numTrees=n_trees,
            maxDepth=max_depth,
            seed=42
        )
        stages.append(rf)
        return Pipeline(stages=stages)

    def train_and_evaluate(self, pipeline, train_df, test_df, label_col):
        """Train model and evaluate."""
        start_time = time.time()
        model = pipeline.fit(train_df)
        train_time = time.time() - start_time
        # transform() is lazy, so this times plan construction only;
        # the predictions are computed when the evaluators run below.
        start_time = time.time()
        predictions = model.transform(test_df)
        predict_time = time.time() - start_time
        # Metrics
        evaluator_acc = MulticlassClassificationEvaluator(
            labelCol=label_col, metricName="accuracy")
        evaluator_f1 = MulticlassClassificationEvaluator(
            labelCol=label_col, metricName="f1")
        accuracy = evaluator_acc.evaluate(predictions)
        f1 = evaluator_f1.evaluate(predictions)
        print(f"\nTraining time: {train_time:.2f}s")
        print(f"Transform (lazy) time: {predict_time:.2f}s")
        print(f"Accuracy: {accuracy:.4f}")
        print(f"F1 Score: {f1:.4f}")
        return model, predictions

    def stop(self):
        """Stop Spark session."""
        self.spark.stop()
# Example usage
if __name__ == "__main__":
    # Initialize pipeline
    pipeline = SparkMLPipeline("CustomerChurn_Prediction")
    # Sample data
    data = [
        (25, "Male", "Yes", "No", 1),
        (30, "Female", "Yes", "Yes", 0),
        (35, "Male", "No", "Yes", 1),
        (28, "Female", "Yes", "No", 0),
        (40, "Male", "No", "No", 1),
    ] * 200
    schema = ["age", "gender", "partner", "churn", "label"]
    df = pipeline.load_data(data=data, schema=schema)
    pipeline.explore_data(df, "Customer Data")
    # Split data
    train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
    # Build and train
    ml_pipeline = pipeline.build_pipeline(
        categorical_cols=["gender", "partner"],
        numerical_cols=["age"],
        label_col="label"
    )
    model, predictions = pipeline.train_and_evaluate(
        ml_pipeline, train_df, test_df, "label"
    )
    # Show predictions
    predictions.select("age", "gender", "label", "probability", "prediction").show(5)
    pipeline.stop()
Performance Optimization
Partitioning Strategy
# Repartition by key
df = df.repartition(200, "department")
# Coalesce to reduce partitions
df = df.coalesce(10)
# Check partition count
print(f"Partitions: {df.rdd.getNumPartitions()}")
Broadcast Variables
# Broadcast small lookup table
lookup = {"US": "United States", "UK": "United Kingdom"}
broadcast_lookup = spark.sparkContext.broadcast(lookup)
# Use in transformation (assumes df has a string column named "country_code")
to_country_name = F.udf(lambda code: broadcast_lookup.value.get(code, code))
df = df.withColumn("country_name", to_country_name(F.col("country_code")))
Join Strategies
# Broadcast join for small tables
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
# Sort-merge join (default for large tables)
result = large_df.join(medium_df, "key")
Tip: Broadcast Joins
Use broadcast joins for small tables (typically < 10MB) to avoid expensive shuffle operations. The small table is sent to all executors, eliminating the need for data redistribution.
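The mechanism behind a broadcast join can be sketched in plain Python. This is a conceptual model only: the small table is a dictionary that every partition can read, and the large table's rows (hypothetical values here) are joined where they already sit.

```python
# Small dimension table: fits in memory, so every "executor" gets a full copy.
small_table = {"US": "United States", "UK": "United Kingdom"}

# Large fact table, already split into partitions (hypothetical rows).
large_partitions = [
    [("US", 100), ("UK", 250)],
    [("US", 75), ("DE", 30)],
]


def join_partition(rows, lookup):
    """Inner join one partition against the broadcast copy, in place."""
    return [(code, amount, lookup[code]) for code, amount in rows if code in lookup]


# Each partition is joined independently; no large-table row changes partition.
joined = [join_partition(rows, small_table) for rows in large_partitions]
print(joined)
```

The ("DE", 30) row is dropped because this is an inner join and "DE" has no match. The point of the sketch is that only the small table is copied around, while the large table stays put.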
Key Takeaways
Summary: Spark Fundamentals
- Spark is a widely used engine for distributed data processing, with APIs in Python, Scala, R, and SQL; its RDD abstraction enables fault-tolerant distributed computation
- Prefer DataFrames over RDDs for most use cases: the Catalyst optimizer applies query optimizations that are unavailable to low-level RDD code
- Lazy evaluation: transformations build a DAG and actions trigger execution, which lets Spark optimize the whole plan before running it
- Shuffles are expensive: minimize them by partitioning wisely, using map-side aggregations, and broadcasting small tables
- MLlib provides scalable ML algorithms; pipelines enable reproducible feature engineering and model training
- Caching helps when reusing DataFrames: persist intermediate results for iterative algorithms
- Broadcast joins for small tables avoid shuffles, which makes them one of the most common performance optimizations
Practice Exercises
Exercise 1: DataFrame Operations
Create a DataFrame with 1 million rows and perform:
- GroupBy with multiple aggregations
- Window functions for running totals
- Complex SQL queries
Exercise 2: ETL Pipeline
Build a complete ETL pipeline that:
- Reads raw data
- Cleans and transforms
- Engineers features
- Writes to Parquet
Exercise 3: ML on Big Data
Train a classification model on a large dataset using MLlib with proper cross-validation.
Exercise 4: Performance Tuning
Optimize a slow Spark job by:
- Repartitioning data
- Using broadcast joins
- Caching intermediate results
Discussion Questions
- When would you choose Spark over Pandas?
- How do you handle data skew in Spark?
- What are the cost implications of Spark in the cloud?