Apache Spark Provider Integration with Airflow

Formal Definitions

Definition: SparkSubmitOperator

The SparkSubmitOperator wraps the spark-submit CLI command, providing Airflow-native task execution for PySpark, Spark Scala, and Spark Java applications. It manages master URL construction, package dependencies, and configuration arguments.

Definition: Spark Connection

A Spark connection in Airflow specifies the cluster endpoint (the Host field, which becomes the Spark master URL) and, in its Extra field, options such as the deploy mode (the "deploy-mode" key) and any authentication details. It abstracts cluster-specific configuration, so the same operator definition stays portable across YARN, Kubernetes, and standalone deployments.

Definition: Deploy Mode

Deploy mode determines where the Spark driver runs: client (driver on the Airflow worker) or cluster (driver on the cluster). In cluster mode, $T_{\text{overhead}}$ is reduced since the driver doesn't depend on Airflow's worker process.

Detailed Explanation

Connection Setup

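# Airflow connection for Spark
# Connection ID: spark_default
# Connection Type: Spark
# Host: yarn (or k8s://https://<api>:6443 or spark://<host>:7077)
# Extra: {
#   "deploy-mode": "cluster",
#   "spark-home": "/opt/spark",
#   "spark-binary": "spark-submit",
#   "namespace": "spark-jobs",
#   "job-template": {
#       "apiVersion": "spark.apache.org/v1beta2",
#       "kind": "SparkApplication",
#       "spec": {"...": "..."}
#   }
# }
# Notes:
#   - "namespace" only matters when the master is Kubernetes (k8s://...).
#   - Provider releases from 4.0.0 onward no longer accept "spark-home";
#     the binary named in "spark-binary" must be on the worker's PATH.
#   - The spark-submit hook reads only the keys it knows (such as "deploy-mode",
#     "spark-binary", "namespace"); other keys, like "job-template", are ignored by it.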
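
The same connection can also be supplied through an environment variable instead of the UI. The sketch below assumes an Airflow version that accepts JSON-serialized connections in `AIRFLOW_CONN_*` variables (2.3 or later); the values simply mirror the settings listed above and are examples, not required names.

```python
import json

# Field names follow Airflow's JSON connection format; values are examples only.
spark_conn = {
    "conn_type": "spark",
    "host": "yarn",
    "extra": {
        "deploy-mode": "cluster",
        "spark-binary": "spark-submit",
    },
}

# Airflow resolves the connection ID "spark_default" from
# the environment variable AIRFLOW_CONN_SPARK_DEFAULT.
env_name = "AIRFLOW_CONN_SPARK_DEFAULT"
env_value = json.dumps(spark_conn)

# Print a shell line that could be placed in the worker's environment setup.
print(f"export {env_name}='{env_value}'")
```

Keeping the connection in the environment makes it easy to point the same DAG at a different cluster per deployment without touching DAG code.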

Basic SparkSubmitOperator

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id='spark_etl_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    tags=['spark', 'etl'],
) as dag:

    submit_extract = SparkSubmitOperator(
        task_id='spark_extract',
        conn_id='spark_default',
        application='/opt/spark/jobs/extract.py',
        name='extract_job_{{ ds }}',
        application_args=[
            '--date', '{{ ds }}',
            '--source', 's3://data-lake/raw/',
            '--target', 's3://data-lake/staging/',
        ],
        conf={
            'spark.sql.shuffle.partitions': '200',
            'spark.sql.adaptive.enabled': 'true',
            'spark.dynamicAllocation.enabled': 'true',
            'spark.dynamicAllocation.minExecutors': '2',
            'spark.dynamicAllocation.maxExecutors': '20',
        },
        packages='org.apache.hadoop:hadoop-aws:3.3.4',
        py_files='/opt/spark/lib/utils.py',
        driver_memory='4g',
        executor_memory='8g',
        executor_cores=4,
        num_executors=10,
        verbose=False,
        trigger_rule='all_success',
        on_failure_callback=None,
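        # SparkSubmitOperator has no `timeout` argument; use BaseOperator's execution_timeout.
        execution_timeout=timedelta(hours=1),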
    )

    submit_transform = SparkSubmitOperator(
        task_id='spark_transform',
        conn_id='spark_default',
        application='/opt/spark/jobs/transform.py',
        name='transform_job_{{ ds }}',
        application_args=[
            '--date', '{{ ds }}',
            '--input', 's3://data-lake/staging/',
            '--output', 's3://data-lake/processed/',
        ],
        conf={
            'spark.sql.shuffle.partitions': '200',
            'spark.sql.adaptive.enabled': 'true',
        },
        driver_memory='4g',
        executor_memory='8g',
        num_executors=10,
    )

    submit_extract >> submit_transform

SparkJDBCOperator

from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator

# The database URL, login, and password are resolved from the Airflow
# connection named by jdbc_conn_id, so no credentials appear in the DAG file.
jdbc_extract = SparkJDBCOperator(
    task_id='jdbc_extract',
    spark_conn_id='spark_default',
    jdbc_conn_id='jdbc_default',
    cmd_type='jdbc_to_spark',          # read from the database into Spark
    jdbc_table='orders',
    jdbc_driver='org.postgresql.Driver',
    # The operator loads into a metastore table rather than a directory;
    # 'staging.orders' is a placeholder name.
    metastore_table='staging.orders',
    save_mode='overwrite',
    save_format='parquet',
    fetch_size=10000,
    batch_size=5000,                   # JDBC write batch size; no effect on reads
    num_partitions=20,
    partition_column='order_id',
    lower_bound='1',
    upper_bound='1000000',
    driver_memory='4g',
    executor_memory='8g',
    num_executors=5,
)
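
To see what `num_partitions`, `partition_column`, `lower_bound`, and `upper_bound` do together, the following is a simplified approximation of how Spark splits a partitioned JDBC read into per-partition WHERE clauses. It is a sketch for building intuition, assuming non-negative integer bounds; the function name is hypothetical and Spark's own implementation handles more edge cases.

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Approximate the WHERE clauses used for a partitioned JDBC read."""
    if num_partitions <= 1 or lower_bound >= upper_bound:
        return [None]  # a single, unpartitioned query

    # Each partition covers roughly (upper - lower) / num_partitions values.
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != num_partitions - 1 else None
        if upper is None:
            predicates.append(lower)  # last partition is open-ended
        elif lower is None:
            predicates.append(f"{upper} OR {column} IS NULL")  # first partition
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


preds = jdbc_partition_predicates("order_id", 1, 1_000_000, 20)
print(preds[0])
print(preds[1])
print(preds[-1])
```

Note that the bounds only shape the stride; they do not filter rows. Values below `lower_bound` or above `upper_bound` still land in the first or last partition, which is why badly chosen bounds produce skewed partitions rather than missing data.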

PySpark Job File

# /opt/spark/jobs/transform.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import argparse


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--date', required=True)
    parser.add_argument('--input', required=True)
    parser.add_argument('--output', required=True)
    return parser.parse_args()


def main():
    args = parse_args()

    spark = SparkSession.builder \
        .appName(f'transform_{args.date}') \
        .getOrCreate()

    # Read the staged data for this run date
    orders = spark.read.parquet(f'{args.input}/orders/{args.date}')

    # Aggregate per customer; the F. prefix avoids shadowing Python's
    # built-in sum() and max()
    aggregated = orders.groupBy('customer_id').agg(
        F.sum('amount').alias('total_amount'),
        F.count('*').alias('order_count'),
        F.max('order_date').alias('last_order_date'),
    )

    # Write processed data
    aggregated.write.mode('overwrite').parquet(f'{args.output}/fct_orders/{args.date}')

    spark.stop()


if __name__ == '__main__':
    main()
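
The link between the operator's `application_args` and the job's `parse_args()` can be checked without a cluster. The sketch below reuses the same argparse definition and feeds it the argument list as it would look after Airflow renders `{{ ds }}` for 2024-01-01; the optional `argv` parameter and the trailing-slash normalization are additions for illustration.

```python
import argparse


def parse_args(argv=None):
    """Same options as the job file; argv is injectable for testing."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--date', required=True)
    parser.add_argument('--input', required=True)
    parser.add_argument('--output', required=True)
    return parser.parse_args(argv)


# application_args from the spark_transform task, with {{ ds }} rendered.
rendered_args = [
    '--date', '2024-01-01',
    '--input', 's3://data-lake/staging/',
    '--output', 's3://data-lake/processed/',
]

args = parse_args(rendered_args)

# Strip the trailing slash so the joined path has no empty segment.
input_path = f"{args.input.rstrip('/')}/orders/{args.date}"
print(input_path)
```

A small check like this catches mismatched flag names between the DAG and the job before anything is submitted to Spark.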

Deferrable Spark Job Monitoring

from datetime import timedelta
from typing import Any, Dict

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
# Assumed module: confirm that your installed provider version ships this
# trigger, or supply your own BaseTrigger subclass taking the same arguments.
from airflow.providers.apache.spark.triggers.spark_submit import (
    SparkSubmitTrigger,
)


class DeferredSparkSubmitOperator(SparkSubmitOperator):
    """
    SparkSubmitOperator that defers after submitting the job.
    The worker slot is released while monitoring.

    _submit_job, _total_timeout_seconds, and _poll_interval are not defined
    by SparkSubmitOperator; this sketch assumes the subclass provides them.
    """

    def execute(self, context: Dict[str, Any]) -> None:
        self._hook = self._get_hook()

        # Submit without blocking and keep the cluster-side application ID.
        application_id = self._submit_job(
            context['run_id'],
            self._application,
        )

        self.defer(
            # defer() expects a timedelta, not a number of seconds
            timeout=timedelta(seconds=self._total_timeout_seconds),
            trigger=SparkSubmitTrigger(
                application_id=application_id,
                poll_interval=self._poll_interval,
                conn_id=self._conn_id,
            ),
            method_name='execute_complete',
        )

    def execute_complete(self, context: Dict[str, Any], event: Dict[str, Any]) -> None:
        # Runs on a worker again once the trigger emits its event.
        if event.get('status') == 'error':
            raise RuntimeError(f'Spark job failed: {event.get("message")}')
        self.log.info('Spark job completed: %s', event.get('application_id'))
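
The polling that a trigger performs while the task is deferred can be modeled with plain asyncio. This is a minimal sketch that does not use Airflow at all: `fake_cluster` is a hypothetical stand-in for a cluster status API, and the state names are assumptions chosen to resemble YARN application states.

```python
import asyncio

TERMINAL_STATES = {"FINISHED", "FAILED", "KILLED"}


async def wait_for_terminal_state(get_state, poll_interval=0.01, max_polls=100):
    """Poll get_state() until a terminal state appears or polls run out."""
    for _ in range(max_polls):
        state = await get_state()
        if state in TERMINAL_STATES:
            status = "success" if state == "FINISHED" else "error"
            return {"status": status, "state": state}
        # Yield control so other coroutines can run while we wait.
        await asyncio.sleep(poll_interval)
    return {"status": "error", "state": "TIMEOUT"}


def fake_cluster(states):
    """Hypothetical status source that replays a fixed list of states."""
    iterator = iter(states)

    async def get_state():
        return next(iterator)

    return get_state


event = asyncio.run(
    wait_for_terminal_state(fake_cluster(["ACCEPTED", "RUNNING", "FINISHED"]))
)
print(event)
```

Because the loop only awaits between polls, one triggerer process can watch many Spark jobs concurrently, which is the reason deferral frees worker slots.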

Key Concepts Table

| Operator | Purpose | Connection | Best For |
| --- | --- | --- | --- |
| SparkSubmitOperator | Submit any Spark app | spark_default | PySpark, Scala, Java |
| SparkJDBCOperator | JDBC read/write | spark_default | Database transfers |
| SparkSqlOperator | Execute SQL queries | spark_sql_default | Hive/Spark SQL |
| SparkBatchOperator | Batch processing | spark_default | Structured Streaming |

Configuration Reference

| Parameter | Description | YARN | Kubernetes |
| --- | --- | --- | --- |
| conn_id | Spark connection | yarn | k8s://https://<api>:6443 |
| deploy_mode | client or cluster | cluster | cluster |
| num_executors | Executor count | Fixed or dynamic | Pod replicas |
| executor_memory | RAM per executor | 4-16g | Configured via resources |
| executor_cores | CPU per executor | 2-8 | Configured via resources |
| driver_memory | Driver RAM | 2-8g | 2-8g |
| packages | Maven coordinates | Downloaded on submit | Downloaded on submit |
| py_files | Python dependencies | Local or HDFS | Mounted volumes |
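
The memory settings in this table translate into a total request against the cluster. The sketch below estimates it for a cluster-mode job, assuming Spark's documented JVM defaults for memory overhead (10% of the heap, with a 384 MiB minimum); the helper names are hypothetical, and PySpark or Kubernetes deployments may use a larger overhead factor.

```python
def parse_mib(size):
    """Convert a Spark size string such as '8g' or '512m' to MiB."""
    units = {"m": 1, "g": 1024}
    return int(size[:-1]) * units[size[-1].lower()]


def container_mib(heap, overhead_factor=0.10, min_overhead_mib=384):
    """Heap plus off-heap overhead for one driver or executor container."""
    heap_mib = parse_mib(heap)
    overhead = max(min_overhead_mib, int(heap_mib * overhead_factor))
    return heap_mib + overhead


def total_request_mib(driver_memory, executor_memory, num_executors):
    """Total memory requested by one driver and num_executors executors."""
    return container_mib(driver_memory) + num_executors * container_mib(executor_memory)


# Values from the extract task: 4g driver, 8g executors, 10 executors.
total = total_request_mib("4g", "8g", 10)
print(f"{total} MiB (~{total / 1024:.1f} GiB)")
```

Comparing this figure with the queue or namespace quota before scheduling helps explain jobs that sit in an accepted state without ever starting.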

Best Practices

  1. Use cluster deploy mode for production — client mode ties the driver to Airflow's worker.
  2. Enable dynamic allocation to optimize resource usage: spark.dynamicAllocation.enabled=true.
  3. Set num_partitions appropriately for JDBC reads to avoid OOM or under-parallelization.
  4. Use packages for dependency management instead of pre-installing on cluster.
  5. Monitor Spark UI for job performance — check stages, shuffle, and spill metrics.
  6. Use deferrable operators for long-running Spark jobs to free worker slots.
  7. Set execution_timeout on SparkSubmitOperator tasks to prevent indefinite hangs.
  8. Tag Spark applications with Airflow run IDs for traceability.
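
For the traceability practice in item 8, one option on YARN is to pass Airflow identifiers through the `spark.yarn.tags` setting. The helper below is a hypothetical sketch: the `airflow_` prefix and the set of characters treated as safe are assumptions, so check them against your cluster manager's rules.

```python
import re


def clean(value):
    """Replace runs of characters outside [A-Za-z0-9_.-] with one underscore."""
    return re.sub(r"[^A-Za-z0-9_.-]+", "_", value).strip("_")


def spark_tags(dag_id, task_id, run_id):
    """Build a comma-separated tag string from Airflow identifiers."""
    parts = {"dag": dag_id, "task": task_id, "run": run_id}
    return ",".join(f"airflow_{key}_{clean(value)}" for key, value in parts.items())


tags = spark_tags(
    "spark_etl_pipeline",
    "spark_extract",
    "scheduled__2024-01-01T00:00:00+00:00",
)

# The result can be merged into the operator's conf dictionary.
conf = {"spark.yarn.tags": tags}
print(conf["spark.yarn.tags"])
```

With tags in place, a failed Airflow task can be matched to its cluster-side application by searching for the run identifier.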

For Spark on Kubernetes, configure the Spark connection to use k8s://https://<api-server>:6443 and set spark.kubernetes.authenticate.driver.serviceAccountName in conf. This avoids embedding credentials in the operator.
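
As a configuration sketch for the Kubernetes setup just described, the dictionary below shows the kind of `conf` entries involved. The setting names are standard Spark-on-Kubernetes properties; the service account name and image reference are hypothetical placeholders.

```python
# Kubernetes-specific settings shared by all jobs (values are placeholders).
k8s_conf = {
    "spark.kubernetes.namespace": "spark-jobs",
    "spark.kubernetes.authenticate.driver.serviceAccountName": "spark-driver",
    "spark.kubernetes.container.image": "registry.example.com/spark:latest",
}


def merge_conf(base, overrides):
    """Return a new conf dict where job-level overrides win over base values."""
    return {**base, **overrides}


# Job-level tuning is layered on top of the shared Kubernetes settings.
job_conf = merge_conf(k8s_conf, {"spark.sql.shuffle.partitions": "200"})

for key, value in sorted(job_conf.items()):
    print(f"{key}={value}")
```

The merged dictionary is what would be passed as the operator's `conf` argument, keeping cluster-wide settings in one place.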

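SparkSubmitOperator builds the spark-submit command from operator arguments and runs it as a subprocess on the Airflow worker in both deploy modes. In client mode the driver runs inside that subprocess; in cluster mode the subprocess only submits the application and tracks its status while the driver runs on the cluster. Either way, ensure the Airflow worker has access to spark-submit in its PATH.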
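
The mapping from operator arguments to a command line can be sketched in a few lines. This is an illustration only, not the provider's implementation: the function name and argument order are assumptions, although the flags themselves are standard spark-submit options.

```python
import shlex


def build_spark_submit(master, application, *, deploy_mode=None, conf=None,
                       packages=None, name=None, driver_memory=None,
                       executor_memory=None, num_executors=None,
                       application_args=None, spark_binary="spark-submit"):
    """Assemble a spark-submit argument list from operator-style arguments."""
    cmd = [spark_binary, "--master", master]
    if deploy_mode:
        cmd += ["--deploy-mode", deploy_mode]
    # Each conf entry becomes its own --conf key=value pair.
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    optional_flags = [
        ("--packages", packages),
        ("--name", name),
        ("--driver-memory", driver_memory),
        ("--executor-memory", executor_memory),
        ("--num-executors", num_executors),
    ]
    for flag, value in optional_flags:
        if value is not None:
            cmd += [flag, str(value)]
    # The application path comes last, followed by its own arguments.
    cmd.append(application)
    cmd += list(application_args or [])
    return cmd


command = build_spark_submit(
    "yarn",
    "/opt/spark/jobs/extract.py",
    deploy_mode="cluster",
    conf={"spark.sql.adaptive.enabled": "true"},
    driver_memory="4g",
    num_executors=10,
    application_args=["--date", "2024-01-01"],
)
print(shlex.join(command))
```

Building the command as a list rather than a single string avoids shell quoting problems when arguments contain spaces or special characters.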

Key Takeaways:

  • SparkSubmitOperator wraps spark-submit for PySpark, Scala, and Java applications
  • SparkJDBCOperator handles database reads/writes with partitioned parallel loading
  • Use cluster deploy mode for production to decouple the Spark driver from Airflow workers
  • Dynamic allocation and adaptive query execution optimize resource usage
  • Deferrable SparkSubmitOperator frees worker slots during long-running jobs
  • Configure spark-defaults.conf on the cluster for shared settings across jobs
