
Airflow TaskFlow API

Modern Pythonic DAG Development

Companies: Google, Uber | Difficulty: Intermediate

Interview Question

ℹ️ Interview Context

Company: Google / Uber
Role: Data Engineer / Software Engineer
Difficulty: Intermediate
Time: 30-45 minutes

Question: "Explain the TaskFlow API in Airflow. How does it differ from traditional PythonOperator? When would you use TaskFlow API vs traditional operators?"


Detailed Theory

TaskFlow API Fundamentals

# taskflow_fundamentals.py
"""
TaskFlow API (Airflow 2.0+):

Key Features:
- @task decorator for defining tasks
- Automatic XCom passing between tasks
- Type hints for better IDE support
- Cleaner, more Pythonic code
- Dynamic task generation with .expand() (Airflow 2.3+)

Benefits:
- Less boilerplate code
- Automatic serialization
- Better error messages
- Easier testing
"""

1. Basic TaskFlow API Usage

# basic_taskflow.py
from airflow.decorators import dag, task
from datetime import datetime
from typing import Dict, Any, List

@dag(
    dag_id='basic_taskflow',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
)
def basic_taskflow():
    @task
    def extract() -> Dict[str, Any]:
        """Extract data"""
        return {
            'records': [
                {'id': 1, 'value': 'a'},
                {'id': 2, 'value': 'b'},
            ]
        }
    
    @task
    def transform(data: Dict[str, Any]) -> Dict[str, Any]:
        """Transform data - receives from extract"""
        transformed = [
            {'id': r['id'], 'value': r['value'].upper()}
            for r in data['records']
        ]
        return {'records': transformed}
    
    @task
    def load(data: Dict[str, Any]) -> bool:
        """Load data - receives from transform"""
        # Load to target
        return True
    
    @task
    def notify(success: bool) -> None:
        """Notify - receives from load"""
        if success:
            print("Pipeline completed successfully")
    
    # Dependencies are inferred from passing each task's output to the next task
    raw = extract()
    transformed = transform(raw)
    loaded = load(transformed)
    notify(loaded)

basic_taskflow()

ℹ️ Automatic XCom

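The TaskFlow API automatically passes return values between tasks via XCom. Calling a task inside the DAG function returns a reference to its future output, and passing that reference to another task both creates the dependency and delivers the value at run time. You don't need to manually push/pull XCom values.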
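To make the mechanism concrete, here is a toy model in plain Python. It is not Airflow's implementation: `ToyDag`, `Ref`, and the `store` dict are hypothetical names invented for this sketch, with `Ref` standing in for an XCom reference and `store` for the XCom table. It only illustrates the idea that calling a task records it and returns a placeholder, and that passing the placeholder to another task is what creates the edge.

```python
from typing import Any, Callable, Dict, List, Tuple


class Ref:
    """Placeholder for a task's future return value (toy stand-in for an XCom reference)."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


class ToyDag:
    """Hypothetical mini-scheduler used only to illustrate the idea."""

    def __init__(self) -> None:
        self.calls: Dict[str, Tuple[Callable[..., Any], tuple]] = {}
        self.edges: List[Tuple[str, str]] = []

    def task(self, fn: Callable[..., Any]) -> Callable[..., Ref]:
        def wrapper(*args: Any) -> Ref:
            # Calling a task only records it; the body runs later in run().
            self.calls[fn.__name__] = (fn, args)
            for arg in args:
                if isinstance(arg, Ref):
                    # The edge exists because an output was passed in,
                    # regardless of what the parameter is called.
                    self.edges.append((arg.task_id, fn.__name__))
            return Ref(fn.__name__)

        return wrapper

    def run(self) -> Dict[str, Any]:
        store: Dict[str, Any] = {}  # plays the role of the XCom table
        # A Ref can only be passed after its producer was called,
        # so call order is already a valid execution order.
        for task_id, (fn, args) in self.calls.items():
            resolved = [store[a.task_id] if isinstance(a, Ref) else a for a in args]
            store[task_id] = fn(*resolved)
        return store


toy = ToyDag()


@toy.task
def extract():
    return {'records': [{'id': 1, 'value': 'a'}]}


@toy.task
def transform(data):
    return {'records': [{**r, 'value': r['value'].upper()} for r in data['records']]}


raw = extract()          # a Ref, not the dict
result = transform(raw)  # records the edge extract -> transform
store = toy.run()        # now the bodies actually execute
```

The point of the sketch is the two-phase behavior: wiring happens when the DAG function is evaluated, and values only exist once tasks run. That is also why the value returned by a task call cannot be inspected or branched on at DAG-definition time.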

2. Task Parameters and Configuration

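# task_parameters.py
from airflow.decorators import dag, task
from datetime import datetime, timedelta
from typing import Dict, Any

# Needed only for the KubernetesExecutor pod override below
from kubernetes.client import models as k8s


# Placeholder callbacks for illustration; replace with real alerting logic
def task_failure_callback(context):
    print(f"Task failed: {context['task_instance'].task_id}")


def task_retry_callback(context):
    print(f"Task retrying: {context['task_instance'].task_id}")


def task_success_callback(context):
    print(f"Task succeeded: {context['task_instance'].task_id}")


@dag(
    dag_id='task_parameters',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def task_parameters():
    @task(
        retries=3,
        retry_delay=timedelta(minutes=5),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=30),
        execution_timeout=timedelta(hours=1),
        on_failure_callback=task_failure_callback,
        on_retry_callback=task_retry_callback,
        on_success_callback=task_success_callback,
        pool='my_pool',  # The pool must already exist in Airflow
        priority_weight=10,
        weight_rule='downstream',  # How priority_weight is aggregated
    )
    def configured_task() -> Dict[str, Any]:
        """Task with full configuration"""
        return {'status': 'success'}

    @task(
        task_id='custom_task_id',  # Custom task ID
        queue='my_queue',  # Specific queue
        executor_config={
            # Airflow 2.x KubernetesExecutor: override the worker pod spec
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            resources=k8s.V1ResourceRequirements(
                                requests={"memory": "2Gi"},
                                limits={"memory": "4Gi"},
                            ),
                        )
                    ]
                )
            )
        },
    )
    def custom_configured_task() -> Dict[str, Any]:
        """Task with custom configuration"""
        return {'status': 'custom'}

    # No data is passed between them, so the two tasks run independently
    result1 = configured_task()
    result2 = custom_configured_task()

task_parameters()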

3. Multiple Return Values

# multiple_returns.py
from airflow.decorators import dag, task
from datetime import datetime
from typing import Dict, Any

@dag(
    dag_id='multiple_returns',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def multiple_returns():
    @task(multiple_outputs=True)
    def process() -> Dict[str, Any]:
        """Return several values as one dict; each key is stored as its own XCom"""
        return {
            'status': {'status': 'success', 'count': 100},
            'metrics': {'metrics': {'time': 1.5, 'memory': 1024}},
        }

    @task
    def handle_results(
        status: Dict[str, Any],
        metrics: Dict[str, Any]
    ) -> None:
        """Receive multiple values as separate parameters"""
        print(f"Status: {status}")
        print(f"Metrics: {metrics}")

    # process() returns an XCom reference at DAG-definition time, not a tuple,
    # so it cannot be unpacked; select each output by key instead
    results = process()
    handle_results(results['status'], results['metrics'])

multiple_returns()

4. Dynamic Task Generation with .expand()

# dynamic_tasks.py
from airflow.decorators import dag, task
from datetime import datetime
from typing import Any, Dict, List

@dag(
    dag_id='dynamic_task_generation',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def dynamic_generation():
    @task
    def get_items() -> List[str]:
        """Get list of items to process"""
        return ['item1', 'item2', 'item3', 'item4']
    
    @task
    def process_item(item: str) -> Dict[str, str]:
        """Process individual item"""
        return {
            'item': item,
            'status': 'processed',
            'result': f'{item}_result'
        }
    
    @task
    def aggregate_results(results: List[Dict[str, str]]) -> Dict[str, Any]:
        """Aggregate all results"""
        return {
            'total': len(results),
            'results': results
        }
    
    # Dynamic task generation
    items = get_items()
    processed = process_item.expand(item=items)  # expand() takes keyword arguments only
    aggregated = aggregate_results(processed)

dynamic_generation()

ℹ️ Pro Tip

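Use .expand() for dynamic task generation (dynamic task mapping, available from Airflow 2.3). It's cleaner than generating PythonOperator tasks in a Python loop. Airflow creates one mapped task instance per input element at run time, and the instances are independent, so they can run in parallel.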

5. Cross-Task Dependencies

# cross_task_deps.py
from airflow.decorators import dag, task
from datetime import datetime
from typing import Dict, Any

@dag(
    dag_id='cross_task_dependencies',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def cross_deps():
    @task
    def step_a() -> Dict[str, Any]:
        """First step"""
        return {'a': 'result_a'}
    
    @task
    def step_b() -> Dict[str, Any]:
        """Second step"""
        return {'b': 'result_b'}
    
    @task
    def step_c(a: Dict[str, Any]) -> Dict[str, Any]:
        """Depends on step_a"""
        return {'c': f"From A: {a}"}
    
    @task
    def step_d(b: Dict[str, Any]) -> Dict[str, Any]:
        """Depends on step_b"""
        return {'d': f"From B: {b}"}
    
    @task
    def final(c: Dict[str, Any], d: Dict[str, Any]) -> None:
        """Depends on step_c and step_d"""
        print(f"C: {c}")
        print(f"D: {d}")
    
    # Parallel steps
    a = step_a()
    b = step_b()
    
    # Dependent steps
    c = step_c(a)
    d = step_d(b)
    
    # Final aggregation
    final(c, d)

cross_deps()

6. TaskGroups with TaskFlow

# taskgroups_taskflow.py
from airflow.decorators import dag, task
from airflow.utils.task_group import TaskGroup
from datetime import datetime
from typing import Dict, Any

@dag(
    dag_id='taskgroups_taskflow',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def taskgroups_example():
    @task
    def start() -> Dict[str, Any]:
        """Start task"""
        return {'status': 'started'}
    
    @task
    def end(status: Dict[str, Any]) -> None:
        """End task"""
        print(f"Pipeline completed: {status}")
    
    with TaskGroup(group_id='processing') as processing:
        @task
        def extract() -> Dict[str, Any]:
            """Extract data"""
            return {'data': 'raw'}
        
        @task
        def transform(data: Dict[str, Any]) -> Dict[str, Any]:
            """Transform data"""
            return {'data': 'transformed'}
        
        @task
        def load(data: Dict[str, Any]) -> Dict[str, Any]:
            """Load data"""
            return {'status': 'loaded'}
        
        # Internal dependencies
        raw = extract()
        transformed = transform(raw)
        loaded = load(transformed)
    
    # Main dependencies
    started = start()
    started >> processing  # A TaskGroup is not callable; order it with >>
    end(loaded)  # loaded is the output of the group's last task

taskgroups_example()

7. Dynamic Task Mapping

# dynamic_mapping.py
from airflow.decorators import dag, task
from datetime import datetime
from typing import Any, Dict, List

@dag(
    dag_id='dynamic_task_mapping',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)
def dynamic_mapping():
    @task
    def get_configs() -> List[Dict[str, str]]:
        """Get list of configurations"""
        return [
            {'table': 'users', 'schema': 'public'},
            {'table': 'orders', 'schema': 'public'},
            {'table': 'products', 'schema': 'catalog'},
        ]

    @task
    def process_config(config: Dict[str, str]) -> Dict[str, Any]:
        """Process individual configuration"""
        return {
            'table': config['table'],
            'rows': 1000,
            'status': 'processed'
        }

    @task
    def validate_results(results: List[Dict[str, Any]]) -> bool:
        """Validate all results"""
        return all(r['status'] == 'processed' for r in results)

    # Dynamic mapping: one process_config task instance per config
    configs = get_configs()
    results = process_config.expand(config=configs)
    valid = validate_results(results)

dynamic_mapping()

⚠️Important

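When using .expand(), write the function to accept a single item, not a list, and pass the iterable by keyword, for example process_config.expand(config=configs). Positional arguments are not accepted. Airflow handles the iteration and creates one task instance per item.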
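The call pattern behind .expand() can be pictured with a plain-Python analogy. This is a sketch of the semantics, not Airflow code: `mapped_calls` is a hypothetical helper, and the `dry_run` parameter is invented for illustration. It models a call shaped like `process_config.partial(dry_run=True).expand(config=configs)`, where constant arguments are shared by every instance and each mapped argument contributes one value per instance. With more than one mapped argument, Airflow maps over the cross product, which the helper mirrors with `itertools.product`.

```python
from itertools import product
from typing import Any, Dict, List


def mapped_calls(
    constant: Dict[str, Any], mapped: Dict[str, List[Any]]
) -> List[Dict[str, Any]]:
    """Return the keyword arguments of each mapped call, one dict per task instance."""
    names = list(mapped)
    calls = []
    for combo in product(*(mapped[name] for name in names)):
        # Constant kwargs are shared; mapped kwargs take one element each.
        calls.append({**constant, **dict(zip(names, combo))})
    return calls


def process_config(config: Dict[str, str], dry_run: bool) -> Dict[str, Any]:
    """Accepts a single config, never the whole list."""
    return {'table': config['table'], 'dry_run': dry_run, 'status': 'processed'}


configs = [
    {'table': 'users', 'schema': 'public'},
    {'table': 'orders', 'schema': 'public'},
]

# One call per config, each with the same constant argument.
calls = mapped_calls(constant={'dry_run': True}, mapped={'config': configs})
results = [process_config(**kwargs) for kwargs in calls]

# Two mapped arguments produce the cross product: 2 x 2 = 4 instances.
grid = mapped_calls(constant={}, mapped={'x': [1, 2], 'y': [10, 20]})
```

A downstream task that receives the mapped output sees the collected results of all instances, which is why the aggregating functions in this section take a list.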


Real-World Scenarios

Scenario 1: Google's Data Pipeline

# google_pipeline.py
"""
Google-style data pipeline:
- Dynamic task generation
- Parallel processing
- Quality checks
"""

from airflow.decorators import dag, task
from datetime import datetime
from typing import Any, Dict, List

@dag(
    dag_id='google_data_pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['google', 'production'],
)
def google_pipeline():
    @task
    def get_sources() -> List[Dict[str, str]]:
        """Get list of data sources"""
        return [
            {'name': 'bigquery', 'project': 'my-project'},
            {'name': 'gcs', 'bucket': 'my-bucket'},
            {'name': 'pubsub', 'topic': 'my-topic'},
        ]

    @task
    def extract_source(source: Dict[str, str]) -> Dict[str, Any]:
        """Extract from individual source"""
        # Source-specific extraction logic
        return {
            'source': source['name'],
            'records': 1000,
            'status': 'extracted'
        }

    @task
    def transform_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """Transform extracted data"""
        return {
            'source': data['source'],
            'records': data['records'],
            'status': 'transformed'
        }

    @task
    def validate_data(data: Dict[str, Any]) -> bool:
        """Validate transformed data"""
        return data['status'] == 'transformed'

    @task
    def load_to_bigquery(results: List[Dict[str, Any]]) -> bool:
        """Load all results to BigQuery"""
        # BigQuery load logic
        return True

    # Pipeline: each expand() creates one task instance per upstream element
    sources = get_sources()
    extracted = extract_source.expand(source=sources)
    transformed = transform_data.expand(data=extracted)
    validated = validate_data.expand(data=transformed)
    loaded = load_to_bigquery(transformed)
    # Order the load after validation (a validation task must fail to block it)
    validated >> loaded

google_pipeline()

Scenario 2: Uber's Real-time Pipeline

# uber_pipeline.py
"""
Uber-style real-time pipeline:
- Process streaming data
- Dynamic parallel processing
- Aggregation
"""

from airflow.decorators import dag, task
from datetime import datetime
from typing import Any, Dict, List

@dag(
    dag_id='uber_realtime_pipeline',
    schedule_interval='*/5 * * * *',  # Every 5 minutes
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['uber', 'real-time', 'production'],
)
def uber_pipeline():
    @task
    def get_partitions() -> List[str]:
        """Get time partitions to process"""
        return ['2024-01-01-00', '2024-01-01-05', '2024-01-01-10']

    @task
    def process_partition(partition: str) -> Dict[str, Any]:
        """Process individual partition"""
        return {
            'partition': partition,
            'records': 500,
            'status': 'processed'
        }

    @task
    def aggregate_partitions(results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Aggregate all partition results"""
        total_records = sum(r['records'] for r in results)
        return {
            'total_partitions': len(results),
            'total_records': total_records,
            'status': 'aggregated'
        }

    @task
    def send_metrics(metrics: Dict[str, Any]) -> None:
        """Send metrics to monitoring"""
        # Send to Datadog/Prometheus
        pass

    # Pipeline: one process_partition task instance per partition
    partitions = get_partitions()
    processed = process_partition.expand(partition=partitions)
    aggregated = aggregate_partitions(processed)
    send_metrics(aggregated)

uber_pipeline()

Edge Cases

⚠️Common Pitfalls

  1. Type Hints: Use proper type hints for better IDE support and error detection.

  2. Return Values: Ensure return values are JSON-serializable for XCom storage with the default backend, and keep them small because they are stored in the metadata database.

  3. Task Naming: Task IDs are derived from function names. Use task_id for custom names.

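  4. Dependencies: Dependencies are inferred from data flow: passing one task's output into another task creates the edge. Parameter and variable names play no role, so tasks that exchange no data must be ordered explicitly with >>.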

# edge_cases.py
from airflow.decorators import dag, task
from datetime import datetime

@dag(dag_id='taskflow_edge_cases', start_date=datetime(2024, 1, 1), schedule_interval='@daily')
def edge_cases():
    # Type hints issue
    @task
    def type_hint_issue(data: dict) -> dict:
        """Use proper type hints"""
        return {'processed': True}
    
    # Return value issue
    @task
    def return_value_issue():
        """Return values must be serializable for XCom"""
        # BAD: a DataFrame is not JSON-serializable
        # import pandas as pd
        # return pd.DataFrame(...)

        # GOOD: Return serializable object
        return {'data': [1, 2, 3]}
    
    # Task naming issue
    @task(task_id='custom_name')
    def custom_task_name() -> dict:
        """Custom task name"""
        return {'status': 'custom'}
    
    return_value_issue() >> custom_task_name()

edge_cases()
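The serialization pitfall can be caught before a value ever reaches XCom. The sketch below assumes the default JSON-based XCom serialization; `is_json_serializable` is a hypothetical helper written for this example, not an Airflow function. It uses only the standard library, with a `datetime` and a `set` standing in for the kind of objects that commonly slip into return values.

```python
import json
from datetime import datetime
from typing import Any


def is_json_serializable(value: Any) -> bool:
    """Return True if json.dumps accepts the value, False otherwise."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False


good = {'data': [1, 2, 3]}

# datetime objects and sets are rejected by the json module
bad = {'loaded_at': datetime(2024, 1, 1), 'ids': {3, 1, 2}}

# Convert to plain types before returning from a task
fixed = {
    'loaded_at': bad['loaded_at'].isoformat(),
    'ids': sorted(bad['ids']),
}

print(is_json_serializable(good))   # True
print(is_json_serializable(bad))    # False
print(is_json_serializable(fixed))  # True
```

A check like this fits naturally in a unit test for each task's return value. For large results such as DataFrames, a common alternative is to write the data to external storage and return only a path or identifier.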



Best Practices

# best_practices.py
"""
TaskFlow API Best Practices:

1. Type Hints:
   - Use proper type hints
   - Document return types
   - Use typing module

2. Return Values:
   - Return JSON-serializable objects
   - Keep payloads small
   - Return meaningful data

3. Task Naming:
   - Use descriptive function names
   - Use task_id for custom names
   - Follow naming conventions

4. Dependencies:
   - Pass task outputs as arguments to create dependencies
   - Use >> to order tasks that share no data
   - Document dependencies

5. Testing:
   - Test tasks independently
   - Mock external dependencies
   - Use pytest fixtures
"""

ℹ️ Google Interview Tip

At Google, they emphasize clean, maintainable code. When discussing TaskFlow API, highlight the benefits of automatic XCom passing, type hints, and cleaner code. Also mention how dynamic task generation with .expand() simplifies parallel processing.


Summary

TaskFlow API is the modern way to write Python tasks in Airflow. Key takeaways:

  1. @task decorator simplifies task definition
  2. Automatic XCom passing eliminates boilerplate
  3. .expand() enables dynamic task generation
  4. Type hints improve code quality
  5. Cleaner code is more maintainable

For Google and Uber interviews, focus on:

  • Benefits over PythonOperator
  • Dynamic task generation patterns
  • Proper type hints and serialization
  • Testing strategies
  • Production best practices

This question is part of the Apache Airflow Advanced interview preparation series. Practice implementing these patterns before your interview.
