Cloud Composer - Managed Airflow for Orchestration
Cloud Composer Architecture
Cloud Composer is a fully managed workflow orchestration service built on Apache Airflow. It enables you to author, schedule, and monitor complex data pipelines.
Core Components
Airflow Web Server:
- Provides web UI for monitoring and managing DAGs
- Shows task status, logs, and history
- Enables manually triggering DAG runs and clearing (re-running) tasks
Scheduler:
- Triggers scheduled workflows
- Monitors task instances
- Manages task dependencies
Cloud SQL:
- Stores Airflow metadata
- Tracks DAG runs, task instances, and audit-log events (task log files themselves are written to the GCS bucket)
- Manages connections and variables
GCS Bucket:
- Stores DAG files
- Stores plugins (plugins/ folder) and shared data files (data/ folder)
- Stores task logs (logs/ folder)
Workers:
- Execute Airflow tasks
- Handle long-running operations
- Support various operator types
Environment Setup
Creating an Environment
# Create a Cloud Composer 2 environment.
# Composer 2 sizes its own cluster, so the Composer 1-only flags
# (--machine-type, --node-count, --disk-size, --python-version) are not used;
# the image version fixes both the Airflow and the Python version.
# Pin an image listed as supported in the Cloud Composer version list.
gcloud composer environments create my-environment \
    --location=us-central1 \
    --image-version=composer-2.6.0-airflow-2.6.3 \
    --environment-size=small
# Create environment with custom configuration.
# --environment-size takes small|medium|large; per-component resources are set
# with the Composer 2 workload flags (there are no --web-server-machines or
# --scheduler-machines flags). The values below are examples; adjust them to
# your workload.
gcloud composer environments create my-production-env \
    --location=us-central1 \
    --image-version=composer-2.6.0-airflow-2.6.3 \
    --environment-size=medium \
    --scheduler-count=2 \
    --scheduler-cpu=2 \
    --scheduler-memory=7.5GB \
    --scheduler-storage=5GB \
    --web-server-cpu=2 \
    --web-server-memory=7.5GB \
    --web-server-storage=5GB \
    --worker-cpu=4 \
    --worker-memory=15GB \
    --worker-storage=10GB \
    --min-workers=2 \
    --max-workers=5
Environment Configuration
# Configure environment via API (Cloud Composer 2)
from google.cloud import composer_v1

parent = 'projects/my-project/locations/us-central1'

client = composer_v1.EnvironmentsClient()
environment = {
    # The API expects the full resource name, not just the environment ID.
    'name': f'{parent}/environments/my-environment',
    'config': {
        # Composer 2 manages its own cluster sizing; the Composer 1 node_config
        # fields machine_type and disk_size_gb are not accepted there.
        'environment_size': composer_v1.EnvironmentConfig.EnvironmentSize.ENVIRONMENT_SIZE_MEDIUM,
        'node_config': {
            'network': 'projects/my-project/global/networks/default',
            'subnetwork': 'projects/my-project/regions/us-central1/subnetworks/default',
            'service_account': 'my-composer-sa@my-project.iam.gserviceaccount.com',
        },
        'software_config': {
            # Pin an image from the supported version list; it fixes both the
            # Airflow and the Python version of the environment.
            'image_version': 'composer-2.6.0-airflow-2.6.3',
            # A flat string -> string map keyed "<section>-<option>",
            # not a nested dict of sections.
            'airflow_config_overrides': {
                'core-parallelism': '32',
                'core-max_active_runs_per_dag': '16',
                'core-dag_file_processor_timeout': '300',
            },
            # Values are pip version specifiers, so include the operator.
            'pypi_packages': {
                'apache-airflow-providers-google': '==10.0.0',
                'pandas': '==2.0.0',
            },
            'env_variables': {
                'PROJECT_ID': 'my-project',
                'REGION': 'us-central1',
            },
        },
    },
}
operation = client.create_environment(
    request={'parent': parent, 'environment': environment}
)
# create_environment returns a long-running operation; block until the
# environment exists (or the operation raises). Creation can take many minutes.
created_environment = operation.result()
DAG Management
Creating DAGs
# Basic DAG structure
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from datetime import datetime, timedelta

# Arguments every task inherits unless the task overrides them.
default_args = dict(
    owner='data-engineering',
    depends_on_past=False,
    email_on_failure=True,
    email=['alerts@company.com'],
    retries=3,
    retry_delay=timedelta(minutes=5),
    start_date=datetime(2024, 1, 1),
)

# Rebuilds analytics.processed_data from the rows of analytics.raw_data with value > 0.
_TRANSFORM_SQL = '''
CREATE OR REPLACE TABLE analytics.processed_data AS
SELECT * FROM analytics.raw_data
WHERE value > 0
'''


def extract_data(**context):
    """Extract data from source (placeholder; currently does nothing)."""
    # Your extraction logic


def transform_data(**context):
    """Transform data (placeholder; not wired to a task in this DAG)."""
    # Your transformation logic


# Operators created inside the context manager attach to `dag` automatically.
with DAG(
    'daily_data_pipeline',
    default_args=default_args,
    description='Daily data processing pipeline',
    schedule_interval='@daily',
    catchup=False,
    tags=['data-engineering', 'production'],
) as dag:
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data,
    )

    # NOTE(review): WRITE_APPEND combined with retries or manual re-runs of the
    # same {{ ds }} can append the file twice; consider a date-partitioned
    # destination with WRITE_TRUNCATE if loads must be idempotent.
    load_task = GCSToBigQueryOperator(
        task_id='load_to_bigquery',
        bucket='my-bucket',
        source_objects=['data/{{ ds }}/data.csv'],
        destination_project_dataset_table='my-project:analytics.raw_data',
        schema_fields=[
            {'name': 'id', 'type': 'STRING'},
            {'name': 'value', 'type': 'FLOAT64'},
        ],
        write_disposition='WRITE_APPEND',
    )

    transform_task = BigQueryInsertJobOperator(
        task_id='transform_data',
        configuration={
            'query': {
                'query': _TRANSFORM_SQL,
                'useLegacySql': False,
            }
        },
    )

    # extract -> load -> transform
    extract_task >> load_task >> transform_task
DAG Configuration Options
# Advanced DAG configuration
dag = DAG(
    'advanced_pipeline',
    default_args=default_args,
    description='Advanced pipeline with complex scheduling',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    end_date=datetime(2024, 12, 31),
    catchup=True,  # schedule every missed interval between start_date and now
    max_active_runs=1,  # backfilled runs execute one at a time
    # `concurrency` is deprecated since Airflow 2.2; max_active_tasks is the
    # same per-DAG limit on concurrently running task instances.
    max_active_tasks=16,
    tags=['production', 'critical'],
    doc_md="""
## Daily Data Pipeline
This pipeline processes daily data from multiple sources.
### Schedule
- Runs daily at 2 AM UTC
- Backfills historical data
### Dependencies
- Source system availability
- GCS bucket permissions
- BigQuery project access
""",
)
Connections
Managing Connections
# Create connection via Airflow CLI. The Google provider reads the key file
# location from the ``key_path`` extra.
# airflow connections add 'google_cloud_default' \
# --conn-type 'google_cloud_platform' \
# --conn-extra '{"key_path": "/path/to/key.json"}'
# Use connection in DAG
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


def query_data():
    """Query BigQuery through the ``google_cloud_default`` connection.

    Returns:
        The rows fetched by the DB-API cursor for the sample query (at most 10).
    """
    # BigQueryHook defaults to legacy SQL; the rest of this guide uses Standard SQL.
    hook = BigQueryHook(gcp_conn_id='google_cloud_default', use_legacy_sql=False)
    cursor = hook.get_conn().cursor()
    try:
        cursor.execute("SELECT * FROM analytics.sales LIMIT 10")
        return cursor.fetchall()
    finally:
        cursor.close()
Connection Types
# Common Airflow connection types (the ``conn_type`` value) for data engineering.
# GCS, Dataflow and Dataproc have no connection type of their own: their
# hooks/operators authenticate through a 'google_cloud_platform' connection
# referenced by ``gcp_conn_id``.
connection_types = {
    'google_cloud_platform': 'GCP services (BigQuery, GCS, Dataflow)',
    'gcpbigquery': 'BigQuery through the SQL (DB-API) hook interface',
    'postgres': 'PostgreSQL databases',
    'mysql': 'MySQL databases',
    'http': 'HTTP endpoints',
    'aws': 'AWS services, including S3 storage',
}
# Create connections programmatically
from airflow.models import Connection
from airflow import settings


def create_gcp_connection():
    """Create the ``google_cloud_default`` connection unless it already exists.

    The connection points at a service-account key file through the
    ``key_path`` extra. If a connection with the same ``conn_id`` is already
    stored, nothing is written, so the function is safe to run repeatedly.

    Returns:
        bool: True if a new connection row was committed, False if one existed.

    Raises:
        Any database error from the session, after rolling the session back.
    """
    import json

    conn = Connection(
        conn_id='google_cloud_default',
        conn_type='google_cloud_platform',
        # The Google provider reads the key file location from ``key_path``.
        extra=json.dumps({'key_path': '/path/to/key.json'}),
    )
    session = settings.Session()
    try:
        existing = (
            session.query(Connection)
            .filter(Connection.conn_id == conn.conn_id)
            .first()
        )
        if existing is not None:
            return False
        session.add(conn)
        session.commit()
        return True
    except Exception:
        session.rollback()
        raise
    finally:
        # Always release the session, whether we committed, skipped, or failed.
        session.close()
Variables
Using Variables
# Set variable via Airflow CLI
# airflow variables set 'my_variable' 'my_value'
# Use variable in DAG
from airflow.models import Variable


def use_variable():
    """Read the ``config_value`` Variable inside a task and print it.

    Falls back to ``'default'`` when the Variable is not set.
    """
    # Reading inside the callable (not at import time) means the metadata DB
    # is only queried when the task runs, not on every DAG-file parse.
    config_value = Variable.get('config_value', default_var='default')
    # Use in task logic
    print(f"Config value: {config_value}")


# Variable with JSON.
# NOTE: a module-level Variable.get runs on every DAG-file parse, and without
# default_var it raises KeyError when the Variable is missing, which makes the
# whole DAG file fail to import. Inside operators prefer the Jinja template
# "{{ var.json.pipeline_config }}" so the lookup happens at run time.
config = Variable.get('pipeline_config', default_var={}, deserialize_json=True)
# Returns: {"key1": "value1", "key2": "value2"}
Variable Best Practices
# Variable best practices, one short guideline per topic
best_practices = dict(
    naming='Use descriptive, hierarchical names (e.g., prod_analytics_db_host)',
    security='Use Airflow Connections for sensitive data, not Variables',
    versioning='Include version in variable names for schema changes',
    documentation='Document variable purpose and expected values',
    default_values='Always provide default values for non-critical variables',
)
# Example variable structure: every name shares the "<env>_<domain>_" prefix
variables = {
    'prod_analytics_' + suffix: value
    for suffix, value in (
        ('bigquery_project', 'my-project'),
        ('gcs_bucket', 'my-bucket'),
        ('dataflow_region', 'us-central1'),
        ('composer_environment', 'my-environment'),
    )
}
Operators and Hooks
Common Operators
# Google Cloud operators
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
    BigQueryCreateEmptyDatasetOperator,
    BigQueryGetDataOperator,
)
# The Dataflow provider has no DataflowCreate{Java,Python}FlexTemplateOperator:
# Flex Templates (Java or Python) are launched with
# DataflowStartFlexTemplateOperator, classic templates with
# DataflowTemplatedJobStartOperator.
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowStartFlexTemplateOperator,
    DataflowTemplatedJobStartOperator,
)
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocSubmitJobOperator,
    DataprocDeleteClusterOperator,
)

# Example: BigQuery operations
# NOTE: these operators are created without a `dag`; in a real DAG file create
# them inside a `with DAG(...)` block (or pass dag=...) so they get scheduled.
create_dataset = BigQueryCreateEmptyDatasetOperator(
    task_id='create_dataset',
    dataset_id='analytics',
    location='US',
    gcp_conn_id='google_cloud_default',
)

run_query = BigQueryInsertJobOperator(
    task_id='run_query',
    configuration={
        'query': {
            'query': 'SELECT * FROM analytics.sales',
            'useLegacySql': False,
        }
    },
    gcp_conn_id='google_cloud_default',
)
Custom Operators
# Create custom operator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
class CustomBigQueryOperator(BaseOperator):
    """Run a Standard SQL query in BigQuery, optionally saving the result to a table.

    :param sql: Standard SQL query text to run.
    :param destination_table: ``[project.]dataset.table`` (``project:dataset.table``
        is also accepted) that receives the query result; falsy to only return rows.
    :param gcp_conn_id: Airflow connection used to reach BigQuery.
    :param write_disposition: BigQuery write disposition applied to
        ``destination_table`` (ignored when no destination is given).
    :return: The result rows, each as a list of column values. Airflow pushes
        the return value to XCom, so keep result sets small.
    """

    def __init__(
        self,
        sql,
        destination_table,
        *args,
        gcp_conn_id='google_cloud_default',
        write_disposition='WRITE_TRUNCATE',
        **kwargs,
    ):
        # No @apply_defaults: it is deprecated in Airflow 2, where BaseOperator
        # already applies default_args to operator constructors.
        super().__init__(*args, **kwargs)
        self.sql = sql
        self.destination_table = destination_table
        self.gcp_conn_id = gcp_conn_id
        self.write_disposition = write_disposition

    def execute(self, context):
        """Execute the operator: run the query and optionally persist its result."""
        # BigQueryHook defaults to legacy SQL; this operator uses Standard SQL.
        hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id, use_legacy_sql=False)

        if not self.destination_table:
            cursor = hook.get_conn().cursor()
            try:
                cursor.execute(self.sql)
                return cursor.fetchall()
            finally:
                cursor.close()

        # Let BigQuery write the result server-side via a query job with a
        # destination table, instead of round-tripping rows through the worker.
        from airflow.providers.google.cloud.hooks.bigquery import split_tablename

        project_id, dataset_id, table_id = split_tablename(
            table_input=self.destination_table,
            default_project_id=hook.project_id,
        )
        job = hook.insert_job(
            configuration={
                'query': {
                    'query': self.sql,
                    'useLegacySql': False,
                    'destinationTable': {
                        'projectId': project_id,
                        'datasetId': dataset_id,
                        'tableId': table_id,
                    },
                    'writeDisposition': self.write_disposition,
                }
            },
            project_id=hook.project_id,
        )
        # job.result() waits for completion and iterates the result rows.
        return [list(row.values()) for row in job.result()]
Monitoring and Alerting
Monitoring Dashboard
# Monitor Cloud Composer metrics
from google.cloud import monitoring_v3
import time


def monitor_composer_environment(project_id, environment_name, lookback_seconds=3600):
    """Print and return the recent DAG-bag size of a Cloud Composer environment.

    Queries Cloud Monitoring for the
    ``composer.googleapis.com/environment/dagbag_size`` metric of
    ``environment_name`` over the last ``lookback_seconds`` seconds.

    Args:
        project_id: Google Cloud project that hosts the environment.
        environment_name: Composer environment name (not the full resource path).
        lookback_seconds: Width of the query window ending now; defaults to one hour.

    Returns:
        list[int]: The most recent value of each returned time series, in the
        order the API returned them. Series without points are skipped.
    """
    client = monitoring_v3.MetricServiceClient()
    project_name = f"projects/{project_id}"

    # TimeInterval endpoints are protobuf Timestamps, so build them from whole
    # seconds + nanos rather than assigning the float returned by time.time().
    now = time.time()
    seconds = int(now)
    nanos = int((now - seconds) * 10**9)
    interval = monitoring_v3.TimeInterval(
        {
            "end_time": {"seconds": seconds, "nanos": nanos},
            "start_time": {"seconds": seconds - int(lookback_seconds), "nanos": nanos},
        }
    )

    metric_filter = (
        'resource.type = "cloud_composer_environment" '
        f'AND resource.labels.environment_name = "{environment_name}" '
        'AND metric.type = "composer.googleapis.com/environment/dagbag_size"'
    )
    results = client.list_time_series(
        request={
            'name': project_name,
            'filter': metric_filter,
            'interval': interval,
            'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
        }
    )

    latest_values = []
    for series in results:
        # Guard against an empty series, where points[0] would raise IndexError.
        if not series.points:
            continue
        # ListTimeSeries returns points newest first, so [0] is the latest sample.
        value = series.points[0].value.int64_value
        print(f'DAG count: {value}')
        latest_values.append(value)
    return latest_values
Alerting Policies
# Create alert for failed tasks
from google.cloud import monitoring_v3

client = monitoring_v3.AlertPolicyServiceClient()
project_name = "projects/my-project"

alert_policy = monitoring_v3.AlertPolicy(
    display_name="Composer Failed Tasks Alert",
    combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR,
    conditions=[
        monitoring_v3.AlertPolicy.Condition(
            display_name="High failed task rate",
            condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold(
                # Failed task instances are the "failed" state of the finished
                # task instance counter; alert filters must also restrict
                # resource.type.
                filter=(
                    'resource.type = "cloud_composer_environment" '
                    'AND metric.type = "composer.googleapis.com/environment/finished_task_instance_count" '
                    'AND metric.labels.state = "failed"'
                ),
                comparison=monitoring_v3.ComparisonType.COMPARISON_GT,
                # More than 5 failed task instances within one 5-minute window.
                threshold_value=5,
                duration={"seconds": 0},
                aggregations=[
                    monitoring_v3.Aggregation(
                        alignment_period={"seconds": 300},
                        # ALIGN_DELTA yields a count per window; ALIGN_RATE
                        # would compare a per-second rate against 5.
                        per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_DELTA,
                        # Sum all series of one environment into a single value.
                        cross_series_reducer=monitoring_v3.Aggregation.Reducer.REDUCE_SUM,
                        group_by_fields=["resource.label.environment_name"],
                    )
                ],
            ),
        )
    ],
    # Empty list: incidents are only visible in the console. Add notification
    # channel resource names here to actually get paged or emailed.
    notification_channels=[],
    alert_strategy=monitoring_v3.AlertPolicy.AlertStrategy(
        auto_close={"seconds": 1800}
    ),
)

response = client.create_alert_policy(
    request={"name": project_name, "alert_policy": alert_policy}
)
Best Practices
- Use version control - Store DAGs in Git repositories
- Implement testing - Test DAGs before deployment
- Monitor performance - Track DAG execution times and resource usage
- Use proper error handling - Implement retries and alerting
- Optimize scheduling - Avoid scheduling conflicts and resource contention
- Document pipelines - Provide clear documentation for each DAG
- Implement security - Use IAM and encrypted connections