Interview Question
ℹ️Interview Context
Company: Amazon AWS / Uber
Role: Data Engineer / Software Engineer
Difficulty: Intermediate
Time: 30-45 minutes
Question: "Explain the difference between Operators, Sensors, and Hooks in Airflow. When would you use each? Provide examples of custom operators and when you'd need to create one."
Detailed Theory
Operator Fundamentals
```python
# operator_fundamentals.py
"""
Core building blocks in Airflow:
1. Operators: define a single unit of work (a task)
   - BashOperator: run shell commands
   - PythonOperator: run Python callables
   - EmailOperator: send emails
   - HttpOperator: make HTTP requests (http provider)
2. Sensors: operators that wait for a condition to be met
   - FileSensor: wait for a file
   - S3KeySensor: wait for an S3 object
   - ExternalTaskSensor: wait for a task in another DAG
3. Hooks: reusable interfaces to external systems (not tasks themselves)
   - S3Hook: AWS S3 operations
   - PostgresHook: PostgreSQL operations
   - HttpHook: HTTP requests
Key difference:
- Operators DO work
- Sensors WAIT for a condition (they subclass BaseSensorOperator, itself a BaseOperator)
- Hooks CONNECT to systems; operators and sensors use hooks internally
"""
```
1. BashOperator
```python
# bash_operator.py
from datetime import datetime

from airflow.decorators import dag
from airflow.operators.bash import BashOperator


@dag(dag_id='bash_operator_examples', start_date=datetime(2024, 1, 1), schedule='@daily')
def bash_examples():
    # Simple command
    simple_cmd = BashOperator(
        task_id='simple_command',
        bash_command='echo "Hello World"',
    )

    # Command with environment variables; append_env=True keeps the worker's
    # environment (PATH, etc.) instead of replacing it with `env`
    env_cmd = BashOperator(
        task_id='env_command',
        bash_command='echo "Processing $INPUT_FILE"',
        env={'INPUT_FILE': '/data/input.csv'},
        append_env=True,
    )

    # XCom push: the last line written to stdout is pushed as the task's
    # return_value (do_xcom_push=True is the default)
    xcom_cmd = BashOperator(
        task_id='xcom_push_command',
        bash_command='echo "success"',
    )

    # Multi-line command; `set -e` fails the task as soon as any step fails
    # (otherwise only the exit code of the last line counts)
    multi_cmd = BashOperator(
        task_id='multi_line_command',
        bash_command="""
        set -e
        echo "Step 1: Processing"
        python /opt/scripts/process.py
        echo "Step 2: Complete"
        """,
    )

    # Error handling: log the failure, then exit non-zero so Airflow marks the
    # task failed (a bare `|| echo ...` would swallow the error and succeed)
    error_cmd = BashOperator(
        task_id='error_handling_command',
        bash_command='python /opt/scripts/process.py || { echo "Script failed" >&2; exit 1; }',
        cwd='/opt/airflow',  # working directory
    )

    simple_cmd >> env_cmd >> xcom_cmd >> multi_cmd >> error_cmd


bash_examples()
```
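⚠️Important
BashOperator runs the command in a bash subprocess. When env is left unset, the subprocess inherits the worker's environment. When you pass env, it replaces the inherited environment entirely (PATH and every other variable disappear) unless you also set append_env=True. The exit code decides the task state: a non-zero exit fails the task.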
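The inheritance rules are the standard `subprocess` rules, so they can be checked without Airflow. The sketch below uses Python's `subprocess` module directly; `run_child`, `PARENT_VAR` and the mapping of each call to a BashOperator setting are illustrative assumptions (the mapping reflects my understanding of how BashOperator builds its environment, not its source code).

```python
import os
import subprocess
import sys
from typing import Dict, List, Optional

# The child prints the two variables it can see, or '<unset>' for missing ones.
CHILD = (
    "import os; "
    "print(os.environ.get('INPUT_FILE', '<unset>'), os.environ.get('PARENT_VAR', '<unset>'))"
)


def run_child(env: Optional[Dict[str, str]] = None) -> List[str]:
    """Run a Python child process; env=None means 'inherit the parent environment'."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD],
        env=env, capture_output=True, text=True, check=True,
    )
    return result.stdout.split()


os.environ["PARENT_VAR"] = "from-parent"  # pretend the worker has this variable set

inherited = run_child()                                              # like leaving env unset
replaced = run_child({"INPUT_FILE": "/data/input.csv"})              # like env={...}
merged = run_child({**os.environ, "INPUT_FILE": "/data/input.csv"})  # like append_env=True
```

`replaced` sees INPUT_FILE but loses PARENT_VAR, which is the same reason a BashOperator with only `env={...}` can suddenly fail to find commands on PATH.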
2. PythonOperator
```python
# python_operator.py
from datetime import datetime
from typing import Any, Dict, List

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator


def extract_data() -> List[Dict[str, int]]:
    """Upstream task: the return value is pushed to XCom under 'return_value'"""
    return [{'id': 1}, {'id': 2}]


def process_data(extra_param: str, **context):
    """Process data with access to the Airflow context"""
    ti = context['ti']
    # Pull the return value of the upstream task
    upstream_data = ti.xcom_pull(task_ids='extract_data') or []
    result = {
        'processed': True,
        'records': len(upstream_data),
        'extra_param': extra_param,
        'logical_ts': context['ts'],  # logical timestamp: identical on reruns
    }
    # Explicit push under a custom key (the return value is also pushed as 'return_value')
    ti.xcom_push(key='result', value=result)
    return result


def transform_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """Transform data without context"""
    return {
        'transformed': True,
        'input': data,
        'output': 'transformed_value',
    }


@dag(dag_id='python_operator_examples', start_date=datetime(2024, 1, 1), schedule='@daily')
def python_examples():
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data,
    )

    # op_kwargs are merged with the context and passed as keyword arguments
    process_task = PythonOperator(
        task_id='process_data',
        python_callable=process_data,
        op_kwargs={'extra_param': 'value'},
    )

    # op_args are passed positionally
    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
        op_args=[{'input': 'value'}],
    )

    # BranchPythonOperator: the callable returns the task_id(s) to follow;
    # the other direct downstream tasks are skipped
    branch_task = BranchPythonOperator(
        task_id='branch_decision',
        python_callable=lambda: 'path_a',
    )
    path_a = EmptyOperator(task_id='path_a')
    path_b = EmptyOperator(task_id='path_b')

    extract_task >> process_task >> transform_task >> branch_task >> [path_a, path_b]


python_examples()
```
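PythonOperator decides which context keys to pass by inspecting the callable's signature. The sketch below is a plain-Python approximation of that behaviour, not Airflow's implementation. `call_with_context` is a hypothetical helper, and real Airflow handles more cases. It shows why `process_data(extra_param, **context)` receives everything, while `transform_data(data)` receives only its positional argument.

```python
import inspect
from typing import Any, Callable, Dict, List


def call_with_context(
    func: Callable[..., Any],
    op_args: List[Any],
    op_kwargs: Dict[str, Any],
    context: Dict[str, Any],
) -> Any:
    """Hypothetical helper approximating how PythonOperator calls its callable."""
    params = inspect.signature(func).parameters
    takes_var_kwargs = any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())
    kwargs = {**context, **op_kwargs}  # op_kwargs are merged with the context
    if not takes_var_kwargs:
        # Without **kwargs, only forward the names the callable declares.
        kwargs = {name: value for name, value in kwargs.items() if name in params}
    return func(*op_args, **kwargs)


context = {"ds": "2024-01-01", "ts": "2024-01-01T00:00:00+00:00"}
```

A callable that declares `ds` gets only `ds`. One that declares `**context` gets every key plus the `op_kwargs`.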
3. EmailOperator
```python
# email_operator.py
from datetime import datetime

from airflow.decorators import dag
from airflow.operators.email import EmailOperator

# EmailOperator needs SMTP delivery configured for Airflow (see the [smtp] section of airflow.cfg)


@dag(dag_id='email_operator_examples', start_date=datetime(2024, 1, 1), schedule='@daily')
def email_examples():
    # Simple email
    simple_email = EmailOperator(
        task_id='simple_email',
        to=['team@company.com'],
        subject='Pipeline Complete',
        html_content='<h1>DAG completed successfully</h1>',
    )

    # Email with file attachment (the file must exist on the worker that runs the task)
    email_with_attachment = EmailOperator(
        task_id='email_with_attachment',
        to=['team@company.com'],
        subject='Daily Report',
        html_content='<h1>Report attached</h1>',
        files=['/tmp/report.csv'],
    )

    # Email with templated content ('to', 'subject' and 'html_content' are templated fields)
    templated_email = EmailOperator(
        task_id='templated_email',
        to=['{{ params.recipient }}'],
        subject='Report for {{ ds }}',
        # Assumes an upstream task with task_id 'process'; without one this renders "None"
        html_content="""
        <h2>Daily Report</h2>
        <p>Date: {{ ds }}</p>
        <p>Records processed: {{ ti.xcom_pull(task_ids='process') }}</p>
        """,
        params={'recipient': 'team@company.com'},
    )

    simple_email >> email_with_attachment >> templated_email


email_examples()
```
4. HttpOperator
```python
# http_operator.py
from datetime import datetime

from airflow.decorators import dag

# Requires apache-airflow-providers-http; older provider versions call this class SimpleHttpOperator
from airflow.providers.http.operators.http import HttpOperator
from airflow.providers.http.sensors.http import HttpSensor


@dag(dag_id='http_operator_examples', start_date=datetime(2024, 1, 1), schedule='@daily')
def http_examples():
    # HTTP Sensor: wait until the API health endpoint responds
    http_sensor = HttpSensor(
        task_id='http_sensor',
        http_conn_id='api_default',
        endpoint='/api/health',
        poke_interval=30,
        timeout=300,
    )

    # Simple HTTP GET; the task fails if response_check returns False
    http_get = HttpOperator(
        task_id='http_get',
        http_conn_id='api_default',
        endpoint='/api/data',
        method='GET',
        headers={'Content-Type': 'application/json'},
        response_check=lambda response: response.status_code == 200,
    )

    # HTTP POST with JSON body
    http_post = HttpOperator(
        task_id='http_post',
        http_conn_id='api_default',
        endpoint='/api/submit',
        method='POST',
        data='{"key": "value"}',
        headers={'Content-Type': 'application/json'},
    )

    # HTTP POST with a body taken from XCom. `tojson` renders valid JSON; a plain
    # render would produce a Python dict repr. Assumes an upstream task 'extract'.
    http_with_xcom = HttpOperator(
        task_id='http_with_xcom',
        http_conn_id='api_default',
        endpoint='/api/process',
        method='POST',
        data='{{ ti.xcom_pull(task_ids="extract") | tojson }}',
        headers={'Content-Type': 'application/json'},
    )

    http_sensor >> http_get >> http_post >> http_with_xcom


http_examples()
```
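Rendering a dict from XCom directly into a request body often produces invalid JSON. A plain template render turns the value into a string, which for a dict is roughly its Python repr, and that is not JSON. This plain-Python sketch compares the two string forms. `payload` is an illustrative value, and `json.dumps` stands in for Jinja's `tojson` filter (the exact formatting may differ, but both produce valid JSON).

```python
import json

payload = {"key": "value", "ok": True, "missing": None}

rendered_plain = str(payload)        # roughly what a plain {{ ... }} render of a dict yields
rendered_json = json.dumps(payload)  # valid JSON, as the tojson filter would produce
```

The repr uses single quotes, `True` and `None`, so most JSON APIs reject it.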
5. S3 Operators
```python
# s3_operators.py
from datetime import datetime

from airflow.decorators import dag
from airflow.providers.amazon.aws.operators.s3 import (
    S3CopyObjectOperator,
    S3CreateObjectOperator,
    S3DeleteObjectsOperator,
)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor


@dag(dag_id='s3_operator_examples', start_date=datetime(2024, 1, 1), schedule='@daily')
def s3_examples():
    # Wait for S3 object (the sensor takes bucket_key, not key)
    wait_for_object = S3KeySensor(
        task_id='wait_for_object',
        aws_conn_id='aws_default',
        bucket_name='my-bucket',
        bucket_key='data/input.csv',
        poke_interval=30,
        timeout=300,
    )

    # Create S3 object; replace=True lets reruns overwrite the key instead of failing
    create_object = S3CreateObjectOperator(
        task_id='create_object',
        aws_conn_id='aws_default',
        s3_bucket='my-bucket',
        s3_key='data/output.csv',
        data=b'col1,col2\nvalue1,value2',
        replace=True,
    )

    # Copy S3 object
    copy_object = S3CopyObjectOperator(
        task_id='copy_object',
        aws_conn_id='aws_default',
        source_bucket_name='my-bucket',
        source_bucket_key='data/input.csv',
        dest_bucket_name='my-bucket',
        dest_bucket_key='data/backup/input.csv',
    )

    # Delete every object under a prefix
    delete_objects = S3DeleteObjectsOperator(
        task_id='delete_objects',
        aws_conn_id='aws_default',
        bucket='my-bucket',
        prefix='data/temp/',
    )

    wait_for_object >> create_object >> copy_object >> delete_objects


s3_examples()
```
6. BigQuery Operators
```python
# bigquery_operators.py
from datetime import datetime

from airflow.decorators import dag
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCheckOperator,
    BigQueryInsertJobOperator,
    BigQueryValueCheckOperator,
)


@dag(dag_id='bigquery_operator_examples', start_date=datetime(2024, 1, 1), schedule='@daily')
def bigquery_examples():
    # Insert job: runs a query job; without a 'destinationTable' in the
    # configuration, results land in a temporary table
    insert_job = BigQueryInsertJobOperator(
        task_id='insert_job',
        configuration={
            'query': {
                'query': '''
                    SELECT * FROM `project.dataset.table`
                    WHERE date = '{{ ds }}'
                ''',
                'useLegacySql': False,
            }
        },
        location='US',
    )

    # Check operator: fails if any value in the first result row is falsy (e.g. COUNT(*) = 0)
    check = BigQueryCheckOperator(
        task_id='check_data',
        sql="SELECT COUNT(*) FROM `project.dataset.table` WHERE date = '{{ ds }}'",
        use_legacy_sql=False,
    )

    # Value check: passes if the result is within ±10% of pass_value
    value_check = BigQueryValueCheckOperator(
        task_id='value_check',
        sql='SELECT COUNT(*) FROM `project.dataset.table`',
        pass_value=1000,
        tolerance=0.1,
        use_legacy_sql=False,
    )

    insert_job >> check >> value_check


bigquery_examples()
```
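ℹ️Pro Tip
When using BigQuery operators, set use_legacy_sql=False (or 'useLegacySql': False inside a BigQueryInsertJobOperator configuration) to use GoogleSQL, formerly called Standard SQL. Some of these operators default to legacy SQL, which Google keeps for backward compatibility but does not recommend for new queries.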
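BigQueryValueCheckOperator with pass_value=1000 and tolerance=0.1 compares the query result against an acceptance band. Below is a minimal sketch of that band. It assumes the operator follows the generic SQL value-check rule: accept when the result lies within pass_value × (1 ± tolerance), inclusive. `passes_value_check` is a hypothetical name, not an Airflow function.

```python
def passes_value_check(value: float, pass_value: float, tolerance: float) -> bool:
    """Accept value if it lies within pass_value * (1 ± tolerance), inclusive."""
    lower = pass_value * (1 - tolerance)
    upper = pass_value * (1 + tolerance)
    return lower <= value <= upper
```

With the example's settings, row counts from about 900 to 1,100 pass. A count of 850 would fail the task.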
7. Custom Operators
```python
# custom_operator.py
from datetime import datetime
from typing import Any, Dict, Optional

from airflow import DAG
from airflow.models import BaseOperator


class CustomDataProcessor(BaseOperator):
    """
    Custom operator for data processing.

    Create one when the same logic (often wrapping a hook) is repeated across
    DAGs and no built-in or provider operator covers it.

    :param input_path: Path to input data (templated)
    :param output_path: Path to output data (templated)
    :param processing_options: Additional processing options
    """

    # Attributes rendered with Jinja just before execute() runs
    template_fields = ('input_path', 'output_path')
    # template_ext is intentionally not set: with ('.sql', '.json'), a field value
    # ending in one of those extensions would be loaded as a template file

    def __init__(
        self,
        *,
        input_path: str,
        output_path: str,
        processing_options: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        # No @apply_defaults needed: since Airflow 2.0, BaseOperator applies default_args itself
        super().__init__(**kwargs)
        self.input_path = input_path
        self.output_path = output_path
        self.processing_options = processing_options or {}

    def execute(self, context):
        """Main execution logic"""
        self.log.info("Processing %s", self.input_path)
        # Custom processing logic
        result = self._process_data()
        # Push a summary to XCom under a custom key
        context['ti'].xcom_push(
            key='processing_result',
            value={
                'input': self.input_path,
                'output': self.output_path,
                'records': result['count'],
            },
        )
        self.log.info("Processing complete: %s", result)
        return result  # also pushed to XCom as 'return_value'

    def _process_data(self) -> Dict[str, Any]:
        """Internal processing method (placeholder implementation)"""
        return {'count': 1000, 'status': 'success'}

    def on_kill(self):
        """Called when the task is externally killed; release resources here"""
        self.log.info("Custom operator killed, cleaning up...")


# Usage: instantiate the operator inside a DAG
with DAG(dag_id='custom_operator_example', start_date=datetime(2024, 1, 1), schedule='@daily'):
    custom_task = CustomDataProcessor(
        task_id='process_data',
        input_path='s3://bucket/input/{{ ds }}',
        output_path='s3://bucket/output/{{ ds }}',
        processing_options={'batch_size': 1000},
    )
```
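`template_fields` controls which attributes get rendered before `execute()`. The toy model below shows that rule in plain Python. `ToyBaseOperator`, `ToyDataProcessor` and the regex-based `{{ name }}` substitution are hypothetical stand-ins, far simpler than Airflow's Jinja rendering. Only the listed attributes change; `label` keeps its raw braces.

```python
import re
from typing import Any, Dict, Tuple

# Matches simple placeholders such as "{{ ds }}" (real Jinja supports far more).
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


class ToyBaseOperator:
    """Hypothetical stand-in for BaseOperator's field rendering, not Airflow code."""

    template_fields: Tuple[str, ...] = ()

    def render_template_fields(self, context: Dict[str, Any]) -> None:
        # Only attributes named in template_fields are rendered.
        for name in self.template_fields:
            raw = getattr(self, name)
            setattr(self, name, PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), raw))


class ToyDataProcessor(ToyBaseOperator):
    template_fields = ("input_path", "output_path")

    def __init__(self, input_path: str, output_path: str, label: str):
        self.input_path = input_path
        self.output_path = output_path
        self.label = label  # not listed in template_fields, so left untouched
```

Forgetting to list a field in `template_fields` is a common reason a `{{ ds }}` reaches S3 literally.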
8. TaskFlow API (@task decorator)
```python
# taskflow_api.py
from datetime import datetime
from typing import Dict, List

from airflow.decorators import dag, task


@dag(
    dag_id='taskflow_api_examples',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
)
def taskflow_examples():
    @task
    def extract() -> List[Dict]:
        """Extract data - returns a list of dicts"""
        return [{'id': 1, 'value': 'a'}, {'id': 2, 'value': 'b'}]

    @task
    def transform(data: List[Dict]) -> List[Dict]:
        """Transform data - receives input and returns output"""
        return [{'id': d['id'], 'value': d['value'].upper()} for d in data]

    @task
    def load(data: List[Dict]) -> bool:
        """Load data - receives input and returns a status"""
        # Load to target
        return True

    @task
    def notify(success: bool) -> None:
        """Notify - receives a status and returns nothing"""
        if success:
            print("Pipeline completed successfully")

    # Dependencies are inferred from passing one task's return value (an XComArg)
    # into another task, not from parameter names
    raw = extract()
    transformed = transform(raw)
    loaded = load(transformed)
    notify(loaded)


taskflow_examples()
```
ℹ️TaskFlow API Benefits
The TaskFlow API with @task decorator provides:
- Automatic XCom passing between tasks
- Type hints for better IDE support
- Cleaner, more Pythonic code
- Automatic serialization of return values
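In the TaskFlow API, dependencies come from data flow. Calling a decorated task returns a placeholder (Airflow's XComArg), and passing that placeholder into another task creates an edge. The toy model below imitates this in plain Python. `Ref` and `ToyDag` are hypothetical classes, not Airflow APIs, and storing return values in a dict stands in for XCom.

```python
from typing import Any, Callable, Dict, List, Set, Tuple


class Ref:
    """Placeholder returned by a task call; stands in for Airflow's XComArg."""

    def __init__(self, task_id: str):
        self.task_id = task_id


class ToyDag:
    """Hypothetical mini-DAG: edges come from passing one task's result into another."""

    def __init__(self) -> None:
        self.calls: Dict[str, Tuple[Callable[..., Any], Tuple[Any, ...]]] = {}
        self.edges: Set[Tuple[str, str]] = set()

    def task(self, fn: Callable[..., Any]) -> Callable[..., Ref]:
        def wrapper(*args: Any) -> Ref:
            task_id = fn.__name__
            self.calls[task_id] = (fn, args)
            for arg in args:
                if isinstance(arg, Ref):  # an upstream result used as input
                    self.edges.add((arg.task_id, task_id))
            return Ref(task_id)
        return wrapper

    def run(self) -> Dict[str, Any]:
        results: Dict[str, Any] = {}
        # Upstream calls are registered before their consumers, so insertion order is topological.
        for task_id, (fn, args) in self.calls.items():
            resolved = [results[a.task_id] if isinstance(a, Ref) else a for a in args]
            results[task_id] = fn(*resolved)  # "push" the return value
        return results


toy_dag = ToyDag()


@toy_dag.task
def extract() -> List[Dict]:
    return [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]


@toy_dag.task
def transform(data: List[Dict]) -> List[Dict]:
    return [{"id": d["id"], "value": d["value"].upper()} for d in data]


@toy_dag.task
def load(data: List[Dict]) -> bool:
    return len(data) == 2


loaded = load(transform(extract()))
results = toy_dag.run()
```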
Real-World Scenarios
Scenario 1: Uber's Data Pipeline
```python
# uber_pipeline.py
"""
Uber-style data pipeline:
- Ingest from multiple sources
- Process with custom logic
- Load to data warehouse
"""
from datetime import datetime
from typing import Dict, List

from airflow.decorators import dag, task


def _validate_record(record: Dict) -> bool:
    """Hypothetical validation rule: require a ride_id and a known status"""
    return bool(record.get('ride_id')) and record.get('status') in {'completed', 'cancelled'}


@dag(
    dag_id='uber_ride_data_pipeline',
    schedule='*/15 * * * *',  # every 15 minutes
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['uber', 'real-time', 'production'],
)
def uber_pipeline():
    @task
    def extract_ride_data() -> List[Dict]:
        """Extract ride data from Kafka"""
        # Kafka consumer logic
        return [{'ride_id': '123', 'status': 'completed'}]

    @task
    def validate_data(data: List[Dict]) -> List[Dict]:
        """Validate data quality"""
        # Use a module-level helper: nested @task functions have no `self`
        return [record for record in data if _validate_record(record)]

    @task
    def process_data(data: List[Dict]) -> Dict:
        """Process ride data"""
        # Complex processing logic
        return {
            'total_rides': len(data),
            'processed_at': datetime.now().isoformat(),
        }

    @task
    def load_to_warehouse(processed: Dict) -> bool:
        """Load to data warehouse"""
        # BigQuery/Snowflake load
        return True

    @task
    def send_metrics(processed: Dict) -> None:
        """Send metrics to monitoring"""
        # Send to Datadog/Prometheus
        pass

    # Pipeline
    raw = extract_ride_data()
    validated = validate_data(raw)
    processed = process_data(validated)
    loaded = load_to_warehouse(processed)
    send_metrics(processed)


uber_pipeline()
```
Scenario 2: Amazon's S3 Processing
```python
# amazon_s3_processing.py
"""
Amazon-style S3 processing pipeline:
- Wait for S3 object
- Process file
- Load to Redshift
"""
from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor


@dag(
    dag_id='amazon_s3_processing',
    schedule='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['amazon', 's3', 'production'],
)
def s3_processing():
    # Wait for the file; reschedule mode frees the worker slot between pokes.
    # Note: with an hourly schedule, {{ ds }} is the same for every run of a day;
    # add the hour (e.g. {{ logical_date.strftime('%H') }}) if files arrive hourly.
    wait_file = S3KeySensor(
        task_id='wait_for_file',
        bucket_name='data-lake',
        bucket_key='incoming/{{ ds }}/data.csv',
        poke_interval=30,
        timeout=3600,
        mode='reschedule',
    )

    @task
    def process_file() -> dict:
        """Process S3 file"""
        # Download and process
        return {'records': 1000}

    @task
    def load_to_redshift(data: dict) -> bool:
        """Load to Redshift"""
        # Redshift COPY command
        return True

    @task
    def archive_file() -> None:
        """Archive processed file"""
        # Move to archive bucket
        pass

    # Pipeline
    processed = process_file()
    loaded = load_to_redshift(processed)
    archive = archive_file()
    wait_file >> processed >> loaded >> archive


s3_processing()
```
Edge Cases
⚠️Common Pitfalls
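- Operator Idempotency: Ensure operators produce the same result when re-run. Key outputs on the logical date (for example, overwrite the partition for {{ ds }}) rather than appending.
- XCom Size Limits: XCom values are stored in the Airflow metadata database, so the practical size limit depends on the database backend. Keep payloads small and pass references (such as S3 URIs) for large data, or configure a custom XCom backend.
- Template Rendering: Use {{ ds }} (the logical date) rather than the wall-clock time so reruns and backfills see the same value. Templates are rendered just before the task runs, and only in an operator's template_fields; Jinja written inside a Python function body is never rendered.
- Connection Reuse: Each task runs in its own process, so hooks open fresh connections per task. For high-frequency work, reuse one hook connection within a task and use Airflow pools to cap how many tasks hit the same system at once.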
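The idempotency pitfall can be shown with an in-memory "table" keyed by logical date. Both functions below are hypothetical. `append_load` adds rows every time, so a retry duplicates data. `overwrite_partition` replaces the whole partition, so any number of reruns leaves the same result.

```python
from typing import Dict, List

Store = Dict[str, List[dict]]


def append_load(store: Store, ds: str, rows: List[dict]) -> None:
    """Non-idempotent: every run adds rows, so a retry duplicates data."""
    store.setdefault(ds, []).extend(rows)


def overwrite_partition(store: Store, ds: str, rows: List[dict]) -> None:
    """Idempotent: the partition for the logical date is replaced wholesale."""
    store[ds] = list(rows)


rows = [{"ride_id": "123"}, {"ride_id": "456"}]
appended: Store = {}
overwritten: Store = {}
for _ in range(2):  # simulate the original run plus one retry
    append_load(appended, "2024-01-01", rows)
    overwrite_partition(overwritten, "2024-01-01", rows)
```

In a warehouse, the same idea usually becomes "DELETE the partition, then INSERT" or a MERGE keyed on the logical date.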
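```python
# edge_cases.py
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context


@dag(dag_id='edge_case_examples', start_date=datetime(2024, 1, 1), schedule='@daily')
def edge_cases():
    # Idempotent operation
    @task
    def idempotent_task():
        """Key the output on the logical date so a rerun overwrites its own file"""
        # BAD: every run writes the same path, so runs clobber each other
        # with open('/tmp/output.csv', 'w') as f:
        #     f.write('data')
        # BAD: '/tmp/output_{{ run_id }}.csv' is a literal string here;
        # Jinja is not rendered inside a Python function body
        # GOOD: read the logical date from the runtime context
        ds = get_current_context()['ds']
        with open(f'/tmp/output_{ds}.csv', 'w') as f:
            f.write('data')

    # Template variables inside Python code
    @task
    def template_task():
        """Read template variables from the context instead of writing Jinja"""
        context = get_current_context()
        # Logical date
        print(f"Logical date: {context['ds']}")
        # Data interval (prev_ds is deprecated since Airflow 2.2)
        print(f"Interval: {context['data_interval_start']} to {context['data_interval_end']}")
        # DAG run ID
        print(f"Run ID: {context['run_id']}")

    idempotent_task() >> template_task()


edge_cases()
```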
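Airflow applies Jinja only to an operator's template_fields, so template syntax written inside a `@task` body stays as ordinary Python text. Plain Python string handling shows what that text becomes. No Airflow is needed here; `ds` is hard-coded as an assumption, whereas a real task would read it from the context.

```python
ds = "2024-01-01"  # in a real task this would come from get_current_context()["ds"]

broken_fstring = f"Execution date: {{ ds }}"  # doubled braces are f-string escapes, not Jinja
broken_path = "/tmp/output_{{ run_id }}.csv"  # a plain string is never rendered in a @task body
fixed = f"Execution date: {ds}"              # interpolate the value read from the context
```

The first line prints `Execution date: { ds }` instead of a date. The second creates a file literally named with braces.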
Best Practices
```python
# best_practices.py
"""
Operator Best Practices:
1. Choose the Right Operator:
   - Use built-in or provider operators when available
   - Create custom operators for reusable logic
   - Use Sensors for waiting conditions (reschedule mode for long waits)
2. Idempotency:
   - Design operators to be idempotent
   - Key outputs on the logical date or another stable identifier
   - Handle partial failures gracefully
3. Error Handling:
   - Set appropriate retry parameters (retries, retry_delay)
   - Use callbacks (e.g. on_failure_callback) for notifications
   - Log meaningful messages
4. Performance:
   - Reuse hook connections within a task; use pools to limit concurrency
   - Minimize XCom usage
   - Use appropriate timeouts (e.g. execution_timeout)
5. Security:
   - Store credentials in connections or a secrets backend
   - Never hardcode secrets
   - Use variables for configuration
"""
```
ℹ️Amazon Interview Tip
At Amazon, custom operators are widely used for internal tools. When discussing operators, emphasize idempotency, error handling, and building reusable components. Be ready to explain how you would handle operator failures with automatic retries and alerting.
Summary
Operators are the building blocks of Airflow tasks. Key takeaways:
- Operators execute work, Sensors wait for conditions, Hooks connect to systems
- Use TaskFlow API for cleaner Python task code
- Create custom operators for reusable, complex logic
- Ensure idempotency in all operators
- Use proper error handling and retry mechanisms
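The first takeaway (operators do work, sensors wait, hooks connect) can be made concrete with a toy model in plain Python. ToyHook, ToyOperator and ToySensor are hypothetical classes, not Airflow APIs. They only mirror the division of labour. The hook owns access to a system (a dict here). The operator does one unit of work through the hook. The sensor re-checks a condition until it holds or a timeout expires.

```python
import time
from typing import Callable, Dict


class ToyHook:
    """Hypothetical hook: owns access to an external system (here, a dict)."""

    def __init__(self, store: Dict[str, str]):
        self._store = store  # stands in for a real client such as boto3 or psycopg2

    def exists(self, key: str) -> bool:
        return key in self._store

    def write(self, key: str, value: str) -> None:
        self._store[key] = value


class ToyOperator:
    """Hypothetical operator: performs one unit of work through a hook."""

    def __init__(self, hook: ToyHook, key: str, value: str):
        self.hook, self.key, self.value = hook, key, value

    def execute(self) -> str:
        self.hook.write(self.key, self.value)
        return self.key  # in Airflow, a return value would be pushed to XCom


class ToySensor:
    """Hypothetical sensor: re-checks a condition until it holds or time runs out."""

    def __init__(self, poke: Callable[[], bool], poke_interval: float, timeout: float):
        self.poke, self.poke_interval, self.timeout = poke, poke_interval, timeout

    def execute(self) -> int:
        started = time.monotonic()
        attempts = 0
        while True:
            attempts += 1
            if self.poke():
                return attempts  # condition met: downstream tasks may run
            if time.monotonic() - started >= self.timeout:
                raise TimeoutError("condition not met before timeout")
            time.sleep(self.poke_interval)
```

A sensor built on the same hook, such as `ToySensor(lambda: hook.exists("data/input.csv"), 30, 300)`, roughly mirrors how S3KeySensor relies on S3Hook.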
For Amazon and Uber interviews, focus on:
- Choosing the right operator for the job
- Creating reusable custom operators
- Handling failures gracefully
- Performance optimization
- Security best practices
This question is part of the Apache Airflow Advanced interview preparation series. Practice implementing these operators before your interview.