
Cloud Composer - Managed Airflow for Orchestration


[Figure: Cloud Composer Architecture. Components: Airflow (Web Server + Scheduler), Cloud SQL (Metadata Database), GCS Bucket (DAGs + Config), Workers (Task Execution), Monitoring (Stackdriver + Logs). Environment Config: Version Control, Package Management (Dependencies), Connection Management (GCP Services), Security (IAM + Encryption).]

Cloud Composer Architecture

Cloud Composer is a fully managed workflow orchestration service built on Apache Airflow. It enables you to author, schedule, and monitor complex data pipelines.

Core Components

Airflow Web Server:

  • Provides web UI for monitoring and managing DAGs
  • Shows task status, logs, and history
  • Enables manual task triggering

Scheduler:

  • Triggers scheduled workflows
  • Monitors task instances
  • Manages task dependencies

Cloud SQL:

  • Stores Airflow metadata
  • Tracks DAG runs, task instances, and logs
  • Manages connections and variables

GCS Bucket:

  • Stores DAG files
  • Stores plugins and configuration
  • Manages logs and temporary files

Workers:

  • Execute Airflow tasks
  • Handle long-running operations
  • Support various operator types
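
To make "manages task dependencies" concrete, here is a minimal sketch in plain Python (not Airflow code) of how an upstream/downstream graph determines a valid execution order. The task names are borrowed from the DAG example later in this lesson, and `execution_order` is a hypothetical helper; the real scheduler also takes schedules, pools, and task state into account.

```python
from graphlib import TopologicalSorter

def execution_order(upstream):
    """Return one valid run order for a mapping of task -> set of upstream tasks."""
    return list(TopologicalSorter(upstream).static_order())

# Hypothetical three-task pipeline: extract -> load -> transform
deps = {
    "extract_data": set(),
    "load_to_bigquery": {"extract_data"},
    "transform_data": {"load_to_bigquery"},
}
order = execution_order(deps)
print(order)
```

A task only becomes runnable once everything upstream of it has finished, which is why the chain above can run in only one order.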

Environment Setup

Creating an Environment

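# Create a Cloud Composer environment
# Note: --machine-type, --node-count, --disk-size, and --python-version are
# Composer 1 flags; --python-version accepts only a major version (2 or 3).
gcloud composer environments create my-environment \
    --location=us-central1 \
    --airflow-version=2.6.3 \
    --python-version=3 \
    --machine-type=e2-medium \
    --node-count=3 \
    --disk-size=30GB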

# Create a larger environment with custom sizing
# Note: Composer 2 sizes an environment with --environment-size (small, medium,
# or large) and worker autoscaling flags instead of --machine-type, --node-count,
# and --disk-size, so the two flag families should not be mixed.
gcloud composer environments create my-production-env \
    --location=us-central1 \
    --airflow-version=2.6.3 \
    --environment-size=medium \
    --scheduler-count=2 \
    --min-workers=2 \
    --max-workers=5

Environment Configuration

# Configure environment via API
# Requires: pip install google-cloud-orchestration-airflow
from google.cloud.orchestration.airflow import service_v1

client = service_v1.EnvironmentsClient()

parent = 'projects/my-project/locations/us-central1'

environment = {
    # The API expects the full resource name, not just the short name
    'name': f'{parent}/environments/my-environment',
    'config': {
        'node_config': {
            'machine_type': 'e2-standard-4',
            'disk_size_gb': 100,
            'network': 'projects/my-project/global/networks/default',
            'subnetwork': 'projects/my-project/regions/us-central1/subnetworks/default',
            'service_account': 'my-composer-sa@my-project.iam.gserviceaccount.com'
        },
        'software_config': {
            # Overrides are a flat map of "section-option" keys to string values
            'airflow_config_overrides': {
                'core-parallelism': '32',
                'core-max_active_runs_per_dag': '16',
                'core-dag_file_processor_timeout': '300'
            },
            # Values are pip version specifiers
            'pypi_packages': {
                'apache-airflow-providers-google': '==10.0.0',
                'pandas': '==2.0.0'
            },
            'env_variables': {
                'PROJECT_ID': 'my-project',
                'REGION': 'us-central1'
            }
        }
    }
}

# create_environment returns a long-running operation
operation = client.create_environment(
    request={'parent': parent, 'environment': environment}
)
result = operation.result()  # Blocks until the environment has been created
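
The Composer API describes `airflow_config_overrides` as a flat map whose keys join the Airflow config section and option with a hyphen, and whose values are strings. If you prefer to keep overrides grouped by section in your own code, a small conversion step can bridge the two shapes. The sketch below is plain Python; `flatten_overrides` is a hypothetical helper, not part of any Google library.

```python
def flatten_overrides(nested):
    """Turn {'core': {'parallelism': 32}} into {'core-parallelism': '32'}."""
    flat = {}
    for section, options in nested.items():
        for key, value in options.items():
            # Keys are "<section>-<option>"; values are sent as strings
            flat[f"{section}-{key}"] = str(value)
    return flat

overrides = flatten_overrides({
    "core": {"parallelism": 32, "max_active_runs_per_dag": 16},
})
print(overrides)
```

The resulting dictionary can be passed as the `airflow_config_overrides` value inside `software_config`.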

DAG Management

Creating DAGs

# Basic DAG structure
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['alerts@company.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2024, 1, 1),
}

dag = DAG(
    'daily_data_pipeline',
    default_args=default_args,
    description='Daily data processing pipeline',
    schedule_interval='@daily',
    catchup=False,
    tags=['data-engineering', 'production'],
)

# Define tasks
def extract_data(**context):
    """Extract data from source."""
    # Your extraction logic
    pass

def transform_data(**context):
    """Transform data."""
    # Your transformation logic
    pass

# Task definitions
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

load_task = GCSToBigQueryOperator(
    task_id='load_to_bigquery',
    bucket='my-bucket',
    source_objects=['data/{{ ds }}/data.csv'],
    destination_project_dataset_table='my-project:analytics.raw_data',
    schema_fields=[
        {'name': 'id', 'type': 'STRING'},
        {'name': 'value', 'type': 'FLOAT64'},
    ],
    write_disposition='WRITE_APPEND',
    dag=dag,
)

transform_task = BigQueryInsertJobOperator(
    task_id='transform_data',
    configuration={
        'query': {
            'query': '''
                CREATE OR REPLACE TABLE analytics.processed_data AS
                SELECT * FROM analytics.raw_data
                WHERE value > 0
            ''',
            'useLegacySql': False,
        }
    },
    dag=dag,
)

# Set task dependencies
extract_task >> load_task >> transform_task
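
The `source_objects` path above contains `{{ ds }}`, a template that Airflow fills in with the run's logical date formatted as YYYY-MM-DD. Airflow renders templates with Jinja; the sketch below only imitates that one substitution in plain Python so you can see what the final object path looks like. `render_ds` is a hypothetical helper for illustration.

```python
from datetime import date

def render_ds(template, logical_date):
    """Replace the '{{ ds }}' placeholder with the date as YYYY-MM-DD."""
    return template.replace("{{ ds }}", logical_date.isoformat())

path = render_ds("data/{{ ds }}/data.csv", date(2024, 1, 15))
print(path)  # data/2024-01-15/data.csv
```

Because the date is part of the path, each daily run loads only its own folder, which keeps reruns and backfills from picking up other days' files.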

DAG Configuration Options

# Advanced DAG configuration
dag = DAG(
    'advanced_pipeline',
    default_args=default_args,
    description='Advanced pipeline with complex scheduling',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    end_date=datetime(2024, 12, 31),
    catchup=True,
    max_active_runs=1,
    max_active_tasks=16,  # replaces the deprecated 'concurrency' argument
    tags=['production', 'critical'],
    doc_md="""
    ## Daily Data Pipeline
    
    This pipeline processes daily data from multiple sources.
    
    ### Schedule
    - Runs daily at 2 AM UTC
    - Backfills historical data
    
    ### Dependencies
    - Source system availability
    - GCS bucket permissions
    - BigQuery project access
    """,
)
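
With `catchup=True`, Airflow creates runs for past schedule slots between `start_date` and `end_date`, so it helps to estimate how many runs a backfill will produce before you deploy. The sketch below lists the daily 02:00 timestamps in a date range using plain Python. It is only an approximation: `daily_slots` is a hypothetical helper, and Airflow's own run creation also depends on its data-interval rules.

```python
from datetime import datetime, timedelta

def daily_slots(start, end, hour=2):
    """List every day's `hour`:00 timestamp that falls within [start, end]."""
    slot = start.replace(hour=hour, minute=0, second=0, microsecond=0)
    if slot < start:
        slot += timedelta(days=1)
    slots = []
    while slot <= end:
        slots.append(slot)
        slot += timedelta(days=1)
    return slots

# One week window: midnight Jan 1 to midnight Jan 7
slots = daily_slots(datetime(2024, 1, 1), datetime(2024, 1, 7))
print(len(slots), slots[0], slots[-1])
```

Combined with `max_active_runs=1`, a long date range means the backfilled runs execute one after another, so a year of history can take a while to work through.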

Connections

Managing Connections

# Create connection via Airflow CLI
# airflow connections add 'google_cloud_default' \
#     --conn-type 'google_cloud_platform' \
#     --conn-extra '{"key_path": "/path/to/key.json"}'

# Use connection in DAG
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

def query_data():
    """Query BigQuery using connection."""
    # The hook defaults to legacy SQL, so request standard SQL explicitly
    hook = BigQueryHook(gcp_conn_id='google_cloud_default', use_legacy_sql=False)
    conn = hook.get_conn()
    cursor = conn.cursor()

    cursor.execute("SELECT * FROM analytics.sales LIMIT 10")
    results = cursor.fetchall()

    return results

Connection Types

# Common connection types for data engineering
connection_types = {
    'google_cloud_platform': 'GCP services (BigQuery, GCS, Dataflow)',
    'postgres': 'PostgreSQL databases',
    'mysql': 'MySQL databases',
    'http': 'HTTP endpoints',
    'aws': 'AWS services such as S3',
}
# GCS, BigQuery, and Dataflow hooks normally share one
# 'google_cloud_platform' connection rather than separate types.

# Create connections programmatically
from airflow.models import Connection
from airflow import settings

def create_gcp_connection():
    """Create GCP connection if it does not already exist."""
    session = settings.Session()
    try:
        existing = session.query(Connection).filter(
            Connection.conn_id == 'google_cloud_default'
        ).first()
        if existing is None:
            conn = Connection(
                conn_id='google_cloud_default',
                conn_type='google_cloud_platform',
                extra='{"key_path": "/path/to/key.json"}'
            )
            session.add(conn)
            session.commit()
    finally:
        session.close()
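
The `extra` field of a connection is stored as a JSON string, and hand-writing that string is an easy place to introduce quoting mistakes. A safer habit is to build it from a dictionary. The sketch below uses only the standard library; `build_extra` is a hypothetical helper, and the `key_path` field name is an assumption based on recent Google provider versions, so check it against the provider version you have installed.

```python
import json

def build_extra(**fields):
    """Serialize connection extras to a JSON string."""
    return json.dumps(fields, sort_keys=True)

# 'key_path' is assumed here; older provider releases used a longer prefixed name
extra = build_extra(key_path="/path/to/key.json")
print(extra)  # {"key_path": "/path/to/key.json"}
```

The returned string can be passed as the `extra` argument when constructing a `Connection`.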

Variables

Using Variables

# Set variable via Airflow CLI
# airflow variables set 'my_variable' 'my_value'

# Use variable in DAG
from airflow.models import Variable

def use_variable():
    """Use variable in task."""
    config_value = Variable.get('config_value', default_var='default')
    
    # Use in task logic
    print(f"Config value: {config_value}")

# Variable with JSON
config = Variable.get('pipeline_config', deserialize_json=True)
# Returns: {"key1": "value1", "key2": "value2"}
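
Variables are stored as text, so `deserialize_json=True` amounts to parsing that text as JSON before handing it back to you. The plain-Python sketch below shows the equivalent step with the standard library; `stored_value` is a stand-in for what the variable holds.

```python
import json

# What the 'pipeline_config' variable would hold as raw text
stored_value = '{"key1": "value1", "key2": "value2"}'

config = json.loads(stored_value)
print(config["key1"])  # value1
```

If the stored text is not valid JSON, parsing raises an error, so it is worth validating a JSON variable before saving it.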

Variable Best Practices

# Best practices for variables
best_practices = {
    'naming': 'Use descriptive, hierarchical names (e.g., prod_analytics_db_host)',
    'security': 'Use Airflow Connections for sensitive data, not Variables',
    'versioning': 'Include version in variable names for schema changes',
    'documentation': 'Document variable purpose and expected values',
    'default_values': 'Always provide default values for non-critical variables',
}

# Example variable structure
variables = {
    'prod_analytics_bigquery_project': 'my-project',
    'prod_analytics_gcs_bucket': 'my-bucket',
    'prod_analytics_dataflow_region': 'us-central1',
    'prod_analytics_composer_environment': 'my-environment',
}
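
One way to keep hierarchical names consistent is to generate them instead of typing them. The sketch below is a hypothetical helper, `variable_name`, that joins an environment, a domain, and any further parts in the same style as the examples above.

```python
def variable_name(env, domain, *parts):
    """Join lowercase name parts with underscores, e.g. prod_analytics_gcs_bucket."""
    pieces = [env, domain, *parts]
    return "_".join(p.strip().lower() for p in pieces)

name = variable_name("prod", "analytics", "gcs", "bucket")
print(name)  # prod_analytics_gcs_bucket
```

Swapping the first argument for another environment name, such as a staging prefix, gives you the matching variable name without retyping the rest.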

Operators and Hooks

Common Operators

# Google Cloud operators
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
    BigQueryCreateEmptyDatasetOperator,
    BigQueryGetDataOperator,
)
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowStartFlexTemplateOperator,
    DataflowTemplatedJobStartOperator,
)
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocSubmitJobOperator,
    DataprocDeleteClusterOperator,
)

# Example: BigQuery operations
create_dataset = BigQueryCreateEmptyDatasetOperator(
    task_id='create_dataset',
    dataset_id='analytics',
    location='US',
    gcp_conn_id='google_cloud_default',
)

run_query = BigQueryInsertJobOperator(
    task_id='run_query',
    configuration={
        'query': {
            'query': 'SELECT * FROM analytics.sales',
            'useLegacySql': False,
        }
    },
    gcp_conn_id='google_cloud_default',
)

Custom Operators

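# Create custom operator
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

class CustomBigQueryOperator(BaseOperator):
    """Run a query and store the result in a destination table."""

    # Allow Jinja templating (e.g. {{ ds }}) in these arguments
    template_fields = ('sql', 'destination_table')

    def __init__(self, sql, destination_table,
                 gcp_conn_id='google_cloud_default', **kwargs):
        # The @apply_defaults decorator is not needed in Airflow 2
        super().__init__(**kwargs)
        self.sql = sql
        self.destination_table = destination_table
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        """Execute the query as a BigQuery job."""
        hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id, use_legacy_sql=False)

        # destination_table is expected as 'project.dataset.table'
        project_id, dataset_id, table_id = self.destination_table.split('.')

        # Let BigQuery write the result itself instead of pulling
        # every row into the worker and returning it through XCom
        job = hook.insert_job(
            configuration={
                'query': {
                    'query': self.sql,
                    'useLegacySql': False,
                    'destinationTable': {
                        'projectId': project_id,
                        'datasetId': dataset_id,
                        'tableId': table_id,
                    },
                    'writeDisposition': 'WRITE_TRUNCATE',
                }
            },
            project_id=project_id,
        )

        return job.job_id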

Monitoring and Alerting

Monitoring Dashboard

# Monitor Cloud Composer metrics
from google.cloud import monitoring_v3
import time

def monitor_composer_environment(project_id, environment_name):
    """Monitor Cloud Composer environment."""
    client = monitoring_v3.MetricServiceClient()
    project_name = f"projects/{project_id}"

    # Look at the last hour; TimeInterval needs timestamps, not floats
    now = int(time.time())
    interval = monitoring_v3.TimeInterval(
        {
            'end_time': {'seconds': now},
            'start_time': {'seconds': now - 3600},
        }
    )

    # Monitor DAG count (Composer reports it as the dagbag_size metric;
    # confirm the metric name in Metrics Explorer for your environment)
    results = client.list_time_series(
        request={
            'name': project_name,
            'filter': (
                f'resource.labels.environment_name = "{environment_name}" '
                'AND metric.type = "composer.googleapis.com/environment/dagbag_size"'
            ),
            'interval': interval,
            'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL
        }
    )

    for result in results:
        # A series can come back without points, so guard the index
        if result.points:
            print(f'DAG count: {result.points[0].value.int64_value}')
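
Two pieces of the request above are easy to get wrong: the filter string and the time interval. The sketch below builds both with plain Python so they can be checked without calling the API. `composer_filter` and `last_hour` are hypothetical helpers, and the `environment/healthy` metric name in the example call is an assumption you should confirm in Metrics Explorer.

```python
import time

def composer_filter(environment_name, metric_type):
    """Build a Cloud Monitoring filter for one environment and metric."""
    return (
        f'resource.labels.environment_name = "{environment_name}" '
        f'AND metric.type = "{metric_type}"'
    )

def last_hour(now=None):
    """Return start/end epoch seconds covering the previous hour."""
    end = int(time.time() if now is None else now)
    return {"start_time": {"seconds": end - 3600}, "end_time": {"seconds": end}}

# Assumed metric name, used only to show the resulting filter text
print(composer_filter("my-environment", "composer.googleapis.com/environment/healthy"))
print(last_hour(now=1_700_000_000))
```

The dictionary returned by `last_hour` has the same shape as a `TimeInterval`, with whole seconds for both ends of the window.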

Alerting Policies

# Create alert for failed tasks
from google.cloud import monitoring_v3

client = monitoring_v3.AlertPolicyServiceClient()
project_name = "projects/my-project"

alert_policy = monitoring_v3.AlertPolicy(
    display_name="Composer Failed Tasks Alert",
    # A combiner is required even when there is a single condition
    combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR,
    conditions=[
        monitoring_v3.AlertPolicy.Condition(
            display_name="High failed task rate",
            condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold(
                # Task runs are reported per workflow with a state label;
                # verify the metric and label values in Metrics Explorer
                filter=(
                    'resource.type="cloud_composer_workflow" AND '
                    'metric.type="composer.googleapis.com/workflow/task/run_count" AND '
                    'metric.labels.state="failed"'
                ),
                comparison=monitoring_v3.ComparisonType.COMPARISON_GT,
                threshold_value=5,
                duration={"seconds": 300},
                aggregations=[
                    monitoring_v3.Aggregation(
                        alignment_period={"seconds": 60},
                        # ALIGN_RATE converts counts to a per-second rate
                        per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_RATE
                    )
                ]
            )
        )
    ],
    notification_channels=[],
    alert_strategy=monitoring_v3.AlertPolicy.AlertStrategy(
        auto_close={"seconds": 1800}
    )
)

response = client.create_alert_policy(
    request={"name": project_name, "alert_policy": alert_policy}
)
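
A threshold condition does not fire on a single bad sample. The value has to stay above `threshold_value` for the whole `duration`, which with a 60-second alignment period and a 300-second duration works out to roughly five consecutive aligned points. The plain-Python sketch below imitates that rule on a made-up series; `breaches` is a hypothetical helper and the sample values are invented for illustration.

```python
def breaches(values, threshold, window):
    """True if `window` consecutive samples all exceed `threshold`."""
    streak = 0
    for v in values:
        streak = streak + 1 if v > threshold else 0
        if streak >= window:
            return True
    return False

# Invented aligned samples, one per alignment period
samples = [2, 6, 7, 8, 9, 6, 1]
print(breaches(samples, threshold=5, window=5))  # True
print(breaches(samples, threshold=5, window=6))  # False
```

A longer duration makes the alert less noisy but slower to fire, so pick it based on how quickly someone needs to react.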

Best Practices

  1. Use version control - Store DAGs in Git repositories
  2. Implement testing - Test DAGs before deployment
  3. Monitor performance - Track DAG execution times and resource usage
  4. Use proper error handling - Implement retries and alerting
  5. Optimize scheduling - Avoid scheduling conflicts and resource contention
  6. Document pipelines - Provide clear documentation for each DAG
  7. Implement security - Use IAM and encrypted connections
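
As a starting point for item 2, even a very small check catches DAG files that cannot be parsed before they reach the environment's bucket. The sketch below uses only the standard library and checks syntax, not Airflow-specific problems such as import errors or cycles. `find_syntax_errors` is a hypothetical helper, and the temporary directory stands in for a real dags/ folder.

```python
import ast
import tempfile
from pathlib import Path

def find_syntax_errors(dag_dir):
    """Return {filename: error message} for .py files that fail to parse."""
    errors = {}
    for path in sorted(Path(dag_dir).glob("*.py")):
        try:
            ast.parse(path.read_text(), filename=str(path))
        except SyntaxError as exc:
            errors[path.name] = exc.msg
    return errors

# Demo with a throwaway directory standing in for a dags/ folder
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "good_dag.py").write_text("x = 1\n")
    Path(tmp, "bad_dag.py").write_text("def broken(:\n")
    result = find_syntax_errors(tmp)

print(sorted(result))
```

Running a check like this in CI, before DAG files are copied to the bucket, means a typo fails the build instead of showing up as a broken DAG in the Airflow UI.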