Interview Question
Interview Context
Company: Google / Apple
Role: Senior Data Engineer / Platform Engineer
Difficulty: Advanced
Time: 45-60 minutes
Question: "How do you optimize Airflow performance at scale? Explain parallelism tuning, database optimization, and resource management strategies."
Detailed Theory
Performance Fundamentals
# performance_fundamentals.py
"""
Airflow Performance Factors:
1. Parallelism:
- Global parallelism settings
- DAG-level concurrency
- Task-level parallelism
2. Database:
- Connection pooling
- Query optimization
- Index management
3. Workers:
- Worker concurrency
- Resource allocation
- Scaling strategies
4. DAG Design:
- Task granularity
- Dependency optimization
- Caching strategies
5. Infrastructure:
- CPU/Memory allocation
- Network optimization
- Storage performance
"""
1. Parallelism Configuration
# parallelism_config.py
"""
Parallelism Configuration:
Optimize how many tasks can run concurrently.
"""
# Global parallelism
PARALLELISM_CONFIG = """
[core]
# Maximum number of task instances that can run concurrently
# per scheduler, across all DAGs
parallelism = 32
# Maximum number of task instances allowed to run concurrently
# within each DAG (summed over all of its active runs)
max_active_tasks_per_dag = 16
# Maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16
# Number of slots in the default pool
default_pool_task_slot_count = 128
"""
# DAG-level concurrency
DAG_CONCURRENCY = """
@dag(
    dag_id='high_throughput_dag',
    schedule_interval='@hourly',
    max_active_runs=5, # Allow multiple runs
    max_active_tasks=50, # High task concurrency
)
def high_throughput_dag():
    pass
"""
# Worker concurrency (Celery)
WORKER_CONCURRENCY = """
[celery]
# Worker concurrency (tasks per worker)
worker_concurrency = 16
# Worker prefetch multiplier
worker_prefetch_multiplier = 1
# Worker max tasks per child
worker_max_tasks_per_child = 1000
# Worker max memory per child (Celery measures this in kilobytes)
worker_max_memory_per_child = 2000000 # ~2GB
"""
# Worker sizing calculation
import math
def calculate_workers(
    tasks_per_hour: int,
    avg_task_duration_minutes: float,
    target_utilization: float = 0.8,
    worker_concurrency: int = 1,
) -> int:
    """Estimate the number of workers needed for a steady task arrival rate.
    Each worker runs `worker_concurrency` tasks at a time."""
    tasks_per_worker_per_hour = worker_concurrency * 60 / avg_task_duration_minutes
    required_workers = tasks_per_hour / tasks_per_worker_per_hour
    # Leave headroom so workers run at target_utilization rather than 100%
    return math.ceil(required_workers / target_utilization)
Pro Tip
Measure task throughput and queue wait time before changing parallelism settings. Start with conservative values and raise them step by step as actual usage patterns demand.
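The limits in this section stack: a task starts only when every applicable cap has a free slot, so the tightest one determines real concurrency. The following is a minimal sketch of that interaction, assuming a Celery deployment and treating each limit as a simple ceiling. The function name and its arguments are illustrative and are not part of Airflow's API.

```python
def effective_task_slots(
    parallelism: int,
    worker_count: int,
    worker_concurrency: int,
    pool_slots: int,
    max_active_tasks: int,
) -> int:
    """Upper bound on concurrently running tasks for a single DAG.

    Assumes all of the DAG's tasks share one pool and that no other DAG
    competes for slots, so the tightest limit wins.
    """
    worker_capacity = worker_count * worker_concurrency
    return min(parallelism, worker_capacity, pool_slots, max_active_tasks)


# Example settings from this section plus an assumed two Celery workers:
# the DAG-level cap is tighter than worker capacity (2 * 16 = 32).
slots = effective_task_slots(
    parallelism=32,
    worker_count=2,
    worker_concurrency=16,
    pool_slots=128,
    max_active_tasks=16,
)
print(slots)
```

Running this kind of check before adding workers shows whether extra capacity would actually be used or whether a configuration cap is the real bottleneck.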
2. Database Optimization
# database_optimization.py
"""
Database Optimization:
Optimize Airflow's metadata database for performance.
"""
# Connection pooling
DATABASE_CONFIG = """
[database]
# Connection pool settings
sql_alchemy_pool_size = 20
sql_alchemy_max_overflow = 30
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True
# For PostgreSQL with PgBouncer
# sql_alchemy_pool_size = 100
# sql_alchemy_max_overflow = 50
"""
# PostgreSQL optimization
POSTGRES_OPTIMIZATION = """
# postgresql.conf optimizations
# Memory
shared_buffers = '4GB'
effective_cache_size = '12GB'
work_mem = '256MB' # allocated per sort/hash operation, so total use grows with connections
maintenance_work_mem = '1GB'
# Query optimization
random_page_cost = 1.1
effective_io_concurrency = 200
default_statistics_target = 100
# WAL
wal_buffers = '64MB'
checkpoint_completion_target = 0.9
max_wal_size = '4GB'
# Connections
max_connections = 200
"""
# Index management
INDEX_MANAGEMENT = """
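-- Key indexes for Airflow performance
-- Check the existing indexes first: Airflow's migrations already create
-- several on these tables, and duplicates only slow down writes.
-- Task instance lookups
CREATE INDEX IF NOT EXISTS idx_task_instance_dag_id
ON task_instance(dag_id);
-- Note: task_instance.execution_date exists only before Airflow 2.2;
-- on 2.2+ task instances are keyed by run_id instead.
CREATE INDEX IF NOT EXISTS idx_task_instance_execution_date
ON task_instance(execution_date);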
CREATE INDEX IF NOT EXISTS idx_task_instance_state
ON task_instance(state);
-- DAG run lookups
CREATE INDEX IF NOT EXISTS idx_dag_run_dag_id
ON dag_run(dag_id);
CREATE INDEX IF NOT EXISTS idx_dag_run_execution_date
ON dag_run(execution_date);
-- XCom lookups
CREATE INDEX IF NOT EXISTS idx_xcom_task_id
ON xcom(task_id, dag_id, run_id);
"""
# Query optimization
QUERY_OPTIMIZATION = """
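-- Optimize slow queries
-- (task_instance.execution_date exists only before Airflow 2.2; on 2.2+
-- filter on run_id or join dag_run instead)
-- 1. Use EXPLAIN ANALYZE to identify bottlenecks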
EXPLAIN ANALYZE
SELECT * FROM task_instance
WHERE dag_id = 'my_dag'
AND execution_date > '2024-01-01';
-- 2. Add appropriate indexes
CREATE INDEX CONCURRENTLY idx_task_instance_dag_execution
ON task_instance(dag_id, execution_date);
-- 3. Vacuum and analyze regularly
VACUUM ANALYZE task_instance;
"""
3. DAG Optimization
# dag_optimization.py
from airflow.decorators import dag, task
from datetime import datetime
from typing import Dict, Any
# Bad: Too many small tasks
@dag(
    dag_id='bad_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
)
def bad_dag():
    @task
    def step1():
        return {'data': 'value'}
    @task
    def step2(data):
        return {'processed': data}
    @task
    def step3(data):
        return {'final': data}
    # Too many tasks for simple operations
    s1 = step1()
    s2 = step2(s1)
    step3(s2)
bad_dag()  # Instantiate so Airflow registers the DAG
# Good: Consolidated tasks
@dag(
    dag_id='good_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
)
def good_dag():
    @task
    def process_data() -> Dict[str, Any]:
        """Consolidate multiple steps into one task"""
        # Step 1
        data = {'data': 'value'}
        # Step 2
        processed = {'processed': data}
        # Step 3
        final = {'final': processed}
        return final
    process_data()
good_dag()  # Instantiate so Airflow registers the DAG
# Good: Parallel tasks where possible
@dag(
    dag_id='parallel_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
)
def parallel_dag():
    @task
    def extract_source_a() -> Dict[str, Any]:
        return {'source': 'a'}
    @task
    def extract_source_b() -> Dict[str, Any]:
        return {'source': 'b'}
    @task
    def extract_source_c() -> Dict[str, Any]:
        return {'source': 'c'}
    @task
    def combine(
        a: Dict[str, Any],
        b: Dict[str, Any],
        c: Dict[str, Any],
    ) -> Dict[str, Any]:
        return {'combined': [a, b, c]}
    # Parallel extraction: the three tasks have no dependencies on each other
    a = extract_source_a()
    b = extract_source_b()
    c = extract_source_c()
    # Combine after all complete
    combine(a, b, c)
parallel_dag()  # Instantiate so Airflow registers the DAG
# Good: Use TaskFlow API for cleaner code
@dag(
    dag_id='taskflow_optimized',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
)
def taskflow_optimized():
    @task
    def extract() -> list:
        return [{'id': 1}, {'id': 2}]
    @task
    def transform(data: list) -> list:
        return [{'id': d['id'], 'processed': True} for d in data]
    @task
    def load(data: list) -> bool:
        return True
    # Clean, readable pipeline
    data = extract()
    transformed = transform(data)
    load(transformed)
taskflow_optimized()
Important
Avoid creating too many small tasks. Each task instance adds scheduling, queueing, and process start-up overhead, so consolidate related operations into fewer tasks when possible. Keep steps separate where independent retries or parallel execution matter.
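One common way to consolidate is to batch work items so that each task handles a group of records instead of a single one. The helper below is a small sketch of that idea in plain Python. The name `chunked` and the batch size are illustrative choices, and each resulting batch would be handed to one task.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive batches of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


# Ten records become three batches, so three tasks instead of ten
record_ids = list(range(10))
batches = list(chunked(record_ids, 4))
print(batches)
```

Larger batches reduce per-task overhead but make each retry more expensive, so the batch size is a trade-off to tune against observed task durations.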
4. Resource Management
# resource_management.py
"""
Resource Management:
Optimize CPU, memory, and network usage.
"""
# CPU optimization
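CPU_OPTIMIZATION = """
# For CPU-bound tasks
# Airflow 2.x KubernetesExecutor takes resources through pod_override;
# the container must be named "base" to patch the task container.
from kubernetes.client import models as k8s
@task(
    pool='cpu_pool',
    pool_slots=2,
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(
                            requests={"cpu": "2", "memory": "4Gi"},
                            limits={"cpu": "4", "memory": "8Gi"},
                        ),
                    )
                ]
            )
        )
    },
)
def cpu_intensive_task():
    pass
"""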
# Memory optimization
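MEMORY_OPTIMIZATION = """
# For memory-intensive tasks
# Airflow 2.x KubernetesExecutor takes resources through pod_override;
# the container must be named "base" to patch the task container.
from kubernetes.client import models as k8s
@task(
    pool='memory_pool',
    pool_slots=1,
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(
                            requests={"cpu": "1", "memory": "8Gi"},
                            limits={"cpu": "2", "memory": "16Gi"},
                        ),
                    )
                ]
            )
        )
    },
)
def memory_intensive_task():
    pass
"""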
# Network optimization
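NETWORK_OPTIMIZATION = """
# For I/O-bound tasks
# Airflow 2.x KubernetesExecutor takes resources through pod_override;
# the container must be named "base" to patch the task container.
from kubernetes.client import models as k8s
@task(
    pool='io_pool',
    pool_slots=1,
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(
                            requests={"cpu": "0.5", "memory": "1Gi"},
                            limits={"cpu": "1", "memory": "2Gi"},
                        ),
                    )
                ]
            )
        )
    },
)
def io_intensive_task():
    pass
"""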
# Caching strategies
CACHING_STRATEGIES = """
# 1. XCom caching for small data
@task
def cached_task():
    # Use XCom for small, frequently accessed data
    return {'cached': 'data'}
# 2. File caching for larger data
@task
def file_cached_task():
    import tempfile
    import json
    # Cache to file; delete=False keeps the file after the with-block closes it
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
        json.dump({'large': 'data'}, f)
    # The path is only usable by downstream tasks that share this filesystem
    return f.name
# 3. Database caching
@task
def db_cached_task():
    # Cache to database
    pass
"""
Real-World Scenarios
Scenario 1: Google's Scale Optimization
# google_optimization.py
"""
Google-style scale optimization:
- Dynamic resource allocation
- Intelligent caching
- Performance monitoring
"""
import json
from airflow.decorators import dag, task
from datetime import datetime
from typing import Dict, Any
# perform_extraction, transform_chunk, and load_batch are placeholders for
# project-specific functions. transform_chunk must be defined at module level
# so multiprocessing can pickle it.
@dag(
    dag_id='google_optimized_pipeline',
    schedule_interval='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=10,
    max_active_tasks=100,
    tags=['google', 'optimized', 'production'],
)
def google_optimized():
    @task(
        pool='extraction_pool',
        pool_slots=1,
        priority_weight=10,
    )
    def extract_data() -> Dict[str, Any]:
        """Extract with caching"""
        import redis
        r = redis.Redis(host='redis-cluster')
        # Check cache
        cached = r.get('extracted_data')
        if cached:
            return json.loads(cached)
        # Extract
        data = perform_extraction()
        # Cache result for one hour
        r.setex('extracted_data', 3600, json.dumps(data))
        return data
    @task(
        pool='transformation_pool',
        pool_slots=2,
        priority_weight=5,
    )
    def transform_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """Transform with parallel processing"""
        import multiprocessing as mp
        # One worker process per CPU core; each handles one chunk at a time
        with mp.Pool(mp.cpu_count()) as pool:
            results = pool.map(transform_chunk, data['chunks'])
        return {'transformed': results}
    @task(
        pool='loading_pool',
        pool_slots=1,
        priority_weight=8,
    )
    def load_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """Load with batch processing"""
        batch_size = 1000
        for i in range(0, len(data['transformed']), batch_size):
            batch = data['transformed'][i:i + batch_size]
            load_batch(batch)
        return {'loaded': len(data['transformed'])}
    data = extract_data()
    transformed = transform_data(data)
    load_data(transformed)
google_optimized()
Best Practices
# best_practices.py
"""
Performance Best Practices:
1. Parallelism:
- Set appropriate global parallelism
- Use DAG-level concurrency
- Optimize worker concurrency
2. Database:
- Use connection pooling
- Optimize queries
- Maintain indexes
3. DAG Design:
- Consolidate small tasks
- Parallelize independent tasks
- Use TaskFlow API
4. Resources:
- Allocate appropriate resources
- Use pools for isolation
- Implement caching
5. Monitoring:
- Track performance metrics
- Set up alerts
- Optimize based on data
"""
Google Interview Tip
Google emphasizes data-driven optimization. When discussing performance, highlight metrics-based tuning, resource isolation, and caching strategies, and explain how parallel processing applies to large-scale transformations.
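Metrics-based tuning starts with summarizing what the tasks actually do. As a small illustration, the sketch below computes the mean, median, and 95th percentile of task durations with a nearest-rank percentile. The sample durations are made up, and in practice they would come from the metadata database or a metrics backend.

```python
import math
from typing import Dict, Sequence


def percentile(values: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest value with at least
    pct percent of the data at or below it."""
    if not values:
        raise ValueError("values must not be empty")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def summarize(durations_s: Sequence[float]) -> Dict[str, float]:
    """Summarize task durations (seconds) for tuning decisions."""
    return {
        "mean": sum(durations_s) / len(durations_s),
        "p50": percentile(durations_s, 50),
        "p95": percentile(durations_s, 95),
    }


# Hypothetical durations: four typical runs and one slow outlier
sample = [12.0, 15.0, 11.0, 14.0, 90.0]
summary = summarize(sample)
print(summary)
```

A large gap between the median and the 95th percentile points to occasional slow runs, which usually calls for investigating contention or data skew rather than raising parallelism.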
Summary
Performance optimization is critical for large-scale Airflow deployments. Key takeaways:
- Parallelism for throughput
- Database optimization for speed
- DAG design for efficiency
- Resource management for utilization
- Monitoring for continuous improvement
For Google and Apple interviews, focus on:
- Scaling strategies
- Resource optimization
- Caching patterns
- Performance monitoring
- Data-driven tuning
This question is part of the Apache Airflow Advanced interview preparation series. Practice explaining these concepts before your interview.