Kubernetes Executor Deep Dive
Architecture Diagram
Formal Definitions
Definition: Kubernetes Executor
The Kubernetes Executor is an Airflow executor that launches a separate Kubernetes pod for each task instance. It provides per-task resource allocation, isolation, and automatic cleanup of finished pods. Formally, it maps tasks to pods, $f: T \to P$, where $T$ is the set of tasks and $P$ is the set of pods.
Definition: Pod Template
A Pod Template is a YAML specification that defines the configuration of worker pods. It includes the container image, resources, volumes, environment variables, and node selectors. The template is the tuple $\text{Template} = (I, R, V, E, N)$, where $I$ is the image, $R$ is resources, $V$ is volumes, $E$ is env vars, and $N$ is the node selector.
Definition: Dynamic Scaling
Dynamic Scaling in Kubernetes adjusts the number of worker pods to the workload. The scaling function is $N_{\text{pods}} = f(N_{\text{pending}})$, where $N_{\text{pending}}$ is the number of tasks waiting for execution.
Detailed Explanation
Basic K8s Executor Configuration
# airflow.cfg - Kubernetes executor configuration
[core]
executor = KubernetesExecutor

# Airflow 2.5+ names this section [kubernetes_executor];
# earlier 2.x releases call it [kubernetes]
[kubernetes_executor]
# Namespace for worker pods
namespace = airflow
# Worker pod image (repository and tag are separate options)
worker_container_repository = apache/airflow
worker_container_tag = 2.8.0
# Pod template file
pod_template_file = /opt/airflow/pod_templates/default_template.yaml
# Delete worker pods on success
delete_worker_pods = True
# Delete worker pods on failure
delete_worker_pods_on_failure = True

[scheduler]
# Fail tasks that stay queued (for example, pods stuck in Pending)
# longer than this many seconds (Airflow 2.6+)
task_queued_timeout = 600

# The service account, resource requests and limits, image pull policy,
# node selector, and tolerations are not airflow.cfg options in Airflow 2.x.
# Set them in the pod template file instead.
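Airflow also reads each `airflow.cfg` option from an environment variable named `AIRFLOW__{SECTION}__{KEY}`, which is often easier to manage in containerized deployments. The sketch below only illustrates that naming rule; the helper name `to_env_var` is hypothetical and not part of Airflow.

```python
def to_env_var(section: str, key: str) -> str:
    """Return the environment variable name Airflow checks for a config option.

    Hypothetical helper for illustration: it only applies the
    AIRFLOW__{SECTION}__{KEY} naming rule and does not read any configuration.
    """
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# [core] executor and [database] sql_alchemy_conn as environment variables
print(to_env_var("core", "executor"))
print(to_env_var("database", "sql_alchemy_conn"))
```

An environment variable set this way takes precedence over the value in `airflow.cfg`.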
Pod Template Configuration
# pod_templates/default_template.yaml
apiVersion: v1
kind: Pod
metadata:
  labels:
    app: airflow-worker
    component: worker
spec:
  serviceAccountName: airflow-worker
  containers:
    # The executor runs the task in the container named "base" and supplies
    # the task command itself, so the template does not set a command
    - name: base
      image: apache/airflow:2.8.0
      resources:
        requests:
          cpu: 500m
          memory: 1Gi
        limits:
          cpu: 1000m
          memory: 2Gi
      env:
        # A worker pod runs one task locally; Airflow's reference
        # templates set LocalExecutor here
        - name: AIRFLOW__CORE__EXECUTOR
          value: "LocalExecutor"
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: database-url
      volumeMounts:
        - name: airflow-config
          mountPath: /opt/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: dags
          mountPath: /opt/airflow/dags
  # fsGroup is only valid in the pod-level securityContext
  securityContext:
    runAsUser: 50000
    runAsGroup: 0
    fsGroup: 0
  volumes:
    - name: airflow-config
      configMap:
        name: airflow-config
    - name: dags
      persistentVolumeClaim:
        claimName: airflow-dags-pvc
  nodeSelector:
    node-type: worker
  tolerations:
    - key: "dedicated"
      operator: "Equal"
      value: "worker"
      effect: "NoSchedule"
  restartPolicy: Never
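Airflow requires the first container in a pod template file to be named `base`. The template above also sets `restartPolicy: Never` and explicit resource requests. A minimal sketch of a pre-deployment check for those three points follows; it assumes the template has already been parsed into a dict (for example with a YAML loader), and `check_worker_template` is a hypothetical helper, not an Airflow function.

```python
def check_worker_template(template: dict) -> list[str]:
    """Return a list of problems found in a parsed worker pod template."""
    problems = []
    spec = template.get("spec", {})
    containers = spec.get("containers", [])

    # The executor looks for the task container under the name "base"
    if not containers or containers[0].get("name") != "base":
        problems.append("first container must be named 'base'")

    # Convention used by the template in this section, not an Airflow rule
    if spec.get("restartPolicy") != "Never":
        problems.append("restartPolicy should be 'Never'")

    # Requests let the Kubernetes scheduler place the pod sensibly
    for container in containers:
        if not container.get("resources", {}).get("requests"):
            problems.append(f"container {container.get('name')!r} has no resource requests")
    return problems


# Dict form of a trimmed-down version of the template above
template = {
    "spec": {
        "containers": [
            {"name": "base", "resources": {"requests": {"cpu": "500m", "memory": "1Gi"}}}
        ],
        "restartPolicy": "Never",
    }
}
print(check_worker_template(template))  # prints []
```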
Custom Pod Templates per Task
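from datetime import datetime

from airflow.decorators import dag, task
from kubernetes.client import models as k8s


def with_resources(requests: dict, limits: dict) -> k8s.V1Pod:
    """Build a pod_override that sets resources on the "base" task container."""
    return k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    resources=k8s.V1ResourceRequirements(requests=requests, limits=limits),
                )
            ]
        )
    )


@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['kubernetes', 'custom-pods'],
)
def custom_pod_dag():
    # Per-task pod settings are passed through executor_config; pod_override
    # is merged over the pod built from pod_template_file
    @task(
        executor_config={
            'pod_template_file': '/opt/airflow/pod_templates/heavy_computation.yaml',
            'pod_override': with_resources(
                requests={'cpu': '2', 'memory': '4Gi'},
                limits={'cpu': '4', 'memory': '8Gi'},
            ),
        }
    )
    def heavy_computation():
        """Task requiring heavy compute resources."""
        import time
        time.sleep(10)
        return {"status": "completed"}

    @task(
        executor_config={
            'pod_template_file': '/opt/airflow/pod_templates/light_task.yaml',
            'pod_override': with_resources(
                requests={'cpu': '250m', 'memory': '512Mi'},
                limits={'cpu': '500m', 'memory': '1Gi'},
            ),
        }
    )
    def light_task():
        """Task requiring minimal resources."""
        return {"status": "completed"}

    heavy_computation() >> light_task()


custom_pod_dag()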
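Dynamic Scaling
$$N_{\text{pods}}(t) = \min\left(N_{\max},\ \max\left(N_{\min},\ N_{\text{pending}}(t)\right)\right)$$
Here,
- $N_{\text{pods}}(t)$ = Number of pods at time t
- $N_{\min}$ = Minimum pod count
- $N_{\max}$ = Maximum pod count
- $N_{\text{pending}}(t)$ = Pending tasks at time t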
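The definitions above suggest a pod count that tracks the number of pending tasks while staying between a minimum and a maximum. A minimal sketch under that assumption follows. The clamp form and the function name `target_pod_count` are illustrative. In an Airflow deployment the upper bound would typically come from a setting such as `[core] parallelism`.

```python
def target_pod_count(pending_tasks: int, min_pods: int, max_pods: int) -> int:
    """Clamp the number of pending tasks to the range [min_pods, max_pods]."""
    if min_pods > max_pods:
        raise ValueError("min_pods must not exceed max_pods")
    return max(min_pods, min(max_pods, pending_tasks))


# One pod per pending task, bounded by the configured limits
print(target_pod_count(pending_tasks=5, min_pods=0, max_pods=32))    # prints 5
print(target_pod_count(pending_tasks=100, min_pods=0, max_pods=32))  # prints 32
print(target_pod_count(pending_tasks=0, min_pods=2, max_pods=32))    # prints 2
```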
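Pod Resource Utilization
$$U_{\text{pod}} = \frac{R_{\text{used}}}{R_{\text{requested}}}$$
Here,
- $U_{\text{pod}}$ = Pod resource utilization
- $R_{\text{used}}$ = Resources actually used
- $R_{\text{requested}}$ = Resources requested in pod spec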
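Utilization as defined above is the ratio of the resources a pod actually used to the resources it requested. The sketch below works one example. The usage figure of 0.35 CPU is a made-up measurement for illustration, and `pod_utilization` is a hypothetical helper name.

```python
def pod_utilization(used: float, requested: float) -> float:
    """Return used / requested for one resource (CPU cores, GiB of memory, ...)."""
    if requested <= 0:
        raise ValueError("requested must be positive")
    return used / requested


# A pod that requested 500m CPU (0.5 cores) and averaged 0.35 cores
cpu_utilization = pod_utilization(used=0.35, requested=0.5)
print(f"CPU utilization: {cpu_utilization:.0%}")  # prints CPU utilization: 70%
```

Values well below 1 suggest the request can be lowered. Values above 1 mean the pod is running on capacity it did not reserve, which is possible while usage stays under the limit.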
Cluster Cost Optimization
$$C_{\text{total}} = \sum_{i=1}^{N} c_i \cdot t_i$$
Here,
- $C_{\text{total}}$ = Total cluster cost
- $c_i$ = Cost per pod-hour for pod i
- $t_i$ = Runtime of pod i in hours
- $N$ = Total number of pods created
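The total cost is the sum, over all pods, of each pod's hourly rate multiplied by its runtime. A short worked example follows. The hourly rates match the light, medium, and heavy rows of the Pod Resource Optimization table in this document. The runtimes are invented for illustration, and `total_cluster_cost` is a hypothetical helper name.

```python
def total_cluster_cost(pods: list[tuple[float, float]]) -> float:
    """Sum cost_per_pod_hour * runtime_hours over all pods."""
    return sum(rate * hours for rate, hours in pods)


# (cost per pod-hour in USD, runtime in hours)
pods = [
    (0.02, 0.5),  # light task
    (0.08, 2.0),  # medium task
    (0.16, 1.5),  # heavy task
]
# 0.01 + 0.16 + 0.24
print(round(total_cluster_cost(pods), 2))  # prints 0.41
```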
The Kubernetes executor creates a new pod for each task, so every task runs in its own pod with its own image, resources, and service account. This suits multi-tenant environments, but each task pays a pod startup cost (roughly 30-60s per pod) that executors with long-running workers avoid.
Use pod templates and per-task overrides to size worker pods to the task. Heavy computation tasks can request more resources while light tasks request the minimum, which reduces cluster cost.
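The startup overhead mentioned above matters most for short tasks, because it is a fixed cost per pod. The sketch below computes the share of wall-clock time spent on pod startup. It uses the lower end of the 30-60s range as an assumed figure, and `startup_overhead_fraction` is a hypothetical helper name.

```python
def startup_overhead_fraction(startup_s: float, task_s: float) -> float:
    """Return the fraction of total wall-clock time spent starting the pod."""
    return startup_s / (startup_s + task_s)


# A 10-second task spends most of its wall-clock time waiting for the pod
print(f"{startup_overhead_fraction(30, 10):.0%}")   # prints 75%
# A 10-minute task barely notices the same startup cost
print(f"{startup_overhead_fraction(30, 600):.1%}")  # prints 4.8%
```

Tasks that finish in a few seconds are therefore often better batched together or run on an executor with long-lived workers.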
Key Concepts Table
| Component | Purpose | Configuration | Impact |
|---|---|---|---|
| Executor | Task dispatch | executor = KubernetesExecutor | Core |
| Namespace | Isolation | namespace = airflow | Security |
| Pod Template | Pod configuration | pod_template_file | Flexibility |
| Resources | CPU/Memory requests and limits | resources in the pod template | Performance |
| Node Selector | Pod placement | nodeSelector in the pod template | Cost |
| Tolerations | Schedule on tainted nodes | tolerations in the pod template | Availability |
| Service Account | RBAC | serviceAccountName in the pod template | Security |
Code Examples
Advanced K8s Executor Configuration
# k8s_executor_advanced.py
# Airflow 2.7+ ships PodGenerator in the cncf.kubernetes provider;
# older releases import it from airflow.kubernetes.pod_generator
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from kubernetes.client import models as k8s


def create_custom_pod_template() -> dict:
    """Create a custom pod template for heavy workloads and return it as a dict."""
    # No command is set: the executor supplies the task command for "base"
    container = k8s.V1Container(
        name='base',
        image='apache/airflow:2.8.0-python3.10',
        resources=k8s.V1ResourceRequirements(
            requests={'cpu': '1', 'memory': '2Gi'},
            limits={'cpu': '2', 'memory': '4Gi'},
        ),
        env=[
            k8s.V1EnvVar(name='AIRFLOW__CORE__EXECUTOR', value='LocalExecutor'),
        ],
        volume_mounts=[
            k8s.V1VolumeMount(name='dags', mount_path='/opt/airflow/dags'),
        ],
    )
    pod = k8s.V1Pod(
        api_version='v1',
        kind='Pod',
        metadata=k8s.V1ObjectMeta(labels={'component': 'worker', 'workload': 'heavy'}),
        spec=k8s.V1PodSpec(
            containers=[container],
            volumes=[
                k8s.V1Volume(
                    name='dags',
                    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
                        claim_name='airflow-dags',
                    ),
                ),
            ],
            node_selector={'node-type': 'compute-optimized'},
            tolerations=[
                k8s.V1Toleration(
                    key='workload', operator='Equal', value='heavy', effect='NoSchedule',
                ),
            ],
        ),
    )
    # serialize_pod returns a JSON-compatible dict with camelCase keys
    return PodGenerator.serialize_pod(pod)


# Dump the returned dict to YAML and reference the file per task or in airflow.cfg:
# pod_template_file = /opt/airflow/pod_templates/heavy_computation.yaml
Pod Monitoring and Cleanup
# pod_monitoring.py
import logging
from datetime import datetime, timedelta, timezone

from kubernetes import client, config
from kubernetes.client.rest import ApiException

logger = logging.getLogger(__name__)


class PodMonitor:
    """Monitor and manage Airflow worker pods."""

    def __init__(self, namespace='airflow'):
        # Must run inside the cluster with a service account that is
        # allowed to list and delete pods in the namespace
        config.load_incluster_config()
        self.v1 = client.CoreV1Api()
        self.namespace = namespace

    def list_worker_pods(self):
        """List all Airflow worker pods."""
        pods = self.v1.list_namespaced_pod(
            namespace=self.namespace,
            label_selector='component=worker',
        )
        return pods.items

    def get_pod_status(self, pod_name):
        """Get status of a specific pod."""
        pod = self.v1.read_namespaced_pod(
            name=pod_name,
            namespace=self.namespace,
        )
        return {
            'name': pod.metadata.name,
            'status': pod.status.phase,
            'start_time': pod.status.start_time,
            'node_name': pod.spec.node_name,
        }

    def cleanup_old_pods(self, max_age_hours=24):
        """Delete finished pods that started more than max_age_hours ago."""
        # The API returns timezone-aware UTC timestamps, so compare in UTC
        cutoff_time = datetime.now(timezone.utc) - timedelta(hours=max_age_hours)
        cleaned = 0
        for pod in self.list_worker_pods():
            # Only finished pods are eligible; never delete running work
            if pod.status.phase not in ('Succeeded', 'Failed'):
                continue
            if not pod.status.start_time or pod.status.start_time >= cutoff_time:
                continue
            try:
                self.v1.delete_namespaced_pod(
                    name=pod.metadata.name,
                    namespace=self.namespace,
                )
                cleaned += 1
                logger.info("Deleted pod: %s", pod.metadata.name)
            except ApiException as e:
                logger.error("Failed to delete pod %s: %s", pod.metadata.name, e)
        return cleaned

    def get_pod_metrics(self):
        """Count worker pods by phase."""
        pods = self.list_worker_pods()
        metrics = {
            'total': len(pods),
            'running': 0,
            'pending': 0,
            'succeeded': 0,
            'failed': 0,
        }
        for pod in pods:
            phase = (pod.status.phase or '').lower()
            # Phases outside the four tracked ones (e.g. Unknown) only count toward total
            if phase in metrics and phase != 'total':
                metrics[phase] += 1
        return metrics


if __name__ == "__main__":
    monitor = PodMonitor()
    # Get pod metrics
    metrics = monitor.get_pod_metrics()
    print(f"Pod metrics: {metrics}")
    # Cleanup old pods
    cleaned = monitor.cleanup_old_pods(max_age_hours=12)
    print(f"Cleaned up {cleaned} pods")
Resource-Aware Scheduling
from datetime import datetime

from airflow.decorators import dag, task
from kubernetes import client, config
# Available in recent releases of the kubernetes Python client
from kubernetes.utils.quantity import parse_quantity

GIB = 1024 ** 3


@dag(
    schedule="@hourly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['kubernetes', 'resource-aware'],
)
def resource_aware_k8s_dag():
    @task
    def check_cluster_resources():
        """Check allocatable and unrequested cluster resources."""
        config.load_incluster_config()
        v1 = client.CoreV1Api()

        # Sum allocatable capacity across nodes. Quantities arrive as strings
        # such as "3920m" or "16374556Ki", so parse them instead of calling int()
        total_cpu = 0.0
        total_memory = 0.0
        for node in v1.list_node().items:
            allocatable = node.status.allocatable or {}
            total_cpu += float(parse_quantity(allocatable.get('cpu', '0')))
            total_memory += float(parse_quantity(allocatable.get('memory', '0'))) / GIB

        # Sum the requests of running pods in every namespace, since all of
        # them compete for the same nodes
        requested_cpu = 0.0
        requested_memory = 0.0
        pods = v1.list_pod_for_all_namespaces(field_selector='status.phase=Running')
        for pod in pods.items:
            for container in pod.spec.containers:
                requests = (container.resources.requests if container.resources else None) or {}
                requested_cpu += float(parse_quantity(requests.get('cpu', '0')))
                requested_memory += float(parse_quantity(requests.get('memory', '0'))) / GIB

        # CPU is in cores and memory in GiB; floats keep the result XCom-serializable
        return {
            'total_cpu': total_cpu,
            'total_memory': total_memory,
            'available_cpu': max(0.0, total_cpu - requested_cpu),
            'available_memory': max(0.0, total_memory - requested_memory),
        }

    @task
    def adaptive_resource_allocation(resources: dict):
        """Allocate resources based on cluster availability."""
        if resources['available_cpu'] > 4:
            return {'cpu': '2', 'memory': '4Gi', 'priority': 'high'}
        elif resources['available_cpu'] > 2:
            return {'cpu': '1', 'memory': '2Gi', 'priority': 'medium'}
        else:
            return {'cpu': '500m', 'memory': '1Gi', 'priority': 'low'}

    # executor_config is fixed when the DAG is parsed and is not templated,
    # so it cannot consume an upstream XCom. The allocation is therefore
    # passed to the task as an ordinary argument.
    @task
    def process_with_adaptive_resources(resources: dict):
        """Process with adaptive resource allocation."""
        return {
            'allocated_cpu': resources['cpu'],
            'allocated_memory': resources['memory'],
            'status': 'processing',
        }

    cluster_resources = check_cluster_resources()
    resource_allocation = adaptive_resource_allocation(cluster_resources)
    process_with_adaptive_resources(resource_allocation)


resource_aware_k8s_dag()
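Node capacity and container requests are reported as Kubernetes quantity strings such as `500m`, `2`, `4Gi`, or `16374556Ki`, so they need parsing before any arithmetic. The following is a standard-library sketch of such a parser, for environments where a client-side helper is not available. It handles only the common suffixes (no exponent notation), and `quantity_to_number` is a hypothetical name.

```python
from decimal import Decimal

# Multipliers for the common Kubernetes quantity suffixes
_SUFFIXES = {
    "Ki": Decimal(2) ** 10,
    "Mi": Decimal(2) ** 20,
    "Gi": Decimal(2) ** 30,
    "Ti": Decimal(2) ** 40,
    "k": Decimal(10) ** 3,
    "M": Decimal(10) ** 6,
    "G": Decimal(10) ** 9,
    "T": Decimal(10) ** 12,
    "m": Decimal(1) / Decimal(1000),  # milli, e.g. 500m CPU = 0.5 cores
}


def quantity_to_number(value: str) -> Decimal:
    """Convert a quantity string to a plain number (cores or bytes)."""
    # Try two-character suffixes first so "Mi" is not mistaken for another unit
    for suffix in sorted(_SUFFIXES, key=len, reverse=True):
        if value.endswith(suffix):
            return Decimal(value[: -len(suffix)]) * _SUFFIXES[suffix]
    return Decimal(value)


print(quantity_to_number("500m"))            # CPU in cores
print(quantity_to_number("4Gi") / 1024**3)   # memory in GiB
```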
Performance Metrics
K8s Executor vs Other Executors
| Metric | Sequential | Local | Celery | Kubernetes |
|---|---|---|---|---|
| Startup Time | 0s | 0s | 10-30s | 30-60s |
| Isolation | None | Process | Container | Pod |
| Scaling | Manual | Manual | Static | Dynamic |
| Resource Efficiency | Low | Medium | Medium | High |
| Cost | Low | Low | Medium | Variable |
| Multi-tenancy | No | No | Limited | Yes |
Pod Resource Optimization
| Workload Type | CPU Request | Memory Request | Cost/Pod-Hour |
|---|---|---|---|
| Light Task | 250m | 512Mi | $0.02 |
| Medium Task | 1 CPU | 2Gi | $0.08 |
| Heavy Task | 2 CPU | 4Gi | $0.16 |
| GPU Task | 2 CPU + 1 GPU | 8Gi | $1.50 |
Key Takeaways:
- Kubernetes executor creates isolated pods per task with dynamic resource allocation
- Use pod templates to customize worker pods for different workload types
- Configure resource requests and limits to optimize cluster costs
- Enable pod cleanup to prevent resource leaks
- Monitor pod metrics and cluster resources for capacity planning
- Use node selectors and tolerations for workload placement
See Also
- Executors Comparison — Comparing all executor types
- Performance Tuning — Optimizing executor performance
- Monitoring and Alerting — Monitoring pod metrics
- Multi-Tenancy — Isolating workloads with K8s