Databricks Provider Integration with Airflow
Architecture Diagram
Formal Definitions
Definition: Databricks Provider
The Databricks provider (apache-airflow-providers-databricks) is a collection of operators, hooks, triggers, and sensors that integrate Apache Airflow with Databricks. It manages cluster lifecycle, job submission, notebook execution, and data movement operations through the Databricks REST API.
Definition: Databricks Hook
The DatabricksHook encapsulates authentication (token or OAuth), API versioning, retry logic, and HTTP client management for Databricks REST API calls. It provides a unified interface for all Databricks operations.
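The retry behaviour mentioned above can be illustrated with a small standard-library sketch. This is not the hook's actual implementation; `TransientError`, `call_with_retries`, and `flaky_request` are hypothetical names, and the parameters `retry_limit` and `retry_delay` are only loosely modelled on the hook's arguments of the same name. The doubling delay is an assumption chosen for illustration.

```python
import time


class TransientError(Exception):
    """Hypothetical stand-in for a retryable failure (e.g. HTTP 429 or 503)."""


def call_with_retries(request, retry_limit=3, retry_delay=1.0, sleep=time.sleep):
    """Call request() up to retry_limit times, doubling the delay after each failure."""
    for attempt in range(1, retry_limit + 1):
        try:
            return request()
        except TransientError:
            if attempt == retry_limit:
                raise  # out of attempts: surface the last error
            sleep(retry_delay * 2 ** (attempt - 1))


# Usage with a fake request that fails twice, then succeeds.
calls = {'count': 0}


def flaky_request():
    calls['count'] += 1
    if calls['count'] < 3:
        raise TransientError('temporarily unavailable')
    return {'run_id': 42}


delays = []  # record the requested delays instead of actually sleeping
result = call_with_retries(flaky_request, retry_limit=3, retry_delay=1.0, sleep=delays.append)
```

Injecting `sleep` keeps the sketch testable without real waiting; in a real client the default `time.sleep` would apply.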
Detailed Explanation
Connection Setup
# Airflow connection configuration
# Connection ID: databricks_default
# Connection Type: Databricks
# Host: https://<databricks-instance>.databricks.com
# Password: <personal-access-token>
# Extra (optional alternative to Password): {"token": "<personal-access-token>"}
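The same connection can also be supplied through an environment variable rather than the UI. The sketch below assumes Airflow's JSON connection format and the `AIRFLOW_CONN_<CONN_ID>` naming convention; `databricks_conn_json` is a hypothetical helper, and the host and token are placeholders.

```python
import json


def databricks_conn_json(host, token):
    """Serialize a Databricks connection in Airflow's JSON connection format."""
    return json.dumps({
        'conn_type': 'databricks',
        'host': host,
        'password': token,  # personal access token
    })


# Placeholder values; in practice the token should come from a secrets backend.
value = databricks_conn_json(
    'https://<databricks-instance>.databricks.com',
    '<personal-access-token>',
)
print(f'AIRFLOW_CONN_DATABRICKS_DEFAULT={value}')
```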
DatabricksSubmitRunOperator
The primary operator for submitting one-time runs without first defining a job in Databricks. It creates a new cluster or uses an existing one, submits the run, and polls until the run reaches a terminal state.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.databricks.operators.databricks import (
    DatabricksSubmitRunOperator,
)

with DAG(
    dag_id='databricks_spark_job',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    tags=['databricks', 'spark'],
) as dag:
    submit_spark = DatabricksSubmitRunOperator(
        task_id='submit_spark_job',
        databricks_conn_id='databricks_default',
        new_cluster={
            'spark_version': '13.3.x-scala2.12',
            'node_type_id': 'i3.xlarge',
            'num_workers': 4,
            'spark_conf': {
                'spark.sql.shuffle.partitions': '200',
                'spark.sql.adaptive.enabled': 'true',
            },
            'custom_tags': {
                'team': 'data-engineering',
                'project': 'etl-pipeline',
            },
        },
        spark_jar_task={
            'main_class_name': 'com.company.ETLJob',
            'parameters': [
                '--date', '{{ ds }}',
                '--source', 's3://data-lake/raw/',
                '--target', 's3://data-lake/processed/',
            ],
        },
        libraries=[
            {'jar': 's3://jars/company-etl-1.0.jar'},
            {'pypi': {'package': 'pyspark==3.5.0'}},
        ],
        timeout_seconds=3600,
        polling_period_seconds=30,
        trigger_rule='all_success',
    )
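A run needs a cluster to execute on, given either as a `new_cluster` spec or as an `existing_cluster_id`. As a minimal sketch, a small helper can enforce that exactly one of the two is supplied before the arguments reach the operator. `build_submit_run_kwargs` is a hypothetical helper and not part of the provider.

```python
def build_submit_run_kwargs(task, new_cluster=None, existing_cluster_id=None):
    """Return operator keyword arguments, requiring exactly one cluster option."""
    if (new_cluster is None) == (existing_cluster_id is None):
        raise ValueError('Provide exactly one of new_cluster or existing_cluster_id')
    kwargs = dict(task)  # copy so the caller's dict is not mutated
    if new_cluster is not None:
        kwargs['new_cluster'] = new_cluster
    else:
        kwargs['existing_cluster_id'] = existing_cluster_id
    return kwargs


jar_task = {'spark_jar_task': {'main_class_name': 'com.company.ETLJob'}}

# Reuse an existing cluster.
reuse = build_submit_run_kwargs(jar_task, existing_cluster_id='0123-456789-abcdef')

# Supplying both options (or neither) is rejected.
try:
    build_submit_run_kwargs(
        jar_task,
        new_cluster={'num_workers': 2},
        existing_cluster_id='0123-456789-abcdef',
    )
    rejected = False
except ValueError:
    rejected = True
```

The resulting dictionary could then be unpacked into the operator call, for example `DatabricksSubmitRunOperator(task_id='...', **reuse)`.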
DatabricksNotebookOperator
Execute Databricks notebooks as part of Airflow workflows.
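from datetime import timedelta

from airflow.providers.databricks.operators.databricks import (
    DatabricksNotebookOperator,
)

run_notebook = DatabricksNotebookOperator(
    task_id='run_data_processing',
    databricks_conn_id='databricks_default',
    # Absolute workspace path of the notebook.
    notebook_path='/Repos/team/notebooks/data_processing',
    # 'WORKSPACE' reads the notebook from the workspace; 'GIT' reads it from a Git source.
    source='WORKSPACE',
    # Parameters passed to the notebook (values are Jinja-templated).
    notebook_params={
        'date': '{{ ds }}',
        'run_type': 'scheduled',
    },
    new_cluster={
        'spark_version': '13.3.x-scala2.12',
        'node_type_id': 'Standard_DS3_v2',
        'num_workers': 2,
    },
    # Airflow-level task timeout (30 minutes).
    execution_timeout=timedelta(minutes=30),
)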
DatabricksRunNowOperator
Trigger an existing Databricks job by ID.
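from datetime import timedelta

from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
)

trigger_existing_job = DatabricksRunNowOperator(
    task_id='trigger_etl_job',
    databricks_conn_id='databricks_default',
    job_id=123456789,
    # Parameters for a notebook task; use jar_params or python_params for other task types.
    notebook_params={
        'date': '{{ ds }}',
        'overwrite': 'true',
    },
    # Block until the run finishes, checking its state every 30 seconds.
    wait_for_termination=True,
    polling_period_seconds=30,
    # Airflow-level task timeout (1 hour).
    execution_timeout=timedelta(hours=1),
)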
Sensor for Job Completion
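# Check that your installed provider version exports DatabricksJobSensor from
# this module before relying on it. If it does not, the same wait can be written
# as a small custom sensor that polls DatabricksHook.get_run_state(run_id).
from airflow.providers.databricks.sensors.databricks import (
    DatabricksJobSensor,
)

wait_for_job = DatabricksJobSensor(
    task_id='wait_for_databricks_job',
    databricks_conn_id='databricks_default',
    # The upstream task must have pushed its run_id to XCom.
    run_id='{{ ti.xcom_pull(task_ids="submit_job", key="run_id") }}',
    poke_interval=30,
    timeout=3600,
    # 'reschedule' releases the worker slot between pokes.
    mode='reschedule',
)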
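The waiting logic such a sensor performs can be sketched without Airflow. The example below is an illustration, not provider code: `wait_for_run` and the injected `get_state` callable are hypothetical. It assumes the run state is a dictionary with a `life_cycle_state` key and, once finished, a `result_state` key, following the state names used by the Databricks Jobs API.

```python
import time

# Life-cycle states after which a run will not change any more.
TERMINAL_STATES = {'TERMINATED', 'SKIPPED', 'INTERNAL_ERROR'}


def wait_for_run(get_state, run_id, poke_interval=30, timeout=3600,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll get_state(run_id) until the run is terminal; return True on success."""
    deadline = clock() + timeout
    while True:
        state = get_state(run_id)
        if state['life_cycle_state'] in TERMINAL_STATES:
            return state.get('result_state') == 'SUCCESS'
        if clock() >= deadline:
            raise TimeoutError(
                f'Run {run_id} still {state["life_cycle_state"]} after {timeout}s'
            )
        sleep(poke_interval)


# Usage with a fake state source that reports PENDING, RUNNING, then success.
states = iter([
    {'life_cycle_state': 'PENDING'},
    {'life_cycle_state': 'RUNNING'},
    {'life_cycle_state': 'TERMINATED', 'result_state': 'SUCCESS'},
])
succeeded = wait_for_run(lambda run_id: next(states), run_id=42, sleep=lambda seconds: None)
```

A real sensor would return control to the scheduler between checks instead of sleeping in a loop, which is what `mode='reschedule'` provides.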
Full ETL Pipeline
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import (
    DatabricksSubmitRunOperator,
)
# Verify that your installed provider version exports this sensor.
from airflow.providers.databricks.sensors.databricks import (
    DatabricksJobSensor,
)
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator


def on_job_failure(context):
    """Post the failing task and its exception to Slack."""
    SlackWebhookOperator(
        task_id='alert',
        slack_webhook_conn_id='slack_webhook',
        message=(
            f'Databricks task {context["task_instance"].task_id} failed: '
            f'{context.get("exception")}'
        ),
    ).execute(context=context)


with DAG(
    dag_id='databricks_full_etl',
    start_date=datetime(2024, 1, 1),
    schedule_interval='0 2 * * *',
    catchup=False,
    default_args={
        'on_failure_callback': on_job_failure,
        'retries': 1,
        'retry_delay': timedelta(minutes=10),
    },
    tags=['databricks', 'etl'],
) as dag:
    submit_extract = DatabricksSubmitRunOperator(
        task_id='extract_data',
        new_cluster={
            'spark_version': '13.3.x-scala2.12',
            'node_type_id': 'i3.xlarge',
            'num_workers': 2,
        },
        spark_jar_task={
            'main_class_name': 'com.company.ExtractJob',
            'parameters': ['--date', '{{ ds }}'],
        },
        libraries=[{'jar': 's3://jars/extract-1.0.jar'}],
        timeout_seconds=3600,
        # Return right after submission; the sensor below does the waiting.
        wait_for_termination=False,
    )

    wait_extract = DatabricksJobSensor(
        task_id='wait_extract',
        run_id='{{ ti.xcom_pull(task_ids="extract_data", key="run_id") }}',
        poke_interval=30,
        mode='reschedule',
    )

    submit_transform = DatabricksSubmitRunOperator(
        task_id='transform_data',
        existing_cluster_id='0123-456789-abcdef',
        spark_jar_task={
            'main_class_name': 'com.company.TransformJob',
            'parameters': ['--date', '{{ ds }}'],
        },
        libraries=[{'jar': 's3://jars/transform-1.0.jar'}],
        timeout_seconds=3600,
        # Return right after submission; the sensor below does the waiting.
        wait_for_termination=False,
    )

    wait_transform = DatabricksJobSensor(
        task_id='wait_transform',
        run_id='{{ ti.xcom_pull(task_ids="transform_data", key="run_id") }}',
        poke_interval=30,
        mode='reschedule',
    )

    notify = PythonOperator(
        task_id='notify_completion',
        python_callable=lambda: print('ETL pipeline completed'),
    )

    submit_extract >> wait_extract >> submit_transform >> wait_transform >> notify
Key Concepts Table
| Operator | Purpose | Creates Cluster | Polling |
|---|---|---|---|
| DatabricksSubmitRunOperator | Submit new job | Yes or existing | Yes |
| DatabricksNotebookOperator | Run notebook | Yes or existing | Yes |
| DatabricksRunNowOperator | Trigger existing job | No | Optional |
| DatabricksJobSensor | Wait for job | No | Yes |
| DatabricksClusterLifeCycleOperator | Start/stop/restart cluster | No | Yes |
Configuration Reference
| Parameter | Description | Example |
|---|---|---|
| databricks_conn_id | Airflow connection ID | databricks_default |
| new_cluster | Cluster spec for new cluster | {'spark_version': '13.3.x', ...} |
| existing_cluster_id | Reuse existing cluster | 0123-456789-abcdef |
| spark_jar_task | JAR-based job definition | {'main_class_name': '...'} |
| notebook_task | Notebook-based job | {'notebook_path': '...'} |
| timeout_seconds | Run timeout in seconds for DatabricksSubmitRunOperator (0 means no timeout) | 3600 |
| polling_period_seconds | Interval in seconds between run-state checks | 30 |
| notebook_params | Parameters passed to a notebook task, e.g. with DatabricksRunNowOperator | {'key': 'value'} |
Best Practices
- Use existing clusters when possible; cluster startup can take 5-10 minutes.
- Set a timeout on every operator (`timeout_seconds` for submitted runs, `execution_timeout` for the Airflow task) to prevent indefinite waits.
- Use `mode='reschedule'` on sensors to free worker slots during waits.
- Tag clusters with team/project for cost tracking and management.
- Monitor job runs through Databricks job history and Airflow task logs.
- Use `libraries` for dependency management: wheel files, pip packages, or maven coordinates.
- Handle failed runs (result states such as `FAILED` or `TIMEDOUT`, or the `INTERNAL_ERROR` life-cycle state) with retries and failure callbacks.
- Leverage templated parameters (for example `notebook_params` or the `json` argument) for dynamic job parameterization via Jinja templates.
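The tagging recommendation above can be applied uniformly with a small helper that merges team and project tags into any cluster spec. This is a sketch; `with_cost_tags` is a hypothetical function, and it relies only on the `custom_tags` field of the cluster spec shown earlier.

```python
def with_cost_tags(new_cluster, team, project):
    """Return a copy of a cluster spec with team/project custom_tags merged in."""
    tagged = dict(new_cluster)  # shallow copy; the input spec is left untouched
    tagged['custom_tags'] = {
        **new_cluster.get('custom_tags', {}),
        'team': team,
        'project': project,
    }
    return tagged


base_cluster = {
    'spark_version': '13.3.x-scala2.12',
    'node_type_id': 'i3.xlarge',
    'num_workers': 2,
    'custom_tags': {'env': 'prod'},
}
tagged_cluster = with_cost_tags(base_cluster, team='data-engineering', project='etl-pipeline')
```

The tagged spec can be passed as `new_cluster` to any operator that creates a cluster.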
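For long-running Spark jobs (>30 minutes), consider DatabricksRunNowOperator with a pre-existing job configured in Databricks. The cluster, libraries, and timeout settings then live in the Databricks job definition, and you get Databricks-native job monitoring. DatabricksSubmitRunOperator does not impose a fixed 15-minute limit: its timeout_seconds defaults to 0, which means no timeout, so set it explicitly if you submit long runs this way.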
The Databricks provider requires a Personal Access Token (PAT) or OAuth token stored in Airflow's connection. For managed Airflow (MWAA/Cloud Composer), use the environment's secret backend for token management.
Key Takeaways:
- The Databricks provider offers operators, sensors, and hooks for full Databricks integration
- DatabricksSubmitRunOperator creates new jobs with configurable clusters
- DatabricksRunNowOperator triggers pre-existing Databricks jobs
- Use DatabricksJobSensor with `mode='reschedule'` for efficient waiting
- Configure existing_cluster_id to avoid cluster startup overhead
- Always set timeout_seconds and handle failure callbacks
See Also
- Spark Provider Integration — Apache Spark with Airflow
- BigQuery Provider — BigQuery integration patterns
- Operators and Hooks — Operator lifecycle and hook architecture
- Sensors and Operators — Sensor patterns and poke modes