Variable Management and Templates
Formal Definitions
Definition: Variable
A Variable in Airflow is a key-value pair stored in the metadata database, accessible via Variable.get() in Python code or {{ var.value.key }} in templates. Values are stored as strings; JSON values are serialized on write and deserialized on read. Formally, a Variable can be written as a triple $V = (k, v, t)$, where $k$ is the key, $v$ is the value, and $t$ is the data type.
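Definition: Jinja Template
A Jinja template is a string containing template expressions that are resolved at task execution time. Airflow uses Jinja2 templating with additional context variables (execution date, task instance, etc.) and renders only the fields an operator declares as templated. A template $T$ is a string with embedded expressions $e_1, \dots, e_n$, where each $e_i$ is a template expression.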
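To make "resolving template expressions" concrete, the sketch below substitutes `{{ name }}` and `{{ dotted.name }}` placeholders from a context dictionary. It is a toy stand-in written with the standard library, not Jinja2 itself; the `render` function and the sample context are hypothetical and only mirror a few of the keys Airflow exposes.

```python
import re

# Matches {{ name }} or {{ dotted.name }}. Real Jinja2 also supports filters,
# control flow, and macros; none of that is modelled in this toy version.
_EXPR = re.compile(r"\{\{\s*([A-Za-z_][\w.]*)\s*\}\}")


def render(template, context):
    """Replace each template expression with its value from the context."""

    def resolve(match):
        value = context
        for part in match.group(1).split("."):
            value = value[part]  # raises KeyError if the name is unknown
        return str(value)

    return _EXPR.sub(resolve, template)


# Hypothetical context with two keys similar to Airflow's template context.
context = {"ds": "2024-01-15", "params": {"hour": 12}}
sql = render("WHERE event_date = '{{ ds }}' AND hour = {{ params.hour }}", context)
print(sql)
```

Each match of the pattern corresponds to one expression in the template; everything outside the braces passes through unchanged.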
Definition: Variable Cache
A Variable Cache stores recently accessed Variables in memory to reduce database queries. Entries are evicted on a TTL basis with a configurable duration. As a rule of thumb, aim for a cache hit ratio above 90%; below that, most lookups still pay the database round trip.
Detailed Explanation
Variable Basics
Variables provide a simple way to store configuration values that can be accessed across DAGs and tasks.
from airflow.models import Variable
# Get variable with default value
api_key = Variable.get("api_key", default_var="default_key")
# Get JSON variable
config = Variable.get("processing_config", deserialize_json=True)
# config is now a Python dict
# Set variable programmatically
Variable.set("last_run_date", "2024-01-15")
Variable.set("feature_flags", {"enable_new_feature": True}, serialize_json=True)
# Using CLI
# airflow variables set my_variable "my_value"
# airflow variables get my_variable
# airflow variables export /path/to/variables.json
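The JSON handling above comes down to a serialize-on-write, deserialize-on-read round trip over a stored string. The sketch below shows that idea with the standard library only; the `store` dictionary and the `set_json` / `get_json` helpers are hypothetical stand-ins for the metadata database and are not Airflow APIs.

```python
import json

# Hypothetical stand-in for the metadata database: key -> stored string.
store = {}


def set_json(key, value):
    """Serialize a Python object to a JSON string before storing it."""
    store[key] = json.dumps(value)


def get_json(key, default=None):
    """Return the deserialized value, or the default if the key is absent."""
    raw = store.get(key)
    return default if raw is None else json.loads(raw)


set_json("feature_flags", {"enable_new_feature": True})
raw = store["feature_flags"]      # what is actually stored: a string
flags = get_json("feature_flags")  # what the caller gets back: a dict
print(type(raw).__name__, type(flags).__name__)
```

Reading a JSON Variable without deserializing it yields the raw string, which is a common source of "string indices must be integers" errors downstream.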
Jinja Template Variables
Airflow provides built-in template variables for use in task parameters.
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["templating"],
)
def templating_dag():
    @task
    def process_data(
        execution_date,       # Automatically injected (deprecated alias of logical_date)
        ds,                   # Execution date as YYYY-MM-DD
        ds_nodash,            # Execution date as YYYYMMDD
        ts,                   # Execution timestamp (ISO 8601)
        prev_execution_date,  # Previous execution date (deprecated)
        params,               # DAG run parameters
    ):
        """Context values named in the signature are injected at runtime."""
        print(f"Processing data for: {ds}")
        print(f"Timestamp: {ts}")
        print(f"Previous run: {prev_execution_date}")
        print(f"Parameters: {params}")

    # Jinja is rendered only in an operator's templated fields. A string built
    # inside a task body is not rendered, so read the values from the context
    # and pass them to the database as bound parameters instead.
    @task(params={"hour": 12})
    def query_database(ds, params):
        """Query events for the run date using bound SQL parameters."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id="postgres_default")
        query = """
            SELECT * FROM events
            WHERE event_date = %(ds)s
              AND hour = %(hour)s
        """
        df = hook.get_pandas_df(query, parameters={"ds": ds, "hour": params["hour"]})
        return len(df)

    process_data()
    query_database()


templating_dag()
Variable Templates
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["variable-templates"],
)
def variable_template_dag():
    @task
    def use_variable_template():
        """Build an S3 path from Variables."""
        # In templated operator arguments the same values are available as
        # {{ var.value.data_bucket }}. Inside a task body, call Variable.get();
        # the lookup then happens when the task runs, not when the DAG is parsed.
        from airflow.models import Variable

        bucket = Variable.get("data_bucket")
        prefix = Variable.get("data_prefix", default_var="raw/")
        return f"s3://{bucket}/{prefix}"

    @task
    def configure_from_variables():
        """Load configuration from multiple variables."""
        from airflow.models import Variable

        # Load structured config
        config = Variable.get("pipeline_config", deserialize_json=True)

        # Individual settings are stored as strings, so convert them explicitly
        max_retries = Variable.get("max_retries", default_var="3")
        timeout = Variable.get("timeout_seconds", default_var="300")

        return {
            "bucket": config.get("bucket"),
            "region": config.get("region"),
            "max_retries": int(max_retries),
            "timeout": int(timeout),
        }

    configure_from_variables()
    use_variable_template()


variable_template_dag()
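A cached value expires one TTL after it was last accessed:

$$t_{\text{expire}} = t_{\text{access}} + \text{TTL}$$

Here,
- $t_{\text{expire}}$ = Time when cached value expires
- $t_{\text{access}}$ = Time of last cache access
- $\text{TTL}$ = Configured TTL duration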
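The expiry rule described by these terms (expiry time equals last access time plus the TTL) can be checked with a few lines of arithmetic. This is a minimal sketch; the helper names are hypothetical, and the 300-second TTL is taken from the custom cache example in this section rather than from any Airflow default.

```python
from datetime import datetime, timedelta


def expires_at(last_access, ttl):
    """Expiry time = time of last cache access + configured TTL."""
    return last_access + ttl


def is_fresh(now, last_access, ttl):
    """A cached entry is usable only strictly before its expiry time."""
    return now < expires_at(last_access, ttl)


ttl = timedelta(seconds=300)
last_access = datetime(2024, 1, 15, 12, 0, 0)
expiry = expires_at(last_access, ttl)

print(expiry)
print(is_fresh(datetime(2024, 1, 15, 12, 4, 59), last_access, ttl))
print(is_fresh(datetime(2024, 1, 15, 12, 5, 0), last_access, ttl))
```

The strict comparison means an entry read exactly at its expiry time is treated as stale and refetched.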
Variable Lookup Performance
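On a cache miss, a lookup pays for the cache check, the database query, and the cache update; on a hit, only the $T_{\text{cache}}$ term applies:

$$T_{\text{lookup}} = T_{\text{cache}} + T_{\text{db}} + T_{\text{update}}$$

Here,
- $T_{\text{lookup}}$ = Total variable lookup latency
- $T_{\text{cache}}$ = Cache lookup latency (~1μs)
- $T_{\text{db}}$ = Database lookup latency (~5ms)
- $T_{\text{update}}$ = Cache update overhead (~1ms)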
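Plugging the approximate latencies listed above into a simple hit/miss model shows how strongly the hit ratio drives average lookup cost. The sketch assumes a miss pays all three costs and a hit pays only the cache lookup; that weighting, and the function names, are assumptions made for illustration, and the figures are the rough ones quoted in this section rather than measurements.

```python
# Approximate latencies from the list above, in seconds.
T_CACHE = 1e-6    # cache lookup (~1 microsecond)
T_DB = 5e-3       # database lookup (~5 ms)
T_UPDATE = 1e-3   # cache update overhead (~1 ms)


def lookup_latency(hit):
    """Latency of one lookup: cache only on a hit, all three terms on a miss."""
    return T_CACHE if hit else T_CACHE + T_DB + T_UPDATE


def expected_latency(hit_ratio):
    """Average latency over many lookups for a given cache hit ratio."""
    return hit_ratio * lookup_latency(True) + (1 - hit_ratio) * lookup_latency(False)


for h in (0.0, 0.5, 0.9, 0.99):
    print(f"hit ratio {h:.2f}: {expected_latency(h) * 1e3:.3f} ms")
```

Under these assumptions, a 90% hit ratio brings the average lookup to roughly a tenth of the miss cost, which is consistent with the 90% target given in the Variable Cache definition.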
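Stock Airflow does not cache Variables while tasks run: each Variable.get() call goes to the configured secrets backend, the environment, or the metadata database. Since Airflow 2.7, an optional cache for DAG parsing can be enabled with use_cache in the [secrets] configuration section; its TTL is set by cache_ttl_seconds and defaults to 900 seconds (15 minutes). A custom cache, such as the VariableCache example in this section, can choose its own TTL, for example 5 minutes. Either way, use Variable.get() sparingly in loops; fetch once and reuse.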
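The "fetch once and reuse" advice is easy to see by counting lookups. In the sketch below, `fake_variable_get` is a hypothetical stand-in for Variable.get() that records how often the backend is hit; it is not an Airflow function.

```python
calls = {"count": 0}


def fake_variable_get(key):
    """Hypothetical stand-in for Variable.get() that counts backend lookups."""
    calls["count"] += 1
    return {"batch_size": "3"}[key]


records = ["a", "b", "c", "d"]

# Anti-pattern: one backend lookup per record.
calls["count"] = 0
for _ in records:
    batch_size = int(fake_variable_get("batch_size"))
lookups_in_loop = calls["count"]

# Preferred: look the value up once, then reuse it inside the loop.
calls["count"] = 0
batch_size = int(fake_variable_get("batch_size"))
for _ in records:
    pass  # work with batch_size here
lookups_hoisted = calls["count"]

print(lookups_in_loop, lookups_hoisted)
```

With a real backend, each of those extra calls would cost a database or secrets-backend round trip.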
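For sensitive data like API keys, prefer a Secrets Backend (such as HashiCorp Vault or AWS Secrets Manager) over plain Variables. Variable values in the metadata database are encrypted only when a Fernet key is configured, and they remain readable to anyone who can access Variables through the UI, CLI, or API.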
Key Concepts Table
| Variable Type | Storage | Access Method | Use Case |
|---|---|---|---|
| String | Metadata DB | Variable.get("key") | Simple config |
| JSON | Metadata DB | Variable.get("key", deserialize_json=True) | Complex config |
| Environment | OS Environment | Variable.get("key"), read from AIRFLOW_VAR_KEY | Secrets, Docker |
| Secrets Backend | External vault | Variable.get("key") with backend | Production secrets |
| DAG Parameters | DAG run | {{ params.key }} | Runtime config |
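The first four rows of the table share one access call, Variable.get("key"), because the lookup walks a chain of sources and returns the first match: a configured secrets backend, then environment variables named `AIRFLOW_VAR_<KEY>`, then the metadata database. The sketch below models that chain with plain dictionaries; the `lookup` function and all sample values are hypothetical and only illustrate the ordering.

```python
import os

ENV_PREFIX = "AIRFLOW_VAR_"


def lookup(key, secrets_backend, metadata_db, environ=os.environ):
    """Toy resolution chain: the first source that has the key wins."""
    if key in secrets_backend:
        return secrets_backend[key], "secrets backend"
    env_name = ENV_PREFIX + key.upper()
    if env_name in environ:
        return environ[env_name], "environment"
    if key in metadata_db:
        return metadata_db[key], "metadata db"
    raise KeyError(key)


# Hypothetical sources; a real deployment would query Vault, os.environ,
# and the Airflow metadata database instead of dictionaries.
secrets = {"api_key": "from-vault"}
env = {"AIRFLOW_VAR_DATA_BUCKET": "env-bucket"}
db = {"data_bucket": "db-bucket", "max_retries": "3"}

for name in ("api_key", "data_bucket", "max_retries"):
    print(name, lookup(name, secrets, db, env))
```

Because earlier sources shadow later ones, a value set in the environment hides a Variable of the same name stored in the database.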
Code Examples
Advanced Templating Patterns
from datetime import datetime
from typing import Any, Dict

from airflow.decorators import dag, task


@dag(
    schedule_interval="0 6 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["templating", "advanced"],
    params={
        "environment": "production",
        "dry_run": False,
        "batch_size": 1000,
    },
)
def advanced_templating_dag():
    @task
    def process_with_templates(
        ds: str,
        ds_nodash: str,
        execution_date: datetime,
        prev_execution_date: datetime,
        params: Dict[str, Any],
        **context,
    ):
        """Demonstrate advanced template usage."""
        from airflow.models import Variable

        # Access DAG run parameters
        environment = params.get("environment", "development")
        dry_run = params.get("dry_run", False)
        batch_size = params.get("batch_size", 1000)

        # Access variables
        config = Variable.get(f"{environment}_config", deserialize_json=True)

        # Build dynamic query. The values are interpolated here for display;
        # bind them as query parameters when actually executing the SQL.
        query = f"""
            SELECT * FROM events
            WHERE event_date >= '{prev_execution_date.strftime("%Y-%m-%d")}'
              AND event_date < '{ds}'
              AND environment = '{environment}'
            LIMIT {batch_size}
        """

        print(f"Environment: {environment}")
        print(f"Dry run: {dry_run}")
        print(f"Query:\n{query}")

        return {
            "environment": environment,
            "dry_run": dry_run,
            "batch_size": batch_size,
            "execution_date": ds,  # consumed by generate_report
            "query": query,
        }

    @task
    def generate_report(config: dict):
        """Generate report with templated content."""
        from airflow.models import Variable

        # Use Jinja2 directly for complex templating
        from jinja2 import Template

        report_template = Variable.get("report_template")
        template = Template(report_template)
        report = template.render(
            environment=config["environment"],
            date=config.get("execution_date"),
            batch_size=config["batch_size"],
        )
        return report

    config = process_with_templates()
    generate_report(config)


advanced_templating_dag()
Variable Caching Strategy
# variable_caching.py
import json
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.models import Variable


class VariableCache:
    """Custom variable caching with TTL."""

    def __init__(self, ttl_seconds=300):
        self.ttl = timedelta(seconds=ttl_seconds)
        self.cache = {}
        # Time each key was last fetched or set; hits do not extend the TTL
        self.last_access = {}

    def get(self, key, default=None, deserialize_json=False):
        """Get variable with caching."""
        now = datetime.now()

        # Check cache
        if key in self.cache:
            if now - self.last_access[key] < self.ttl:
                value = self.cache[key]
                if deserialize_json and isinstance(value, str):
                    return json.loads(value)
                return value

        # Cache miss or expired entry - fetch from database
        try:
            value = Variable.get(key, default_var=default)
            self.cache[key] = value
            self.last_access[key] = now
            if deserialize_json and isinstance(value, str):
                return json.loads(value)
            return value
        except Exception as e:
            print(f"Error fetching variable {key}: {e}")
            return default

    def set(self, key, value, serialize_json=False):
        """Set variable and update cache."""
        if serialize_json:
            value = json.dumps(value)
        Variable.set(key, value)
        self.cache[key] = value
        self.last_access[key] = datetime.now()

    def invalidate(self, key):
        """Invalidate cached variable."""
        self.cache.pop(key, None)
        self.last_access.pop(key, None)

    def clear(self):
        """Clear entire cache."""
        self.cache.clear()
        self.last_access.clear()


# Usage in DAG. The cache lives in process memory, so it only helps repeated
# lookups within the same process, not across separately launched tasks.
variable_cache = VariableCache(ttl_seconds=600)


@dag(schedule_interval="@daily", start_date=datetime(2024, 1, 1))
def cached_variable_dag():
    @task
    def process_data():
        """Use cached variable access."""
        config = variable_cache.get("pipeline_config", deserialize_json=True)
        api_key = variable_cache.get("api_key")
        print(f"Config: {config}")
        # Never log secret values, not even a prefix
        print(f"API key configured: {api_key is not None}")
        return {"status": "success"}

    process_data()


cached_variable_dag()
Variable-Based DAG Configuration
from datetime import datetime

from airflow.decorators import dag, task
from airflow.models import Variable

# Load environment-specific configuration.
# Note: top-level Variable.get() calls run on every DAG parse, so keep them few.
ENVIRONMENT = Variable.get("environment", default_var="development")
# Fall back to an empty dict so a missing Variable does not break DAG import
CONFIG = Variable.get(f"{ENVIRONMENT}_config", default_var={}, deserialize_json=True)


@dag(
    schedule_interval=CONFIG.get("schedule", "@daily"),
    start_date=datetime(2024, 1, 1),
    catchup=CONFIG.get("catchup", False),
    tags=[ENVIRONMENT, "config-driven"],
    max_active_runs=CONFIG.get("max_active_runs", 1),
)
def config_driven_dag():
    @task
    def extract():
        """Extract using environment config."""
        source = CONFIG.get("source")
        connection_id = CONFIG.get(f"{source}_conn_id")
        print(f"Extracting from {source} using {connection_id}")
        return {"source": source, "connection_id": connection_id}

    @task
    def transform(extract_result: dict):
        """Transform using environment config."""
        transformations = CONFIG.get("transformations", [])
        print(f"Applying {len(transformations)} transformations")
        for t in transformations:
            print(f"  - {t}")
        return extract_result

    @task
    def load(transform_result: dict):
        """Load using environment config."""
        destination = CONFIG.get("destination")
        table = CONFIG.get("load_table")
        print(f"Loading to {destination}.{table}")
        return {"status": "success", "records": 0}

    # Passing each result to the next task sets both the data flow and the order
    load(transform(extract()))


config_driven_dag()
Performance Metrics
Variable Access Patterns
| Access Pattern | Latency | Use Case |
|---|---|---|
| Direct Variable.get() | ~5ms | One-time access |
| Cached access | ~1μs | Repeated access |
| Template variable | ~0ms (pre-rendered) | DAG parameter |
| Secrets Backend | ~50ms | Secure values |
Variable vs Parameter Comparison
| Feature | Variable | DAG Parameter | Template Variable |
|---|---|---|---|
| Storage | Metadata DB | DAG Run | Pre-rendered |
| Scope | Global | Per-run | Per-task |
| Mutability | Mutable | Immutable | Immutable |
| Access Pattern | Variable.get() | params.key | {{ var.value.key }} |
| Performance | ~5ms | ~0ms | ~0ms |
| Use Case | Shared config | Runtime config | Task config |
Key Takeaways:
- Variables are global key-value pairs stored in the metadata database
- Use Variable.get(key, deserialize_json=True) for JSON configuration
- Variable caching reduces database load; Airflow's optional parse-time cache defaults to a 15-minute TTL, while the custom cache shown here defaults to 5 minutes
- Template variables ({{ var.value.key }}) provide concise access in operator arguments
- For sensitive data, use Secrets Backends instead of plain Variables
- DAG Parameters enable runtime configuration without modifying DAG code
See Also
- XCom Communications — Task-to-task data passing
- Connection Management — External system connections
- Security Best Practices — Secrets management
- DAG Design Patterns — DAG configuration patterns