
Spark ML Pipelines


Difficulty: Senior Level | Companies: Databricks, Netflix, Uber, Apple, Airbnb

ML Pipeline Fundamentals

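Spark ML organizes workflows with the Pipeline API. A Pipeline is an ordered list of stages, each either a Transformer (its transform() maps one DataFrame to another, e.g. VectorAssembler) or an Estimator (its fit() learns from a DataFrame and returns a fitted Transformer called a Model, e.g. StringIndexer or RandomForestClassifier). Calling fit() on a Pipeline runs the stages in order and returns a PipelineModel made only of Transformers, so the preprocessing learned during training is replayed unchanged at inference time.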
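The Estimator/Transformer contract can be illustrated with a toy, pure-Python model that needs no Spark installation. The class names below mirror Spark's concepts but are hypothetical stand-ins, not PySpark's API, and rows are plain dicts instead of DataFrames; the point is only the control flow of Pipeline.fit().

```python
from typing import Dict, List

Row = Dict[str, float]


class Transformer:
    """Maps rows to new rows (the role of transform() in Spark ML)."""

    def transform(self, rows: List[Row]) -> List[Row]:
        raise NotImplementedError


class Estimator:
    """Learns from rows and returns a fitted Transformer (the role of fit())."""

    def fit(self, rows: List[Row]) -> Transformer:
        raise NotImplementedError


class Doubler(Transformer):
    """Stateless stage: nothing to learn, so it is a plain Transformer."""

    def __init__(self, col: str, out: str):
        self.col, self.out = col, out

    def transform(self, rows):
        return [{**r, self.out: 2 * r[self.col]} for r in rows]


class MeanCentererModel(Transformer):
    """Fitted stage: carries the mean learned at fit time."""

    def __init__(self, col: str, out: str, mean: float):
        self.col, self.out, self.mean = col, out, mean

    def transform(self, rows):
        return [{**r, self.out: r[self.col] - self.mean} for r in rows]


class MeanCenterer(Estimator):
    def __init__(self, col: str, out: str):
        self.col, self.out = col, out

    def fit(self, rows):
        mean = sum(r[self.col] for r in rows) / len(rows)
        return MeanCentererModel(self.col, self.out, mean)


class PipelineModel(Transformer):
    def __init__(self, stages: List[Transformer]):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class Pipeline(Estimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if isinstance(stage, Estimator):
                stage = stage.fit(rows)  # learn state from the rows seen so far
            fitted.append(stage)
            rows = stage.transform(rows)  # the next stage sees this output
        return PipelineModel(fitted)


train = [{"x": 1.0}, {"x": 2.0}, {"x": 3.0}]
model = Pipeline([Doubler("x", "x2"), MeanCenterer("x2", "x2_centered")]).fit(train)
serving = model.transform([{"x": 10.0}])
print(serving[0]["x2_centered"])  # 2 * 10 - mean([2, 4, 6]) = 16.0
```

The fitted PipelineModel holds only Transformers, so the serving row is centered with the training mean (4.0) rather than with statistics recomputed from serving data.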

Building a Basic Pipeline

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

spark = SparkSession.builder \
    .appName("MLPipelines") \
    .getOrCreate()

# Load data and hold out a test set for evaluation
df = spark.read.parquet("hdfs://data/customer-churn")
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Define preprocessing stages
indexer_gender = StringIndexer(
    inputCol="gender", 
    outputCol="gender_index",
    handleInvalid="keep"
)

indexer_region = StringIndexer(
    inputCol="region", 
    outputCol="region_index",
    handleInvalid="keep"
)

# Assemble features
assembler = VectorAssembler(
    inputCols=["age", "income", "tenure_months", "gender_index", "region_index"],
    outputCol="raw_features"
)

# Scale features (optional here: tree models are insensitive to feature scale)
scaler = StandardScaler(
    inputCol="raw_features",
    outputCol="features",
    withStd=True,
    withMean=True
)

# Define classifier (the "churned" label column must be numeric, e.g. 0.0/1.0)
classifier = RandomForestClassifier(
    featuresCol="features",
    labelCol="churned",
    numTrees=100,
    maxDepth=10,
    seed=42
)

# Create pipeline
pipeline = Pipeline(stages=[
    indexer_gender,
    indexer_region,
    assembler,
    scaler,
    classifier
])

# Fit pipeline
model = pipeline.fit(train_df)

# Transform data
predictions = model.transform(test_df)


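Interview Insight: A fitted pipeline replays exactly the preprocessing learned during training (the same string-to-index mappings, the same scaling statistics) at inference time. This prevents training-serving skew, a common production ML issue in which features are computed differently at serving time than during training.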
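The indexers above show where skew can come from. Below is a pure-Python sketch of StringIndexer's documented behavior: with the default stringOrderType="frequencyDesc" the most frequent label gets index 0 (ties are sorted alphabetically in recent Spark versions), and handleInvalid="keep" maps unseen labels to an extra index equal to the number of known labels. The helper names and sample regions are hypothetical.

```python
from collections import Counter
from typing import Dict, List


def fit_string_indexer(values: List[str]) -> Dict[str, int]:
    # frequencyDesc ordering: most frequent label -> 0, ties broken alphabetically
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: i for i, label in enumerate(ordered)}


def apply_indexer(mapping: Dict[str, int], values: List[str]) -> List[int]:
    # handleInvalid="keep": unseen labels go to the extra index len(mapping)
    unseen = len(mapping)
    return [mapping.get(v, unseen) for v in values]


train_regions = ["west", "east", "west", "north", "west", "east"]
serving_regions = ["east", "east", "south", "north"]

# Correct: reuse the mapping learned at training time
train_mapping = fit_string_indexer(train_regions)
consistent = apply_indexer(train_mapping, serving_regions)

# Skewed: refitting on the serving batch assigns different indices to the same labels
skewed = apply_indexer(fit_string_indexer(serving_regions), serving_regions)

print(consistent)  # [1, 1, 3, 2]
print(skewed)      # [0, 0, 2, 1]
```

A model trained on the first encoding would read "east" as index 1; the refitted encoding silently feeds it 0 instead, which is exactly the mismatch a saved PipelineModel avoids.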

Feature Engineering at Scale

from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    HashingTF, IDF, Tokenizer,
    StringIndexer, OneHotEncoder, Bucketizer,
    VectorAssembler, SQLTransformer
)

# Text feature extraction
tokenizer = Tokenizer(inputCol="description", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="features")

text_pipeline = Pipeline(stages=[tokenizer, hashing_tf, idf])

# Categorical encoding
indexer = StringIndexer(inputCol="category", outputCol="category_index")
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_onehot")

# Bucketization: splits must cover every possible value, so bound both ends with infinity
bucketizer = Bucketizer(
    splits=[-float("inf"), 18, 35, 50, 65, float("inf")],
    inputCol="age",
    outputCol="age_bucket"
)

# Custom transformations with SQL (guard against tenure_months = 0)
sql_transformer = SQLTransformer(
    statement="SELECT *, CASE WHEN tenure_months > 0 THEN income / tenure_months ELSE 0.0 END AS income_per_month FROM __THIS__"
)

# Combine all features
assembler = VectorAssembler(
    inputCols=["features", "category_onehot", "age_bucket", "income_per_month"],
    outputCol="final_features"
)

# Complete feature engineering pipeline
feature_pipeline = Pipeline(stages=[
    tokenizer, hashing_tf, idf,
    indexer, encoder, bucketizer,
    sql_transformer, assembler
])
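To see what the text stages compute, here is a small pure-Python sketch of Tokenizer, HashingTF and IDF. It follows the smoothed IDF formula from the Spark ML documentation, idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) the number of documents containing term t. The hash function is a stand-in (Spark uses MurmurHash3), and the sample descriptions are made up.

```python
import math
import zlib
from typing import Dict, List


def tokenize(text: str) -> List[str]:
    # Mirrors Tokenizer: lowercase, then split on whitespace
    return text.lower().split()


def hashing_tf(tokens: List[str], num_features: int) -> Dict[int, float]:
    # Hashing trick: bucket = hash(term) mod num_features, value = term count.
    # Spark uses MurmurHash3; zlib.crc32 is only a deterministic stand-in,
    # so these bucket numbers will NOT match Spark's.
    vec: Dict[int, float] = {}
    for tok in tokens:
        idx = zlib.crc32(tok.encode("utf-8")) % num_features
        vec[idx] = vec.get(idx, 0.0) + 1.0
    return vec


def fit_idf(docs: List[Dict[int, float]], num_features: int) -> List[float]:
    # Smoothed IDF: log((m + 1) / (df + 1)) for every bucket
    m = len(docs)
    df = [0] * num_features
    for vec in docs:
        for idx in vec:
            df[idx] += 1
    return [math.log((m + 1) / (d + 1)) for d in df]


def tf_idf(vec: Dict[int, float], idf: List[float]) -> Dict[int, float]:
    # Sparse output: only buckets present in the document are stored
    return {idx: tf * idf[idx] for idx, tf in vec.items()}


NUM_FEATURES = 1 << 10
descriptions = ["Billing issue", "billing question", "network issue billing"]
tf_vectors = [hashing_tf(tokenize(d), NUM_FEATURES) for d in descriptions]
idf = fit_idf(tf_vectors, NUM_FEATURES)
weighted = [tf_idf(v, idf) for v in tf_vectors]

billing = zlib.crc32(b"billing") % NUM_FEATURES
print(weighted[0][billing])  # 0.0: "billing" appears in every document
```

Because a term's bucket depends only on the hash, HashingTF needs no fitting pass (it is a Transformer), whereas IDF must first count document frequencies (it is an Estimator). Hash collisions are the price of a fixed numFeatures width.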

Cross-Validation and Hyperparameter Tuning

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Define parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(classifier.numTrees, [50, 100, 200]) \
    .addGrid(classifier.maxDepth, [5, 10, 15]) \
    .addGrid(classifier.featureSubsetStrategy, ["auto", "sqrt", "log2"]) \
    .build()

# Define evaluator
evaluator = BinaryClassificationEvaluator(
    labelCol="churned",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

# Create cross-validator
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=4,
    seed=42
)

# Fit with cross-validation
cv_model = cv.fit(train_df)

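# Best model: the whole pipeline refit on train_df with the best parameters
best_model = cv_model.bestModel
# avgMetrics holds one mean cross-validated AUC per parameter combination
print(f"Best AUC: {max(cv_model.avgMetrics):.4f}")

# Access best parameters from the last pipeline stage (the random forest)
rf_model = best_model.stages[-1]
print(f"Num Trees: {rf_model.getOrDefault('numTrees')}")
print(f"Max Depth: {rf_model.getOrDefault('maxDepth')}")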


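Warning: Cross-validation trains one model per parameter combination per fold, so its cost grows quickly on large datasets. For faster tuning, consider TrainValidationSplit (a single train/validation split instead of k folds), a smaller grid, or a data sample for initial exploration.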
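A quick cost estimate for the grid above (pure arithmetic, no Spark): each parameter combination is trained once per fold, and both CrossValidator and TrainValidationSplit then refit the best combination on the full training set. The parallelism=4 setting only lets up to four of these fits run concurrently; it does not reduce the total work. The helper name is hypothetical.

```python
from math import prod
from typing import List, Optional


def tuning_fits(grid_sizes: List[int], num_folds: Optional[int] = None) -> int:
    """Count model fits for an exhaustive grid search.

    num_folds=None models TrainValidationSplit (one fit per combination);
    an integer models CrossValidator (one fit per combination per fold).
    Both then refit the best combination once on the full training data.
    """
    combos = prod(grid_sizes)
    per_combo = 1 if num_folds is None else num_folds
    return combos * per_combo + 1


# Grid from the example: numTrees x maxDepth x featureSubsetStrategy
grid = [3, 3, 3]
print(tuning_fits(grid, num_folds=5))  # 27 * 5 + 1 = 136
print(tuning_fits(grid))               # 27 + 1 = 28

# Trees grown during 5-fold CV alone (excluding the final refit)
num_trees_grid = [50, 100, 200]
other_combos = 3 * 3  # maxDepth values x featureSubsetStrategy values
trees_cv = 5 * other_combos * sum(num_trees_grid)
print(trees_cv)  # 5 * 9 * 350 = 15750
```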

Model Evaluation

from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
    RegressionEvaluator
)

# Binary classification metrics
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="churned",
    rawPredictionCol="rawPrediction"
)

auc = binary_evaluator.evaluate(predictions, {binary_evaluator.metricName: "areaUnderROC"})
pr = binary_evaluator.evaluate(predictions, {binary_evaluator.metricName: "areaUnderPR"})

print(f"AUC-ROC: {auc:.4f}")
print(f"AUC-PR: {pr:.4f}")

# Multiclass metrics (MulticlassClassificationEvaluator also works for a binary label)
multi_evaluator = MulticlassClassificationEvaluator(
    labelCol="churned",
    predictionCol="prediction"
)

accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})

print(f"Accuracy: {accuracy:.4f}")
print(f"F1: {f1:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

# Confusion matrix: row counts per (true label, predicted label) pair
predictions.groupBy("churned", "prediction").count().orderBy("churned", "prediction").show()
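The evaluator metrics can be reproduced from confusion-matrix counts. This pure-Python sketch assumes the counts come from a grouped count like the one above; the numbers are invented for illustration. It follows Spark's documented definitions for MulticlassClassificationEvaluator: the weighted metrics average per-class values weighted by each class's share of true labels, and "f1" is the weighted F1. One consequence worth knowing is that weightedRecall always equals accuracy.

```python
from typing import Dict, Tuple

Counts = Dict[Tuple[int, int], int]  # (true label, prediction) -> row count


def metrics_from_counts(counts: Counts) -> Dict[str, float]:
    labels = sorted({label for label, _ in counts})
    n = sum(counts.values())

    def actual(k):  # rows whose true label is k
        return sum(c for (label, _), c in counts.items() if label == k)

    def predicted(k):  # rows predicted as k
        return sum(c for (_, pred), c in counts.items() if pred == k)

    def precision(k):
        return counts.get((k, k), 0) / predicted(k) if predicted(k) else 0.0

    def recall(k):
        return counts.get((k, k), 0) / actual(k)

    def f1(k):
        p, r = precision(k), recall(k)
        return 2 * p * r / (p + r) if p + r else 0.0

    correct = sum(counts.get((k, k), 0) for k in labels)
    return {
        "accuracy": correct / n,
        "weightedPrecision": sum(precision(k) * actual(k) / n for k in labels),
        "weightedRecall": sum(recall(k) * actual(k) / n for k in labels),
        "f1": sum(f1(k) * actual(k) / n for k in labels),
    }


# Hypothetical churn results: 50 churners, 150 non-churners
counts = {(1, 1): 40, (1, 0): 10, (0, 1): 20, (0, 0): 130}
m = metrics_from_counts(counts)
for name, value in m.items():
    print(f"{name}: {value:.4f}")
```

On imbalanced churn data, the weighted averages are dominated by the majority class, so it is worth also inspecting precision and recall for the churn class alone.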

Model Persistence and Serving

# Save pipeline model
model.write().overwrite().save("hdfs://models/churn-prediction")

# Load pipeline model
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("hdfs://models/churn-prediction")

# Use for batch inference
new_data = spark.read.parquet("hdfs://data/new-customers")
predictions = loaded_model.transform(new_data)

# Save predictions
predictions.write.mode("overwrite").parquet("hdfs://predictions/churn")

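# Near-real-time serving with Spark Structured Streaming (micro-batches).
# Requires the spark-sql-kafka connector package on the classpath.
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.ml.functions import vector_to_array

# Assumed event schema: the feature fields must match the training columns;
# customer_id is a hypothetical key used only to identify each prediction
event_schema = StructType([
    StructField("customer_id", StringType()),
    StructField("age", DoubleType()),
    StructField("income", DoubleType()),
    StructField("tenure_months", DoubleType()),
    StructField("gender", StringType()),
    StructField("region", StringType()),
])

stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "customer-events") \
    .load()

parsed = stream.select(F.from_json(F.col("value").cast("string"), event_schema).alias("data"))
parsed = parsed.select("data.*")

# Apply the fitted pipeline to each micro-batch
predictions_stream = loaded_model.transform(parsed)

# The Kafka sink needs a "value" column, so serialize the output fields as JSON
output = predictions_stream.select(
    F.to_json(F.struct(
        "customer_id",
        "prediction",
        vector_to_array("probability")[1].alias("churn_probability"),
    )).alias("value")
)

query = output \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("topic", "predictions") \
    .option("checkpointLocation", "hdfs://checkpoints/predictions") \
    .start()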
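Why can a fitted pipeline be dropped into a stream? After fitting, stages such as the indexers, assembler, scaler and forest apply frozen state to each row independently, so scoring each micro-batch gives the same result as scoring all events at once. The pure-Python toy below checks that property with a standardize-then-threshold "model" (using the corrected sample standard deviation, as Spark's StandardScaler does); the data and helper names are hypothetical. Estimators are different: fit() needs a static dataset, which is why training happens offline and only the fitted PipelineModel runs in the stream.

```python
from typing import List, Tuple


def fit_scaler(values: List[float]) -> Tuple[float, float]:
    # Training time: learn mean and sample standard deviation (n - 1)
    mean = sum(values) / len(values)
    std = (sum((v - mean) ** 2 for v in values) / (len(values) - 1)) ** 0.5
    return mean, std


def score(batch: List[float], mean: float, std: float, threshold: float = 0.0) -> List[int]:
    # Serving time: standardize with frozen training stats, then threshold
    return [1 if (v - mean) / std > threshold else 0 for v in batch]


train = [10.0, 20.0, 30.0, 40.0]
mean, std = fit_scaler(train)

events = [5.0, 25.0, 26.0, 50.0, 31.0, 12.0]
micro_batches = [events[0:2], events[2:5], events[5:]]

streamed = [y for mb in micro_batches for y in score(mb, mean, std)]
batch = score(events, mean, std)
print(streamed)  # [0, 0, 1, 1, 1, 0]
```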


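Pro Tip: Save the entire fitted PipelineModel, not just the final classifier. The pipeline model also stores the learned preprocessing state (string-index mappings, scaler statistics), so inference applies exactly the transformations used during training.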
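The risk is easy to demonstrate with a toy. In this pure-Python sketch (a made-up JSON format, not Spark's on-disk layout), the "pipeline" is a scaler followed by a linear model; saving both stages lets inference reproduce training-time behavior, while applying only the model weights to raw input silently yields a very different score. All names and values are hypothetical.

```python
import json
import os
import tempfile


def save_pipeline(path: str, mean: float, std: float, weight: float, bias: float) -> None:
    # Persist preprocessing state and model parameters together (toy format)
    stages = [
        {"type": "scaler", "mean": mean, "std": std},
        {"type": "linear", "weight": weight, "bias": bias},
    ]
    with open(path, "w") as f:
        json.dump({"stages": stages}, f)


def load_and_score(path: str, x: float) -> float:
    # Replay every saved stage in order, like a pipeline model's transform
    with open(path) as f:
        stages = json.load(f)["stages"]
    for stage in stages:
        if stage["type"] == "scaler":
            x = (x - stage["mean"]) / stage["std"]
        elif stage["type"] == "linear":
            x = stage["weight"] * x + stage["bias"]
    return x


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "pipeline.json")
    save_pipeline(path, mean=25.0, std=10.0, weight=2.0, bias=0.5)
    full_pipeline_score = load_and_score(path, 45.0)  # scaled input: (45 - 25) / 10 = 2.0

# Model-only shortcut: the same weights applied to the unscaled input
model_only_score = 2.0 * 45.0 + 0.5

print(full_pipeline_score, model_only_score)  # 4.5 90.5
```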

Distributed Training

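# Spark ML's built-in estimators (linear models, tree ensembles, clustering, ALS)
# train in a distributed way over DataFrame partitions.
# For deep learning, use a separate framework such as Horovod or TensorFlow on Spark.

# Example: distributed random forest on the churn columns
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# Build a "features" column with the indexers defined earlier; trees need no scaling
feature_cols = ["age", "income", "tenure_months", "gender_index", "region_index"]
prep_model = Pipeline(stages=[
    indexer_gender,
    indexer_region,
    VectorAssembler(inputCols=feature_cols, outputCol="features"),
]).fit(train_df)
train_features = prep_model.transform(train_df)

rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="churned",
    numTrees=100,
    maxDepth=10,
    subsamplingRate=0.8,  # fraction of training rows sampled for each tree
    featureSubsetStrategy="auto",
    seed=42
)

# Training is distributed across executors
model = rf.fit(train_features)

# Feature importance: each input column is a single vector slot here,
# so importances line up one-to-one with feature_cols
importance = model.featureImportances.toArray()

for name, imp in sorted(zip(feature_cols, importance), key=lambda x: -x[1]):
    print(f"{name}: {imp:.4f}")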
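How is tree training distributed? At a high level, Spark's tree learners discretize continuous features into at most maxBins bins (default 32), aggregate label statistics per bin on each partition, and merge those statistics to choose the split with the lowest impurity (Gini by default for classification). The pure-Python sketch below imitates that aggregate-then-merge pattern for one feature at one node; it is a simplification with hand-picked thresholds and made-up data, not Spark's implementation.

```python
from collections import Counter
from typing import List, Tuple

Row = Tuple[float, int]  # (feature value, label)


def bin_index(value: float, thresholds: List[float]) -> int:
    # Bin i holds values in (thresholds[i-1], thresholds[i]]; the last bin is open-ended
    for i, t in enumerate(thresholds):
        if value <= t:
            return i
    return len(thresholds)


def partition_stats(rows: List[Row], thresholds: List[float]) -> Counter:
    # Per-partition work: label counts per bin, computed from this partition only
    return Counter((bin_index(v, thresholds), y) for v, y in rows)


def gini(counts: Counter) -> float:
    n = sum(counts.values())
    return 1.0 - sum((c / n) ** 2 for c in counts.values()) if n else 0.0


def best_split(stats: Counter, thresholds: List[float]) -> Tuple[float, float]:
    # After merging: score every candidate threshold using only the combined counts
    best = None
    for i, t in enumerate(thresholds):
        left, right = Counter(), Counter()
        for (b, y), c in stats.items():
            (left if b <= i else right)[y] += c
        n_left, n_right = sum(left.values()), sum(right.values())
        impurity = (n_left * gini(left) + n_right * gini(right)) / (n_left + n_right)
        if best is None or impurity < best[1]:
            best = (t, impurity)
    return best


partitions = [
    [(1.0, 0), (2.0, 0), (8.0, 1)],  # rows held by executor A
    [(3.0, 0), (9.0, 1), (7.0, 1)],  # rows held by executor B
]
thresholds = [2.5, 5.0, 7.5]

merged = sum((partition_stats(p, thresholds) for p in partitions), Counter())
whole = partition_stats([row for p in partitions for row in p], thresholds)
split, impurity = best_split(merged, thresholds)
print(merged == whole, split, impurity)  # True 5.0 0.0
```

Only the small per-bin statistics are shuffled and merged, not the raw rows, which keeps split finding cheap as data grows; binning is also why maxBins limits how finely a continuous feature can be split.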


Key Takeaway: Spark ML Pipelines provide a unified API for building, tuning, and deploying ML models at scale. Use pipelines to ensure consistency between training and inference, and leverage distributed training for large datasets.

Follow-Up Questions

  • How does Spark handle imbalanced datasets in ML pipelines?
  • Explain the difference between transform and fit in the Pipeline API.
  • How would you implement custom Transformers for feature engineering?
  • Describe strategies for model versioning and A/B testing in production.
  • How does Spark ML handle missing values during preprocessing?
