Production Hardening for PySpark Applications
Architecture Diagram: Production Hardening Overview
PYSPARK PRODUCTION HARDENING FRAMEWORK

Production Readiness Pillars

| Security | Governance | CI/CD | Monitoring | Testing |
|---|---|---|---|---|
| AuthN/AuthZ | Data lineage | Build automation | Metrics collection | Unit tests |
| Encryption | Schema registry | Test gates | Alert rules | Integration tests |
| Secrets | Quality gates | Deploy automation | Dashboards | Performance tests |
| Audit | | | Log aggregation | |
| Network | | | | |

Production Deployment Pipeline

| Step | Stage | Trigger / Duration |
|---|---|---|
| 1 | Code Push | Trigger: git push |
| 2 | Build & Lint | 2-5 min |
| 3 | Test Suite | 5-15 min |
| 4 | Security Scan | 3-5 min |
| 5 | Deploy Stage | 5-10 min |
| 6 | Monitor & Alert | Continuous |

Quality Gates (Must Pass Before Promotion)
- Code coverage > 80%
- Zero critical/high security vulnerabilities
- Schema validation passed
- Performance benchmarks met
- Data quality checks passed
- Peer review approved
Architecture Diagram: Security Architecture
PYSPARK SECURITY ARCHITECTURE

Layer 1: Network Security
- VPC / VNet
  - Private subnet (Spark cluster): Driver Node, Executor 1, Executor 2, ..., Executor N
  - Security groups:
    - Inbound: only from bastion/jump host (SSH)
    - Inbound: only from driver (executor communication)
    - Outbound: only to S3/ADLS (VPC endpoint)
    - Outbound: only to Kafka (VPC endpoint)
  - VPC endpoints (no internet traffic):
    - com.amazonaws.*.s3 → S3 Gateway Endpoint
    - com.amazonaws.*.dynamodb → DynamoDB Endpoint
    - com.amazonaws.*.kafka → Kafka Interface Endpoint
    - com.amazonaws.*.glue → Glue Interface Endpoint

Layer 2: Authentication & Authorization
- IAM Roles (cloud): EMR role, S3 access, Glue
- Kerberos (on-prem): principal, keytab, ticket
- OAuth 2.0 (cloud): token, refresh, scope
- Unity Catalog (fine-grained access control):

| Level | Controls |
|---|---|
| Catalog | USE CATALOG |
| Schema | CREATE SCHEMA |
| Table | SELECT, INSERT, UPDATE, DELETE |
| Column | MASKING, REDACT, HASH |

Layer 3: Data Protection
- Encryption at rest:
  - S3 SSE-KMS (AWS-managed keys) or SSE-C (customer keys)
  - Delta Lake column-level encryption (upcoming)
  - EBS encryption for local disks
- Encryption in transit:
  - TLS 1.3 for all Spark communication
  - HTTPS for S3/ADLS access
  - mTLS for Kafka connections
- Secrets management:
  - AWS Secrets Manager / Azure Key Vault / GCP Secret Manager
  - Never hardcode credentials in code or configs
  - Rotate credentials automatically
- Data masking:
  - Column-level masking rules in Unity Catalog
  - Dynamic masking based on user role
  - PII detection and automatic redaction
Architecture Diagram: CI/CD Pipeline for PySpark
PYSPARK CI/CD PIPELINE

Environments: Development → Staging → Production

Phase 1: Build & Validate (2-5 minutes)

| Step | Tools / Criteria |
|---|---|
| Lint Check | flake8, black |
| Type Check | mypy, pyspark |
| Unit Tests | pytest, 80%+ coverage |
| Schema Validation | Great Expectations |
| Build Wheel | pip wheel |

Phase 2: Security Scan (3-5 minutes)

| Step | Tools |
|---|---|
| SAST Scan | bandit, sonar |
| SCA Scan | safety, pip |
| Secret Scan | truffleHog, git |
| Infra Scan | tfsec, cfn |

Phase 3: Deploy to Staging (5-10 minutes)

| Step | Detail |
|---|---|
| Deploy to Staging | EMR / Databricks |
| Smoke Tests | Basic queries pass |
| Integration Tests | End-to-end data flows |
| Performance Tests | Load: 1000 TPS |

Phase 4: Deploy to Production (5-10 minutes)

| Step | Detail |
|---|---|
| Blue/Green Deploy | Zero downtime |
| Canary Deploy | 5% → 25% → 100%, driven by automated metrics |
| Auto Rollback | Triggered when error rate > 1% |
| Rollback Ready | 1-click revert |

Monitoring & Observability (Continuous)

| Signal | Tools |
|---|---|
| Metrics | Prometheus, CloudWatch |
| Logs | ELK, Splunk |
| Traces | Jaeger, X-Ray |
| Alerts | PagerDuty, Slack |
| Dashboards | Grafana, custom |
Detailed Explanation
Production hardening transforms a working PySpark prototype into a reliable, secure, and maintainable production system. This requires systematic attention to security, governance, CI/CD automation, monitoring, and testing. The cost of production incidents (data corruption, security breaches, SLA violations) typically far exceeds the investment in hardening.
Security is the foundation of production readiness. The defense-in-depth approach implements multiple security layers: network security (VPC isolation, security groups, VPC endpoints), authentication and authorization (IAM roles, Kerberos, OAuth 2.0, Unity Catalog), data protection (encryption at rest and in transit, secrets management, column-level masking), and audit logging (all data access and mutations logged). Every PySpark application must authenticate before accessing any resource and be authorized for the specific operations it performs.
CI/CD automation ensures consistent, repeatable deployments with quality gates that prevent regressions. The pipeline validates code quality (linting, type checking), runs unit and integration tests, performs security scans (SAST, SCA, secret detection), deploys to staging for smoke testing, and promotes to production with blue/green or canary deployment strategies. Quality gates block promotion if any check fails, preventing defective code from reaching production.
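The gate logic described above can be sketched in a few lines of Python. This is a minimal illustration, not a real CI integration: the report dictionary and its field names (`coverage_pct`, `critical_or_high_vulns`, and so on) are hypothetical, and the thresholds mirror the quality gates listed in the overview diagram.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class GateResult:
    name: str
    passed: bool
    detail: str


def evaluate_gates(report: dict) -> list[GateResult]:
    """Evaluate a (hypothetical) pipeline report against the promotion gates."""
    return [
        GateResult("coverage", report["coverage_pct"] > 80.0,
                   f"{report['coverage_pct']:.1f}% (must exceed 80%)"),
        GateResult("vulnerabilities", report["critical_or_high_vulns"] == 0,
                   f"{report['critical_or_high_vulns']} critical/high findings"),
        GateResult("schema", report["schema_valid"], "schema validation"),
        GateResult("performance", report["benchmarks_met"], "performance benchmarks"),
        GateResult("data_quality", report["data_quality_passed"], "data quality checks"),
        GateResult("review", report["review_approved"], "peer review"),
    ]


def can_promote(report: dict) -> bool:
    """Promotion is allowed only when every gate passes."""
    return all(gate.passed for gate in evaluate_gates(report))


example_report = {
    "coverage_pct": 86.4,
    "critical_or_high_vulns": 0,
    "schema_valid": True,
    "benchmarks_met": True,
    "data_quality_passed": True,
    "review_approved": True,
}

for gate in evaluate_gates(example_report):
    print(f"{'PASS' if gate.passed else 'FAIL'}  {gate.name}: {gate.detail}")
print("promote" if can_promote(example_report) else "blocked")
```

Returning one result per gate, rather than a single boolean, keeps the failure reason visible in the pipeline log.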
Monitoring and observability provide visibility into production behavior. Metrics collection tracks key indicators: job duration, data freshness, error rates, resource utilization, and cost. Alerting rules trigger notifications when metrics exceed thresholds. Dashboards provide real-time visibility into system health. Log aggregation enables debugging and forensics. Distributed tracing follows a single request or job across the components it touches.
Testing in PySpark requires specialized approaches due to the distributed nature of Spark. Unit tests use local Spark sessions with small datasets. Integration tests verify end-to-end data flows. Performance tests measure throughput and latency under load. Data quality tests validate schema, completeness, accuracy, and consistency. Contract tests verify compatibility with downstream consumers.
Governance ensures compliance with regulatory requirements (GDPR, HIPAA, SOX) and organizational policies (data classification, retention, access control). Data lineage tracks the flow of data from source to consumption. Schema registries prevent breaking changes. Quality gates enforce data standards. Access policies control who can read and write which data.
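As one way to picture how a schema check can prevent breaking changes, the sketch below compares two schema versions. It is an assumption-laden illustration: schemas are modeled as plain `{column: type}` dictionaries, added columns are treated as compatible, and removed or retyped columns are treated as breaking. A real schema registry applies richer rules, and `breaking_changes` is a hypothetical helper name.

```python
def breaking_changes(old: dict[str, str], new: dict[str, str]) -> list[str]:
    """Return the reasons why `new` would break consumers that read `old`.

    Assumed policy: adding a column is compatible; dropping a column or
    changing its type is breaking.
    """
    problems = []
    for name, old_type in old.items():
        if name not in new:
            problems.append(f"column removed: {name}")
        elif new[name] != old_type:
            problems.append(f"type changed: {name} {old_type} -> {new[name]}")
    return problems


current = {"customer_id": "bigint", "email": "string"}
proposed = {"customer_id": "string", "country": "string"}

for problem in breaking_changes(current, proposed):
    print(problem)
```

A CI step could fail the build whenever the returned list is non-empty.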
Key Concepts Table
Mathematical Foundations
Definition: SLA/SLO
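A Service Level Agreement (SLA) defines uptime guarantees, while Service Level Objectives (SLOs) set internal targets. Both are usually expressed as availability:
$$A = \frac{T_{\text{up}}}{T_{\text{up}} + T_{\text{down}}}$$
Three nines (99.9%) allows 8.76 hours of downtime per year; four nines (99.99%) allows 52.56 minutes per year.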
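The downtime figures can be checked with a short calculation. This sketch assumes a 365-day year (8,760 hours); `allowed_downtime_hours` is an illustrative helper name.

```python
HOURS_PER_YEAR = 365 * 24  # 8,760 hours, ignoring leap years


def allowed_downtime_hours(availability: float,
                           period_hours: float = HOURS_PER_YEAR) -> float:
    """Downtime permitted in a period for a given availability target (0..1)."""
    if not 0.0 <= availability <= 1.0:
        raise ValueError("availability must be between 0 and 1")
    return (1.0 - availability) * period_hours


for target in (0.999, 0.9999):
    hours = allowed_downtime_hours(target)
    print(f"{target:.2%}: {hours:.2f} hours/year = {hours * 60:.2f} minutes/year")
```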
Error Budget
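Remaining error budget $B$ for a period of length $T$ with SLO target $S$, after downtime $D$ has already been consumed:
$$B = (1 - S)\,T - D$$
Deployment is blocked when $B \le 0$.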
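A small sketch of an error-budget check follows. It assumes the budget is defined as the downtime the SLO permits over the period, minus the downtime already consumed, and that deployments are blocked once nothing remains. The function names and the 30-day window are illustrative.

```python
def remaining_error_budget(slo: float, period_minutes: float,
                           downtime_minutes: float) -> float:
    """Minutes of downtime still available before the SLO is violated."""
    return (1.0 - slo) * period_minutes - downtime_minutes


def deployment_allowed(slo: float, period_minutes: float,
                       downtime_minutes: float) -> bool:
    """Assumed policy: deploy only while some error budget remains."""
    return remaining_error_budget(slo, period_minutes, downtime_minutes) > 0


THIRTY_DAYS = 30 * 24 * 60  # 43,200 minutes

# A 99.9% SLO over 30 days permits about 43.2 minutes of downtime.
print(remaining_error_budget(0.999, THIRTY_DAYS, downtime_minutes=30))
print(deployment_allowed(0.999, THIRTY_DAYS, downtime_minutes=30))
print(deployment_allowed(0.999, THIRTY_DAYS, downtime_minutes=50))
```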
Reliability Theorem
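For $n$ independent components in series with individual reliability $R_i$:
$$R_{\text{sys}} = \prod_{i=1}^{n} R_i$$
Series reliability decreases multiplicatively. Parallel redundancy: $R_{\text{par}} = 1 - (1 - R)^{k}$ for $k$ redundant components, each with reliability $R$.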
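To make the multiplicative effect concrete, the sketch below uses the standard formulas for independent components: the product of reliabilities for a series chain, and one minus the probability that every replica fails for parallel redundancy. The component values are made up for illustration.

```python
from math import prod


def series_reliability(reliabilities: list[float]) -> float:
    """All components must work: multiply the individual reliabilities."""
    return prod(reliabilities)


def parallel_reliability(reliability: float, replicas: int) -> float:
    """At least one of `replicas` identical, independent components must work."""
    return 1.0 - (1.0 - reliability) ** replicas


# Five pipeline stages at 99% each are noticeably less reliable than one stage.
print(f"series of 5 x 0.99: {series_reliability([0.99] * 5):.4f}")

# Two redundant replicas at 90% each behave like a single 99% component.
print(f"2 replicas x 0.90:  {parallel_reliability(0.90, 2):.4f}")
```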
Throughput Capacity
Peak throughput $Q_{\text{peak}}$ with $N$ executors and per-executor throughput $q$:
$$Q_{\text{peak}} = N \cdot q \cdot \eta$$
where efficiency $\eta \in (0, 1]$ depends on shuffle overhead and skew.
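A rough capacity estimate can be sketched as follows, assuming peak throughput is the executor count times per-executor throughput, scaled by an efficiency factor between 0 and 1. The numbers are placeholders; in practice the efficiency would be measured from benchmark runs.

```python
import math


def peak_throughput(executors: int, per_executor: float, efficiency: float) -> float:
    """Estimated records/second for the whole cluster."""
    if not 0.0 < efficiency <= 1.0:
        raise ValueError("efficiency must be in (0, 1]")
    return executors * per_executor * efficiency


def executors_needed(target: float, per_executor: float, efficiency: float) -> int:
    """Smallest executor count whose estimated throughput reaches `target`."""
    if not 0.0 < efficiency <= 1.0:
        raise ValueError("efficiency must be in (0, 1]")
    return math.ceil(target / (per_executor * efficiency))


print(peak_throughput(executors=20, per_executor=5_000, efficiency=0.8))
print(executors_needed(target=1_000, per_executor=60, efficiency=0.8))
```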
Monitoring Alert Threshold
Alert threshold $\tau$ for a metric with rolling mean $\mu$ and standard deviation $\sigma$:
$$\tau = \mu + k\,\sigma$$
where $k = 3$ for 99.7% confidence (3-sigma rule).
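The 3-sigma rule can be sketched with the standard library alone. This example assumes the threshold is the rolling mean plus `k` standard deviations over a recent window, and uses the population standard deviation; the sample job durations are invented.

```python
from statistics import mean, pstdev


def alert_threshold(window: list[float], k: float = 3.0) -> float:
    """Upper alert threshold: rolling mean plus k standard deviations."""
    return mean(window) + k * pstdev(window)


def is_anomalous(value: float, window: list[float], k: float = 3.0) -> bool:
    """True when `value` exceeds the threshold derived from `window`."""
    return value > alert_threshold(window, k)


recent_durations = [100, 102, 98, 101, 99]  # seconds, illustrative values

print(f"threshold: {alert_threshold(recent_durations):.2f}s")
print(is_anomalous(103, recent_durations))
print(is_anomalous(110, recent_durations))
```

A one-sided threshold suits metrics where only high values are a problem, such as job duration or error rate.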
Key Insight
Production hardening is about reducing blast radius: circuit breakers prevent cascading failures, retries with exponential backoff handle transient failures, and health checks enable automatic recovery. The goal is graceful degradation, not perfection.
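Two of the patterns named above, retries with exponential backoff and a circuit breaker, can be sketched in plain Python. This is a simplified illustration rather than a production library: `retry_with_backoff`, `CircuitBreaker`, and `CircuitOpenError` are hypothetical names, and the jitter range and thresholds are arbitrary choices.

```python
import random
import time


def retry_with_backoff(func, max_attempts=5, base_delay=1.0, max_delay=30.0,
                       retry_on=(Exception,), sleep=time.sleep):
    """Call func(); after a retryable error, wait base_delay * 2**attempt
    (capped at max_delay, with jitter) and try again."""
    for attempt in range(max_attempts):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(delay * random.uniform(0.5, 1.0))  # jitter spreads out retries


class CircuitOpenError(RuntimeError):
    """Raised when a call is skipped because the circuit is open."""


class CircuitBreaker:
    """Stop calling a failing dependency until a cool-down has passed."""

    def __init__(self, failure_threshold=3, reset_timeout=60.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    def call(self, func):
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit open, call skipped")
            # Cool-down elapsed: let one trial call through (half-open state).
        try:
            result = func()
        except Exception:
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            raise
        # Success closes the circuit and clears the failure count.
        self._failures = 0
        self._opened_at = None
        return result
```

The `sleep` and `clock` parameters exist so the behavior can be tested without real waiting.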
Summary
Production hardening combines reliability engineering (redundancy, circuit breakers) with operational excellence (monitoring, alerting). SLOs define reliability targets; error budgets govern deployment velocity. System reliability decreases multiplicatively with component count, making redundancy critical.
Key Concepts Table (cont.)
| Pillar | Component | Tool/Technology | Implementation | Priority |
|---|---|---|---|---|
| Security | Network Isolation | VPC, Security Groups | Private subnets, VPC endpoints | Critical |
| Security | Authentication | IAM, Kerberos, OAuth | Role-based access | Critical |
| Security | Authorization | Unity Catalog, ACLs | Column-level masking | Critical |
| Security | Encryption | AWS KMS, TLS 1.3 | At rest + in transit | Critical |
| Security | Secrets | AWS Secrets Manager | Never hardcode | Critical |
| Governance | Data Lineage | OpenLineage, Unity | Auto-tracked | High |
| Governance | Schema Registry | Delta Lake, Unity | Schema evolution control | High |
| Governance | Quality Gates | Great Expectations, Deequ | Automated validation | High |
| CI/CD | Code Quality | flake8, black, mypy | Pre-commit hooks | High |
| CI/CD | Testing | pytest, chispa | Unit + integration | High |
| CI/CD | Security Scanning | bandit, safety, truffleHog | Automated scans | High |
| CI/CD | Deployment | Blue/Green, Canary | Zero-downtime | Medium |
| Monitoring | Metrics | Prometheus, CloudWatch | Custom Spark metrics | High |
| Monitoring | Logging | ELK, Splunk | Structured logging | High |
| Monitoring | Alerting | PagerDuty, Slack | Threshold-based | High |
| Monitoring | Dashboards | Grafana, Custom | Real-time visibility | Medium |
Code Examples
Example 1: Secure PySpark Application with IAM and Column Masking
import json
from datetime import datetime, timezone

import boto3  # or azure.keyvault.secrets / google.cloud.secretmanager
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, concat, lit, regexp_replace, sha2, substring, when,
)

# --- Secure Spark Session Configuration ---
spark = (
    SparkSession.builder
    .appName("Secure-PySpark-App")
    # Kerberos authentication (on-prem)
    .config("spark.kerberos.keytab", "/etc/security/keytabs/spark.keytab")
    .config("spark.kerberos.principal", "spark/_HOST@COMPANY.COM")
    # Unity Catalog (Databricks)
    .config("spark.databricks.unityCatalog.enabled", "true")
    # Encryption in transit
    .config("spark.ssl.enabled", "true")
    .config("spark.ssl.keyStore", "/path/to/keystore.jks")
    .config("spark.ssl.trustStore", "/path/to/truststore.jks")
    # Enable AQE
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)


# --- Secrets Management (never hardcode!) ---
def get_secret(secret_name):
    """Retrieve a secret from the cloud secrets manager."""
    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_name)
    return response["SecretString"]


# Use secrets for database connections
db_password = get_secret("prod/warehouse/password")
db_username = get_secret("prod/warehouse/username")

# --- Data Access with Unity Catalog Policies ---
# Row-level security is applied automatically by Unity Catalog on read.
customer_data = spark.read.table("sales.customers")

# Column-level masking is applied automatically based on user role:
# - Analysts see: customer_id, masked_name, masked_email
# - Data engineers see: customer_id, full_name, email
# - Executives see: all columns


# --- Data Masking Functions (Custom Implementation) ---
def mask_pii_columns(df, mask_rules):
    """Apply data masking based on column-level rules.

    Columns named in mask_rules but absent from df are skipped.
    """
    masked_df = df
    for col_name, rule in mask_rules.items():
        if col_name not in df.columns:
            continue
        if rule["type"] == "partial_mask":
            # Show the first 2 characters followed by ***; keep nulls as null
            masked_df = masked_df.withColumn(
                col_name,
                when(
                    col(col_name).isNotNull(),
                    concat(substring(col(col_name), 1, 2), lit("***")),
                ).otherwise(lit(None)),
            )
        elif rule["type"] == "full_mask":
            masked_df = masked_df.withColumn(col_name, lit("***"))
        elif rule["type"] == "hash":
            # sha2 expects string/binary input, so cast first
            masked_df = masked_df.withColumn(
                col_name, sha2(col(col_name).cast("string"), 256)
            )
        elif rule["type"] == "redact":
            # Replace every character with *
            masked_df = masked_df.withColumn(
                col_name, regexp_replace(col(col_name), r".", "*")
            )
    return masked_df


# Apply masking for analyst access
mask_rules = {
    "customer_name": {"type": "partial_mask"},
    "email": {"type": "partial_mask"},
    "credit_card": {"type": "full_mask"},
    "ssn": {"type": "hash"},
    "address": {"type": "redact"},
}
masked_data = mask_pii_columns(customer_data, mask_rules)
masked_data.show(5)
# Illustrative output (abbreviated):
# +-----------+-------------+-----+-----------+------+
# |customer_id|customer_name|email|credit_card|   ssn|
# +-----------+-------------+-----+-----------+------+
# |        101|        Al***|al***|        ***|a1b2..|
# |        102|        Bo***|bo***|        ***|c3d4..|
# +-----------+-------------+-----+-----------+------+


# --- Audit Logging ---
def log_data_access(user, table, operation, row_count):
    """Log data access for audit compliance."""
    log_entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "user": user,
        "table": table,
        "operation": operation,
        "row_count": row_count,
        "session_id": spark.sparkContext.applicationId,
    }
    # Emit to stdout; the platform's log agent is expected to ship this
    # (for example to CloudWatch Logs)
    print(json.dumps(log_entry))
    # Also append to the audit table
    (
        spark.createDataFrame([log_entry])
        .write
        .format("delta")
        .mode("append")
        .save("/mnt/audit/data_access")
    )


log_data_access(
    user="analyst@company.com",
    table="sales.customers",
    operation="SELECT",
    row_count=customer_data.count(),
)
Example 2: CI/CD Pipeline Configuration (GitHub Actions)
# .github/workflows/pyspark-ci-cd.yml
name: PySpark CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  PYTHON_VERSION: '3.11'
  SPARK_VERSION: '3.5.0'
  DELTA_VERSION: '3.1.0'

jobs:
  # --- PHASE 1: Build & Validate ---
  build-and-validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}
      - name: Install dependencies
        run: |
          pip install -r requirements-dev.txt
          pip install pyspark==${{ env.SPARK_VERSION }} \
            delta-spark==${{ env.DELTA_VERSION }} \
            great-expectations chispa
      - name: Lint check
        run: |
          flake8 src/ tests/
          black --check src/ tests/
          isort --check-only src/ tests/
      - name: Type check
        run: |
          mypy src/ --ignore-missing-imports
      - name: Unit tests
        run: |
          pytest tests/unit/ \
            --cov=src/ \
            --cov-report=xml \
            --cov-report=html \
            --junitxml=reports/unit-tests.xml \
            -v
        env:
          PYSPARK_PYTHON: python
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml

  # --- PHASE 2: Security Scan ---
  security-scan:
    runs-on: ubuntu-latest
    needs: build-and-validate
    steps:
      - uses: actions/checkout@v4
      - name: Create reports directory
        run: mkdir -p reports
      - name: Static Application Security Testing (SAST)
        run: |
          pip install bandit
          # Full JSON report for archiving; never fails the step
          bandit -r src/ -f json -o reports/sast-report.json || true
          # Fail on medium- and high-severity findings (-ll)
          bandit -r src/ -ll
      - name: Software Composition Analysis (SCA)
        run: |
          pip install safety
          safety check --json --output reports/sca-report.json
      - name: Secret Scanning
        run: |
          pip install detect-secrets
          # `scan` writes its findings to stdout and exits 0 even when secrets
          # are found, so the report must be checked by a later step
          detect-secrets scan --all-files > reports/secrets-report.json
      - name: Infrastructure Security
        run: |
          pip install checkov
          checkov -d infra/ --framework cloudformation --output junitxml \
            > reports/infra-report.xml

  # --- PHASE 3: Deploy to Staging ---
  deploy-staging:
    runs-on: ubuntu-latest
    needs: [build-and-validate, security-scan]
    if: github.ref == 'refs/heads/main'
    environment: staging
    permissions:
      id-token: write  # needed to assume the IAM role via OIDC
      contents: read
    steps:
      - uses: actions/checkout@v4
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: arn:aws:iam::ACCOUNT:role/deploystaging
          aws-region: us-east-1
      - name: Build wheel
        run: |
          pip install build
          python -m build --wheel
      - name: Deploy to EMR/Staging
        # Assumes the wheel has been uploaded to the S3 path below (upload step not shown).
        # The Args continuation lines are deliberately not indented further,
        # so that no spaces end up inside the bracketed list.
        run: |
          aws emr add-steps \
            --cluster-id ${{ secrets.STAGING_CLUSTER_ID }} \
            --steps Type=Spark,Name=pyspark-app,ActionOnFailure=CONTINUE,Args=[\
          --deploy-mode,cluster,\
          --conf,spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,\
          s3://staging-bucket/wheels/pyspark-app-${{ github.sha }}.whl]
      - name: Smoke tests
        run: |
          pytest tests/smoke/ -v --tb=short
        env:
          STAGING_ENDPOINT: ${{ secrets.STAGING_ENDPOINT }}
      - name: Integration tests
        run: |
          pytest tests/integration/ -v --tb=short -m "staging"
        env:
          STAGING_ENDPOINT: ${{ secrets.STAGING_ENDPOINT }}
      - name: Performance tests
        run: |
          python tests/performance/benchmark.py --environment staging

  # --- PHASE 4: Deploy to Production ---
  deploy-production:
    runs-on: ubuntu-latest
    needs: deploy-staging
    if: github.ref == 'refs/heads/main'
    environment: production
    permissions:
      id-token: write  # needed to assume the IAM role via OIDC
      contents: read
    steps:
      - uses: actions/checkout@v4
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: arn:aws:iam::ACCOUNT:role/deployproduction
          aws-region: us-east-1
      - name: Canary deployment (5% traffic)
        # Canary run, limited to 2 executors
        run: |
          aws emr add-steps \
            --cluster-id ${{ secrets.PROD_CLUSTER_ID }} \
            --steps Type=Spark,Name=pyspark-canary,ActionOnFailure=CONTINUE,Args=[\
          --deploy-mode,cluster,\
          --conf,spark.dynamicAllocation.maxExecutors=2,\
          s3://prod-bucket/wheels/pyspark-app-${{ github.sha }}.whl]
      - name: Monitor canary (5 minutes)
        run: |
          sleep 300
          python scripts/monitor_canary.py --duration 300 --error-threshold 0.01
      - name: Full deployment
        run: |
          aws emr add-steps \
            --cluster-id ${{ secrets.PROD_CLUSTER_ID }} \
            --steps Type=Spark,Name=pyspark-full,ActionOnFailure=CONTINUE,Args=[\
          --deploy-mode,cluster,\
          --conf,spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,\
          s3://prod-bucket/wheels/pyspark-app-${{ github.sha }}.whl]
      - name: Post-deployment validation
        run: |
          pytest tests/validation/ -v --tb=short
Example 3: Production Monitoring and Alerting
import functools
import json
import logging
import time
from datetime import datetime

from prometheus_client import Counter, Gauge, Histogram, start_http_server
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # avoids shadowing builtins such as max


# --- Custom Metrics Collection ---
class PySparkMetrics:
    """Custom metrics for PySpark production monitoring."""

    def __init__(self, app_name):
        self.app_name = app_name
        # Use the application's logger so messages pass through its handlers
        self.logger = logging.getLogger(app_name)
        # Prometheus metrics
        self.job_duration = Histogram(
            'pyspark_job_duration_seconds',
            'Duration of PySpark jobs in seconds',
            ['job_name', 'status'],
            buckets=[10, 30, 60, 120, 300, 600, 1800, 3600],
        )
        self.records_processed = Counter(
            'pyspark_records_processed_total',
            'Total records processed',
            ['job_name', 'source', 'sink'],
        )
        self.records_failed = Counter(
            'pyspark_records_failed_total',
            'Total records that failed processing',
            ['job_name', 'error_type'],
        )
        self.data_freshness = Gauge(
            'pyspark_data_freshness_seconds',
            'Seconds since last data update',
            ['table_name'],
        )
        self.executor_count = Gauge(
            'pyspark_executor_count',
            'Number of active executors',
            ['cluster_id'],
        )
        self.shuffle_bytes = Counter(
            'pyspark_shuffle_bytes_total',
            'Total shuffle bytes',
            ['job_name'],
        )
        # Start the Prometheus metrics server
        start_http_server(8080)

    def track_job(self, job_name):
        """Decorator to track job execution metrics."""
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.time()
                try:
                    result = func(*args, **kwargs)
                except Exception as e:
                    duration = time.time() - start_time
                    self.job_duration.labels(
                        job_name=job_name, status='failure'
                    ).observe(duration)
                    self.records_failed.labels(
                        job_name=job_name, error_type=type(e).__name__
                    ).inc()
                    self.logger.error(f"Job {job_name} failed: {e}")
                    raise
                duration = time.time() - start_time
                self.job_duration.labels(
                    job_name=job_name, status='success'
                ).observe(duration)
                self.logger.info(f"Job {job_name} completed in {duration:.2f}s")
                return result
            return wrapper
        return decorator


# --- Data Quality Monitoring ---
class DataQualityMonitor:
    """Monitor data quality metrics in production."""

    def __init__(self, spark):
        self.spark = spark

    def check_freshness(self, table_path, max_age_hours=24):
        """Check if data is fresh enough."""
        df = self.spark.read.format("delta").load(table_path)
        # Get the latest record timestamp
        latest = df.agg(
            F.max("_updated_at").alias("latest_timestamp")
        ).first()["latest_timestamp"]
        if latest is None:
            return {"status": "ERROR", "message": "No data found"}
        age_hours = (datetime.now() - latest).total_seconds() / 3600
        if age_hours > max_age_hours:
            return {
                "status": "ALERT",
                "message": f"Data is {age_hours:.1f} hours old (max: {max_age_hours})",
                "age_hours": age_hours,
            }
        return {"status": "OK", "age_hours": age_hours}

    def check_completeness(self, df, required_columns):
        """Check completeness of required columns."""
        total_rows = df.count()
        results = {}
        for col_name in required_columns:
            null_count = df.filter(F.col(col_name).isNull()).count()
            completeness = 1 - (null_count / total_rows) if total_rows > 0 else 0
            results[col_name] = {
                "completeness": completeness,
                "null_count": null_count,
                "status": "OK" if completeness >= 0.99 else "ALERT",
            }
        return results

    def check_uniqueness(self, df, key_columns):
        """Check uniqueness of key columns."""
        total_rows = df.count()
        unique_rows = df.select(key_columns).distinct().count()
        duplicates = total_rows - unique_rows
        uniqueness = unique_rows / total_rows if total_rows > 0 else 0
        return {
            "total_rows": total_rows,
            "unique_rows": unique_rows,
            "duplicates": duplicates,
            "uniqueness": uniqueness,
            "status": "OK" if duplicates == 0 else "ALERT",
        }


# --- Structured Logging ---
def setup_logging(app_name):
    """Configure structured (JSON) logging for PySpark."""

    class StructuredFormatter(logging.Formatter):
        def format(self, record):
            log_entry = {
                "timestamp": self.formatTime(record),
                "level": record.levelname,
                "app": app_name,
                "message": record.getMessage(),
                "module": record.module,
                "function": record.funcName,
                "line": record.lineno,
            }
            if hasattr(record, 'extra_data'):
                log_entry.update(record.extra_data)
            return json.dumps(log_entry)

    handler = logging.StreamHandler()
    handler.setFormatter(StructuredFormatter())
    logger = logging.getLogger(app_name)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger


# --- Example Usage ---
spark = (
    SparkSession.builder
    .appName("Production-Monitoring")
    .getOrCreate()
)
metrics = PySparkMetrics("production-app")
quality_monitor = DataQualityMonitor(spark)
logger = setup_logging("production-app")


# Track a job
@metrics.track_job("process_transactions")
def process_transactions():
    """Process transaction data with monitoring."""
    df = spark.read.format("delta").load("/mnt/lakehouse/silver/transactions")

    # Quality checks
    freshness = quality_monitor.check_freshness(
        "/mnt/lakehouse/silver/transactions",
        max_age_hours=4,
    )
    if freshness["status"] == "ALERT":
        logger.warning(f"Data freshness alert: {freshness['message']}")
    completeness = quality_monitor.check_completeness(
        df, ["transaction_id", "customer_id", "amount"]
    )
    for col_name, result in completeness.items():
        if result["status"] == "ALERT":
            logger.warning(f"Completeness alert for {col_name}: {result['completeness']}")

    # Process data (save() returns None, so keep the write as its own statement)
    summary = df.groupBy("customer_id").agg(F.count("*").alias("transaction_count"))
    (
        summary.write
        .format("delta")
        .mode("overwrite")
        .save("/mnt/lakehouse/gold/customer_summary")
    )

    # Update metrics
    record_count = df.count()
    metrics.records_processed.labels(
        job_name="process_transactions",
        source="silver/transactions",
        sink="gold/customer_summary",
    ).inc(record_count)
    return record_count


# Run with monitoring
process_transactions()
Performance Metrics
| Metric | Without Hardening | With Hardening | Improvement |
|---|---|---|---|
| Mean Time to Recovery (MTTR) | 4-8 hours | 15-30 minutes | 90% reduction |
| Production Incidents/Month | 8-12 | 1-2 | 85% reduction |
| Deployment Frequency | Weekly | Daily | 7x increase |
| Lead Time for Changes | 2-4 weeks | 1-2 days | 90% reduction |
| Change Failure Rate | 15-25% | 2-5% | 80% reduction |
| Security Vulnerabilities | 20+/quarter | 0-2/quarter | 90% reduction |
| Data Quality Issues | 10+/month | 1-2/month | 85% reduction |
| SLA Compliance | 95% | 99.9% | +4.9 percentage points |
| Cost of Downtime | $50K/hour | $5K/hour | 90% reduction |
| Audit Compliance | Manual (days) | Automated (hours) | 95% faster |
Best Practices
- Implement defense-in-depth security: Never rely on a single security control. Use network isolation, authentication, authorization, encryption, and audit logging together. Each layer provides independent protection.
- Automate everything with CI/CD: Manual deployments are error-prone and slow. Automate building, testing, security scanning, and deployment. Use quality gates to prevent defective code from reaching production.
- Test at every level: Unit tests validate individual functions, integration tests validate data flows, performance tests validate SLAs, and contract tests validate downstream compatibility. Aim for 80%+ code coverage.
- Monitor proactively, not reactively: Set up metrics, alerts, and dashboards before production issues occur. Monitor data freshness, quality, performance, and cost continuously. Alert on anomalies, not just failures.
- Use structured logging: Log in JSON format with consistent fields (timestamp, level, app, message, context). This enables efficient log searching, aggregation, and analysis in production.
- Implement blue/green or canary deployments: Never deploy directly to production. Use deployment strategies that allow instant rollback if issues are detected. Canary deployments (5% → 25% → 100%) reduce blast radius.
- Maintain runbooks for common issues: Document step-by-step procedures for common production issues (job failures, data quality problems, performance degradation). This reduces MTTR and enables junior engineers to handle incidents.
- Enforce governance as code: Use automated tools (Unity Catalog, Great Expectations, Deequ) to enforce data governance policies. Manual governance processes don't scale and are prone to human error.
- Implement comprehensive audit logging: Log all data access, mutations, and configuration changes. This is required for regulatory compliance (GDPR, HIPAA, SOX) and enables forensic analysis after incidents.
- Practice chaos engineering: Regularly inject failures (spot interruptions, network partitions, disk failures) to validate that your production hardening actually works. Use tools like Chaos Monkey or custom fault injection.
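The canary progression mentioned in the list (5% → 25% → 100%, with rollback when the error rate exceeds 1%) can be expressed as a small decision function. This is a sketch under stated assumptions: `next_action` is a hypothetical helper, the error rate is taken as failed requests divided by total requests at the current stage, and a stage with no traffic yet is held rather than promoted.

```python
CANARY_STAGES = (5, 25, 100)   # percent of traffic at each stage
ERROR_RATE_LIMIT = 0.01        # roll back above 1% errors


def next_action(current_pct: int, errors: int, requests: int) -> tuple[str, int]:
    """Decide what to do after observing one canary stage.

    Returns (action, traffic_pct), where action is one of
    "hold", "rollback", "promote", or "done".
    """
    if current_pct not in CANARY_STAGES:
        raise ValueError(f"unknown canary stage: {current_pct}")
    if requests == 0:
        return ("hold", current_pct)  # no evidence yet, keep observing
    if errors / requests > ERROR_RATE_LIMIT:
        return ("rollback", 0)
    stage_index = CANARY_STAGES.index(current_pct)
    if stage_index == len(CANARY_STAGES) - 1:
        return ("done", current_pct)
    return ("promote", CANARY_STAGES[stage_index + 1])


print(next_action(5, errors=3, requests=1000))
print(next_action(25, errors=20, requests=1000))
print(next_action(100, errors=0, requests=500))
```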
See also: Snowflake Time Travel (snowflake/02), Kafka CDC (kafka/04), Airflow DAGs (airflow/02)