CW

Multi-Tenancy and Isolation in Apache Airflow

Free Lesson

Advertisement

Multi-Tenancy and Isolation

Architecture Diagram

Formal Definitions

Definition: Multi-Tenancy

Multi-tenancy is an architecture in which a single Airflow instance serves multiple teams or organizations (tenants), with logical isolation between them. Each tenant has its own DAGs, connections, variables, and resource quotas. Formally, $T = \{t_1, t_2, \ldots, t_n\}$, where each tenant $t_i$ has an isolated set of resources $R_i$.

Definition: Resource Quota

A resource quota limits the resources a tenant can consume, including the maximum number of concurrent tasks, pool slots, and DAG runs. The quota function $Q: T \rightarrow (N_{\text{max\_tasks}}, S_{\text{pool}}, R_{\text{max\_runs}})$ maps each tenant to its resource limits.

Definition: Tenant Isolation

Tenant Isolation ensures that one tenant's actions cannot affect another tenant's resources or data. Isolation levels include: (1) DAG isolation via tags, (2) resource isolation via pools, (3) credential isolation via connections, (4) network isolation via namespaces.

Detailed Explanation

RBAC Configuration for Multi-Tenancy

# webserver_config.py
from airflow.security import permissions

# Define tenant-specific roles.
# Each entry maps a role name to a list of (action, resource) permission
# pairs built from airflow.security.permissions constants.
# NOTE(review): nothing in this snippet reads ROLES; confirm how these roles
# actually get created in the Airflow/FAB role tables.
# NOTE(review): permissions.RESOURCE_DAG appears to be the resource covering
# all DAGs; if so, every role below can read (and the Admin roles edit) every
# tenant's DAGs. Tenant scoping would likely need per-DAG resources — confirm.
ROLES = {
    # Read / edit / delete on DAGs plus access to the Admin menu.
    'TenantAlphaAdmin': [
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
        (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_ADMIN_MENU),
    ],
    # Read-only access to DAGs.
    'TenantAlphaViewer': [
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
    ],
    # Read and edit on DAGs, but no delete and no Admin menu.
    'TenantBetaAdmin': [
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
    ],
}

# Map users to roles: role name -> list of user e-mail addresses.
# NOTE(review): Flask-AppBuilder documents AUTH_ROLES_MAP as mapping external
# auth groups (e.g. LDAP/OAuth group names) to lists of role names; this
# role -> e-mail shape looks inverted — confirm against the FAB auth settings.
AUTH_ROLES_MAP = {
    'TenantAlphaAdmin': ['admin@alpha.com', 'lead@alpha.com'],
    'TenantAlphaViewer': ['viewer@alpha.com'],
    'TenantBetaAdmin': ['admin@beta.com'],
}

# Custom access control
from airflow.www.security import AirflowSecurityManager

class TenantSecurityManager(AirflowSecurityManager):
    """Custom security manager for tenant isolation.

    A DAG belongs to every tenant named by one of its ``tenant:<id>`` tags.
    A user may access the DAG only if ``get_user_tenants`` returns at least
    one of those tenant ids. DAGs without any ``tenant:`` tag are denied.
    """

    _TENANT_TAG_PREFIX = 'tenant:'

    def can_access_dag(self, user, dag_id):
        """Return True if *user* shares a tenant with the DAG *dag_id*.

        Args:
            user: The user requesting access.
            dag_id: ID of the DAG being accessed.

        Returns:
            False when the DAG is not found, has no ``tenant:`` tag, or none
            of its tenants belong to the user; True otherwise.
        """
        # NOTE(review): get_dag is not defined in this class; it is assumed to
        # be provided elsewhere and to return an object whose ``tags`` is a
        # list of strings (or a falsy value when the DAG is unknown) — confirm.
        dag = self.get_dag(dag_id)
        if not dag:
            return False

        prefix = self._TENANT_TAG_PREFIX
        # Slice off only the leading prefix: str.replace would also strip any
        # later 'tenant:' occurrence inside the id. `or ()` tolerates tags=None.
        dag_tenants = {
            tag[len(prefix):]
            for tag in (dag.tags or ())
            if tag.startswith(prefix)
        }
        if not dag_tenants:
            return False

        user_tenants = self.get_user_tenants(user) or []
        if isinstance(user_tenants, str):
            # A bare string would turn membership into a substring test
            # (e.g. 'al' in 'alpha' is True).
            user_tenants = [user_tenants]
        return any(tenant in user_tenants for tenant in dag_tenants)

    def get_user_tenants(self, user):
        """Return the tenant ids *user* may access.

        Reads ``user.extra_dict['tenants']``. Returns an empty list when the
        attribute or key is missing, so users without tenant data are denied.
        """
        # Implementation depends on your user model; ``extra_dict`` stands in
        # for wherever tenant membership is actually stored.
        extra = getattr(user, 'extra_dict', None) or {}
        return extra.get('tenants', [])

Pool-Based Resource Quotas

# airflow.cfg - Pool configuration for tenants
[core]
# Slot count for the built-in default_pool. Only used when default_pool is
# first created; on an existing deployment change it via the UI, API or CLI.
default_pool_task_slot_count = 128

# Define pools for each tenant
# Use CLI: airflow pools set alpha_pool 32 "Pool for Tenant Alpha"
# Use CLI: airflow pools set beta_pool 32 "Pool for Tenant Beta"

# Usage in DAGs
from airflow.decorators import task, dag
from datetime import datetime

# Tenant Alpha's daily DAG. Every task draws slots from the tenant-specific
# pool 'alpha_pool', so Alpha's load cannot starve other tenants' pools.
@dag(
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    tags=['tenant:alpha'],
    catchup=False,
    # Tasks inherit the tenant pool unless they override it explicitly.
    default_args={'pool': 'alpha_pool'},
)
def tenant_alpha_dag():
    # Heavier step: holds 4 of alpha_pool's slots while it runs.
    @task(pool='alpha_pool', pool_slots=4)
    def alpha_processing():
        """Task using Tenant Alpha's pool."""
        return {"status": "processing"}

    # Lighter step: holds 2 slots.
    @task(pool='alpha_pool', pool_slots=2)
    def alpha_reporting():
        """Task using Tenant Alpha's pool."""
        return {"status": "reporting"}

    processing = alpha_processing()
    reporting = alpha_reporting()
    processing >> reporting


tenant_alpha_dag()

Connection and Variable Isolation

# tenant_isolation.py
from airflow.models import Connection, Variable
from airflow import settings

class TenantIsolation:
    """Manage tenant-specific connections and variables.

    Tenant resources are stored under ``"<tenant_id>_<name>"``. The naming
    scheme cannot separate tenants where one id plus ``_`` is a prefix of
    another (e.g. ``alpha`` vs ``alpha_eu``), so pick tenant ids accordingly.
    """

    def __init__(self, tenant_id):
        self.tenant_id = tenant_id
        self.prefix = f"{tenant_id}_"

    @staticmethod
    def _find_connection(session, conn_id):
        """Return the Connection whose conn_id equals *conn_id*, or None."""
        return session.query(Connection).filter(
            Connection.conn_id == conn_id
        ).first()

    def get_connection(self, conn_id):
        """Get tenant-specific connection.

        Looks up ``<prefix><conn_id>`` first and falls back to the shared,
        unprefixed ``conn_id``. Returns None if neither exists.
        """
        session = settings.Session()
        try:
            conn = self._find_connection(session, f"{self.prefix}{conn_id}")
            if conn is None:
                # Fall back to shared connection
                conn = self._find_connection(session, conn_id)
            return conn
        finally:
            # Release the DB session; attributes already loaded on the
            # returned (now detached) object remain readable.
            session.close()

    def get_variable(self, key, default=None):
        """Get tenant-specific variable ``<prefix><key>``.

        Returns *default* when the variable is missing and *default* is not
        None; otherwise the KeyError from ``Variable.get`` propagates.
        """
        tenant_key = f"{self.prefix}{key}"

        try:
            return Variable.get(tenant_key)
        except KeyError:
            if default is not None:
                return default
            raise

    def set_variable(self, key, value):
        """Set tenant-specific variable ``<prefix><key>`` to *value*."""
        tenant_key = f"{self.prefix}{key}"
        Variable.set(tenant_key, value)

    def list_connections(self):
        """List all connections whose conn_id starts with this tenant's prefix."""
        session = settings.Session()
        try:
            # autoescape=True makes '_' and '%' in the prefix literal. A raw
            # LIKE 'alpha_%' treats '_' as a one-character wildcard and would
            # also return other tenants' connections such as 'alphabet_db'.
            return session.query(Connection).filter(
                Connection.conn_id.startswith(self.prefix, autoescape=True)
            ).all()
        finally:
            session.close()

# Usage in DAGs
from airflow.decorators import task, dag
from datetime import datetime

@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    # Match the other tenant DAGs. Without catchup=False, and with catchup
    # enabled in the deployment defaults, the scheduler would create one run
    # per day since start_date.
    catchup=False,
    tags=['tenant:alpha'],
)
def tenant_aware_dag():

    @task
    def use_tenant_resources():
        """Use tenant-specific connections and variables."""
        isolation = TenantIsolation('alpha')

        # Get tenant-specific connection (falls back to the shared one);
        # get_connection returns None when neither exists.
        conn = isolation.get_connection('database')
        if conn is None:
            raise ValueError(
                f"No connection '{isolation.prefix}database' or shared "
                "'database' connection found"
            )
        print(f"Using connection: {conn.conn_id}")

        # Get tenant-specific variable
        config = isolation.get_variable('config', default='{}')
        print(f"Config: {config}")

        return {"tenant": "alpha"}

    use_tenant_resources()


tenant_aware_dag()
Tenant Resource Utilization
$$U_{\text{tenant}} = \frac{R_{\text{used}}}{R_{\text{quota}}} \times 100\%$$

Here,

  • $U_{\text{tenant}}$ = tenant resource utilization (percent)
  • $R_{\text{used}}$ = resources actually used by the tenant
  • $R_{\text{quota}}$ = resource quota assigned to the tenant

Isolation Score

$$I = 1 - \frac{\sum_{i \neq j} |R_i \cap R_j|}{\sum_{i} |R_i|}$$

Here,

  • $I$ = isolation score (at most 1, reached when tenants share no resources; higher is better)
  • $R_i$ = resources of tenant $i$
  • $R_j$ = resources of tenant $j$

Use DAG tags with tenant: prefix to enable tag-based access control. Configure RBAC to restrict users to their tenant's DAGs.

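Set up a separate pool for each tenant to prevent resource contention. The pool's slot count caps how many slots the tenant's running tasks can occupy at once; each task's pool_slots sets how many of those slots that task takes.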

Key Concepts Table

| Isolation Layer | Method | Implementation | Granularity |
|---|---|---|---|
| DAG Access | RBAC + Tags | `tags=['tenant:alpha']` | Per-DAG |
| Resources | Pools | `pool='alpha_pool'` | Per-task |
| Credentials | Connections | `conn_id='alpha_db'` | Per-connection |
| Configuration | Variables | `Variable.get('alpha_config')` | Per-variable |
| Logs | Log routing | S3 prefix `s3://logs/alpha/` | Per-DAG |
| Network | Namespaces | K8s namespace `airflow-alpha` | Per-tenant |

Code Examples

Tenant Management API

# tenant_management.py
from airflow import settings
from airflow.models import Connection, Variable, Pool
from sqlalchemy import text

class TenantManager:
    """Manage tenant provisioning and deprovisioning.

    A tenant ``<tenant_id>`` owns the pool ``<tenant_id>_pool`` plus every
    connection and variable whose id/key starts with ``<tenant_id>_``.
    One metadata-DB session is held for the manager's lifetime.
    """

    def __init__(self):
        self.session = settings.Session()

    def provision_tenant(self, tenant_id, config):
        """Provision a new tenant with a pool, connections and variables.

        Args:
            tenant_id: Tenant identifier, used as the name prefix.
            config: Mapping with optional keys ``max_concurrent_tasks``
                (pool slots, default 32), ``connections`` (list of dicts
                with ``name`` and ``type`` plus optional ``host``, ``login``,
                ``password`` and ``schema``) and ``variables`` (mapping whose
                values are stored as ``str(value)``).

        Everything is committed in one transaction. On any error the
        transaction is rolled back and the exception re-raised, so a failed
        call (for example an integrity error on re-run) leaves the session
        usable.
        """
        try:
            # Create pool
            self.session.add(Pool(
                pool=f'{tenant_id}_pool',
                slots=config.get('max_concurrent_tasks', 32),
                description=f'Pool for tenant {tenant_id}',
            ))

            # Create connections
            for conn_config in config.get('connections', []):
                self.session.add(Connection(
                    conn_id=f"{tenant_id}_{conn_config['name']}",
                    conn_type=conn_config['type'],
                    host=conn_config.get('host'),
                    login=conn_config.get('login'),
                    password=conn_config.get('password'),
                    schema=conn_config.get('schema'),
                ))

            # Create variables
            for key, value in config.get('variables', {}).items():
                self.session.add(Variable(key=f"{tenant_id}_{key}", val=str(value)))

            self.session.commit()
        except Exception:
            self.session.rollback()
            raise
        print(f"Tenant {tenant_id} provisioned successfully")

    def deprovision_tenant(self, tenant_id):
        """Delete the tenant's pool and all of its prefixed connections/variables.

        Args:
            tenant_id: Tenant whose resources are removed.

        Deletions are committed together; on error they are rolled back and
        the exception re-raised.
        """
        prefix = f'{tenant_id}_'
        try:
            # Delete pool
            pool = self.session.query(Pool).filter(
                Pool.pool == f'{tenant_id}_pool'
            ).first()
            if pool:
                self.session.delete(pool)

            # Delete connections and variables. startswith(autoescape=True)
            # makes the '_' in the prefix literal: a raw LIKE 'alpha_%' treats
            # '_' as a wildcard and would also delete another tenant's objects,
            # e.g. 'alphabet_db'.
            for model, id_column in (
                (Connection, Connection.conn_id),
                (Variable, Variable.key),
            ):
                doomed = self.session.query(model).filter(
                    id_column.startswith(prefix, autoescape=True)
                ).all()
                for obj in doomed:
                    self.session.delete(obj)

            self.session.commit()
        except Exception:
            self.session.rollback()
            raise
        print(f"Tenant {tenant_id} deprovisioned")

    def get_tenant_usage(self, tenant_id):
        """Get pool occupancy and recent task count for *tenant_id*.

        Returns:
            ``{'pool': {'total_slots', 'occupied', 'utilization'},
            'tasks_24h': int}``. Pool figures are 0 when the tenant has no
            pool; ``utilization`` is a 0-1 fraction (0 when slots <= 0).
        """
        from datetime import datetime, timedelta, timezone

        from airflow.models import TaskInstance

        pool_name = f'{tenant_id}_pool'
        pool = self.session.query(Pool).filter(Pool.pool == pool_name).first()

        if pool is None:
            total_slots = occupied = 0
        else:
            total_slots = pool.slots
            # occupied_slots is a method (it queries task instances), not a
            # column; using it without calling yields a bound method.
            occupied = pool.occupied_slots(session=self.session)

        pool_usage = {
            'total_slots': total_slots,
            'occupied': occupied,
            'utilization': occupied / total_slots if total_slots > 0 else 0,
        }

        # Airflow stores timezone-aware UTC datetimes and rejects naive ones.
        since = datetime.now(timezone.utc) - timedelta(hours=24)
        # NOTE(review): TaskInstance.execution_date is deprecated in newer
        # Airflow releases (logical_date); confirm for the deployed version.
        task_count = self.session.query(TaskInstance).filter(
            TaskInstance.pool == pool_name,
            TaskInstance.execution_date >= since,
        ).count()

        return {
            'pool': pool_usage,
            'tasks_24h': task_count,
        }

if __name__ == "__main__":
    # Example tenant: a 64-slot pool, one Postgres connection, one variable.
    manager = TenantManager()
    gamma_config = {
        'max_concurrent_tasks': 64,
        'connections': [
            {'name': 'database', 'type': 'postgres', 'host': 'db.gamma.com'},
        ],
        'variables': {'config': '{}'},
    }

    # Provision the tenant, then report what it is currently using.
    manager.provision_tenant('gamma', gamma_config)
    print(f"Tenant usage: {manager.get_tenant_usage('gamma')}")

Tenant-Aware DAG Factory

# dag_factory.py
from airflow.decorators import dag, task
from datetime import datetime
from typing import Dict, Any

def create_tenant_dag(
    dag_id: str,
    tenant_id: str,
    schedule: str,
    tasks: Dict[str, Any],
):
    """Factory function to create tenant-aware DAGs.

    Builds a DAG with id ``<tenant_id>_<dag_id>``, tagged
    ``tenant:<tenant_id>``, whose tasks all run in ``<tenant_id>_pool``.

    Args:
        dag_id: Base DAG id; the tenant id is prepended.
        tenant_id: Tenant identifier, also used as the DAG owner.
        schedule: Value passed to the DAG's ``schedule_interval``.
        tasks: Mapping of task id to a config dict with optional keys
            ``slots`` (pool slots, default 1), ``retries`` (default 1) and
            ``upstream`` (ids of tasks in *tasks* that must run first).

    Returns:
        The result of calling the ``@dag``-decorated function (the DAG).

    Raises:
        KeyError: If an ``upstream`` entry names a task not in *tasks*.
    """
    pool_name = f'{tenant_id}_pool'

    def _make_task(task_name, task_config):
        # Build one task in its own scope so task_name is bound per task.
        # Defining task_func directly in the loop would close over the loop
        # variable, and at run time every task would see the last task's name.
        @task(
            task_id=task_name,
            pool=pool_name,
            pool_slots=task_config.get('slots', 1),
            retries=task_config.get('retries', 1),
        )
        def task_func(**context):
            """Execute tenant-specific task."""
            print(f"Executing {task_name} for tenant {tenant_id}")
            return {"tenant": tenant_id, "task": task_name}

        return task_func()

    @dag(
        dag_id=f"{tenant_id}_{dag_id}",
        schedule_interval=schedule,
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=[f'tenant:{tenant_id}', 'factory-generated'],
        default_args={
            'pool': pool_name,
            'owner': tenant_id,
        },
    )
    def tenant_dag():
        task_instances = {}
        for task_name, task_config in tasks.items():
            task_instances[task_name] = _make_task(task_name, task_config)

        # Set dependencies
        for task_name, task_config in tasks.items():
            for upstream in task_config.get('upstream', []):
                if upstream not in task_instances:
                    raise KeyError(
                        f"Task {task_name!r} in DAG {tenant_id}_{dag_id} lists "
                        f"unknown upstream task {upstream!r}"
                    )
                task_instances[upstream] >> task_instances[task_name]

    return tenant_dag()

# Create DAGs for different tenants. Both tenants run the same
# extract -> transform -> load shape; only the schedule differs.
def _etl_task_spec():
    """Return a fresh task-config mapping for the three-step ETL pipeline."""
    return {
        'extract': {'slots': 2, 'retries': 3},
        'transform': {'slots': 4, 'upstream': ['extract']},
        'load': {'slots': 2, 'upstream': ['transform']},
    }


alpha_etl = create_tenant_dag(
    dag_id='etl_pipeline',
    tenant_id='alpha',
    schedule='@daily',
    tasks=_etl_task_spec(),
)

beta_etl = create_tenant_dag(
    dag_id='etl_pipeline',
    tenant_id='beta',
    schedule='0 6 * * *',
    tasks=_etl_task_spec(),
)

Tenant Monitoring

# tenant_monitoring.py
from airflow import settings
from airflow.models import TaskInstance, DagRun, Pool
from sqlalchemy import func
from datetime import datetime, timedelta

class TenantMonitor:
    """Monitor tenant resource usage and performance.

    Task instances are attributed to a tenant when their pool name starts
    with ``<tenant_id>_``; pool utilization is read from ``<tenant_id>_pool``.
    """

    def __init__(self):
        self.session = settings.Session()

    def get_tenant_metrics(self, tenant_id, hours=24):
        """Get task, duration and pool metrics for *tenant_id*.

        Args:
            tenant_id: Tenant identifier.
            hours: Look-back window in hours.

        Returns:
            dict with ``tenant_id``, ``period_hours``, ``task_stats``
            (state -> count), ``total_tasks``, ``success_rate`` (0-1),
            ``avg_duration`` (None when no task succeeded in the window)
            and ``pool_utilization`` (0-1).
        """
        from datetime import timezone

        # Airflow stores timezone-aware UTC datetimes and rejects naive ones.
        since = datetime.now(timezone.utc) - timedelta(hours=hours)
        # autoescape=True makes '_' literal: LIKE 'alpha_%' would also match
        # other tenants' pools such as 'alphabet_pool'.
        in_tenant_pools = TaskInstance.pool.startswith(f'{tenant_id}_', autoescape=True)
        # NOTE(review): TaskInstance.execution_date is deprecated in newer
        # Airflow releases (logical_date); confirm for the deployed version.
        in_window = TaskInstance.execution_date >= since

        # Task metrics: state -> count
        task_stats = dict(
            self.session.query(
                TaskInstance.state,
                func.count(TaskInstance.task_id),
            )
            .filter(in_tenant_pools, in_window)
            .group_by(TaskInstance.state)
            .all()
        )

        # Success rate
        total_tasks = sum(task_stats.values())
        successful_tasks = task_stats.get('success', 0)
        success_rate = successful_tasks / total_tasks if total_tasks > 0 else 0

        # Average duration of successful tasks
        avg_duration = self.session.query(
            func.avg(TaskInstance.duration)
        ).filter(
            in_tenant_pools,
            TaskInstance.state == 'success',
            in_window,
        ).scalar()

        # Pool utilization
        pool = self.session.query(Pool).filter(
            Pool.pool == f'{tenant_id}_pool'
        ).first()
        if pool is not None and pool.slots > 0:
            # occupied_slots is a method that queries task instances, not a
            # column; it must be called.
            pool_utilization = pool.occupied_slots(session=self.session) / pool.slots
        else:
            pool_utilization = 0

        return {
            'tenant_id': tenant_id,
            'period_hours': hours,
            'task_stats': task_stats,
            'total_tasks': total_tasks,
            'success_rate': success_rate,
            'avg_duration': avg_duration,
            'pool_utilization': pool_utilization,
        }

    def compare_tenants(self, tenant_ids):
        """Compare metrics across tenants.

        Returns:
            dict with ``tenants`` (tenant id -> metrics dict),
            ``highest_utilization`` and ``lowest_success_rate`` (tenant ids;
            ties go to the earliest tenant in *tenant_ids*; both None when
            *tenant_ids* is empty).
        """
        metrics = {}
        for tenant_id in tenant_ids:
            metrics[tenant_id] = self.get_tenant_metrics(tenant_id)

        if not metrics:
            # max()/min() on an empty sequence would raise ValueError.
            return {
                'tenants': {},
                'highest_utilization': None,
                'lowest_success_rate': None,
            }

        return {
            'tenants': metrics,
            'highest_utilization': max(
                metrics, key=lambda t: metrics[t]['pool_utilization']
            ),
            'lowest_success_rate': min(
                metrics, key=lambda t: metrics[t]['success_rate']
            ),
        }

if __name__ == "__main__":
    monitor = TenantMonitor()

    # Get metrics for all tenants
    comparison = monitor.compare_tenants(['alpha', 'beta', 'gamma'])

    print("Tenant Comparison:")
    for tenant_id, metrics in comparison['tenants'].items():
        avg_duration = metrics['avg_duration']
        # avg_duration is None when no task succeeded in the window, and
        # formatting None with :.2f would raise TypeError.
        avg_text = f"{avg_duration:.2f}s" if avg_duration is not None else "n/a"
        print(f"\n{tenant_id}:")
        print(f"  Tasks: {metrics['total_tasks']}")
        print(f"  Success Rate: {metrics['success_rate']:.2%}")
        print(f"  Avg Duration: {avg_text}")
        print(f"  Pool Utilization: {metrics['pool_utilization']:.2%}")

Performance Metrics

Multi-Tenancy Metrics

| Metric | Target | Warning | Critical |
|---|---|---|---|
| Isolation Score | > 0.9 | 0.7–0.9 | < 0.7 |
| Pool Utilization | < 80% | 80–95% | > 95% |
| Cross-Tenant Impact | 0 | Any | Multiple |
| Provisioning Time | < 5 min | 5–15 min | > 15 min |

Tenant Resource Distribution

| Tenant | Pool Slots | DAG Count | Task Count | Usage |
|---|---|---|---|---|
| Alpha | 32 | 15 | 200 | 75% |
| Beta | 32 | 10 | 150 | 60% |
| Gamma | 64 | 25 | 400 | 80% |

Key Takeaways:

  • Use RBAC with DAG tags for tenant-level access control
  • Create separate pools for each tenant to prevent resource contention
  • Prefix connections and variables with tenant ID for isolation
  • Monitor tenant resource usage and enforce quotas
  • Use tenant-aware DAG factories for consistent provisioning
  • Implement tenant monitoring for capacity planning

See Also

Advertisement

Need Expert Airflow Help?

Get personalized DAG design, scheduling optimization, or production Airflow consulting.

Advertisement