Apache Airflow Executors Comparison
Architecture Diagram
Executor architecture overview

Executor hierarchy:
- BaseExecutor
  - execute_async()
  - change_state()
  - get_task_log()
  - try_adopt_task_instances()
- Subclasses of BaseExecutor and where their tasks run:
  - SequentialExecutor -> sequential execution
  - LocalExecutor -> local processes
  - CeleryExecutor -> Celery workers
Celery executor architecture

Celery components:
- Scheduler
  - CeleryExecutor
  - Task Queue Management
  - Result Backend
- Message Broker (RabbitMQ/Redis)
  - Task Distribution
  - Result Storage
  - Worker Coordination
- Workers
  - Worker 1 (Node 1)
  - Worker 2 (Node 2)
  - Worker 3 (Node 3)
  - Worker N (Node N)

Celery flow:
- Scheduler (enqueue task) -> Broker (route task) -> Worker (execute task) -> Result Backend (store result)
Kubernetes executor architecture

Kubernetes components:
- Airflow Components
  - Scheduler
  - Web Server
  - Triggerer
- Kubernetes Resources
  - Worker Pods (Dynamic)
  - Init Containers
  - Sidecar Containers
  - Service Accounts
- Kubernetes API
  - Pod Management
  - Resource Allocation
  - Service Discovery
  - Secrets Management

Pod lifecycle:
- Scheduler creates pod (API call) -> Pod scheduled (init container) -> Worker executes (run task) -> Cleanup and delete (pod terminated)
Formal Definitions
Definition: Executor
An executor is the component that determines how task instances are dispatched and executed. An executor is characterized by its parallelism capacity, scheduling strategy, and fault-tolerance mechanism. The executors compared here are the Sequential, Local, Celery, and Kubernetes executors.
Definition: Task Parallelism
Task parallelism $P$ is the number of tasks executing simultaneously across all workers. It is bounded by $P \le \min(S,\; W \cdot C)$, where $S$ is the executor slot count, $W$ is the number of workers, and $C$ is the per-worker concurrency limit.
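As a rough illustration of this bound, the sketch below takes a slot count, a worker count, and a per-worker concurrency limit and returns the largest number of tasks that could run at once. The function and parameter names are hypothetical and are not Airflow APIs; the slot count is assumed to play the role of the `parallelism` setting.

```python
def max_task_parallelism(executor_slots: int, workers: int, per_worker_concurrency: int) -> int:
    """Upper bound on simultaneously running tasks (hypothetical helper).

    The bound is the smaller of the executor slot count and the total
    capacity of all workers (workers * per-worker concurrency).
    """
    if min(executor_slots, workers, per_worker_concurrency) < 0:
        raise ValueError("all inputs must be non-negative")
    return min(executor_slots, workers * per_worker_concurrency)


if __name__ == "__main__":
    # Three workers with 16 slots each could run 48 tasks,
    # but 32 executor slots cap the total.
    print(max_task_parallelism(executor_slots=32, workers=3, per_worker_concurrency=16))
    # With one worker, the worker capacity becomes the limiting factor.
    print(max_task_parallelism(executor_slots=32, workers=1, per_worker_concurrency=16))
```

Whichever term is smaller is the one worth tuning first: adding workers does nothing once the slot count is the binding limit.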
Detailed Explanation
Executor Selection Criteria
Choosing the right executor is critical for Airflow performance and scalability. The decision depends on several factors including scale, infrastructure, team expertise, and operational requirements.
Scale Considerations: For small teams with fewer than 50 DAGs, LocalExecutor is often sufficient. For medium-scale deployments (50-500 DAGs), CeleryExecutor provides good scalability. For large-scale deployments (500+ DAGs) or dynamic workloads, KubernetesExecutor offers the best flexibility.
Infrastructure: Existing infrastructure plays a significant role. If you already have a Kubernetes cluster, KubernetesExecutor is natural. If you have Redis or RabbitMQ infrastructure, CeleryExecutor integrates well. For simple setups, LocalExecutor requires minimal infrastructure.
Operational Complexity: SequentialExecutor is simplest but least scalable. LocalExecutor adds process management complexity. CeleryExecutor requires managing message brokers and workers. KubernetesExecutor requires Kubernetes expertise and cluster management.
Resource Utilization: SequentialExecutor uses minimal resources but runs tasks serially. LocalExecutor uses local machine resources efficiently. CeleryExecutor distributes across nodes. KubernetesExecutor provides dynamic resource allocation and isolation.
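The scale and infrastructure criteria above can be condensed into a small decision helper. This is only a sketch of the rules of thumb stated in this section (fewer than 50 DAGs, 50-500 DAGs, more than 500 DAGs); the function name, its parameters, and the tie-breaking rule for medium deployments are assumptions made for illustration.

```python
def suggest_executor(dag_count: int, has_kubernetes: bool = False, has_broker: bool = False) -> str:
    """Suggest an executor from DAG count and existing infrastructure (heuristic sketch)."""
    if dag_count < 0:
        raise ValueError("dag_count must be non-negative")
    if dag_count < 50:
        # Small deployments: a single machine is usually sufficient.
        return "LocalExecutor"
    if dag_count <= 500:
        # Medium deployments: Celery by default. Assumed tie-break: if a
        # Kubernetes cluster exists and no broker does, reuse the cluster.
        if has_kubernetes and not has_broker:
            return "KubernetesExecutor"
        return "CeleryExecutor"
    # Large or highly dynamic deployments.
    return "KubernetesExecutor"


if __name__ == "__main__":
    for count in (10, 200, 800):
        print(count, suggest_executor(count))
```

Team expertise and operational requirements are deliberately left out; in practice they can outweigh the DAG count.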
Task Throughput (Steady State)
$$\lambda = \frac{\bar{N}}{\bar{T}}$$
Here,
- $\bar{N}$ = Average number of concurrently executing tasks
- $\bar{T}$ = Mean task execution time
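A quick worked example, assuming steady-state throughput is the average number of concurrently executing tasks divided by the mean task execution time (Little's law). The function name and the sample numbers are illustrative only.

```python
def steady_state_throughput(avg_concurrent_tasks: float, mean_task_seconds: float) -> float:
    """Tasks completed per second in steady state (Little's law sketch)."""
    if mean_task_seconds <= 0:
        raise ValueError("mean_task_seconds must be positive")
    return avg_concurrent_tasks / mean_task_seconds


if __name__ == "__main__":
    # Hypothetical workload: 16 tasks running on average, 2 minutes per task.
    per_second = steady_state_throughput(avg_concurrent_tasks=16, mean_task_seconds=120.0)
    print(f"{per_second:.4f} tasks/s, about {per_second * 3600:.0f} tasks/hour")
```

Doubling concurrency or halving the mean task time has the same effect on throughput, which is why both executor sizing and task design matter.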
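Executor Overhead Ratio
$$\rho = \frac{T_{\text{queue}} + T_{\text{startup}}}{T_{\text{exec}}}$$
Here,
- $T_{\text{queue}}$ = Time for scheduler to queue the task
- $T_{\text{startup}}$ = Worker/container startup time
- $T_{\text{exec}}$ = Actual task execution time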
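Theorem: Parallelism Bound
For any executor, the effective parallelism satisfies $P_{\text{eff}} \le \min(P_{\text{config}},\; R_{\text{avail}})$, where $P_{\text{config}}$ is the configured parallelism and $R_{\text{avail}}$ is the actual available compute resources. Oversubscription beyond $R_{\text{avail}}$ leads to context-switching overhead and degraded throughput.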
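The KubernetesExecutor has the highest startup overhead (roughly 2-5 s per pod) but gives each task its own isolated pod. The CeleryExecutor has moderate overhead (about 500 ms) with shared worker resources. The LocalExecutor has minimal overhead (about 200 ms) but is limited to a single machine. These figures are indicative; actual startup times depend on factors such as image size, cluster load, and broker latency.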
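To see why startup cost matters most for short tasks, the sketch below compares executors under one possible definition of the overhead ratio: queueing time plus startup time, divided by execution time. That definition is an assumption, the startup values are the indicative figures quoted in this section (using the 5 s upper end for Kubernetes), and the names are hypothetical.

```python
# Indicative startup times in seconds, taken from the figures quoted in the text.
STARTUP_SECONDS = {
    "LocalExecutor": 0.2,
    "CeleryExecutor": 0.5,
    "KubernetesExecutor": 5.0,  # upper end of the 2-5 s range
}


def overhead_ratio(queue_seconds: float, startup_seconds: float, exec_seconds: float) -> float:
    """Overhead relative to useful work (assumed definition)."""
    if exec_seconds <= 0:
        raise ValueError("exec_seconds must be positive")
    return (queue_seconds + startup_seconds) / exec_seconds


def compare_overheads(exec_seconds: float, queue_seconds: float = 0.0) -> dict:
    """Overhead ratio per executor for a task of the given duration."""
    return {
        name: overhead_ratio(queue_seconds, startup, exec_seconds)
        for name, startup in STARTUP_SECONDS.items()
    }


if __name__ == "__main__":
    for duration in (10.0, 600.0):
        print(duration, compare_overheads(duration))
```

For a 10-second task the pod startup is a large fraction of the runtime, while for a 10-minute task it becomes negligible.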
For CPU-intensive tasks, set lower worker_concurrency (4-8) on Celery workers. For I/O-intensive tasks, use higher concurrency (16-32). Match the executor configuration to your workload characteristics for optimal throughput.
SequentialExecutor
Architecture: SequentialExecutor runs tasks one at a time in a single process. It's the simplest executor with no external dependencies. Tasks execute sequentially, waiting for each task to complete before starting the next.
Use Cases: Development environments, testing, simple workflows with minimal tasks, learning Airflow concepts. It's not recommended for production due to its serial nature.
Limitations: No parallelism, single point of failure, limited throughput, not suitable for time-sensitive workflows.
LocalExecutor
Architecture: LocalExecutor runs tasks as separate processes on the same machine. It uses Python's multiprocessing module to execute tasks in parallel. Each task runs in its own process with independent memory space.
Use Cases: Small to medium production environments, single-server deployments, cost-sensitive environments. It provides good performance without external dependencies.
Considerations: Limited by single machine resources, no fault tolerance across machines, resource contention on shared infrastructure.
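The idea of running each task in its own process, capped by a parallelism limit, can be sketched with the standard library. This is not how LocalExecutor is implemented; it swaps in `subprocess` plus a thread pool to stay self-contained, and the function names and the trivial task body are hypothetical.

```python
import os
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def run_in_subprocess(task_id: str):
    """Run a trivial stand-in task in a separate Python process and return its PID."""
    code = "import os; print(os.getpid())"
    completed = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True,
        text=True,
        check=True,
    )
    return task_id, int(completed.stdout.strip())


def run_tasks_locally(task_ids, parallelism: int = 4) -> dict:
    """Run tasks as child processes, at most `parallelism` at a time."""
    # Each thread only waits on its child process; the work happens in the child.
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        return dict(pool.map(run_in_subprocess, task_ids))


if __name__ == "__main__":
    pids = run_tasks_locally(["extract", "transform", "load"], parallelism=2)
    print(f"parent pid={os.getpid()}, task pids={pids}")
```

Because every child runs on the same machine, total throughput is still bounded by that machine's CPU and memory, which is the limitation noted above.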
CeleryExecutor
Architecture: CeleryExecutor distributes tasks across multiple worker nodes using a message broker (RabbitMQ or Redis). Workers are long-running processes that pull tasks from queues and execute them. Results can be stored in a result backend for monitoring.
Use Cases: Medium to large-scale deployments, environments with existing Celery infrastructure, need for horizontal scaling, multi-node execution.
Benefits: Horizontal scaling, fault tolerance, resource isolation, monitoring capabilities, queue-based task distribution.
Challenges: Requires message broker management, worker monitoring, result backend configuration, network considerations.
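The broker-and-worker pattern described above can be illustrated without Celery by using an in-memory queue as a stand-in for the broker and threads as stand-ins for worker nodes. Everything here (the function name, the sentinel protocol, the result dictionary acting as a result backend) is a simplification for illustration, not Celery's or Airflow's implementation.

```python
import queue
import threading


def run_with_workers(tasks: dict, num_workers: int = 3) -> dict:
    """Distribute callables to long-running workers through a shared queue."""
    broker = queue.Queue()   # stand-in for RabbitMQ/Redis
    results = {}             # stand-in for the result backend
    lock = threading.Lock()

    def worker(name: str) -> None:
        while True:
            item = broker.get()
            if item is None:  # sentinel: no more work for this worker
                return
            task_id, func = item
            outcome = func()
            with lock:
                results[task_id] = (name, outcome)

    threads = [
        threading.Thread(target=worker, args=(f"worker-{i}",))
        for i in range(num_workers)
    ]
    for thread in threads:
        thread.start()
    for task_id, func in tasks.items():
        broker.put((task_id, func))   # the scheduler enqueues tasks
    for _ in threads:
        broker.put(None)              # one sentinel per worker
    for thread in threads:
        thread.join()
    return results


if __name__ == "__main__":
    demo = {f"task_{n}": (lambda n=n: n * n) for n in range(5)}
    for task_id, (worker_name, value) in sorted(run_with_workers(demo).items()):
        print(task_id, worker_name, value)
```

Workers pull work when they are free, so adding workers raises capacity without any change to the code that enqueues tasks.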
KubernetesExecutor
Architecture: KubernetesExecutor creates dynamic worker pods for each task. It uses the Kubernetes API to manage pod lifecycle. Each task runs in an isolated container with defined resource limits.
Use Cases: Large-scale deployments, dynamic workloads, environments requiring strict resource isolation, cloud-native architectures.
Benefits: Dynamic scaling, resource isolation, cost optimization (pay per use), environment consistency, Kubernetes ecosystem integration.
Challenges: Kubernetes expertise required, pod startup overhead, network complexity, storage management, monitoring across pods.
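The one-pod-per-task model can be pictured as generating a small Pod manifest for every task instance. The sketch below builds such a manifest as a plain dictionary; the top-level fields follow the Kubernetes Pod API, but the function name, the naming scheme, the label keys, and the container arguments are assumptions for illustration and are not what KubernetesExecutor emits verbatim.

```python
import re


def build_worker_pod(dag_id: str, task_id: str, run_id: str,
                     image: str = "apache/airflow:2.8.0",
                     namespace: str = "airflow") -> dict:
    """Build a minimal Pod manifest for one task instance (illustrative sketch)."""
    # Pod names must be lowercase DNS labels, so replace anything else with '-'.
    raw_name = f"{dag_id}-{task_id}".lower()
    pod_name = re.sub(r"[^a-z0-9-]+", "-", raw_name)[:63].strip("-")
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {
            "name": pod_name,
            "namespace": namespace,
            "labels": {"dag_id": dag_id, "task_id": task_id},  # hypothetical label keys
        },
        "spec": {
            "restartPolicy": "Never",  # retries are left to Airflow, not the kubelet
            "containers": [
                {
                    "name": "base",
                    "image": image,
                    # Simplified command; the real executor passes more arguments.
                    "args": ["airflow", "tasks", "run", dag_id, task_id, run_id],
                    "resources": {
                        "requests": {"cpu": "100m", "memory": "128Mi"},
                        "limits": {"cpu": "500m", "memory": "512Mi"},
                    },
                }
            ],
        },
    }


if __name__ == "__main__":
    import json
    pod = build_worker_pod("executor_example_dag", "bash_task", "manual__2024-01-01")
    print(json.dumps(pod, indent=2))
```

Because the resource requests and limits live in the per-task manifest, each task can be sized independently, which is the isolation benefit listed above.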
Key Concepts Table
| Executor | Parallelism | Scalability | Complexity | Fault Tolerance | Use Case |
|---|---|---|---|---|---|
| Sequential | None | None | Very Low | None (single point of failure) | Development |
| Local | Limited | Single node | Low | None beyond the single node | Small production |
| Celery | High | Horizontal | Medium | High | Medium production |
| Kubernetes | Very High | Dynamic | High | Very High | Large scale |
Code Examples
Executor Configuration Examples
# executor_configuration.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Sequential Executor Configuration
# airflow.cfg:
# [core]
# executor = SequentialExecutor

# Local Executor Configuration
# airflow.cfg:
# [core]
# executor = LocalExecutor
# parallelism = 32
# max_active_tasks_per_dag = 16

# Celery Executor Configuration
# airflow.cfg:
# [core]
# executor = CeleryExecutor
# [celery]
# broker_url = redis://redis:6379/0
# result_backend = db+postgresql://airflow:airflow@postgres/airflow
# worker_concurrency = 16

# Kubernetes Executor Configuration
# airflow.cfg:
# [core]
# executor = KubernetesExecutor
# [kubernetes_executor]
# namespace = airflow
# worker_container_repository = apache/airflow
# worker_container_tag = 2.8.0
# delete_worker_pods = True
# delete_worker_pods_on_failure = False


def executor_specific_task(**context):
    """Task that behaves differently based on executor."""
    from airflow.configuration import conf

    executor = conf.get('core', 'executor')
    print(f"Running on executor: {executor}")
    if executor == 'KubernetesExecutor':
        # Kubernetes-specific logic.
        # POD_NAME and NAMESPACE are not set by Kubernetes automatically;
        # they must be injected into the pod (for example via the Downward API).
        import os
        pod_name = os.environ.get('POD_NAME', 'unknown')
        namespace = os.environ.get('NAMESPACE', 'default')
        print(f"Running in pod: {pod_name} in namespace: {namespace}")
    elif executor == 'CeleryExecutor':
        # Celery-specific logic: report the host this worker process runs on.
        import socket
        print(f"Worker hostname: {socket.gethostname()}")
    elif executor == 'LocalExecutor':
        # Local executor logic
        import multiprocessing
        print(f"Running in process: {multiprocessing.current_process().name}")
    else:
        # Sequential executor logic
        print("Running in sequential mode")


def resource_monitoring_task(**context):
    """Report resource usage of the host (or container) running the task."""
    import os
    import psutil

    # Get system resource information
    cpu_percent = psutil.cpu_percent()
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage('/')
    resource_info = {
        'cpu_percent': cpu_percent,
        'memory_percent': memory.percent,
        'disk_percent': disk.percent,
        'hostname': os.uname().nodename,
        'pid': os.getpid(),
    }
    print(f"Resource usage: {resource_info}")
    return resource_info
with DAG(
    'executor_example_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Executor configuration examples',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['executor', 'configuration'],
) as dag:
    # Task to demonstrate executor behavior
    executor_task = PythonOperator(
        task_id='executor_specific_task',
        python_callable=executor_specific_task,
    )
    # Resource monitoring task
    resource_monitor = PythonOperator(
        task_id='resource_monitor',
        python_callable=resource_monitoring_task,
    )
    # Bash task to demonstrate executor capabilities.
    # BashOperator takes its script through `bash_command`.
    bash_task = BashOperator(
        task_id='bash_task',
        bash_command='echo "Hostname: $(hostname) | PID: $$ | Executor: $AIRFLOW__CORE__EXECUTOR"',
    )
    # Set dependencies
    executor_task >> resource_monitor >> bash_task
Kubernetes Executor Advanced Configuration
# kubernetes_executor_advanced.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.secret import Secret
from kubernetes.client import models as k8s


def kubernetes_specific_task(**context):
    """Task designed for Kubernetes executor."""
    import json
    import os

    # These variables are only present if the pod spec injects them
    # (for example via the Downward API); otherwise the defaults are used.
    pod_info = {
        'pod_name': os.environ.get('POD_NAME', 'unknown'),
        'namespace': os.environ.get('NAMESPACE', 'default'),
        'node_name': os.environ.get('NODE_NAME', 'unknown'),
        'pod_ip': os.environ.get('POD_IP', 'unknown'),
        'service_account': os.environ.get('SERVICE_ACCOUNT', 'default'),
    }
    print(f"Kubernetes pod info: {json.dumps(pod_info, indent=2)}")
    # Access Kubernetes API if needed (requires RBAC permission to list pods)
    try:
        from kubernetes import client, config
        config.load_incluster_config()
        v1 = client.CoreV1Api()
        pods = v1.list_namespaced_pod(namespace=pod_info['namespace'])
        print(f"Total pods in namespace: {len(pods.items)}")
    except Exception as e:
        print(f"Could not access Kubernetes API: {e}")
    return pod_info


# Advanced Kubernetes configuration, passed as keyword arguments
# to KubernetesPodOperator
kubernetes_config = {
    # Resource requirements
    'container_resources': k8s.V1ResourceRequirements(
        requests={'cpu': '100m', 'memory': '128Mi'},
        limits={'cpu': '500m', 'memory': '512Mi'},
    ),
    # Volumes and volume mounts use the Kubernetes client models
    'volumes': [
        k8s.V1Volume(name='shared-data', empty_dir=k8s.V1EmptyDirVolumeSource()),
    ],
    'volume_mounts': [
        k8s.V1VolumeMount(name='shared-data', mount_path='/tmp/shared'),
    ],
    # Secrets: Secret(deploy_type, deploy_target, secret, key).
    # For 'volume' secrets the deploy target is the mount path.
    'secrets': [
        Secret('env', 'API_KEY', 'kubernetes-secrets', 'api-key'),
        Secret('volume', '/etc/db-cert', 'kubernetes-secrets', 'db-cert'),
    ],
    # Tolerations
    'tolerations': [
        k8s.V1Toleration(
            key='dedicated',
            operator='Equal',
            value='airflow',
            effect='NoSchedule',
        ),
    ],
    # Node selector
    'node_selector': {
        'node-type': 'airflow-worker',
        'zone': 'us-west-2a',
    },
    # Image pull secrets
    'image_pull_secrets': [k8s.V1LocalObjectReference(name='airflow-registry')],
    # Service account
    'service_account_name': 'airflow-worker',
    # What to do with the pod when the task finishes
    'on_finish_action': 'delete_pod',  # or 'keep_pod'
    # Logging
    'get_logs': True,
    'log_events_on_failure': True,
}
with DAG(
    'kubernetes_advanced_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
    },
    description='Advanced Kubernetes executor configuration',
    schedule_interval=timedelta(hours=2),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['kubernetes', 'advanced'],
) as dag:
    # KubernetesPodOperator with advanced configuration
    k8s_task = KubernetesPodOperator(
        task_id='kubernetes_pod_task',
        name='kubernetes-task-pod',
        namespace='airflow',
        image='apache/airflow:2.8.0',
        cmds=['python', '-c'],
        # The script lines stay at column 0 so that `python -c` sees no stray indentation.
        arguments=['''
import os
print(f"Running in pod: {os.environ.get('POD_NAME', 'unknown')}")
print(f"Node: {os.environ.get('NODE_NAME', 'unknown')}")
print("Task completed successfully")
'''],
        **kubernetes_config,
    )
    # Python operator for Kubernetes-specific logic
    python_k8s_task = PythonOperator(
        task_id='python_kubernetes_task',
        python_callable=kubernetes_specific_task,
    )
    # Set dependencies
    k8s_task >> python_k8s_task
Celery Executor Optimization
# celery_executor_optimization.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def celery_optimized_task(**context):
    """Task optimized for Celery executor."""
    import os
    import socket
    import time

    # Get Celery worker information
    ti = context['task_instance']
    worker_info = {
        'hostname': socket.gethostname(),
        'pid': os.getpid(),
        'task_id': ti.task_id,  # Airflow task id, not the Celery task id
        'queue': ti.queue or 'default',
    }
    print(f"Running on Celery worker: {worker_info}")
    # Simulate CPU-intensive work
    start_time = time.time()
    result = sum(i * i for i in range(1000000))
    end_time = time.time()
    worker_info['computation_time'] = end_time - start_time
    worker_info['result'] = result
    return worker_info


def queue_monitoring_task(**context):
    """Monitor Celery queue health."""
    try:
        # The Celery app used by the executor; the import path assumes the
        # Celery provider package and may differ between provider versions.
        from airflow.providers.celery.executors.celery_executor import app

        inspector = app.control.inspect()
        # Both calls return a dict keyed by worker name, or None if no worker replies
        queues = inspector.active_queues()
        stats = inspector.stats()
        queue_names = {
            q['name'] for worker_queues in (queues or {}).values() for q in worker_queues
        }
        monitoring_data = {
            'active_queues': len(queue_names),
            'workers': len(stats) if stats else 0,
            'timestamp': datetime.now().isoformat(),
        }
        print(f"Celery monitoring data: {monitoring_data}")
        return monitoring_data
    except Exception as e:
        print(f"Error monitoring Celery: {e}")
        return {'error': str(e)}
# Celery queue plan (documentation only; Airflow does not read this dict).
# Queues are created implicitly when a task sets `queue=...` and a worker
# subscribes to it. Concurrency is set per worker, not per queue, so the
# values below are the intended settings for the workers serving each queue.
celery_queues = {
    'default': {
        'description': 'Default queue for general tasks',
        'routing_key': 'default',
    },
    'cpu_intensive': {
        'description': 'Queue for CPU-intensive tasks',
        'routing_key': 'cpu_intensive',
        'worker_concurrency': 4,  # Lower concurrency for CPU tasks
    },
    'io_intensive': {
        'description': 'Queue for I/O-intensive tasks',
        'routing_key': 'io_intensive',
        'worker_concurrency': 32,  # Higher concurrency for I/O tasks
    },
    'priority': {
        'description': 'Queue for high-priority tasks',
        'routing_key': 'priority',
    },
}
with DAG(
    'celery_optimization_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Celery executor optimization examples',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['celery', 'optimization'],
) as dag:
    # The pools 'cpu_pool' and 'io_pool' must already exist in Airflow,
    # otherwise the tasks that reference them will not be scheduled.

    # CPU-intensive task
    cpu_task = PythonOperator(
        task_id='cpu_intensive_task',
        python_callable=celery_optimized_task,
        queue='cpu_intensive',  # Route to specific queue
        pool='cpu_pool',  # Use specific pool
        priority_weight=10,
    )
    # I/O-intensive task
    io_task = PythonOperator(
        task_id='io_intensive_task',
        python_callable=celery_optimized_task,
        queue='io_intensive',
        pool='io_pool',
        priority_weight=5,
    )
    # High-priority task
    priority_task = PythonOperator(
        task_id='priority_task',
        python_callable=celery_optimized_task,
        queue='priority',
        priority_weight=100,
    )
    # Queue monitoring task
    monitor_task = PythonOperator(
        task_id='queue_monitor',
        python_callable=queue_monitoring_task,
    )
    # Set dependencies
    [cpu_task, io_task, priority_task] >> monitor_task
Executor Comparison Benchmark
# executor_benchmark.py
import json
import multiprocessing
import time
from datetime import datetime, timedelta

import psutil
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def benchmark_task(task_id: str, iterations: int = 1000, **context):
    """Benchmark task to measure executor performance."""
    start_time = time.time()
    # System-wide memory in use; other processes also influence this value
    start_memory = psutil.virtual_memory().used
    # Simulate work
    result = 0
    for i in range(iterations):
        result += i * i
    end_time = time.time()
    end_memory = psutil.virtual_memory().used
    benchmark_results = {
        'task_id': task_id,
        'iterations': iterations,
        'execution_time': end_time - start_time,
        'memory_used': end_memory - start_memory,
        'cpu_percent': psutil.cpu_percent(),
        'timestamp': datetime.now().isoformat(),
    }
    print(f"Benchmark results: {json.dumps(benchmark_results, indent=2)}")
    return benchmark_results


def _benchmark_worker(task_num):
    """Worker function for parallel benchmark.

    Defined at module level because multiprocessing.Pool must pickle it;
    a function nested inside another function cannot be pickled.
    """
    start_time = time.time()
    result = sum(i * i for i in range(100000))
    return {
        'task_num': task_num,
        'execution_time': time.time() - start_time,
        'result': result,
    }


def parallel_benchmark(num_tasks: int = 10, **context):
    """Benchmark parallel execution inside a single task.

    Note: creating a Pool may fail on workers whose task process is daemonic,
    because daemonic processes are not allowed to have children.
    """
    # Execute tasks in parallel
    start_time = time.time()
    with multiprocessing.Pool(processes=num_tasks) as pool:
        results = pool.map(_benchmark_worker, range(num_tasks))
    total_time = time.time() - start_time
    # Speedup: summed per-task time divided by wall-clock time
    speedup = sum(r['execution_time'] for r in results) / total_time
    parallel_results = {
        'num_tasks': num_tasks,
        'total_time': total_time,
        'avg_time_per_task': total_time / num_tasks,
        'speedup': speedup,
        'parallel_efficiency': speedup / num_tasks,
        'results': results,
    }
    print(f"Parallel benchmark results: {json.dumps(parallel_results, indent=2)}")
    return parallel_results
with DAG(
    'executor_benchmark_dag',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Executor performance benchmark',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['benchmark', 'performance'],
) as dag:
    # Single task benchmark
    single_benchmark = PythonOperator(
        task_id='single_task_benchmark',
        python_callable=benchmark_task,
        op_kwargs={'task_id': 'single', 'iterations': 10000},
    )
    # Parallel benchmark
    parallel_benchmark_task = PythonOperator(
        task_id='parallel_benchmark',
        python_callable=parallel_benchmark,
        op_kwargs={'num_tasks': 5},
    )
    # Bash benchmark (BashOperator takes its script through `bash_command`)
    bash_benchmark = BashOperator(
        task_id='bash_benchmark',
        bash_command='time (for i in {1..100}; do echo "Processing $i"; done)',
    )
    # Set dependencies
    single_benchmark >> parallel_benchmark_task >> bash_benchmark
Performance Metrics
| Executor | Task Startup Time | Parallelism | Resource Usage | Fault Tolerance | Scalability |
|---|---|---|---|---|---|
| Sequential | ~100 ms | None | Minimal | None (single point of failure) | None |
| Local | ~200 ms | Limited | Single node | None beyond the single node | Limited |
| Celery | ~500 ms | High | Distributed | High | Horizontal |
| Kubernetes | ~2-5 s | Very High | Dynamic | Very High | Dynamic |
Best Practices
- Executor Selection: Choose executor based on scale requirements. Start with LocalExecutor for development, migrate to Celery or Kubernetes for production.
- Resource Planning: Estimate resource requirements for each executor type. Monitor resource usage and adjust configurations accordingly.
- Queue Management: Use queues effectively in Celery to separate different types of workloads. Implement priority queues for critical tasks.
- Kubernetes Configuration: Use appropriate resource limits for Kubernetes pods. Implement node selectors and tolerations for workload placement.
- Monitoring: Implement comprehensive monitoring for all executors. Track task success rates, execution times, and resource utilization.
- Scaling: Configure auto-scaling for Celery workers and Kubernetes pods. Set appropriate scaling policies based on workload patterns.
- Fault Tolerance: Configure retries and failure handling. Implement dead letter queues for failed tasks. Use Kubernetes pod disruption budgets.
- Cost Optimization: Right-size resources based on actual usage. Use spot instances for non-critical workloads. Implement resource quotas.
- Security: Implement proper authentication and authorization. Use network policies for Kubernetes. Secure message brokers for Celery.
- Testing: Test executor configurations in staging environments. Validate failover and scaling scenarios. Performance test before production deployment.
Key Takeaways:
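- Maximum parallelism: $P \le \min(S,\; W \cdot C)$, with executor slot count $S$, number of workers $W$, and per-worker concurrency limit $C$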
- Throughput depends on executor parallelism and task duration
- Overhead ratio varies by executor type
- Sequential: no parallelism; Local: single-node; Celery: horizontal; Kubernetes: dynamic
- Kubernetes provides full isolation but highest startup cost (~2-5s per pod)
- Celery is the most common production choice for medium-to-large deployments
See also: Kafka Connect (kafka/03), PySpark Submit (pyspark/19), Data Engineering Orchestration (data-engineering/017)