Kubernetes Executor Deep Dive in Apache Airflow

Formal Definitions

Definition: Kubernetes Executor

The Kubernetes Executor is an Airflow executor that creates a separate Kubernetes pod for each task execution. It provides dynamic resource allocation, isolation, and automatic cleanup. Formally, it maps tasks to pods: $f_{\text{k8s}}: T \rightarrow P$, where $T$ is the set of tasks and $P$ is the set of pods.

Definition: Pod Template

A Pod Template is a YAML specification defining the configuration for worker pods. It includes the container image, resources, volumes, environment variables, and node selectors. The template is $M_{\text{pod}} = (I, R, V, E, N)$, where $I$ is the image, $R$ is the resources, $V$ is the volumes, $E$ is the environment variables, and $N$ is the node selector.
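
To make the five-tuple concrete, the sketch below maps $(I, R, V, E, N)$ onto the fields of a pod manifest. The `PodTemplate` class and its `to_manifest` method are hypothetical names used only for illustration and are not part of Airflow; the manifest keys themselves are standard Kubernetes pod fields.

```python
from dataclasses import dataclass, field


@dataclass
class PodTemplate:
    """Hypothetical holder for the tuple (I, R, V, E, N)."""
    image: str                                          # I
    resources: dict = field(default_factory=dict)       # R
    volumes: list = field(default_factory=list)         # V
    env: dict = field(default_factory=dict)             # E
    node_selector: dict = field(default_factory=dict)   # N

    def to_manifest(self) -> dict:
        """Render the tuple as a plain-dict Pod manifest."""
        container = {
            "name": "base",
            "image": self.image,
            "resources": self.resources,
            "env": [{"name": k, "value": v} for k, v in self.env.items()],
        }
        return {
            "apiVersion": "v1",
            "kind": "Pod",
            "spec": {
                "containers": [container],
                "volumes": self.volumes,
                "nodeSelector": self.node_selector,
                "restartPolicy": "Never",
            },
        }


template = PodTemplate(
    image="apache/airflow:2.8.0",
    resources={"requests": {"cpu": "500m", "memory": "1Gi"}},
    node_selector={"node-type": "worker"},
)
manifest = template.to_manifest()
```

Each element of the tuple ends up in exactly one place in the manifest, which is why a per-task override only needs to name the fields it changes.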

Definition: Dynamic Scaling

Dynamic Scaling in Kubernetes adjusts the number of worker pods based on workload. The scaling function is $S(t) = \max(N_{\min}, \min(N_{\max}, N_{\text{pending}}(t)))$, where $N_{\text{pending}}$ is the number of tasks waiting for execution.

Detailed Explanation

Basic K8s Executor Configuration

# airflow.cfg - Kubernetes executor configuration
[core]
executor = KubernetesExecutor

# This section was named [kubernetes] before Airflow 2.5
[kubernetes_executor]
# Namespace for worker pods
namespace = airflow

# Worker pod image (repository and tag are separate options)
worker_container_repository = apache/airflow
worker_container_tag = 2.8.0

# Service account and resource requests/limits are no longer airflow.cfg
# options in Airflow 2.x. Set serviceAccountName and resources in the
# pod template file instead (see the template in the next section).

# Pod template file
pod_template_file = /opt/airflow/pod_templates/default_template.yaml

# Delete worker pods on success
delete_worker_pods = True

# Delete worker pods on failure
delete_worker_pods_on_failure = True

# Image pull policy, node selector, and tolerations are not airflow.cfg
# options in Airflow 2.x. Set imagePullPolicy, nodeSelector, and tolerations
# in the pod template file instead.

[scheduler]
# Fail a task whose pod stays queued/pending longer than this (seconds).
# Available from Airflow 2.6; earlier releases used worker_pods_pending_timeout
# in the executor section.
task_queued_timeout = 600
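
Any `airflow.cfg` option can also be supplied as an environment variable named `AIRFLOW__{SECTION}__{KEY}`, which is the form the pod template in this lesson uses for `AIRFLOW__CORE__EXECUTOR`. The following is a minimal sketch of that naming rule; `airflow_env_var` and `export_overrides` are hypothetical helper names, not Airflow functions.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name for a config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def export_overrides(overrides: dict, environ: dict = None) -> dict:
    """Write {(section, key): value} overrides into an environment mapping."""
    environ = {} if environ is None else environ
    for (section, key), value in overrides.items():
        # Environment values are always strings
        environ[airflow_env_var(section, key)] = str(value)
    return environ


env = export_overrides({
    ("core", "executor"): "KubernetesExecutor",
    ("core", "parallelism"): 32,
})
```

Passing `os.environ` as the second argument would apply the overrides to the current process; the sketch defaults to a fresh dict so it has no side effects.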

Pod Template Configuration

# pod_templates/default_template.yaml
apiVersion: v1
kind: Pod
metadata:
  labels:
    app: airflow-worker
    component: worker
spec:
  serviceAccountName: airflow-worker
  containers:
    - name: base
      image: apache/airflow:2.8.0
      # Do not set command/args here: the executor supplies the
      # "airflow tasks run ..." arguments for the base container.
      resources:
        requests:
          cpu: 500m
          memory: 1Gi
        limits:
          cpu: 1000m
          memory: 2Gi
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: "KubernetesExecutor"
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: database-url
      volumeMounts:
        - name: airflow-config
          mountPath: /opt/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: dags
          mountPath: /opt/airflow/dags
      securityContext:
        runAsUser: 50000
        runAsGroup: 0
  # fsGroup is a pod-level setting; it is not valid in a container securityContext
  securityContext:
    fsGroup: 0
  volumes:
    - name: airflow-config
      configMap:
        name: airflow-config
    - name: dags
      persistentVolumeClaim:
        claimName: airflow-dags-pvc
  nodeSelector:
    node-type: worker
  tolerations:
    - key: "dedicated"
      operator: "Equal"
      value: "worker"
      effect: "NoSchedule"
  restartPolicy: Never
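
A template like the one above can be sanity-checked before deployment. The sketch below works on the manifest as a plain dict (for example, the result of loading the YAML) and checks three things: that the first container is named `base`, which the executor requires; that `restartPolicy` is `Never`, which is a convention assumed here so that retries are left to Airflow; and that `fsGroup` does not appear at container level, where Kubernetes does not accept it. `check_worker_template` is a hypothetical helper, not an Airflow API.

```python
def check_worker_template(manifest: dict) -> list:
    """Return a list of problems found in a worker pod template."""
    problems = []
    spec = manifest.get("spec", {})
    containers = spec.get("containers", [])

    # The executor runs the task in the first container, which must be "base"
    if not containers or containers[0].get("name") != "base":
        problems.append("first container must be named 'base'")

    # Assumed convention: let Airflow handle retries, not the kubelet
    if spec.get("restartPolicy") != "Never":
        problems.append("restartPolicy should be 'Never'")

    # fsGroup belongs to the pod-level securityContext only
    for c in containers:
        if "fsGroup" in c.get("securityContext", {}):
            problems.append(f"container '{c.get('name')}': fsGroup is a pod-level field")
    return problems


good = {
    "spec": {
        "restartPolicy": "Never",
        "containers": [{"name": "base", "image": "apache/airflow:2.8.0"}],
    }
}
bad = {"spec": {"containers": [{"name": "worker", "securityContext": {"fsGroup": 0}}]}}
```

An empty result for `good` and three findings for `bad` show the checks firing independently of one another.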

Custom Pod Templates per Task

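from datetime import datetime

from airflow.decorators import dag, task
from kubernetes.client import models as k8s


def pod_resources(requests: dict, limits: dict) -> k8s.V1Pod:
    """Build a pod_override that sets resources on the 'base' container."""
    return k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name='base',  # must match the container name in the template
                    resources=k8s.V1ResourceRequirements(requests=requests, limits=limits),
                )
            ]
        )
    )


@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['kubernetes', 'custom-pods'],
)
def custom_pod_dag():

    # Per-task pod settings go in executor_config: pod_template_file selects
    # the template, and pod_override is merged on top of it.
    @task(
        executor_config={
            'pod_template_file': '/opt/airflow/pod_templates/heavy_computation.yaml',
            'pod_override': pod_resources(
                requests={'cpu': '2', 'memory': '4Gi'},
                limits={'cpu': '4', 'memory': '8Gi'},
            ),
        }
    )
    def heavy_computation():
        """Task requiring heavy compute resources."""
        import time
        time.sleep(10)
        return {"status": "completed"}

    @task(
        executor_config={
            'pod_template_file': '/opt/airflow/pod_templates/light_task.yaml',
            'pod_override': pod_resources(
                requests={'cpu': '250m', 'memory': '512Mi'},
                limits={'cpu': '500m', 'memory': '1Gi'},
            ),
        }
    )
    def light_task():
        """Task requiring minimal resources."""
        return {"status": "completed"}

    heavy_computation() >> light_task()


custom_pod_dag()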

Pod Scaling Function

$$S(t) = \max(N_{\min}, \min(N_{\max}, N_{\text{pending}}(t)))$$

Here,

  • $S(t)$ = Number of pods at time $t$
  • $N_{\min}$ = Minimum pod count
  • $N_{\max}$ = Maximum pod count
  • $N_{\text{pending}}(t)$ = Pending tasks at time $t$
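
The scaling function is a clamp of the pending-task count between the two bounds. A minimal sketch, where `pods_at` is a hypothetical name and the bounds in the example are chosen only for illustration:

```python
def pods_at(pending: int, n_min: int, n_max: int) -> int:
    """S(t) = max(N_min, min(N_max, N_pending(t)))."""
    return max(n_min, min(n_max, pending))


# Illustrative bounds: at least 2 pods, at most 32
for pending in (0, 5, 50):
    print(pending, "pending ->", pods_at(pending, n_min=2, n_max=32), "pods")
```

With few pending tasks the lower bound applies, in the middle range the pod count tracks the queue one-to-one, and beyond `n_max` the extra tasks wait.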

Pod Resource Utilization

$$U_{\text{pod}} = \frac{R_{\text{used}}}{R_{\text{requested}}} \times 100\%$$

Here,

  • $U_{\text{pod}}$ = Pod resource utilization
  • $R_{\text{used}}$ = Resources actually used
  • $R_{\text{requested}}$ = Resources requested in pod spec
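
As a worked instance of the utilization formula, the sketch below computes $U_{\text{pod}}$ for one resource at a time. The function name `pod_utilization` and the sample figures are assumptions for illustration; real usage numbers would come from a metrics source, which is outside the scope of this sketch.

```python
def pod_utilization(used: float, requested: float) -> float:
    """U_pod = R_used / R_requested * 100 (percent)."""
    if requested <= 0:
        raise ValueError("requested must be positive")
    return used / requested * 100.0


# A pod that requested 500m CPU (0.5 cores) and used 0.25 cores on average
cpu_utilization = pod_utilization(used=0.25, requested=0.5)
```

A value well below 100% suggests the request can be lowered, while a value above 100% means the pod is running beyond its request and relying on its limit.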

Cluster Cost Optimization

$$C_{\text{total}} = \sum_{i=1}^{N} C_{\text{pod},i} \cdot T_{\text{run},i}$$

Here,

  • $C_{\text{total}}$ = Total cluster cost
  • $C_{\text{pod},i}$ = Cost per pod-hour for pod $i$
  • $T_{\text{run},i}$ = Runtime of pod $i$ in hours
  • $N$ = Total number of pods created
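
The cost sum can be evaluated directly from a list of (rate, runtime) pairs. In the sketch below, `total_cost` is a hypothetical helper, the hourly rates are the light, medium, and heavy figures from the Pod Resource Optimization table in this lesson, and the runtimes are made up for the example.

```python
def total_cost(pods) -> float:
    """C_total = sum of C_pod_i * T_run_i over all pods."""
    return sum(rate_per_hour * hours for rate_per_hour, hours in pods)


# (cost per pod-hour in dollars, runtime in hours); runtimes are illustrative
pods = [
    (0.02, 0.5),   # light task, 30 minutes
    (0.08, 1.0),   # medium task, 1 hour
    (0.16, 2.0),   # heavy task, 2 hours
]
cost = total_cost(pods)
print(f"Total cost: ${cost:.2f}")
```

Because every pod contributes rate times runtime, lowering either the request size (and so the rate) or the runtime of the largest pods has the most effect on the total.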

The Kubernetes executor creates a new pod for each task, so tasks do not share a process, filesystem, or resource limits. This suits multi-tenant environments, but every task pays pod startup overhead (roughly 30-60 s per pod, depending on image pull and scheduling time).

Use pod templates to customize worker pods per task. Heavy computation tasks can request more resources, while light tasks can use minimal resources, optimizing cluster costs.

Key Concepts Table

| Component | Purpose | Configuration | Impact |
|---|---|---|---|
| Executor | Task dispatch | executor = KubernetesExecutor | Core |
| Namespace | Isolation | namespace = airflow | Security |
| Pod Template | Pod configuration | pod_template_file | Flexibility |
| Resources | CPU/memory requests and limits | resources (pod template) | Performance |
| Node Selector | Pod placement | nodeSelector (pod template) | Cost |
| Tolerations | Schedule on tainted nodes | tolerations (pod template) | Availability |
| Service Account | RBAC | serviceAccountName (pod template) | Security |

Code Examples

Advanced K8s Executor Configuration

# k8s_executor_advanced.py
import yaml
from kubernetes.client import models as k8s

# Airflow 2.7+ import path; older releases used airflow.kubernetes.pod_generator
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator


def create_custom_pod_template():
    """Build a pod template for heavy workloads and return it as a plain dict."""

    container = k8s.V1Container(
        name='base',  # the executor runs the task in the container named "base"
        image='apache/airflow:2.8.0-python3.10',
        # No command/args: the executor supplies the task-run arguments
        resources=k8s.V1ResourceRequirements(
            requests={'cpu': '1', 'memory': '2Gi'},
            limits={'cpu': '2', 'memory': '4Gi'},
        ),
        env=[k8s.V1EnvVar(name='AIRFLOW__CORE__EXECUTOR', value='KubernetesExecutor')],
        volume_mounts=[k8s.V1VolumeMount(name='dags', mount_path='/opt/airflow/dags')],
    )

    pod = k8s.V1Pod(
        api_version='v1',
        kind='Pod',
        metadata=k8s.V1ObjectMeta(labels={'component': 'worker', 'workload': 'heavy'}),
        spec=k8s.V1PodSpec(
            containers=[container],
            volumes=[
                k8s.V1Volume(
                    name='dags',
                    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
                        claim_name='airflow-dags',
                    ),
                ),
            ],
            node_selector={'node-type': 'compute-optimized'},
            tolerations=[
                k8s.V1Toleration(key='workload', operator='Equal', value='heavy', effect='NoSchedule'),
            ],
        ),
    )

    # Convert the typed model into a JSON/YAML-serializable dict
    return PodGenerator.serialize_pod(pod)


if __name__ == "__main__":
    # Write the template to the path used by pod_template_file
    with open('/opt/airflow/pod_templates/heavy_computation.yaml', 'w') as f:
        yaml.safe_dump(create_custom_pod_template(), f)

Pod Monitoring and Cleanup

# pod_monitoring.py
from kubernetes import client, config
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

class PodMonitor:
    """Monitor and manage Airflow worker pods."""
    
    def __init__(self, namespace='airflow'):
        config.load_incluster_config()
        self.v1 = client.CoreV1Api()
        self.namespace = namespace
    
    def list_worker_pods(self):
        """List all Airflow worker pods."""
        pods = self.v1.list_namespaced_pod(
            namespace=self.namespace,
            label_selector='component=worker',
        )
        return pods.items
    
    def get_pod_status(self, pod_name):
        """Get status of a specific pod."""
        pod = self.v1.read_namespaced_pod(
            name=pod_name,
            namespace=self.namespace,
        )
        return {
            'name': pod.metadata.name,
            'status': pod.status.phase,
            'start_time': pod.status.start_time,
            'node_name': pod.spec.node_name,
        }
    
    def cleanup_old_pods(self, max_age_hours=24):
        """Clean up finished pods older than max_age_hours."""
        max_age = timedelta(hours=max_age_hours)
        
        pods = self.list_worker_pods()
        cleaned = 0
        
        for pod in pods:
            started = pod.status.start_time
            # start_time is timezone-aware (UTC), so take "now" in the same
            # zone instead of comparing against naive local time
            if started and datetime.now(started.tzinfo) - started > max_age:
                if pod.status.phase in ['Succeeded', 'Failed']:
                    try:
                        self.v1.delete_namespaced_pod(
                            name=pod.metadata.name,
                            namespace=self.namespace,
                        )
                        cleaned += 1
                        logger.info(f"Deleted pod: {pod.metadata.name}")
                    except Exception as e:
                        logger.error(f"Failed to delete pod {pod.metadata.name}: {e}")
        
        return cleaned
    
    def get_pod_metrics(self):
        """Get metrics for all worker pods."""
        pods = self.list_worker_pods()
        
        metrics = {
            'total': len(pods),
            'running': 0,
            'pending': 0,
            'succeeded': 0,
            'failed': 0,
        }
        
        for pod in pods:
            status = pod.status.phase
            if status == 'Running':
                metrics['running'] += 1
            elif status == 'Pending':
                metrics['pending'] += 1
            elif status == 'Succeeded':
                metrics['succeeded'] += 1
            elif status == 'Failed':
                metrics['failed'] += 1
        
        return metrics

if __name__ == "__main__":
    monitor = PodMonitor()
    
    # Get pod metrics
    metrics = monitor.get_pod_metrics()
    print(f"Pod metrics: {metrics}")
    
    # Cleanup old pods
    cleaned = monitor.cleanup_old_pods(max_age_hours=12)
    print(f"Cleaned up {cleaned} pods")

Resource-Aware Scheduling

from airflow.decorators import task, dag
from kubernetes import client, config
from datetime import datetime

@dag(
    schedule="@hourly",  # schedule_interval is deprecated since Airflow 2.4
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['kubernetes', 'resource-aware'],
)
def resource_aware_k8s_dag():
    
    @task
    def check_cluster_resources():
        """Check available cluster resources."""
        config.load_incluster_config()
        v1 = client.CoreV1Api()
        
        # Kubernetes reports quantities as strings such as "3920m" or
        # "16250084Ki", so parse them instead of calling int() on the raw value
        from kubernetes.utils import parse_quantity

        gib = 2 ** 30
        nodes = v1.list_node()
        total_cpu = 0.0         # cores
        total_memory = 0.0      # GiB
        requested_cpu = 0.0     # cores requested by running pods
        requested_memory = 0.0  # GiB requested by running pods
        
        for node in nodes.items:
            allocatable = node.status.allocatable or {}
            total_cpu += float(parse_quantity(allocatable.get('cpu', '0')))
            total_memory += float(parse_quantity(allocatable.get('memory', '0'))) / gib
        
        # Sum resource requests from running pods (airflow namespace only)
        pods = v1.list_namespaced_pod(namespace='airflow')
        for pod in pods.items:
            if pod.status.phase == 'Running' and pod.spec.containers:
                for container in pod.spec.containers:
                    requests = (container.resources and container.resources.requests) or {}
                    requested_cpu += float(parse_quantity(requests.get('cpu', '0')))
                    requested_memory += float(parse_quantity(requests.get('memory', '0'))) / gib
        
        # Available = allocatable capacity minus what running pods have requested
        return {
            'total_cpu': total_cpu,
            'total_memory': total_memory,
            'available_cpu': max(0.0, total_cpu - requested_cpu),
            'available_memory': max(0.0, total_memory - requested_memory),
        }
    
    @task
    def adaptive_resource_allocation(resources: dict):
        """Allocate resources based on cluster availability."""
        if resources['available_cpu'] > 4:
            return {'cpu': '2', 'memory': '4Gi', 'priority': 'high'}
        elif resources['available_cpu'] > 2:
            return {'cpu': '1', 'memory': '2Gi', 'priority': 'medium'}
        else:
            return {'cpu': '500m', 'memory': '1Gi', 'priority': 'low'}
    
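    # executor_config is evaluated when the DAG is parsed and is not a templated
    # field, so pod resources cannot be set from an upstream task's return value.
    # The chosen allocation is passed to the task as an ordinary argument instead.
    @task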
    def process_with_adaptive_resources(resources: dict):
        """Process with adaptive resource allocation."""
        return {
            'allocated_cpu': resources['cpu'],
            'allocated_memory': resources['memory'],
            'status': 'processing',
        }
    
    cluster_resources = check_cluster_resources()
    resource_allocation = adaptive_resource_allocation(cluster_resources)
    process_with_adaptive_resources(resource_allocation)

resource_aware_k8s_dag()
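
The resource check above reads CPU and memory values that Kubernetes reports as quantity strings such as `500m`, `2Gi`, or `16250084Ki`, which plain `int()` cannot parse. The following sketch shows the conversion to base units (cores and bytes) for the common suffixes. `to_base_units` is a hypothetical helper written for this lesson; recent versions of the official Python client ship a comparable `kubernetes.utils.parse_quantity`, which is preferable in real code.

```python
from decimal import Decimal

# Multipliers for common Kubernetes quantity suffixes
_SUFFIXES = {
    "Ki": 2 ** 10, "Mi": 2 ** 20, "Gi": 2 ** 30, "Ti": 2 ** 40,  # binary
    "k": 10 ** 3, "M": 10 ** 6, "G": 10 ** 9, "T": 10 ** 12,      # decimal
    "m": Decimal("0.001"),                                        # milli
}


def to_base_units(quantity: str) -> Decimal:
    """Convert a quantity string to cores (CPU) or bytes (memory)."""
    text = str(quantity).strip()
    # Check two-character suffixes first so "Mi" is not read as "M" or "m"
    for suffix in sorted(_SUFFIXES, key=len, reverse=True):
        if text.endswith(suffix):
            return Decimal(text[: -len(suffix)]) * _SUFFIXES[suffix]
    return Decimal(text)  # no suffix: already in base units


cpu_cores = to_base_units("500m")
memory_gib = to_base_units("2Gi") / 2 ** 30
```

Using `Decimal` keeps millicore values exact, which matters when many small requests are summed.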

Performance Metrics

K8s Executor vs Other Executors

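| Metric | Sequential | Local | Celery | Kubernetes |
|---|---|---|---|---|
| Startup Time | 0s | 0s | 10-30s | 30-60s |
| Isolation | None | Process | Container | Pod |
| Scaling | Manual | Manual | Static | Dynamic |
| Resource Efficiency | Low | Medium | Medium | High |
| Cost | Low | Low | Medium | Variable |
| Multi-tenancy | No | No | Limited | Yes |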

Pod Resource Optimization

| Workload Type | CPU Request | Memory Request | Cost/Pod-Hour |
|---|---|---|---|
| Light Task | 250m | 512Mi | $0.02 |
| Medium Task | 1 CPU | 2Gi | $0.08 |
| Heavy Task | 2 CPU | 4Gi | $0.16 |
| GPU Task | 2 CPU + 1 GPU | 8Gi | $1.50 |

Key Takeaways:

  • Kubernetes executor creates isolated pods per task with dynamic resource allocation
  • Use pod templates to customize worker pods for different workload types
  • Configure resource requests and limits to optimize cluster costs
  • Enable pod cleanup to prevent resource leaks
  • Monitor pod metrics and cluster resources for capacity planning
  • Use node selectors and tolerations for workload placement
