Cloud Composer Architecture
Cloud Composer is a fully managed workflow orchestration service built on Apache Airflow. It automates data pipeline scheduling, monitoring, and management.
Architecture Overview
Environment Creation
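# Create a Cloud Composer 2 environment.
# Composer 2 takes one --image-version (Composer + Airflow together) and sizes
# workers through autoscaling limits; the Composer 1 flags for Python version,
# machine type, and disk size do not apply to this image.
gcloud composer environments create my-composer-env \
    --location=us-central1 \
    --image-version=composer-2.7.3-airflow-2.7.3 \
    --environment-size=medium \
    --min-workers=1 \
    --max-workers=3 \
    --env-variables=CORE_LOAD_EXAMPLES=false \
    --web-server-allow-all
# Note: --env-variables sets a plain environment variable for Airflow processes;
# it is not an Airflow configuration override.
# Extra PyPI packages are installed after creation with
# `gcloud composer environments update ... --update-pypi-package`;
# apache-airflow-providers-google already ships in Composer images.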
# Check environment status
gcloud composer environments describe my-composer-env \
--location=us-central1 \
--format="value(state)"
DAG Development
Basic DAG Structure
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.utils.task_group import TaskGroup

# Default DAG arguments
default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'email': ['data-team@company.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),  # kills an attempt that runs too long
    'sla': timedelta(hours=4),                # only reports a miss; does not stop the task
}

# Define DAG
with DAG(
    dag_id='daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline for sales data',
    schedule_interval='0 2 * * *',  # Daily at 2 AM UTC
    start_date=datetime(2025, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['etl', 'daily', 'sales'],
    # DAG-level params are not rendered by Jinja, so '{{ ds }}' is used
    # directly in the templated operator fields below instead.
) as dag:

    # Task 1: Extract data from GCS
    extract_task = GCSToBigQueryOperator(
        task_id='extract_to_bronze',
        bucket='my-data-lake',
        source_objects=['raw/sales/{{ ds }}/*.csv'],
        destination_project_dataset_table='project.bronze.sales',
        source_format='CSV',
        skip_leading_rows=1,
        autodetect=True,
        write_disposition='WRITE_TRUNCATE',
        location='us-central1',
    )

    # Task 2: Transform with SQL
    with TaskGroup('transformations') as transform_group:
        transform_sql = BigQueryInsertJobOperator(
            task_id='silver_transform',
            configuration={
                'query': {
                    'query': '''
                        CREATE OR REPLACE TABLE `project.silver.sales` AS
                        SELECT
                            SAFE_CAST(order_id AS STRING) as order_id,
                            SAFE_CAST(customer_id AS STRING) as customer_id,
                            SAFE_CAST(product_id AS STRING) as product_id,
                            SAFE_CAST(amount AS FLOAT64) as amount,
                            SAFE_CAST(order_date AS DATE) as order_date,
                            TIMESTAMP '{{ ts }}' as processed_at
                        FROM `project.bronze.sales`
                        WHERE amount > 0
                            AND customer_id IS NOT NULL
                    ''',
                    'useLegacySql': False,
                }
            },
            location='us-central1',
        )

        # Computes quality metrics only; this query does not fail the run
        # on bad values by itself.
        quality_check = BigQueryInsertJobOperator(
            task_id='quality_check',
            configuration={
                'query': {
                    'query': '''
                        SELECT
                            COUNT(*) as total_rows,
                            COUNTIF(amount IS NULL) as null_amounts,
                            COUNTIF(amount <= 0) as invalid_amounts,
                            COUNT(DISTINCT customer_id) as unique_customers
                        FROM `project.silver.sales`
                        WHERE DATE(processed_at) = '{{ ds }}'
                    ''',
                    'useLegacySql': False,
                }
            },
            location='us-central1',
        )

        transform_sql >> quality_check

    # Task 3: Load to Gold
    load_gold = BigQueryInsertJobOperator(
        task_id='load_gold',
        configuration={
            'query': {
                'query': '''
                    CREATE OR REPLACE TABLE `project.gold.sales_summary`
                    PARTITION BY order_date
                    CLUSTER BY product_id
                    AS
                    SELECT
                        order_date,
                        product_id,
                        COUNT(*) as order_count,
                        SUM(amount) as total_revenue,
                        AVG(amount) as avg_order_value,
                        COUNT(DISTINCT customer_id) as unique_customers
                    FROM `project.silver.sales`
                    WHERE DATE(processed_at) = '{{ ds }}'
                    GROUP BY 1, 2
                ''',
                'useLegacySql': False,
            }
        },
        location='us-central1',
    )

    # Define dependencies
    extract_task >> transform_group >> load_gold
Best Practice: Use TaskGroups to organize related tasks and improve DAG readability. Set execution_timeout so that stuck tasks are killed, and use SLAs to flag tasks that finish late (an SLA miss only notifies; it does not stop the task). Use Jinja templates for dynamic parameters. Tag DAGs for better organization and filtering.
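The interaction between retries, execution_timeout, and sla in the default_args above can be checked with simple arithmetic. The following is a minimal sketch using only the values from that example. The helper name worst_case_runtime is hypothetical, and it assumes every attempt runs to its full timeout, a fixed retry delay, and no queueing time.

```python
from datetime import timedelta


def worst_case_runtime(retries: int, retry_delay: timedelta,
                       execution_timeout: timedelta) -> timedelta:
    """Upper bound on one task's wall-clock time if every attempt times out.

    Hypothetical helper: assumes a fixed retry_delay (no exponential backoff)
    and ignores scheduler and queueing latency.
    """
    attempts = retries + 1  # the first try plus each retry
    return attempts * execution_timeout + retries * retry_delay


# Values copied from the default_args in the example DAG.
bound = worst_case_runtime(
    retries=2,
    retry_delay=timedelta(minutes=5),
    execution_timeout=timedelta(hours=2),
)
sla = timedelta(hours=4)

print(bound)        # 6:10:00
print(bound > sla)  # True
```

With these settings a task that keeps timing out can run well past the 4-hour SLA before it finally fails, so the two values are worth choosing together. Airflow measures an SLA from the DAG run's scheduled time rather than from task start, so treat the comparison as an approximation.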
Advanced DAG Patterns
Dynamic Task Generation
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(
    'dynamic_table_pipeline',
    start_date=datetime(2025, 1, 1),
    schedule_interval='@daily',
) as dag:

    @task
    def get_tables():
        """Return the list of tables to process."""
        return ['orders', 'customers', 'products', 'inventory']

    @task
    def process_table(table_name):
        """Process individual table."""
        print(f"Processing table: {table_name}")
        # Table-specific logic here

    # Dynamic task mapping (Airflow 2.3+): the table list only exists at run
    # time, so it cannot be looped over while the DAG file is parsed.
    # expand() creates one mapped process_table instance per returned table.
    process_table.expand(table_name=get_tables())
Error Handling and Retries
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule


def validate_data(**context):
    """Validate data quality."""
    # Validation logic
    is_valid = True  # From actual validation
    return is_valid


def on_success_callback(context):
    """Handle successful DAG run completion (DAG-level callback)."""
    print(f"DAG run {context['dag_run'].run_id} succeeded")


def on_failure_callback(context):
    """Handle DAG run failure (DAG-level callback)."""
    print(f"DAG run {context['dag_run'].run_id} failed")
    # Send alert, update monitoring, etc.


def branch_on_validation(is_valid, **context):
    """Return the task_id of the branch to follow."""
    if is_valid:
        return 'process_valid'
    return 'handle_invalid'


with DAG(
    'error_handling_pipeline',
    start_date=datetime(2025, 1, 1),
    on_success_callback=on_success_callback,
    on_failure_callback=on_failure_callback,
) as dag:
    validate = PythonOperator(
        task_id='validate',
        python_callable=validate_data,
    )

    branch = BranchPythonOperator(
        task_id='branch_on_validation',
        python_callable=branch_on_validation,
        # validate.output is resolved from XCom when the task runs
        op_args=[validate.output],
    )

    process_valid = PythonOperator(
        task_id='process_valid',
        python_callable=lambda: print("Processing valid data"),
    )

    handle_invalid = PythonOperator(
        task_id='handle_invalid',
        python_callable=lambda: print("Handling invalid data"),
        # Not strictly needed on a direct branch target; this rule matters
        # mostly on tasks that join both branches again.
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    validate >> branch >> [process_valid, handle_invalid]
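Why the trigger rule matters after a branch is easier to see with a toy model. The sketch below is not Airflow code: State, all_success, and none_failed_min_one_success are hypothetical, simplified stand-ins for how two trigger rules evaluate the states of upstream tasks. It assumes a join task (not present in the example above) placed downstream of both branch targets.

```python
from enum import Enum


class State(Enum):
    """Simplified task states; a stand-in, not Airflow's own state class."""
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    UPSTREAM_FAILED = "upstream_failed"


def all_success(upstream):
    """Simplified ALL_SUCCESS: every upstream task must have succeeded."""
    return all(s is State.SUCCESS for s in upstream)


def none_failed_min_one_success(upstream):
    """Simplified NONE_FAILED_MIN_ONE_SUCCESS: no failures, at least one success."""
    failed = {State.FAILED, State.UPSTREAM_FAILED}
    return (not any(s in failed for s in upstream)
            and any(s is State.SUCCESS for s in upstream))


# After a branch, the chosen path succeeds and the other path is skipped.
after_branch = [State.SUCCESS, State.SKIPPED]

print(all_success(after_branch))                  # False
print(none_failed_min_one_success(after_branch))  # True
```

Under the default rule the join would not run because one upstream task was skipped, while the relaxed rule lets it run as long as nothing actually failed.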
GKE Worker Configuration
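# Resize Airflow workers. Composer 2 runs workers on GKE Autopilot, so
# resources are set per worker rather than through custom node pools.
gcloud composer environments update my-composer-env \
    --location=us-central1 \
    --worker-cpu=4 \
    --worker-memory=16GB \
    --worker-storage=50GB
# Cost optimization: let the worker count scale down when the environment is idle.
# gcloud has no preemptible worker option for Cloud Composer; the limits below
# are example values.
gcloud composer environments update my-composer-env \
    --location=us-central1 \
    --min-workers=1 \
    --max-workers=6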
Monitoring and Alerting
# Cloud Monitoring alert for DAG failures
from google.cloud import monitoring_v3
from google.protobuf import duration_pb2


def create_alert_policy(project_id):
    """Create alert policy for DAG failures."""
    client = monitoring_v3.AlertPolicyServiceClient()
    alert_policy = monitoring_v3.AlertPolicy(
        display_name="Cloud Composer DAG Failure",
        # combiner is required by the API, even with a single condition
        combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR,
        conditions=[
            monitoring_v3.AlertPolicy.Condition(
                display_name="DAG Run Failure",
                condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold(
                    # Assumed metric: completed DAG runs labelled by state.
                    # Confirm the metric type and labels in Metrics Explorer.
                    filter='resource.type="cloud_composer_workflow" AND '
                           'metric.type="composer.googleapis.com/workflow/run_count" AND '
                           'metric.labels.state="failed"',
                    comparison=monitoring_v3.ComparisonType.COMPARISON_GT,
                    threshold_value=0,
                    # Duration fields take Duration messages, not "60s" strings
                    duration=duration_pb2.Duration(seconds=60),
                    aggregations=[
                        monitoring_v3.Aggregation(
                            alignment_period=duration_pb2.Duration(seconds=300),
                            per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_RATE,
                        )
                    ],
                ),
            )
        ],
        notification_channels=[],  # add channel resource names to receive alerts
        alert_strategy=monitoring_v3.AlertPolicy.AlertStrategy(
            auto_close=duration_pb2.Duration(seconds=1800)
        ),
    )
    request = monitoring_v3.CreateAlertPolicyRequest(
        name=f"projects/{project_id}",
        alert_policy=alert_policy,
    )
    response = client.create_alert_policy(request=request)
    print(f"Created alert policy: {response.name}")
Cost Tip: Cloud Composer charges based on environment size and worker configuration. Use small/medium environments for development, large for production. Rely on worker autoscaling with a low minimum worker count so that idle environments scale down; Cloud Composer does not provide a preemptible worker option. Schedule DAGs during off-peak hours when possible.
Common Interview Questions
Q1: What is the difference between Cloud Composer and Cloud Workflows?
Answer: Cloud Composer is for complex data pipeline orchestration with dependencies, branching, and error handling. Cloud Workflows is for simpler serverless workflow automation. Use Composer for ETL/ELT pipelines requiring Apache Airflow's rich features. Use Workflows for API orchestration and simple automation.
Q2: How do you handle dependencies in Cloud Composer?
Answer: Use the >> operator to define task dependencies. For complex dependencies, use trigger_rule settings. Use BranchPythonOperator for conditional execution. Use TaskGroups for logical grouping. Implement sensors for external dependencies (file existence, API availability).
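To make the >> syntax less magical, here is a minimal sketch of how operator overloading can record dependencies. The Task class is a toy stand-in invented for illustration, not Airflow's BaseOperator; it only shows the general idea of __rshift__ and __rrshift__ building an upstream set that a scheduler could then order topologically.

```python
from graphlib import TopologicalSorter


class Task:
    """Toy stand-in for an Airflow operator; not the real BaseOperator."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()

    def __rshift__(self, other):
        # a >> b means b depends on a. Accept one task or a list of tasks.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            target.upstream.add(self)
        return other  # returning the right-hand side allows chaining

    def __rrshift__(self, other):
        # [a, b] >> c: lists have no __rshift__, so Python calls c.__rrshift__.
        for source in other:
            self.upstream.add(source)
        return self


extract, transform, check, load = (
    Task(name) for name in ("extract", "transform", "check", "load")
)
extract >> [transform, check] >> load

graph = {
    t.task_id: {u.task_id for u in t.upstream}
    for t in (extract, transform, check, load)
}
order = list(TopologicalSorter(graph).static_order())
print(order[0], order[-1])  # extract load
```

The two middle tasks have no dependency on each other, so their relative order is not fixed; only the first and last positions are guaranteed.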
Q3: What are sensors in Airflow?
Answer: Sensors are operators that wait for a condition to be met before letting downstream tasks run. Common sensors: FileSensor (wait for file), ExternalTaskSensor (wait for a task in another DAG), HttpSensor (wait for API endpoint), BigQueryTableExistenceSensor (wait for table). Use poke_interval and timeout to control polling behavior.
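The roles of poke_interval and timeout can be sketched as a plain polling loop. This is a simplified model of a sensor in poke mode, not Airflow's implementation; poke_until and SensorTimeout are hypothetical names, and the sleep and clock arguments are injectable so the loop can be exercised without real waiting.

```python
import time


class SensorTimeout(Exception):
    """Raised when the condition is not met within the timeout."""


def poke_until(condition, poke_interval=1.0, timeout=10.0,
               sleep=time.sleep, clock=time.monotonic):
    """Call condition() every poke_interval seconds until it returns True.

    Returns the number of pokes made. Raises SensorTimeout once the elapsed
    time reaches the timeout without the condition being met.
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if clock() - started >= timeout:
            raise SensorTimeout(f"condition not met after {pokes} pokes")
        sleep(poke_interval)


# Usage with a fake clock: the awaited "file" appears on the third poke.
now = [0.0]
results = iter([False, False, True])
pokes = poke_until(
    condition=lambda: next(results),
    poke_interval=60,
    timeout=300,
    sleep=lambda seconds: now.__setitem__(0, now[0] + seconds),
    clock=lambda: now[0],
)
print(pokes)  # 3
```

A shorter poke_interval detects the condition sooner at the cost of more checks, while timeout bounds how long the sensor can hold its place before failing.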
Q4: How do you optimize Cloud Composer costs?
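Answer: 1) Use appropriate environment size (small/medium/large), 2) Enable autoscaling for workers with a low minimum worker count, 3) Right-size worker CPU and memory instead of over-provisioning (Cloud Composer has no preemptible worker option), 4) Optimize DAG parsing (avoid heavy imports at module level), 5) Set execution_timeout so stuck tasks stop consuming workers, 6) Remove or consolidate unnecessary tasks (TaskGroups help readability but do not reduce task count).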
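Point 4 refers to the fact that the scheduler re-parses DAG files repeatedly, so anything imported at module level is paid for on every parse. A minimal sketch of the deferred-import pattern follows. The function name transform_rows is hypothetical, and the standard-library statistics module merely stands in for a genuinely heavy dependency such as a dataframe library.

```python
def transform_rows(rows):
    """Task callable: the import below runs only when the task executes."""
    # Deferred import: parsing this file only defines the function, so the
    # dependency is loaded on the worker at run time, not by the scheduler.
    import statistics  # stand-in for a heavy third-party library

    return statistics.mean(rows)


# Calling the function is what triggers the import.
result = transform_rows([10, 20, 30])
print(result)
```

The same callable could be passed as python_callable to a PythonOperator; the DAG file itself then stays cheap to parse.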
Q5: How do you implement CI/CD for Cloud Composer?
Answer: Store DAGs in a Git repository and use Cloud Build for CI/CD, syncing validated DAG files to the dags/ folder of the environment's GCS bucket. Roll back by redeploying an earlier Git revision; environment snapshots can be used to restore environment state. Implement testing frameworks (pytest, unittest) for DAG validation. Use environment variables for configuration management.
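As an illustration of a CI validation step, the sketch below runs a cheap syntax pre-check over a DAG folder using only the standard library. The function name find_syntax_errors and the file names are hypothetical. A fuller test would typically load the folder through Airflow's DagBag and assert that it reports no import errors, which is not shown here because it needs an Airflow installation.

```python
import ast
import tempfile
from pathlib import Path


def find_syntax_errors(dags_dir):
    """Return {filename: message} for DAG files that fail to parse."""
    errors = {}
    for path in sorted(Path(dags_dir).glob("*.py")):
        try:
            ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        except SyntaxError as exc:
            errors[path.name] = f"line {exc.lineno}: {exc.msg}"
    return errors


# Usage: one valid file and one broken file in a temporary DAG folder.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "good_dag.py").write_text("x = 1\n", encoding="utf-8")
    Path(tmp, "bad_dag.py").write_text("def broken(:\n", encoding="utf-8")
    broken_files = sorted(find_syntax_errors(tmp))

print(broken_files)  # ['bad_dag.py']
```

In a pipeline, a non-empty result would fail the build before any file is copied to the environment bucket.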