Variable Management and Templates in Apache Airflow

Formal Definitions

Definition: Variable

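A Variable in Airflow is a key-value pair stored in the metadata database (or supplied by a secrets backend or an environment variable), accessible via Variable.get() in Python or {{ var.value.key }} in templates. Values are stored as strings; structured data is stored as JSON text with serialize_json=True and read back with deserialize_json=True or {{ var.json.key }}. Formally, $V = (k, v, t)$, where $k$ is the key, $v$ is the stored value, and $t$ is the type the value is interpreted as (plain string or JSON).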
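As a minimal sketch of the formal definition, the tuple $V = (k, v, t)$ can be modeled in plain Python, assuming (as in Airflow) that the stored value $v$ is always text and that $t$ only decides how it is interpreted on read. StoredVariable and its read() method are hypothetical names for illustration, not part of Airflow.

```python
import json
from typing import Any, Literal, NamedTuple


class StoredVariable(NamedTuple):
    """Hypothetical model of V = (k, v, t); not an Airflow class."""

    key: str                         # k: the Variable's key
    value: str                       # v: always stored as text
    kind: Literal["string", "json"]  # t: how v is interpreted when read

    def read(self) -> Any:
        """Return v interpreted according to t."""
        return json.loads(self.value) if self.kind == "json" else self.value


flags = StoredVariable("feature_flags", json.dumps({"enable_new_feature": True}), "json")
bucket = StoredVariable("data_bucket", "my-bucket", "string")
print(flags.read())   # {'enable_new_feature': True}
print(bucket.read())  # my-bucket
```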

Definition: Jinja Template

A Jinja template is a string containing template expressions that Airflow renders just before a task runs. Only an operator's templated fields are rendered (for TaskFlow tasks, this includes the arguments passed when the task is called). Airflow uses Jinja2 with additional context variables (the run's logical date, the task instance, params, etc.). Formally, a template is $T = \{e_1, e_2, \ldots, e_n\}$, where each $e_i$ is a template expression.
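To make $T = \{e_1, e_2, \ldots, e_n\}$ concrete, the sketch below pulls the {{ ... }} expressions out of a template string with a regular expression and substitutes dotted names from a context dict. It is a deliberately tiny stand-in, not Jinja2: it ignores filters, control blocks ({% ... %}) and method calls, and the function names are hypothetical.

```python
import re
from typing import Any, Dict, List

# Matches "{{ expression }}" and captures the expression without the braces
EXPR = re.compile(r"\{\{\s*(.*?)\s*\}\}")


def expressions(template: str) -> List[str]:
    """Return the expressions e_1..e_n in order of appearance."""
    return EXPR.findall(template)


def naive_render(template: str, context: Dict[str, Any]) -> str:
    """Hypothetical toy renderer: {{ a.b }} -> context["a"]["b"] (not Jinja2)."""

    def resolve(match) -> str:
        value: Any = context
        for part in match.group(1).split("."):
            value = value[part]  # dotted names walk nested dicts
        return str(value)

    return EXPR.sub(resolve, template)


sql = "SELECT * FROM events WHERE event_date = '{{ ds }}' AND hour = {{ params.hour }}"
print(expressions(sql))  # ['ds', 'params.hour']
print(naive_render(sql, {"ds": "2024-01-15", "params": {"hour": 12}}))
```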

Definition: Variable Cache

The Variable Cache stores recently accessed Variables in memory to reduce database queries. Entries expire after a configurable time-to-live (TTL). In Airflow 2.7 and later a built-in cache of this kind exists but is disabled by default ([secrets] use_cache). The cache hit ratio is $R_{\text{cache}} = \frac{N_{\text{hits}}}{N_{\text{total}}}$; as a rule of thumb, a ratio above 90% means most lookups avoid the database.
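Applying $R_{\text{cache}} = N_{\text{hits}} / N_{\text{total}}$ to a recorded sequence of lookups shows that the ratio depends on how often the same keys repeat. This sketch assumes an unbounded cache with no expiry, so only the first lookup of each key misses; the hit_ratio helper and the access log are hypothetical.

```python
from typing import Iterable


def hit_ratio(accesses: Iterable[str]) -> float:
    """Hypothetical helper: R_cache = N_hits / N_total for a cache that never evicts."""
    seen = set()
    hits = total = 0
    for key in accesses:
        total += 1
        if key in seen:
            hits += 1      # already cached
        else:
            seen.add(key)  # first lookup misses and fills the cache
    return hits / total if total else 0.0


log = ["pipeline_config", "api_key", "pipeline_config", "pipeline_config", "api_key"]
print(hit_ratio(log))              # 0.6
print(hit_ratio(["api_key"] * 10))  # 0.9
```

Ten lookups of a single key already reach the 90% mark, because only the first one misses.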

Detailed Explanation

Variable Basics

Variables provide a simple way to store configuration values that can be accessed across DAGs and tasks.

from airflow.models import Variable

# Get variable with default value
api_key = Variable.get("api_key", default_var="default_key")

# Get JSON variable
config = Variable.get("processing_config", deserialize_json=True)
# config is now a Python dict

# Set variable programmatically
Variable.set("last_run_date", "2024-01-15")
Variable.set("feature_flags", {"enable_new_feature": True}, serialize_json=True)

# Using CLI
# airflow variables set my_variable "my_value"
# airflow variables get my_variable
# airflow variables export /path/to/variables.json
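To show how default_var, serialize_json and deserialize_json behave without a running Airflow instance, here is an in-memory stand-in. It mirrors the behavior of the calls above for these options (a missing key raises KeyError unless a default is given, the default is returned as-is, and every value is stored as text), but InMemoryVariables is a hypothetical class that skips everything else Airflow does, such as secrets backends and encryption.

```python
import json
from typing import Any, Dict

_NO_DEFAULT = object()  # sentinel so that None can still be used as a default


class InMemoryVariables:
    """Hypothetical stand-in for airflow.models.Variable, for illustration only."""

    def __init__(self) -> None:
        self._store: Dict[str, str] = {}  # values are kept as strings

    def set(self, key: str, value: Any, serialize_json: bool = False) -> None:
        self._store[key] = json.dumps(value) if serialize_json else str(value)

    def get(self, key: str, default_var: Any = _NO_DEFAULT,
            deserialize_json: bool = False) -> Any:
        if key not in self._store:
            if default_var is _NO_DEFAULT:
                raise KeyError(f"Variable {key} does not exist")
            return default_var  # returned as-is, never deserialized
        raw = self._store[key]
        return json.loads(raw) if deserialize_json else raw


variables = InMemoryVariables()
variables.set("feature_flags", {"enable_new_feature": True}, serialize_json=True)
variables.set("max_retries", 3)
print(variables.get("feature_flags", deserialize_json=True))  # {'enable_new_feature': True}
print(repr(variables.get("max_retries")))                    # '3'
print(variables.get("api_key", default_var="default_key"))   # default_key
```

The string round trip is why later examples wrap values such as max_retries in int(...).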

Jinja Template Variables

Airflow provides built-in template variables for use in task parameters.

from airflow.decorators import task, dag
from datetime import datetime

@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['templating'],
    params={"hour": 12},  # DAG-level params, available as {{ params.hour }}
)
def templating_dag():
    
    @task
    def process_data(
        execution_date=None,       # Logical date of the run (deprecated alias of logical_date)
        ds=None,                   # Logical date as YYYY-MM-DD
        ds_nodash=None,            # Logical date as YYYYMMDD
        ts=None,                   # Logical timestamp in ISO 8601 format
        prev_execution_date=None,  # Logical date of the previous run (deprecated)
        params=None,               # DAG run parameters
    ):
        """Context values are injected for parameters named after context keys."""
        print(f"Processing data for: {ds}")
        print(f"Timestamp: {ts}")
        print(f"Previous run: {prev_execution_date}")
        print(f"Parameters: {params}")
    
    # Templates are rendered only in templated fields; for a TaskFlow task
    # those include the arguments passed when calling it (op_args/op_kwargs).
    @task
    def query_database(query: str):
        """Run a query whose template has already been rendered."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        
        hook = PostgresHook(postgres_conn_id='postgres_default')
        df = hook.get_pandas_df(query)
        return len(df)
    
    process_data()
    # Rendered at runtime because it is passed as an argument; the same
    # string written inside the function body would reach Postgres unrendered.
    query_database(
        query="""
            SELECT * FROM events
            WHERE event_date = '{{ ds }}'
            AND hour = {{ params.hour }}
        """
    )

templating_dag()
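The date-based template variables are formatted views of the run's logical date. The sketch below reproduces the documented formats of ds, ds_nodash, ts and ts_nodash from a timezone-aware datetime using only the standard library; date_template_values is a hypothetical helper, not an Airflow function.

```python
from datetime import datetime, timezone
from typing import Dict


def date_template_values(logical_date: datetime) -> Dict[str, str]:
    """Hypothetical helper mirroring how Airflow formats its date template variables."""
    return {
        "ds": logical_date.strftime("%Y-%m-%d"),
        "ds_nodash": logical_date.strftime("%Y%m%d"),
        "ts": logical_date.isoformat(),
        "ts_nodash": logical_date.strftime("%Y%m%dT%H%M%S"),
    }


values = date_template_values(datetime(2024, 1, 15, tzinfo=timezone.utc))
for name, value in values.items():
    print(f"{name:10} {value}")
```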

Variable Templates

from airflow.decorators import task, dag
from datetime import datetime

@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['variable-templates'],
)
def variable_template_dag():
    
    @task
    def use_variable_template(bucket: str, prefix: str):
        """Receive Variable values rendered from Jinja templates."""
        # bucket and prefix are filled in from {{ var.value.* }} templates
        # (see the call below), rendered just before the task runs
        return f"s3://{bucket}/{prefix}"
    
    @task
    def configure_from_variables():
        """Load configuration from multiple variables."""
        from airflow.models import Variable
        
        # Load structured config (stored as JSON text)
        config = Variable.get("pipeline_config", deserialize_json=True)
        
        # Access individual settings; Variable values come back as strings
        max_retries = Variable.get("max_retries", default_var="3")
        timeout = Variable.get("timeout_seconds", default_var="300")
        
        return {
            "bucket": config.get("bucket"),
            "region": config.get("region"),
            "max_retries": int(max_retries),
            "timeout": int(timeout),
        }
    
    config = configure_from_variables()
    # Template arguments are rendered at runtime, not when the DAG file is parsed
    use_variable_template(
        bucket="{{ var.value.data_bucket }}",
        prefix="{{ var.value.get('data_prefix', 'raw/') }}",
    )

variable_template_dag()
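In templates, var.value returns a Variable's raw string, while var.json parses it as JSON, so {{ var.json.pipeline_config.bucket }} reaches into a nested field. The toy accessor below imitates that attribute-style lookup over a plain dict so the difference can be run locally; VarAccessor and the sample values are hypothetical, and the real objects also consult secrets backends. In Jinja, var.json.pipeline_config.region works because Jinja falls back from attribute to item lookup; plain Python needs ["region"].

```python
import json
from types import SimpleNamespace
from typing import Any, Dict


class VarAccessor:
    """Hypothetical toy imitation of the var.value / var.json template objects."""

    def __init__(self, store: Dict[str, str], parse_json: bool) -> None:
        self._store = store
        self._parse_json = parse_json

    def _read(self, key: str) -> Any:
        raw = self._store[key]  # KeyError if the Variable does not exist
        return json.loads(raw) if self._parse_json else raw

    def get(self, key: str, default: Any = None) -> Any:
        """Like {{ var.value.get('key', 'fallback') }}."""
        return self._read(key) if key in self._store else default

    def __getattr__(self, key: str) -> Any:
        """Like {{ var.value.key }}; only called for names not found normally."""
        return self._read(key)


store = {
    "data_bucket": "my-bucket",
    "pipeline_config": json.dumps({"bucket": "my-bucket", "region": "us-east-1"}),
}
var = SimpleNamespace(value=VarAccessor(store, False), json=VarAccessor(store, True))

print(var.value.data_bucket)                 # my-bucket
print(var.json.pipeline_config["region"])    # us-east-1
print(var.value.get("data_prefix", "raw/"))  # raw/
```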
Variable Cache TTL
$$T_{\text{cache}} = T_{\text{access}} + \Delta T_{\text{TTL}}$$

Here,

  • $T_{\text{cache}}$ = time when the cached value expires
  • $T_{\text{access}}$ = time the value was last loaded into the cache
  • $\Delta T_{\text{TTL}}$ = configured TTL duration
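As a worked instance of $T_{\text{cache}} = T_{\text{access}} + \Delta T_{\text{TTL}}$: with a 300-second TTL, a value loaded at 12:00:00 expires at 12:05:00, so a lookup at 12:04:59 is served from the cache and one at 12:05:00 triggers a fresh fetch. The timestamps and the is_fresh helper are hypothetical.

```python
from datetime import datetime, timedelta

ttl = timedelta(seconds=300)                # Delta T_TTL
t_access = datetime(2024, 1, 15, 12, 0, 0)  # T_access: value loaded into the cache
t_cache = t_access + ttl                    # T_cache: expiry time


def is_fresh(now: datetime) -> bool:
    """Hypothetical helper: cached value is usable while now < T_cache."""
    return now < t_cache


print(t_cache.time())                              # 12:05:00
print(is_fresh(datetime(2024, 1, 15, 12, 4, 59)))  # True
print(is_fresh(datetime(2024, 1, 15, 12, 5, 0)))   # False
```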

Variable Lookup Performance

$$L_{\text{var}} = \begin{cases} L_{\text{cache}} & \text{if cache hit} \\ L_{\text{db}} + L_{\text{cache\_update}} & \text{if cache miss} \end{cases}$$

Here,

  • $L_{\text{var}}$ = total variable lookup latency
  • $L_{\text{cache}}$ = cache lookup latency (~1 μs)
  • $L_{\text{db}}$ = database lookup latency (~5 ms)
  • $L_{\text{cache\_update}}$ = cache update overhead (~1 ms)
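Weighting the two cases by the hit ratio $R_{\text{cache}}$ gives the expected cost per lookup:

$$\mathbb{E}[L_{\text{var}}] = R_{\text{cache}} \, L_{\text{cache}} + (1 - R_{\text{cache}})(L_{\text{db}} + L_{\text{cache\_update}})$$

The hypothetical helper below evaluates it with the approximate latencies listed above, in milliseconds. At a 90% hit ratio the expected cost is about 0.6 ms, roughly an eighth of an uncached 5 ms lookup, and almost all of it comes from the 10% of misses.

```python
def expected_lookup_ms(hit_ratio: float, l_cache_ms: float = 0.001,
                       l_db_ms: float = 5.0, l_update_ms: float = 1.0) -> float:
    """Hypothetical helper: E[L_var] = R*L_cache + (1 - R)*(L_db + L_cache_update)."""
    return hit_ratio * l_cache_ms + (1 - hit_ratio) * (l_db_ms + l_update_ms)


for r in (0.5, 0.9, 0.99):
    print(f"R_cache={r:.2f}: {expected_lookup_ms(r):.4f} ms")
```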

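Airflow does not cache Variables by default: each Variable.get() queries the configured secrets backends and then the metadata database. Airflow 2.7 added an optional, experimental cache, enabled with use_cache = True in the [secrets] configuration section; entries expire after cache_ttl_seconds (900 seconds by default), and the cache applies to DAG file parsing rather than to task execution. Either way, call Variable.get() sparingly in loops: fetch the value once and reuse it. Avoid calling it in the top-level code of a DAG file, because that code runs every time the file is re-parsed.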
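The "fetch once and reuse" advice is easy to check by counting lookups. In this sketch fetch_variable is a hypothetical stand-in for Variable.get() that only counts how often it is called, and the file names are placeholders.

```python
from typing import Dict, List

calls: Dict[str, int] = {"count": 0}


def fetch_variable(key: str) -> str:
    """Hypothetical stand-in for Variable.get(); counts every lookup."""
    calls["count"] += 1
    return {"batch_prefix": "raw/"}[key]


def paths_slow(files: List[str]) -> List[str]:
    # Anti-pattern: one lookup per iteration
    return [fetch_variable("batch_prefix") + f for f in files]


def paths_fast(files: List[str]) -> List[str]:
    prefix = fetch_variable("batch_prefix")  # one lookup, reused below
    return [prefix + f for f in files]


files = [f"part-{i}.csv" for i in range(100)]
paths_slow(files)
print("per-iteration lookups:", calls["count"])  # 100
calls["count"] = 0
paths_fast(files)
print("hoisted lookups:", calls["count"])        # 1
```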

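For sensitive data such as API keys, use a secrets backend (for example HashiCorp Vault or AWS Secrets Manager) rather than Variables stored in the metadata database. Variable.get() reads from a configured backend transparently, so DAG code does not need to change. Values kept in the metadata database are encrypted only if a Fernet key is configured; otherwise they are stored in plaintext.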
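Variable.get() searches a configured secrets backend first, then environment variables named AIRFLOW_VAR_<KEY> (the key upper-cased), and only then the metadata database. The sketch models that search order with plain dicts standing in for each source; lookup_variable and the sample values are hypothetical. In a real deployment os.environ plays the role of environ.

```python
from typing import Mapping, Optional


def lookup_variable(key: str, backend: Mapping[str, str],
                    environ: Mapping[str, str],
                    metastore: Mapping[str, str]) -> Optional[str]:
    """Hypothetical model: secrets backend, then AIRFLOW_VAR_<KEY>, then metastore."""
    if key in backend:
        return backend[key]
    env_value = environ.get(f"AIRFLOW_VAR_{key.upper()}")
    if env_value is not None:
        return env_value
    return metastore.get(key)


backend = {"api_key": "from-vault"}
environ = {"AIRFLOW_VAR_DATA_BUCKET": "from-env"}
metastore = {"api_key": "stale-db-copy", "data_bucket": "db-bucket", "timeout_seconds": "300"}

print(lookup_variable("api_key", backend, environ, metastore))          # from-vault
print(lookup_variable("data_bucket", backend, environ, metastore))      # from-env
print(lookup_variable("timeout_seconds", backend, environ, metastore))  # 300
```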

Key Concepts Table

| Variable Type | Storage | Access Method | Use Case |
|---|---|---|---|
| String | Metadata DB | Variable.get("key") | Simple config |
| JSON | Metadata DB | Variable.get("key", deserialize_json=True) | Complex config |
| Environment | OS environment (AIRFLOW_VAR_<KEY>) | Variable.get("key") | Secrets, Docker |
| Secrets Backend | External vault | Variable.get("key") with a configured backend | Production secrets |
| DAG Parameters | DAG run | {{ params.key }} | Runtime config |

Code Examples

Advanced Templating Patterns

from airflow.decorators import task, dag
from datetime import datetime, timedelta
from typing import Dict, Any

@dag(
    schedule_interval="0 6 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['templating', 'advanced'],
    params={
        "environment": "production",
        "dry_run": False,
        "batch_size": 1000,
    },
)
def advanced_templating_dag():
    
    @task
    def process_with_templates(
        ds: str,
        ds_nodash: str,
        execution_date: datetime,
        prev_execution_date: datetime,
        params: Dict[str, Any],
        **context,
    ):
        """Demonstrate advanced template usage."""
        from airflow.models import Variable
        
        # Access DAG run parameters
        environment = params.get("environment", "development")
        dry_run = params.get("dry_run", False)
        batch_size = params.get("batch_size", 1000)
        
        # Access variables
        config = Variable.get(f"{environment}_config", deserialize_json=True)
        
        # Build dynamic query
        query = f"""
            SELECT * FROM events 
            WHERE event_date >= '{prev_execution_date.strftime("%Y-%m-%d")}'
            AND event_date < '{ds}'
            AND environment = '{environment}'
            LIMIT {batch_size}
        """
        
        print(f"Environment: {environment}")
        print(f"Dry run: {dry_run}")
        print(f"Query:\n{query}")
        
        return {
            "environment": environment,
            "dry_run": dry_run,
            "batch_size": batch_size,
            "execution_date": ds,  # read by generate_report
            "query": query,
        }
    
    @task
    def generate_report(config: dict):
        """Generate report with templated content."""
        from airflow.models import Variable
        
        report_template = Variable.get("report_template")
        
        # Use Jinja2 directly for complex templating
        from jinja2 import Template
        
        template = Template(report_template)
        report = template.render(
            environment=config["environment"],
            date=config.get("execution_date"),
            batch_size=config["batch_size"],
        )
        
        return report
    
    config = process_with_templates()
    generate_report(config)

advanced_templating_dag()
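The params dict declared on the DAG supplies defaults. When a run is triggered with a configuration (for example from the UI or with airflow dags trigger --conf), those values override the defaults as long as [core] dag_run_conf_overrides_params is enabled, which is the Airflow 2 default. Leaving aside Param validation, the result behaves like a shallow dict overlay; effective_params and the trigger payload below are hypothetical.

```python
from typing import Any, Dict

DAG_PARAMS: Dict[str, Any] = {"environment": "production", "dry_run": False, "batch_size": 1000}


def effective_params(dag_params: Dict[str, Any], run_conf: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical, simplified view: run conf values win over DAG-level defaults."""
    return {**dag_params, **run_conf}


# e.g. a run triggered with --conf '{"dry_run": true, "batch_size": 50}'
print(effective_params(DAG_PARAMS, {"dry_run": True, "batch_size": 50}))
```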

Variable Caching Strategy

# variable_caching.py
from airflow.decorators import task, dag
from airflow.models import Variable
from datetime import datetime, timedelta
import json

class VariableCache:
    """Custom variable caching with TTL."""
    
    def __init__(self, ttl_seconds=300):
        self.ttl = timedelta(seconds=ttl_seconds)
        self.cache = {}
        self.last_access = {}
    
    def get(self, key, default=None, deserialize_json=False):
        """Get variable with caching."""
        now = datetime.now()
        
        # Check cache
        if key in self.cache:
            if now - self.last_access[key] < self.ttl:
                value = self.cache[key]
                if deserialize_json and isinstance(value, str):
                    return json.loads(value)
                return value
        
        # Cache miss - fetch from database
        try:
            value = Variable.get(key, default_var=default)
            self.cache[key] = value
            self.last_access[key] = now
            
            if deserialize_json and isinstance(value, str):
                return json.loads(value)
            return value
        except Exception as e:
            print(f"Error fetching variable {key}: {e}")
            return default
    
    def set(self, key, value, serialize_json=False):
        """Set variable and update cache."""
        if serialize_json:
            value = json.dumps(value)
        
        Variable.set(key, value)
        self.cache[key] = value
        self.last_access[key] = datetime.now()
    
    def invalidate(self, key):
        """Invalidate cached variable."""
        self.cache.pop(key, None)
        self.last_access.pop(key, None)
    
    def clear(self):
        """Clear entire cache."""
        self.cache.clear()
        self.last_access.clear()

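# Usage in DAG
# Note: each task instance usually runs in its own process, so this
# module-level cache is rebuilt per process; it saves lookups within one
# process (e.g. repeated gets inside a task), not across tasks.
variable_cache = VariableCache(ttl_seconds=600)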

@dag(schedule_interval="@daily", start_date=datetime(2024, 1, 1))
def cached_variable_dag():
    
    @task
    def process_data():
        """Use cached variable access."""
        config = variable_cache.get("pipeline_config", deserialize_json=True)
        api_key = variable_cache.get("api_key")
        
        print(f"Config: {config}")
        print(f"Using API key: {api_key[:4]}...")
        
        return {"status": "success"}
    
    process_data()

cached_variable_dag()
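Because VariableCache calls Variable.get() and datetime.now() directly, its TTL logic is hard to exercise without Airflow. A variant that takes the fetch function and the clock as parameters can be tested with plain Python. TTLCache, fetch and clock are hypothetical names, and this is a sketch of the same time-based expiry rather than a drop-in replacement (it omits set(), JSON handling and error handling).

```python
import time
from typing import Any, Callable, Dict, Tuple


class TTLCache:
    """Hypothetical TTL cache with an injectable fetch function and clock."""

    def __init__(self, fetch: Callable[[str], Any], ttl_seconds: float = 300,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}  # key -> (loaded_at, value)
        self.hits = 0
        self.misses = 0

    def get(self, key: str) -> Any:
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._ttl:
            self.hits += 1
            return entry[1]
        self.misses += 1
        value = self._fetch(key)  # e.g. Variable.get in a real DAG
        self._entries[key] = (now, value)
        return value

    def invalidate(self, key: str) -> None:
        self._entries.pop(key, None)


# Fake clock and store so expiry can be observed deterministically
fake_now = [0.0]
store = {"pipeline_config": '{"bucket": "my-bucket"}'}
cache = TTLCache(store.__getitem__, ttl_seconds=600, clock=lambda: fake_now[0])

cache.get("pipeline_config")  # miss: fetched
fake_now[0] = 599
cache.get("pipeline_config")  # hit: still within the TTL
fake_now[0] = 600
cache.get("pipeline_config")  # miss: expired, fetched again
print(cache.hits, cache.misses)  # 1 2
```

Using time.monotonic as the default clock also avoids the wall-clock jumps that datetime.now() is subject to.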

Variable-Based DAG Configuration

from airflow.decorators import task, dag
from datetime import datetime
from airflow.models import Variable
import json

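# Load environment-specific configuration.
# These lookups are top-level code: they run every time the DAG file is parsed,
# not only when tasks run. Keep them few (or, on Airflow 2.7+, enable [secrets] use_cache).
ENVIRONMENT = Variable.get("environment", default_var="development")
CONFIG = Variable.get(f"{ENVIRONMENT}_config", default_var={}, deserialize_json=True)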

@dag(
    schedule_interval=CONFIG.get("schedule", "@daily"),
    start_date=datetime(2024, 1, 1),
    catchup=CONFIG.get("catchup", False),
    tags=[ENVIRONMENT, 'config-driven'],
    max_active_runs=CONFIG.get("max_active_runs", 1),
)
def config_driven_dag():
    
    @task
    def extract():
        """Extract using environment config."""
        source = CONFIG.get("source")
        connection_id = CONFIG.get(f"{source}_conn_id")
        
        print(f"Extracting from {source} using {connection_id}")
        return {"source": source, "connection_id": connection_id}
    
    @task
    def transform(extract_result: dict):
        """Transform using environment config."""
        transformations = CONFIG.get("transformations", [])
        
        print(f"Applying {len(transformations)} transformations")
        for t in transformations:
            print(f"  - {t}")
        
        return extract_result
    
    @task
    def load(transform_result: dict):
        """Load using environment config."""
        destination = CONFIG.get("destination")
        table = CONFIG.get("load_table")
        
        print(f"Loading to {destination}.{table}")
        return {"status": "success", "records": 0}
    
    # Passing each result as an argument supplies the XCom value and sets the dependency
    load(transform(extract()))

config_driven_dag()
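The DAG above reads source, <source>_conn_id, transformations, destination and load_table from the <environment>_config Variable, plus schedule, catchup and max_active_runs, which have defaults. A hypothetical production_config value that satisfies it, together with a quick check that no required key is missing, could look like this; every value is made up.

```python
import json

# Hypothetical contents of the production_config Variable
production_config = {
    "schedule": "@daily",
    "catchup": False,
    "max_active_runs": 1,
    "source": "postgres",
    "postgres_conn_id": "postgres_default",
    "transformations": ["deduplicate", "normalize_timestamps"],
    "destination": "warehouse",
    "load_table": "events_daily",
}

# Keys the DAG reads without a default; "<source>_conn_id" depends on the source
required = {"source", "transformations", "destination", "load_table",
            f"{production_config['source']}_conn_id"}
missing = required - set(production_config)
print("missing keys:", sorted(missing))  # []

# Text to store, e.g. with: airflow variables set production_config '<this JSON>'
print(json.dumps(production_config))
```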

Performance Metrics

Variable Access Patterns

| Access Pattern | Latency | Use Case |
|---|---|---|
| Direct Variable.get() | ~5ms | One-time access |
| Cached access | ~1μs | Repeated access |
| Templated DAG parameter ({{ params.key }}) | ~0ms (no database lookup) | DAG parameter |
| Secrets Backend | ~50ms | Secure values |

Variable vs Parameter Comparison

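| Feature | Variable | DAG Parameter | Template Variable |
|---|---|---|---|
| Storage | Metadata DB | DAG run | Metadata DB, read when the template is rendered |
| Scope | Global | Per-run | Per-task |
| Mutability | Mutable | Immutable | Immutable once rendered |
| Access Pattern | Variable.get() | params.key | {{ var.value.key }} |
| Performance | ~5ms | ~0ms | ~5ms, once at render time |
| Use Case | Shared config | Runtime config | Task config |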

Key Takeaways:

  • Variables are global key-value pairs stored in the metadata database
  • Use Variable.get(key, deserialize_json=True) for JSON configuration
  • Variable caching reduces database load, but it is off by default; Airflow 2.7+ offers an optional cache ([secrets] use_cache)
  • Template variables ({{ var.value.key }}) provide concise access in operator arguments
  • For sensitive data, use Secrets Backends instead of plaintext Variables
  • DAG Parameters enable runtime configuration without modifying DAG code
