CI/CD Pipeline Design: Blue-Green, Canary, Feature Flags
Difficulty: Senior Level | Companies: Netflix, Amazon, Google, Facebook, Microsoft
Interview Question
"Design a CI/CD pipeline for a microservices platform with 50+ services. How do you handle deployments, rollbacks, and feature flags?"
Key Concepts
This question tests your understanding of deployment strategies, release management, and safe deployment practices.
Complete CI/CD Architecture
Pipeline Overview
┌────────────────────────────────────────────────────────────────┐
│                  CI/CD PIPELINE ARCHITECTURE                   │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  ┌──────────────────── SOURCE CONTROL ────────────────────┐    │
│  │ GitHub/GitLab │ Branch Strategy │ Code Review          │    │
│  └───────────────────────────┬────────────────────────────┘    │
│                              ▼                                 │
│  ┌───────────────────── BUILD STAGE ──────────────────────┐    │
│  │ Compile │ Unit Tests │ Lint │ Security Scan            │    │
│  └───────────────────────────┬────────────────────────────┘    │
│                              ▼                                 │
│  ┌────────────────────── TEST STAGE ──────────────────────┐    │
│  │ Integration │ E2E │ Performance │ Contract             │    │
│  └───────────────────────────┬────────────────────────────┘    │
│                              ▼                                 │
│  ┌─────────────────── DEPLOYMENT STAGE ───────────────────┐    │
│  │ Deployment Strategies                                  │    │
│  │   Blue-Green │ Canary │ Rolling                        │    │
│  │   Feature Flags │ A/B Testing │ Shadow Testing         │    │
│  └───────────────────────────┬────────────────────────────┘    │
│                              ▼                                 │
│  ┌────────────────────── PRODUCTION ──────────────────────┐    │
│  │ Monitoring │ Rollback │ Alerting │ Analytics           │    │
│  └────────────────────────────────────────────────────────┘    │
│                                                                │
└────────────────────────────────────────────────────────────────┘
Mathematical Foundation: Deployment Risk
Deployment Risk Score:
- Change size: S = lines_changed / avg_changes_per_deploy
- Test coverage: T = test coverage as a fraction between 0 and 1 (e.g. 0.8 for 80%)
- Time since last deploy: D = days_since_last_deploy (D > 0)
- Risk score: R = S × (1 - T) × (1 + 1/D)
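The risk score above can be evaluated directly. The following is a minimal sketch, assuming T is expressed as a fraction (0.8 for 80% coverage) and D is strictly positive; the function name and the sample numbers are illustrative and not part of the original formula.

```python
def deployment_risk(lines_changed: int,
                    avg_changes_per_deploy: float,
                    test_coverage: float,
                    days_since_last_deploy: float) -> float:
    """Risk score R = S * (1 - T) * (1 + 1/D)."""
    if avg_changes_per_deploy <= 0:
        raise ValueError("avg_changes_per_deploy must be positive")
    if not 0.0 <= test_coverage <= 1.0:
        raise ValueError("test_coverage must be a fraction between 0 and 1")
    if days_since_last_deploy <= 0:
        raise ValueError("days_since_last_deploy must be positive")
    size = lines_changed / avg_changes_per_deploy             # S
    uncovered = 1.0 - test_coverage                           # 1 - T
    recency = 1.0 + 1.0 / days_since_last_deploy              # 1 + 1/D
    return size * uncovered * recency


# A small, well-tested change versus a large, poorly tested one
small_change = deployment_risk(200, 400, 0.8, 2)
large_change = deployment_risk(1200, 400, 0.5, 2)
print(f"small: {small_change:.2f}, large: {large_change:.2f}")
```

A score like this is only useful for ranking deploys against each other, for example to decide which ones require a slower canary schedule.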
Canary Success Metrics:
- Error rate threshold: E_threshold = 0.1%
- Latency threshold: L_threshold = 1.2 × baseline_latency
- Success criteria: E_actual < E_threshold AND L_actual < L_threshold
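The success criteria translate into a single boolean check. This sketch assumes the error rate is supplied as a fraction (0.001 for 0.1%) and latencies in milliseconds; `CanaryThresholds` and `canary_passes` are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CanaryThresholds:
    max_error_rate: float = 0.001   # E_threshold = 0.1%, as a fraction
    latency_factor: float = 1.2     # L_threshold = 1.2 x baseline latency


def canary_passes(error_rate: float,
                  latency_ms: float,
                  baseline_latency_ms: float,
                  thresholds: CanaryThresholds = CanaryThresholds()) -> bool:
    """True only if both the error-rate and latency criteria hold."""
    latency_limit = thresholds.latency_factor * baseline_latency_ms
    return error_rate < thresholds.max_error_rate and latency_ms < latency_limit


print(canary_passes(error_rate=0.0005, latency_ms=110, baseline_latency_ms=100))
print(canary_passes(error_rate=0.0020, latency_ms=110, baseline_latency_ms=100))
```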
Rollback Cost:
- Rollback time: T_rollback = deployment_time × 0.3
- Data migration cost: C_migration = affected_rows ร migration_cost_per_row
- Total rollback cost: C_total = T_rollback + C_migration
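As a worked example of the rollback cost, the sketch below plugs in sample values. Note that the formula adds a time term to a migration term, so this sketch treats both as abstract cost units; the function name and the numbers are assumptions for illustration only.

```python
def rollback_cost(deployment_time: float,
                  affected_rows: int,
                  migration_cost_per_row: float,
                  rollback_factor: float = 0.3) -> float:
    """C_total = T_rollback + C_migration, in abstract cost units."""
    t_rollback = deployment_time * rollback_factor           # T_rollback
    c_migration = affected_rows * migration_cost_per_row     # C_migration
    return t_rollback + c_migration


# 600-unit deployment, 10,000 rows to migrate back at 0.01 units per row
total = rollback_cost(deployment_time=600, affected_rows=10_000,
                      migration_cost_per_row=0.01)
print(f"total rollback cost: {total:.1f}")
```

The migration term usually dominates once a deploy has changed stored data, which is why schema changes are typically rolled out separately from code changes.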
GitHub Actions Pipeline
# .github/workflows/ci-cd.yml
name: CI/CD Pipeline
on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  AWS_REGION: us-east-1
  ECR_REPOSITORY: my-app
  KUBERNETES_NAMESPACE: production

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
      - name: Run unit tests
        run: |
          pytest tests/unit/ --cov=src --cov-report=xml
      - name: Run integration tests
        run: |
          # --cov-append keeps the unit-test coverage instead of overwriting it
          pytest tests/integration/ --cov=src --cov-append --cov-report=xml
      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml

  security:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Run Trivy vulnerability scanner
        uses: aquasecurity/trivy-action@master
        with:
          scan-type: 'fs'
          scan-ref: '.'
          format: 'sarif'
          output: 'trivy-results.sarif'
      - name: Upload Trivy scan results
        uses: github/codeql-action/upload-sarif@v2
        with:
          sarif_file: 'trivy-results.sarif'

  build:
    needs: [test, security]
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ env.AWS_REGION }}
      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1
      - name: Build, tag, and push image
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          IMAGE_TAG: ${{ github.sha }}
        run: |
          # Build once and apply both tags, rather than building the image twice
          docker build \
            -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG \
            -t $ECR_REGISTRY/$ECR_REPOSITORY:latest .
          docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
          docker push $ECR_REGISTRY/$ECR_REPOSITORY:latest

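  deploy-staging:
    needs: build
    runs-on: ubuntu-latest
    environment: staging
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ env.AWS_REGION }}
      - name: Deploy to staging
        run: |
          # Assumes kubectl is already configured for the target cluster and that
          # the ECR_REGISTRY secret holds the registry host used by the build job
          kubectl set image deployment/my-app \
            my-app=${{ secrets.ECR_REGISTRY }}/${{ env.ECR_REPOSITORY }}:${{ github.sha }} \
            --namespace=staging
          # Wait for the rollout to finish before running smoke tests
          kubectl rollout status deployment/my-app --namespace=staging --timeout=300s
      - name: Run smoke tests
        run: |
          python tests/smoke/run_smoke_tests.py --env staging
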
  deploy-production:
    needs: deploy-staging
    # Only pushes to main reach production; develop pushes and pull requests stop at staging
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ env.AWS_REGION }}
      - name: Deploy to production (canary)
        run: |
          # Deploy canary with 10% traffic
          kubectl apply -f k8s/canary-deployment.yml
      - name: Monitor canary
        run: |
          # A non-zero exit code fails the job and skips the promotion step
          python scripts/monitor_canary.py --duration 300
      - name: Promote to full deployment
        if: success()
        run: |
          kubectl set image deployment/my-app \
            my-app=${{ secrets.ECR_REGISTRY }}/${{ env.ECR_REPOSITORY }}:${{ github.sha }} \
            --namespace=production
      - name: Remove canary on failure
        if: failure()
        run: |
          kubectl delete -f k8s/canary-deployment.yml --ignore-not-found
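The workflow calls `scripts/monitor_canary.py` without showing it. The following is one possible shape for such a script, offered as a sketch rather than the actual implementation: `fetch_canary_metrics` is a hypothetical stub that would need to query a real metrics backend, and the `--interval` flag and the threshold defaults are assumptions.

```python
import argparse
import time
from typing import Callable, Dict


def fetch_canary_metrics() -> Dict[str, float]:
    """Hypothetical metrics source; replace with a real query (Prometheus, CloudWatch, ...)."""
    return {"error_rate": 0.0, "latency_ms": 100.0}


def monitor(duration_s: float,
            interval_s: float,
            fetch: Callable[[], Dict[str, float]],
            max_error_rate: float = 0.001,
            max_latency_ms: float = 200.0,
            sleep: Callable[[float], None] = time.sleep) -> bool:
    """Poll metrics for the given duration; return False on the first threshold breach."""
    checks = max(1, int(duration_s // interval_s))
    for _ in range(checks):
        metrics = fetch()
        if metrics["error_rate"] > max_error_rate:
            return False
        if metrics["latency_ms"] > max_latency_ms:
            return False
        sleep(interval_s)
    return True


def main(argv=None) -> int:
    parser = argparse.ArgumentParser(description="Watch canary metrics")
    parser.add_argument("--duration", type=float, default=300)
    parser.add_argument("--interval", type=float, default=30)
    args = parser.parse_args(argv)
    healthy = monitor(args.duration, args.interval, fetch_canary_metrics)
    # Exit code 1 makes the CI step fail, which blocks promotion
    return 0 if healthy else 1
```

To use this as a script, call `sys.exit(main())` under the usual `if __name__ == "__main__":` guard. The `sleep` parameter is injectable so the loop can be exercised in tests without waiting.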
Blue-Green Deployment
# Blue-green deployment manager
import boto3
import time
from typing import Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum


class DeploymentState(Enum):
    IDLE = "IDLE"
    BLUE_ACTIVE = "BLUE_ACTIVE"
    GREEN_ACTIVE = "GREEN_ACTIVE"
    SWITCHING = "SWITCHING"
    ROLLING_BACK = "ROLLING_BACK"


@dataclass
class DeploymentConfig:
    service_name: str
    blue_target_group: str
    green_target_group: str
    listener_arn: str
    health_check_path: str = "/health"


class BlueGreenDeployer:
    """Blue-green deployment manager.

    Assumes config.blue_target_group and config.green_target_group hold full
    target group ARNs, so no region or account ID is hard-coded here.
    """

    def __init__(self, config: DeploymentConfig):
        self.config = config
        self.elbv2 = boto3.client('elbv2')
        self.ecs = boto3.client('ecs')
        self.cloudwatch = boto3.client('cloudwatch')
        self.state = DeploymentState.IDLE

    def deploy(self, new_image: str) -> Dict[str, Any]:
        """Execute blue-green deployment"""
        # Determine current active target group
        current_tg = self._get_active_target_group()
        new_tg = self._inactive_target_group(current_tg)
        # Deploy new version to inactive target group
        self._deploy_to_target_group(new_tg, new_image)
        # Wait for new deployment to be healthy
        if not self._wait_for_healthy(new_tg):
            return {'success': False, 'error': 'New deployment unhealthy'}
        # Switch traffic
        self._switch_traffic(new_tg)
        # Wait for stabilization
        time.sleep(60)
        # Verify health
        if self._verify_health():
            return {'success': True, 'active_target_group': new_tg}
        # Rollback
        self._switch_traffic(current_tg)
        return {'success': False, 'error': 'Health check failed, rolled back'}

    def rollback(self) -> Dict[str, Any]:
        """Rollback to previous version"""
        current_tg = self._get_active_target_group()
        previous_tg = self._inactive_target_group(current_tg)
        self._switch_traffic(previous_tg)
        return {'success': True, 'rolled_back_to': previous_tg}

    def _inactive_target_group(self, active_tg: str) -> str:
        """Return the target group that is not currently receiving traffic"""
        if active_tg == self.config.blue_target_group:
            return self.config.green_target_group
        return self.config.blue_target_group

    def _get_active_target_group(self) -> str:
        """Get currently active target group"""
        # describe_rules takes a single ListenerArn and includes the default rule
        response = self.elbv2.describe_rules(
            ListenerArn=self.config.listener_arn
        )
        for rule in response['Rules']:
            for action in rule['Actions']:
                if action['Type'] == 'forward' and 'TargetGroupArn' in action:
                    return action['TargetGroupArn']
        return self.config.blue_target_group

    def _deploy_to_target_group(self, target_group: str, image: str):
        """Deploy new version to target group"""
        # Update ECS service with new task definition
        task_definition = self._register_task_definition(image)
        # One ECS service per color, "<service>-blue" / "<service>-green"
        # (this naming convention is an assumption)
        color = 'blue' if target_group == self.config.blue_target_group else 'green'
        service_name = f"{self.config.service_name}-{color}"
        self.ecs.update_service(
            cluster='production',
            service=service_name,
            taskDefinition=task_definition,
            forceNewDeployment=True
        )
        # update_service returns immediately; wait until the new tasks have
        # replaced the old ones before checking target health
        self.ecs.get_waiter('services_stable').wait(
            cluster='production',
            services=[service_name]
        )

    def _register_task_definition(self, image: str) -> str:
        """Register new task definition"""
        response = self.ecs.register_task_definition(
            family=self.config.service_name,
            requiresCompatibilities=['FARGATE'],
            networkMode='awsvpc',
            cpu='512',
            memory='1024',
            containerDefinitions=[
                {
                    'name': self.config.service_name,
                    'image': image,
                    'portMappings': [
                        {
                            'containerPort': 8080,
                            'protocol': 'tcp'
                        }
                    ],
                    'healthCheck': {
                        'command': [
                            'CMD-SHELL',
                            f'curl -f http://localhost:8080{self.config.health_check_path} || exit 1'
                        ],
                        'interval': 30,
                        'timeout': 5,
                        'retries': 3
                    }
                }
            ]
        )
        return response['taskDefinition']['taskDefinitionArn']

    def _wait_for_healthy(self, target_group: str, timeout: int = 300) -> bool:
        """Wait for target group to become healthy"""
        start_time = time.time()
        while time.time() - start_time < timeout:
            response = self.elbv2.describe_target_health(
                TargetGroupArn=target_group
            )
            targets = response['TargetHealthDescriptions']
            healthy_count = sum(
                1 for target in targets
                if target['TargetHealth']['State'] == 'healthy'
            )
            # Require at least one target, and every target healthy
            if targets and healthy_count == len(targets):
                return True
            time.sleep(10)
        return False

    def _switch_traffic(self, target_group: str):
        """Switch traffic to target group"""
        self.elbv2.modify_listener(
            ListenerArn=self.config.listener_arn,
            DefaultActions=[
                {
                    'Type': 'forward',
                    'TargetGroupArn': target_group
                }
            ]
        )

    def _verify_health(self) -> bool:
        """Verify deployment health"""
        # Check CloudWatch metrics
        now = time.time()
        response = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/ApplicationELB',
            MetricName='HTTPCode_Target_5XX_Count',
            Dimensions=[
                {
                    'Name': 'LoadBalancer',
                    # Placeholder load balancer identifier
                    'Value': 'app/my-lb/1234567890'
                }
            ],
            StartTime=now - 300,
            EndTime=now,
            Period=60,
            Statistics=['Sum']
        )
        # Check if 5xx errors are below threshold
        total_5xx = sum(point['Sum'] for point in response['Datapoints'])
        return total_5xx < 10  # Fewer than 10 5xx errors in the last 5 minutes
Blue-Green Benefits
Blue-green deployments provide zero-downtime deployments and instant rollback capability. The tradeoff is doubled infrastructure cost during deployment.
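The reason rollback is near-instant is that both environments stay deployed and only a routing pointer changes. The in-memory sketch below illustrates that idea without any AWS calls; `BlueGreenRouter` is a hypothetical class introduced for illustration, not part of the deployer above.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class BlueGreenRouter:
    """Toy model: two environments, one pointer deciding which serves traffic."""
    active: str = "blue"
    versions: Dict[str, Optional[str]] = field(
        default_factory=lambda: {"blue": "v1", "green": None}
    )

    @property
    def idle(self) -> str:
        return "green" if self.active == "blue" else "blue"

    def deploy(self, version: str) -> None:
        """Install the new version on the idle environment; live traffic is untouched."""
        self.versions[self.idle] = version

    def switch(self) -> None:
        """Flip traffic to the idle environment; calling it again is the rollback."""
        if self.versions[self.idle] is None:
            raise RuntimeError("idle environment has nothing deployed")
        self.active = self.idle

    def serving(self) -> Optional[str]:
        return self.versions[self.active]


router = BlueGreenRouter()
router.deploy("v2")      # green now runs v2, blue still serves v1
router.switch()          # cut over to green
after_switch = router.serving()
router.switch()          # rollback: blue still has v1 deployed
after_rollback = router.serving()
print(after_switch, after_rollback)
```

The doubled cost mentioned above comes from keeping the old environment running until the new one has proven itself.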
Canary Deployment
# Canary deployment with traffic shifting
import boto3
import time
from typing import Dict, Any, List
from dataclasses import dataclass


@dataclass
class CanaryConfig:
    service_name: str
    initial_traffic_percent: int = 10
    traffic_increment: int = 10
    interval_seconds: int = 300
    max_error_rate: float = 0.1
    max_latency_ms: float = 200


class CanaryDeployer:
    """Canary deployment with automated promotion"""

    def __init__(self, config: CanaryConfig):
        self.config = config
        self.appmesh = boto3.client('appmesh')
        self.cloudwatch = boto3.client('cloudwatch')

    def deploy(self, new_version: str, stable_version: str = 'v1') -> Dict[str, Any]:
        """Execute canary deployment"""
        # Virtual nodes are named "<service>-<version>" (naming convention assumed)
        stable_node = f'{self.config.service_name}-{stable_version}'
        canary_node = f'{self.config.service_name}-{new_version}'
        # Create canary route
        self._create_canary_route(stable_node, canary_node)
        # Gradually increase traffic
        current_traffic = self.config.initial_traffic_percent
        while current_traffic < 100:
            # Update traffic split
            self._update_traffic_split(stable_node, canary_node, current_traffic)
            # Wait and monitor
            time.sleep(self.config.interval_seconds)
            # Check health
            if not self._check_canary_health():
                # Roll back: send all traffic to the stable version again
                self._update_traffic_split(stable_node, canary_node, 0)
                return {
                    'success': False,
                    'error': 'Canary health check failed',
                    'traffic_at_failure': current_traffic
                }
            # Increase traffic
            current_traffic += self.config.traffic_increment
        # Finalize deployment
        self._finalize_deployment(canary_node)
        return {'success': True, 'final_version': new_version}

    def _put_route(self, weighted_targets: List[Dict[str, Any]]):
        """Write the weighted targets to the App Mesh route"""
        self.appmesh.update_route(
            meshName='production',
            virtualRouterName=self.config.service_name,
            routeName='primary',
            spec={
                'httpRoute': {
                    'action': {
                        'weightedTargets': weighted_targets
                    },
                    # App Mesh requires a match block on every HTTP route
                    'match': {
                        'prefix': '/'
                    }
                }
            }
        )

    def _create_canary_route(self, stable_node: str, canary_node: str):
        """Create canary route in App Mesh"""
        self._update_traffic_split(
            stable_node, canary_node, self.config.initial_traffic_percent
        )

    def _update_traffic_split(self, stable_node: str, canary_node: str,
                              canary_percent: int):
        """Update traffic split between stable and canary"""
        self._put_route([
            {'virtualNode': stable_node, 'weight': 100 - canary_percent},
            {'virtualNode': canary_node, 'weight': canary_percent},
        ])

    def _check_canary_health(self) -> bool:
        """Check canary deployment health"""
        # Check error rate
        error_rate = self._get_error_rate()
        if error_rate > self.config.max_error_rate:
            return False
        # Check latency
        latency = self._get_latency()
        if latency > self.config.max_latency_ms:
            return False
        return True

    def _metric_query(self, metric_name: str) -> Dict[str, Any]:
        """Common CloudWatch query arguments.

        The namespace, metric names, and dimensions are placeholders; point
        them at wherever the canary's metrics are actually published.
        """
        now = time.time()
        return {
            'Namespace': 'AWS/AppMesh',
            'MetricName': metric_name,
            'Dimensions': [
                {'Name': 'MeshName', 'Value': 'production'},
                {'Name': 'VirtualRouterName', 'Value': self.config.service_name},
            ],
            'StartTime': now - 300,
            'EndTime': now,
            'Period': 60,
        }

    def _get_error_rate(self) -> float:
        """Get canary error rate"""
        response = self.cloudwatch.get_metric_statistics(
            **self._metric_query('5xxErrorRate'),
            Statistics=['Average']
        )
        datapoints = response['Datapoints']
        if datapoints:
            # Datapoints are not returned in time order; pick the newest one
            latest = max(datapoints, key=lambda point: point['Timestamp'])
            return latest['Average']
        return 0.0

    def _get_latency(self) -> float:
        """Get canary latency"""
        response = self.cloudwatch.get_metric_statistics(
            **self._metric_query('Latency'),
            # Percentiles must be requested through ExtendedStatistics
            ExtendedStatistics=['p99']
        )
        datapoints = response['Datapoints']
        if datapoints:
            latest = max(datapoints, key=lambda point: point['Timestamp'])
            return latest['ExtendedStatistics']['p99']
        return 0.0

    def _finalize_deployment(self, canary_node: str):
        """Finalize canary deployment"""
        # Route all traffic to new version
        self._put_route([
            {'virtualNode': canary_node, 'weight': 100}
        ])
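It helps to know how long a canary rollout takes before starting one. The sketch below reproduces the traffic-stepping loop from `deploy` as a pure function, using the default values from `CanaryConfig` (10% initial traffic, 10% increments, 300 second intervals); `canary_schedule` is a hypothetical helper introduced for illustration.

```python
from typing import List


def canary_schedule(initial_percent: int = 10, increment: int = 10) -> List[int]:
    """Canary traffic percentages applied before the final 100% cut-over."""
    if not 0 < initial_percent <= 100:
        raise ValueError("initial_percent must be in (0, 100]")
    if increment <= 0:
        raise ValueError("increment must be positive")
    steps = []
    current = initial_percent
    while current < 100:          # same loop condition as CanaryDeployer.deploy
        steps.append(current)
        current += increment
    return steps


steps = canary_schedule()
total_wait_seconds = len(steps) * 300   # one monitoring interval per step
print(steps)
print(f"minimum rollout time: {total_wait_seconds / 60:.0f} minutes")
```

With the defaults, the canary is observed at nine traffic levels, so a clean rollout takes at least 45 minutes. Larger increments shorten the rollout at the cost of exposing more users per step.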
Feature Flags
# Feature flag management
import json
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass
from enum import Enum
import hashlib
from datetime import datetime


class FlagState(Enum):
    DISABLED = "DISABLED"
    ENABLED = "ENABLED"
    PERCENTAGE = "PERCENTAGE"
    USER_SEGMENT = "USER_SEGMENT"


@dataclass
class FeatureFlag:
    name: str
    state: FlagState
    description: str
    percentage: int = 0
    user_segments: Optional[List[str]] = None
    created_at: Optional[datetime] = None
    expires_at: Optional[datetime] = None


class FeatureFlagManager:
    """Feature flag management system"""

    def __init__(self):
        self.flags: Dict[str, FeatureFlag] = {}
        self.overrides: Dict[str, bool] = {}

    def create_flag(self, name: str, description: str,
                    initial_state: FlagState = FlagState.DISABLED) -> FeatureFlag:
        """Create new feature flag"""
        flag = FeatureFlag(
            name=name,
            state=initial_state,
            description=description,
            created_at=datetime.utcnow()
        )
        self.flags[name] = flag
        return flag

    def is_enabled(self, flag_name: str, user_id: Optional[str] = None,
                   context: Optional[Dict[str, Any]] = None) -> bool:
        """Check if feature flag is enabled"""
        # Check overrides first
        if flag_name in self.overrides:
            return self.overrides[flag_name]
        flag = self.flags.get(flag_name)
        if not flag:
            return False
        # Check expiration
        if flag.expires_at and datetime.utcnow() > flag.expires_at:
            return False
        if flag.state == FlagState.DISABLED:
            return False
        elif flag.state == FlagState.ENABLED:
            return True
        elif flag.state == FlagState.PERCENTAGE:
            return self._is_in_percentage(flag_name, user_id, flag.percentage)
        elif flag.state == FlagState.USER_SEGMENT:
            return self._is_in_user_segment(user_id, flag.user_segments)
        return False

    def _is_in_percentage(self, flag_name: str, user_id: Optional[str],
                          percentage: int) -> bool:
        """Check if user is in percentage rollout"""
        if not user_id:
            return False
        # Hash flag name and user ID so each user gets a stable bucket per flag
        hash_value = int(hashlib.md5(
            f"{flag_name}:{user_id}".encode()
        ).hexdigest(), 16)
        return (hash_value % 100) < percentage

    def _is_in_user_segment(self, user_id: Optional[str],
                            segments: Optional[list]) -> bool:
        """Check if user is in specified segment"""
        if not segments or not user_id:
            return False
        # Simplified segment check: the user matches if any listed segment matches
        for segment in segments:
            if segment == 'beta_users' and user_id.startswith('beta_'):
                return True
            if segment == 'internal_users' and user_id.endswith('@company.com'):
                return True
            if segment == 'premium_users' and user_id.startswith('premium_'):
                return True
        return False

    def set_percentage(self, flag_name: str, percentage: int):
        """Set flag to percentage rollout"""
        if flag_name in self.flags:
            self.flags[flag_name].state = FlagState.PERCENTAGE
            self.flags[flag_name].percentage = percentage

    def set_user_segment(self, flag_name: str, segments: list):
        """Set flag to user segment"""
        if flag_name in self.flags:
            self.flags[flag_name].state = FlagState.USER_SEGMENT
            self.flags[flag_name].user_segments = segments

    def override(self, flag_name: str, enabled: bool):
        """Override flag value"""
        self.overrides[flag_name] = enabled

    def get_flag_status(self) -> Dict[str, Any]:
        """Get status of all flags"""
        return {
            name: {
                'state': flag.state.value,
                'percentage': flag.percentage,
                'user_segments': flag.user_segments
            }
            for name, flag in self.flags.items()
        }


# Feature flag decorator
def feature_flag(flag_name: str, flag_manager: FeatureFlagManager):
    """Decorator for feature flagging"""
    def decorator(func: Callable):
        def wrapper(*args, **kwargs):
            user_id = kwargs.get('user_id') or (args[0] if args else None)
            if flag_manager.is_enabled(flag_name, user_id):
                return func(*args, **kwargs)
            # Flag is off: tell the wrapped function to use its default behavior
            return func(*args, **kwargs, feature_disabled=True)
        return wrapper
    return decorator


# Example usage
flag_manager = FeatureFlagManager()
flag_manager.create_flag('new_checkout_flow', 'New checkout flow')
flag_manager.set_percentage('new_checkout_flow', 25)


@feature_flag('new_checkout_flow', flag_manager)
def checkout(user_id: str, cart: dict, feature_disabled: bool = False):
    # The decorator passes feature_disabled=True when the flag is off,
    # so the wrapped function must accept that keyword
    if feature_disabled:
        return {'status': 'old_flow'}
    # New checkout flow
    return {'status': 'new_flow'}
Feature Flags Benefits
Feature flags enable safe deployments, A/B testing, and gradual rollouts. Use them to decouple deployment from release and reduce deployment risk.
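Gradual rollouts depend on each user landing in the same bucket on every request. The sketch below isolates the hashing scheme used for percentage rollouts (MD5 of `flag_name:user_id`, modulo 100) and checks two properties: the enabled share is close to the configured percentage, and users enabled at 10% stay enabled when the rollout grows to 25%. The synthetic user IDs are an assumption for illustration.

```python
import hashlib


def bucket(flag_name: str, user_id: str) -> int:
    """Stable bucket in [0, 100) derived from the flag name and user ID."""
    digest = hashlib.md5(f"{flag_name}:{user_id}".encode()).hexdigest()
    return int(digest, 16) % 100


def in_rollout(flag_name: str, user_id: str, percentage: int) -> bool:
    """A user is included when their bucket falls below the rollout percentage."""
    return bucket(flag_name, user_id) < percentage


users = [f"user-{i}" for i in range(10_000)]
at_10 = {u for u in users if in_rollout("new_checkout_flow", u, 10)}
at_25 = {u for u in users if in_rollout("new_checkout_flow", u, 25)}

print(f"enabled at 10%: {len(at_10) / len(users):.1%}")
print(f"enabled at 25%: {len(at_25) / len(users):.1%}")
print("10% cohort kept at 25%:", at_10 <= at_25)
```

Including the flag name in the hash input means the same user lands in different buckets for different flags, so one group of users does not receive every experimental feature at once.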
Summary
| Strategy | Downtime | Rollback Speed | Resource Cost | Risk Level |
|---|---|---|---|---|
| Rolling Update | Zero | Minutes | Low | Medium |
| Blue-Green | Zero | Seconds | High | Low |
| Canary | Zero | Seconds | Medium | Low |
| Feature Flags | Zero | Instant | Low | Very Low |