Interview Question
ℹ️ Interview Context
Company: Microsoft / Meta
Role: Senior Data Engineer / Platform Engineer
Difficulty: Advanced
Time: 45-60 minutes
Question: "Design a CI/CD pipeline for Airflow DAGs. How do you test, deploy, and monitor DAG changes? What tools and practices would you use?"
Detailed Theory
CI/CD Fundamentals
# cicd_fundamentals.py
"""
Airflow CI/CD Pipeline:
1. Source Control:
- Git for version control
- Branching strategy
- Code review process
2. Testing:
- Unit tests
- Integration tests
- DAG validation
3. Build:
- Docker image build
- DAG packaging
- Dependency management
4. Deployment:
- Git-sync
- Airflow REST API
- Helm charts
5. Monitoring:
- Deployment status
- Rollback triggers
- Health checks
"""
1. GitHub Actions Pipeline
# github_actions.py
"""
GitHub Actions CI/CD for Airflow:
Automated testing and deployment pipeline.
"""
# .github/workflows/airflow-ci.yml
GITHUB_ACTIONS_CONFIG = """
name: Airflow CI/CD

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
      - name: Lint with flake8
        run: |
          flake8 dags/ --max-line-length=120
      - name: Type check with mypy
        run: |
          mypy dags/ --ignore-missing-imports
      - name: Run unit tests
        run: |
          pytest tests/unit/ -v --cov=dags/ --cov-report=xml
      - name: Run DAG validation
        run: |
          python scripts/validate_dags.py
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml

  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3
      - name: Build Docker image
        run: |
          # Tag with the commit SHA (immutable, used by the deploy job) and with latest
          docker build -t registry.example.com/my-airflow:${{ github.sha }} .
          docker tag registry.example.com/my-airflow:${{ github.sha }} registry.example.com/my-airflow:latest
      - name: Push to registry
        run: |
          echo "${{ secrets.REGISTRY_PASSWORD }}" | docker login registry.example.com -u "${{ secrets.REGISTRY_USERNAME }}" --password-stdin
          docker push registry.example.com/my-airflow:${{ github.sha }}
          docker push registry.example.com/my-airflow:latest

  deploy:
    needs: build
    runs-on: ubuntu-latest
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    steps:
      # Checkout is needed for scripts/integration_tests.py.
      # Cluster credentials (kubeconfig) are assumed to be configured for this runner.
      - uses: actions/checkout@v3
      - name: Add Airflow Helm repo
        run: |
          helm repo add apache-airflow https://airflow.apache.org
          helm repo update
      - name: Deploy to staging
        run: |
          helm upgrade my-airflow apache-airflow/airflow \
            --namespace airflow-staging \
            --set images.airflow.repository=registry.example.com/my-airflow \
            --set images.airflow.tag=${{ github.sha }} \
            --wait --timeout 300s
      - name: Run integration tests
        run: |
          python scripts/integration_tests.py --env staging
      - name: Deploy to production
        if: success()
        run: |
          helm upgrade my-airflow apache-airflow/airflow \
            --namespace airflow-production \
            --set images.airflow.repository=registry.example.com/my-airflow \
            --set images.airflow.tag=${{ github.sha }} \
            --wait --timeout 300s
"""
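ℹ️ Pro Tip
GitHub Actions is a natural fit when the DAG repository already lives on GitHub: workflows run on every push and pull request without extra infrastructure, and one workflow can deploy to several targets, such as staging followed by production.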
2. DAG Validation
# dag_validation.py
"""
DAG Validation Script:
Validates DAGs before deployment.
"""
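import ast
import importlib.util
import sys
from pathlib import Path


def _name_of(node: ast.AST):
    """Return the trailing name of a call or reference (DAG, models.DAG, @dag(...))"""
    if isinstance(node, ast.Call):
        node = node.func
    if isinstance(node, ast.Name):
        return node.id
    if isinstance(node, ast.Attribute):
        return node.attr
    return None


def defines_dag(tree: ast.AST) -> bool:
    """Check for a DAG(...) call or a function decorated with @dag"""
    for node in ast.walk(tree):
        if isinstance(node, ast.Call) and _name_of(node) == 'DAG':
            return True
        if isinstance(node, ast.FunctionDef):
            if any(_name_of(d) == 'dag' for d in node.decorator_list):
                return True
    return False


def validate_dag(dag_path: str) -> bool:
    """Validate a single DAG file"""
    try:
        # Parse AST
        tree = ast.parse(Path(dag_path).read_text(), filename=dag_path)
        # Check for DAG definitions
        if not defines_dag(tree):
            print(f"WARNING: No DAG found in {dag_path}")
            return False
        # Import and validate, using a per-file module name so that
        # one DAG module does not overwrite another in sys.modules
        module_name = f"dag_module_{Path(dag_path).stem}"
        spec = importlib.util.spec_from_file_location(module_name, dag_path)
        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        spec.loader.exec_module(module)
        print(f"SUCCESS: {dag_path} validated")
        return True
    except Exception as e:
        print(f"ERROR: {dag_path} - {str(e)}")
        return False


def validate_all_dags(dag_folder: str) -> bool:
    """Validate all DAGs in folder, including team subfolders"""
    dag_files = sorted(Path(dag_folder).rglob("*.py"))
    results = []
    for dag_file in dag_files:
        if not dag_file.name.startswith('__'):
            results.append(validate_dag(str(dag_file)))
    return all(results)


if __name__ == "__main__":
    dag_folder = sys.argv[1] if len(sys.argv) > 1 else "dags/"
    success = validate_all_dags(dag_folder)
    sys.exit(0 if success else 1)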
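Parsing and importing a file does not by itself show that the task graph is well formed. As a rough illustration of one structural check, the sketch below detects dependency cycles with the standard library's `graphlib` (Python 3.9+). The `find_cycle` function and the dictionary format (task id mapped to the set of its upstream task ids) are assumptions made for this example, not Airflow APIs; Airflow runs its own cycle check when it loads a DAG.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Optional, Set


def find_cycle(upstream: Dict[str, Set[str]]) -> Optional[List[str]]:
    """Return the task ids forming a cycle, or None if the graph is acyclic."""
    try:
        # prepare() raises CycleError when the graph contains a cycle
        TopologicalSorter(upstream).prepare()
    except CycleError as exc:
        # The second element of args is the cycle as a list of nodes,
        # with the first node repeated at the end
        return list(exc.args[1])
    return None


# Hypothetical task graphs: each task maps to the tasks it depends on
valid = {"extract": set(), "transform": {"extract"}, "load": {"transform"}}
broken = {"extract": {"load"}, "transform": {"extract"}, "load": {"transform"}}

valid_cycle = find_cycle(valid)
broken_cycle = find_cycle(broken)
print("valid graph cycle:", valid_cycle)
print("broken graph has cycle:", broken_cycle is not None)
```

A check like this could run in CI on a dependency map exported from the DAGs, so a cycle fails the build before the scheduler ever sees it.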
3. Git-Sync Deployment
# git_sync.py
"""
Git-Sync Deployment:
Automatically sync DAGs from Git repository.
"""
# Git-Sync sidecar configuration
GIT_SYNC_CONFIG = """
# Kubernetes Pod with Git-Sync
apiVersion: v1
kind: Pod
metadata:
  name: airflow-dags-sync
spec:
  containers:
    - name: git-sync
      image: registry.k8s.io/git-sync/git-sync:v4.0.0
      args:
        - --repo=https://github.com/example/airflow-dags.git
        - --ref=main
        - --root=/dags
        - --period=60s   # keep syncing; --one-time would sync once and exit
        - --depth=1
      volumeMounts:
        - name: dags
          mountPath: /dags
    - name: airflow
      image: apache/airflow:2.7.0
      volumeMounts:
        - name: dags
          mountPath: /opt/airflow/dags
  volumes:
    - name: dags
      emptyDir: {}
"""
# Helm values for Git-Sync (official Airflow chart).
# Airflow 2.x has no git_sync_* options in airflow.cfg; the chart
# configures the Git-Sync sidecar through its values file instead.
AIRFLOW_GIT_SYNC_CONFIG = """
images:
  gitSync:
    repository: registry.k8s.io/git-sync/git-sync
    tag: v4.0.0
dags:
  gitSync:
    enabled: true
    repo: https://github.com/example/airflow-dags.git
    branch: main
    subPath: dags
    depth: 1
    uid: 65533
    wait: 60  # seconds between syncs; newer chart versions use period: 60s
"""
4. Airflow REST API Deployment
# rest_api_deployment.py
"""
Airflow REST API Deployment:
Deliver DAG files, then pause, unpause, trigger, and inspect DAGs
through Airflow's stable REST API.
"""
import os
import shutil
from pathlib import Path
from typing import Any, Dict, Optional

import requests


class AirflowDeployer:
    """Deliver DAG files and manage DAGs using the Airflow REST API"""

    def __init__(
        self,
        base_url: str,
        username: str,
        password: str,
        dags_folder: str = '/opt/airflow/dags',
        timeout: float = 30.0,
    ):
        self.base_url = base_url.rstrip('/')
        self.dags_folder = Path(dags_folder)
        self.timeout = timeout
        self.session = requests.Session()
        # Basic auth requires the basic_auth API auth backend to be enabled
        self.session.auth = (username, password)

    def upload_dag(
        self,
        dag_path: str,
        dag_id: str,
    ) -> bool:
        """Copy the DAG file into the DAGs folder.

        The stable REST API (/api/v1) has no endpoint for uploading DAG
        files, so the file must reach the scheduler through the filesystem
        (shared volume, Git-Sync, or the image). This sketch assumes
        dags_folder is a volume that the scheduler reads.
        """
        try:
            shutil.copy(dag_path, self.dags_folder / f'{dag_id}.py')
            return True
        except OSError as e:
            print(f"Upload failed: {e}")
            return False

    def _set_paused(self, dag_id: str, is_paused: bool) -> bool:
        response = self.session.patch(
            f"{self.base_url}/api/v1/dags/{dag_id}",
            json={'is_paused': is_paused},
            timeout=self.timeout,
        )
        return response.status_code == 200

    def pause_dag(self, dag_id: str) -> bool:
        """Pause DAG"""
        return self._set_paused(dag_id, True)

    def unpause_dag(self, dag_id: str) -> bool:
        """Unpause DAG"""
        return self._set_paused(dag_id, False)

    def trigger_dag(
        self,
        dag_id: str,
        conf: Optional[Dict[str, Any]] = None,
    ) -> Optional[str]:
        """Trigger DAG run and return its dag_run_id, or None on failure"""
        payload = {}
        if conf:
            payload['conf'] = conf
        response = self.session.post(
            f"{self.base_url}/api/v1/dags/{dag_id}/dagRuns",
            json=payload,
            timeout=self.timeout,
        )
        if response.status_code == 200:
            return response.json()['dag_run_id']
        return None

    def get_dag_status(self, dag_id: str) -> Dict[str, Any]:
        """Get DAG status"""
        response = self.session.get(
            f"{self.base_url}/api/v1/dags/{dag_id}",
            timeout=self.timeout,
        )
        if response.status_code == 200:
            return response.json()
        return {}


# Usage (credentials come from the environment rather than being hard-coded)
deployer = AirflowDeployer(
    base_url='http://airflow-webserver:8080',
    username=os.environ.get('AIRFLOW_USERNAME', 'admin'),
    password=os.environ.get('AIRFLOW_PASSWORD', 'admin'),
)
# Deploy DAG
deployer.upload_dag('dags/my_dag.py', 'my_dag')
# Trigger DAG
run_id = deployer.trigger_dag('my_dag', {'param': 'value'})
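Triggering a run only starts it; a deployment script usually also needs to wait for the outcome. The sketch below polls until a run reaches a terminal state. `wait_for_dag_run` and its parameters are hypothetical names for this example. In a real pipeline, `fetch_state` would call `GET /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}` and return the `state` field of the response; here it is a stand-in so the example runs without a server.

```python
import time
from typing import Callable

# DAG run states after which no further change is expected
TERMINAL_STATES = {"success", "failed"}


def wait_for_dag_run(
    fetch_state: Callable[[], str],
    timeout: float = 600.0,
    poll_interval: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the run reaches a terminal state; return it, or 'timeout'."""
    deadline = time.monotonic() + timeout
    while True:
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            return "timeout"
        sleep(poll_interval)


# Stand-in for a real status call: the run moves from queued to success
fake_states = iter(["queued", "running", "running", "success"])
final_state = wait_for_dag_run(
    lambda: next(fake_states),
    timeout=5,
    poll_interval=0,
    sleep=lambda _seconds: None,
)
print(final_state)
```

Injecting `fetch_state` and `sleep` keeps the polling logic testable in CI without a running Airflow instance.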
⚠️ Important
Always test DAG deployments in staging before production. Use deployment strategies like blue-green or canary deployments for safety.
5. Deployment Strategies
# deployment_strategies.py
"""
Deployment Strategies:
1. Blue-Green Deployment:
- Maintain two identical environments
- Switch traffic between them
- Easy rollback
2. Canary Deployment:
- Deploy to small subset first
- Monitor for issues
- Gradually increase traffic
3. Rolling Deployment:
- Deploy incrementally
- Update one instance at a time
- Zero downtime
"""
# Blue-Green deployment script
# Note: run_smoke_tests and monitor_canary are not defined in this section;
# both are assumed to return True when their checks pass.
def blue_green_deploy(
    deployer: AirflowDeployer,
    dag_path: str,
    dag_id: str,
) -> bool:
    """Blue-green deployment"""
    # Deploy to green environment
    deployer.upload_dag(dag_path, dag_id)
    # Run smoke tests
    success = run_smoke_tests(dag_id)
    if success:
        # Switch traffic
        deployer.unpause_dag(dag_id)
        print(f"Deployed {dag_id} successfully")
    else:
        # Rollback
        deployer.pause_dag(dag_id)
        print(f"Deployment failed, rolled back {dag_id}")
    return success


# Canary deployment script
def canary_deploy(
    deployer: AirflowDeployer,
    dag_path: str,
    dag_id: str,
    canary_percentage: int = 10,
) -> bool:
    """Canary deployment"""
    # Deploy canary
    deployer.upload_dag(dag_path, dag_id)
    # Trigger with canary flag
    run_id = deployer.trigger_dag(dag_id, {'canary': True})
    if run_id is None:
        print(f"Could not trigger canary run for {dag_id}")
        return False
    # Monitor
    healthy = monitor_canary(dag_id, run_id, canary_percentage)
    if not healthy:
        # Keep the DAG paused instead of promoting a failing canary
        deployer.pause_dag(dag_id)
        print(f"Canary failed, {dag_id} left paused")
        return False
    # Full deployment if successful
    deployer.unpause_dag(dag_id)
    return True
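The "monitor for issues" step of a canary needs an explicit rule for when to promote and when to roll back. One possible rule, sketched below, compares the failure rate of the canary runs against a threshold. `canary_decision`, `max_failure_rate`, and `min_runs` are hypothetical names and defaults chosen for illustration; real thresholds depend on the workload.

```python
from typing import Iterable


def canary_decision(
    run_states: Iterable[str],
    max_failure_rate: float = 0.1,
    min_runs: int = 5,
) -> str:
    """Return 'wait', 'promote', or 'rollback' based on canary run states."""
    states = list(run_states)
    # Too little evidence to decide either way
    if len(states) < min_runs:
        return "wait"
    failures = sum(1 for state in states if state == "failed")
    failure_rate = failures / len(states)
    return "promote" if failure_rate <= max_failure_rate else "rollback"


all_good = canary_decision(["success"] * 10)
too_many_failures = canary_decision(["success"] * 8 + ["failed"] * 2)
not_enough_runs = canary_decision(["success"] * 3)
print(all_good, too_many_failures, not_enough_runs)
```

Keeping the decision a pure function of observed run states makes the promotion policy easy to unit test and to review alongside the DAG changes.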
Real-World Scenarios
Scenario 1: Microsoft's CI/CD Pipeline
# microsoft_cicd.py
"""
Microsoft-style CI/CD pipeline:
- Azure DevOps integration
- Multi-stage deployment
- Automated testing
"""
# Azure DevOps YAML
AZURE_DEVOPS_CONFIG = """
trigger:
  branches:
    include:
      - main
      - develop

pool:
  vmImage: 'ubuntu-latest'

stages:
  - stage: Test
    jobs:
      - job: Test
        steps:
          - task: UsePythonVersion@0
            inputs:
              versionSpec: '3.9'
          - script: |
              pip install -r requirements.txt
              pip install -r requirements-dev.txt
            displayName: 'Install dependencies'
          - script: |
              flake8 dags/ --max-line-length=120
              mypy dags/ --ignore-missing-imports
            displayName: 'Lint and type check'
          - script: |
              pytest tests/unit/ -v --cov=dags/
            displayName: 'Run unit tests'

  - stage: Build
    dependsOn: Test
    jobs:
      - job: Build
        steps:
          - task: Docker@2
            inputs:
              containerRegistry: 'ACR'
              repository: 'airflow'
              command: 'buildAndPush'
              Dockerfile: '**/Dockerfile'
              tags: '$(Build.BuildId)'

  - stage: Deploy
    dependsOn: Build
    jobs:
      - job: Deploy
        steps:
          - task: HelmInstaller@1
            inputs:
              helmVersionToInstall: '3.12.0'
          # images.airflow.repository must also point at the ACR repository
          # pushed in the Build stage (set it in a values file or with --set).
          - script: |
              helm repo add apache-airflow https://airflow.apache.org
              helm upgrade my-airflow apache-airflow/airflow \
                --namespace airflow \
                --set images.airflow.tag=$(Build.BuildId) \
                --wait
            displayName: 'Deploy to Kubernetes'
"""
Scenario 2: Meta's Automated Pipeline
# meta_automated.py
"""
Meta-style automated pipeline:
- Monorepo structure
- Automated testing
- Continuous deployment
"""
# Monorepo structure
MONOREPO_STRUCTURE = """
airflow-pipelines/
├── dags/
│   ├── team_a/
│   │   ├── dag1.py
│   │   └── dag2.py
│   └── team_b/
│       ├── dag3.py
│       └── dag4.py
├── tests/
│   ├── unit/
│   └── integration/
├── scripts/
├── docker/
└── .github/
"""
# Automated deployment
# Note: run_tests, build_docker_image, deploy_to_staging, run_integration_tests
# and deploy_to_production are placeholders for environment-specific steps.
def auto_deploy():
    """Automated deployment pipeline"""
    # 1. Validate DAGs
    if not validate_all_dags('dags/'):
        raise RuntimeError("DAG validation failed")
    # 2. Run tests
    if not run_tests():
        raise RuntimeError("Tests failed")
    # 3. Build image
    build_docker_image()
    # 4. Deploy to staging
    deploy_to_staging()
    # 5. Run integration tests
    if not run_integration_tests():
        raise RuntimeError("Integration tests failed")
    # 6. Deploy to production
    deploy_to_production()
    print("Deployment successful!")
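A pipeline that stops at the first failure often also needs to undo the stages that already ran, for example reverting staging when integration tests fail. The following is a minimal sketch of that idea under stated assumptions: `run_pipeline` and the `(name, action, undo)` step format are hypothetical, and the lambdas stand in for real stages.

```python
from typing import Callable, List, Optional, Tuple

# (name, action returning True on success, optional undo callable)
Step = Tuple[str, Callable[[], bool], Optional[Callable[[], None]]]


def run_pipeline(steps: List[Step]) -> bool:
    """Run steps in order; on the first failure, undo completed steps in reverse."""
    completed: List[Tuple[str, Optional[Callable[[], None]]]] = []
    for name, action, undo in steps:
        try:
            ok = bool(action())
        except Exception as exc:
            print(f"{name} raised: {exc}")
            ok = False
        if not ok:
            print(f"{name} failed, rolling back")
            for done_name, done_undo in reversed(completed):
                if done_undo is not None:
                    print(f"undoing {done_name}")
                    done_undo()
            return False
        completed.append((name, undo))
    return True


# Stand-in stages: integration tests fail, so staging is rolled back
# and production is never reached
events: List[str] = []
steps: List[Step] = [
    ("validate", lambda: True, None),
    ("deploy_staging", lambda: True, lambda: events.append("staging rolled back")),
    ("integration_tests", lambda: False, None),
    ("deploy_production", lambda: True, lambda: events.append("production rolled back")),
]
pipeline_ok = run_pipeline(steps)
print(pipeline_ok, events)
```

Pairing each stage with its undo action keeps the rollback procedure next to the deployment logic, so both are reviewed and versioned together.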
Best Practices
# best_practices.py
"""
CI/CD Best Practices:
1. Version Control:
- Use Git for all DAGs
- Implement branching strategy
- Code review process
2. Testing:
- Unit tests for tasks
- Integration tests for pipelines
- DAG validation
3. Deployment:
- Automated deployments
- Blue-green or canary strategy
- Rollback procedures
4. Monitoring:
- Track deployment status
- Monitor DAG health
- Alert on failures
5. Documentation:
- Deployment procedures
- Rollback procedures
- Troubleshooting guides
"""
ℹ️ Microsoft Interview Tip
Microsoft uses Azure DevOps for CI/CD with multi-stage pipelines. When discussing CI/CD, emphasize automated testing, safe deployment strategies, and monitoring, and explain how you would handle rollback and incident response.
Summary
CI/CD is critical for reliable Airflow deployments. Key takeaways:
- Git for version control
- Automated testing for quality
- Safe deployment strategies for reliability
- Monitoring for observability
- Documentation for maintainability
For Microsoft and Meta interviews, focus on:
- CI/CD pipeline design
- Testing strategies
- Deployment safety
- Monitoring and alerting
- Incident response
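For the monitoring and alerting point, one concrete signal is the Airflow webserver's `/health` endpoint, which reports a status per component as JSON. The sketch below evaluates such a payload after a deployment. `unhealthy_components` is a hypothetical helper, and the sample payload is hand-written for illustration rather than captured from a real cluster.

```python
from typing import Any, Dict, Iterable, List


def unhealthy_components(
    health: Dict[str, Any],
    required: Iterable[str] = ("metadatabase", "scheduler"),
) -> List[str]:
    """Return the required components whose status is not 'healthy'."""
    return [
        name
        for name in required
        # A missing component counts as unhealthy
        if (health.get(name) or {}).get("status") != "healthy"
    ]


# Hand-written sample shaped like a /health response
sample_health = {
    "metadatabase": {"status": "healthy"},
    "scheduler": {"status": "unhealthy", "latest_scheduler_heartbeat": None},
}
problems = unhealthy_components(sample_health)
print(problems)
```

A deployment job could fail, or trigger a rollback, whenever the returned list is non-empty.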
This question is part of the Apache Airflow Advanced interview preparation series. Practice implementing these patterns before your interview.