Topic: Airflow on Kubernetes

Airflow on Kubernetes

Cloud-Native Orchestration Patterns

Netflix | Amazon | Difficulty: Advanced

Interview Question

ℹ️ Interview Context

Company: Netflix / Amazon
Role: Senior Data Engineer / Platform Engineer
Difficulty: Advanced
Time: 45-60 minutes

Question: "Explain how Airflow works on Kubernetes. Compare the KubernetesPodOperator with the KubernetesExecutor. When would you use each approach?"


Detailed Theory

Kubernetes Fundamentals

# kubernetes_fundamentals.py
"""
Airflow on Kubernetes:

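1. KubernetesPodOperator:
   - An operator: launches one Pod for the task that uses it
   - Runs any container image, with full control over the Pod spec
   - Works with any executor
   - Good for: Isolated workloads, custom images

2. KubernetesExecutor:
   - An executor: runs every task instance in its own worker Pod
   - Worker Pods run the Airflow image and exit when the task finishes
   - Good for: Elastic scaling, resource efficiency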

3. Helm Chart:
   - Official Airflow Helm chart
   - Simplifies deployment
   - Good for: Standard deployments

4. Docker Images:
   - Custom Airflow images
   - Include dependencies
   - Good for: Consistent environments
"""

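One way to frame the comparison in an interview is to treat the executor and the operator as two separate choices, since the KubernetesPodOperator works under any executor. The sketch below encodes that as a rule of thumb. The `Workload` fields and the `recommend` function are hypothetical names introduced for illustration; they are not part of Airflow, and real decisions weigh more factors than these three.

```python
from dataclasses import dataclass


@dataclass
class Workload:
    """Hypothetical description of a workload (not an Airflow class)."""
    needs_custom_image: bool   # task needs its own image or a non-Python runtime
    bursty_load: bool          # long idle periods between bursts of tasks
    many_short_tasks: bool     # Pod start-up time would dominate task runtime


def recommend(workload: Workload) -> dict:
    """Rule-of-thumb choice; executor and operator are decided independently."""
    # One Pod per task pays off when idle workers would waste capacity,
    # but the per-task Pod start-up cost hurts very short tasks.
    if workload.bursty_load and not workload.many_short_tasks:
        executor = "KubernetesExecutor"
    else:
        executor = "CeleryExecutor"  # long-lived workers
    # The operator choice depends only on what the task itself needs.
    if workload.needs_custom_image:
        operator = "KubernetesPodOperator"
    else:
        operator = "standard operators"
    return {"executor": executor, "operator": operator}


print(recommend(Workload(needs_custom_image=True, bursty_load=True, many_short_tasks=False)))
```

The two answers are independent on purpose: a team can run the KubernetesExecutor and still use the KubernetesPodOperator for the few tasks that need a different image.
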
1. KubernetesPodOperator

# kubernetes_pod_operator.py
from datetime import datetime

from airflow.decorators import dag
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s

@dag(
    dag_id='k8s_pod_operator',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',  # 'schedule' replaces the deprecated 'schedule_interval' (Airflow 2.4+)
)
def k8s_dag():
    # Simple Pod
    simple_pod = KubernetesPodOperator(
        task_id='simple_pod',
        name='simple-pod',
        namespace='airflow',
        image='python:3.9-slim',
        cmds=['python', '-c'],
        arguments=['print("Hello from Pod")'],
        get_logs=True,
    )

    # Pod with resources
    resource_pod = KubernetesPodOperator(
        task_id='resource_pod',
        name='resource-pod',
        namespace='airflow',
        image='my-image:latest',
        cmds=['python', 'process.py'],
        # Current provider versions take 'container_resources' (formerly 'resources')
        container_resources=k8s.V1ResourceRequirements(
            requests={'cpu': '1', 'memory': '2Gi'},
            limits={'cpu': '2', 'memory': '4Gi'},
        ),
        env_vars=[
            k8s.V1EnvVar(name='DATABASE_URL', value='postgresql://...'),
            # Read API_KEY from the 'api-secret' Secret instead of hard-coding it
            k8s.V1EnvVar(
                name='API_KEY',
                value_from=k8s.V1EnvVarSource(
                    secret_key_ref=k8s.V1SecretKeySelector(name='api-secret', key='key'),
                ),
            ),
        ],
        get_logs=True,
    )

    # Pod with volume mounts
    volume_pod = KubernetesPodOperator(
        task_id='volume_pod',
        name='volume-pod',
        namespace='airflow',
        image='my-image:latest',
        cmds=['python', 'process.py'],
        # Volumes and mounts are Kubernetes client objects, not plain dicts
        volume_mounts=[
            k8s.V1VolumeMount(name='data-volume', mount_path='/data'),
        ],
        volumes=[
            k8s.V1Volume(
                name='data-volume',
                persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='data-pvc'),
            ),
        ],
        get_logs=True,
    )

    simple_pod >> resource_pod >> volume_pod

k8s_dag()

ℹ️ Pro Tip

Use KubernetesPodOperator for tasks that need isolated environments or custom dependencies. Each task runs in its own Pod with its own resources.

2. KubernetesExecutor

# kubernetes_executor.py
"""
KubernetesExecutor Configuration:

The KubernetesExecutor creates one worker Pod per task instance and
removes it when the task finishes. No workers sit idle between runs,
at the cost of Pod start-up time on every task.
"""

# Configuration (airflow.cfg; the section is named [kubernetes_executor] since Airflow 2.5)
K8S_EXECUTOR_CONFIG = """
[core]
executor = KubernetesExecutor

[kubernetes_executor]
# Namespace for worker Pods
namespace = airflow

# Worker container image
worker_container_repository = apache/airflow
worker_container_tag = 2.7.0

# Delete worker Pods after completion
delete_worker_pods = True

# Delete worker Pods on failure
delete_worker_pods_on_failure = True

# Pod template file
pod_template_file = /opt/airflow/pod_templates/basic_template.yaml

# Resource requests and limits, the service account, the image pull policy,
# annotations, and labels are not airflow.cfg options in Airflow 2.x;
# set them in the pod template file instead.
"""

# Pod template (basic_template.yaml)
POD_TEMPLATE = """
apiVersion: v1
kind: Pod
metadata:
  name: placeholder-name  # overwritten by Airflow for each task
  labels:
    app: airflow-worker
    environment: production
  annotations:
    sidecar.istio.io/inject: "true"
spec:
  serviceAccountName: airflow-worker
  restartPolicy: Never
  containers:
    - name: base  # the first container must be named "base"
      image: apache/airflow:2.7.0
      imagePullPolicy: IfNotPresent
      # No "command": the executor supplies the "airflow tasks run ..." arguments
      resources:
        requests:
          memory: "2Gi"
          cpu: "1"
        limits:
          memory: "4Gi"
          cpu: "2"
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: "KubernetesExecutor"
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: database-url
  nodeSelector:
    node-type: airflow-worker
  tolerations:
    - key: "airflow"
      operator: "Equal"
      value: "true"
      effect: "NoSchedule"
"""

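Airflow documents two requirements for a pod template file: the container at `spec.containers[0]` must be named `base`, and it must specify an image. A minimal sketch of a pre-deployment check for those two rules follows. The function name `check_pod_template` is hypothetical, and the template is passed as an already-parsed dict so the example needs no YAML library.

```python
def check_pod_template(template: dict) -> list:
    """Return a list of problems found in a parsed pod template (empty if none)."""
    containers = template.get("spec", {}).get("containers", [])
    if not containers:
        return ["spec.containers is empty"]

    problems = []
    first = containers[0]
    # The KubernetesExecutor injects the task command into the container named "base".
    if first.get("name") != "base":
        problems.append("spec.containers[0] must be named 'base'")
    if not first.get("image"):
        problems.append("spec.containers[0] must specify an image")
    return problems


template = {
    "apiVersion": "v1",
    "kind": "Pod",
    "spec": {"containers": [{"name": "worker", "image": "apache/airflow:2.7.0"}]},
}
print(check_pod_template(template))
```

Running a check like this in CI catches a misnamed container before the scheduler starts launching worker Pods from the template.
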
3. Custom Airflow Images

# custom_images.py
"""
Custom Airflow Images:

Build custom images with your dependencies
for consistent environments across all tasks.
"""

# Dockerfile
DOCKERFILE = """
FROM apache/airflow:2.7.0-python3.9

# Install system dependencies
USER root
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# Switch back to airflow user
USER airflow

# Install Python dependencies
COPY requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir -r /tmp/requirements.txt

# Copy DAGs and scripts
COPY dags/ /opt/airflow/dags/
COPY scripts/ /opt/airflow/scripts/

# Set environment variables
ENV PYTHONPATH=/opt/airflow
ENV AIRFLOW_HOME=/opt/airflow
"""

# requirements.txt
REQUIREMENTS = """
apache-airflow[celery,kubernetes]==2.7.0
apache-airflow-providers-postgres==10.0.0
apache-airflow-providers-amazon==8.0.0
apache-airflow-providers-google==10.0.0
pandas==2.1.0
psycopg2-binary==2.9.7
boto3==1.28.0
"""

# Build and push
BUILD_COMMANDS = """
# Build image
docker build -t my-airflow:latest .

# Tag for registry
docker tag my-airflow:latest registry.example.com/my-airflow:latest

# Push to registry
docker push registry.example.com/my-airflow:latest
"""

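The build commands above tag the image as `latest`, which makes it hard to tell which build a Pod is running. One common alternative is to derive an immutable tag from the Airflow version and the Git commit. The sketch below assumes that naming scheme; the function `image_reference` and the tag format are illustrative choices, not an Airflow or Docker convention.

```python
import re


def image_reference(registry: str, name: str, airflow_version: str, git_sha: str) -> str:
    """Build an image reference of the form registry/name:<airflow_version>-<short sha>."""
    # Reject anything that is not a hexadecimal commit hash (for example 'latest').
    if not re.fullmatch(r"[0-9a-f]{7,40}", git_sha):
        raise ValueError(f"not a Git commit hash: {git_sha!r}")
    return f"{registry}/{name}:{airflow_version}-{git_sha[:7]}"


# The commit hash here is a made-up example value.
print(image_reference("registry.example.com", "my-airflow", "2.7.0", "1a2b3c4d5e6f7a8b"))
# prints registry.example.com/my-airflow:2.7.0-1a2b3c4
```

The resulting reference can be passed to `docker build -t`, `docker tag`, and `docker push` in place of the `latest` tag.
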
4. Helm Chart Configuration

# helm_chart.py
"""
Airflow Helm Chart Configuration:

The official Airflow Helm chart simplifies deployment
on Kubernetes with proper defaults.
"""

# values.yaml
HELM_VALUES = """
# Airflow image (key layout of the official chart)
images:
  airflow:
    repository: apache/airflow
    tag: 2.7.0
    pullPolicy: IfNotPresent

# DAGs
dags:
  gitSync:
    enabled: true
    repo: https://github.com/example/airflow-dags.git
    branch: main
    subPath: dags
    period: 60

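# Workers
workers:
  replicas: 3
  resources:
    requests:
      cpu: 1
      memory: 2Gi
    limits:
      cpu: 2
      memory: 4Gi
  # Horizontal Pod Autoscaler for the workers, targeting 80% CPU utilization
  hpa:
    enabled: true
    minReplicaCount: 1
    maxReplicaCount: 10
    metrics:
      - type: Resource
        resource:
          name: cpu
          target:
            type: Utilization
            averageUtilization: 80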

# Scheduler
scheduler:
  resources:
    requests:
      cpu: 1
      memory: 2Gi
    limits:
      cpu: 2
      memory: 4Gi

# Webserver
webserver:
  resources:
    requests:
      cpu: 0.5
      memory: 1Gi
    limits:
      cpu: 1
      memory: 2Gi
  service:
    type: LoadBalancer

# Database
postgresql:
  enabled: true
  primary:
    persistence:
      enabled: true
      size: 10Gi

# Redis
redis:
  enabled: true
  persistence:
    enabled: true
    size: 1Gi

# Metrics: the official chart bundles a StatsD exporter that Prometheus can scrape.
# Prometheus and Grafana themselves are installed separately.
statsd:
  enabled: true
"""

# Install commands
INSTALL_COMMANDS = """
# Add Airflow Helm repo
helm repo add apache-airflow https://airflow.apache.org
helm repo update

# Install Airflow
helm install my-airflow apache-airflow/airflow \
  --namespace airflow \
  --create-namespace \
  -f values.yaml

# Upgrade Airflow
helm upgrade my-airflow apache-airflow/airflow \
  --namespace airflow \
  -f values.yaml

# Uninstall Airflow
helm uninstall my-airflow --namespace airflow
"""

⚠️ Important

Always test Helm chart changes in a staging environment before applying them to production. Use the helm-diff plugin (helm diff upgrade) to preview changes.

5. Resource Management

# resource_management.py
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import V1ResourceRequirements

# Resource requests and limits
RESOURCE_EXAMPLES = {
    # CPU-bound task
    'cpu_bound': {
        'requests': {'cpu': '2', 'memory': '4Gi'},
        'limits': {'cpu': '4', 'memory': '8Gi'},
    },
    
    # Memory-bound task
    'memory_bound': {
        'requests': {'cpu': '1', 'memory': '8Gi'},
        'limits': {'cpu': '2', 'memory': '16Gi'},
    },
    
    # I/O-bound task
    'io_bound': {
        'requests': {'cpu': '0.5', 'memory': '1Gi'},
        'limits': {'cpu': '1', 'memory': '2Gi'},
    },
}

# Usage in operator
cpu_task = KubernetesPodOperator(
    task_id='cpu_task',
    name='cpu-task',
    namespace='airflow',
    image='my-image:latest',
    container_resources=V1ResourceRequirements(  # named 'resources' in older provider versions
        requests=RESOURCE_EXAMPLES['cpu_bound']['requests'],
        limits=RESOURCE_EXAMPLES['cpu_bound']['limits'],
    ),
)

# Resource quotas
RESOURCE_QUOTA = """
apiVersion: v1
kind: ResourceQuota
metadata:
  name: airflow-resource-quota
  namespace: airflow
spec:
  hard:
    requests.cpu: "20"
    requests.memory: 40Gi
    limits.cpu: "40"
    limits.memory: 80Gi
    pods: "100"
"""

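A ResourceQuota caps the sum of requests across all Pods in the namespace, so it also caps how many tasks of a given profile can run at once. The sketch below works through that arithmetic for the quota and profiles above. It is a simplified illustration: the helper names are hypothetical, only the `m` CPU suffix and the binary memory suffixes are parsed, and only the request side of the quota is checked.

```python
from decimal import Decimal

_MEMORY_UNITS = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}


def parse_cpu(quantity: str) -> Decimal:
    """Convert a CPU quantity ('500m', '2', '0.5') to cores."""
    if quantity.endswith("m"):
        return Decimal(quantity[:-1]) / 1000
    return Decimal(quantity)


def parse_memory(quantity: str) -> int:
    """Convert a memory quantity with a binary suffix ('4Gi', '512Mi') to bytes."""
    for suffix, factor in _MEMORY_UNITS.items():
        if quantity.endswith(suffix):
            return int(Decimal(quantity[: -len(suffix)]) * factor)
    return int(quantity)  # no suffix: plain bytes


def max_concurrent_pods(requests: dict, quota: dict) -> int:
    """Number of Pods with these requests that fit under the quota's request caps."""
    by_cpu = parse_cpu(quota["requests.cpu"]) // parse_cpu(requests["cpu"])
    by_memory = parse_memory(quota["requests.memory"]) // parse_memory(requests["memory"])
    return int(min(by_cpu, by_memory, int(quota["pods"])))


quota = {"requests.cpu": "20", "requests.memory": "40Gi", "pods": "100"}
print(max_concurrent_pods({"cpu": "2", "memory": "4Gi"}, quota))    # cpu_bound: prints 10
print(max_concurrent_pods({"cpu": "1", "memory": "8Gi"}, quota))    # memory_bound: prints 5
print(max_concurrent_pods({"cpu": "0.5", "memory": "1Gi"}, quota))  # io_bound: prints 40
```

With this quota, memory is the binding constraint for the memory-bound profile: only five such Pods fit, even though the CPU quota would allow twenty.
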
Real-World Scenarios

Scenario 1: Netflix's K8s Deployment

# netflix_k8s.py
"""
Netflix-style Kubernetes deployment:
- Dynamic scaling
- Resource isolation
- Custom images per workload
"""

from datetime import datetime

from airflow.decorators import dag
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s

@dag(
    dag_id='netflix_k8s_pipeline',
    schedule='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['netflix', 'kubernetes', 'production'],
)
def netflix_k8s_pipeline():
    # Data processing Pod
    data_processing = KubernetesPodOperator(
        task_id='data_processing',
        name='data-processing',
        namespace='netflix-data',
        image='registry.netflix.com/data-processing:latest',
        cmds=['python', 'process.py'],
        arguments=['--date', '{{ ds }}'],
        container_resources=k8s.V1ResourceRequirements(
            requests={'cpu': '4', 'memory': '8Gi'},
            limits={'cpu': '8', 'memory': '16Gi'},
        ),
        env_vars=[
            k8s.V1EnvVar(name='ENVIRONMENT', value='production'),
            k8s.V1EnvVar(name='REGION', value='us-east-1'),
        ],
        # Schedule only onto the dedicated, tainted data-processing nodes
        node_selector={'workload-type': 'data-processing'},
        tolerations=[
            k8s.V1Toleration(key='data-processing', operator='Exists', effect='NoSchedule'),
        ],
        get_logs=True,
    )

    # ML training Pod
    ml_training = KubernetesPodOperator(
        task_id='ml_training',
        name='ml-training',
        namespace='netflix-ml',
        image='registry.netflix.com/ml-training:latest',
        cmds=['python', 'train.py'],
        # GPUs cannot be overcommitted: the GPU request must equal the GPU limit
        container_resources=k8s.V1ResourceRequirements(
            requests={'cpu': '8', 'memory': '32Gi', 'nvidia.com/gpu': '1'},
            limits={'cpu': '16', 'memory': '64Gi', 'nvidia.com/gpu': '1'},
        ),
        node_selector={'workload-type': 'ml-training', 'gpu': 'true'},
        get_logs=True,
    )

    data_processing >> ml_training

netflix_k8s_pipeline()

Scenario 2: Amazon's Multi-Tenant K8s

# amazon_k8s.py
"""
Amazon-style multi-tenant Kubernetes:
- Team isolation
- Resource quotas
- Custom scheduling
"""

from datetime import datetime

from airflow.decorators import dag
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s

@dag(
    dag_id='amazon_k8s_multi_tenant',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['amazon', 'kubernetes', 'multi-tenant'],
)
def amazon_k8s_multi_tenant():
    # Team A Pod (runs the image's default entrypoint)
    team_a = KubernetesPodOperator(
        task_id='team_a_processing',
        name='team-a-processing',
        namespace='airflow-team-a',
        image='registry.amazon.com/team-a:latest',
        container_resources=k8s.V1ResourceRequirements(
            requests={'cpu': '2', 'memory': '4Gi'},
            limits={'cpu': '4', 'memory': '8Gi'},
        ),
        service_account_name='team-a-sa',
        get_logs=True,
    )

    # Team B Pod
    team_b = KubernetesPodOperator(
        task_id='team_b_processing',
        name='team-b-processing',
        namespace='airflow-team-b',
        image='registry.amazon.com/team-b:latest',
        container_resources=k8s.V1ResourceRequirements(
            requests={'cpu': '2', 'memory': '4Gi'},
            limits={'cpu': '4', 'memory': '8Gi'},
        ),
        service_account_name='team-b-sa',
        get_logs=True,
    )

    # No dependency between the two tasks: they run in parallel,
    # each in its own team namespace for isolation.
    # Airflow's own service account needs permission to create Pods in both namespaces.

amazon_k8s_multi_tenant()
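
The two team tasks above differ only in the team name, so the per-team settings can be derived from it. The sketch below shows one way to do that, assuming the naming pattern used in this scenario (`airflow-team-<x>` namespaces, `team-<x>-sa` service accounts). The helper `team_pod_kwargs` is a hypothetical name, and the `team` label is an added illustration rather than something the scenario requires.

```python
def team_pod_kwargs(team: str, image_tag: str) -> dict:
    """Build the keyword arguments that isolate one team's Pod task."""
    slug = team.lower().replace("_", "-")
    return {
        "task_id": f"team_{slug.replace('-', '_')}_processing",
        "name": f"team-{slug}-processing",
        "namespace": f"airflow-team-{slug}",           # one namespace per team
        "image": f"registry.amazon.com/team-{slug}:{image_tag}",
        "service_account_name": f"team-{slug}-sa",     # team-scoped permissions
        "labels": {"team": slug},                      # lets quotas and dashboards group Pods by team
        "get_logs": True,
    }


for team in ("a", "b"):
    print(team_pod_kwargs(team, "latest")["namespace"])
```

Inside the DAG, each task would then be created with `KubernetesPodOperator(**team_pod_kwargs("a", "latest"), container_resources=...)`, which keeps the isolation settings in one place when a third team is added.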

Edge Cases

⚠️ Common Pitfalls

  1. Resource Requests: Always set resource requests to ensure proper scheduling.

  2. Image Pull Policies: Pin image tags. With a mutable tag such as latest, IfNotPresent can keep running a stale cached image.

  3. Pod Cleanup: Enable delete_worker_pods to prevent resource leaks.

  4. Network Policies: Implement network policies for security.

# edge_cases.py
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s

# Resource request issue
bad_pod = KubernetesPodOperator(
    task_id='bad_pod',
    name='bad-pod',
    namespace='airflow',
    image='my-image:latest',
    # BAD: No resource requests, so the scheduler cannot reserve capacity for the Pod
)

# Correct resource requests
good_pod = KubernetesPodOperator(
    task_id='good_pod',
    name='good-pod',
    namespace='airflow',
    image='my-image:latest',
    container_resources=k8s.V1ResourceRequirements(
        requests={'cpu': '1', 'memory': '2Gi'},
        limits={'cpu': '2', 'memory': '4Gi'},
    ),
)

# Image pull policy issue
bad_pull_policy = KubernetesPodOperator(
    task_id='bad_pull_policy',
    name='bad-pull-policy',
    namespace='airflow',
    image='my-image:latest',
    image_pull_policy='IfNotPresent',  # BAD: a node that cached an older 'latest' keeps running it
)

# Correct pull policy
good_pull_policy = KubernetesPodOperator(
    task_id='good_pull_policy',
    name='good-pull-policy',
    namespace='airflow',
    image='my-image:1.4.2',  # GOOD: pinned tag (hypothetical version), so a cached copy is always the right one
    image_pull_policy='IfNotPresent',
)
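
The first two pitfalls can be caught mechanically before a DAG is deployed. The sketch below checks a simplified Pod description for missing resource requests and for a mutable `latest` tag. The dict layout and the function name `find_pitfalls` are assumptions made for this example; they are not an Airflow or Kubernetes API.

```python
def find_pitfalls(pod: dict) -> list:
    """Return warnings for a simplified Pod description (hypothetical dict layout)."""
    warnings = []

    requests = pod.get("resources", {}).get("requests", {})
    for resource in ("cpu", "memory"):
        if resource not in requests:
            warnings.append(f"no {resource} request set")

    # Look for the tag after the last '/', so a registry port is not mistaken for a tag.
    last_part = pod.get("image", "").rsplit("/", 1)[-1]
    # An image reference without a tag resolves to 'latest'.
    tag = last_part.split(":", 1)[1] if ":" in last_part else "latest"
    if tag == "latest":
        warnings.append("mutable image tag 'latest'")

    return warnings


print(find_pitfalls({"image": "my-image:latest"}))
print(find_pitfalls({
    "image": "registry:5000/my-image:1.4.2",
    "resources": {"requests": {"cpu": "1", "memory": "2Gi"}},
}))
```

The first call reports all three problems and the second returns an empty list, so a check like this can gate merges in CI.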


Best Practices

# best_practices.py
"""
Kubernetes Best Practices:

1. Resource Management:
   - Always set resource requests and limits
   - Use appropriate resource profiles
   - Implement resource quotas

2. Image Management:
   - Use specific image tags (not latest)
   - Implement image pull policies
   - Scan images for vulnerabilities

3. Security:
   - Use service accounts with minimal permissions
   - Implement network policies
   - Enforce Pod Security Standards (PodSecurityPolicy was removed in Kubernetes 1.25)

4. Monitoring:
   - Monitor Pod resource usage
   - Alert on resource exhaustion
   - Track Pod lifecycle events

5. Scaling:
   - Use Horizontal Pod Autoscaler
   - Configure Cluster Autoscaler
   - Monitor scaling events
"""

ℹ️ Netflix Interview Tip

At Netflix, they use custom Kubernetes operators for specialized workloads. When discussing Kubernetes, emphasize resource isolation, dynamic scaling, and custom scheduling. Also mention how they handle multi-tenancy with namespace isolation.


Summary

Kubernetes provides powerful infrastructure for Airflow. Key takeaways:

  1. KubernetesPodOperator for isolated workloads
  2. KubernetesExecutor for dynamic scaling
  3. Custom images for consistent environments
  4. Helm charts for simplified deployment
  5. Resource management for efficiency

For Netflix and Amazon interviews, focus on:

  • Resource isolation and scheduling
  • Dynamic scaling patterns
  • Multi-tenancy with namespaces
  • Custom image building
  • Security best practices

This question is part of the Apache Airflow Advanced interview preparation series. Practice implementing these patterns before your interview.
