Security Best Practices in Apache Airflow

Formal Definitions

Definition: Role-Based Access Control (RBAC)

RBAC is a security paradigm where access to resources is determined by the role assigned to a user. In Airflow, RBAC defines permissions $P = \{(r, o, a) : r \in \text{Roles},\ o \in \text{Objects},\ a \in \text{Actions}\}$, where a role $r$ grants action $a$ on object $o$.

Definition: Secrets Backend

A Secrets Backend is an external service (HashiCorp Vault, AWS Secrets Manager, GCP Secret Manager) that stores and retrieves sensitive credentials. It replaces storing secrets in the metadata database with secure, auditable, and rotatable storage.
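
To make the lookup behaviour concrete, the sketch below models how a secret is resolved once a backend is configured: Airflow consults the secrets backend first, then environment variables, then the metadata database. The `resolve_secret` function and the dictionary-based stores are hypothetical stand-ins for illustration, not Airflow APIs.

```python
from typing import Callable, Optional, Sequence

# A lookup takes a secret name and returns its value, or None if it is absent.
Lookup = Callable[[str], Optional[str]]


def resolve_secret(name: str, lookups: Sequence[Lookup]) -> Optional[str]:
    """Return the first value found, trying each store in priority order."""
    for lookup in lookups:
        value = lookup(name)
        if value is not None:
            return value
    return None


# Hypothetical stores standing in for Vault, os.environ, and the metadata DB.
secrets_backend = {"production_db": "value-from-vault"}
environment = {"production_db": "value-from-env", "staging_db": "value-from-env"}
metadata_db = {"legacy_db": "value-from-db"}

# Priority order: secrets backend, then environment, then metadata database.
SEARCH_ORDER = [secrets_backend.get, environment.get, metadata_db.get]

if __name__ == "__main__":
    for name in ("production_db", "staging_db", "legacy_db", "missing"):
        print(name, "->", resolve_secret(name, SEARCH_ORDER))
```

Because the backend is consulted first, a secret defined there shadows any stale copy left in an environment variable or in the database.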

Definition: Encryption at Rest

Encryption at Rest ensures data is encrypted when stored. In Airflow, the Fernet key encrypts connection passwords, connection extras, and variable values; the rest of the metadata database, including XCom data, is encrypted only if the database or storage layer provides it. The encryption follows $E(k, m) = c$, where $k$ is the encryption key, $m$ is the plaintext, and $c$ is the ciphertext.
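
As a small illustration of $E(k, m) = c$, the sketch below uses Fernet from the `cryptography` package, the same scheme Airflow uses for connection passwords and variables. The helper names `encrypt` and `decrypt` are chosen here for illustration; the example assumes `cryptography` is installed.

```python
from cryptography.fernet import Fernet, InvalidToken


def encrypt(key: bytes, plaintext: bytes) -> bytes:
    """E(k, m) = c: encrypt plaintext m under key k."""
    return Fernet(key).encrypt(plaintext)


def decrypt(key: bytes, ciphertext: bytes) -> bytes:
    """Recover m from c; raises InvalidToken if the key does not match."""
    return Fernet(key).decrypt(ciphertext)


if __name__ == "__main__":
    key = Fernet.generate_key()             # k
    ciphertext = encrypt(key, b"s3cret")    # c
    assert decrypt(key, ciphertext) == b"s3cret"
    try:
        decrypt(Fernet.generate_key(), ciphertext)
    except InvalidToken:
        print("a different key cannot decrypt the ciphertext")
```

Losing the key makes the stored ciphertext unrecoverable, which is why the Fernet key itself needs to be backed up and protected like any other secret.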

Detailed Explanation

RBAC Configuration

# webserver_config.py
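from airflow.security import permissions
from flask_appbuilder.security.manager import AUTH_OAUTH

# Define custom roles. This dict is illustrative: Airflow does not read a
# `ROLES` setting from this file, so create the roles via the UI or the
# `airflow roles` CLI.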
ROLES = {
    'DataEngineer': [
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_ADMIN_MENU),
    ],
    'DataAnalyst': [
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
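        # RESOURCE_DATAMODEL is not defined in airflow.security.permissions;
        # read access to DAG runs is used here as a stand-in
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),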
    ],
    'PipelineViewer': [
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
    ],
}

# OAuth configuration
AUTH_TYPE = AUTH_OAUTH
OAUTH_PROVIDERS = [
    {
        'name': 'Google',
        'icon': 'fa-google',
        'token_key': 'access_token',
        'remote_app': {
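            # Placeholders: inject real values from a secret store at deploy time
            'client_id': 'YOUR_CLIENT_ID',
            'client_secret': 'YOUR_CLIENT_SECRET',
            'api_base_url': 'https://www.googleapis.com/oauth2/v3/',
            # Authlib-based clients take the scope via client_kwargs
            'client_kwargs': {'scope': 'email profile'},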
            'access_token_url': 'https://accounts.google.com/o/oauth2/token',
            'authorize_url': 'https://accounts.google.com/o/oauth2/auth',
        },
    }
]

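# Role mapping: Flask-AppBuilder reads AUTH_ROLES_MAPPING, which maps
# provider-side role or group keys to lists of Airflow roles. The keys below
# are placeholders; not every provider returns role keys, so a custom
# security manager may be needed to supply them.
AUTH_ROLES_MAPPING = {
    'admin@company.com': ['Admin'],
    'engineers@company.com': ['DataEngineer'],
    'analysts@company.com': ['DataAnalyst'],
}
# Re-apply the mapping on every login so role changes take effect
AUTH_ROLES_SYNC_AT_LOGIN = True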

Secrets Backend Setup

# airflow.cfg
[secrets]
# Backend configuration
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
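# backend_kwargs must be valid JSON (no trailing comma), and continuation
# lines must be indented. "url" is an assumed Vault address; adjust it and
# add the authentication settings your deployment needs.
backend_kwargs = {
    "connections_path": "airflow/connections",
    "variables_path": "airflow/variables",
    "config_path": "airflow/config",
    "mount_point": "secret",
    "url": "http://vault:8200"
    }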

# Alternative: AWS Secrets Manager
# backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
# backend_kwargs = {"connections_prefix": "airflow/connections/"}

# Alternative: GCP Secret Manager
# backend = airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend
# backend_kwargs = {"project_id": "my-project", "sep": "-"}

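# secure_dag.py (Python, kept in a separate file from airflow.cfg above)
# Usage in DAGs - hooks automatically resolve from secrets backend
from airflow.decorators import task, dag
from datetime import datetime

# `schedule` replaces the deprecated `schedule_interval` (Airflow 2.4+)
@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)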
def secure_dag():
    
    @task
    def use_secret_connection():
        """Connection is retrieved from secrets backend."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        
        # This looks up 'production_db' from Vault/AWS SM
        hook = PostgresHook(postgres_conn_id='production_db')
        return hook.get_first("SELECT CURRENT_USER")[0]
    
    @task
    def use_secret_variable():
        """Variable is retrieved from secrets backend."""
        from airflow.models import Variable
        
        api_key = Variable.get("api_key")
        # Return values are stored in XCom, so never return any part of a secret
        return "API key loaded" if api_key else "API key is empty"
    
    use_secret_connection() >> use_secret_variable()

secure_dag()

Connection Encryption

# encryption_config.py
from airflow.providers.postgres.hooks.postgres import PostgresHook

def create_encrypted_connection():
    """Create connection with SSL/TLS encryption."""
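    # Resolve the password through Airflow (and any configured secrets
    # backend) instead of hardcoding it in the source file
    stored = PostgresHook.get_connection('encrypted_db')
    conn_params = {
        'host': 'db.example.com',
        'port': 5432,
        'dbname': 'analytics',
        'user': 'airflow_user',
        'password': stored.password,
        # 'require' encrypts but does not verify the server; 'verify-full'
        # also checks the certificate chain and hostname against sslrootcert
        'sslmode': 'verify-full',
        'sslcert': '/path/to/client-cert.pem',
        'sslkey': '/path/to/client-key.pem',
        'sslrootcert': '/path/to/ca-cert.pem',
    }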
    
    return conn_params

def verify_encryption():
    """Verify connection is encrypted."""
    hook = PostgresHook(postgres_conn_id='encrypted_db')
    
    # pg_stat_ssl exposes the columns ssl, version, and cipher per backend pid;
    # pg_backend_pid() restricts the result to this session's own connection
    result = hook.get_first("""
        SELECT ssl, version, cipher
        FROM pg_stat_ssl
        WHERE pid = pg_backend_pid()
    """)
    
    return {
        'ssl_enabled': result[0],
        'ssl_version': result[1],
        'cipher': result[2],
    }

RBAC Permission Check

$$\text{Access}(r, o, a) = \begin{cases} \text{granted} & \text{if } (r, o, a) \in P_{\text{role}} \\ \text{denied} & \text{otherwise} \end{cases}$$

Here,

  • $r$ = User role
  • $o$ = Resource object
  • $a$ = Action (read, write, delete)
  • $P_{\text{role}}$ = Set of permissions for role
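
The permission check can be sketched in a few lines of Python. The role names echo the configuration shown earlier, but the `PERMISSIONS_BY_ROLE` table and the `access` function are hypothetical and do not reflect how Airflow stores permissions internally.

```python
from typing import Dict, FrozenSet, Tuple

# P_role for each role: the set of (object, action) pairs the role grants.
PERMISSIONS_BY_ROLE: Dict[str, FrozenSet[Tuple[str, str]]] = {
    "DataEngineer": frozenset({("DAG", "read"), ("DAG", "edit")}),
    "DataAnalyst": frozenset({("DAG", "read")}),
    "PipelineViewer": frozenset({("DAG", "read")}),
}


def access(role: str, obj: str, action: str) -> str:
    """Return 'granted' if (role, obj, action) is in the permission set, else 'denied'."""
    granted = PERMISSIONS_BY_ROLE.get(role, frozenset())
    return "granted" if (obj, action) in granted else "denied"


if __name__ == "__main__":
    print(access("DataEngineer", "DAG", "edit"))
    print(access("DataAnalyst", "DAG", "edit"))
    print(access("UnknownRole", "DAG", "read"))
```

Unknown roles fall through to an empty permission set, so the default outcome is denial, which matches the "otherwise" branch of the formula.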

Secret Rotation Interval

$$T_{\text{rotation}} \leq \min(T_{\text{policy}}, T_{\text{compromise\_risk}})$$

Here,

  • $T_{\text{rotation}}$ = Time between secret rotations
  • $T_{\text{policy}}$ = Compliance policy requirement
  • $T_{\text{compromise\_risk}}$ = Risk-based rotation interval
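
A worked example of the rotation bound, using `timedelta` values. The 90-day policy and 30-day risk interval are made-up numbers for illustration, and the function names are hypothetical.

```python
from datetime import timedelta


def max_rotation_interval(policy: timedelta, compromise_risk: timedelta) -> timedelta:
    """Upper bound on T_rotation: the stricter of the two limits."""
    return min(policy, compromise_risk)


def is_rotation_compliant(
    actual: timedelta, policy: timedelta, compromise_risk: timedelta
) -> bool:
    """True when the actual rotation interval satisfies the inequality."""
    return actual <= max_rotation_interval(policy, compromise_risk)


if __name__ == "__main__":
    policy = timedelta(days=90)   # assumed compliance requirement
    risk = timedelta(days=30)     # assumed risk-based interval
    print(max_rotation_interval(policy, risk))
    print(is_rotation_compliant(timedelta(days=45), policy, risk))
```

With these numbers the risk-based interval is the binding constraint, so a 45-day rotation schedule satisfies the policy but still violates the bound.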

Never store secrets in DAG files or environment variables in plaintext. Use Airflow's Secrets Backend (Vault, AWS Secrets Manager, GCP Secret Manager) for all credentials.

Enable audit logging to track who accessed which secrets and when. Audit trails of this kind are commonly used as evidence for SOC 2 access controls and for GDPR accountability.
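
One way to picture such an audit trail is a structured record per access that captures who, what, and when, but never the secret value itself. The sketch below uses only the standard library; the logger name `secrets.audit` and both helper functions are hypothetical and are not part of Airflow.

```python
import json
import logging
from datetime import datetime, timezone

# Hypothetical logger name; route it to your central log store as needed.
audit_logger = logging.getLogger("secrets.audit")


def build_audit_record(user: str, secret_name: str, action: str = "read") -> dict:
    """Describe one access event. The secret value is deliberately excluded."""
    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "user": user,
        "secret": secret_name,
        "action": action,
    }


def log_secret_access(user: str, secret_name: str, action: str = "read") -> dict:
    """Emit the record as one JSON line and return it for further use."""
    record = build_audit_record(user, secret_name, action)
    audit_logger.info(json.dumps(record, sort_keys=True))
    return record


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    log_secret_access("alice", "production_db")
```

Emitting one JSON object per line keeps the trail easy to ship to a log aggregator and to query later by user or secret name.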

Key Concepts Table

| Security Layer | Component | Implementation | Priority |
| --- | --- | --- | --- |
| Authentication | Webserver | OAuth, LDAP, SAML | P0 |
| Authorization | RBAC | Role-based permissions | P0 |
| Encryption | Database | TLS, AES-256 | P0 |
| Secrets | Backend | Vault, AWS SM | P0 |
| Network | Infrastructure | Firewalls, VPN | P1 |
| Audit | Logging | Activity logs | P1 |
| Compliance | Policies | SOC2, GDPR | P2 |

Code Examples

Security Audit Script

# security_audit.py
import json

from airflow import settings
from airflow.models import Connection, Variable

def audit_security_posture():
    """Comprehensive security audit of Airflow deployment."""
    session = settings.Session()
    findings = []
    
    # Check for connection passwords stored without Fernet encryption
    connections = session.query(Connection).all()
    
    for conn in connections:
        # is_encrypted is False when the password was saved without a Fernet key
        if not conn.is_encrypted and conn.password:
            findings.append({
                'severity': 'HIGH',
                'category': 'Secrets',
                'finding': f'Plaintext password in connection: {conn.conn_id}',
                'recommendation': 'Move to secrets backend',
            })
    
    # Check for variables with sensitive data
    sensitive_patterns = ['password', 'secret', 'key', 'token']
    variables = session.query(Variable).all()
    
    for var in variables:
        if any(pattern in var.key.lower() for pattern in sensitive_patterns):
            findings.append({
                'severity': 'MEDIUM',
                'category': 'Secrets',
                'finding': f'Potentially sensitive variable: {var.key}',
                'recommendation': 'Move to secrets backend',
            })
    
    # Check for DAGs with hardcoded secrets
    import os
    
    # Use the configured DAGs folder rather than a hardcoded path
    dags_folder = settings.DAGS_FOLDER
    for root, dirs, files in os.walk(dags_folder):
        for file in files:
            if file.endswith('.py'):
                filepath = os.path.join(root, file)
                with open(filepath, 'r') as f:
                    content = f.read()
                
                # Simple pattern matching
                for pattern in ['password=', 'secret=', 'api_key=']:
                    if pattern in content:
                        findings.append({
                            'severity': 'HIGH',
                            'category': 'Code',
                            'finding': f'Potential hardcoded secret in {filepath}',
                            'recommendation': 'Use variables or secrets backend',
                        })
    
    # Release the database session before returning
    session.close()
    return findings

def generate_security_report(findings):
    """Generate security audit report."""
    report = {
        'total_findings': len(findings),
        'high_severity': len([f for f in findings if f['severity'] == 'HIGH']),
        'medium_severity': len([f for f in findings if f['severity'] == 'MEDIUM']),
        'low_severity': len([f for f in findings if f['severity'] == 'LOW']),
        'findings': findings,
    }
    
    print(json.dumps(report, indent=2))
    return report

if __name__ == "__main__":
    findings = audit_security_posture()
    generate_security_report(findings)
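
The substring checks in the audit script flag any line containing `password=`, including safe lookups. A slightly stricter heuristic, sketched below, only reports secret-like names that are assigned a string literal. The regular expression and the function name `find_hardcoded_secrets` are illustrative choices, and the check remains a heuristic rather than a full scanner.

```python
import re
from typing import List

# Secret-like name (optionally with a suffix), "=", then a quoted literal of 4+ chars.
HARDCODED_SECRET = re.compile(
    r"""(?:password|secret|api_key|token)\w*\s*=\s*(['"])[^'"]{4,}\1""",
    re.IGNORECASE,
)


def find_hardcoded_secrets(source: str) -> List[int]:
    """Return 1-based line numbers that appear to assign a literal secret."""
    return [
        lineno
        for lineno, line in enumerate(source.splitlines(), start=1)
        if HARDCODED_SECRET.search(line)
    ]


if __name__ == "__main__":
    sample = (
        'password = "hunter2!"\n'
        'password = os.environ["DB_PASSWORD"]\n'
        'api_key = Variable.get("api_key")\n'
    )
    print(find_hardcoded_secrets(sample))
```

Only the first sample line assigns a literal, so the environment and `Variable.get` lookups are not reported.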

Network Security Configuration

# docker-compose-security.yml
version: '3.8'

services:
  airflow-webserver:
    image: apache/airflow:2.8.0
    command: webserver
    networks:
      - airflow-internal
    environment:
      - AIRFLOW__CORE__FERNET_KEY=${FERNET_KEY}
      - AIRFLOW__WEBSERVER__EXPOSE_CONFIG=False
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: 2G
    # Security context
    security_opt:
      - no-new-privileges:true
    read_only: true
    tmpfs:
      - /tmp

  airflow-scheduler:
    image: apache/airflow:2.8.0
    command: scheduler
    networks:
      - airflow-internal
    environment:
      - AIRFLOW__CORE__FERNET_KEY=${FERNET_KEY}
    security_opt:
      - no-new-privileges:true

  postgres:
    image: postgres:15
    networks:
      - airflow-internal
    environment:
      POSTGRES_PASSWORD_FILE: /run/secrets/db_password
    secrets:
      - db_password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    # Disable network access except from Airflow
    # Use internal network only

  vault:
    image: hashicorp/vault:1.15
    networks:
      - airflow-internal
    cap_add:
      - IPC_LOCK
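    environment:
      # Dev mode serves plain HTTP; configure a TLS listener for production
      VAULT_ADDR: 'http://0.0.0.0:8200'
    volumes:
      - vault_data:/vault/file
    # Dev mode is in-memory, auto-unsealed, and uses a root token:
    # suitable for local testing only, never for production
    command: server -dev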

networks:
  airflow-internal:
    driver: bridge
    internal: true  # No external access

secrets:
  db_password:
    file: ./secrets/db_password.txt

volumes:
  postgres_data:
  vault_data:

Performance Metrics

Security Posture Score

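| Metric | Score | Weight | Status |
| --- | --- | --- | --- |
| Secrets in Backend | 100% | 30% | PASS |
| TLS Enabled | 100% | 25% | PASS |
| RBAC Configured | 80% | 20% | PASS |
| Audit Logging | 90% | 15% | PASS |
| Network Segmentation | 70% | 10% | WARNING |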
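
The table does not say how the individual scores are combined. Assuming a plain weighted average, the overall posture score can be computed as in the sketch below; the `weighted_score` function is a hypothetical helper, and weights are kept as integer percentages to avoid floating-point rounding.

```python
from typing import List, Tuple

# (metric, score in percent, weight in percent), copied from the table
METRICS: List[Tuple[str, int, int]] = [
    ("Secrets in Backend", 100, 30),
    ("TLS Enabled", 100, 25),
    ("RBAC Configured", 80, 20),
    ("Audit Logging", 90, 15),
    ("Network Segmentation", 70, 10),
]


def weighted_score(metrics: List[Tuple[str, int, int]]) -> float:
    """Weighted average of the scores, normalised by the total weight."""
    total_weight = sum(weight for _, _, weight in metrics)
    return sum(score * weight for _, score, weight in metrics) / total_weight


if __name__ == "__main__":
    # (3000 + 2500 + 1600 + 1350 + 700) / 100 = 91.5
    print(weighted_score(METRICS))
```

Under this assumption the deployment scores 91.5%, with network segmentation contributing the least because of its low weight and low score.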

Compliance Checklist

| Requirement | Status | Evidence | Owner |
| --- | --- | --- | --- |
| SOC2 - Access Controls | PASS | RBAC configured | Security |
| SOC2 - Encryption | PASS | TLS 1.3 enabled | Platform |
| GDPR - Data Protection | PASS | Encryption at rest | Platform |
| GDPR - Audit Trail | PASS | Logging enabled | Security |
| HIPAA - PHI Handling | N/A | No PHI processed | - |

Key Takeaways:

  • Use OAuth/LDAP/SAML for authentication; never use default credentials
  • Implement RBAC with least-privilege principles
  • Store all secrets in a Secrets Backend (Vault, AWS SM, GCP SM)
  • Enable TLS for all connections; encrypt data at rest
  • Enable audit logging for compliance (SOC2, GDPR)
  • Segment networks; use internal networks for Airflow components
  • Rotate secrets regularly; automate rotation where possible
