19. Spark Submit in PySpark
Definition: spark-submit
spark-submit is the command-line tool for launching Spark applications. It handles dependency packaging, JVM configuration, and submission to cluster managers (YARN, Mesos, Kubernetes, Standalone).
Definition: Deployment Mode
A deployment mode determines where the driver runs: client mode (driver on the submitting machine, good for interactive/debugging) or cluster mode (driver on a cluster node, good for production).
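Driver Memory Formula
$$M_{\text{driver}} = \max\left(M_{\text{collect}} + M_{\text{broadcast}},\ M_{\text{min}}\right)$$
Here,
- $M_{\text{driver}}$ = required driver memory
- $M_{\text{collect}}$ = memory needed to collect results from executors
- $M_{\text{broadcast}}$ = memory needed to store broadcast variables
- $M_{\text{min}}$ = minimum recommended driver memory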
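The following is a minimal Python sketch of this estimate. It assumes the formula takes the form $M_{\text{driver}} = \max(M_{\text{collect}} + M_{\text{broadcast}}, M_{\text{min}})$, with all sizes in MiB. The function names are illustrative assumptions and not part of Spark. So is the 1024 MiB default floor, which was chosen to match the 1g default of --driver-memory.

```python
import math


def estimate_driver_memory_mib(collect_mib: float, broadcast_mib: float,
                               min_mib: float = 1024) -> int:
    """Hypothetical helper: M_driver = max(M_collect + M_broadcast, M_min), in MiB."""
    if collect_mib < 0 or broadcast_mib < 0:
        raise ValueError("sizes must be non-negative")
    required = collect_mib + broadcast_mib
    # Round up to a whole MiB so the result can be passed as e.g. --driver-memory 2548m
    return math.ceil(max(required, min_mib))


def to_spark_size(mib: int) -> str:
    """Format MiB as a Spark size string, using whole gigabytes when possible."""
    return f"{mib // 1024}g" if mib % 1024 == 0 else f"{mib}m"


# Example: 2 GiB of collected results plus a 500 MiB broadcast table
mem = estimate_driver_memory_mib(collect_mib=2048, broadcast_mib=500)
print(to_spark_size(mem))  # 2548m
```

The result is a lower bound. Leave extra headroom for driver-side objects that this formula does not model.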
Use cluster mode for production deployments: the driver runs inside the cluster, so it keeps running if the submitting machine goes down. Use client mode for development and debugging, where you need direct access to driver logs and the Spark UI.
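On YARN, set --conf spark.dynamicAllocation.enabled=true together with --conf spark.shuffle.service.enabled=true so that executors scale up and down with the workload. Dynamic allocation on YARN relies on the external shuffle service, which must also be configured as an auxiliary service on every NodeManager. Since Spark 3.0, --conf spark.dynamicAllocation.shuffleTracking.enabled=true is an alternative that does not need the external service.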
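The tip above can be turned into a simplified pre-submit check. It is written as a hypothetical helper: `dynamic_allocation_problems` is not a Spark API. It only inspects a plain dict of properties, and it ignores other options such as executor decommissioning.

```python
def dynamic_allocation_problems(conf: dict, master: str) -> list:
    """Hypothetical pre-submit check of dynamic allocation prerequisites.

    `conf` maps Spark property names to string values, as in a properties file.
    """
    def enabled(key: str) -> bool:
        return str(conf.get(key, "false")).strip().lower() == "true"

    problems = []
    if not enabled("spark.dynamicAllocation.enabled"):
        return problems  # nothing to check

    shuffle_service = enabled("spark.shuffle.service.enabled")
    shuffle_tracking = enabled("spark.dynamicAllocation.shuffleTracking.enabled")
    if master == "yarn" and not (shuffle_service or shuffle_tracking):
        problems.append("YARN: enable spark.shuffle.service.enabled "
                        "or spark.dynamicAllocation.shuffleTracking.enabled")
    if master.startswith("k8s://") and not shuffle_tracking:
        # Kubernetes has no external shuffle service, so tracking is the option here
        problems.append("Kubernetes: enable spark.dynamicAllocation.shuffleTracking.enabled")

    low = int(conf.get("spark.dynamicAllocation.minExecutors", "0"))
    high = conf.get("spark.dynamicAllocation.maxExecutors")
    if high is not None and low > int(high):
        problems.append("minExecutors must not exceed maxExecutors")
    return problems


print(dynamic_allocation_problems({"spark.dynamicAllocation.enabled": "true"}, "yarn"))
```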
Theorem: Resource Contention
The total resources requested by a Spark application should not exceed $R_{\text{available}} \times F_{\text{utilization}}$, where:
- $R_{\text{available}}$ is the available cluster resources.
- $F_{\text{utilization}}$ is the target utilization factor (typically 0.8 to 0.9 for shared clusters).

Requests above this bound cause resource contention and queueing delays.
- spark-submit handles packaging, JVM config, and cluster submission
- Client mode: driver on submit machine (debugging); Cluster mode: driver on cluster (production)
- Total requested resources: CPU = executors × cores per executor; memory = executors × (executor memory + memory overhead)
- Set driver memory based on collect size and broadcast variable size
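The totals and the utilization bound above can be sketched in Python. The sketch assumes Spark's default executor memory overhead for JVM executors, which is max(384 MiB, 10% of executor memory). The helper names are illustrative. The check ignores the driver's own resources and any rounding the cluster manager applies to container sizes.

```python
OVERHEAD_FACTOR = 0.10   # default spark.executor.memoryOverheadFactor
OVERHEAD_MIN_MIB = 384   # minimum overhead Spark applies per executor


def executor_overhead_mib(executor_memory_mib: int) -> int:
    """Default executor memory overhead: max(384 MiB, 10% of executor memory)."""
    return max(OVERHEAD_MIN_MIB, int(executor_memory_mib * OVERHEAD_FACTOR))


def requested_resources(num_executors: int, executor_cores: int,
                        executor_memory_mib: int) -> tuple:
    """Return (total cores, total memory in MiB) requested for executors."""
    total_cores = num_executors * executor_cores
    per_executor_mib = executor_memory_mib + executor_overhead_mib(executor_memory_mib)
    return total_cores, num_executors * per_executor_mib


def fits(requested: tuple, available: tuple, utilization: float = 0.8) -> bool:
    """True if every requested resource is <= available x F_utilization."""
    return all(req <= avail * utilization for req, avail in zip(requested, available))


# 10 executors x 4 cores x 8g, on a cluster with 64 cores and 128 GiB free
req = requested_resources(10, 4, 8192)
print(req, fits(req, (64, 128 * 1024)))  # (40, 90110) True
```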
Deployment Modes Architecture
| Aspect | Client mode | Cluster mode |
|---|---|---|
| spark-submit runs on | Client machine | Client machine |
| Driver runs on | Client machine (local) | A cluster node, started through the cluster manager |
| Executors run on | Cluster nodes | Cluster nodes |
| Advantages | Easy debugging; direct access to the driver; simple setup | Fault tolerant; better resource usage; driver protected inside the cluster |
| Disadvantages | Driver is a single point of failure and is not protected; depends on the network link between client and cluster | Harder to debug; more complex setup |

Spark-submit flow (cluster mode):
1. Submit the application: `spark-submit --master yarn --deploy-mode cluster app.py`
2. The cluster manager allocates resources: first a container for the driver, then containers for the executors.
3. The driver starts and manages the executors: it schedules tasks on them, monitors progress, and handles failures.
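At the command line, the two modes differ by the --deploy-mode flag. The sketch below is a small hypothetical helper (`build_submit_command` is not part of Spark) that assembles the argument list for either mode. It also rejects a combination that spark-submit itself refuses: cluster mode with a local master.

```python
import shlex


def build_submit_command(app, deploy_mode="client", master="yarn",
                         confs=None, app_args=()):
    """Hypothetical helper that assembles a spark-submit argument list."""
    if deploy_mode not in ("client", "cluster"):
        raise ValueError("deploy_mode must be 'client' or 'cluster'")
    if deploy_mode == "cluster" and master.startswith("local"):
        # spark-submit rejects this combination: there is no cluster to host the driver
        raise ValueError("cluster deploy mode is not compatible with a local master")
    cmd = ["spark-submit", "--master", master, "--deploy-mode", deploy_mode]
    for key, value in sorted((confs or {}).items()):
        cmd += ["--conf", f"{key}={value}"]
    return cmd + [app, *app_args]


# Development run (driver on this machine) vs. production run (driver in the cluster)
print(shlex.join(build_submit_command("app.py")))
print(shlex.join(build_submit_command("app.py", "cluster",
                                      confs={"spark.yarn.maxAppAttempts": "2"})))
```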
Configuration Parameters
| Group | Parameter | Description |
|---|---|---|
| Core | --master | Cluster master URL: local[*], yarn, spark://host:port, or k8s://https://host:port |
| Core | --deploy-mode | Deployment mode: client (default) or cluster |
| Core | --name | Application name |
| Core | --class | Main class (Java/Scala applications) |
| Core | --jars | JAR files to include |
| Core | --py-files | .py, .zip, or .egg files to place on the PYTHONPATH |
| Core | --files | Files to place in the working directory of each executor |
| Resources | --driver-memory | Driver memory (default: 1g) |
| Resources | --driver-cores | Driver cores, cluster mode only (default: 1) |
| Resources | --executor-memory | Memory per executor (default: 1g) |
| Resources | --executor-cores | Cores per executor (default: 1 on YARN and Kubernetes) |
| Resources | --num-executors | Number of executors to launch (YARN and Kubernetes; default: 2) |
| Resources | --total-executor-cores | Total cores across all executors (Standalone and Mesos) |
| Dynamic allocation | --conf spark.dynamicAllocation.enabled=true | Scale executors with the workload |
| Dynamic allocation | --conf spark.dynamicAllocation.minExecutors=2 | Lower bound on executors |
| Dynamic allocation | --conf spark.dynamicAllocation.maxExecutors=100 | Upper bound on executors |
| Runtime | --conf KEY=VALUE | Set an arbitrary Spark property |
| Runtime | --properties-file | File with extra properties (default: conf/spark-defaults.conf) |
| Runtime | --driver-java-options | Extra JVM options for the driver |
| Runtime | --conf spark.executor.extraJavaOptions=OPTS | Extra JVM options for executors |
| Runtime | --driver-class-path | Extra classpath entries for the driver |
| Runtime | --conf spark.executor.extraClassPath=PATH | Extra classpath entries for executors |
| YARN | --queue | YARN queue name (default: default) |
| YARN | --archives | Archives to extract into each executor's working directory (also supported on other cluster managers since Spark 3.1) |
| YARN | --conf spark.yarn.maxAppAttempts=2 | Maximum number of application attempts |
| YARN | --conf spark.yarn.submit.waitAppCompletion=false | In cluster mode, exit after submission instead of waiting for the application to finish |
| YARN | --conf spark.yarn.historyServer.address=host:port | Spark History Server address linked from the YARN ResourceManager UI |
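Most dedicated flags are shorthands for spark.* properties, which is why the same settings can live in a properties file. The sketch below shows that mapping for a subset of flags. The dictionary and function are illustrative, not a Spark API.

```python
# Dedicated spark-submit flags and the spark.* properties they set (subset)
FLAG_TO_PROPERTY = {
    "--master": "spark.master",
    "--deploy-mode": "spark.submit.deployMode",
    "--name": "spark.app.name",
    "--driver-memory": "spark.driver.memory",
    "--driver-cores": "spark.driver.cores",
    "--executor-memory": "spark.executor.memory",
    "--executor-cores": "spark.executor.cores",
    "--num-executors": "spark.executor.instances",
    "--jars": "spark.jars",
    "--py-files": "spark.submit.pyFiles",
    "--queue": "spark.yarn.queue",
}


def flags_to_properties(args: list) -> dict:
    """Convert ['--executor-memory', '8g', '--conf', 'k=v', ...] into a properties dict."""
    props = {}
    it = iter(args)
    for flag in it:
        value = next(it, None)
        if value is None:
            raise ValueError(f"missing value for {flag}")
        if flag == "--conf":
            key, sep, val = value.partition("=")  # split on the first '=' only
            if not sep:
                raise ValueError(f"--conf expects KEY=VALUE, got {value!r}")
            props[key] = val
        elif flag in FLAG_TO_PROPERTY:
            props[FLAG_TO_PROPERTY[flag]] = value
        else:
            raise ValueError(f"flag not covered by this sketch: {flag}")
    return props


# Print the settings in properties-file format
for k, v in flags_to_properties(["--executor-memory", "8g", "--num-executors", "10"]).items():
    print(f"{k}={v}")
```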
Execution Flow Diagram
1. Parse arguments: spark-submit reads the command line, for example `spark-submit --master yarn --deploy-mode cluster --executor-memory 8g --executor-cores 4 --num-executors 10 --conf spark.sql.shuffle.partitions=200 my_app.py`.
2. Validate configuration: spark-submit and the cluster manager check master URL accessibility, resource requests, application dependencies, and (on YARN) queue permissions.
3. Submit to the cluster manager:
   - YARN: upload application resources to a staging directory, create the ApplicationMaster container, and start the ApplicationMaster (which hosts the driver in cluster mode).
   - Kubernetes: create the driver pod, pull the container image, and start the driver pod.
4. Driver startup: initialize the SparkContext, connect to the cluster manager, request executor resources, and set up communication channels.
5. Execute the application: distribute tasks to executors, monitor task execution, handle failures and retries, and collect results.
6. Clean up and exit: release executor resources, clean up temporary files, report the final status, and exit with an appropriate code.
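The final step (exiting with an appropriate code) is what a calling script or scheduler sees. The sketch below is a hypothetical wrapper that runs spark-submit from Python and turns a non-zero exit code into an exception. The `runner` parameter is injectable so the logic can be tried without a cluster.

One caveat applies in YARN cluster mode with spark.yarn.submit.waitAppCompletion=false. There, the exit code only reflects whether submission succeeded, not the application's final status.

```python
import subprocess


def run_spark_submit(cmd: list, runner=subprocess.run) -> str:
    """Run spark-submit and raise if it exits with a non-zero code.

    `runner` defaults to subprocess.run; it is a parameter so the logic can be
    exercised with a stub instead of a real cluster.
    """
    completed = runner(cmd, capture_output=True, text=True)
    if completed.returncode != 0:
        # Keep the last lines of stderr, where spark-submit's log output usually goes
        tail = "\n".join(completed.stderr.splitlines()[-20:])
        raise RuntimeError(f"spark-submit exited with code {completed.returncode}:\n{tail}")
    return completed.stdout


# Usage (requires spark-submit on PATH):
# output = run_spark_submit(["spark-submit", "--master", "yarn",
#                            "--deploy-mode", "cluster", "my_app.py"])
```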
Detailed Explanation
Spark-submit is the primary tool for deploying PySpark applications to cluster environments. It handles the complexity of resource allocation, dependency management, and application lifecycle, allowing developers to focus on business logic rather than cluster infrastructure. Understanding spark-submit's parameters and execution flow is essential for effective PySpark deployment.
The choice of deployment mode (client vs. cluster) significantly impacts application behavior. Client mode runs the driver on the submission machine, providing easy access to logs and debugging capabilities but creating a single point of failure. Cluster mode runs the driver within the cluster, offering better fault tolerance and resource efficiency but making debugging more challenging.
Resource configuration is critical for application performance and stability. Parameters like executor-memory, executor-cores, and num-executors determine how much computing power the application can utilize. Dynamic allocation allows the application to scale resources based on workload demands, improving cluster utilization for variable workloads.
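These resource parameters translate directly into parallelism. Each executor runs up to executor cores / spark.task.cpus tasks at once, so the number of task slots is executors × (executor cores / spark.task.cpus). The sketch below computes this. The function names are illustrative, and the wave count is a rough figure that assumes tasks of similar duration.

```python
import math


def task_slots(num_executors: int, executor_cores: int, task_cpus: int = 1) -> int:
    """Tasks that can run at once: executors x (executor cores // spark.task.cpus)."""
    if not 1 <= task_cpus <= executor_cores:
        raise ValueError("spark.task.cpus must be between 1 and the executor core count")
    return num_executors * (executor_cores // task_cpus)


def task_waves(num_partitions: int, slots: int) -> int:
    """Rough number of scheduling rounds, assuming tasks of similar duration."""
    return math.ceil(num_partitions / slots)


# The basic spark-submit example: 10 executors x 4 cores, 200 shuffle partitions
slots = task_slots(10, 4)
print(slots, task_waves(200, slots))  # 40 5
```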
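Configuration in spark-submit comes from multiple layers:
- command-line arguments,
- configuration files (spark-defaults.conf, or a file passed with --properties-file),
- programmatic settings on SparkConf or SparkSession.builder.

Properties set directly on SparkConf in application code take the highest precedence, then flags passed to spark-submit, then values in spark-defaults.conf.

Some settings shape the driver JVM itself, such as spark.driver.memory in client mode. These must be passed on the command line or in the properties file, because the driver JVM has already started by the time application code runs.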
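The precedence order can be modeled as a toy merge, with each layer as a plain dict. The function name and layer dicts are illustrative. The model does not capture settings that must be fixed before the driver JVM starts.

```python
def effective_conf(defaults_file: dict, submit_flags: dict, spark_conf: dict) -> dict:
    """Merge configuration layers from lowest to highest precedence.

    spark-defaults.conf < spark-submit flags < properties set on SparkConf in code
    """
    merged = {}
    for layer in (defaults_file, submit_flags, spark_conf):
        merged.update(layer)  # later layers overwrite earlier ones
    return merged


conf = effective_conf(
    defaults_file={"spark.executor.memory": "4g", "spark.sql.shuffle.partitions": "200"},
    submit_flags={"spark.executor.memory": "8g"},        # e.g. --executor-memory 8g
    spark_conf={"spark.sql.shuffle.partitions": "400"},  # e.g. builder.config(...)
)
print(conf)  # {'spark.executor.memory': '8g', 'spark.sql.shuffle.partitions': '400'}
```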
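Dependency management is handled through --jars, --py-files, and --packages:
- --jars ships local JAR files.
- --py-files ships .py, .zip, or .egg files that are added to the Python path.
- --packages resolves Maven coordinates (groupId:artifactId:version) from Maven Central or from repositories added with --repositories.

spark-submit does not install packages from PyPI. Python libraries must be preinstalled on the nodes or shipped with the job, for example as a packed virtual environment passed with --archives.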
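For --py-files, a package directory is usually zipped so that the package folder sits at the archive root. The sketch below does this with only the standard library; the function name and paths are hypothetical. Python cannot import compiled extension modules from a zip file, so libraries with native code need another distribution route.

```python
import shutil
from pathlib import Path


def package_for_py_files(package_dir: str, output_base: str) -> str:
    """Zip a Python package directory for --py-files; returns the .zip path.

    The archive stores entries as '<package>/...', so `import <package>` works
    once the zip is on the executors' Python path.
    """
    pkg = Path(package_dir).resolve()
    if not (pkg / "__init__.py").is_file():
        raise ValueError(f"{pkg} has no __init__.py")
    return shutil.make_archive(output_base, "zip", root_dir=pkg.parent, base_dir=pkg.name)


# Usage (hypothetical paths):
#   zip_path = package_for_py_files("src/helpers", "dist/helpers")
#   spark-submit --py-files dist/helpers.zip my_application.py
```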
YARN-specific parameters control integration with the Hadoop ecosystem, including queue management, application history tracking, and resource negotiation. Understanding these parameters is essential for running PySpark applications in production Hadoop environments.
Kubernetes deployment introduces cloud-native concepts like container orchestration, resource requests and limits, and pod management. Spark-submit's Kubernetes integration enables running PySpark applications in modern containerized environments with auto-scaling and resource isolation capabilities.
Best practices for spark-submit include: starting with conservative resource configurations and tuning based on workload characteristics, using dynamic allocation for variable workloads, implementing proper dependency management, testing configurations in staging environments, and monitoring application performance in production.
Advanced techniques include custom resource schedulers, application templates for common configurations, and automated deployment pipelines. These techniques help organizations standardize and optimize their PySpark deployment processes.
Key Concepts Table
| Parameter | Description | Default | Recommended |
|---|---|---|---|
| --master | Cluster master URL | local[*] | yarn, k8s://... |
| --deploy-mode | Driver deployment mode | client | cluster (production) |
| --executor-memory | Memory per executor | 1g | 4-16g (workload dependent) |
| --executor-cores | Cores per executor | 1 on YARN and Kubernetes (all worker cores on Standalone) | 4-8 (workload dependent) |
| --num-executors | Number of executors (YARN and Kubernetes) | 2 | 10-100 (workload dependent) |
| --conf | Custom configuration | - | As needed |
Code Examples
Basic spark-submit Command
```bash
# Basic spark-submit with Python application
spark-submit \
  --master yarn \
  --deploy-mode client \
  --name "MySparkApplication" \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 10 \
  --conf spark.sql.shuffle.partitions=200 \
  --conf spark.default.parallelism=200 \
  my_application.py

# spark-submit with dependencies
# (dynamic allocation on YARN also needs the external shuffle service)
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --name "ApplicationWithDeps" \
  --py-files utilities.py,helpers.zip \
  --jars mysql-connector-java-8.0.27.jar \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 \
  --executor-memory 16g \
  --executor-cores 8 \
  --driver-memory 8g \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.maxExecutors=50 \
  my_application.py

# spark-submit with YARN queue
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --queue production \
  --name "ProductionJob" \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 20 \
  --conf spark.yarn.maxAppAttempts=3 \
  --conf spark.yarn.submit.waitAppCompletion=false \
  production_job.py
```
Kubernetes Deployment
```bash
# Kubernetes deployment with spark-submit (runs the SparkPi example from the Spark distribution)
spark-submit \
  --master k8s://https://kubernetes.default.svc:6443 \
  --deploy-mode cluster \
  --name "KubernetesSparkApp" \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.container.image=spark:3.3.0 \
  --conf spark.kubernetes.namespace=spark-jobs \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.driver.label.app=spark-processing \
  --conf spark.kubernetes.executor.label.app=spark-processing \
  --conf spark.kubernetes.executor.request.cores=2 \
  --conf spark.kubernetes.executor.limit.cores=4 \
  --conf spark.kubernetes.driver.request.cores=1 \
  --conf spark.kubernetes.driver.limit.cores=2 \
  --conf spark.executor.memory=8g \
  --conf spark.driver.memory=4g \
  local:///opt/spark/examples/jars/spark-examples_2.12-3.3.0.jar

# Kubernetes with dynamic allocation
# (Kubernetes has no external shuffle service, so shuffle tracking must be enabled)
spark-submit \
  --master k8s://https://kubernetes.default.svc:6443 \
  --deploy-mode cluster \
  --name "KubernetesDynamic" \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.container.image=spark:3.3.0 \
  --conf spark.kubernetes.namespace=spark-jobs \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=20 \
  --conf spark.dynamicAllocation.executorIdleTimeout=60s \
  --conf spark.kubernetes.allocation.batch.size=5 \
  local:///opt/spark/examples/jars/spark-examples_2.12-3.3.0.jar
```
Python Application with Configuration
```python
import sys

from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # module import avoids shadowing built-ins like sum()


def create_spark_session(app_name, configs=None):
    """Create a Spark session: project defaults first, then caller overrides."""
    builder = SparkSession.builder.appName(app_name)
    # Set default configurations
    builder = (
        builder
        .config("spark.sql.shuffle.partitions", "200")
        .config("spark.sql.autoBroadcastJoinThreshold", "10485760")  # 10 MiB
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.kryoserializer.buffer.max", "512m")
    )
    # Apply custom configurations last so they override the defaults above.
    # Values set here also take precedence over spark-submit flags.
    if configs:
        for key, value in configs.items():
            builder = builder.config(key, value)
    return builder.getOrCreate()


def main():
    """Main application entry point"""
    # Arguments that follow the script name on the spark-submit command line
    if len(sys.argv) < 2:
        print("Usage: spark-submit my_app.py <input_path> [output_path]", file=sys.stderr)
        sys.exit(1)
    input_path = sys.argv[1]
    output_path = sys.argv[2] if len(sys.argv) > 2 else "/tmp/output"

    spark = create_spark_session(
        app_name="PySparkApplication",
        configs={
            "spark.executor.memory": "8g",
            "spark.executor.cores": "4",
            "spark.dynamicAllocation.enabled": "true",
            # Needed for dynamic allocation on YARN (shuffle service on each NodeManager)
            "spark.shuffle.service.enabled": "true",
        },
    )
    try:
        df = spark.read.parquet(input_path)
        result = (
            df.withColumn("processed_at", F.current_timestamp())
            .filter(F.col("value") > 0)
            .groupBy("category")
            .agg(
                F.sum("value").alias("total_value"),
                F.count("*").alias("count"),
                F.avg("value").alias("avg_value"),
            )
        )
        result.write.mode("overwrite").parquet(output_path)
        print(f"Application completed successfully. Output: {output_path}")
    except Exception as e:
        print(f"Application failed: {e}", file=sys.stderr)
        sys.exit(1)
    finally:
        spark.stop()


if __name__ == "__main__":
    main()
```
Configuration File Example
```properties
# spark-submit.properties
# Usage: spark-submit --properties-file spark-submit.properties my_application.py
# Cluster Configuration
spark.master=yarn
spark.submit.deployMode=cluster
spark.app.name=ProductionSparkApp
# Resource Configuration
spark.executor.memory=16g
spark.executor.cores=8
spark.driver.memory=8g
spark.driver.cores=4
# Dynamic Allocation (the YARN external shuffle service must run on every NodeManager)
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=5
spark.dynamicAllocation.maxExecutors=100
spark.dynamicAllocation.executorIdleTimeout=60s
spark.shuffle.service.enabled=true
# Performance Configuration
spark.sql.shuffle.partitions=400
spark.sql.autoBroadcastJoinThreshold=20971520
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max=1024m
# YARN Configuration
spark.yarn.queue=production
spark.yarn.maxAppAttempts=3
spark.yarn.submit.waitAppCompletion=false
spark.yarn.historyServer.address=history-server:18080
# Monitoring Configuration
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs:///spark-history
# The next two are read by the Spark History Server process, not by the application
spark.history.fs.logDirectory=hdfs:///spark-history
spark.history.ui.port=18080
```
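The sketch below is a simplified parser for files in this format. It can be used to sanity-check a properties file before passing it to --properties-file. It accepts key=value, key: value, and whitespace-separated pairs, and it skips lines starting with # or !. It ignores Java-properties escapes and line continuations. `parse_spark_properties` is a hypothetical name.

```python
import re

# Key, then optional spaces, one separator ('=', ':' or whitespace), then the value
_PAIR = re.compile(r"^([^=:\s]+)\s*[=:\s]\s*(.*)$")


def parse_spark_properties(text: str) -> dict:
    """Parse spark-defaults.conf / --properties-file style text into a dict."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue  # blank line or comment
        match = _PAIR.match(line)
        if match:
            props[match.group(1)] = match.group(2).strip()
        else:
            props[line] = ""  # a bare key with no value
    return props


sample = """
# Cluster Configuration
spark.master=yarn
spark.executor.memory 16g
spark.sql.shuffle.partitions = 400
"""
print(parse_spark_properties(sample))
```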
Performance Metrics
| Metric | Target | Warning | Critical | Optimization |
|---|---|---|---|---|
| Startup Time | < 30s | 30-60s | > 60s | Reduce dependencies, optimize configuration |
| Resource Utilization | > 80% | 60-80% | < 60% | Tune parallelism, use dynamic allocation |
| Application Success Rate | > 99% | 95-99% | < 95% | Improve error handling, increase retries |
| Task Completion Time | < 5min | 5-15min | > 15min | Optimize code, increase resources |
| Cluster Utilization | > 70% | 50-70% | < 50% | Use dynamic allocation, optimize scheduling |
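The helper below applies the thresholds from the table above to a measured value. The metric keys are hypothetical names, and the bounds are copied from the table; adjust them to your own targets.

```python
# (direction, target bound, critical bound), copied from the Performance Metrics table
THRESHOLDS = {
    "startup_time_s": ("lower", 30, 60),
    "task_completion_min": ("lower", 5, 15),
    "resource_utilization_pct": ("higher", 80, 60),
    "success_rate_pct": ("higher", 99, 95),
    "cluster_utilization_pct": ("higher", 70, 50),
}


def classify(metric: str, value: float) -> str:
    """Return 'target', 'warning', or 'critical' for a measured value."""
    direction, target, critical = THRESHOLDS[metric]
    if direction == "lower":  # smaller is better, e.g. startup time
        if value < target:
            return "target"
        return "warning" if value <= critical else "critical"
    # larger is better, e.g. utilization or success rate
    if value > target:
        return "target"
    return "warning" if value >= critical else "critical"


print(classify("startup_time_s", 45), classify("success_rate_pct", 99.5))  # warning target
```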
Best Practices
- Use cluster mode for production - Better fault tolerance and resource management
- Configure dynamic allocation - Optimize resource usage for variable workloads
- Test configurations in staging - Validate settings before production deployment
- Monitor application performance - Track key metrics and optimize as needed
- Implement proper dependency management - Ensure all dependencies are available
- Use appropriate resource configurations - Balance performance with cost
- Configure logging and monitoring - Enable event logs and history server
- Handle failures gracefully - Implement retry mechanisms and error handling
- Document configurations - Maintain clear documentation of settings
- Review and optimize regularly - Continuously improve based on performance data
Related Topics
- 17-cluster-management.mdx: Cluster resource management and optimization
- 18-gc-tuning.mdx: Garbage collection and memory optimization
- 20-monitoring-metrics.mdx: Monitoring application performance
- 11-structured-streaming.mdx: Streaming application deployment
See Also
- Kafka Streams (kafka/03): Deployment patterns for Kafka stream processing
- Data Engineering Streaming (data-engineering/022): Production deployment of streaming pipelines