Environment Sizes
DAG Patterns
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.sensors.external_task import ExternalTaskSensor

# Applied to every task in the DAG unless a task overrides them.
default_args = {
    'owner': 'data-engineering',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
}

with DAG(
    dag_id='production_etl_pipeline',
    default_args=default_args,
    schedule='0 2 * * *',  # daily at 02:00; use schedule_interval before Airflow 2.4
    start_date=datetime(2025, 1, 1),
    catchup=False,         # don't backfill runs missed since start_date
    max_active_runs=1,     # never overlap two runs of this pipeline
    tags=['production', 'etl'],
) as dag:
    # Wait for upstream DAG. By default the sensor looks for an upstream run
    # with the same logical date, so both DAGs must share a schedule
    # (otherwise set execution_delta or execution_date_fn).
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream',
        external_dag_id='upstream_pipeline',
        external_task_id='load_complete',
        mode='reschedule',   # free the worker slot between checks
        poke_interval=300,   # check every 5 minutes
        timeout=3600,        # fail after 1 hour of waiting
    )

    # Transform data inside BigQuery (no data flows through the Airflow worker)
    transform = BigQueryInsertJobOperator(
        task_id='transform_data',
        configuration={
            'query': {
                'query': '''
                    CREATE OR REPLACE TABLE `project.analytics.summary` AS
                    SELECT * FROM `project.staging.raw_data`
                ''',
                'useLegacySql': False,
            }
        },
        location='us-central1',  # must match the location of the datasets
    )

    wait_for_upstream >> transform
```
Best Practice: Use ExternalTaskSensor for cross-DAG dependencies. Set SLAs and timeouts (execution_timeout on tasks, timeout on sensors). Use TaskGroups to organize related tasks in the UI. Tag DAGs for filtering. Use Jinja templates such as {{ ds }} for run-dependent parameters. Handle transient failures with retries and retry_delay.
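Retries and timeouts multiply: with the default_args in the DAG above, a task that keeps hitting execution_timeout can hold up the pipeline far longer than two hours. A minimal sketch of the worst-case arithmetic, assuming a fixed retry_delay (no retry_exponential_backoff) and ignoring time spent queued; worst_case_duration is a hypothetical helper, not an Airflow API.

```python
from datetime import timedelta

def worst_case_duration(retries: int, retry_delay: timedelta,
                        execution_timeout: timedelta) -> timedelta:
    """Upper bound on wall-clock time for one task instance.

    Assumes every attempt runs until execution_timeout kills it, a fixed
    retry_delay between attempts, and no time spent waiting in the queue.
    """
    attempts = retries + 1  # the first try plus each retry
    return attempts * execution_timeout + retries * retry_delay

# Values taken from default_args in the DAG above.
budget = worst_case_duration(
    retries=3,
    retry_delay=timedelta(minutes=5),
    execution_timeout=timedelta(hours=2),
)
print(budget)  # 8:15:00
```

Because the DAG also sets max_active_runs=1, a run stuck in this loop delays the next scheduled run, which is one reason to pair retries with SLAs or failure alerts.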
Common Interview Questions
Q1: What is the difference between Cloud Composer 1 and 2?
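Answer: Both versions run Airflow on GKE. Composer 1 uses a standard GKE cluster whose nodes (Compute Engine VMs) you size and scale manually. Composer 2 runs on GKE Autopilot, autoscales Airflow workers between configured minimum and maximum counts, and lets you set CPU and memory separately for the scheduler, web server, and workers. Composer 2 supports only Airflow 2.x, while Composer 1 also supported Airflow 1.10. Use Composer 2 for new deployments.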
Q2: How do you handle cross-DAG dependencies?
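Answer: Use ExternalTaskSensor to wait for a task in another DAG. Use ExternalTaskMarker in the upstream DAG so that clearing a task there also clears the dependent tasks in the downstream DAG. For data-driven dependencies, use Airflow's Dataset feature (Airflow 2.4+), which triggers a DAG when upstream tasks update the Datasets it consumes. Give sensors a timeout and run them in reschedule mode so they do not hold worker slots indefinitely.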
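A common pitfall: ExternalTaskSensor looks for the upstream task instance whose logical date equals the sensor's own logical date, shifted back by execution_delta if one is given (execution_date_fn is the more flexible alternative). If the two DAGs run on different schedules and neither is set, the sensor waits for a run that never exists until it times out. The sketch below reproduces only that date arithmetic in plain Python; upstream_logical_date is a hypothetical helper, and the 01:00 upstream schedule is an assumed example.

```python
from datetime import datetime, timedelta
from typing import Optional

def upstream_logical_date(sensor_logical_date: datetime,
                          execution_delta: Optional[timedelta] = None) -> datetime:
    """Logical date of the upstream run the sensor will look for.

    Follows the ExternalTaskSensor rule: the sensor's own logical date,
    minus execution_delta when one is set.
    """
    if execution_delta is None:
        return sensor_logical_date
    return sensor_logical_date - execution_delta

# This DAG runs at 02:00 ('0 2 * * *'); assume (hypothetically) that
# upstream_pipeline runs at 01:00 ('0 1 * * *').
ours = datetime(2025, 1, 1, 2, 0)
print(upstream_logical_date(ours))                      # 2025-01-01 02:00:00, no such upstream run
print(upstream_logical_date(ours, timedelta(hours=1)))  # 2025-01-01 01:00:00, matches upstream
```

In the DAG this corresponds to passing execution_delta=timedelta(hours=1) to ExternalTaskSensor. With Datasets the alignment problem disappears: the producing task declares outlets=[Dataset(...)] and the consuming DAG sets schedule=[Dataset(...)].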
Q3: What are sensors in Airflow?
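Answer: Sensors are operators that repeatedly check a condition (a file exists, another task finished, an endpoint responds) and succeed once it is true, letting downstream tasks run. Common sensors: FileSensor, ExternalTaskSensor, HttpSensor, BigQueryTableExistenceSensor. poke_interval sets how often the condition is checked, timeout sets how long to wait before failing, and mode='reschedule' releases the worker slot between checks.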
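The interplay of poke_interval and timeout can be sketched as a polling loop. This is a simplified model of a poke-mode sensor, not Airflow's implementation: run_sensor and SensorTimeout are hypothetical names, and the loop uses a simulated clock so it runs instantly. In reschedule mode Airflow frees the worker slot between checks instead of sleeping, but the timeout still counts total elapsed time since the first check.

```python
from typing import Callable

class SensorTimeout(Exception):
    """Raised when the condition is still false after `timeout` seconds."""

def run_sensor(poke: Callable[[float], bool], poke_interval: float,
               timeout: float) -> int:
    """Poll `poke` until it returns True; return the number of pokes.

    `poke` receives the simulated elapsed time in seconds, so the loop
    never actually sleeps.
    """
    elapsed = 0.0
    pokes = 0
    while True:
        pokes += 1
        if poke(elapsed):
            return pokes
        if elapsed + poke_interval > timeout:
            raise SensorTimeout(f"gave up after {pokes} pokes ({elapsed:.0f}s)")
        elapsed += poke_interval  # a real poke-mode sensor would sleep here

# With the DAG's settings (poke_interval=300, timeout=3600), an upstream
# task that finishes 20 minutes in is detected on the 5th poke (t=1200s).
print(run_sensor(lambda t: t >= 1200, poke_interval=300, timeout=3600))  # 5
```

Shortening poke_interval detects the condition sooner but multiplies the number of checks, which matters when each check is an API call.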
Q4: How do you optimize Cloud Composer costs?
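Answer: 1) Choose the smallest environment size that handles the workload, 2) Enable worker autoscaling with sensible minimum and maximum worker counts, 3) Keep DAG parsing cheap (avoid heavy top-level imports and module-level code), 4) Run sensors in reschedule mode or use deferrable operators so waiting tasks do not occupy worker slots, 5) Push heavy processing to services such as BigQuery instead of running it on Airflow workers, 6) Stagger DAG schedules to avoid concurrency spikes that force workers to scale up.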
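The scheduler re-parses every file in the DAG folder on a recurring interval, so module-level work is paid on every parse, not once per run. One way to enforce "avoid heavy imports" in CI is a static check that flags top-level imports of expensive libraries, which can then move inside the task callable. A minimal sketch using Python's ast module; heavy_top_level_imports is a hypothetical helper and HEAVY_MODULES is an assumed list to adapt to your environment.

```python
import ast

# Hypothetical list of libraries considered too slow to import at parse time.
HEAVY_MODULES = {"pandas", "numpy", "tensorflow", "torch", "sklearn"}

def heavy_top_level_imports(source: str) -> list[str]:
    """Return heavy modules imported at module level in a DAG file.

    Only statements directly in the module body are checked; imports
    inside functions (e.g. a PythonOperator callable) run at task time
    and are therefore allowed.
    """
    found = []
    for node in ast.parse(source).body:
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.module:
            names = [node.module]
        else:
            continue
        for name in names:
            if name.split(".")[0] in HEAVY_MODULES:
                found.append(name)
    return found

dag_source = '''
import pandas as pd            # flagged: runs on every parse
from airflow import DAG

def transform():
    import numpy as np         # fine: runs only when the task executes
'''
print(heavy_top_level_imports(dag_source))  # ['pandas']
```

The check only parses the file and never imports it, so it runs in a CI image without Airflow or the heavy libraries installed.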
Q5: How do you implement CI/CD for Cloud Composer?
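Answer: 1) Store DAGs in Git, 2) Run a Cloud Build pipeline on each change, 3) Test DAGs with pytest before deploying, 4) Deploy by syncing the DAG files to the dags/ folder of the environment's Cloud Storage bucket, 5) Use environment variables or Airflow Variables for configuration instead of hard-coding it, 6) Roll back by redeploying a previous Git revision.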
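A typical pytest integrity test loads the DAG folder with Airflow's DagBag and asserts that import_errors is empty, which requires Airflow in the build image. A cheaper first gate, sketched below with only the standard library, compiles every DAG file so syntax errors fail the Cloud Build step before anything is synced to the bucket. find_syntax_errors is a hypothetical helper, and the temporary folder stands in for the repository's dags/ directory.

```python
import tempfile
from pathlib import Path

def find_syntax_errors(dag_folder: str) -> dict[str, str]:
    """Compile every .py file under dag_folder; map path -> error message.

    Catches syntax errors only. Import errors and DAG-level problems
    (such as dependency cycles) still need a DagBag test with Airflow installed.
    """
    errors: dict[str, str] = {}
    for path in sorted(Path(dag_folder).rglob("*.py")):
        source = path.read_text(encoding="utf-8")
        try:
            compile(source, str(path), "exec")  # parse and byte-compile, no execution
        except SyntaxError as exc:
            errors[str(path)] = f"line {exc.lineno}: {exc.msg}"
    return errors

# Demo: a throwaway folder standing in for the repository's dags/ directory.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "good_dag.py").write_text("x = 1\n", encoding="utf-8")
    Path(tmp, "bad_dag.py").write_text("def broken(:\n", encoding="utf-8")
    for file, message in find_syntax_errors(tmp).items():
        print(Path(file).name, message)
```

In a pipeline, a build step would call find_syntax_errors("dags") and exit non-zero when the result is non-empty, ahead of the DagBag test and the deploy step.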