Databricks Provider Integration with Airflow

Architecture Diagram

Formal Definitions

Databricks Provider

The Databricks provider (apache-airflow-providers-databricks) is a collection of operators, hooks, triggers, and sensors that integrate Apache Airflow with Databricks. It manages cluster lifecycle, job submission, notebook execution, and data movement operations through the Databricks REST API.

Databricks Hook

The DatabricksHook encapsulates authentication (token or OAuth), API versioning, retry logic, and HTTP client management for Databricks REST API calls. It provides a unified interface for all Databricks operations.

Detailed Explanation

Connection Setup

# Airflow connection configuration
# Connection ID: databricks_default
# Connection Type: Databricks
# Host: https://<databricks-instance>.databricks.com
# Password: <personal-access-token>  (recommended place for the token)
# Extra: {"token": "<personal-access-token>"}  (alternative to Password; set one, not both)
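
The same connection can also be defined outside the UI. As a minimal sketch, assuming Airflow 2.3 or later (which accepts JSON-serialized connections in environment variables named AIRFLOW_CONN_<CONN_ID>), the snippet below builds that variable with the standard library only. The helper name databricks_conn_env is hypothetical, and the host and token are the same placeholders used above.

```python
import json


def databricks_conn_env(conn_id: str, host: str, token: str) -> tuple[str, str]:
    """Return (variable name, JSON value) for an Airflow connection env var.

    Hypothetical helper for illustration; Airflow only needs the variable to
    exist in the scheduler and worker environment.
    """
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps(
        {
            "conn_type": "databricks",
            "host": host,
            # The personal access token goes in the password field.
            "password": token,
        }
    )
    return name, value


name, value = databricks_conn_env(
    "databricks_default",
    "https://<databricks-instance>.databricks.com",  # placeholder host
    "<personal-access-token>",  # placeholder; load from a secret store in practice
)
print(f"export {name}='{value}'")
```

In a real deployment the token would come from a secrets backend rather than a literal in code.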

DatabricksSubmitRunOperator

The primary operator for submitting one-time Databricks runs. It creates a new cluster or uses an existing one, submits the run through the Jobs API runs/submit endpoint, and by default polls until the run reaches a terminal state.

from datetime import datetime
from airflow import DAG
from airflow.providers.databricks.operators.databricks import (
    DatabricksSubmitRunOperator,
)

with DAG(
    dag_id='databricks_spark_job',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',  # 'schedule' replaces the deprecated 'schedule_interval' (Airflow 2.4+)
    catchup=False,
    tags=['databricks', 'spark'],
) as dag:

    submit_spark = DatabricksSubmitRunOperator(
        task_id='submit_spark_job',
        databricks_conn_id='databricks_default',
        new_cluster={
            'spark_version': '13.3.x-scala2.12',
            'node_type_id': 'i3.xlarge',
            'num_workers': 4,
            'spark_conf': {
                'spark.sql.shuffle.partitions': '200',
                'spark.sql.adaptive.enabled': 'true',
            },
            'custom_tags': {
                'team': 'data-engineering',
                'project': 'etl-pipeline',
            },
        },
        spark_jar_task={
            'main_class_name': 'com.company.ETLJob',
            'parameters': [
                '--date', '{{ ds }}',
                '--source', 's3://data-lake/raw/',
                '--target', 's3://data-lake/processed/',
            ],
        },
        libraries=[
            {'jar': 's3://jars/company-etl-1.0.jar'},
            # PySpark is not listed here: the Databricks Runtime already ships Spark.
        ],
        timeout_seconds=3600,
        polling_period_seconds=30,
        trigger_rule='all_success',
    )

    submit_spark
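
In the single-task form used above, a run needs one compute target: either a new_cluster spec or an existing_cluster_id. The sketch below is one way to enforce that choice before the operator is constructed. The helper name cluster_kwargs is hypothetical and not part of the provider; it only returns keyword arguments to unpack into the operator call.

```python
from typing import Any, Optional


def cluster_kwargs(
    new_cluster: Optional[dict[str, Any]] = None,
    existing_cluster_id: Optional[str] = None,
) -> dict[str, Any]:
    """Return operator kwargs for exactly one cluster choice (hypothetical helper)."""
    # True when both are given or both are missing.
    if (new_cluster is None) == (existing_cluster_id is None):
        raise ValueError("Pass exactly one of new_cluster or existing_cluster_id")
    if new_cluster is not None:
        return {"new_cluster": new_cluster}
    return {"existing_cluster_id": existing_cluster_id}


# Reuse a running cluster (ID taken from the examples in this lesson).
shared = cluster_kwargs(existing_cluster_id="0123-456789-abcdef")

# Or describe an ephemeral cluster for this run only.
ephemeral = cluster_kwargs(
    new_cluster={
        "spark_version": "13.3.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "num_workers": 4,
    }
)
```

The result can be unpacked into the operator, for example DatabricksSubmitRunOperator(task_id='submit_spark_job', **shared, ...), so a misconfigured task fails when the DAG file is parsed rather than at run time.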

DatabricksNotebookOperator

Execute Databricks notebooks as part of Airflow workflows.

from datetime import timedelta

from airflow.providers.databricks.operators.databricks import (
    DatabricksNotebookOperator,
)

run_notebook = DatabricksNotebookOperator(
    task_id='run_data_processing',
    databricks_conn_id='databricks_default',
    # Absolute workspace path of the notebook; source is 'WORKSPACE' or 'GIT'.
    notebook_path='/Repos/team/notebooks/data_processing',
    source='WORKSPACE',
    # Key/value parameters passed to the notebook (templated).
    notebook_params={
        'date': '{{ ds }}',
        'run_type': 'scheduled',
    },
    new_cluster={
        'spark_version': '13.3.x-scala2.12',
        'node_type_id': 'Standard_DS3_v2',
        'num_workers': 2,
    },
    # Airflow-side limit on the task: 30 minutes.
    execution_timeout=timedelta(minutes=30),
)

DatabricksRunNowOperator

Trigger an existing Databricks job by ID.

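from datetime import timedelta

from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
)

trigger_existing_job = DatabricksRunNowOperator(
    task_id='trigger_etl_job',
    databricks_conn_id='databricks_default',
    job_id=123456789,
    # Assumes the job runs a notebook task; use jar_params or python_params otherwise.
    notebook_params={
        'date': '{{ ds }}',
        'overwrite': 'true',
    },
    # Block until the run finishes, checking its state every 30 seconds.
    wait_for_termination=True,
    polling_period_seconds=30,
    # Airflow-side limit on the task: 1 hour.
    execution_timeout=timedelta(hours=1),
)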
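
Under the hood, triggering a job amounts to a POST to the Jobs API run-now endpoint. The sketch below shows roughly what the request body looks like, assuming the job runs a notebook task that accepts notebook_params. The helper name run_now_body is hypothetical, and the date is written out where Airflow would normally render {{ ds }}.

```python
import json


def run_now_body(job_id: int, notebook_params: dict) -> str:
    """Serialize a run-now request body (hypothetical helper for illustration)."""
    body = {
        "job_id": job_id,
        # Notebook parameters are passed as strings, so coerce every value.
        "notebook_params": {key: str(val) for key, val in notebook_params.items()},
    }
    return json.dumps(body, sort_keys=True)


body = run_now_body(123456789, {"date": "2024-01-01", "overwrite": True})
print(body)
# {"job_id": 123456789, "notebook_params": {"date": "2024-01-01", "overwrite": "True"}}
```

Note how the boolean becomes the string "True"; passing 'true' explicitly, as the operator example does, avoids surprises on the notebook side.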

Sensor for Job Completion

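from airflow.exceptions import AirflowException
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.sensors.python import PythonSensor


# The provider does not ship a run-state sensor (there is no DatabricksJobSensor),
# so this poke function checks the run through DatabricksHook.get_run_state.
def databricks_run_finished(run_id):
    state = DatabricksHook(databricks_conn_id='databricks_default').get_run_state(int(run_id))
    if state.is_terminal and not state.is_successful:
        raise AirflowException(f'Databricks run {run_id} failed: {state.state_message}')
    return state.is_terminal


wait_for_job = PythonSensor(
    task_id='wait_for_databricks_job',
    python_callable=databricks_run_finished,
    op_kwargs={'run_id': '{{ ti.xcom_pull(task_ids="submit_job", key="run_id") }}'},
    poke_interval=30,
    timeout=3600,
    mode='reschedule',
)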

Full ETL Pipeline

from datetime import datetime, timedelta

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.providers.databricks.operators.databricks import (
    DatabricksSubmitRunOperator,
)
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.sensors.python import PythonSensor


def on_job_failure(context):
    # Post the failing task and its exception to Slack.
    SlackWebhookOperator(
        task_id='alert',
        slack_webhook_conn_id='slack_webhook',
        message=(
            f'Databricks task {context["task_instance"].task_id} failed: '
            f'{context.get("exception")}'
        ),
    ).execute(context=context)


def databricks_run_finished(run_id):
    # Poke function: True once the run is terminal; raise if it did not succeed.
    state = DatabricksHook(databricks_conn_id='databricks_default').get_run_state(int(run_id))
    if state.is_terminal and not state.is_successful:
        raise AirflowException(f'Databricks run {run_id} failed: {state.state_message}')
    return state.is_terminal


with DAG(
    dag_id='databricks_full_etl',
    start_date=datetime(2024, 1, 1),
    schedule='0 2 * * *',
    catchup=False,
    default_args={
        'on_failure_callback': on_job_failure,
        'retries': 1,
        'retry_delay': timedelta(minutes=10),
    },
    tags=['databricks', 'etl'],
) as dag:

    submit_extract = DatabricksSubmitRunOperator(
        task_id='extract_data',
        new_cluster={
            'spark_version': '13.3.x-scala2.12',
            'node_type_id': 'i3.xlarge',
            'num_workers': 2,
        },
        spark_jar_task={
            'main_class_name': 'com.company.ExtractJob',
            'parameters': ['--date', '{{ ds }}'],
        },
        libraries=[{'jar': 's3://jars/extract-1.0.jar'}],
        timeout_seconds=3600,
        # Return right after submission; the sensor below does the waiting.
        wait_for_termination=False,
    )

    wait_extract = PythonSensor(
        task_id='wait_extract',
        python_callable=databricks_run_finished,
        # The submit operator pushes the run ID to XCom under the key 'run_id'.
        op_kwargs={'run_id': '{{ ti.xcom_pull(task_ids="extract_data", key="run_id") }}'},
        poke_interval=30,
        mode='reschedule',
    )

    submit_transform = DatabricksSubmitRunOperator(
        task_id='transform_data',
        existing_cluster_id='0123-456789-abcdef',
        spark_jar_task={
            'main_class_name': 'com.company.TransformJob',
            'parameters': ['--date', '{{ ds }}'],
        },
        libraries=[{'jar': 's3://jars/transform-1.0.jar'}],
        timeout_seconds=3600,
        wait_for_termination=False,
    )

    wait_transform = PythonSensor(
        task_id='wait_transform',
        python_callable=databricks_run_finished,
        op_kwargs={'run_id': '{{ ti.xcom_pull(task_ids="transform_data", key="run_id") }}'},
        poke_interval=30,
        mode='reschedule',
    )

    notify = PythonOperator(
        task_id='notify_completion',
        python_callable=lambda: print('ETL pipeline completed'),
    )

    submit_extract >> wait_extract >> submit_transform >> wait_transform >> notify

Key Concepts Table

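Component | Purpose | Creates Cluster | Polling
DatabricksSubmitRunOperator | Submit a one-time run | Yes, or uses existing | Yes (optional via wait_for_termination)
DatabricksNotebookOperator | Run a notebook | Yes, or uses existing | Yes
DatabricksRunNowOperator | Trigger an existing job | No | Optional (wait_for_termination)
PythonSensor with DatabricksHook.get_run_state | Wait for a run | No | Yes
DatabricksHook start_cluster / terminate_cluster / restart_cluster | Start/stop/restart a cluster | No | No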

Configuration Reference

Parameter | Description | Example
databricks_conn_id | Airflow connection ID | databricks_default
new_cluster | Cluster spec for a new cluster | {'spark_version': '13.3.x', ...}
existing_cluster_id | Reuse an existing cluster | 0123-456789-abcdef
spark_jar_task | JAR-based job definition | {'main_class_name': '...'}
notebook_task | Notebook-based job definition | {'notebook_path': '...'}
timeout_seconds | Run timeout in seconds, enforced by Databricks (DatabricksSubmitRunOperator) | 3600
polling_period_seconds | Interval between run-state checks, in seconds | 30
notebook_params | Key/value parameters for a notebook job | {'key': 'value'}

Best Practices

  1. Use existing clusters when possible; cluster startup can take 5-10 minutes.
  2. Set timeout_seconds on DatabricksSubmitRunOperator, and execution_timeout on other tasks, to prevent indefinite waits.
  3. Use mode='reschedule' on sensors to free worker slots during waits.
  4. Tag clusters with team/project for cost tracking and management.
  5. Monitor job runs through Databricks job history and Airflow task logs.
  6. Use libraries for dependency management: wheel files, pip packages, or Maven coordinates.
  7. Handle failed runs (result_state FAILED, TIMEDOUT, or CANCELED, or life_cycle_state INTERNAL_ERROR) with retries and failure callbacks.
  8. Parameterize jobs dynamically by passing Jinja templates in templated fields such as notebook_params.
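
Handling failed runs comes down to reading the state object that the Jobs API returns for a run, which carries a life_cycle_state and, once the run has finished, a result_state. The sketch below maps that object to a simple outcome. The function name classify_run and the three outcome labels are illustrative choices, not provider API.

```python
# Life cycle states after which a run will not change any further.
TERMINAL_LIFE_CYCLE_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}


def classify_run(state: dict) -> str:
    """Map a Jobs API run 'state' object to 'running', 'success', or 'failed'."""
    if state.get("life_cycle_state") not in TERMINAL_LIFE_CYCLE_STATES:
        return "running"
    # A terminal run counts as successful only with result_state SUCCESS.
    if state.get("result_state") == "SUCCESS":
        return "success"
    return "failed"


for sample in (
    {"life_cycle_state": "RUNNING"},
    {"life_cycle_state": "TERMINATED", "result_state": "SUCCESS"},
    {"life_cycle_state": "TERMINATED", "result_state": "TIMEDOUT"},
    {"life_cycle_state": "INTERNAL_ERROR"},
):
    print(sample, "->", classify_run(sample))
```

Treating every terminal state other than SUCCESS as a failure is a conservative choice; a pipeline that tolerates skipped runs would add a separate branch for SKIPPED.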

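For long-running Spark jobs (>30 minutes), consider DatabricksRunNowOperator with a pre-existing job configured in Databricks. The job definition, cluster spec, and run history then live in Databricks, which gives you Databricks-native job monitoring. DatabricksSubmitRunOperator imposes no fixed run-time limit of its own (timeout_seconds defaults to no timeout), so set timeout_seconds or execution_timeout explicitly whichever operator you choose.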

The Databricks provider requires a Personal Access Token (PAT) or OAuth token stored in Airflow's connection. For managed Airflow (MWAA/Cloud Composer), use the environment's secret backend for token management.

Key Takeaways:

  • The Databricks provider offers operators, sensors, and hooks for full Databricks integration
  • DatabricksSubmitRunOperator submits one-time runs on new or existing clusters
  • DatabricksRunNowOperator triggers pre-existing Databricks jobs
  • Use a sensor with mode='reschedule' (for example, a PythonSensor around DatabricksHook.get_run_state) for efficient waiting
  • Configure existing_cluster_id to avoid cluster startup overhead
  • Always set a timeout (timeout_seconds or execution_timeout) and attach failure callbacks
