πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

ML Pipelines: Kubeflow, Airflow, Prefect, Dagster

MLOpsML Pipeline Orchestration⭐ Premium

Advertisement

Interview Question (Hard) β€” Asked at: Google, Meta, Uber, Netflix, Airbnb

"Design an ML pipeline orchestration system that handles data validation, feature engineering, model training, evaluation, and deployment. How do you ensure idempotency, handle failures, and manage dependencies?"

ML Pipeline Architecture Overview

ML pipelines coordinate the end-to-end workflow of building, deploying, and maintaining machine learning models. A well-designed pipeline ensures reproducibility, scalability, and maintainability.

Pipeline Design Principles

  1. Idempotency: Same inputs produce same outputs
  2. Reproducibility: Pipelines can be re-run with deterministic results
  3. Modularity: Components are independent and composable
  4. Observability: Full visibility into pipeline execution
  5. Fault Tolerance: Graceful handling of failures

Pipeline Architecture Diagram

Architecture Diagram
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                 ML Pipeline Orchestration                        β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚  β”‚  Data    │───▢│ Feature  │───▢│ Training │───▢│ Evaluate β”‚ β”‚
β”‚  β”‚  Ingest  β”‚    β”‚ Compute  β”‚    β”‚ Pipeline β”‚    β”‚ Pipeline β”‚ β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β”‚       β”‚              β”‚              β”‚                β”‚         β”‚
β”‚       β–Ό              β–Ό              β–Ό                β–Ό         β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚  β”‚ Schedule β”‚    β”‚ Trigger  β”‚    β”‚ Monitor  β”‚    β”‚ Deploy   β”‚ β”‚
β”‚  β”‚ (Cron)   β”‚    β”‚ (Event)  β”‚    β”‚ (Alerts) β”‚    β”‚ (Canary) β”‚ β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
β”‚  β”‚                    Pipeline State Store                  β”‚   β”‚
β”‚  β”‚              (PostgreSQL / Redis / etcd)                 β”‚   β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Kubeflow Pipelines

KFP Pipeline Definition

from kfp import dsl
from kfp import compiler
from kfp.dsl import (
    Input, Output, Dataset, Model, Metrics,
    component, pipeline, Condition
)
from kfp import kubernetes
import json

@component(
    base_image="python:3.9",
    packages_to_install=[
        "pandas==2.0.0",
        "great-expectations==0.17.0",
        "pyarrow==12.0.0"
    ]
)
def data_validation(
    input_data: Input[Dataset],
    validated_data: Output[Dataset],
    report: Output[Dataset]
):
    """Validate incoming data quality."""
    import pandas as pd
    import great_expectations as ge
    from great_expectations.core import ExpectationSuite
    from pathlib import Path
    import json
    
    # Load data
    df = pd.read_parquet(input_data.path)
    
    # Create expectation suite
    suite = ExpectationSuite(expectation_suite_name="data_quality")
    
    # Define expectations
    expectations = [
        {"expectation_type": "expect_column_values_to_not_be_null", 
         "kwargs": {"column": "user_id"}},
        {"expectation_type": "expect_column_values_to_be_unique", 
         "kwargs": {"column": "user_id"}},
        {"expectation_type": "expect_column_values_to_be_between", 
         "kwargs": {"column": "amount", "min_value": 0, "max_value": 100000}},
        {"expectation_type": "expect_table_row_count_to_be_between", 
         "kwargs": {"min_value": 1000, "max_value": 10000000}},
    ]
    
    for exp in expectations:
        suite.add_expectation(exp)
    
    # Validate
    ge_df = ge.from_pandas(df)
    results = ge_df.validate(suite)
    
    # Save validated data
    df.to_parquet(validated_data.path, index=False)
    
    # Save validation report
    report_data = {
        "passed": results.success,
        "statistics": results.statistics,
        "results": [
            {
                "expectation": r.expectation_config.expectation_type,
                "success": r.success,
                "result": r.result
            }
            for r in results.results
        ]
    }
    
    with open(report.path, 'w') as f:
        json.dump(report_data, f, indent=2, default=str)

@component(
    base_image="python:3.9",
    packages_to_install=[
        "pandas==2.0.0",
        "scikit-learn==1.3.0",
        "feature-engine==1.6.0"
    ]
)
def feature_engineering(
    input_data: Input[Dataset],
    output_features: Output[Dataset],
    feature_config: dict
):
    """Transform raw data into ML features."""
    import pandas as pd
    from sklearn.preprocessing import StandardScaler, LabelEncoder
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline
    import joblib
    
    df = pd.read_parquet(input_data.path)
    
    # Define transformations based on config
    numeric_features = feature_config.get('numeric_features', [])
    categorical_features = feature_config.get('categorical_features', [])
    
    # Create preprocessing pipeline
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', StandardScaler(), numeric_features),
            ('cat', LabelEncoder(), categorical_features)
        ]
    )
    
    # Fit and transform
    X = df.drop(columns=[feature_config['target']])
    y = df[feature_config['target']]
    
    X_processed = preprocessor.fit_transform(X)
    
    # Save processed features
    feature_names = (
        numeric_features + 
        [f"cat_{f}" for f in categorical_features]
    )
    
    output_df = pd.DataFrame(X_processed, columns=feature_names)
    output_df[feature_config['target']] = y.values
    
    output_df.to_parquet(output_features.path, index=False)
    
    # Save preprocessor
    joblib.dump(preprocessor, f"{output_features.path}_preprocessor.pkl")

@component(
    base_image="python:3.9",
    packages_to_install=[
        "xgboost==1.7.0",
        "scikit-learn==1.3.0",
        "mlflow==2.5.0"
    ]
)
def model_training(
    train_data: Input[Dataset],
    model: Output[Model],
    metrics: Output[Metrics],
    hyperparams: dict
):
    """Train XGBoost model with MLflow tracking."""
    import pandas as pd
    import xgboost as xgb
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import roc_auc_score, precision_score, recall_score
    import mlflow
    import json
    
    mlflow.set_experiment("kfp_training")
    
    df = pd.read_parquet(train_data.path)
    
    target_col = hyperparams.get('target', 'label')
    X = df.drop(columns=[target_col])
    y = df[target_col]
    
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    
    with mlflow.start_run():
        # Log hyperparameters
        mlflow.log_params(hyperparams)
        
        # Train model
        dtrain = xgb.DMatrix(X_train, label=y_train)
        dval = xgb.DMatrix(X_val, label=y_val)
        
        params = {
            'objective': 'binary:logistic',
            'eval_metric': 'auc',
            'max_depth': hyperparams.get('max_depth', 6),
            'learning_rate': hyperparams.get('learning_rate', 0.1),
            'n_estimators': hyperparams.get('n_estimators', 100),
            'subsample': hyperparams.get('subsample', 0.8),
            'colsample_bytree': hyperparams.get('colsample_bytree', 0.8),
        }
        
        model_xgb = xgb.train(
            params,
            dtrain,
            num_boost_round=1000,
            evals=[(dval, 'val')],
            early_stopping_rounds=50,
            verbose_eval=100
        )
        
        # Evaluate
        val_pred = model_xgb.predict(dval)
        
        eval_metrics = {
            'auc_roc': float(roc_auc_score(y_val, val_pred)),
            'precision': float(precision_score(y_val, (val_pred > 0.5).astype(int))),
            'recall': float(recall_score(y_val, (val_pred > 0.5).astype(int))),
        }
        
        mlflow.log_metrics(eval_metrics)
        
        # Save model
        model_xgb.save_model(model.path)
        
        # Log metrics
        metrics.log_metrics(eval_metrics)

@component(
    base_image="python:3.9",
    packages_to_install=["requests==2.31.0"]
)
def model_evaluation(
    model: Input[Model],
    test_data: Input[Dataset],
    metrics: Output[Metrics],
    threshold_config: dict
):
    """Evaluate model against quality gates."""
    import pandas as pd
    import xgboost as xgb
    import json
    
    # Load test data
    test_df = pd.read_parquet(test_data.path)
    target_col = threshold_config.get('target', 'label')
    
    X_test = test_df.drop(columns=[target_col])
    y_test = test_df[target_col]
    
    # Load and run model
    model_xgb = xgb.Booster()
    model_xgb.load_model(model.path)
    
    dtest = xgb.DMatrix(X_test)
    predictions = model_xgb.predict(dtest)
    
    # Calculate metrics
    from sklearn.metrics import (
        roc_auc_score, precision_score, recall_score, f1_score
    )
    
    eval_metrics = {
        'auc_roc': float(roc_auc_score(y_test, predictions)),
        'precision': float(precision_score(y_test, (predictions > 0.5).astype(int))),
        'recall': float(recall_score(y_test, (predictions > 0.5).astype(int))),
        'f1_score': float(f1_score(y_test, (predictions > 0.5).astype(int))),
    }
    
    # Check thresholds
    thresholds = threshold_config.get('thresholds', {})
    gate_results = {}
    
    for metric_name, threshold in thresholds.items():
        if metric_name in eval_metrics:
            passed = eval_metrics[metric_name] >= threshold
            gate_results[f"{metric_name}_passed"] = passed
            if not passed:
                raise ValueError(
                    f"Quality gate failed: {metric_name}={eval_metrics[metric_name]:.4f} "
                    f"< {threshold}"
                )
    
    eval_metrics.update(gate_results)
    metrics.log_metrics(eval_metrics)

@dsl.pipeline(
    name='ml-training-pipeline',
    description='End-to-end ML training pipeline with validation',
    pipeline_root='gs://my-bucket/pipeline-root'
)
def ml_pipeline(
    data_path: str,
    hyperparams: dict,
    threshold_config: dict
):
    """Main ML pipeline definition."""
    
    # Step 1: Data validation
    validation_op = data_validation(input_data=data_path)
    
    # Step 2: Feature engineering
    feature_op = feature_engineering(
        input_data=validation_op.outputs['validated_data'],
        feature_config=hyperparams
    )
    
    # Step 3: Model training
    training_op = model_training(
        train_data=feature_op.outputs['output_features'],
        hyperparams=hyperparams
    )
    
    # Step 4: Model evaluation
    evaluation_op = model_evaluation(
        model=training_op.outputs['model'],
        test_data=validation_op.outputs['validated_data'],
        threshold_config=threshold_config
    )
    
    # Step 5: Conditional deployment
    with Condition(evaluation_op.outputs['metrics']['auc_roc'] > 0.95):
        deploy_op = kubernetes.load_artifact_from_uri(
            artifact_uri="deploy_model",
            uri=training_op.outputs['model'].uri
        )

# Compile pipeline
compiler.Compiler().compile(
    pipeline_func=ml_pipeline,
    package_path='ml_pipeline.yaml'
)

ℹ️

Kubeflow Pipelines run on Kubernetes and provide native support for GPU scheduling, distributed training, and experiment tracking. Use it for teams already invested in the Kubernetes ecosystem.

Apache Airflow

Airflow DAG for ML Pipeline

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta
from docker.types import Mount
import json

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
}

def validate_data_quality(**context):
    """Check data quality before training."""
    import great_expectations as ge
    
    ti = context['ti']
    data_path = ti.xcom_pull(task_ids='data_ingestion')
    
    df = ge.read_csv(data_path)
    ge_df = ge.from_pandas(df)
    
    # Run validations
    results = ge_df.validate([
        {"expect_column_values_to_not_be_null": {"column": "user_id"}},
        {"expect_column_values_to_be_unique": {"column": "user_id"}},
        {"expect_column_values_to_be_between": {
            "column": "amount", "min_value": 0, "max_value": 100000
        }},
    ])
    
    if not results.success:
        raise ValueError("Data quality validation failed")
    
    return {"status": "passed", "rows": len(df)}

def check_model_performance(**context):
    """Check if model meets performance thresholds."""
    ti = context['ti']
    metrics = ti.xcom_pull(task_ids='model_training')
    
    thresholds = {
        'auc_roc': 0.90,
        'precision': 0.85,
        'recall': 0.80
    }
    
    for metric, threshold in thresholds.items():
        if metrics.get(metric, 0) < threshold:
            return 'notify_failure'
    
    return 'deploy_model'

with DAG(
    'ml_training_pipeline',
    default_args=default_args,
    description='Production ML training pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['ml', 'production'],
    max_active_runs=1,
) as dag:
    
    with TaskGroup('data_preparation') as data_prep:
        data_ingestion = DockerOperator(
            task_id='data_ingestion',
            image='registry.example.com/ml/data-ingest:latest',
            command='python ingest.py',
            environment={
                'DATA_SOURCE': '{{ var.value.data_source }}',
                'OUTPUT_PATH': '/data/raw/'
            },
            docker_url='unix://var/run/docker.sock',
            mount_tmp_folder=False,
            auto_remove=True,
        )
        
        data_validation = PythonOperator(
            task_id='data_validation',
            python_callable=validate_data_quality,
        )
        
        data_ingestion >> data_validation
    
    with TaskGroup('feature_engineering') as feature_eng:
        feature_computation = DockerOperator(
            task_id='feature_computation',
            image='registry.example.com/ml/feature-eng:latest',
            command='python compute_features.py',
            environment={
                'INPUT_PATH': '/data/raw/',
                'OUTPUT_PATH': '/data/features/'
            },
        )
        
        feature_validation = PostgresOperator(
            task_id='feature_validation',
            sql="""
                SELECT 
                    COUNT(*) as total_rows,
                    COUNT(DISTINCT user_id) as unique_users,
                    AVG(amount) as avg_amount
                FROM features_table
                WHERE date = '{{ ds }}'
            """,
            postgres_conn_id='ml_database'
        )
        
        feature_computation >> feature_validation
    
    with TaskGroup('model_training') as training:
        train_model = DockerOperator(
            task_id='train_model',
            image='registry.example.com/ml/training:latest',
            command='python train.py',
            environment={
                'FEATURES_PATH': '/data/features/',
                'MODEL_OUTPUT': '/models/',
                'EXPERIMENT_NAME': 'airflow_pipeline_{{ ds }}'
            },
            mount=[
                Mount(source='gpu-runtime', target='/usr/local/nvidia', type='volume')
            ],
        )
        
        evaluate_model = DockerOperator(
            task_id='evaluate_model',
            image='registry.example.com/ml/evaluation:latest',
            command='python evaluate.py',
        )
        
        train_model >> evaluate_model
    
    deploy_decision = BranchPythonOperator(
        task_id='deploy_decision',
        python_callable=check_model_performance,
    )
    
    deploy_model = DockerOperator(
        task_id='deploy_model',
        image='registry.example.com/ml/deployment:latest',
        command='python deploy.py --env production',
    )
    
    notify_failure = PostgresOperator(
        task_id='notify_failure',
        sql="""
            INSERT INTO pipeline_notifications 
            (pipeline_run, status, message, created_at)
            VALUES (
                '{{ run_id }}',
                'FAILED',
                'Model performance below threshold',
                NOW()
            )
        """,
        postgres_conn_id='ml_database'
    )
    
    # Define task dependencies
    data_prep >> feature_eng >> training >> deploy_decision
    deploy_decision >> [deploy_model, notify_failure]

Prefect

Prefect ML Flow

from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from prefect.deployments import Deployment
from datetime import timedelta
from typing import Optional, Dict, List
import pandas as pd
import numpy as np
from pathlib import Path
import json
import hashlib

@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    log_prints=True
)
def load_data(data_path: str) -> pd.DataFrame:
    """Load and validate input data."""
    logger = get_run_logger()
    
    df = pd.read_parquet(data_path)
    logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns")
    
    # Basic validation
    assert len(df) > 0, "DataFrame is empty"
    assert 'user_id' in df.columns, "Missing user_id column"
    
    return df

@task(retries=2, log_prints=True)
def compute_features(df: pd.DataFrame, config: dict) -> pd.DataFrame:
    """Compute features from raw data."""
    logger = get_run_logger()
    
    features = df.copy()
    
    # Numeric features
    for col in config.get('numeric_features', []):
        features[f'{col}_log'] = np.log1p(features[col].clip(lower=0))
        features[f'{col}_sqrt'] = np.sqrt(features[col].clip(lower=0))
    
    # Aggregation features
    if 'groupby' in config:
        group_col = config['groupby']['column']
        agg_funcs = config['groupby']['aggregations']
        
        for func_name, func in agg_funcs.items():
            grouped = features.groupby(group_col).agg({col: func})
            features = features.merge(
                grouped, 
                on=group_col, 
                suffixes=('', f'_{group_col}_{func_name}')
            )
    
    # Time features
    if 'timestamp_column' in config:
        ts_col = config['timestamp_column']
        features[ts_col] = pd.to_datetime(features[ts_col])
        features['hour'] = features[ts_col].dt.hour
        features['dayofweek'] = features[ts_col].dt.dayofweek
        features['month'] = features[ts_col].dt.month
    
    logger.info(f"Computed {len(features.columns)} features")
    
    return features

@task(retries=1, log_prints=True)
def train_model(features: pd.DataFrame, config: dict) -> dict:
    """Train model and return metrics."""
    logger = get_run_logger()
    
    import xgboost as xgb
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import roc_auc_score, precision_score, recall_score
    
    target = config['target']
    X = features.drop(columns=[target])
    y = features[target]
    
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    
    params = config.get('model_params', {})
    
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dval = xgb.DMatrix(X_val, label=y_val)
    
    model = xgb.train(
        params,
        dtrain,
        num_boost_round=1000,
        evals=[(dval, 'val')],
        early_stopping_rounds=50,
        verbose_eval=100
    )
    
    # Evaluate
    val_pred = model.predict(dval)
    
    metrics = {
        'auc_roc': float(roc_auc_score(y_val, val_pred)),
        'precision': float(precision_score(y_val, (val_pred > 0.5).astype(int))),
        'recall': float(recall_score(y_val, (val_pred > 0.5).astype(int))),
    }
    
    logger.info(f"Model metrics: {metrics}")
    
    # Save model
    model_path = Path(config['model_output']) / 'model.json'
    model.save_model(str(model_path))
    
    return {
        'metrics': metrics,
        'model_path': str(model_path),
        'feature_importance': dict(model.get_score())
    }

@task(retries=1, log_prints=True)
def evaluate_model(metrics: dict, config: dict) -> bool:
    """Evaluate model against quality gates."""
    logger = get_run_logger()
    
    thresholds = config.get('thresholds', {})
    
    for metric_name, threshold in thresholds.items():
        if metric_name in metrics:
            if metrics[metric_name] < threshold:
                logger.warning(
                    f"Quality gate failed: {metric_name}={metrics[metric_name]:.4f} "
                    f"< {threshold}"
                )
                return False
    
    logger.info("All quality gates passed")
    return True

@task(retries=2, log_prints=True)
def deploy_model(model_path: str, config: dict):
    """Deploy model to production."""
    logger = get_run_logger()
    
    import shutil
    
    # Copy model to deployment location
    deploy_path = Path(config['deploy_path'])
    deploy_path.mkdir(parents=True, exist_ok=True)
    
    shutil.copy(model_path, deploy_path / 'model.json')
    
    # Create deployment metadata
    metadata = {
        'deployed_at': datetime.now().isoformat(),
        'model_path': model_path,
        'config': config
    }
    
    with open(deploy_path / 'metadata.json', 'w') as f:
        json.dump(metadata, f, indent=2, default=str)
    
    logger.info(f"Model deployed to {deploy_path}")

@flow(
    name="ml-training-pipeline",
    description="End-to-end ML training pipeline",
    log_prints=True,
    timeout_seconds=3600,
    retries=1,
    retry_delay_seconds=300
)
def ml_pipeline(
    data_path: str,
    config_path: str,
    deploy_path: str = "/models/production"
):
    """Main ML pipeline flow."""
    logger = get_run_logger()
    
    # Load configuration
    with open(config_path) as f:
        config = json.load(f)
    
    config['deploy_path'] = deploy_path
    
    # Execute pipeline
    df = load_data(data_path)
    features = compute_features(df, config)
    training_results = train_model(features, config)
    
    # Evaluate
    passed = evaluate_model(training_results['metrics'], config)
    
    if passed:
        deploy_model(training_results['model_path'], config)
        logger.info("Pipeline completed successfully")
    else:
        logger.error("Pipeline failed quality gates")
        raise ValueError("Model did not meet quality thresholds")
    
    return training_results

# Deploy as a scheduled flow
if __name__ == "__main__":
    deployment = Deployment.build_from_flow(
        flow=ml_pipeline,
        name="daily-training",
        parameters={
            "data_path": "s3://bucket/data/latest/",
            "config_path": "config/training_config.json"
        },
        schedule={
            "anchor_date": "2024-01-01T02:00:00",
            "interval": 86400  # Daily
        },
        work_queue_name="ml-queues",
        work_pool_name="ml-pools"
    )
    deployment.apply()

ℹ️

Prefect 2.0 provides a Python-native orchestration experience with dynamic workflows, task caching, and automatic retry logic. It's ideal for teams wanting flexible, code-first pipeline definitions.

Dagster

Dagster ML Assets

from dagster import (
    asset, AssetMaterialization, Output, MetadataValue,
    schedule, ScheduleDefinition, job, op, In, Out,
    RetryPolicy, Config, DagsterInstance
)
from dagstermill import define_dagstermill_asset
from dagster_aws.s3 import S3Resource
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from typing import Dict, Any

class TrainingConfig(Config):
    data_path: str
    model_name: str
    target_column: str
    hyperparameters: Dict[str, Any]
    thresholds: Dict[str, float]

@asset(
    name="raw_data",
    group_name="data_ingestion",
    retry_policy=RetryPolicy(max_retries=3, delay=timedelta(minutes=5)),
    io_manager_key="s3_io_manager"
)
def raw_data(context, config: TrainingConfig) -> pd.DataFrame:
    """Load raw training data from S3."""
    context.log.info(f"Loading data from {config.data_path}")
    
    df = pd.read_parquet(config.data_path)
    
    context.log_output_event(
        MetadataValue.int(len(df)),
        description="Number of rows loaded"
    )
    
    context.emit_event(
        asset_key="raw_data",
        event_type="ASSET_MATERIALIZED",
        metadata={
            "rows": MetadataValue.int(len(df)),
            "columns": MetadataValue.int(len(df.columns)),
        }
    )
    
    return df

@asset(
    name="validated_data",
    group_name="data_validation",
    deps=["raw_data"],
    io_manager_key="s3_io_manager"
)
def validated_data(context, raw_data: pd.DataFrame) -> pd.DataFrame:
    """Validate data quality."""
    df = raw_data.copy()
    
    # Validation checks
    validations = [
        ("No null user_ids", df['user_id'].notna().all()),
        ("Unique user_ids", df['user_id'].is_unique),
        ("Amount in valid range", (df['amount'] >= 0).all() and (df['amount'] <= 100000).all()),
        ("Minimum rows", len(df) >= 1000),
    ]
    
    for check_name, result in validations:
        if not result:
            raise ValueError(f"Validation failed: {check_name}")
        context.log.info(f"βœ“ {check_name}")
    
    return df

@asset(
    name="features",
    group_name="feature_engineering",
    deps=["validated_data"],
    io_manager_key="s3_io_manager"
)
def features(context, validated_data: pd.DataFrame, config: TrainingConfig) -> pd.DataFrame:
    """Compute features from validated data."""
    df = validated_data.copy()
    
    # Feature transformations
    df['log_amount'] = np.log1p(df['amount'])
    df['amount_squared'] = df['amount'] ** 2
    
    # Rolling features
    df['rolling_mean_7d'] = df.groupby('user_id')['amount'].transform(
        lambda x: x.rolling(7).mean()
    )
    df['rolling_std_7d'] = df.groupby('user_id')['amount'].transform(
        lambda x: x.rolling(7).std()
    )
    
    context.log.info(f"Computed {len(df.columns)} features")
    
    return df

@asset(
    name="trained_model",
    group_name="model_training",
    deps=["features"],
    io_manager_key="s3_io_manager",
    retry_policy=RetryPolicy(max_retries=1)
)
def trained_model(context, features: pd.DataFrame, config: TrainingConfig) -> dict:
    """Train XGBoost model."""
    import xgboost as xgb
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import roc_auc_score
    
    X = features.drop(columns=[config.target_column])
    y = features[config.target_column]
    
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dval = xgb.DMatrix(X_val, label=y_val)
    
    model = xgb.train(
        config.hyperparameters,
        dtrain,
        num_boost_round=1000,
        evals=[(dval, 'val')],
        early_stopping_rounds=50
    )
    
    # Evaluate
    val_pred = model.predict(dval)
    auc_roc = float(roc_auc_score(y_val, val_pred))
    
    context.log.info(f"Model AUC-ROC: {auc_roc:.4f}")
    
    # Check threshold
    if auc_roc < config.thresholds.get('auc_roc', 0.90):
        raise ValueError(f"AUC-ROC {auc_roc:.4f} below threshold")
    
    return {
        'model': model,
        'metrics': {'auc_roc': auc_roc},
        'feature_names': list(X.columns)
    }

@asset(
    name="model_registry_entry",
    group_name="deployment",
    deps=["trained_model"],
    io_manager_key="s3_io_manager"
)
def model_registry_entry(context, trained_model: dict, config: TrainingConfig):
    """Register model in model registry."""
    import json
    
    entry = {
        'model_name': config.model_name,
        'version': datetime.now().strftime('%Y%m%d%H%M%S'),
        'metrics': trained_model['metrics'],
        'feature_names': trained_model['feature_names'],
        'hyperparameters': config.hyperparameters,
        'created_at': datetime.now().isoformat(),
    }
    
    context.log.info(f"Registered model: {entry['version']}")
    
    return entry

# Define schedules
daily_schedule = ScheduleDefinition(
    job=ml_job,
    cron_schedule="0 2 * * *",  # Daily at 2 AM
    default_status=ScheduleDefinition.DefaultStatus.RUNNING
)

# Define jobs
ml_job = ml_job.to_job(
    name="daily_ml_pipeline",
    config=TrainingConfig(
        data_path="s3://bucket/data/latest/",
        model_name="fraud_detection",
        target_column="is_fraud",
        hyperparameters={
            'objective': 'binary:logistic',
            'eval_metric': 'auc',
            'max_depth': 6,
            'learning_rate': 0.1,
        },
        thresholds={
            'auc_roc': 0.90
        }
    )
)

Pipeline Comparison Matrix

FeatureKubeflowAirflowPrefectDagster
OrchestrationKubernetes-nativeWorker-basedHybridIn-process
SchedulingCronCron/SensorsIntervalCron/Sensors
UIBuilt-inBuilt-inCloud/HelmBuilt-in
CachingComponent-levelXCom-basedTask-levelAsset-level
GPU SupportNativeDocker/K8sDockerDocker/K8s
ScalabilityExcellentGoodGoodGood
Learning CurveSteepModerateEasyModerate
Best ForK8s shopsComplex workflowsPython teamsData-centric

⚠️

Choose your orchestrator based on your team's expertise and infrastructure. Kubeflow excels for Kubernetes-native ML, Airflow for complex dependencies, Prefect for Python-first teams, and Dagster for data-centric workflows.

Summary

ML pipeline orchestration is critical for production ML:

  1. Kubeflow: Best for Kubernetes-native ML with GPU support
  2. Airflow: Ideal for complex multi-step workflows with dependencies
  3. Prefect: Python-native with dynamic workflows and caching
  4. Dagster: Data-centric with asset-based orchestration

Implement proper idempotency, monitoring, and error handling for production reliability.

Advertisement