Managed Airflow: MWAA and Cloud Composer

Formal Definitions

Managed Airflow

A managed Airflow service is a cloud-provider-hosted deployment of Apache Airflow that takes over infrastructure management (scheduler, web server, workers, metadata database) so that users can focus on DAG development. Providers handle upgrades, patching, scaling, and high availability.

MWAA

Amazon Managed Workflows for Apache Airflow (MWAA) is AWS's managed Airflow service. It provisions the scheduler, web server, workers, and metadata database (RDS PostgreSQL), and integrates IAM-based security, CloudWatch monitoring, and S3-based DAG storage.

Cloud Composer

Cloud Composer is Google Cloud's managed Airflow service. It provides a fully managed Airflow environment with GCS-based DAG storage, a Cloud SQL metadata database, and integration with Cloud Monitoring, Cloud Logging, and IAM.

Detailed Explanation

MWAA Setup

# mwaa_environment.tf (Terraform)
resource "aws_mwaa_environment" "airflow" {
  name               = "production-airflow"
  airflow_version    = "2.8.1"
  environment_class  = "mw1.medium"

  source_bucket_arn  = aws_s3_bucket.dags.arn
  dag_s3_path        = "dags/"

  execution_role_arn = aws_iam_role.mwaa_role.arn

  network_configuration {
    security_group_ids = [aws_security_group.mwaa.id]
    subnet_ids         = var.private_subnet_ids
  }

  logging_configuration {
    dag_processing_logs {
      enabled   = true
      log_level = "INFO"
    }
    scheduler_logs {
      enabled   = true
      log_level = "INFO"
    }
    task_logs {
      enabled   = true
      log_level = "INFO"
    }
    webserver_logs {
      enabled   = true
      log_level = "WARNING"
    }
    worker_logs {
      enabled   = true
      log_level = "INFO"
    }
  }

  webserver_access_mode = "PUBLIC_ONLY"

  max_workers = 10
  min_workers = 2

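  # MWAA takes Airflow settings as "section.key" options rather than AIRFLOW__* variables.
  # The metadata database connection is managed by the service, so it is not set here.
  airflow_configuration_options = {
    "core.load_examples"      = "False"
    "webserver.expose_config" = "True"
  }
}

# webserver_url and arn are computed attributes: read them from the resource
# (for example as outputs) instead of assigning them inside its body.
output "mwaa_webserver_url" {
  value = aws_mwaa_environment.airflow.webserver_url
}

output "mwaa_arn" {
  value = aws_mwaa_environment.airflow.arn
}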
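Once the environment exists, Airflow CLI commands can be run against it through MWAA's CLI token endpoint. The sketch below assumes boto3 is installed and that the caller's credentials allow `airflow:CreateCliToken`; it posts the command as plain text to `https://<webserver-hostname>/aws_mwaa/cli` and decodes the base64 `stdout`/`stderr` fields of the JSON response. The helper names are hypothetical, and the environment name `production-airflow` is taken from the Terraform above.

```python
import base64
import json
import urllib.request


def build_cli_request(hostname: str, token: str, command: str) -> urllib.request.Request:
    """Build the POST request for one Airflow CLI command (plain-text body)."""
    return urllib.request.Request(
        url=f"https://{hostname}/aws_mwaa/cli",
        data=command.encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "text/plain",
        },
        method="POST",
    )


def decode_cli_response(body: bytes) -> tuple[str, str]:
    """The endpoint replies with JSON whose stdout/stderr fields are base64-encoded."""
    payload = json.loads(body)
    stdout = base64.b64decode(payload.get("stdout", "")).decode("utf-8")
    stderr = base64.b64decode(payload.get("stderr", "")).decode("utf-8")
    return stdout, stderr


def run_airflow_cli(environment_name: str, command: str) -> tuple[str, str]:
    """Request a short-lived CLI token, then run a single Airflow CLI command."""
    import boto3  # imported lazily so the pure helpers above work without boto3

    client = boto3.client("mwaa")
    token = client.create_cli_token(Name=environment_name)
    request = build_cli_request(token["WebServerHostname"], token["CliToken"], command)
    with urllib.request.urlopen(request) as response:
        return decode_cli_response(response.read())
```

For example, `run_airflow_cli("production-airflow", "dags list")` would return the CLI's output and error text. Keeping request building and response decoding as pure functions lets them be unit-tested without AWS access.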

Cloud Composer Setup

# composer_environment.tf (Terraform)
resource "google_composer_environment" "airflow" {
  name   = "production-airflow"
  region = "us-central1"

  config {
    # node_count applies to Composer 1; Composer 2 sizes workers in workloads_config.

    software_config {
      image_version = "composer-2.6.1-airflow-2.8.1"
      pypi_packages = {
        "apache-airflow-providers-google" = ">=10.0.0"
        "apache-airflow-providers-amazon" = ">=8.0.0"
        "pandas" = ">=2.0.0"
      }
      # Composer applies Airflow settings through config overrides keyed "section-key".
      airflow_config_overrides = {
        "core-load_examples" = "False"
      }
    }

    workloads_config {
      scheduler {
        cpu        = 2
        memory_gb  = 4
        storage_gb = 10
        count      = 2
      }
      worker {
        cpu        = 2
        memory_gb  = 8
        storage_gb = 20
        min_count  = 2
        max_count  = 10
      }
      triggerer {
        cpu        = 0.5
        memory_gb  = 1
        count      = 2
      }
    }

    # database_config.machine_type and web_server_config apply to Composer 1 only.
    # In Composer 2, environment_size sizes the database, and web server resources
    # are set with a web_server block inside workloads_config.

    environment_size = "ENVIRONMENT_SIZE_MEDIUM"

    private_environment_config {
      enable_private_endpoint = true
      master_ipv4_cidr_block  = "172.16.0.0/28"
    }
  }
}
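The two services expect Airflow options in different key formats: MWAA's Terraform argument `airflow_configuration_options` uses `section.key`, Composer's `airflow_config_overrides` uses `section-key`, and self-managed Airflow reads `AIRFLOW__SECTION__KEY` environment variables. A small helper, hypothetical and not part of either provider, can keep one source of settings and render both maps:

```python
def parse_env_key(name: str) -> tuple[str, str]:
    """Split AIRFLOW__SECTION__KEY into a lower-cased (section, key) pair."""
    prefix = "AIRFLOW__"
    if not name.startswith(prefix):
        raise ValueError(f"not an Airflow config variable: {name}")
    section, sep, key = name[len(prefix):].partition("__")
    if not sep or not section or not key:
        raise ValueError(f"expected AIRFLOW__SECTION__KEY, got: {name}")
    return section.lower(), key.lower()


def to_mwaa_options(env: dict[str, str]) -> dict[str, str]:
    """Render keys for MWAA's airflow_configuration_options ("section.key")."""
    return {"{}.{}".format(*parse_env_key(k)): v for k, v in env.items()}


def to_composer_overrides(env: dict[str, str]) -> dict[str, str]:
    """Render keys for Composer's airflow_config_overrides ("section-key")."""
    return {"{}-{}".format(*parse_env_key(k)): v for k, v in env.items()}
```

This sketch does not handle the `_CMD` and `_SECRET` variable suffixes, and each service blocks some options, so the rendered maps still need checking against the provider's list of allowed settings.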

MWAA DAG with S3 Storage

# dags/mwaa_example_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator


def extract_to_s3(**context):
    """Extract data and upload it to the data-lake bucket."""
    import pandas as pd
    import io

    data = pd.DataFrame({
        'date': [context['ds']],
        'record_count': [1000],
        'status': ['completed'],
    })

    csv_buffer = io.StringIO()
    data.to_csv(csv_buffer, index=False)

    hook = S3Hook(aws_conn_id='aws_default')
    hook.load_string(
        string_data=csv_buffer.getvalue(),
        key=f'extracted/{context["ds"]}/data.csv',
        bucket_name='data-lake',
        replace=True,
    )


with DAG(
    dag_id='mwaa_managed_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',  # 'schedule' replaces the deprecated 'schedule_interval' (Airflow 2.4+)
    catchup=False,
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
    },
    tags=['mwaa', 'managed'],
) as dag:

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_to_s3,
    )

    load = S3ToRedshiftOperator(
        task_id='load_to_redshift',
        schema='public',
        table='daily_metrics',
        s3_bucket='data-lake',
        s3_key='extracted/{{ ds }}/data.csv',
        copy_options=['FORMAT AS CSV', 'IGNOREHEADER 1'],
        aws_conn_id='aws_default',
        redshift_conn_id='redshift_default',
    )

    extract >> load
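Both tasks must agree on the object key: `extract` builds it with an f-string and `load` with the Jinja template `extracted/{{ ds }}/data.csv`. One way to keep that contract testable without AWS is to move the key and the CSV body into plain functions. The sketch below uses the standard `csv` module instead of pandas, and the names `s3_key_for` and `build_daily_csv` are hypothetical.

```python
import csv
import io

# Same layout the extract and load tasks use; ds is Airflow's YYYY-MM-DD logical date.
KEY_TEMPLATE = "extracted/{ds}/data.csv"


def s3_key_for(ds: str) -> str:
    """Object key for one logical date; a rerun for the same ds overwrites it."""
    return KEY_TEMPLATE.format(ds=ds)


def build_daily_csv(ds: str, record_count: int = 1000, status: str = "completed") -> str:
    """Render the one-row CSV, with a header row, that the extract task uploads."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["date", "record_count", "status"])
    writer.writerow([ds, record_count, status])
    return buffer.getvalue()
```

Inside `extract_to_s3`, the upload would then pass `string_data=build_daily_csv(context["ds"])` and `key=s3_key_for(context["ds"])`. The header row is what `IGNOREHEADER 1` in the COPY options skips.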

Service Comparison

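Feature | MWAA | Cloud Composer | Azure Managed Airflow
Airflow version | 2.x | 2.x | 2.x
DAG storage | S3 | GCS | Blob Storage
Metadata DB | RDS PostgreSQL | Cloud SQL | Azure Database
Worker scaling | Auto-scaling between min_workers and max_workers | Auto-scaling between min_count and max_count | Manual
Monitoring | CloudWatch | Cloud Monitoring | Azure Monitor
Security | IAM, VPC | IAM, VPC | Azure AD, VNet
Version upgrade | Manual | Auto or manual | Manual
Min workers | Configurable (min_workers) | Configurable (min_count) | 2
Max workers | Configurable (max_workers) | Configurable (max_count) | Configurable
Price model | Per environment + workers | Per environment + workers | Per environment + workers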

Scaling Thresholds

Metric | Small | Medium | Large
DAG count | <50 | 50-200 | 200+
Tasks per day | <1,000 | 1,000-10,000 | 10,000+
Workers | 2-4 | 4-10 | 10-20
Schedulers | 1 | 1-2 | 2+
DB instance (self-managed RDS equivalent) | db.t3.medium | db.r5.large | db.r5.xlarge
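The thresholds can be applied mechanically: take the larger tier implied by DAG count and by daily task volume, then read off the worker and scheduler ranges. In this sketch, values below 50 DAGs or 1,000 tasks are Small, values up to and including 200 DAGs or 10,000 tasks are Medium, and anything above is Large; treating the shared endpoints 200 and 10,000 as Medium is an assumption, and `recommend_sizing` is a hypothetical name.

```python
from dataclasses import dataclass

TIERS = ("Small", "Medium", "Large")


@dataclass(frozen=True)
class Sizing:
    tier: str
    workers: tuple[int, int]
    schedulers: str


# Worker and scheduler ranges copied from the scaling thresholds table.
SIZING = {
    "Small": Sizing("Small", (2, 4), "1"),
    "Medium": Sizing("Medium", (4, 10), "1-2"),
    "Large": Sizing("Large", (10, 20), "2+"),
}


def _tier_index(value: int, small_below: int, medium_up_to: int) -> int:
    """0 = Small, 1 = Medium, 2 = Large for a single metric."""
    if value < small_below:
        return 0
    if value <= medium_up_to:
        return 1
    return 2


def recommend_sizing(dag_count: int, tasks_per_day: int) -> Sizing:
    """Return the larger of the tiers implied by DAG count and daily task volume."""
    by_dags = _tier_index(dag_count, small_below=50, medium_up_to=200)
    by_tasks = _tier_index(tasks_per_day, small_below=1_000, medium_up_to=10_000)
    return SIZING[TIERS[max(by_dags, by_tasks)]]
```

Taking the maximum errs toward headroom; the starting point should still be adjusted to observed task queue depth and execution time.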

Best Practices

  1. Use managed secrets: Leverage AWS Secrets Manager, GCP Secret Manager, or Azure Key Vault instead of environment variables for sensitive data.
  2. Use private networking: Run MWAA/Composer in private subnets and reach databases and storage through VPC peering or private endpoints rather than over the public internet.
  3. Monitor worker utilization: Scale workers based on task queue depth and execution time, not just DAG count.
  4. Externalize configuration: Use Airflow Variables for environment-specific settings instead of hardcoding them in DAG files.
  5. Implement CI/CD: Use S3 sync, GCS sync, or Git-based deployment for DAG updates.
  6. Enable logging: Turn on every log type (DAG processing, scheduler, task, web server, worker) for debugging.
  7. Test upgrades: Use staging environments to test Airflow version upgrades before production.
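  8. Optimize parse time: Minimize module-level imports and work in DAG files; if you generate DAGs dynamically, keep the generation step cheap, because the scheduler re-parses DAG files periodically.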
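For the CI/CD practice (item 5), a deployment step usually uploads only changed DAG files and removes files deleted from the repository. A minimal sketch, assuming DAGs live in a local directory, are stored under the `dags/` prefix in S3, and were uploaded as single-part objects without SSE-KMS (for which the S3 ETag is the hex MD5 of the content). The function names are hypothetical; the sketch only computes the plan, and applying it with boto3 or the equivalent GCS calls is left to the pipeline.

```python
import hashlib
from pathlib import Path


def local_md5s(root: Path) -> dict[str, str]:
    """Map each .py file under root (POSIX-style relative path) to its hex MD5."""
    return {
        path.relative_to(root).as_posix(): hashlib.md5(path.read_bytes()).hexdigest()
        for path in sorted(root.rglob("*.py"))
    }


def plan_dag_sync(
    local: dict[str, str], remote_etags: dict[str, str], prefix: str = "dags/"
) -> tuple[list[str], list[str]]:
    """Return (relative paths to upload, S3 keys to delete) for a DAG folder sync.

    remote_etags maps S3 keys such as "dags/etl.py" to ETags as S3 reports them,
    wrapped in double quotes. Only .py keys under the prefix are considered, so
    other files in the bucket (SQL templates, requirements) are left alone.
    """
    remote = {
        key[len(prefix):]: etag.strip('"')
        for key, etag in remote_etags.items()
        if key.startswith(prefix) and key.endswith(".py")
    }
    uploads = sorted(rel for rel, md5 in local.items() if remote.get(rel) != md5)
    deletes = sorted(prefix + rel for rel in remote if rel not in local)
    return uploads, deletes
```

If an object's ETag is not a plain MD5 (multipart or KMS-encrypted uploads), the comparison fails and the file is simply re-uploaded on each run, which is wasteful but safe.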

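MWAA bills per environment hour (set by the environment class) plus additional worker hours; Cloud Composer 2 bills an environment fee plus the CPU, memory, and storage its workloads consume. For cost optimization, keep the worker floor low (min_workers on MWAA, which cannot go below 1; min_count on Composer) and let both services add workers automatically as the task queue grows. Monitor spend with provider-specific billing dashboards.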
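The effect of the worker floor on cost is simple arithmetic: environment hours at the class rate, plus worker hours above whatever the base rate already covers, plus any extra worker hours added by autoscaling. The sketch below takes every rate as an input and encodes no real price list; `included_workers` is an assumption to check against each provider's pricing page, and `monthly_cost` is a hypothetical name.

```python
def monthly_cost(
    environment_rate: float,
    worker_rate: float,
    baseline_workers: float,
    burst_worker_hours: float = 0.0,
    included_workers: int = 0,
    hours: float = 730.0,
) -> float:
    """Estimate one billing period for a managed Airflow environment.

    environment_rate: price per environment hour (depends on class or size).
    worker_rate: price per additional worker hour.
    baseline_workers: workers kept running all period (the min_workers/min_count floor).
    burst_worker_hours: extra worker hours added by autoscaling above the floor.
    included_workers: workers assumed to be covered by the environment rate.
    hours: length of the period; 730 is an average month (8,760 / 12).
    """
    billable_baseline = max(baseline_workers - included_workers, 0) * hours
    return environment_rate * hours + worker_rate * (billable_baseline + burst_worker_hours)
```

For example, with hypothetical rates of 0.50 per environment hour and 0.25 per worker hour over a 100-hour window, and one worker included, lowering the floor from three workers to one cuts the estimate from 100.0 to 50.0.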

Managed Airflow services handle infrastructure management but not DAG logic. You are still responsible for writing idempotent, well-tested DAGs. Use the same best practices as self-managed Airflow for DAG development and testing.
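Idempotency here means that a rerun for the same logical date leaves the target table in the same state. A plain COPY appends rows, so rerunning a load like `load_to_redshift` for one `ds` would duplicate that day's data unless the load first clears the date's partition. A minimal sketch of the delete-then-insert pattern, using the standard library's `sqlite3` as a stand-in for the warehouse; the table mirrors `daily_metrics` from the example DAG, and the function names are hypothetical.

```python
import sqlite3


def create_table(conn: sqlite3.Connection) -> None:
    """Create a stand-in for the daily_metrics table from the example DAG."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS daily_metrics "
        "(date TEXT, record_count INTEGER, status TEXT)"
    )


def load_partition(
    conn: sqlite3.Connection, ds: str, rows: list[tuple[str, int, str]]
) -> None:
    """Replace all rows for one logical date inside a single transaction."""
    if any(row[0] != ds for row in rows):
        raise ValueError(f"all rows must belong to partition {ds}")
    with conn:  # commits on success, rolls back if either statement fails
        conn.execute("DELETE FROM daily_metrics WHERE date = ?", (ds,))
        conn.executemany(
            "INSERT INTO daily_metrics (date, record_count, status) VALUES (?, ?, ?)",
            rows,
        )
```

On Redshift the same idea is a DELETE for the date followed by the COPY, run in one transaction, so a retried task never leaves the partition half-loaded or doubled.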

Key Takeaways:

  • MWAA (AWS), Cloud Composer (GCP), and Azure Managed Airflow provide fully managed Airflow
  • DAG storage uses cloud object storage (S3, GCS, Blob) while metadata DB uses managed relational DBs
  • Scaling is controlled through worker limits plus environment class (MWAA) or environment size and workload settings (Cloud Composer)
  • Use managed secret backends and VPC configurations for security
  • Monitor with provider-native tools (CloudWatch, Cloud Monitoring, Azure Monitor)
  • CI/CD for DAG deployment uses cloud-native storage sync or Git integration
