Multi-Tenancy and Isolation
Formal Definitions
Definition: Multi-Tenancy
Multi-tenancy is an architecture in which a single Airflow instance serves multiple teams or organizations (tenants) with logical isolation. Each tenant has its own DAGs, connections, variables, and resource quotas. Formally, the tenant set is $T = \{t_1, t_2, \ldots, t_n\}$, where each tenant $t_i$ has isolated resources $R_i$.
Definition: Resource Quota
A resource quota limits the resources a tenant can consume: maximum concurrent tasks, pool slots, and DAG runs. The quota function $Q$ maps each tenant $t_i$ to its resource limits $Q_i = Q(t_i)$.
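As a minimal sketch of the quota function, the mapping can be modeled as a dictionary from tenant IDs to a small immutable record. The names `ResourceQuota`, `QUOTAS`, and `quota_for`, and the numeric limits, are illustrative assumptions and not part of Airflow.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ResourceQuota:
    """Hypothetical per-tenant limits (not an Airflow class)."""

    max_concurrent_tasks: int
    pool_slots: int
    max_dag_runs: int


# Illustrative quota table; the numbers are made up for this example.
QUOTAS = {
    "alpha": ResourceQuota(max_concurrent_tasks=16, pool_slots=32, max_dag_runs=4),
    "beta": ResourceQuota(max_concurrent_tasks=16, pool_slots=32, max_dag_runs=4),
}


def quota_for(tenant_id: str) -> ResourceQuota:
    """Return the quota for a tenant, failing loudly for unknown tenants."""
    try:
        return QUOTAS[tenant_id]
    except KeyError:
        raise ValueError(f"no quota defined for tenant {tenant_id!r}") from None


print(quota_for("alpha").pool_slots)  # 32
```

Raising on an unknown tenant, instead of returning a default, keeps an unprovisioned tenant from silently inheriting someone else's limits.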
Definition: Tenant Isolation
Tenant Isolation ensures that one tenant's actions cannot affect another tenant's resources or data. Isolation levels include: (1) DAG isolation via tags, (2) resource isolation via pools, (3) credential isolation via connections, (4) network isolation via namespaces.
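The tag-based DAG isolation level can be sketched in plain Python, independent of Airflow. The helper names `tenants_from_tags` and `can_access` are hypothetical; the only convention taken from this page is the `tenant:` tag prefix.

```python
TENANT_TAG_PREFIX = "tenant:"


def tenants_from_tags(tags):
    """Return the set of tenant IDs encoded in tags such as 'tenant:alpha'."""
    return {
        tag[len(TENANT_TAG_PREFIX):]
        for tag in tags
        if tag.startswith(TENANT_TAG_PREFIX)
    }


def can_access(user_tenants, dag_tags):
    """Allow access only if the user shares at least one tenant with the DAG.

    A DAG without any tenant tag is denied, so untagged DAGs are never
    visible to tenant-scoped users by accident.
    """
    return bool(tenants_from_tags(dag_tags) & set(user_tenants))


print(can_access(["alpha"], ["tenant:alpha", "etl"]))  # True
print(can_access(["beta"], ["tenant:alpha", "etl"]))   # False
```

Denying by default for untagged DAGs is a design choice of this sketch; a deployment with shared DAGs would need an explicit tag for them.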
Detailed Explanation
RBAC Configuration for Multi-Tenancy
# webserver_config.py
from airflow.models.dag import DagTag
from airflow.security import permissions
from airflow.utils.session import create_session
from airflow.www.security import AirflowSecurityManager

# Define tenant-specific roles as (action, resource) pairs.
# NOTE: ROLES and AUTH_ROLES_MAP are plain dictionaries; this example assumes
# your own provisioning code applies them to Airflow's roles and users.
ROLES = {
    'TenantAlphaAdmin': [
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
        (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG),
        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_ADMIN_MENU),
    ],
    'TenantAlphaViewer': [
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
    ],
    'TenantBetaAdmin': [
        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
    ],
}

# Map users to roles
AUTH_ROLES_MAP = {
    'TenantAlphaAdmin': ['admin@alpha.com', 'lead@alpha.com'],
    'TenantAlphaViewer': ['viewer@alpha.com'],
    'TenantBetaAdmin': ['admin@beta.com'],
}


# Custom access control
class TenantSecurityManager(AirflowSecurityManager):
    """Custom security manager for tenant isolation."""

    TENANT_TAG_PREFIX = 'tenant:'

    def can_access_dag(self, user, dag_id):
        """Check if user can access specific DAG."""
        # Read the DAG's tags from the metadata database
        with create_session() as session:
            tag_names = [
                name
                for (name,) in session.query(DagTag.name).filter(
                    DagTag.dag_id == dag_id
                )
            ]
        # Extract tenants from tags of the form 'tenant:<id>';
        # an unknown or untagged DAG yields no tenants and is denied
        prefix = self.TENANT_TAG_PREFIX
        dag_tenants = {
            name[len(prefix):] for name in tag_names if name.startswith(prefix)
        }
        # Check user's tenants
        return bool(dag_tenants & set(self.get_user_tenants(user)))

    def get_user_tenants(self, user):
        """Get tenants user has access to."""
        # Implementation depends on your user model; extra_dict is assumed here
        return getattr(user, 'extra_dict', {}).get('tenants', [])


# Register the custom security manager with the webserver
SECURITY_MANAGER_CLASS = TenantSecurityManager
Pool-Based Resource Quotas
# airflow.cfg - Pool configuration for tenants
[core]
# Slot count used when the default pool is first created
default_pool_task_slot_count = 128
# Define pools for each tenant
# Use CLI: airflow pools set alpha_pool 32 "Pool for Tenant Alpha"
# Use CLI: airflow pools set beta_pool 32 "Pool for Tenant Beta"

# Usage in DAGs
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['tenant:alpha'],
    default_args={
        'pool': 'alpha_pool',  # Assign to tenant's pool
    },
)
def tenant_alpha_dag():
    @task(pool='alpha_pool', pool_slots=4)
    def alpha_processing():
        """Task using Tenant Alpha's pool."""
        return {"status": "processing"}

    @task(pool='alpha_pool', pool_slots=2)
    def alpha_reporting():
        """Task using Tenant Alpha's pool."""
        return {"status": "reporting"}

    alpha_processing() >> alpha_reporting()


tenant_alpha_dag()
Connection and Variable Isolation
# tenant_isolation.py
from datetime import datetime

from airflow.decorators import dag, task
from airflow.models import Connection, Variable
from airflow.utils.session import create_session


class TenantIsolation:
    """Manage tenant-specific connections and variables."""

    def __init__(self, tenant_id):
        self.tenant_id = tenant_id
        self.prefix = f"{tenant_id}_"

    def get_connection(self, conn_id):
        """Get tenant-specific connection, or None if nothing matches."""
        tenant_conn_id = f"{self.prefix}{conn_id}"
        # create_session() commits and closes the session on exit
        with create_session() as session:
            conn = session.query(Connection).filter(
                Connection.conn_id == tenant_conn_id
            ).first()
            if not conn:
                # Fall back to shared connection.
                # NOTE: this also matches another tenant's prefixed ID
                # (e.g. 'beta_database'); restrict the fallback to a shared
                # namespace if strict isolation is required.
                conn = session.query(Connection).filter(
                    Connection.conn_id == conn_id
                ).first()
            if conn:
                # Detach so the object stays usable after the session closes
                session.expunge(conn)
            return conn

    def get_variable(self, key, default=None):
        """Get tenant-specific variable."""
        tenant_key = f"{self.prefix}{key}"
        try:
            return Variable.get(tenant_key)
        except KeyError:
            # Variable.get raises KeyError when the key does not exist
            if default is not None:
                return default
            raise

    def set_variable(self, key, value):
        """Set tenant-specific variable."""
        tenant_key = f"{self.prefix}{key}"
        Variable.set(tenant_key, value)

    def list_connections(self):
        """List all connections for this tenant."""
        with create_session() as session:
            conns = session.query(Connection).filter(
                # autoescape keeps the '_' in the prefix from acting as a
                # single-character LIKE wildcard
                Connection.conn_id.startswith(self.prefix, autoescape=True)
            ).all()
            session.expunge_all()
            return conns


# Usage in DAGs
@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    tags=['tenant:alpha'],
)
def tenant_aware_dag():
    @task
    def use_tenant_resources():
        """Use tenant-specific connections and variables."""
        isolation = TenantIsolation('alpha')
        # Get tenant-specific connection
        conn = isolation.get_connection('database')
        if conn is None:
            raise ValueError("No 'database' connection found for tenant alpha")
        print(f"Using connection: {conn.conn_id}")
        # Get tenant-specific variable
        config = isolation.get_variable('config', default='{}')
        print(f"Config: {config}")
        return {"tenant": "alpha"}

    use_tenant_resources()


tenant_aware_dag()
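Tenant resource utilization is the ratio of the resources a tenant consumes to its assigned quota:
$$U_i = \frac{R_i^{\text{used}}}{Q_i}$$
Here,
- $U_i$ = Tenant resource utilization
- $R_i^{\text{used}}$ = Resources actually used by tenant
- $Q_i$ = Resource quota assigned to tenant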
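The utilization ratio (resources used divided by quota) can be computed directly. This is a small sketch; the function name `utilization` is an assumption, and the zero-quota guard mirrors the `pool.slots > 0` check used in the code elsewhere on this page.

```python
def utilization(used: float, quota: float) -> float:
    """Return used / quota, treating a non-positive quota as zero utilization."""
    if quota <= 0:
        return 0.0
    return used / quota


# 24 occupied slots out of a 32-slot pool
print(f"{utilization(24, 32):.0%}")  # 75%
```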
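Isolation Score
One formulation consistent with the terms below is the complement of the resource overlap between two tenants:
$$I = 1 - \frac{|R_i \cap R_j|}{|R_i \cup R_j|}$$
Here,
- $I$ = Isolation score (0-1, higher is better)
- $R_i$ = Resources of tenant i
- $R_j$ = Resources of tenant j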
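To make the isolation score concrete, the sketch below assumes it is defined as one minus the Jaccard overlap of two tenants' resource sets. That definition is an assumption made for this example, and `isolation_score` is a hypothetical helper; resources are represented simply as identifier strings.

```python
def isolation_score(resources_i, resources_j):
    """Return 1 - |Ri & Rj| / |Ri | Rj| for two collections of resource IDs.

    1.0 means the tenants share nothing; 0.0 means they share everything.
    Two empty resource sets are treated as fully isolated.
    """
    a, b = set(resources_i), set(resources_j)
    union = a | b
    if not union:
        return 1.0
    return 1.0 - len(a & b) / len(union)


alpha = {"alpha_pool", "alpha_database"}
beta = {"beta_pool", "beta_database"}
print(isolation_score(alpha, beta))  # 1.0
```

Under this definition, a single shared connection between two tenants that each own one other resource lowers the score to about 0.67.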
Use DAG tags with tenant: prefix to enable tag-based access control. Configure RBAC to restrict users to their tenant's DAGs.
Set up separate pools for each tenant to prevent resource contention. Use pool_slots to limit task concurrency per tenant.
Key Concepts Table
| Isolation Layer | Method | Implementation | Granularity |
|---|---|---|---|
| DAG Access | RBAC + Tags | tags=['tenant:alpha'] | Per-DAG |
| Resources | Pools | pool='alpha_pool' | Per-task |
| Credentials | Connections | conn_id='alpha_db' | Per-connection |
| Configuration | Variables | Variable.get('alpha_config') | Per-variable |
| Logs | Log routing | S3 prefix s3://logs/alpha/ | Per-DAG |
| Network | Namespaces | K8s namespace airflow-alpha | Per-tenant |
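The naming conventions in the table can be generated from a single tenant ID so that every layer stays consistent. This is a sketch: `tenant_resource_names` is a hypothetical helper, and the restriction to lowercase letters and digits is an added assumption that keeps one tenant's `<id>_` prefix from matching another tenant such as `alpha_two`.

```python
import re


def tenant_resource_names(tenant_id: str) -> dict:
    """Derive per-layer resource names for a tenant from the table's conventions."""
    # Assumed rule: no underscores, so '<id>_' prefixes cannot collide
    if not re.fullmatch(r"[a-z][a-z0-9]*", tenant_id):
        raise ValueError(f"invalid tenant id: {tenant_id!r}")
    return {
        "dag_tag": f"tenant:{tenant_id}",
        "pool": f"{tenant_id}_pool",
        "connection_prefix": f"{tenant_id}_",
        "variable_prefix": f"{tenant_id}_",
        "log_prefix": f"s3://logs/{tenant_id}/",
        "k8s_namespace": f"airflow-{tenant_id}",
    }


print(tenant_resource_names("alpha")["pool"])  # alpha_pool
```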
Code Examples
Tenant Management API
# tenant_management.py
from datetime import timedelta

from airflow import settings
from airflow.models import Connection, Pool, TaskInstance, Variable
from airflow.utils import timezone


class TenantManager:
    """Manage tenant provisioning and deprovisioning."""

    def __init__(self):
        self.session = settings.Session()

    def provision_tenant(self, tenant_id, config):
        """Provision a new tenant with resources."""
        # Create pool. On Airflow 2.7+ the pool model also has an
        # include_deferred flag that you may need to set here.
        pool = Pool(
            pool=f'{tenant_id}_pool',
            slots=config.get('max_concurrent_tasks', 32),
            description=f'Pool for tenant {tenant_id}',
        )
        self.session.add(pool)
        # Create connections
        for conn_config in config.get('connections', []):
            conn = Connection(
                conn_id=f"{tenant_id}_{conn_config['name']}",
                conn_type=conn_config['type'],
                host=conn_config.get('host'),
                login=conn_config.get('login'),
                password=conn_config.get('password'),
                schema=conn_config.get('schema'),
            )
            self.session.add(conn)
        # Create variables
        for key, value in config.get('variables', {}).items():
            var = Variable(
                key=f"{tenant_id}_{key}",
                val=str(value),
            )
            self.session.add(var)
        self.session.commit()
        print(f"Tenant {tenant_id} provisioned successfully")

    def deprovision_tenant(self, tenant_id):
        """Deprovision a tenant and clean up resources."""
        prefix = f'{tenant_id}_'
        # Delete pool
        pool = self.session.query(Pool).filter(
            Pool.pool == f'{tenant_id}_pool'
        ).first()
        if pool:
            self.session.delete(pool)
        # Delete connections; autoescape keeps the '_' in the prefix from
        # acting as a single-character LIKE wildcard
        connections = self.session.query(Connection).filter(
            Connection.conn_id.startswith(prefix, autoescape=True)
        ).all()
        for conn in connections:
            self.session.delete(conn)
        # Delete variables
        variables = self.session.query(Variable).filter(
            Variable.key.startswith(prefix, autoescape=True)
        ).all()
        for var in variables:
            self.session.delete(var)
        self.session.commit()
        print(f"Tenant {tenant_id} deprovisioned")

    def get_tenant_usage(self, tenant_id):
        """Get resource usage for a tenant."""
        # Pool usage
        pool = self.session.query(Pool).filter(
            Pool.pool == f'{tenant_id}_pool'
        ).first()
        total = pool.slots if pool else 0
        # occupied_slots() is a method, not an attribute
        occupied = pool.occupied_slots(session=self.session) if pool else 0
        pool_usage = {
            'total_slots': total,
            'occupied': occupied,
            'utilization': occupied / total if total > 0 else 0,
        }
        # Task count over the last 24 hours (timezone-aware UTC cutoff)
        since = timezone.utcnow() - timedelta(hours=24)
        task_count = self.session.query(TaskInstance).filter(
            TaskInstance.pool == f'{tenant_id}_pool',
            TaskInstance.execution_date >= since,
        ).count()
        return {
            'pool': pool_usage,
            'tasks_24h': task_count,
        }


if __name__ == "__main__":
    manager = TenantManager()
    # Provision new tenant
    manager.provision_tenant('gamma', {
        'max_concurrent_tasks': 64,
        'connections': [
            {'name': 'database', 'type': 'postgres', 'host': 'db.gamma.com'},
        ],
        'variables': {'config': '{}'},
    })
    # Check usage
    usage = manager.get_tenant_usage('gamma')
    print(f"Tenant usage: {usage}")
Tenant-Aware DAG Factory
# dag_factory.py
from datetime import datetime
from typing import Any, Dict

from airflow.decorators import dag, task


def create_tenant_dag(
    dag_id: str,
    tenant_id: str,
    schedule: str,
    tasks: Dict[str, Any],
):
    """Factory function to create tenant-aware DAGs."""

    def make_task(task_name, task_config):
        """Build one task, binding task_name so each task reports its own name.

        Defining task_func directly inside the loop would make every task
        see the loop variable's final value at run time.
        """

        @task(
            task_id=task_name,
            pool=f'{tenant_id}_pool',
            pool_slots=task_config.get('slots', 1),
            retries=task_config.get('retries', 1),
        )
        def task_func(**context):
            """Execute tenant-specific task."""
            print(f"Executing {task_name} for tenant {tenant_id}")
            return {"tenant": tenant_id, "task": task_name}

        return task_func

    @dag(
        dag_id=f"{tenant_id}_{dag_id}",
        schedule_interval=schedule,
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=[f'tenant:{tenant_id}', 'factory-generated'],
        default_args={
            'pool': f'{tenant_id}_pool',
            'owner': tenant_id,
        },
    )
    def tenant_dag():
        task_instances = {}
        for task_name, task_config in tasks.items():
            task_instances[task_name] = make_task(task_name, task_config)()
        # Set dependencies
        for task_name, task_config in tasks.items():
            for upstream in task_config.get('upstream', []):
                task_instances[upstream] >> task_instances[task_name]

    return tenant_dag()


# Create DAGs for different tenants
alpha_etl = create_tenant_dag(
    dag_id='etl_pipeline',
    tenant_id='alpha',
    schedule='@daily',
    tasks={
        'extract': {'slots': 2, 'retries': 3},
        'transform': {'slots': 4, 'upstream': ['extract']},
        'load': {'slots': 2, 'upstream': ['transform']},
    },
)
beta_etl = create_tenant_dag(
    dag_id='etl_pipeline',
    tenant_id='beta',
    schedule='0 6 * * *',
    tasks={
        'extract': {'slots': 2, 'retries': 3},
        'transform': {'slots': 4, 'upstream': ['extract']},
        'load': {'slots': 2, 'upstream': ['transform']},
    },
)
Tenant Monitoring
# tenant_monitoring.py
from datetime import timedelta

from airflow import settings
from airflow.models import Pool, TaskInstance
from airflow.utils import timezone
from sqlalchemy import func


class TenantMonitor:
    """Monitor tenant resource usage and performance."""

    def __init__(self):
        self.session = settings.Session()

    def get_tenant_metrics(self, tenant_id, hours=24):
        """Get comprehensive metrics for a tenant."""
        # Timezone-aware UTC cutoff for the reporting window
        since = timezone.utcnow() - timedelta(hours=hours)
        # autoescape keeps the '_' in the prefix from acting as a
        # single-character LIKE wildcard
        in_tenant_pool = TaskInstance.pool.startswith(
            f'{tenant_id}_', autoescape=True
        )
        # Task metrics
        task_stats = self.session.query(
            TaskInstance.state,
            func.count(TaskInstance.task_id)
        ).filter(
            in_tenant_pool,
            TaskInstance.execution_date >= since
        ).group_by(TaskInstance.state).all()
        # Success rate
        total_tasks = sum(count for _, count in task_stats)
        successful_tasks = dict(task_stats).get('success', 0)
        success_rate = successful_tasks / total_tasks if total_tasks > 0 else 0
        # Average duration (None when no successful tasks ran in the window)
        avg_duration = self.session.query(
            func.avg(TaskInstance.duration)
        ).filter(
            in_tenant_pool,
            TaskInstance.state == 'success',
            TaskInstance.execution_date >= since
        ).scalar()
        avg_duration = float(avg_duration) if avg_duration is not None else 0.0
        # Pool utilization; occupied_slots() is a method, not an attribute
        pool = self.session.query(Pool).filter(
            Pool.pool == f'{tenant_id}_pool'
        ).first()
        if pool and pool.slots > 0:
            pool_utilization = pool.occupied_slots(session=self.session) / pool.slots
        else:
            pool_utilization = 0
        return {
            'tenant_id': tenant_id,
            'period_hours': hours,
            'task_stats': dict(task_stats),
            'total_tasks': total_tasks,
            'success_rate': success_rate,
            'avg_duration': avg_duration,
            'pool_utilization': pool_utilization,
        }

    def compare_tenants(self, tenant_ids):
        """Compare metrics across tenants."""
        metrics = {}
        for tenant_id in tenant_ids:
            metrics[tenant_id] = self.get_tenant_metrics(tenant_id)
        # Find tenant with highest utilization
        max_utilization = max(
            metrics.items(),
            key=lambda x: x[1]['pool_utilization']
        )
        # Find tenant with lowest success rate
        min_success = min(
            metrics.items(),
            key=lambda x: x[1]['success_rate']
        )
        return {
            'tenants': metrics,
            'highest_utilization': max_utilization[0],
            'lowest_success_rate': min_success[0],
        }


if __name__ == "__main__":
    monitor = TenantMonitor()
    # Get metrics for all tenants
    comparison = monitor.compare_tenants(['alpha', 'beta', 'gamma'])
    print("Tenant Comparison:")
    for tenant_id, metrics in comparison['tenants'].items():
        print(f"\n{tenant_id}:")
        print(f"  Tasks: {metrics['total_tasks']}")
        print(f"  Success Rate: {metrics['success_rate']:.2%}")
        print(f"  Avg Duration: {metrics['avg_duration']:.2f}s")
        print(f"  Pool Utilization: {metrics['pool_utilization']:.2%}")
Performance Metrics
Multi-Tenancy Metrics
| Metric | Target | Warning | Critical |
|---|---|---|---|
| Isolation Score | > 0.9 | 0.7-0.9 | < 0.7 |
| Pool Utilization | < 80% | 80-95% | > 95% |
| Cross-Tenant Impact | 0 | Any | Multiple |
| Provisioning Time | < 5min | 5-15min | > 15min |
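The thresholds for the two numeric ratio metrics in the table can be turned into simple classifiers, for example to drive alerts. This is a sketch: the function names are hypothetical, and the treatment of boundary values (exactly 80%, 95%, 0.7, or 0.9 counts as "warning") is an assumption, since the table does not say which band the endpoints belong to.

```python
def classify_pool_utilization(utilization: float) -> str:
    """Map pool utilization (0.0-1.0) to target / warning / critical."""
    if utilization > 0.95:
        return "critical"
    if utilization >= 0.80:
        return "warning"
    return "target"


def classify_isolation_score(score: float) -> str:
    """Map an isolation score (0.0-1.0, higher is better) to a status."""
    if score < 0.7:
        return "critical"
    if score <= 0.9:
        return "warning"
    return "target"


print(classify_pool_utilization(0.75))  # target
print(classify_isolation_score(0.65))   # critical
```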
Tenant Resource Distribution
| Tenant | Pool Slots | DAG Count | Task Count | Usage |
|---|---|---|---|---|
| Alpha | 32 | 15 | 200 | 75% |
| Beta | 32 | 10 | 150 | 60% |
| Gamma | 64 | 25 | 400 | 80% |
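For capacity planning, the table's figures can be combined into an overall utilization. The sketch below assumes the "Usage" column is the fraction of each tenant's pool slots that is occupied, which the table does not state explicitly; the variable names are illustrative.

```python
# Rows copied from the table: (tenant, pool slots, usage fraction)
TENANTS = [
    ("Alpha", 32, 0.75),
    ("Beta", 32, 0.60),
    ("Gamma", 64, 0.80),
]

# Total capacity across all tenant pools
total_slots = sum(slots for _, slots, _ in TENANTS)

# Occupied slots per tenant, assuming usage is a share of that tenant's pool
total_occupied = sum(slots * usage for _, slots, usage in TENANTS)

# Slot-weighted utilization of the whole instance
overall_utilization = total_occupied / total_slots

print(total_slots)
print(f"{overall_utilization:.1%}")
```

Weighting by pool size matters here: Gamma holds half of all slots, so its 80% usage pulls the overall figure above the simple average of the three percentages.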
Key Takeaways:
- Use RBAC with DAG tags for tenant-level access control
- Create separate pools for each tenant to prevent resource contention
- Prefix connections and variables with tenant ID for isolation
- Monitor tenant resource usage and enforce quotas
- Use tenant-aware DAG factories for consistent provisioning
- Implement tenant monitoring for capacity planning
See Also
- Security Best Practices — Security and access control
- Kubernetes Executor — K8s namespace isolation
- Monitoring and Alerting — Tenant-level monitoring
- Performance Tuning — Optimizing multi-tenant performance