πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Time Series Forecasting at Scale (Prophet + LightGBM + Airflow)

AI/ML ProjectsTime Series⭐ Premium

Advertisement

Time Series Forecasting at Scale

Prophet + LightGBM + Airflow | Automated ML Pipeline

Advanced14+ HoursProduction-Ready

Project Overview

Problem Statement

Businesses need accurate demand forecasting, revenue predictions, and capacity planning. Time series data has unique patterns (trends, seasonality, holidays) that require specialized models. This pipeline combines Prophet for trend/seasonality with LightGBM for feature-driven forecasting, orchestrated by Airflow.

Objectives

  • Build ensemble forecasting with Prophet + LightGBM
  • Implement automated feature engineering for time series
  • Create Airflow DAGs for scheduled retraining
  • Deploy with A/B testing between model versions
  • Monitor for concept drift and data quality
ComponentTechnology
Statistical ModelFacebook Prophet
ML ModelLightGBM + XGBoost
OrchestrationApache Airflow
Data StorageBigQuery + Parquet
Feature StoreFeast
API FrameworkFastAPI
MonitoringEvidently AI + Grafana
Experiment TrackingMLflow

Architecture Diagram

Architecture Diagram
+-------------------------------------------------------------------+
|          Time Series Forecasting Pipeline Architecture            |
+-------------------------------------------------------------------+
|  +--------------+    +--------------+    +------------------+     |
|  | Time Series  |--->| Feature      |--->| Model Training   |     |
|  | Data (BQ)    |    | Engineering  |    | (Prophet + LGBM) |     |
+--------------+    +--------------+    +--------+---------+     |
|                                                  |               |
|                                                  v               |
|  +--------------+    +--------------+    +------------------+     |
|  |  Forecast    |<---|  Airflow     |<---|  Model Registry  |     |
|  |  API         |    |  Scheduler   |    |  (MLflow)        |     |
|  +--------------+    +--------------+    +------------------+     |
|        |                                                   |     |
|        v                                                   v     |
|  +--------------+    +--------------+    +------------------+     |
|  |  Business    |    |  Drift       |    |  Retraining      |     |
|  |  Dashboard   |    |  Detection   |    |  Trigger         |     |
|  +--------------+    +--------------+    +------------------+     |
+-------------------------------------------------------------------+

Step-by-Step Implementation

Step 1: Environment Setup

mkdir ts-forecasting && cd ts-forecasting
pip install prophet lightgbm xgboost
pip install apache-airflow airflow-providers-google
pip install feast mlflow evidently
pip install fastapi uvicorn pandas numpy
pip install statsmodels scipy

Step 2: Feature Engineering for Time Series

Build comprehensive feature engineering that captures temporal patterns, lag features, and external regressors.

# src/features/time_features.py
import pandas as pd
import numpy as np
from typing import List, Optional
from dataclasses import dataclass


@dataclass
class FeatureConfig:
    target_col: str
    date_col: str
    lag_periods: List[int] = None
    rolling_windows: List[int] = None
    holiday_dates: Optional[List[str]] = None

    def __post_init__(self):
        if self.lag_periods is None:
            self.lag_periods = [1, 7, 14, 28, 90]
        if self.rolling_windows is None:
            self.rolling_windows = [7, 14, 28, 90]


class TimeSeriesFeatureEngineer:
    def __init__(self, config: FeatureConfig):
        self.config = config

    def create_features(self, df: pd.DataFrame) -> pd.DataFrame:
        df = df.copy()
        df[self.config.date_col] = pd.to_datetime(df[self.config.date_col])
        df = df.sort_values(self.config.date_col).reset_index(drop=True)

        # Calendar features
        df["day_of_week"] = df[self.config.date_col].dt.dayofweek
        df["month"] = df[self.config.date_col].dt.month
        df["quarter"] = df[self.config.date_col].dt.quarter
        df["year"] = df[self.config.date_col].dt.year
        df["is_weekend"] = (df["day_of_week"] >= 5).astype(int)
        df["day_of_month"] = df[self.config.date_col].dt.day
        df["week_of_year"] = df[self.config.date_col].dt.isocalendar().week.astype(int)

        # Lag features
        for lag in self.config.lag_periods:
            df[f"lag_{lag}"] = df[self.config.target_col].shift(lag)

        # Rolling statistics
        for window in self.config.rolling_windows:
            df[f"rolling_mean_{window}"] = df[self.config.target_col].rolling(window=window).mean()
            df[f"rolling_std_{window}"] = df[self.config.target_col].rolling(window=window).std()
            df[f"rolling_min_{window}"] = df[self.config.target_col].rolling(window=window).min()
            df[f"rolling_max_{window}"] = df[self.config.target_col].rolling(window=window).max()
            df[f"rolling_median_{window}"] = df[self.config.target_col].rolling(window=window).median()

        # Exponentially weighted features
        for span in [7, 14, 28]:
            df[f"ewm_mean_{span}"] = df[self.config.target_col].ewm(span=span).mean()

        # Diff features
        df["diff_1"] = df[self.config.target_col].diff(1)
        df["diff_7"] = df[self.config.target_col].diff(7)

        # Percentage change
        df["pct_change_1"] = df[self.config.target_col].pct_change(1)
        df["pct_change_7"] = df[self.config.target_col].pct_change(7)

        # Holiday features
        if self.config.holiday_dates:
            holidays = pd.to_datetime(self.config.holiday_dates)
            df["is_holiday"] = df[self.config.date_col].isin(holidays).astype(int)
            df["days_to_holiday"] = df[self.config.date_col].apply(
                lambda x: min([abs((x - h).days) for h in holidays]) if holidays.any() else 0
            )

        return df.dropna()

    def create_prophet_features(self, df: pd.DataFrame) -> pd.DataFrame:
        prophet_df = df[[self.config.date_col, self.config.target_col]].copy()
        prophet_df.columns = ["ds", "y"]
        return prophet_df

Step 3: Prophet + LightGBM Ensemble

Combine Prophet for trend/seasonality with LightGBM for feature-driven corrections.

# src/models/ensemble.py
import pandas as pd
import numpy as np
from prophet import Prophet
import lightgbm as lgb
from typing import Dict, Tuple, Optional
import mlflow
from dataclasses import dataclass


@dataclass
class EnsembleConfig:
    prophet_seasonality: str = "monthly"
    lgbm_params: Dict = None
    ensemble_method: str = "weighted"  # weighted, stacking
    prophet_weight: float = 0.4
    lgbm_weight: float = 0.6

    def __post_init__(self):
        if self.lgbm_params is None:
            self.lgbm_params = {
                "objective": "regression",
                "metric": "rmse",
                "num_leaves": 31,
                "learning_rate": 0.05,
                "feature_fraction": 0.8,
                "bagging_fraction": 0.8,
                "bagging_freq": 5,
                "verbose": -1,
            }


class ProphetLightGBMEnsemble:
    def __init__(self, config: EnsembleConfig = None):
        self.config = config or EnsembleConfig()
        self.prophet_model = None
        self.lgbm_model = None

    def fit(self, df: pd.DataFrame, target_col: str, date_col: str):
        # Split for LGBM feature-based predictions
        feature_cols = [c for c in df.columns if c not in [target_col, date_col, "ds", "y"]]

        # Fit Prophet
        prophet_df = df[[date_col, target_col]].rename(columns={date_col: "ds", target_col: "y"})
        self.prophet_model = Prophet(
            yearly_seasonality=True,
            weekly_seasonality=True,
            daily_seasonality=False,
            changepoint_prior_scale=0.05,
        )
        self.prophet_model.fit(prophet_df)

        # Get Prophet predictions as feature for LGBM
        prophet_pred = self.prophet_model.predict(prophet_df)["yhat"].values

        # Fit LightGBM with Prophet predictions as additional feature
        X = df[feature_cols].copy()
        X["prophet_pred"] = prophet_pred
        y = df[target_col].values

        train_data = lgb.Dataset(X, label=y)
        self.lgbm_model = lgb.train(
            self.config.lgbm_params, train_data, num_boost_round=1000,
            valid_sets=[train_data], callbacks=[lgb.log_evaluation(100)],
        )
        self.feature_cols = feature_cols

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        # Prophet forecast
        future = self.prophet_model.make_future_dataframe(periods=len(df))
        prophet_forecast = self.prophet_model.predict(future)

        # LGBM prediction
        X = df[self.feature_cols].copy()
        X["prophet_pred"] = prophet_forecast["yhat"].values[-len(df):]
        lgbm_pred = self.lgbm_model.predict(X)

        # Ensemble
        ensemble_pred = (
            self.config.prophet_weight * prophet_forecast["yhat"].values[-len(df):]
            + self.config.lgbm_weight * lgbm_pred
        )

        return pd.DataFrame({
            "date": df["date"].values,
            "prophet_pred": prophet_forecast["yhat"].values[-len(df):],
            "lgbm_pred": lgbm_pred,
            "ensemble_pred": ensemble_pred,
            "prophet_lower": prophet_forecast["yhat_lower"].values[-len(df):],
            "prophet_upper": prophet_forecast["yhat_upper"].values[-len(df):],
        })

Step 4: Airflow DAG for Automated Retraining

# dags/forecast_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta

default_args = {
    "owner": "ml-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["ml-alerts@company.com"],
    "retries": 2,
    "retry_delay": timedelta(minutes=10),
}


def extract_data(**context):
    """Extract latest data from BigQuery."""
    from google.cloud import bigquery
    client = bigquery.Client()
    query = "SELECT * FROM `project.dataset.time_series_data` WHERE date >= DATE_SUB(CURRENT_DATE(), INTERVAL 365 DAY)"
    df = client.query(query).to_dataframe()
    df.to_parquet("/tmp/ts_data.parquet")
    return f"Extracted {len(df)} rows"


def train_models(**context):
    """Train Prophet + LightGBM ensemble."""
    import pandas as pd
    from src.models.ensemble import ProphetLightGBMEnsemble, EnsembleConfig
    import mlflow

    df = pd.read_parquet("/tmp/ts_data.parquet")
    config = EnsembleConfig()
    ensemble = ProphetLightGBMEnsemble(config)

    with mlflow.start_run():
        ensemble.fit(df, target_col="value", date_col="date")
        mlflow.prophet.log_model(ensemble.prophet_model, "prophet_model")
        mlflow.lightgbm.log_model(ensemble.lgbm_model, "lgbm_model")
    return "Training complete"


def evaluate_and_register(**context):
    """Evaluate and register model if better than current."""
    from src.monitoring.drift import DriftDetector
    detector = DriftDetector()
    is_better = detector.compare_models("current", "new")
    if is_better:
        detector.register_model("new")
        return "Model registered"
    return "Model not registered - no improvement"


with DAG(
    "time_series_forecast",
    default_args=default_args,
    description="Automated time series forecasting pipeline",
    schedule_interval="0 2 * * 1",  # Weekly on Monday at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    extract = PythonOperator(task_id="extract_data", python_callable=extract_data)
    train = PythonOperator(task_id="train_models", python_callable=train_models)
    evaluate = PythonOperator(task_id="evaluate_register", python_callable=evaluate_and_register)
    deploy = BashOperator(task_id="deploy_model", bash_command="python -m src.deploy.push_model")

    extract >> train >> evaluate >> deploy

Step 5: FastAPI Serving

# src/api/main.py
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List, Optional
from contextlib import asynccontextmanager
import pandas as pd

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load models at startup
    app.state.ensemble = load_ensemble("production")
    yield

app = FastAPI(title="Forecast API", lifespan=lifespan)

class ForecastRequest(BaseModel):
    historical_data: List[dict]
    horizon: int = 30
    include_confidence: bool = True

class ForecastResponse(BaseModel):
    forecasts: List[dict]
    model_version: str

@app.post("/api/v1/forecast", response_model=ForecastResponse)
async def forecast(request: ForecastRequest):
    df = pd.DataFrame(request.historical_data)
    predictions = app.state.ensemble.predict(df)
    return ForecastResponse(
        forecasts=predictions.to_dict(orient="records"),
        model_version="v1.0.0",
    )

ℹ️

Always include confidence intervals in forecasts. Point predictions alone are insufficient for business decision-making. Prophet provides natural uncertainty intervals.

πŸ’‘

Monitor forecast accuracy by horizon length. Short-term forecasts (1-7 days) should be much more accurate than long-term (30+ days). Set different alerting thresholds per horizon.

Performance Metrics

MetricTargetDescription
MAPE< 10%Mean Absolute Percentage Error
RMSEBaseline -20%vs simple baseline
Coverage> 90%95% CI actual coverage
Retraining Latency< 30minFull pipeline
Inference Latency< 100msPer forecast request

Interview Talking Points

  1. Ensemble Strategy: Prophet captures trend and seasonality, LightGBM handles feature interactions and non-linear patterns.
  2. Feature Engineering: Lag features, rolling statistics, and calendar features are critical for forecasting accuracy.
  3. Airflow Orchestration: DAGs ensure reliable, auditable, and schedulable ML pipelines.
  4. Backtesting: Walk-forward validation simulates real-world forecasting conditions.
  5. Drift Detection: Monitor for distribution shifts in input data and forecast accuracy degradation.
  6. Business Impact: Connect forecast accuracy to business KPIs like inventory costs and revenue.

⚠️

Prophet can overfit with too many holidays or changepoints. Cross-validate the number of changepoints and holiday effects before production deployment.

ℹ️

For high-frequency data (intraday), consider using NeuralProphet or DeepAR which handle sub-daily patterns better.

Advertisement