Apache Spark Provider Integration with Airflow
Architecture Diagram
Formal Definitions
Definition: SparkSubmitOperator
The SparkSubmitOperator wraps the spark-submit CLI command, providing Airflow-native task execution for PySpark, Spark Scala, and Spark Java applications. It manages master URL construction, package dependencies, and configuration arguments.
Definition: Spark Connection
A Spark connection in Airflow specifies the cluster endpoint (Host), the deploy mode (the deploy-mode key in Extra), and authentication details. It abstracts cluster-specific configuration, so the same operator definition stays portable across YARN, Kubernetes, and standalone deployments.
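As a rough illustration of how a connection's host and optional port could become the value handed to spark-submit as the master, here is a minimal sketch. The helper name `resolve_master` and the example hostnames are hypothetical; this is not the provider's own code.

```python
from typing import Optional


def resolve_master(host: str, port: Optional[int] = None) -> str:
    """Return a master string for spark-submit (hypothetical helper)."""
    # Masters such as "yarn" are plain keywords and carry no port.
    if port is None:
        return host
    # Standalone and Kubernetes masters are URL-like and take a port.
    return f"{host}:{port}"


print(resolve_master("yarn"))
print(resolve_master("spark://spark-master", 7077))
print(resolve_master("k8s://https://api.example.internal", 6443))
```

Keeping this resolution in the connection, rather than in each task, is what lets the same DAG file run against different cluster managers.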
Definition: Deploy Mode
Deploy mode determines where the Spark driver runs: client (driver on the Airflow worker) or cluster (driver on the cluster). In cluster mode, coupling to Airflow is reduced, since the driver does not depend on Airflow's worker process.
Detailed Explanation
Connection Setup
# Airflow connection for Spark
# Connection ID: spark_default
# Connection Type: Spark
# Host: yarn (or k8s://https://<api>:6443 or spark://<host>:7077)
# Extra: {
#   "deploy-mode": "cluster",
#   "spark-home": "/opt/spark",
#   "spark-binary": "spark-submit",
#   "namespace": "spark-jobs",
#   "job-template": {
#     "apiVersion": "spark.apache.org/v1beta2",
#     "kind": "SparkApplication",
#     "spec": {"...": "..."}
#   }
# }
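The same connection can also be supplied without the UI. The sketch below builds a JSON document of the kind Airflow accepts in an `AIRFLOW_CONN_<CONN_ID>` environment variable (JSON connection values are supported from Airflow 2.3). The helper name is hypothetical, and the Extra key names simply mirror the listing above; they may differ between provider versions.

```python
import json


def spark_connection_json(host: str, deploy_mode: str = "cluster",
                          spark_binary: str = "spark-submit") -> str:
    """Serialize a Spark connection as JSON (hypothetical helper)."""
    # Extra keys follow the listing above; verify them for your provider version.
    extra = {"deploy-mode": deploy_mode, "spark-binary": spark_binary}
    return json.dumps({"conn_type": "spark", "host": host, "extra": extra})


# The printed value could be exported as AIRFLOW_CONN_SPARK_DEFAULT.
print(spark_connection_json("yarn"))
```

Defining the connection through the environment keeps cluster endpoints out of DAG code and makes them easy to vary per deployment.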
Basic SparkSubmitOperator
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id='spark_etl_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    tags=['spark', 'etl'],
) as dag:
    # Extract: copy the day's raw data into the staging area.
    submit_extract = SparkSubmitOperator(
        task_id='spark_extract',
        conn_id='spark_default',
        application='/opt/spark/jobs/extract.py',
        name='extract_job_{{ ds }}',
        application_args=[
            '--date', '{{ ds }}',
            '--source', 's3://data-lake/raw/',
            '--target', 's3://data-lake/staging/',
        ],
        conf={
            'spark.sql.shuffle.partitions': '200',
            'spark.sql.adaptive.enabled': 'true',
            'spark.dynamicAllocation.enabled': 'true',
            'spark.dynamicAllocation.minExecutors': '2',
            'spark.dynamicAllocation.maxExecutors': '20',
        },
        packages='org.apache.hadoop:hadoop-aws:3.3.4',
        py_files='/opt/spark/lib/utils.py',
        driver_memory='4g',
        executor_memory='8g',
        executor_cores=4,
        num_executors=10,
        verbose=False,
        trigger_rule='all_success',
        on_failure_callback=None,
        # execution_timeout is the BaseOperator argument that bounds task runtime;
        # SparkSubmitOperator has no `timeout` parameter.
        execution_timeout=timedelta(hours=1),
    )

    # Transform: aggregate staged data into the processed area.
    submit_transform = SparkSubmitOperator(
        task_id='spark_transform',
        conn_id='spark_default',
        application='/opt/spark/jobs/transform.py',
        name='transform_job_{{ ds }}',
        application_args=[
            '--date', '{{ ds }}',
            '--input', 's3://data-lake/staging/',
            '--output', 's3://data-lake/processed/',
        ],
        conf={
            'spark.sql.shuffle.partitions': '200',
            'spark.sql.adaptive.enabled': 'true',
        },
        driver_memory='4g',
        executor_memory='8g',
        num_executors=10,
    )

    submit_extract >> submit_transform
SparkJDBCOperator
from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator

jdbc_extract = SparkJDBCOperator(
    task_id='jdbc_extract',
    spark_conn_id='spark_default',
    # The JDBC URL and credentials come from this Airflow connection rather
    # than inline arguments ('jdbc_default' is an assumed connection ID that
    # would point at jdbc:postgresql://db-host:5432/warehouse).
    jdbc_conn_id='jdbc_default',
    # Direction of the transfer: read from the database into Spark.
    cmd_type='jdbc_to_spark',
    jdbc_table='orders',
    jdbc_driver='org.postgresql.Driver',
    # Destination table in the metastore (assumed name).
    metastore_table='staging.orders',
    fetch_size=10000,
    batch_size=5000,
    # Partitioned read: all four settings below must be given together.
    num_partitions=20,
    partition_column='order_id',
    lower_bound='1',
    upper_bound='1000000',
    driver_memory='4g',
    executor_memory='8g',
    num_executors=5,
)
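To make the effect of the partitioning arguments concrete, the sketch below splits a numeric column range into per-partition ranges, each of which would be read by one task. It is a simplified approximation of how a partitioned JDBC read divides the work, not Spark's exact algorithm, and `partition_ranges` is a hypothetical helper.

```python
from typing import List, Optional, Tuple

Range = Tuple[Optional[int], Optional[int]]


def partition_ranges(lower: int, upper: int, num_partitions: int) -> List[Range]:
    """Split [lower, upper) into half-open ranges, one per partition."""
    if num_partitions < 1 or upper <= lower:
        raise ValueError("need num_partitions >= 1 and upper > lower")
    stride = max((upper - lower) // num_partitions, 1)
    ranges: List[Range] = []
    start: Optional[int] = None  # first range is open below
    for i in range(num_partitions):
        # The last range is open above, so rows outside the bounds
        # are still read instead of being silently dropped.
        end = None if i == num_partitions - 1 else lower + stride * (i + 1)
        ranges.append((start, end))
        start = end
    return ranges


ranges = partition_ranges(lower=1, upper=1_000_000, num_partitions=20)
print(len(ranges), ranges[0], ranges[-1])
```

Because the bounds only shape the stride, badly chosen bounds lead to skewed partitions rather than missing rows, which is why they should track the real range of the partition column.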
PySpark Job File
# /opt/spark/jobs/transform.py
import argparse

from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # alias avoids shadowing built-in sum/max


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--date', required=True)
    parser.add_argument('--input', required=True)
    parser.add_argument('--output', required=True)
    return parser.parse_args()


def main():
    args = parse_args()
    spark = (
        SparkSession.builder
        .appName(f'transform_{args.date}')
        .getOrCreate()
    )

    # Strip trailing slashes so the joined paths do not contain '//'.
    input_root = args.input.rstrip('/')
    output_root = args.output.rstrip('/')

    # Read the day's staged orders
    orders = spark.read.parquet(f'{input_root}/orders/{args.date}')

    # Transform: one row per customer with totals and the latest order date
    aggregated = orders.groupBy('customer_id').agg(
        F.sum('amount').alias('total_amount'),
        F.count('*').alias('order_count'),
        F.max('order_date').alias('last_order_date'),
    )

    # Write processed data, replacing any previous output for this date
    aggregated.write.mode('overwrite').parquet(f'{output_root}/fct_orders/{args.date}')
    spark.stop()


if __name__ == '__main__':
    main()
Deferrable Spark Job Monitoring
# Illustrative sketch only: the trigger import and the private helpers used
# below (_get_hook, _submit_job, _total_timeout_seconds, _poll_interval) are
# not confirmed parts of the apache-spark provider. Verify them against the
# provider version you have installed before relying on this pattern.
from datetime import timedelta
from typing import Any, Dict

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.apache.spark.triggers.spark_submit import (
    SparkSubmitTrigger,
)


class DeferredSparkSubmitOperator(SparkSubmitOperator):
    """
    SparkSubmitOperator that defers after submitting the job.
    The worker slot is released while monitoring.
    """

    def execute(self, context: Dict[str, Any]) -> None:
        self._hook = self._get_hook()
        self._conn = self._hook.get_connection(self._conn_id)
        application_id = self._submit_job(
            context['run_id'],
            self._application,
        )
        # Hand monitoring over to the triggerer; defer() expects a timedelta.
        self.defer(
            timeout=timedelta(seconds=self._total_timeout_seconds),
            trigger=SparkSubmitTrigger(
                application_id=application_id,
                poll_interval=self._poll_interval,
                conn_id=self._conn_id,
            ),
            method_name='execute_complete',
        )

    def execute_complete(self, context: Dict[str, Any], event: Dict[str, Any]) -> None:
        # Called on a worker again once the trigger fires an event.
        if event['status'] == 'error':
            raise RuntimeError(f'Spark job failed: {event.get("message")}')
        self.log.info('Spark job completed: %s', event.get('application_id'))
Key Concepts Table
| Operator | Purpose | Connection | Best For |
|---|---|---|---|
| SparkSubmitOperator | Submit any Spark app | spark_default | PySpark, Scala, Java |
| SparkJDBCOperator | JDBC read/write | spark_default | Database transfers |
| SparkSqlOperator | Execute SQL queries | spark_default | Hive/Spark SQL |
| SparkBatchOperator | Batch processing | spark_default | Structured Streaming |
Configuration Reference
| Parameter | Description | YARN | Kubernetes |
|---|---|---|---|
| conn_id | Spark connection | yarn | k8s://https://<api>:6443 |
| deploy_mode | client or cluster | cluster | cluster |
| num_executors | Executor count | Fixed or dynamic | Pod replicas |
| executor_memory | RAM per executor | 4-16g | Configured via resources |
| executor_cores | CPU per executor | 2-8 | Configured via resources |
| driver_memory | Driver RAM | 2-8g | 2-8g |
| packages | Maven coordinates | Downloaded on submit | Downloaded on submit |
| py_files | Python dependencies | Local or HDFS | Mounted volumes |
Best Practices
- Use cluster deploy mode for production: client mode ties the driver to Airflow's worker.
- Enable dynamic allocation to optimize resource usage: spark.dynamicAllocation.enabled=true.
- Set num_partitions appropriately for JDBC reads to avoid OOM or under-parallelization.
- Use packages for dependency management instead of pre-installing on the cluster.
- Monitor the Spark UI for job performance: check stages, shuffle, and spill metrics.
- Use deferrable operators for long-running Spark jobs to free worker slots.
- Set execution_timeout on SparkSubmitOperator to prevent indefinite hangs.
- Tag Spark applications with Airflow run IDs for traceability.
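Two of the practices above, dynamic allocation and run-ID tagging, can be captured in small helpers so every task applies them the same way. This is a minimal sketch; both function names are hypothetical, and the sanitizing rule for the application name is an assumption rather than a Spark requirement.

```python
import re
from typing import Dict


def dynamic_allocation_conf(min_executors: int, max_executors: int) -> Dict[str, str]:
    """Build Spark conf entries that enable dynamic allocation."""
    if min_executors > max_executors:
        raise ValueError("min_executors must not exceed max_executors")
    return {
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": str(min_executors),
        "spark.dynamicAllocation.maxExecutors": str(max_executors),
    }


def traceable_app_name(job: str, run_id: str) -> str:
    """Combine a job name with an Airflow run ID, keeping only safe characters."""
    safe = re.sub(r"[^A-Za-z0-9_.-]+", "-", run_id).strip("-")
    return f"{job}-{safe}"


print(dynamic_allocation_conf(2, 20))
print(traceable_app_name("extract", "scheduled__2024-01-01T00:00:00+00:00"))
```

The resulting name makes it possible to search the Spark UI or cluster logs for the exact Airflow run that launched an application.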
For Spark on Kubernetes, configure the Spark connection to use k8s://https://<api-server>:6443 and set spark.kubernetes.authenticate.driver.serviceAccountName in conf. This avoids embedding credentials in the operator.
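A conf dictionary for the Kubernetes case might look like the sketch below. The three property names are standard Spark-on-Kubernetes settings; the helper name and the example service account, namespace, and image values are placeholders chosen for illustration.

```python
from typing import Dict


def k8s_spark_conf(service_account: str, namespace: str, image: str) -> Dict[str, str]:
    """Build the conf entries a Spark-on-Kubernetes submission typically needs."""
    return {
        # The driver pod authenticates to the API server as this service account.
        "spark.kubernetes.authenticate.driver.serviceAccountName": service_account,
        "spark.kubernetes.namespace": namespace,
        "spark.kubernetes.container.image": image,
    }


conf = k8s_spark_conf(
    service_account="spark",               # placeholder
    namespace="spark-jobs",
    image="example.registry/spark:3.5.0",  # placeholder
)
print(conf)
```

A dictionary like this can be passed as the operator's conf argument, so access is granted through the service account's role bindings instead of stored secrets.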
SparkSubmitOperator builds the spark-submit command from operator arguments and runs it as a subprocess on the Airflow worker. In client mode the driver runs inside that subprocess; in cluster mode the subprocess hands the application to the cluster manager, which launches the driver on the cluster. In both modes, ensure the Airflow worker has spark-submit on its PATH.
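The mapping from operator arguments to a command line can be sketched as follows. This is a simplified stand-in covering only a few flags, not the provider's actual builder; `build_spark_submit` is a hypothetical name, while `--master`, `--deploy-mode`, `--conf`, and `--name` are standard spark-submit options.

```python
import shlex
from typing import Dict, List, Optional, Sequence


def build_spark_submit(application: str, master: str,
                       deploy_mode: Optional[str] = None,
                       conf: Optional[Dict[str, str]] = None,
                       name: Optional[str] = None,
                       application_args: Sequence[str] = ()) -> List[str]:
    """Assemble a spark-submit argument list from operator-style arguments."""
    cmd = ["spark-submit", "--master", master]
    if deploy_mode:
        cmd += ["--deploy-mode", deploy_mode]
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    if name:
        cmd += ["--name", name]
    # The application and its own arguments always come last.
    cmd.append(application)
    cmd += list(application_args)
    return cmd


cmd = build_spark_submit(
    application="/opt/spark/jobs/extract.py",
    master="yarn",
    deploy_mode="cluster",
    conf={"spark.sql.adaptive.enabled": "true"},
    name="extract_job",
    application_args=["--date", "2024-01-01"],
)
print(shlex.join(cmd))
```

Building the command as a list rather than a single string avoids shell quoting problems when arguments contain spaces.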
Key Takeaways:
- SparkSubmitOperator wraps spark-submit for PySpark, Scala, and Java applications
- SparkJDBCOperator handles database reads/writes with partitioned parallel loading
- Use cluster deploy mode for production to decouple the Spark driver from Airflow workers
- Dynamic allocation and adaptive query execution optimize resource usage
- Deferrable SparkSubmitOperator frees worker slots during long-running jobs
- Configure spark-defaults.conf on the cluster for shared settings across jobs
See Also
- Databricks Provider — Databricks-managed Spark clusters
- BigQuery Provider — Google BigQuery integration
- Snowflake Provider — Snowflake data warehouse integration
- Operators and Hooks — Operator lifecycle and hook architecture