ML Feature Engineering in PySpark
Architecture Diagram
ML FEATURE ENGINEERING PIPELINE
Data flow:
- Raw Data Ingestion → Feature Extraction → Feature Transform
- Supporting components: Data Quality Validation (fed by ingestion), Feature Store (fed by extraction), Pipeline Registry (fed by transformation)
Feature transformation stages: Scaling (normalization) → Encoding (categorization) → Selection (reduction) → Assembly (combine)
- Scaling: Standard, MinMax, Robust
- Encoding: OneHot, label encoding, ordinal
- Selection: PCA, ChiSq, variance threshold
- Assembly: VectorAssembler
Pipeline execution flow: Fit (learn params) → Transform (apply steps) → Evaluate (metrics, score) → Tune (optimize hyperparameters) → Deploy (model registry)
Detailed Explanation
Feature engineering is the process of transforming raw data into features that improve machine learning model performance. In PySpark, the `pyspark.ml` Pipeline API chains these transformations into a single object that runs distributed across the cluster and can be fitted once and reapplied identically to new data, which keeps large-scale processing reproducible.
Feature Transformers
PySpark provides a rich ecosystem of feature transformers that handle various data transformations:
Numerical Transformers:
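- StandardScaler: Z-score standardization using the mean and the corrected sample standard deviation; `withMean` defaults to False, so data is only scaled to unit variance unless centering is enabled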
- MinMaxScaler: Scales features to [0, 1] range
- RobustScaler: Uses median and IQR, robust to outliers
- MaxAbsScaler: Scales by maximum absolute value, preserves sparsity
- Normalizer: Rescales each row vector to unit p-norm (p = 2, the L2 norm, by default)
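To make the differences concrete, the sketch below reproduces the arithmetic of three of these transformers in plain Python on a small column with one outlier: max-abs scaling, a median/IQR robust scaler, and row-wise L2 normalization. It is an illustration under simplifying assumptions, not Spark code: Spark's RobustScaler uses approximate quantiles and only subtracts the median when `withCentering=True` (off by default), while this version always centers. The function names are hypothetical.

```python
import math
import statistics


def max_abs_scale(values):
    """Divide by the largest absolute value; zeros stay zero, so sparsity is kept."""
    m = max(abs(v) for v in values)
    return [v / m if m else 0.0 for v in values]


def robust_scale(values):
    """Subtract the median and divide by the interquartile range (Q3 - Q1)."""
    q1, median, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return [(v - median) / iqr if iqr else 0.0 for v in values]


def l2_normalize(vector):
    """Row-wise: rescale one feature vector to unit Euclidean (L2) length."""
    norm = math.sqrt(sum(x * x for x in vector))
    return [x / norm if norm else 0.0 for x in vector]


column = [1.0, 2.0, 3.0, 4.0, 100.0]
print(max_abs_scale(column))    # the outlier squeezes the other values toward 0
print(robust_scale(column))     # median and IQR do not depend on the outlier's magnitude
print(l2_normalize([3.0, 4.0]))
```

The contrast is the point: max-abs scaling compresses the four ordinary values into [0.01, 0.04] because of one outlier, while the robust version keeps them evenly spread around zero.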
Categorical Transformers:
- StringIndexer: Converts categorical strings to numeric indices
- OneHotEncoder: Creates binary vectors for categorical features
- IndexToString: Reverse mapping from indices to strings
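- VectorIndexer: Detects categorical features inside a vector column (those with at most `maxCategories` distinct values) and indexes them, leaving continuous features unchanged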
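The following plain-Python sketch mirrors what StringIndexer and OneHotEncoder produce under their default settings, applied to the `tier` values used in Example 1. It assumes Spark's default `frequencyDesc` ordering with alphabetical tie-breaking and `dropLast=True`; it reproduces only the mapping, not Spark's distributed fitting or sparse vector output. The helper names are hypothetical.

```python
from collections import Counter


def fit_string_indexer(labels):
    """Most frequent label gets index 0.0; equal frequencies are ordered alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """With drop_last, the last category is encoded as the all-zeros vector."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


tiers = ["Premium", "Standard", "Premium", "Basic", "Standard",
         "Premium", "Basic", "Standard", "Premium", "Basic"]
index = fit_string_indexer(tiers)
print(index)
for tier in ("Premium", "Basic", "Standard"):
    print(tier, one_hot(index[tier], len(index)))
```

Premium (4 occurrences) gets index 0; Basic and Standard tie at 3, so Basic comes first alphabetically. With `dropLast=True`, three categories need only two columns, and Standard becomes `[0.0, 0.0]`.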
Feature Extraction:
- Tokenizer: Splits text into words
- HashingTF: Term frequency using hashing
- IDF: Inverse document frequency weighting
- Word2Vec: Word embedding vectorization
Feature Selection:
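- ChiSqSelector: Chi-squared feature selection for categorical features (for continuous features, UnivariateFeatureSelector in Spark 3.1+ can apply an ANOVA F-test instead)
- VarianceThresholdSelector: Removes features whose variance does not exceed a threshold (Spark 3.1+)
- Binarizer: Thresholds numeric features to 0/1 values (strictly a transformation rather than a selection method)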
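A plain-Python sketch of the rules behind a variance-threshold selector and Binarizer. Using sample variance here is an assumption made for illustration; check the VarianceThresholdSelector documentation for the exact estimator Spark uses. Binarizer maps values strictly greater than the threshold to 1.0 and everything else to 0.0. The function names are hypothetical.

```python
import statistics


def variance_threshold(rows, threshold):
    """Indices of columns whose sample variance is strictly above the threshold."""
    columns = list(zip(*rows))
    return [i for i, column in enumerate(columns)
            if statistics.variance(column) > threshold]


def binarize(values, threshold):
    """Values strictly greater than the threshold become 1.0, the rest 0.0."""
    return [1.0 if v > threshold else 0.0 for v in values]


rows = [
    [1.0, 5.0, 0.0],
    [1.0, 7.0, 0.1],
    [1.0, 9.0, 0.0],
    [1.0, 3.0, 0.1],
]
kept = variance_threshold(rows, threshold=0.1)
print(kept)                                      # only the middle column varies enough
print(binarize([0.2, 0.5, 0.9], threshold=0.5))  # 0.5 itself is not above the threshold
```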
Pipeline Architecture
The Pipeline API in PySpark implements a linear sequence of stages:
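```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler

pipeline = Pipeline(stages=[
    StringIndexer(inputCol="tier", outputCol="tier_index"),          # Estimator: fit() returns a StringIndexerModel
    VectorAssembler(inputCols=["tier_index", "age"],
                    outputCol="features"),                           # Transformer: no fit() needed
    LogisticRegression(featuresCol="features", labelCol="churned"),  # Estimator: fit() returns a model
])
```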
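To make the Estimator/Transformer contract concrete, here is a toy pure-Python imitation. The class names are hypothetical and this is not how Spark implements `Pipeline`; it only shows the semantics: fitting walks the stages in order, fits each estimator on the output of the earlier stages, and returns a model made entirely of transformers.

```python
class AddConstant:
    """Transformer: nothing to learn, it just maps rows."""
    def __init__(self, value):
        self.value = value

    def transform(self, rows):
        return [x + self.value for x in rows]


class MeanCenterer:
    """Estimator: fit() learns the mean and returns a transformer (the fitted model)."""
    def fit(self, rows):
        mean = sum(rows) / len(rows)
        return AddConstant(-mean)


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on data already transformed by earlier stages
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)
            fitted.append(model)
        return ToyPipeline(fitted)  # every stage is now a transformer

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


pipeline = ToyPipeline([AddConstant(10), MeanCenterer()])
model = pipeline.fit([1.0, 2.0, 3.0])
print(model.transform([1.0, 2.0, 3.0]))  # the mean learned after +10 (12.0) is subtracted
print(model.transform([5.0]))            # new data reuses the mean learned at fit time
```

The second call is the reproducibility argument in miniature: new data is transformed with parameters learned from the training data, never refitted.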
Key Concepts:
- Transformer: Applies transformations to DataFrames (fit not required)
- Estimator: Learns from data and produces Transformers (requires fit)
- Pipeline: Chains Transformers and Estimators sequentially
- Parameter: Configurable settings for Transformers/Estimators
Feature Store Integration
A Feature Store provides centralized feature management:
- Offline Store: Parquet/Delta for batch training
- Online Store: Redis/DynamoDB for low-latency serving
- Feature Registry: Metadata catalog for feature discovery
- Point-in-time Correctness: Prevents data leakage
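Point-in-time correctness means each training row only sees feature values that were available when its label was observed (an "as-of" join). The sketch below shows that lookup in plain Python with a hypothetical in-memory `feature_history`; a feature store performs the same lookup at scale. Treating a value stamped exactly at the event time as available is a policy choice assumed here.

```python
import bisect

# Hypothetical feature history: entity -> [(available_at, value)], sorted by time
feature_history = {
    "c1": [(1, 100.0), (5, 120.0), (9, 90.0)],
    "c2": [(3, 40.0)],
}


def point_in_time_lookup(entity_id, event_ts):
    """Latest feature value whose timestamp is <= event_ts, or None if none exists yet."""
    history = feature_history.get(entity_id, [])
    timestamps = [ts for ts, _ in history]
    pos = bisect.bisect_right(timestamps, event_ts)
    return history[pos - 1][1] if pos > 0 else None


# Labels observed at specific times; each gets the feature value known at that time
label_events = [("c1", 6), ("c1", 9), ("c2", 2), ("c2", 4)]
training_rows = [(e, ts, point_in_time_lookup(e, ts)) for e, ts in label_events]
print(training_rows)
```

A naive join on the latest value would give the `c1` event at time 6 the value 90.0 from time 9, which is exactly the leakage this lookup prevents.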
Advanced Techniques
Feature Hashing: Maps high-cardinality categorical features to fixed-size vectors using hash functions. Benefits:
- No need to store vocabulary
- Handles unseen categories
- Memory-efficient for high cardinality
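The hashing trick fits in a few lines of plain Python. Spark's `FeatureHasher` and `HashingTF` use MurmurHash3; this sketch swaps in `zlib.crc32` only because it is in the standard library and stable across runs (Python's built-in `hash()` of strings is randomized per process). Hashing `column=value` strings keeps identical values from different columns apart. The fixed vector length is why unseen categories need no vocabulary, at the cost of occasional collisions. The function name and vector size are illustrative.

```python
import zlib


def hash_features(row, num_features=16):
    """Map {column: category} pairs into a fixed-length count vector."""
    vec = [0.0] * num_features
    for column, value in row.items():
        # Hash "column=value" so equal values in different columns are distinguished
        bucket = zlib.crc32(f"{column}={value}".encode("utf-8")) % num_features
        vec[bucket] += 1.0  # colliding categories simply add up
    return vec


seen = hash_features({"city": "Berlin", "device": "ios"})
unseen = hash_features({"city": "Ulaanbaatar", "device": "ios"})  # no vocabulary lookup needed
print(len(seen), sum(seen), len(unseen), sum(unseen))
```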
Interaction Features: Creates feature combinations to capture non-linear relationships:
- Polynomial features: x², xy, y²
- Cross features: categorical interactions
- Custom interactions: domain-specific combinations
Temporal Features: Extracts time-based patterns:
- Cyclical encoding: sin/cos for hour-of-day
- Lag features: previous period values
- Rolling statistics: moving averages, standard deviations
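The three temporal techniques can be illustrated on a plain Python list (Example 4 does the same with Spark window functions). The helper names are hypothetical; the rolling mean uses partial windows at the start of the series, which matches a `rowsBetween(-2, 0)` frame.

```python
import math
import statistics


def hour_cyclical(hour):
    """Place hour-of-day on the unit circle so 23:00 and 00:00 end up close."""
    angle = 2 * math.pi * hour / 24
    return math.sin(angle), math.cos(angle)


def lag(values, k):
    """Value from k >= 1 periods earlier; None where there is no history yet."""
    return [None] * k + values[:-k]


def rolling_mean(values, window):
    """Mean of the current value and up to window - 1 previous values."""
    return [statistics.fmean(values[max(0, i - window + 1): i + 1])
            for i in range(len(values))]


values = [100.0, 110.0, 95.0, 120.0, 105.0]
print(hour_cyclical(6))        # (1.0, ~0.0): a quarter of the way around the circle
print(lag(values, 1))
print(rolling_mean(values, 3))
```

A raw `hour` feature puts 23 and 0 at opposite ends of the scale; on the circle they are about 0.26 apart, the same distance as any other pair of adjacent hours.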
Text Feature Engineering: Processes unstructured text data:
- TF-IDF: Term frequency-inverse document frequency
- Word2Vec: Semantic word embeddings
- Sentence embeddings: Document-level representations
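A minimal TF-IDF sketch in plain Python, assuming whitespace tokenization with lowercasing (as Spark's `Tokenizer` does), raw term counts for TF, and the smoothed IDF that Spark MLlib documents, idf = log((N + 1) / (df + 1)). The function name and sample documents are illustrative.

```python
import math
from collections import Counter


def tf_idf(documents):
    """One {term: weight} dict per document."""
    tokenized = [doc.lower().split() for doc in documents]  # whitespace tokenizer
    n_docs = len(tokenized)
    # Document frequency: in how many documents each term appears
    doc_freq = Counter(term for tokens in tokenized for term in set(tokens))
    idf = {t: math.log((n_docs + 1) / (df + 1)) for t, df in doc_freq.items()}
    return [{t: tf * idf[t] for t, tf in Counter(tokens).items()}
            for tokens in tokenized]


docs = ["spark builds features", "spark scales features", "pipelines are reproducible"]
weights = tf_idf(docs)
print(weights[0])  # "builds" (1 document) outweighs "spark" and "features" (2 documents each)
```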
Performance Optimization
Partitioning Strategy:
- Use appropriate shuffle partitions for feature transformations
- Coalesce small partitions after transformations
- Repartition by key for joins with dimension tables
Memory Management:
- Cache intermediate DataFrames in memory
- Use Kryo serialization for complex objects
- Configure off-heap memory for large feature vectors
Catalyst Optimizations:
- Leverage predicate pushdown for filtering
- Use column pruning to reduce I/O
- Enable adaptive query execution for dynamic optimization
Feature Quality and Validation
Data Quality Checks:
- Null value detection and handling
- Outlier detection and treatment
- Feature distribution analysis
- Correlation analysis between features
Feature Drift Detection:
- Monitor feature statistics over time
- Detect distribution shifts
- Alert on significant changes
- Automate retraining triggers
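These checks can be made concrete with one common drift score, the population stability index (PSI). PSI is a swapped-in technique, not something Spark provides. The sketch bins a training sample and a serving sample on shared edges and compares the bin proportions. A frequently quoted rule of thumb treats PSI above roughly 0.25 as a significant shift, but thresholds should be tuned per feature. The sample values are made up for illustration.

```python
import bisect
import math


def population_stability_index(expected, actual, edges):
    """PSI = sum((a - e) * ln(a / e)) over bins shared by both samples."""
    n_bins = len(edges) - 1

    def proportions(sample):
        counts = [0] * n_bins
        for x in sample:
            # Bin i covers [edges[i], edges[i+1]); out-of-range values go to the end bins
            i = min(max(bisect.bisect_right(edges, x) - 1, 0), n_bins - 1)
            counts[i] += 1
        # A small floor keeps the logarithm finite when a bin is empty
        return [max(c / len(sample), 1e-6) for c in counts]

    e, a = proportions(expected), proportions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))


train_ages = [25, 35, 45, 28, 52, 38, 31, 41, 33, 27]
serving_ages = [55, 61, 48, 58, 63, 52, 49, 57, 60, 62]
edges = [0, 30, 40, 50, 70]
psi_same = population_stability_index(train_ages, train_ages, edges)
psi_shift = population_stability_index(train_ages, serving_ages, edges)
print(psi_same, psi_shift)  # 0.0 for identical samples, large for the shifted one
```

Unlike a mean-shift check, PSI also reacts when the spread or shape of a distribution changes while its mean stays put.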
Reproducibility:
- Version control feature transformations
- Log feature computation pipelines
- Track feature lineage
- Enable rollback capabilities
Production Considerations
Scalability:
- Handle petabyte-scale feature engineering
- Distribute computation across clusters
- Optimize for both batch and streaming
Latency Requirements:
- Batch features: minutes to hours
- Micro-batch features: seconds to minutes
- Real-time features: milliseconds
Cost Optimization:
- Right-size compute resources
- Use spot instances for batch processing
- Implement feature caching strategies
- Monitor and optimize storage costs
These feature engineering capabilities enable organizations to build sophisticated ML pipelines that can process massive datasets while maintaining high performance and reproducibility.
Key Concepts Table
Mathematical Foundations
Definition: Feature Scaling
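Min-max scaling maps a feature $x$ to the range $[0, 1]$:
$$x' = \frac{x - x_{\min}}{x_{\max} - x_{\min}}$$
Standardization (z-score) centers and scales by the mean $\mu$ and standard deviation $\sigma$:
$$z = \frac{x - \mu}{\sigma}$$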
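Worked numbers for the two scaling formulas, in plain Python. The z-score uses the sample (n - 1) standard deviation, matching the "corrected sample standard deviation" that Spark's StandardScaler documents; the constant-column fallback to the midpoint of the target range follows what MinMaxScaler documents for that case. The function names are illustrative.

```python
import statistics


def min_max_scale(values, new_min=0.0, new_max=1.0):
    """x' = (x - x_min) / (x_max - x_min), stretched to [new_min, new_max]."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # Constant column: map every value to the midpoint of the target range
        return [0.5 * (new_min + new_max) for _ in values]
    return [new_min + (v - lo) / (hi - lo) * (new_max - new_min) for v in values]


def z_score(values):
    """z = (x - mean) / std, using the sample (n - 1) standard deviation."""
    mu = statistics.fmean(values)
    sigma = statistics.stdev(values)
    return [(v - mu) / sigma for v in values]


print(min_max_scale([0.0, 5.0, 10.0]))
print(z_score([1.0, 2.0, 3.0]))
print(min_max_scale([4.0, 4.0]))
```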
PCA Dimensionality Reduction
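For $d$-dimensional, mean-centered data $X \in \mathbb{R}^{n \times d}$, PCA finds the top $k$ principal components by eigendecomposition of the covariance matrix $\Sigma$:
$$\Sigma = \frac{1}{n-1} X^\top X = V \Lambda V^\top, \qquad \Lambda = \operatorname{diag}(\lambda_1, \dots, \lambda_d), \quad \lambda_1 \ge \dots \ge \lambda_d$$
Retained variance ratio:
$$\frac{\sum_{i=1}^{k} \lambda_i}{\sum_{i=1}^{d} \lambda_i}$$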
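Choosing $k$ from the retained-variance ratio is simple arithmetic once the eigenvalues are known. The sketch below uses made-up eigenvalues; with Spark, one option is to fit `PCA` with a generous `k` and read the per-component proportions from the fitted model's `explainedVariance`. The function name is illustrative.

```python
def choose_k(eigenvalues, target=0.95):
    """Smallest k whose top-k eigenvalues retain at least `target` of total variance."""
    ordered = sorted(eigenvalues, reverse=True)
    total = sum(ordered)
    retained = 0.0
    for k, lam in enumerate(ordered, start=1):
        retained += lam
        if retained / total >= target:
            return k, retained / total
    return len(ordered), 1.0


# Total variance 8.0: k=1 keeps 0.5, k=2 keeps 0.8125, k=3 keeps 0.9375
print(choose_k([4.0, 2.5, 1.0, 0.5], target=0.9))
```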
Feature Independence Theorem
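For $d$ features with correlation matrix $R \in \mathbb{R}^{d \times d}$, the number of linearly independent features is:
$$r = \operatorname{rank}(R)$$
If $r < d$, features are linearly dependent and dimensionality reduction to $r$ dimensions is possible without information loss.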
TF-IDF Weight
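For term $t$ in document $d$ within corpus $D$:
$$\text{tfidf}(t, d, D) = \text{tf}(t, d) \cdot \text{idf}(t, D), \qquad \text{idf}(t, D) = \log \frac{|D| + 1}{\text{df}(t, D) + 1}$$
where $\text{df}(t, D)$ is the number of documents containing $t$; this smoothed IDF is the variant Spark MLlib uses.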
Encoder Dimensionality
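One-hot encoding a feature with cardinality $k$ produces $k$ columns, or $k - 1$ when one category is dropped (drop-first encoding, or Spark's default `dropLast=True`). For $m$ categorical features with cardinalities $k_1, \dots, k_m$:
$$\text{total columns} = \sum_{j=1}^{m} k_j \quad \text{or, with one category dropped per feature,} \quad \sum_{j=1}^{m} (k_j - 1)$$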
Key Insight
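Feature engineering order matters: impute BEFORE encoding (missing values otherwise become an extra category or raise an error, depending on `handleInvalid`), encode BEFORE assembling and scaling (Spark's scalers operate on the assembled vector, so categorical columns must already be numeric), and scale BEFORE PCA (PCA is scale-sensitive).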
Summary
Feature engineering transforms raw data into model-ready representations. Scaling ensures equal feature contribution, PCA reduces dimensionality while preserving variance, and encoding converts categorical data to numeric. The pipeline order is critical for correctness.
| Transformer | Input Type | Output Type | Use Case | Approx. Complexity (n rows, k features, c categories) |
|---|---|---|---|---|
| StandardScaler | Vector | Vector | Standardization | O(n · k) |
| MinMaxScaler | Vector | Vector | Range scaling | O(n · k) |
| StringIndexer | String (categorical) | Numeric index | Category encoding | O(n + c log c) |
| OneHotEncoder | Numeric index | Sparse vector | Binary encoding | O(n) |
| VectorAssembler | Numeric and vector columns | Vector | Feature combination | O(n · k) |
| HashingTF | Array of terms | Sparse vector | Text features | O(total terms) |
| ChiSqSelector | Vector + label | Vector | Feature selection | O(n · k) |
| PCA | Vector | Vector | Dimensionality reduction | O(n · k² + k³) |
Code Examples
Example 1: Basic Feature Engineering Pipeline
```python
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, StandardScaler,
    VectorAssembler, Imputer
)
from pyspark.ml.classification import LogisticRegression

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Feature Engineering Pipeline") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Create sample customer data
customer_data = [
    (1, "Premium", 25, 75000, 12, 0.85, 1),
    (2, "Standard", 35, 55000, 8, 0.72, 0),
    (3, "Premium", 45, 95000, 15, 0.92, 1),
    (4, "Basic", 28, 35000, 5, 0.65, 0),
    (5, "Standard", 52, 65000, 10, 0.78, 1),
    (6, "Premium", 38, 85000, 14, 0.88, 1),
    (7, "Basic", 31, 42000, 6, 0.68, 0),
    (8, "Standard", 41, 58000, 9, 0.75, 0),
    (9, "Premium", 33, 78000, 11, 0.86, 1),
    (10, "Basic", 27, 38000, 4, 0.62, 0),
]
columns = ["customer_id", "tier", "age", "income",
           "tenure_months", "credit_score", "churned"]
df = spark.createDataFrame(customer_data, columns)

# Define feature engineering stages
# Stage 1: Handle missing values in numeric columns
imputer = Imputer(
    inputCols=["age", "income", "credit_score"],
    outputCols=["age_imputed", "income_imputed", "credit_score_imputed"],
    strategy="median"
)

# Stage 2: Index categorical features (unseen labels get their own index)
tier_indexer = StringIndexer(
    inputCol="tier",
    outputCol="tier_index",
    handleInvalid="keep"
)

# Stage 3: One-hot encode categorical features
tier_encoder = OneHotEncoder(
    inputCol="tier_index",
    outputCol="tier_vector",
    dropLast=True
)

# Stage 4: Assemble numeric columns; StandardScaler needs a Vector column, not scalars
numeric_assembler = VectorAssembler(
    inputCols=["age_imputed", "income_imputed",
               "credit_score_imputed", "tenure_months"],
    outputCol="numeric_features"
)

# Stage 5: Scale numerical features to zero mean and unit variance
scaler = StandardScaler(
    inputCol="numeric_features",
    outputCol="numeric_scaled",
    withMean=True,
    withStd=True
)

# Stage 6: Combine scaled numeric features with the encoded tier vector
assembler = VectorAssembler(
    inputCols=["numeric_scaled", "tier_vector"],
    outputCol="features",
    handleInvalid="skip"
)

# Stage 7: Define ML algorithm
lr = LogisticRegression(
    featuresCol="features",
    labelCol="churned",
    maxIter=100,
    regParam=0.01
)

# Create pipeline
pipeline = Pipeline(stages=[
    imputer,
    tier_indexer,
    tier_encoder,
    numeric_assembler,
    scaler,
    assembler,
    lr
])

# Fit pipeline (estimators learn their parameters here)
model = pipeline.fit(df)

# Transform data
result = model.transform(df)

# Show results
result.select(
    "customer_id",
    "features",
    "probability",
    "prediction"
).show(truncate=False)
```
Example 2: Advanced Feature Engineering with Custom Transformers
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, log
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.feature import (
    Bucketizer, QuantileDiscretizer,
    SQLTransformer, Interaction
)

# Reuse the active session (or start one)
spark = SparkSession.builder.getOrCreate()


# Custom Transformer for log transformation
class LogTransformer(Transformer, HasInputCol, HasOutputCol,
                     DefaultParamsReadable, DefaultParamsWritable):

    def __init__(self, inputCol=None, outputCol=None):
        super().__init__()
        self._setDefault(inputCol=None, outputCol=None)
        self.setParams(inputCol=inputCol, outputCol=outputCol)

    def setParams(self, inputCol=None, outputCol=None):
        if inputCol is not None:
            self._set(inputCol=inputCol)
        if outputCol is not None:
            self._set(outputCol=outputCol)
        return self

    def _transform(self, dataset):
        input_col = self.getInputCol()
        output_col = self.getOutputCol()
        # Apply log(x + 1) so zeros stay finite (assumes x > -1)
        return dataset.withColumn(
            output_col,
            log(col(input_col) + 1)
        )


# Custom Transformer for feature interaction
# Note: the second column is hardcoded to "tenure_months" (from Example 1's data);
# this class is shown for illustration and is not used in the pipeline below.
class InteractionTransformer(Transformer, HasInputCol, HasOutputCol,
                             DefaultParamsReadable, DefaultParamsWritable):

    def __init__(self, inputCol=None, outputCol=None):
        super().__init__()
        self._setDefault(inputCol=None, outputCol=None)
        self.setParams(inputCol=inputCol, outputCol=outputCol)

    def setParams(self, inputCol=None, outputCol=None):
        if inputCol is not None:
            self._set(inputCol=inputCol)
        if outputCol is not None:
            self._set(outputCol=outputCol)
        return self

    def _transform(self, dataset):
        input_col = self.getInputCol()
        output_col = self.getOutputCol()
        # Multiply the input column by tenure_months
        return dataset.withColumn(
            output_col,
            col(input_col) * col("tenure_months")
        )


# Create comprehensive dataset
transaction_data = [
    (1, 100.50, 5, 1200.00, "2024-01-15", "electronics", 4.5),
    (2, 250.75, 2, 800.00, "2024-01-16", "clothing", 3.8),
    (3, 75.25, 8, 1500.00, "2024-01-17", "food", 4.2),
    (4, 350.00, 1, 2000.00, "2024-01-18", "electronics", 4.8),
    (5, 150.50, 6, 1100.00, "2024-01-19", "clothing", 4.0),
    (6, 425.25, 3, 1800.00, "2024-01-20", "electronics", 4.6),
    (7, 85.75, 10, 900.00, "2024-01-21", "food", 3.5),
    (8, 275.00, 4, 1300.00, "2024-01-22", "clothing", 4.1),
    (9, 190.50, 7, 1600.00, "2024-01-23", "electronics", 4.4),
    (10, 320.25, 2, 1400.00, "2024-01-24", "clothing", 4.3),
]
columns = ["transaction_id", "amount", "quantity",
           "total_value", "date", "category", "rating"]
df = spark.createDataFrame(transaction_data, columns)

# Define advanced feature engineering stages
# Stage 1: Bucket continuous features (an open-ended last split avoids
# out-of-range errors for large amounts)
amount_bucketizer = Bucketizer(
    splits=[0.0, 50.0, 100.0, 200.0, 300.0, float("inf")],
    inputCol="amount",
    outputCol="amount_bucket"
)

# Stage 2: Quantile discretization (an Estimator: bucket boundaries are learned in fit)
rating_discretizer = QuantileDiscretizer(
    numBuckets=4,
    inputCol="rating",
    outputCol="rating_quantile"
)

# Stage 3: SQL-based feature engineering
sql_transformer = SQLTransformer(
    statement="""
        SELECT *,
            amount * quantity AS volume_indicator,
            CASE
                WHEN amount > 200 THEN 1
                ELSE 0
            END AS high_value_flag,
            log(total_value + 1) AS log_total_value
        FROM __THIS__
    """
)

# Stage 4: Feature interactions (a vector holding the product of the two columns)
interaction = Interaction(
    inputCols=["amount_bucket", "rating_quantile"],
    outputCol="amount_rating_interaction"
)

# Stage 5: Custom log transformation
log_transformer = LogTransformer(
    inputCol="total_value",
    outputCol="log_total_value_custom"
)

# Create advanced pipeline
advanced_pipeline = Pipeline(stages=[
    amount_bucketizer,
    rating_discretizer,
    sql_transformer,
    interaction,
    log_transformer
])

# Fit and transform
advanced_model = advanced_pipeline.fit(df)
advanced_result = advanced_model.transform(df)

# Show intermediate features
advanced_result.select(
    "transaction_id", "amount", "amount_bucket",
    "rating", "rating_quantile", "log_total_value_custom"
).show(truncate=False)
```
Example 3: Feature Selection and Dimensionality Reduction
```python
import random

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    VectorAssembler, PCA, VarianceThresholdSelector,
    UnivariateFeatureSelector, SQLTransformer
)
from pyspark.ml.classification import RandomForestClassifier

spark = SparkSession.builder.getOrCreate()

# Create high-dimensional dataset
random.seed(42)
high_dim_data = []
for i in range(1000):
    features = [random.gauss(0, 1) for _ in range(50)]
    # Add some signal in first 10 features
    label = 1 if sum(features[:10]) > 0 else 0
    high_dim_data.append((i, features, label))

columns = ["id", "raw_features", "label"]
df = spark.createDataFrame(high_dim_data, columns)

# Split the array column into one column per feature (feature_0 ... feature_49)
feature_cols = [f"feature_{i}" for i in range(50)]
df_with_features = df.select(
    "id", "label",
    *[col("raw_features")[i].alias(name) for i, name in enumerate(feature_cols)]
)

# Assemble all features
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_raw"
)
df_assembled = assembler.transform(df_with_features)

# Stage 1: Variance threshold - remove features with variance <= 0.1
# (VarianceThresholdSelector is an Estimator, available in Spark 3.1+)
variance_selector = VarianceThresholdSelector(
    varianceThreshold=0.1,
    featuresCol="features_raw",
    outputCol="features_var_filtered"
)
df_var_filtered = variance_selector.fit(df_assembled).transform(df_assembled)

# Stage 2: PCA for dimensionality reduction
pca = PCA(
    k=20,
    inputCol="features_raw",
    outputCol="features_pca"
)
pca_model = pca.fit(df_assembled)
df_pca = pca_model.transform(df_assembled)

# Stage 3: Univariate feature selection (Spark 3.1+)
# ChiSqSelector treats each distinct value as a category, which suits categorical
# features; for continuous features like these, an ANOVA F-test is used instead.
univariate_selector = UnivariateFeatureSelector(
    featuresCol="features_raw",
    outputCol="features_selected",
    labelCol="label",
    selectionMode="numTopFeatures"
)
univariate_selector.setFeatureType("continuous") \
    .setLabelType("categorical") \
    .setSelectionThreshold(15)
df_selected = univariate_selector.fit(df_assembled).transform(df_assembled)

# Stage 4: Random Forest feature importance
rf = RandomForestClassifier(
    featuresCol="features_raw",
    labelCol="label",
    numTrees=100,
    featureSubsetStrategy="sqrt"
)
rf_model = rf.fit(df_assembled)

# Extract feature importance (vector positions follow feature_cols order)
feature_importance = list(zip(feature_cols, rf_model.featureImportances.toArray()))
feature_importance.sort(key=lambda x: x[1], reverse=True)

print("=== Top 20 Features by Importance ===")
for i, (feature, importance) in enumerate(feature_importance[:20]):
    print(f"{i+1:2d}. {feature}: {importance:.4f}")

# Stage 5: Create final pipeline with selected features
# Select top features based on importance
top_features = [feat for feat, imp in feature_importance[:15]]

# SQL transformer that keeps only the selected columns
sql_selector = SQLTransformer(
    statement=f"""
        SELECT id, label, {', '.join(top_features)}
        FROM __THIS__
    """
)

# Assemble selected features
final_assembler = VectorAssembler(
    inputCols=top_features,
    outputCol="features_final"
)

# Complete feature selection pipeline
feature_pipeline = Pipeline(stages=[
    sql_selector,
    final_assembler
])
feature_model = feature_pipeline.fit(df_with_features)
final_df = feature_model.transform(df_with_features)

print(f"\nReduced from {len(feature_cols)} to {len(top_features)} features")
final_df.select("id", "label", "features_final").show(5, truncate=False)
```
Example 4: Feature Engineering for Time Series Data
```python
import math

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    col, when, lag, lead, hour, dayofweek, month, year,
    to_timestamp, sin, cos, avg, stddev,
    min as spark_min, max as spark_max  # avoid shadowing Python's built-ins
)
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler

spark = SparkSession.builder.getOrCreate()

# Create time series dataset
ts_data = [
    (1, "2024-01-01 08:00:00", 100.0, 10, "A"),
    (2, "2024-01-01 09:00:00", 110.0, 12, "A"),
    (3, "2024-01-01 10:00:00", 95.0, 8, "A"),
    (4, "2024-01-01 11:00:00", 120.0, 15, "A"),
    (5, "2024-01-01 12:00:00", 105.0, 11, "A"),
    (6, "2024-01-02 08:00:00", 115.0, 13, "A"),
    (7, "2024-01-02 09:00:00", 125.0, 16, "A"),
    (8, "2024-01-02 10:00:00", 100.0, 9, "A"),
    (9, "2024-01-02 11:00:00", 130.0, 18, "A"),
    (10, "2024-01-02 12:00:00", 110.0, 14, "A"),
]
columns = ["id", "timestamp", "value", "volume", "series_id"]
# Parse the strings so ordering and date functions work on real timestamps
df = spark.createDataFrame(ts_data, columns) \
    .withColumn("timestamp", to_timestamp("timestamp"))

# Ordered window for lag/lead features
ordered_window = Window.partitionBy("series_id").orderBy("timestamp")
# 3-period rolling window: the current row and the two before it
window_spec = ordered_window.rowsBetween(-2, 0)

# Stage 1: Extract temporal features (dayofweek: 1 = Sunday, 7 = Saturday)
df_temporal = df \
    .withColumn("hour", hour(col("timestamp"))) \
    .withColumn("day_of_week", dayofweek(col("timestamp"))) \
    .withColumn("month", month(col("timestamp"))) \
    .withColumn("year", year(col("timestamp"))) \
    .withColumn("is_weekend",
                when(col("day_of_week").isin(1, 7), 1).otherwise(0))

# Stage 2: Cyclical encoding for hour (23:00 and 00:00 end up close together)
df_cyclical = df_temporal \
    .withColumn("hour_sin", sin(2 * math.pi * col("hour") / 24)) \
    .withColumn("hour_cos", cos(2 * math.pi * col("hour") / 24))

# Stage 3: Lag features (lead_1 is a future value: a target, never a feature)
df_lags = df_cyclical \
    .withColumn("lag_1", lag("value", 1).over(ordered_window)) \
    .withColumn("lag_2", lag("value", 2).over(ordered_window)) \
    .withColumn("lead_1", lead("value", 1).over(ordered_window))

# Stage 4: Rolling statistics
df_rolling = df_lags \
    .withColumn("rolling_avg_3", avg("value").over(window_spec)) \
    .withColumn("rolling_std_3", stddev("value").over(window_spec)) \
    .withColumn("rolling_min_3", spark_min("value").over(window_spec)) \
    .withColumn("rolling_max_3", spark_max("value").over(window_spec))

# Stage 5: Rate of change features
df_roc = df_rolling \
    .withColumn("roc_1", (col("value") - col("lag_1")) / col("lag_1")) \
    .withColumn("roc_2", (col("value") - col("lag_2")) / col("lag_2"))

# Stage 6: Volume features
prev_volume = lag("volume", 1).over(ordered_window)
df_volume = df_roc \
    .withColumn("volume_change", col("volume") - prev_volume) \
    .withColumn("volume_ratio", col("volume") / prev_volume)

# Stage 7: Assemble features for ML
feature_cols = [
    "hour", "hour_sin", "hour_cos", "day_of_week", "month", "is_weekend",
    "lag_1", "lag_2", "rolling_avg_3", "rolling_std_3",
    "rolling_min_3", "rolling_max_3", "roc_1", "roc_2",
    "volume_change", "volume_ratio"
]
# The first rows of each series have null lags and rolling std, so skip them
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_raw",
    handleInvalid="skip"
)
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features_scaled"
)

# Create time series feature pipeline
ts_pipeline = Pipeline(stages=[assembler, scaler])
ts_model = ts_pipeline.fit(df_volume)
final_ts_df = ts_model.transform(df_volume)

print("=== Time Series Feature Engineering Results ===")
final_ts_df.select(
    "id", "timestamp", "value", "features_scaled"
).show(5, truncate=False)
```
Performance Metrics
| Pipeline Stage | Execution Time (s) | Memory (GB) | Output Features | Data Size (GB) |
|---|---|---|---|---|
| StringIndexer | 12.5 | 2.1 | 1 | 10 |
| OneHotEncoder | 18.3 | 3.4 | 50 | 10 |
| StandardScaler | 8.7 | 1.8 | 25 | 10 |
| VectorAssembler | 5.2 | 1.2 | 75 | 10 |
| PCA | 45.6 | 8.5 | 20 | 10 |
| ChiSqSelector | 32.1 | 6.2 | 15 | 10 |
| Complete Pipeline | 122.4 | 12.8 | 15 | 10 |
| Parallel Pipeline | 89.3 | 9.6 | 15 | 10 |
Best Practices
1. Use Pipeline API for Reproducibility
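```python
from pyspark.ml import Pipeline

# indexer, assembler, and scaler are assumed to be configured as in Example 1
# (StandardScaler needs the assembled vector, so it comes after VectorAssembler)

# Bad: manual transformation steps, hard to reproduce and easy to refit on test data
df = indexer.fit(df).transform(df)
df = assembler.transform(df)
df = scaler.fit(df).transform(df)

# Good: fit one Pipeline on training data and reuse the fitted model
pipeline = Pipeline(stages=[indexer, assembler, scaler])
model = pipeline.fit(train_df)
result = model.transform(test_df)
```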
2. Handle Missing Values Before Feature Engineering
```python
from pyspark.ml.feature import Imputer

# Bad: fitting on data that still contains nulls
# (VectorAssembler fails on nulls by default, and NaNs propagate into learned statistics)
scaler.fit(df_with_nulls).transform(df_with_nulls)

# Good: impute missing values first
imputer = Imputer(
    inputCols=["age", "income"],
    outputCols=["age_imputed", "income_imputed"],
    strategy="median"
)
df_imputed = imputer.fit(df).transform(df)
```
3. Cache Intermediate Results
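```python
from pyspark import StorageLevel

# Cache a DataFrame that several stages reuse (memory first, spilling to disk by default)
df.cache()

# Or choose the storage level explicitly, e.g. keep it out of executor memory entirely
# df.persist(StorageLevel.DISK_ONLY)

# Release the storage when done
df.unpersist()
```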
4. Optimize Feature Vector Assembly
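```python
from pyspark.ml.feature import ChiSqSelector, VectorAssembler

# Assemble the raw columns (hypothetical names f_0 ... f_999)
assembler = VectorAssembler(
    inputCols=[f"f_{i}" for i in range(1000)],
    outputCol="all_features"
)

# Bad: train the model directly on all 1000 assembled features
# Good: select a smaller subset first (ChiSqSelector expects categorical features)
selector = ChiSqSelector(
    numTopFeatures=100,
    featuresCol="all_features",
    labelCol="label",
    outputCol="selected_features"
)
```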
5. Use Kryo Serialization for Complex Objects
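```python
from pyspark.sql import SparkSession

# spark.serializer is read when the SparkContext starts, so set it on the builder
# before the first session is created; spark.conf.set() on a running session cannot change it.
spark = SparkSession.builder \
    .appName("Feature Engineering Pipeline") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

# Kryo applies to JVM-side serialization (e.g., RDD shuffles and caching); DataFrames
# already use Spark's internal binary format, and Python objects are pickled.
# Keep spark.kryo.registrationRequired at its default (false) unless every serialized
# class is registered, e.g. via the builder option
# .config("spark.kryo.classesToRegister", "com.example.MyRecord")  # placeholder class
```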
6. Implement Feature Validation
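```python
from pyspark.sql.functions import col, count, when, isnan

def validate_features(df, feature_cols, max_null_pct=5.0):
    """Return {feature: null_pct} for numeric features whose null/NaN share exceeds max_null_pct."""
    total_rows = df.count()
    if total_rows == 0:
        return {}
    # A single aggregation pass counts null-or-NaN values for every feature
    null_counts = df.select([
        count(when(col(f).isNull() | isnan(col(f)), 1)).alias(f)
        for f in feature_cols
    ]).collect()[0]
    failures = {}
    for feature in feature_cols:
        null_pct = null_counts[feature] / total_rows * 100
        if null_pct > max_null_pct:
            print(f"WARNING: {feature} has {null_pct:.2f}% nulls")
            failures[feature] = null_pct
    return failures

# Use validation before fitting the pipeline
issues = validate_features(df, ["age", "income", "credit_score"])
```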
7. Monitor Feature Drift
```python
from pyspark.sql.functions import mean, stddev

def detect_feature_drift(train_df, test_df, feature_cols, threshold=0.1):
    """Flag features whose mean moved by more than `threshold` training standard deviations."""
    def stats(df):
        # One aggregation per DataFrame computes mean and std for every feature
        aggs = []
        for f in feature_cols:
            aggs += [mean(f).alias(f"{f}_mean"), stddev(f).alias(f"{f}_std")]
        return df.select(aggs).collect()[0]

    train_stats, test_stats = stats(train_df), stats(test_df)
    drift_report = {}
    for f in feature_cols:
        train_std = train_stats[f"{f}_std"]
        if not train_std:  # None (fewer than 2 rows) or 0 (constant feature)
            continue
        mean_drift = abs(train_stats[f"{f}_mean"] - test_stats[f"{f}_mean"]) / train_std
        if mean_drift > threshold:
            drift_report[f] = {
                "train_mean": train_stats[f"{f}_mean"],
                "test_mean": test_stats[f"{f}_mean"],
                "drift": mean_drift
            }
    return drift_report

# Monitor feature drift (train_df and test_df are assumed to exist)
drift = detect_feature_drift(train_df, test_df, ["age", "income"])
```
8. Use Feature Store for Production
```python
# Conceptual feature store wrapper (not a real library API)
class FeatureStore:
    def __init__(self, base_path="features"):
        self.base_path = base_path
        self.online_store = {}  # stand-in for Redis/DynamoDB

    def save_features(self, df, feature_set, version):
        """Save features to the offline store and (conceptually) the online store."""
        # Offline store (batch training): one Parquet directory per version;
        # the default write mode fails if the version exists, keeping versions immutable
        df.write.parquet(f"{self.base_path}/{feature_set}/v{version}")
        # Online store (low-latency serving): e.g., push the latest row per entity to Redis

    def get_features(self, entity_ids, feature_set, version):
        """Retrieve features for serving: online store first, offline store as fallback."""
        raise NotImplementedError("Connect this to your online and offline stores")

# Use feature store in production
feature_store = FeatureStore()
feature_store.save_features(df, "customer_features", "1.0")
```
Related Topics
- Model Training: ML algorithms and hyperparameter tuning
- Model Evaluation: Metrics and cross-validation
- Model Deployment: Production serving patterns
- Feature Monitoring: Drift detection and retraining triggers
See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)