
Airflow Operators Deep Dive

Understanding Task Execution Building Blocks

Interview Question

ℹ️Interview Context

Company: Amazon AWS / Uber
Role: Data Engineer / Software Engineer
Difficulty: Intermediate
Time: 30-45 minutes

Question: "Explain the difference between Operators, Sensors, and Hooks in Airflow. When would you use each? Provide examples of custom operators and when you'd need to create one."


Detailed Theory

Operator Fundamentals

# operator_fundamentals.py
"""
Operator Types in Airflow:

1. Operators: Execute a single task
   - BashOperator: Run shell commands
   - PythonOperator: Run Python callables
   - EmailOperator: Send emails
   - HttpOperator: Make HTTP requests

2. Sensors: Wait for a condition to be met
   - FileSensor: Wait for file
   - S3KeySensor: Wait for S3 object
   - ExternalTaskSensor: Wait for external task

3. Hooks: Interface to external systems
   - S3Hook: AWS S3 operations
   - PostgresHook: PostgreSQL operations
   - HttpHook: HTTP requests

Key Difference:
- Operators DO work
- Sensors WAIT for work
- Hooks CONNECT to systems
"""

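The three roles can be modelled in plain Python without Airflow installed. This is a toy sketch, not Airflow's API: the class names (InMemoryStoreHook, KeySensor, UppercaseOperator) and the dict standing in for an external system are all hypothetical. It only illustrates who does what: the hook owns the I/O, the sensor polls until a condition holds or a timeout expires, and the operator does one unit of work.

```python
import time
from typing import Dict


class InMemoryStoreHook:
    """Hook role: the only object that talks to the 'external system' (a dict here)."""

    def __init__(self, store: Dict[str, str]):
        self.store = store

    def key_exists(self, key: str) -> bool:
        return key in self.store

    def read(self, key: str) -> str:
        return self.store[key]


class KeySensor:
    """Sensor role: poll a condition until it is true or the timeout expires."""

    def __init__(self, hook: InMemoryStoreHook, key: str, poke_interval: float, timeout: float):
        self.hook = hook
        self.key = key
        self.poke_interval = poke_interval
        self.timeout = timeout

    def poke(self) -> bool:
        # One check of the condition; True means "stop waiting".
        return self.hook.key_exists(self.key)

    def execute(self) -> None:
        deadline = time.monotonic() + self.timeout
        while not self.poke():
            if time.monotonic() >= deadline:
                raise TimeoutError(f"{self.key} did not appear within {self.timeout}s")
            time.sleep(self.poke_interval)


class UppercaseOperator:
    """Operator role: do one unit of work, delegating I/O to the hook."""

    def __init__(self, hook: InMemoryStoreHook, key: str):
        self.hook = hook
        self.key = key

    def execute(self) -> str:
        return self.hook.read(self.key).upper()


store = {"incoming/data.csv": "a,b"}
hook = InMemoryStoreHook(store)

# The sensor returns immediately because the key already exists.
KeySensor(hook, "incoming/data.csv", poke_interval=0.01, timeout=0.1).execute()
result = UppercaseOperator(hook, "incoming/data.csv").execute()
```

In real Airflow code the same split holds: a sensor implements poke(), an operator implements execute(), and both typically obtain a hook to reach the external system.
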
1. BashOperator

# bash_operator.py
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.decorators import dag

@dag(dag_id='bash_operator_examples', start_date=datetime(2024, 1, 1), schedule_interval='@daily')
def bash_examples():
    # Simple command
    simple_cmd = BashOperator(
        task_id='simple_command',
        bash_command='echo "Hello World"',
    )
    
    # Command with environment variables
    env_cmd = BashOperator(
        task_id='env_command',
        bash_command='echo "Processing $INPUT_FILE"',
        env={'INPUT_FILE': '/data/input.csv'},
    )
    
    # Command with XCom push: with do_xcom_push=True, BashOperator pushes
    # the last line written to stdout as the task's return_value XCom
    xcom_cmd = BashOperator(
        task_id='xcom_push_command',
        bash_command='echo "success"',
        do_xcom_push=True,
    )
    
    # Multi-line command
    multi_cmd = BashOperator(
        task_id='multi_line_command',
        bash_command="""
            echo "Step 1: Processing"
            python /opt/scripts/process.py
            echo "Step 2: Complete"
        """,
    )
    
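    # Command with error handling: log the failure, then exit non-zero so
    # the task is marked failed (a bare `|| echo ...` would hide the error)
    error_cmd = BashOperator(
        task_id='error_handling_command',
        bash_command='python /opt/scripts/process.py || { echo "Script failed" >&2; exit 1; }',
        cwd='/opt/airflow',  # Working directory
    )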
    
    simple_cmd >> env_cmd >> xcom_cmd >> multi_cmd >> error_cmd

bash_examples()

⚠️Important

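BashOperator executes commands in a subprocess. If env is not set, the subprocess inherits the environment of the Airflow worker process. If env is set, only those variables are passed and the inherited environment is dropped; set append_env=True to add them to the inherited environment instead.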
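
How the env parameter interacts with the inherited environment can be checked with the standard subprocess module, which follows the same convention for its own env argument. The sketch below is an illustration under that assumption, not BashOperator's implementation; DEMO_PARENT_VAR is a hypothetical variable name. Leaving env unset inherits the parent environment, passing a dict replaces it, and merging with os.environ approximates what append_env=True is meant to do.

```python
import os
import subprocess
import sys

# Child process that prints DEMO_PARENT_VAR, or "missing" if it cannot see it.
CHILD = [sys.executable, "-c", "import os; print(os.environ.get('DEMO_PARENT_VAR', 'missing'))"]


def run_child(env=None) -> str:
    """Run the child with the given environment and return what it printed."""
    done = subprocess.run(CHILD, env=env, capture_output=True, text=True, check=True)
    return done.stdout.strip()


os.environ["DEMO_PARENT_VAR"] = "from-parent"

# Variables the interpreter itself may need in order to start.
minimal = {k: os.environ[k] for k in ("PATH", "SYSTEMROOT", "LD_LIBRARY_PATH") if k in os.environ}

# env unset: the child inherits the parent's environment.
inherited = run_child()

# env given: the child sees only what is passed, so the parent variable is gone.
replaced = run_child({**minimal, "INPUT_FILE": "/data/input.csv"})

# Explicit merge: parent environment plus the extra variable.
merged = run_child({**os.environ, "INPUT_FILE": "/data/input.csv"})
```
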

2. PythonOperator

# python_operator.py
from airflow.operators.python import PythonOperator, BranchPythonOperator
from datetime import datetime
from airflow.decorators import dag
from typing import Dict, Any

def process_data(**context):
    """Process data with access to Airflow context"""
    # Get task instance
    ti = context['ti']
    
    # Get XCom from upstream task (None if nothing was pushed, so default to an empty list)
    upstream_data = ti.xcom_pull(task_ids='extract_data') or []
    
    # Process data
    result = {
        'processed': True,
        'records': len(upstream_data),
        'timestamp': str(datetime.now())
    }
    
    # Push result to XCom
    ti.xcom_push(key='result', value=result)
    
    return result

def transform_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """Transform data without context"""
    return {
        'transformed': True,
        'input': data,
        'output': 'transformed_value'
    }

@dag(dag_id='python_operator_examples', start_date=datetime(2024, 1, 1), schedule_interval='@daily')
def python_examples():
    # Simple PythonOperator
    process_task = PythonOperator(
        task_id='process_data',
        python_callable=process_data,
        op_kwargs={'extra_param': 'value'},  # Additional kwargs
    )
    
    # PythonOperator with op_args
    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
        op_args=[{'input': 'value'}],  # Positional args
    )
    
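    # BranchPythonOperator: the callable returns the task_id (or list of
    # task_ids) to follow; 'path_a' must be a task directly downstream
    branch_task = BranchPythonOperator(
        task_id='branch_decision',
        python_callable=lambda: 'path_a',
    )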
    
    process_task >> transform_task >> branch_task

python_examples()
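
The branch callable in this listing always returns 'path_a'. A more typical callable inspects a value and returns the task_id to follow. A minimal sketch in plain Python, where the threshold and the 'path_b' task_id are hypothetical:

```python
def choose_path(record_count: int, threshold: int = 1) -> str:
    """Return the task_id of the branch to follow (task_ids are illustrative)."""
    if record_count >= threshold:
        return "path_a"  # enough data: continue processing
    return "path_b"      # hypothetical fallback branch, e.g. a notify-only task


chosen_with_data = choose_path(250)
chosen_empty = choose_path(0)
```

When a function like this is passed as python_callable to BranchPythonOperator, both task_ids need to exist directly downstream of the branch task; the branch that is not returned is skipped.
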

3. EmailOperator

# email_operator.py
from airflow.operators.email import EmailOperator
from datetime import datetime
from airflow.decorators import dag

@dag(dag_id='email_operator_examples', start_date=datetime(2024, 1, 1), schedule_interval='@daily')
def email_examples():
    # Simple email
    simple_email = EmailOperator(
        task_id='simple_email',
        to=['team@company.com'],
        subject='Pipeline Complete',
        html_content='<h1>DAG completed successfully</h1>',
    )
    
    # Email with file attachment
    email_with_attachment = EmailOperator(
        task_id='email_with_attachment',
        to=['team@company.com'],
        subject='Daily Report',
        html_content='<h1>Report attached</h1>',
        files=['/tmp/report.csv'],
    )
    
    # Email with templated content
    templated_email = EmailOperator(
        task_id='templated_email',
        to=['{{ params.recipient }}'],
        subject='Report for {{ ds }}',
        html_content="""
        <h2>Daily Report</h2>
        <p>Date: {{ ds }}</p>
        <p>Records processed: {{ ti.xcom_pull(task_ids='process') }}</p>
        """,
        params={'recipient': 'team@company.com'},
    )
    
    simple_email >> email_with_attachment >> templated_email

email_examples()

4. HttpOperator

# http_operator.py
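# Provider import path for Airflow 2 (recent provider releases rename this class to HttpOperator)
from airflow.providers.http.operators.http import SimpleHttpOperator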
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime
from airflow.decorators import dag

@dag(dag_id='http_operator_examples', start_date=datetime(2024, 1, 1), schedule_interval='@daily')
def http_examples():
    # Simple HTTP GET
    http_get = SimpleHttpOperator(
        task_id='http_get',
        http_conn_id='api_default',
        endpoint='/api/data',
        method='GET',
        headers={'Content-Type': 'application/json'},
        response_check=lambda response: response.status_code == 200,
    )
    
    # HTTP POST with JSON body
    http_post = SimpleHttpOperator(
        task_id='http_post',
        http_conn_id='api_default',
        endpoint='/api/submit',
        method='POST',
        data='{"key": "value"}',
        headers={'Content-Type': 'application/json'},
    )
    
    # HTTP with XCom data
    http_with_xcom = SimpleHttpOperator(
        task_id='http_with_xcom',
        http_conn_id='api_default',
        endpoint='/api/process',
        method='POST',
        data='{{ ti.xcom_pull(task_ids="extract") }}',
    )
    
    # HTTP Sensor (wait for API)
    http_sensor = HttpSensor(
        task_id='http_sensor',
        http_conn_id='api_default',
        endpoint='/api/health',
        poke_interval=30,
        timeout=300,
    )
    
    http_sensor >> http_get >> http_post >> http_with_xcom

http_examples()

5. S3 Operators

# s3_operators.py
from airflow.providers.amazon.aws.operators.s3 import (
    S3CreateObjectOperator,
    S3CopyObjectOperator,
    S3DeleteObjectsOperator,
)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime
from airflow.decorators import dag

@dag(dag_id='s3_operator_examples', start_date=datetime(2024, 1, 1), schedule_interval='@daily')
def s3_examples():
    # Create S3 object
    create_object = S3CreateObjectOperator(
        task_id='create_object',
        aws_conn_id='aws_default',
        s3_bucket='my-bucket',
        s3_key='data/output.csv',
        data=b'col1,col2\nvalue1,value2',
    )
    
    # Copy S3 object
    copy_object = S3CopyObjectOperator(
        task_id='copy_object',
        aws_conn_id='aws_default',
        source_bucket_name='my-bucket',
        source_bucket_key='data/input.csv',
        dest_bucket_name='my-bucket',
        dest_bucket_key='data/backup/input.csv',
    )
    
    # Delete S3 objects
    delete_objects = S3DeleteObjectsOperator(
        task_id='delete_objects',
        aws_conn_id='aws_default',
        bucket_name='my-bucket',
        prefix='data/temp/',
    )
    
    # Wait for S3 object
    wait_for_object = S3KeySensor(
        task_id='wait_for_object',
        aws_conn_id='aws_default',
        bucket_name='my-bucket',
        bucket_key='data/input.csv',
        poke_interval=30,
        timeout=300,
    )
    
    wait_for_object >> create_object >> copy_object >> delete_objects

s3_examples()

6. BigQuery Operators

# bigquery_operators.py
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
    BigQueryCheckOperator,
    BigQueryValueCheckOperator,
)
from datetime import datetime
from airflow.decorators import dag

@dag(dag_id='bigquery_operator_examples', start_date=datetime(2024, 1, 1), schedule_interval='@daily')
def bigquery_examples():
    # Insert job
    insert_job = BigQueryInsertJobOperator(
        task_id='insert_job',
        configuration={
            'query': {
                'query': '''
                    SELECT * FROM `project.dataset.table`
                    WHERE date = '{{ ds }}'
                ''',
                'useLegacySql': False,
            }
        },
        location='US',
    )
    
    # Check operator
    check = BigQueryCheckOperator(
        task_id='check_data',
        sql='SELECT COUNT(*) FROM `project.dataset.table` WHERE date = "{{ ds }}"',
        use_legacy_sql=False,
    )
    
    # Value check
    value_check = BigQueryValueCheckOperator(
        task_id='value_check',
        sql='SELECT COUNT(*) FROM `project.dataset.table`',
        pass_value=1000,
        tolerance=0.1,
        use_legacy_sql=False,
    )
    
    insert_job >> check >> value_check

bigquery_examples()

ℹ️Pro Tip

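When using BigQuery operators, set use_legacy_sql=False (or 'useLegacySql': False inside a BigQueryInsertJobOperator configuration) to use Standard SQL. The check operators default to Legacy SQL, so the setting has to be explicit. Google recommends Standard SQL, and Legacy SQL lacks many of its features.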

7. Custom Operators

# custom_operator.py
from airflow.models import BaseOperator
from typing import Any, Dict, Optional

class CustomDataProcessor(BaseOperator):
    """
    Custom operator for data processing.
    
    :param input_path: Path to input data
    :param output_path: Path to output data
    :param processing_options: Additional processing options
    """
    
    # Fields rendered with Jinja before execute() runs
    template_fields = ('input_path', 'output_path')
    # A templated field whose value ends in one of these extensions is
    # treated as a path to a template file and replaced by its contents
    template_ext = ('.sql', '.json')
    
    # @apply_defaults is not needed in Airflow 2; default_args are applied automatically
    def __init__(
        self,
        input_path: str,
        output_path: str,
        processing_options: Optional[Dict[str, Any]] = None,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.input_path = input_path
        self.output_path = output_path
        self.processing_options = processing_options or {}
        self.input_path = input_path
        self.output_path = output_path
        self.processing_options = processing_options or {}
    
    def execute(self, context):
        """Main execution logic"""
        self.log.info(f"Processing {self.input_path}")
        
        # Custom processing logic
        result = self._process_data()
        
        # Push result to XCom
        context['ti'].xcom_push(
            key='processing_result',
            value={
                'input': self.input_path,
                'output': self.output_path,
                'records': result['count']
            }
        )
        
        self.log.info(f"Processing complete: {result}")
        return result
    
    def _process_data(self) -> Dict[str, Any]:
        """Internal processing method"""
        # Implementation here
        return {'count': 1000, 'status': 'success'}
    
    def on_kill(self):
        """Cleanup on kill"""
        self.log.info("Custom operator killed, cleaning up...")

# Usage (instantiate inside a DAG, e.g. in an @dag function or a `with DAG(...)` block)
custom_task = CustomDataProcessor(
    task_id='process_data',
    input_path='s3://bucket/input/{{ ds }}',
    output_path='s3://bucket/output/{{ ds }}',
    processing_options={'batch_size': 1000},
)

8. TaskFlow API (@task decorator)

# taskflow_api.py
from airflow.decorators import dag, task
from datetime import datetime
from typing import List, Dict

@dag(
    dag_id='taskflow_api_examples',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def taskflow_examples():
    @task
    def extract() -> List[Dict]:
        """Extract data - returns list of dicts"""
        return [{'id': 1, 'value': 'a'}, {'id': 2, 'value': 'b'}]
    
    @task
    def transform(data: List[Dict]) -> List[Dict]:
        """Transform data - receives and returns"""
        return [{'id': d['id'], 'value': d['value'].upper()} for d in data]
    
    @task
    def load(data: List[Dict]) -> bool:
        """Load data - receives and returns status"""
        # Load to target
        return True
    
    @task
    def notify(success: bool) -> None:
        """Notify - receives and returns nothing"""
        if success:
            print("Pipeline completed successfully")
    
    # Dependencies are inferred from the data flow: passing one task's
    # return value into another makes the first task upstream of the second
    raw = extract()
    transformed = transform(raw)
    loaded = load(transformed)
    notify(loaded)

taskflow_examples()

ℹ️TaskFlow API Benefits

The TaskFlow API with @task decorator provides:

  1. Automatic XCom passing between tasks
  2. Type hints for better IDE support
  3. Cleaner, more Pythonic code
  4. Automatic serialization of return values
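
The automatic dependency wiring can be pictured with a toy decorator. This is a simplified model and not Airflow's implementation: toy_task, Handle, and the edges set are hypothetical names. Calling a decorated function at DAG-definition time does not run it; it returns a handle, and passing that handle into another task records an upstream/downstream edge. Parameter names play no role.

```python
from typing import Any, Callable, Set, Tuple

# Recorded (upstream, downstream) pairs.
edges: Set[Tuple[str, str]] = set()


class Handle:
    """Stand-in for the reference object a called task returns at definition time."""

    def __init__(self, task_name: str):
        self.task_name = task_name


def toy_task(fn: Callable[..., Any]) -> Callable[..., Handle]:
    """Hypothetical decorator: calling the function registers edges instead of running it."""

    def wrapper(*args: Any, **kwargs: Any) -> Handle:
        for value in list(args) + list(kwargs.values()):
            if isinstance(value, Handle):
                edges.add((value.task_name, fn.__name__))
        return Handle(fn.__name__)

    return wrapper


@toy_task
def extract():
    return [{"id": 1}]


@toy_task
def transform(rows):
    return rows


@toy_task
def load(anything_at_all):  # the parameter name is irrelevant to the wiring
    return True


# Wiring the pipeline: each call passes the previous handle along.
load(transform(extract()))
```

After the last line, edges holds extract -> transform and transform -> load, which mirrors the chain built by calling the @task functions in sequence.
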

Real-World Scenarios

Scenario 1: Uber's Data Pipeline

# uber_pipeline.py
"""
Uber-style data pipeline:
- Ingest from multiple sources
- Process with custom logic
- Load to data warehouse
"""

from airflow.decorators import dag, task
from datetime import datetime
from typing import List, Dict

@dag(
    dag_id='uber_ride_data_pipeline',
    schedule_interval='*/15 * * * *',  # Every 15 minutes
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['uber', 'real-time', 'production'],
)
def uber_pipeline():
    @task
    def extract_ride_data() -> List[Dict]:
        """Extract ride data from Kafka"""
        # Kafka consumer logic
        return [{'ride_id': '123', 'status': 'completed'}]
    
    @task
    def validate_data(data: List[Dict]) -> List[Dict]:
        """Validate data quality"""
        # Illustrative check (an assumption): keep records that carry both fields
        required = ('ride_id', 'status')
        return [record for record in data if all(record.get(field) for field in required)]
    
    @task
    def process_data(data: List[Dict]) -> Dict:
        """Process ride data"""
        # Complex processing logic
        return {
            'total_rides': len(data),
            'processed_at': datetime.now().isoformat()
        }
    
    @task
    def load_to_warehouse(processed: Dict) -> bool:
        """Load to data warehouse"""
        # BigQuery/Snowflake load
        return True
    
    @task
    def send_metrics(processed: Dict) -> None:
        """Send metrics to monitoring"""
        # Send to Datadog/Prometheus
        pass
    
    # Pipeline
    raw = extract_ride_data()
    validated = validate_data(raw)
    processed = process_data(validated)
    loaded = load_to_warehouse(processed)
    send_metrics(processed)

uber_pipeline()

Scenario 2: Amazon's S3 Processing

# amazon_s3_processing.py
"""
Amazon-style S3 processing pipeline:
- Wait for S3 object
- Process file
- Load to Redshift
"""

from airflow.decorators import dag, task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime

@dag(
    dag_id='amazon_s3_processing',
    schedule_interval='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['amazon', 's3', 'production'],
)
def s3_processing():
    # Wait for file
    wait_file = S3KeySensor(
        task_id='wait_for_file',
        bucket_name='data-lake',
        bucket_key='incoming/{{ ds }}/data.csv',
        poke_interval=30,
        timeout=3600,
    )
    
    @task
    def process_file() -> dict:
        """Process S3 file"""
        # Download and process
        return {'records': 1000}
    
    @task
    def load_to_redshift(data: dict) -> bool:
        """Load to Redshift"""
        # Redshift COPY command
        return True
    
    @task
    def archive_file() -> None:
        """Archive processed file"""
        # Move to archive bucket
        pass
    
    # Pipeline
    processed = process_file()
    loaded = load_to_redshift(processed)
    archive = archive_file()
    
    wait_file >> processed >> loaded >> archive

s3_processing()

Edge Cases

⚠️Common Pitfalls

  1. Operator Idempotency: Ensure operators produce the same result on re-run. Use unique identifiers for data operations.

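  2. XCom Size Limits: XCom values are stored in the metadata database by default, and the hard limit depends on the database backend. Keep payloads small (a common guideline is under 48KB). For larger data, write to external storage such as S3 and pass only the path through XCom.

  3. Template Rendering: Use {{ ds }} (the run's logical date) rather than the wall-clock time, so a re-run of the same DAG run sees the same value. Templates are rendered just before execute() and only in an operator's template_fields; Jinja inside the body of a Python callable is not rendered.

  4. Connection Reuse: Each task instance opens its own connection through its hook. For high-frequency operations, reuse one hook or connection within the task, and consider Airflow pools to cap how many tasks hit the same system at once.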

# edge_cases.py
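from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from datetime import datetime

@dag(dag_id='edge_case_examples', start_date=datetime(2024, 1, 1), schedule_interval='@daily')
def edge_cases():
    # Idempotent operation
    @task
    def idempotent_task():
        """Use the run_id so each run writes, and on retry overwrites, its own file"""
        # BAD: every run writes the same path, so runs overwrite each other
        # with open('/tmp/output.csv', 'w') as f:
        #     f.write('data')
        
        # GOOD: key the file by run_id. Jinja is not rendered inside a
        # Python function body, so read run_id from the task context.
        run_id = get_current_context()['run_id']
        with open(f'/tmp/output_{run_id}.csv', 'w') as f:
            f.write('data')
    
    # Context usage
    @task
    def template_task():
        """Read context values; {{ ... }} only works in templated operator fields"""
        context = get_current_context()
        
        # Logical (execution) date
        print(f"Execution date: {context['ds']}")
        
        # Start of the previous successful run's data interval (Airflow 2.2+; None on the first run)
        print(f"Previous: {context.get('prev_data_interval_start_success')}")
        
        # Dag run ID
        print(f"Run ID: {context['run_id']}")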
    
    idempotent_task() >> template_task()

edge_cases()
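
The XCom size pitfall can be guarded against with a small helper that returns the value itself when it is small and a pointer otherwise. This is a sketch under stated assumptions: xcom_safe and MAX_XCOM_BYTES are hypothetical names, the 48KB budget is the guideline quoted above rather than a limit Airflow enforces, and the caller is assumed to have already written the large payload to the reference location.

```python
import json
from typing import Any

# Assumed budget; the real limit depends on the metadata database and XCom backend.
MAX_XCOM_BYTES = 48 * 1024


def xcom_safe(value: Any, reference: str, max_bytes: int = MAX_XCOM_BYTES) -> Any:
    """Return value if its JSON encoding fits the budget, otherwise a small pointer dict."""
    size = len(json.dumps(value).encode("utf-8"))
    if size <= max_bytes:
        return value
    # Too large for XCom: hand downstream tasks a reference instead of the data.
    return {"reference": reference, "size_bytes": size}


small = xcom_safe({"records": 1000}, reference="s3://bucket/output/2024-01-01/result.json")
large = xcom_safe(["x" * 100] * 1000, reference="s3://bucket/output/2024-01-01/rows.json")
```

A task would return the result of xcom_safe(...) so that downstream tasks receive either the small value or a path they can read themselves.
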


Best Practices

# best_practices.py
"""
Operator Best Practices:

1. Choose the Right Operator:
   - Use built-in operators when available
   - Create custom operators for reusable logic
   - Use Sensors for waiting conditions

2. Idempotency:
   - Design operators to be idempotent
   - Use unique identifiers for data operations
   - Handle partial failures gracefully

3. Error Handling:
   - Set appropriate retry parameters
   - Use callbacks for notifications
   - Log meaningful messages

4. Performance:
   - Use connection pooling
   - Minimize XCom usage
   - Use appropriate timeouts

5. Security:
   - Use connections for credentials
   - Never hardcode secrets
   - Use variables for configuration
"""

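The error-handling items above map onto standard task arguments that are usually set once through default_args. A minimal sketch: the values are illustrative, and notify_failure is a hypothetical callback that only prints, where a real one would send an alert.

```python
from datetime import timedelta


def notify_failure(context: dict) -> None:
    """Hypothetical failure callback; Airflow passes the task context to it."""
    print(f"Task failed: {context.get('task_instance')}")


default_args = {
    "retries": 3,                              # re-run a failed task up to 3 times
    "retry_delay": timedelta(minutes=5),       # wait before the first retry
    "retry_exponential_backoff": True,         # lengthen the wait on each retry
    "max_retry_delay": timedelta(minutes=30),  # upper bound on the backoff
    "execution_timeout": timedelta(hours=1),   # fail the task if it runs longer
    "on_failure_callback": notify_failure,     # called when the task finally fails
}
```

Passing this dict as default_args to the @dag decorator applies it to every task in the DAG; individual tasks can still override any key.
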
ℹ️Amazon Interview Tip

Interviewers at companies such as Amazon often probe custom operators built for internal tools. When discussing operators, emphasize idempotency, error handling, and reusable components, and explain how you would handle operator failures with automatic retries and alerting.


Summary

Operators are the building blocks of Airflow tasks. Key takeaways:

  1. Operators execute work, Sensors wait for conditions, Hooks connect to systems
  2. Use TaskFlow API for cleaner Python task code
  3. Create custom operators for reusable, complex logic
  4. Ensure idempotency in all operators
  5. Use proper error handling and retry mechanisms

For Amazon and Uber interviews, focus on:

  • Choosing the right operator for the job
  • Creating reusable custom operators
  • Handling failures gracefully
  • Performance optimization
  • Security best practices

This question is part of the Apache Airflow Advanced interview preparation series. Practice implementing these operators before your interview.
