Managed Airflow: MWAA and Cloud Composer
Formal Definitions
Definition: Managed Airflow
A managed Airflow service is a cloud-provider-hosted deployment of Apache Airflow that abstracts the infrastructure (scheduler, web server, workers, metadata database) so that users can focus on DAG development. The provider handles patching, scaling, and high availability, and provides the mechanism for version upgrades.
Definition: MWAA
Amazon Managed Workflows for Apache Airflow (MWAA) is AWS's managed Airflow service. It provisions the scheduler, web server, workers, and an AWS-managed metadata database (Aurora PostgreSQL), with IAM-based security, CloudWatch monitoring, and S3-based DAG storage.
Definition: Cloud Composer
Cloud Composer is Google Cloud's managed Airflow service. It provides a fully-managed Airflow environment with GCS-based DAG storage, Cloud SQL metadata database, and integration with GCP monitoring and IAM services.
Detailed Explanation
MWAA Setup
# mwaa_environment.tf (Terraform)
resource "aws_mwaa_environment" "airflow" {
  name               = "production-airflow"
  airflow_version    = "2.8.1"
  environment_class  = "mw1.medium"
  source_bucket_arn  = aws_s3_bucket.dags.arn
  dag_s3_path        = "dags/"
  execution_role_arn = aws_iam_role.mwaa_role.arn

  network_configuration {
    security_group_ids = [aws_security_group.mwaa.id]
    subnet_ids         = var.private_subnet_ids
  }

  logging_configuration {
    dag_processing_logs {
      enabled   = true
      log_level = "INFO"
    }
    scheduler_logs {
      enabled   = true
      log_level = "INFO"
    }
    task_logs {
      enabled   = true
      log_level = "INFO"
    }
    webserver_logs {
      enabled   = true
      log_level = "WARNING"
    }
    worker_logs {
      enabled   = true
      log_level = "INFO"
    }
  }

  # PUBLIC_ONLY makes the web server reachable from the internet;
  # use PRIVATE_ONLY to restrict access to the VPC.
  webserver_access_mode = "PUBLIC_ONLY"
  max_workers           = 10
  min_workers           = 2

  # Airflow settings are passed as "section.key" overrides, not as
  # AIRFLOW__SECTION__KEY environment variables. MWAA manages the metadata
  # database connection itself, so sql_alchemy_conn is not set here.
  # MWAA restricts some options; check the service documentation for the allowed list.
  airflow_configuration_options = {
    "core.load_examples"      = "False"
    "webserver.expose_config" = "True" # shows the configuration in the UI
  }
}

# Computed attributes are read through outputs rather than assigned inside the resource.
output "mwaa_webserver_url" {
  value = aws_mwaa_environment.airflow.webserver_url
}

output "mwaa_arn" {
  value = aws_mwaa_environment.airflow.arn
}
Cloud Composer Setup
# composer_environment.tf (Terraform)
resource "google_composer_environment" "airflow" {
  name   = "production-airflow"
  region = "us-central1"

  config {
    software_config {
      image_version = "composer-2.6.1-airflow-2.8.1"

      pypi_packages = {
        "apache-airflow-providers-google" = ">=10.0.0"
        "apache-airflow-providers-amazon" = ">=8.0.0"
        "pandas"                          = ">=2.0.0"
      }

      # Composer rejects env_variables named AIRFLOW__SECTION__KEY;
      # Airflow settings go in airflow_config_overrides as "section-key".
      airflow_config_overrides = {
        core-load_examples = "False"
      }
    }

    workloads_config {
      scheduler {
        cpu        = 2
        memory_gb  = 4
        storage_gb = 10
        count      = 2
      }
      worker {
        cpu        = 2
        memory_gb  = 8
        storage_gb = 20
        min_count  = 2
        max_count  = 10
      }
      triggerer {
        cpu       = 0.5
        memory_gb = 1
        count     = 2
      }
    }

    # node_count, database_config.machine_type, and web_server_config.machine_type
    # apply only to Cloud Composer 1 and are omitted for this Composer 2 image.
    # environment_size selects the managed infrastructure tier instead.
    environment_size = "ENVIRONMENT_SIZE_MEDIUM"

    private_environment_config {
      enable_private_endpoint = true
      master_ipv4_cidr_block  = "172.16.0.0/28"
    }
  }
}
MWAA DAG with S3 Storage
# dags/mwaa_example_dag.py
import io
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator


def extract_to_s3(**context):
    """Build a one-row CSV for the run date and upload it to the data-lake bucket."""
    # Imported inside the task so the scheduler does not load pandas on every DAG parse.
    import pandas as pd

    data = pd.DataFrame({
        'date': [context['ds']],
        'record_count': [1000],
        'status': ['completed'],
    })
    csv_buffer = io.StringIO()
    data.to_csv(csv_buffer, index=False)

    hook = S3Hook(aws_conn_id='aws_default')
    # The key is derived from the logical date and replace=True overwrites it,
    # so a retry or rerun produces the same object instead of a duplicate.
    hook.load_string(
        string_data=csv_buffer.getvalue(),
        key=f'extracted/{context["ds"]}/data.csv',
        bucket_name='data-lake',
        replace=True,
    )


with DAG(
    dag_id='mwaa_managed_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',  # schedule_interval is deprecated since Airflow 2.4
    catchup=False,
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
    },
    tags=['mwaa', 'managed'],
) as dag:
    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_to_s3,
    )

    load = S3ToRedshiftOperator(
        task_id='load_to_redshift',
        schema='public',
        table='daily_metrics',
        s3_bucket='data-lake',
        s3_key='extracted/{{ ds }}/data.csv',
        copy_options=['FORMAT AS CSV', 'IGNOREHEADER 1'],
        aws_conn_id='aws_default',
        redshift_conn_id='redshift_default',
    )

    extract >> load
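MWAA picks up a DAG file once it lands under the configured `dag_s3_path` prefix (`dags/` in the Terraform above) of the source bucket. The sketch below shows one way a deployment step might copy local DAG files there. The function name `sync_dags` and its parameters are illustrative, not part of MWAA or Airflow; the only real API assumed is the `upload_file(Filename, Bucket, Key)` method of a boto3 S3 client, which is passed in so the function can be exercised without AWS access.

```python
from pathlib import Path


def sync_dags(s3_client, local_dir, bucket, prefix="dags/"):
    """Upload every .py file under local_dir to bucket/prefix and return the keys written.

    s3_client is expected to behave like boto3.client("s3"); sync_dags itself
    is a hypothetical helper written for this example.
    """
    uploaded = []
    root = Path(local_dir)
    for path in sorted(root.rglob("*.py")):
        # Keep the relative folder layout so helper packages next to the DAGs still import.
        key = prefix + path.relative_to(root).as_posix()
        s3_client.upload_file(str(path), bucket, key)
        uploaded.append(key)
    return uploaded
```

In a CI job this could be called as `sync_dags(boto3.client("s3"), "dags", bucket_name)`, where `bucket_name` is the bucket referenced by `source_bucket_arn`. It only adds or overwrites files; removing DAGs that were deleted locally would need a separate step.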
Service Comparison
| Feature | MWAA | Cloud Composer | Azure Managed Airflow |
|---|---|---|---|
| Airflow Version | 2.x | 2.x | 2.x |
| DAG Storage | S3 | GCS | Blob Storage |
| Metadata DB | Aurora PostgreSQL (AWS-managed) | Cloud SQL | Azure Database |
| Scaling | Auto-scaling between min and max workers | Auto-scaling | Manual |
| Monitoring | CloudWatch | Cloud Monitoring | Azure Monitor |
| Security | IAM, VPC | IAM, VPC | Azure AD, VNet |
| Version Upgrade | Manual | Auto or manual | Manual |
| Min Workers | 1 | 1 | 2 |
| Max Workers | Configurable | Auto-scaled | Configurable |
| Price Model | Per environment + workers | Per environment + workers | Per environment + workers |
Scaling Thresholds
| Metric | Small | Medium | Large |
|---|---|---|---|
| DAG Count | <50 | 50-200 | 200+ |
| Task Count/Day | <1000 | 1000-10000 | 10000+ |
| Workers | 2-4 | 4-10 | 10-20 |
| Scheduler | 1 | 1-2 | 2+ |
| DB Instance | db.t3.medium | db.r5.large | db.r5.xlarge |
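The thresholds above can be turned into a small lookup when sizing a new environment. This is a minimal sketch, not a provider rule: the function name `size_tier` is made up for illustration, boundary values (for example exactly 200 DAGs) are assumed to fall into the larger tier, and the tier is taken from whichever signal is higher.

```python
def size_tier(dag_count, tasks_per_day):
    """Return 'small', 'medium', or 'large' from the DAG-count and task-count rows."""

    def level(value, medium_floor, large_floor):
        # 0 = small, 1 = medium, 2 = large; floors are inclusive (an assumption).
        if value >= large_floor:
            return 2
        if value >= medium_floor:
            return 1
        return 0

    # Use the higher of the two signals so a few very busy DAGs still size up.
    highest = max(level(dag_count, 50, 200), level(tasks_per_day, 1000, 10000))
    return ("small", "medium", "large")[highest]
```

For example, an environment with 60 DAGs and 500 tasks per day would be classified as medium, because the DAG count crosses the 50-DAG floor even though the task volume does not.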
Best Practices
- Use managed secrets: Leverage AWS Secrets Manager, GCP Secret Manager, or Azure Key Vault instead of environment variables for sensitive data.
- Enable VPC peering: Run MWAA/Composer in private subnets with VPC peering for secure database and storage access.
- Monitor worker utilization: Scale workers based on task queue depth and execution time, not just DAG count.
- Use Airflow Variables: Keep environment-specific configuration in Variables rather than hardcoding it in DAG files.
- Implement CI/CD: Use S3 sync, GCS sync, or Git-based deployment for DAG updates.
- Enable logging: Configure all log types (scheduler, task, webserver) for debugging.
- Test upgrades: Use staging environments to test Airflow version upgrades before production.
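- Optimize parse time: Keep top-level DAG code light, move heavy imports and network or database calls into task callables, and use dynamic DAG generation sparingly because every generated DAG is rebuilt on each parse.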
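The advice to scale on queue depth rather than DAG count can be sketched as a simple calculation. This is an illustration only, not the algorithm MWAA or Cloud Composer actually runs: `desired_workers` is a hypothetical helper, and `tasks_per_worker` stands for however many tasks one worker is configured to run concurrently.

```python
import math


def desired_workers(queued, running, tasks_per_worker, min_workers, max_workers):
    """Estimate the worker count needed for the current load, clamped to the configured range."""
    # Demand is every task that needs a slot right now, spread over the per-worker concurrency.
    demand = math.ceil((queued + running) / tasks_per_worker)
    return max(min_workers, min(max_workers, demand))
```

With 12 queued and 8 running tasks and 5 tasks per worker, the estimate is 4 workers; an idle environment stays at `min_workers`, and a backlog larger than the ceiling can absorb stays at `max_workers`, which is a signal to raise the limit or the environment class.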
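MWAA and Cloud Composer bill for the environment itself plus the worker capacity it consumes. To control cost, set the minimum worker count as low as the service allows (MWAA requires min_workers of at least 1; it cannot be 0) so the environment shrinks during off-peak hours, cap the maximum, and rely on auto-scaling driven by queued and running tasks. Track spend in the provider's billing dashboard.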
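A back-of-the-envelope model makes the effect of off-peak scaling visible. The sketch below assumes the simple "environment hours plus worker hours" model described above; the function names are hypothetical and the rates are inputs you would take from the provider's current price list, not real prices.

```python
def worker_hours(peak_workers, offpeak_workers, peak_hours_per_day, days=30):
    """Total worker hours in a billing period with separate peak and off-peak worker counts."""
    offpeak_hours_per_day = 24 - peak_hours_per_day
    per_day = peak_workers * peak_hours_per_day + offpeak_workers * offpeak_hours_per_day
    return days * per_day


def estimate_cost(env_rate, worker_rate, total_worker_hours, env_hours=730):
    """Environment charge plus worker charge; both rates are placeholder hourly prices."""
    return env_rate * env_hours + worker_rate * total_worker_hours
```

Comparing `worker_hours(10, 10, 8)` with `worker_hours(10, 2, 8)` shows how many worker hours are avoided by dropping from 10 to 2 workers outside an 8-hour peak window; multiplying the difference by the worker rate gives the saving for the period.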
Managed Airflow services handle infrastructure management but not DAG logic. You are still responsible for writing idempotent, well-tested DAGs. Use the same best practices as self-managed Airflow for DAG development and testing.
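Idempotency here means that rerunning a task for the same logical date leaves the same result behind. A minimal sketch of the pattern, using a plain dictionary as a stand-in for an object store: the output location is derived only from the run date and is overwritten, never appended to. The names `partition_key` and `write_partition` are illustrative; the key layout mirrors the `extracted/<ds>/data.csv` key used in the example DAG.

```python
def partition_key(ds, prefix="extracted"):
    """Deterministic output location for a logical date such as '2024-01-01'."""
    return f"{prefix}/{ds}/data.csv"


def write_partition(store, ds, rows):
    """Overwrite the partition for ds; running this twice leaves one copy of the data."""
    # Assignment replaces any earlier attempt, so retries cannot duplicate rows.
    store[partition_key(ds)] = list(rows)
    return store
```

A test for a real DAG can follow the same shape: run the task twice for one date against a scratch bucket or table and assert that the output is unchanged after the second run.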
Key Takeaways:
- MWAA (AWS), Cloud Composer (GCP), and Azure Managed Airflow provide fully managed Airflow
- DAG storage uses cloud object storage (S3, GCS, Blob) while metadata DB uses managed relational DBs
- Scaling is controlled through worker count and environment class
- Use managed secret backends and VPC configurations for security
- Monitor with provider-native tools (CloudWatch, Cloud Monitoring, Azure Monitor)
- CI/CD for DAG deployment uses cloud-native storage sync or Git integration
See Also
- Airflow Architecture — Core architecture and component overview
- Executors Comparison — Sequential, Local, Celery, and Kubernetes executors
- Databricks Provider — Databricks cluster and job management
- BigQuery Provider — Google BigQuery integration patterns