Interview Question
ℹ️ Interview Context
Company: Netflix / Uber
Role: Senior Data Engineer / Platform Engineer
Difficulty: Advanced
Time: 45-60 minutes
Question: "Explain how concurrency is managed in Airflow. How do pools, priority weights, and slots work? How do you prevent resource contention in a multi-tenant environment?"
Detailed Theory
Concurrency Fundamentals
# concurrency_fundamentals.py
"""
Airflow Concurrency Concepts:
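1. Parallelism:
- Upper bound on task instances running at once across all DAGs (per scheduler)
- Set with [core] parallelism or AIRFLOW__CORE__PARALLELISM
2. Pools:
- Named slot budgets that cap concurrency for a group of tasks
- A running task occupies pool_slots slots (default 1)
- Tasks without an explicit pool run in default_pool
3. Priority Weights:
- Decide which task is scheduled first when tasks compete for slots
- Higher weight = higher priority
- The effective weight depends on the task's weight_rule
4. Worker Concurrency:
- Max tasks a single Celery worker runs at once
- Set with [celery] worker_concurrency
5. DAG-level Concurrency:
- max_active_runs and max_active_tasks on the DAG
- Defaults come from [core] max_active_runs_per_dag and max_active_tasks_per_dag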
"""
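How these limits interact can be sketched as a minimum over the free capacity at each level. The function below is an illustrative model rather than Airflow's scheduler code; the name `startable_tasks` and its parameters are assumptions made for this example, and it assumes every task uses one pool slot and belongs to the same DAG and pool.

```python
def startable_tasks(
    ready: int,
    parallelism_free: int,
    pool_free: int,
    dag_free: int,
) -> int:
    """Return how many ready tasks can start given the free capacity at each level."""
    # The tightest limit wins; never return a negative count
    return max(0, min(ready, parallelism_free, pool_free, dag_free))


# 40 ready tasks, 32 free global slots, 10 free pool slots, 16 free DAG slots
print(startable_tasks(ready=40, parallelism_free=32, pool_free=10, dag_free=16))  # 10
```

In this model the pool is the bottleneck: raising `parallelism` would not start more tasks until the pool has more open slots.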
1. Pool Configuration
# pool_configuration.py
from airflow.models.pool import Pool
from airflow.utils.session import provide_session


# Create pools programmatically.
# Pool names are unique, so running this twice raises an integrity error;
# `airflow pools set NAME SLOTS DESCRIPTION` creates or updates a pool instead.
# Assumption: newer Airflow releases add an include_deferred column to Pool
# that may need to be set explicitly.
@provide_session
def create_pools(session=None):
    """Create resource pools"""
    # Database pool
    db_pool = Pool(
        pool='database_pool',
        slots=10,  # Max 10 concurrent database tasks
        description='Pool for database operations',
    )
    session.add(db_pool)
    # API pool
    api_pool = Pool(
        pool='api_pool',
        slots=5,  # Max 5 concurrent API calls
        description='Pool for API operations',
    )
    session.add(api_pool)
    # S3 pool
    s3_pool = Pool(
        pool='s3_pool',
        slots=20,  # Max 20 concurrent S3 operations
        description='Pool for S3 operations',
    )
    session.add(s3_pool)
    session.commit()
# Use pools in tasks
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id='pool_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def pool_dag():
    @task(pool='database_pool', pool_slots=1)
    def database_task() -> dict:
        """Task using database pool"""
        return {'status': 'database_done'}

    @task(pool='api_pool', pool_slots=1)
    def api_task() -> dict:
        """Task using API pool"""
        return {'status': 'api_done'}

    @task(pool='s3_pool', pool_slots=2)  # Occupies 2 of the pool's 20 slots
    def s3_task() -> dict:
        """Task using 2 S3 slots"""
        return {'status': 's3_done'}

    database_task() >> api_task() >> s3_task()


pool_dag()
# Pool configuration
POOL_CONFIG = """
# airflow.cfg: only the size of default_pool is configured here
[core]
default_pool_task_slot_count = 128

# Named pools are not defined in airflow.cfg. They are stored in the
# metadata database and created through the UI (Admin > Pools), the
# REST API, or the CLI:
#   airflow pools set database_pool 10 "Database operations"
#   airflow pools set api_pool 5 "API operations"
#   airflow pools set s3_pool 20 "S3 operations"
"""
ℹ️ Pro Tip
Use pools to isolate different types of workloads. For example, separate database, API, and file system operations to prevent one type from exhausting resources needed by another.
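The slot accounting behind a pool can be illustrated with a toy model. This is a sketch for reasoning about `slots` and `pool_slots`, not Airflow's implementation; the `PoolModel` class and its methods are hypothetical names introduced for this example.

```python
from dataclasses import dataclass, field


@dataclass
class PoolModel:
    """Toy model of a pool: a fixed slot budget shared by running tasks."""

    name: str
    slots: int
    running: dict = field(default_factory=dict)  # task_id -> slots held

    def open_slots(self) -> int:
        return self.slots - sum(self.running.values())

    def try_start(self, task_id: str, pool_slots: int = 1) -> bool:
        """Admit the task only if enough slots are free."""
        if pool_slots > self.open_slots():
            return False
        self.running[task_id] = pool_slots
        return True

    def finish(self, task_id: str) -> None:
        """Release the slots held by a finished task."""
        self.running.pop(task_id, None)


api_pool = PoolModel(name="api_pool", slots=5)
started = [api_pool.try_start(f"api_call_{i}") for i in range(7)]
print(started)                # first five admitted, last two must wait
print(api_pool.open_slots())  # 0
```

Once a running task finishes and releases its slot, one of the waiting tasks can be admitted; tasks in other pools are unaffected throughout.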
2. Priority Weights
# priority_weights.py
from airflow.decorators import dag, task
from datetime import datetime


@dag(
    dag_id='priority_weight_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def priority_dag():
    @task(priority_weight=10)
    def high_priority_task() -> dict:
        """High priority task"""
        return {'priority': 'high'}

    @task(priority_weight=5)
    def medium_priority_task() -> dict:
        """Medium priority task"""
        return {'priority': 'medium'}

    @task(priority_weight=1)
    def low_priority_task() -> dict:
        """Low priority task"""
        return {'priority': 'low'}

    # All tasks use default_pool
    high = high_priority_task()
    medium = medium_priority_task()
    low = low_priority_task()
    # Dependencies DO affect the effective priority under the default
    # 'downstream' weight rule: high = 10 + 5 + 1 = 16, medium = 5 + 1 = 6, low = 1.
    # Chained like this, the three tasks never compete with each other;
    # priority matters against other ready tasks in the same pool.
    high >> medium >> low


priority_dag()
# Priority weight calculation
PRIORITY_CONFIG = """
# The weight rule is set per task with the weight_rule argument.
# The default for all tasks comes from airflow.cfg.
# Options: downstream, upstream, absolute
[core]
default_task_weight_rule = downstream
# downstream: own priority_weight plus the weights of all downstream tasks
# upstream: own priority_weight plus the weights of all upstream tasks
# absolute: exactly the priority_weight set on the task
"""
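A small model helps when reasoning about the three weight rules. The sketch below assumes the behaviour of Airflow's `weight_rule` options, where `downstream` and `upstream` add the `priority_weight` of every related task to the task's own weight and `absolute` uses the task's own weight unchanged. The helper names `reachable` and `effective_weight` are hypothetical, and the graph mirrors the `high >> medium >> low` chain above.

```python
from typing import Dict, List, Set


def reachable(graph: Dict[str, List[str]], start: str) -> Set[str]:
    """Return every task reachable from `start` (excluding `start` itself)."""
    seen: Set[str] = set()
    stack = list(graph.get(start, []))
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(graph.get(node, []))
    return seen


def effective_weight(
    task: str,
    weights: Dict[str, int],
    downstream: Dict[str, List[str]],
    rule: str = "downstream",
) -> int:
    """Model of the effective priority for the three weight rules."""
    if rule == "absolute":
        return weights[task]
    if rule == "downstream":
        related = reachable(downstream, task)
    elif rule == "upstream":
        # Invert the edges to walk towards the DAG's roots
        upstream: Dict[str, List[str]] = {}
        for parent, children in downstream.items():
            for child in children:
                upstream.setdefault(child, []).append(parent)
        related = reachable(upstream, task)
    else:
        raise ValueError(f"unknown weight rule: {rule}")
    return weights[task] + sum(weights[t] for t in related)


weights = {"high": 10, "medium": 5, "low": 1}
edges = {"high": ["medium"], "medium": ["low"]}  # high >> medium >> low

for rule in ("downstream", "upstream", "absolute"):
    print(rule, {t: effective_weight(t, weights, edges, rule) for t in weights})
```

With `downstream`, tasks early in a long chain accumulate large weights, which tends to push each DAG run forward from its roots; `absolute` is the option to reach for when the configured `priority_weight` should be used as-is.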
3. Worker Concurrency (Celery)
# worker_concurrency.py
"""
Celery Worker Concurrency:
Configure how many tasks each worker can run concurrently.
"""
# Worker configuration
WORKER_CONFIG = """
# airflow.cfg
[celery]
# Worker concurrency (tasks per worker)
worker_concurrency = 16
# Worker prefetch multiplier (tasks reserved ahead per worker process)
worker_prefetch_multiplier = 1

# The settings below are Celery options, not airflow.cfg keys. Supply them
# through a custom Celery config (see [celery] celery_config_options).
# Recycle a worker process after N tasks
worker_max_tasks_per_child = 1000
# Recycle a worker process once it exceeds N kilobytes of resident memory
worker_max_memory_per_child = 2000000  # ~2 GB
# Hard and soft task time limits, in seconds
task_time_limit = 3600  # 1 hour
task_soft_time_limit = 3000  # 50 minutes
"""
# Worker sizing (rules of thumb; validate against measured CPU and memory use)
WORKER_SIZING = """
For CPU-bound tasks:
- worker_concurrency = 2 * CPU cores
- Example: 8 cores = 16 concurrent tasks per worker
For I/O-bound tasks:
- worker_concurrency = 4 * CPU cores
- Example: 8 cores = 32 concurrent tasks per worker
For mixed workloads:
- worker_concurrency = 3 * CPU cores
- Example: 8 cores = 24 concurrent tasks per worker
"""
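# Worker scaling
import math


def calculate_worker_count(
    total_tasks_per_hour: int,
    avg_task_duration_minutes: float,
    target_utilization: float = 0.8,
    worker_concurrency: int = 1,
) -> int:
    """Estimate the number of workers needed.

    Each worker is assumed to run `worker_concurrency` tasks at a time.
    """
    # Tasks a single slot can finish per hour
    tasks_per_slot_per_hour = 60 / avg_task_duration_minutes
    required_slots = total_tasks_per_hour / tasks_per_slot_per_hour
    # Leave headroom for the target utilization, then round up to whole workers
    slots_with_headroom = required_slots / target_utilization
    return math.ceil(slots_with_headroom / worker_concurrency)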
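The estimate above is an application of Little's law: the average number of tasks in flight equals the arrival rate times the average duration. A worked sketch with illustrative numbers follows; the function name `estimate_workers` and the workload figures are assumptions for this example, not measurements.

```python
import math


def estimate_workers(
    tasks_per_hour: float,
    avg_duration_minutes: float,
    worker_concurrency: int,
    target_utilization: float = 0.8,
) -> int:
    """Estimate Celery workers needed for a steady task arrival rate."""
    # Little's law: concurrent tasks = arrival rate * time in system
    concurrent_tasks = tasks_per_hour * avg_duration_minutes / 60
    # Size for the target utilization so bursts have headroom
    slots_needed = concurrent_tasks / target_utilization
    return math.ceil(slots_needed / worker_concurrency)


# 1,200 tasks/hour at 3 minutes each -> 60 tasks in flight on average;
# 60 / 0.8 = 75 slots; 75 / 16 slots per worker -> 5 workers
print(estimate_workers(1200, 3, worker_concurrency=16))  # 5
```

The result only covers steady-state load. Bursty schedules (for example, many DAGs firing at the top of the hour) need the peak arrival rate as input rather than the hourly average.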
4. DAG-level Concurrency
# dag_level_concurrency.py
from airflow.decorators import dag
from datetime import datetime


@dag(
    dag_id='dag_level_concurrency',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    max_active_runs=2,  # Max 2 concurrent DAG runs
    max_active_tasks=10,  # Max 10 concurrent tasks in this DAG
)
def concurrency_dag():
    pass


# Different DAGs with different concurrency limits
@dag(
    dag_id='high_throughput_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@hourly',
    max_active_runs=5,
    max_active_tasks=50,
)
def high_throughput_dag():
    pass


@dag(
    dag_id='resource_intensive_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    max_active_runs=1,
    max_active_tasks=5,
)
def resource_intensive_dag():
    pass


# A decorated DAG function must be called to register the DAG
concurrency_dag()
high_throughput_dag()
resource_intensive_dag()
# Global concurrency settings
GLOBAL_CONFIG = """
# Global concurrency settings
[core]
# Maximum number of task instances running at once across all DAGs (per scheduler)
parallelism = 32
# Default maximum number of task instances running at once within a single DAG
# (overridden per DAG by max_active_tasks)
max_active_tasks_per_dag = 16
# Default maximum number of active DAG runs per DAG
# (overridden per DAG by max_active_runs)
max_active_runs_per_dag = 16
# For Celery executor
[celery]
# Worker concurrency
worker_concurrency = 16
"""
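Any of these options can also be set through an environment variable named `AIRFLOW__{SECTION}__{KEY}`, the convention behind `AIRFLOW__CORE__PARALLELISM` mentioned earlier. A minimal sketch that derives the variable names for the settings above; the helper name `airflow_env_var` is hypothetical.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name that overrides an airflow.cfg option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


overrides = {
    ("core", "parallelism"): 32,
    ("core", "max_active_tasks_per_dag"): 16,
    ("core", "max_active_runs_per_dag"): 16,
    ("celery", "worker_concurrency"): 16,
}

# Print shell export lines for the overrides
for (section, key), value in overrides.items():
    print(f"export {airflow_env_var(section, key)}={value}")
```

Environment variables are convenient in containerized deployments, where scheduler and worker images share one airflow.cfg but need different limits.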
5. Resource Contention Prevention
# resource_contention.py
from airflow.decorators import dag, task
from datetime import datetime
from typing import Dict, Any


@dag(
    dag_id='resource_contention_prevention',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def contention_prevention():
    @task(
        pool='database_pool',
        pool_slots=2,  # Occupies 2 slots while running
        priority_weight=10,
    )
    def critical_database_task() -> Dict[str, Any]:
        """Critical task that needs database resources"""
        return {'status': 'critical_done'}

    @task(
        pool='database_pool',
        pool_slots=1,  # Use 1 slot
        priority_weight=5,
    )
    def normal_database_task() -> Dict[str, Any]:
        """Normal task that uses database"""
        return {'status': 'normal_done'}

    @task(
        pool='api_pool',
        pool_slots=1,
        priority_weight=1,
    )
    def api_task() -> Dict[str, Any]:
        """API task (separate pool)"""
        return {'status': 'api_done'}

    # Resource isolation: the database tasks share database_pool, where the
    # higher priority_weight wins when slots are scarce; the API task draws
    # from api_pool and cannot be starved by database work.
    critical = critical_database_task()
    normal = normal_database_task()
    api = api_task()
    # Chained here for ordering. Contention arises between ready tasks from
    # other DAGs and DAG runs that share a pool.
    critical >> normal >> api


contention_prevention()
⚠️ Important
Use separate pools for different resource types (database, API, file system). This prevents one type of workload from exhausting resources needed by others.
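The isolation described above can be illustrated with a simplified admission loop: ready tasks are considered in descending priority, and each is admitted only if its own pool has enough open slots. This is a model for intuition, not the scheduler's actual algorithm; `QueuedTask` and `admit` are hypothetical names, and the slot counts are made up.

```python
from typing import Dict, List, NamedTuple


class QueuedTask(NamedTuple):
    task_id: str
    pool: str
    pool_slots: int
    priority_weight: int


def admit(queue: List[QueuedTask], open_slots: Dict[str, int]) -> List[str]:
    """Admit tasks in descending priority while their own pool has room."""
    remaining = dict(open_slots)  # do not mutate the caller's dict
    admitted = []
    for t in sorted(queue, key=lambda t: -t.priority_weight):
        if t.pool_slots <= remaining.get(t.pool, 0):
            remaining[t.pool] -= t.pool_slots
            admitted.append(t.task_id)
    return admitted


queue = [
    QueuedTask("normal_db_1", "database_pool", 1, 5),
    QueuedTask("normal_db_2", "database_pool", 1, 5),
    QueuedTask("critical_db", "database_pool", 2, 10),
    QueuedTask("api_call", "api_pool", 1, 1),
]
# Only 2 database slots are free, so the critical task takes both;
# the API task still starts because it draws from a separate pool.
print(admit(queue, {"database_pool": 2, "api_pool": 1}))
```

Had all four tasks shared one pool with two free slots, the low-priority API call would have waited behind the database work.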
Real-World Scenarios
Scenario 1: Netflix's Multi-Tenant Environment
# netflix_multi_tenant.py
"""
Netflix-style multi-tenant concurrency:
- Team-based pool isolation
- Priority-based scheduling
- Resource quotas
"""
from airflow.decorators import dag, task
from datetime import datetime
from typing import Dict, Any


@dag(
    dag_id='netflix_multi_tenant',
    schedule_interval='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['netflix', 'multi-tenant', 'production'],
)
def multi_tenant_pipeline():
    @task(
        pool='netflix_data_engineering_pool',
        pool_slots=5,
        priority_weight=10,
    )
    def team_a_task() -> Dict[str, Any]:
        """Team A's task"""
        return {'team': 'A', 'status': 'done'}

    @task(
        pool='netflix_data_engineering_pool',
        pool_slots=3,
        priority_weight=5,
    )
    def team_b_task() -> Dict[str, Any]:
        """Team B's task"""
        return {'team': 'B', 'status': 'done'}

    @task(
        pool='netflix_ml_pool',
        pool_slots=2,
        priority_weight=8,
    )
    def ml_task() -> Dict[str, Any]:
        """ML team's task"""
        return {'team': 'ML', 'status': 'done'}

    # Teams A and B share the data engineering pool and compete for its
    # slots (A wins on priority); the ML team is isolated in its own pool.
    team_a = team_a_task()
    team_b = team_b_task()
    ml = ml_task()
    # No dependencies are declared, so all three tasks can run in parallel
    # as long as their pools have enough open slots.


multi_tenant_pipeline()
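One way to turn per-team resource quotas into pool sizes is to split a total slot budget in proportion to each team's share. The sketch below is a hypothetical sizing helper, not something the source describes; the function name, the 64-slot budget, and the 3:1 split are all assumptions for illustration.

```python
from typing import Dict


def pool_sizes_from_shares(total_slots: int, shares: Dict[str, float]) -> Dict[str, int]:
    """Split a slot budget across team pools in proportion to their shares.

    Sizes are rounded down, so a few slots may be left unassigned.
    """
    total_share = sum(shares.values())
    return {
        pool: int(total_slots * share / total_share)
        for pool, share in shares.items()
    }


sizes = pool_sizes_from_shares(
    total_slots=64,
    shares={"netflix_data_engineering_pool": 3, "netflix_ml_pool": 1},
)
print(sizes)  # {'netflix_data_engineering_pool': 48, 'netflix_ml_pool': 16}
```

Keeping the sum of pool sizes at or below the cluster's real capacity means a full pool reflects a team hitting its quota, not the cluster running out of workers.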
Scenario 2: Uber's High-Throughput Pipeline
# uber_high_throughput.py
"""
Uber-style high-throughput pipeline:
- Maximize parallelism
- Efficient resource usage
- Dynamic scaling
"""
from airflow.decorators import dag, task
from datetime import datetime
from typing import Any, Dict, List


@dag(
    dag_id='uber_high_throughput',
    schedule_interval='*/5 * * * *',  # Every 5 minutes
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=3,  # Allow multiple runs
    max_active_tasks=100,  # High task concurrency
    tags=['uber', 'high-throughput', 'production'],
)
def high_throughput_pipeline():
    @task
    def get_partitions() -> List[str]:
        """Get partitions to process"""
        return [f'partition_{i}' for i in range(20)]

    @task(
        pool='uber_processing_pool',
        pool_slots=1,
        priority_weight=5,
    )
    def process_partition(partition: str) -> Dict[str, str]:
        """Process individual partition"""
        return {'partition': partition, 'status': 'processed'}

    @task(
        pool='uber_aggregation_pool',
        pool_slots=5,
        priority_weight=10,
    )
    def aggregate_results(results: List[Dict[str, str]]) -> Dict[str, Any]:
        """Aggregate all results"""
        return {'total': len(results), 'status': 'aggregated'}

    # High parallelism: one mapped task instance per partition.
    # expand() takes keyword arguments named after the task's parameters.
    partitions = get_partitions()
    processed = process_partition.expand(partition=partitions)
    aggregated = aggregate_results(processed)


high_throughput_pipeline()
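When mapped tasks fan out into a pool, the pool size bounds how many run at once, so the fan-out completes in batches. A rough sketch of that arithmetic follows; the source does not state the size of `uber_processing_pool`, so the 8 slots used here are an assumption, and `waves_needed` is a hypothetical helper that treats all tasks as equally long.

```python
import math


def waves_needed(mapped_tasks: int, pool_size: int, pool_slots: int = 1) -> int:
    """Number of sequential batches needed to push all mapped tasks through a pool."""
    concurrent = pool_size // pool_slots  # tasks that fit in the pool at once
    if concurrent == 0:
        raise ValueError("pool_slots exceeds pool size; tasks would never run")
    return math.ceil(mapped_tasks / concurrent)


# 20 mapped partitions through a pool of 8 slots, 1 slot each -> 3 waves
print(waves_needed(mapped_tasks=20, pool_size=8))  # 3
```

If each wave takes longer than the schedule interval divided by the number of waves, runs start to overlap, which is where `max_active_runs` comes into play.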
Edge Cases
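⚠️ Common Pitfalls
- Pool Exhaustion: A task whose pool_slots exceeds the pool's total size can never run, and long-running tasks can hold every slot. Monitor open and queued slots per pool.
- Deadlocks: Airflow rejects circular dependencies when it parses the DAG, so the practical risk is tasks (typically sensors) that hold slots while waiting on tasks that need those same slots.
- Priority Inversion: Ensure high-priority tasks aren't blocked by low-priority tasks that already occupy the pool.
- Resource Starvation: Set appropriate pool sizes so that low-priority work still gets slots eventually.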
# edge_cases.py
from airflow.decorators import dag, task
from datetime import datetime


@dag(
    dag_id='concurrency_edge_cases',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def edge_cases():
    # Pool exhaustion issue
    # BAD: requesting more slots than small_pool holds means the task never runs
    @task(pool='small_pool', pool_slots=10)
    def pool_exhaustion():
        pass

    # Correct pool usage
    @task(pool='small_pool', pool_slots=1)  # GOOD: Within pool size
    def correct_pool_usage():
        pass

    # Circular dependency issue
    # @task
    # def task_a(task_b_result):
    #     pass
    # @task
    # def task_b(task_a_result):
    #     pass
    # Wiring these to each other creates a cycle, which Airflow rejects
    # when it parses the DAG.
    correct_pool_usage()


edge_cases()
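The first pitfall can be caught before deployment with a simple check that compares each task's requested slots against its pool's size. The sketch below works on plain tuples rather than Airflow objects; `unschedulable_tasks` is a hypothetical helper, and the size of 5 for `small_pool` is an assumption since the source does not state it.

```python
from typing import Dict, List, Tuple


def unschedulable_tasks(
    tasks: List[Tuple[str, str, int]],
    pool_sizes: Dict[str, int],
) -> List[str]:
    """Return ids of tasks that request more slots than their pool holds.

    Tasks that reference an unknown pool are reported as well.
    """
    problems = []
    for task_id, pool, pool_slots in tasks:
        size = pool_sizes.get(pool)
        if size is None or pool_slots > size:
            problems.append(task_id)
    return problems


tasks = [
    ("pool_exhaustion", "small_pool", 10),
    ("correct_pool_usage", "small_pool", 1),
]
print(unschedulable_tasks(tasks, {"small_pool": 5}))  # ['pool_exhaustion']
```

A check like this fits naturally into a CI step that loads DAG definitions and the intended pool sizes side by side.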
Best Practices
# best_practices.py
"""
Concurrency Best Practices:
1. Pool Design:
- Create pools for different resource types
- Set appropriate slot counts
- Monitor pool usage
2. Priority Management:
- Use priority weights for critical tasks
- Avoid priority inversion
- Document priority scheme
3. Worker Sizing:
- Match concurrency to workload type
- Monitor worker utilization
- Scale workers based on demand
4. Resource Isolation:
- Separate different workloads
- Prevent resource starvation
- Set appropriate limits
5. Monitoring:
- Track pool utilization
- Monitor queue depth
- Alert on resource exhaustion
"""
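The monitoring items can be sketched as a check over a snapshot of pool statistics. In a real deployment the numbers would come from Airflow's pool metrics or the pools API; here the snapshot is hard-coded, and the `pool_alerts` function, its dictionary layout, and the 90% threshold are assumptions for illustration.

```python
from typing import Dict, List


def pool_alerts(
    stats: Dict[str, Dict[str, int]],
    utilization_threshold: float = 0.9,
) -> List[str]:
    """Flag pools that are nearly full or have tasks waiting for a slot."""
    alerts = []
    for name, s in stats.items():
        used = s["running"]
        # Treat a zero-slot pool as fully utilized
        utilization = used / s["slots"] if s["slots"] else 1.0
        if utilization >= utilization_threshold:
            alerts.append(f"{name}: {utilization:.0%} of slots in use")
        if s["queued"] > 0 and used >= s["slots"]:
            alerts.append(f"{name}: {s['queued']} task(s) waiting on a full pool")
    return alerts


snapshot = {
    "database_pool": {"slots": 10, "running": 10, "queued": 4},
    "api_pool": {"slots": 5, "running": 2, "queued": 0},
}
for line in pool_alerts(snapshot):
    print(line)
```

Sustained queueing on a full pool is the signal worth alerting on; a pool that is briefly at 100% with nothing waiting is simply well utilized.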
ℹ️ Netflix Interview Tip
At Netflix, they use separate pools for different teams and workloads. When discussing concurrency, emphasize the importance of resource isolation and priority-based scheduling. Also mention how they dynamically scale workers based on demand.
Summary
Concurrency management is critical for efficient Airflow operation. Key takeaways:
- Parallelism for global limits
- Pools for resource isolation
- Priority weights for execution order
- Worker concurrency for Celery
- DAG-level limits for per-DAG control
For Netflix and Uber interviews, focus on:
- Multi-tenant resource isolation
- Priority-based scheduling
- Dynamic scaling
- Monitoring and alerting
- Performance optimization
This question is part of the Apache Airflow Advanced interview preparation series. Practice explaining these concepts before your interview.