Time Series Forecasting at Scale
Prophet + LightGBM + Airflow | Automated ML Pipeline
Project Overview
Problem Statement
Businesses need accurate demand forecasting, revenue predictions, and capacity planning. Time series data has unique patterns (trends, seasonality, holidays) that require specialized models. This pipeline combines Prophet for trend/seasonality with LightGBM for feature-driven forecasting, orchestrated by Airflow.
Objectives
- Build ensemble forecasting with Prophet + LightGBM
- Implement automated feature engineering for time series
- Create Airflow DAGs for scheduled retraining
- Deploy with A/B testing between model versions
- Monitor for concept drift and data quality
| Component | Technology |
|---|---|
| Statistical Model | Facebook Prophet |
| ML Model | LightGBM + XGBoost |
| Orchestration | Apache Airflow |
| Data Storage | BigQuery + Parquet |
| Feature Store | Feast |
| API Framework | FastAPI |
| Monitoring | Evidently AI + Grafana |
| Experiment Tracking | MLflow |
Architecture Diagram
```
+-------------------------------------------------------------------+
|           Time Series Forecasting Pipeline Architecture           |
+-------------------------------------------------------------------+
|  +--------------+    +--------------+    +------------------+     |
|  | Time Series  |--->|   Feature    |--->|  Model Training  |     |
|  |  Data (BQ)   |    | Engineering  |    | (Prophet + LGBM) |     |
|  +--------------+    +--------------+    +--------+---------+     |
|                                                   |               |
|                                                   v               |
|  +--------------+    +--------------+    +------------------+     |
|  |   Forecast   |<---|   Airflow    |<---|  Model Registry  |     |
|  |     API      |    |  Scheduler   |    |     (MLflow)     |     |
|  +--------------+    +--------------+    +------------------+     |
|         |                   |                                     |
|         v                   v                                     |
|  +--------------+    +--------------+    +------------------+     |
|  |   Business   |    |    Drift     |    |    Retraining    |     |
|  |  Dashboard   |    |  Detection   |    |     Trigger      |     |
|  +--------------+    +--------------+    +------------------+     |
+-------------------------------------------------------------------+
```
Step-by-Step Implementation
Step 1: Environment Setup
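```bash
mkdir ts-forecasting && cd ts-forecasting
pip install prophet lightgbm xgboost
# Airflow recommends installing with the constraints file for your Airflow/Python versions
pip install apache-airflow apache-airflow-providers-google
pip install feast mlflow evidently
pip install fastapi uvicorn pandas numpy pyarrow  # pyarrow: Parquet I/O used by the DAG
pip install statsmodels scipy
```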
Step 2: Feature Engineering for Time Series
Build feature engineering that captures calendar patterns, lagged values, rolling-window statistics of the target, and holiday effects.
```python
# src/features/time_features.py
from dataclasses import dataclass
from typing import List, Optional

import numpy as np
import pandas as pd


@dataclass
class FeatureConfig:
    target_col: str
    date_col: str
    lag_periods: Optional[List[int]] = None
    rolling_windows: Optional[List[int]] = None
    holiday_dates: Optional[List[str]] = None

    def __post_init__(self):
        if self.lag_periods is None:
            self.lag_periods = [1, 7, 14, 28, 90]
        if self.rolling_windows is None:
            self.rolling_windows = [7, 14, 28, 90]


class TimeSeriesFeatureEngineer:
    """Builds features for a single series with one row per date."""

    def __init__(self, config: FeatureConfig):
        self.config = config

    def create_features(self, df: pd.DataFrame) -> pd.DataFrame:
        cfg = self.config
        df = df.copy()
        df[cfg.date_col] = pd.to_datetime(df[cfg.date_col])
        df = df.sort_values(cfg.date_col).reset_index(drop=True)
        dates = df[cfg.date_col]

        # Calendar features
        df["day_of_week"] = dates.dt.dayofweek
        df["month"] = dates.dt.month
        df["quarter"] = dates.dt.quarter
        df["year"] = dates.dt.year
        df["is_weekend"] = (df["day_of_week"] >= 5).astype(int)
        df["day_of_month"] = dates.dt.day
        df["week_of_year"] = dates.dt.isocalendar().week.astype(int)

        # Lag features: shift(lag) only looks backward
        target = df[cfg.target_col]
        for lag in cfg.lag_periods:
            df[f"lag_{lag}"] = target.shift(lag)

        # Window, EWM and difference statistics are computed on the target shifted by
        # one step, so row t only uses values up to t-1 (no target leakage).
        past = target.shift(1)
        for window in cfg.rolling_windows:
            roll = past.rolling(window=window)
            df[f"rolling_mean_{window}"] = roll.mean()
            df[f"rolling_std_{window}"] = roll.std()
            df[f"rolling_min_{window}"] = roll.min()
            df[f"rolling_max_{window}"] = roll.max()
            df[f"rolling_median_{window}"] = roll.median()

        # Exponentially weighted means
        for span in [7, 14, 28]:
            df[f"ewm_mean_{span}"] = past.ewm(span=span).mean()

        # Differences and percentage changes of past values
        df["diff_1"] = past.diff(1)
        df["diff_7"] = past.diff(7)
        df["pct_change_1"] = past.pct_change(1, fill_method=None)
        df["pct_change_7"] = past.pct_change(7, fill_method=None)

        # Holiday features
        if cfg.holiday_dates:
            holidays = pd.to_datetime(cfg.holiday_dates)
            df["is_holiday"] = dates.isin(holidays).astype(int)
            # Absolute distance in days to the nearest holiday (before or after)
            gaps = np.abs(dates.values[:, None] - holidays.values[None, :])
            df["days_to_holiday"] = (gaps.min(axis=1) // np.timedelta64(1, "D")).astype(int)

        # Drop warm-up rows whose lags/windows are incomplete (about the first 91 rows)
        return df.dropna()

    def create_prophet_features(self, df: pd.DataFrame) -> pd.DataFrame:
        # Prophet expects exactly two columns: "ds" (datetime) and "y" (target)
        prophet_df = df[[self.config.date_col, self.config.target_col]].copy()
        prophet_df.columns = ["ds", "y"]
        return prophet_df
```
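Why window and difference features should be computed on the target shifted by one step can be checked on a toy series. This standalone snippet uses made-up values. An unshifted rolling mean at row t already contains y_t, and lag_1 plus an unshifted diff_1 reproduces the target exactly. A model trained on such features would look far better in backtests than in production.

```python
import pandas as pd

# Toy daily series 0.0, 1.0, ..., 9.0 (made-up values so each window is easy to check by hand)
y = pd.Series(range(10), dtype=float)

# Unshifted: the window at row t covers t-2..t, so it contains the target y_t itself
leaky_mean = y.rolling(window=3).mean()
# Shifted by one step: the window at row t covers t-3..t-1 (past values only)
safe_mean = y.shift(1).rolling(window=3).mean()

# lag_1 plus an unshifted diff_1 gives y_{t-1} + (y_t - y_{t-1}) = y_t exactly,
# so a tree model could read the answer straight off its inputs
reconstructed = y.shift(1) + y.diff(1)

print(leaky_mean[5], safe_mean[5])  # 4.0 3.0
```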
Step 3: Prophet + LightGBM Ensemble
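Combine Prophet for trend and seasonality with LightGBM for feature-driven corrections. Prophet's in-sample forecast is passed to LightGBM as an extra feature, and the final prediction is a weighted average of the two models.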
```python
# src/models/ensemble.py
from dataclasses import dataclass
from typing import Dict, Optional

import lightgbm as lgb
import pandas as pd
from prophet import Prophet


@dataclass
class EnsembleConfig:
    prophet_seasonality: Optional[str] = "monthly"  # extra Prophet seasonality; None disables it
    lgbm_params: Optional[Dict] = None
    ensemble_method: str = "weighted"  # only weighted averaging is implemented below
    prophet_weight: float = 0.4
    lgbm_weight: float = 0.6
    interval_width: float = 0.95  # Prophet's default is 0.80

    def __post_init__(self):
        if self.lgbm_params is None:
            self.lgbm_params = {
                "objective": "regression",
                "metric": "rmse",
                "num_leaves": 31,
                "learning_rate": 0.05,
                "feature_fraction": 0.8,
                "bagging_fraction": 0.8,
                "bagging_freq": 5,
                "verbose": -1,
            }


class ProphetLightGBMEnsemble:
    def __init__(self, config: Optional[EnsembleConfig] = None):
        self.config = config or EnsembleConfig()
        self.prophet_model = None
        self.lgbm_model = None
        self.feature_cols = None
        self.date_col = None

    def _prophet_forecast(self, df: pd.DataFrame) -> pd.DataFrame:
        # Score Prophet on the actual dates in df rather than assuming they follow
        # the training history. Prophet returns rows sorted by ds, so df must be
        # sorted by date (create_features does this).
        return self.prophet_model.predict(df[[self.date_col]].rename(columns={self.date_col: "ds"}))

    def fit(self, df: pd.DataFrame, target_col: str, date_col: str):
        self.date_col = date_col
        # Every column except the target and date is a LightGBM feature
        self.feature_cols = [c for c in df.columns if c not in [target_col, date_col, "ds", "y"]]

        # 1) Fit Prophet on (ds, y)
        prophet_df = df[[date_col, target_col]].rename(columns={date_col: "ds", target_col: "y"})
        self.prophet_model = Prophet(
            yearly_seasonality=True,
            weekly_seasonality=True,
            daily_seasonality=False,
            changepoint_prior_scale=0.05,
            interval_width=self.config.interval_width,
        )
        if self.config.prophet_seasonality == "monthly":
            self.prophet_model.add_seasonality(name="monthly", period=30.5, fourier_order=5)
        self.prophet_model.fit(prophet_df)

        # 2) Fit LightGBM with Prophet's in-sample fit as an extra feature
        # (in-sample predictions are optimistic; out-of-fold ones are safer)
        X = df[self.feature_cols].copy()
        X["prophet_pred"] = self._prophet_forecast(df)["yhat"].values
        train_data = lgb.Dataset(X, label=df[target_col].values)
        self.lgbm_model = lgb.train(
            self.config.lgbm_params,
            train_data,
            num_boost_round=1000,
            valid_sets=[train_data],  # logs training RMSE only; use a hold-out set for early stopping
            callbacks=[lgb.log_evaluation(100)],
        )
        return self

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        # df must contain the date column and every feature column used in fit();
        # multi-step horizons need recursive or direct strategies for lag features
        prophet_forecast = self._prophet_forecast(df)
        prophet_pred = prophet_forecast["yhat"].values

        X = df[self.feature_cols].copy()
        X["prophet_pred"] = prophet_pred
        lgbm_pred = self.lgbm_model.predict(X)

        # Weighted average of the two models
        ensemble_pred = self.config.prophet_weight * prophet_pred + self.config.lgbm_weight * lgbm_pred

        return pd.DataFrame({
            "date": df[self.date_col].values,
            "prophet_pred": prophet_pred,
            "lgbm_pred": lgbm_pred,
            "ensemble_pred": ensemble_pred,
            "prophet_lower": prophet_forecast["yhat_lower"].values,
            "prophet_upper": prophet_forecast["yhat_upper"].values,
        })
```
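The 0.4/0.6 weights in EnsembleConfig are fixed constants. One option, sketched here as an assumption rather than part of the pipeline, is to choose prophet_weight on a hold-out window by grid search and set lgbm_weight = 1 - prophet_weight. best_prophet_weight is a hypothetical helper name.

```python
import numpy as np


def best_prophet_weight(y_true, prophet_pred, lgbm_pred, grid=None):
    """Grid-search w in [0, 1] minimizing RMSE of w * prophet + (1 - w) * lgbm.

    Use predictions for a hold-out window that neither model was trained on.
    Returns (prophet_weight, rmse); the LightGBM weight is 1 - prophet_weight.
    """
    y = np.asarray(y_true, dtype=float)
    p = np.asarray(prophet_pred, dtype=float)
    g = np.asarray(lgbm_pred, dtype=float)
    grid = np.linspace(0.0, 1.0, 21) if grid is None else np.asarray(grid, dtype=float)
    best_w, best_rmse = 0.0, np.inf
    for w in grid:
        rmse = float(np.sqrt(np.mean((y - (w * p + (1 - w) * g)) ** 2)))
        if rmse < best_rmse:
            best_w, best_rmse = float(w), rmse
    return best_w, best_rmse


# Made-up hold-out data: Prophet is 2 too high and LightGBM 2 too low, so w = 0.5 cancels both
y_holdout = [100.0, 110.0, 120.0]
w, err = best_prophet_weight(y_holdout, [102.0, 112.0, 122.0], [98.0, 108.0, 118.0])
```

The chosen weight would then go into EnsembleConfig(prophet_weight=w, lgbm_weight=1 - w). Picking weights on training data instead would favor whichever model overfits more.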
Step 4: Airflow DAG for Automated Retraining
```python
# dags/forecast_pipeline.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, ShortCircuitOperator

default_args = {
    "owner": "ml-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["ml-alerts@company.com"],
    "retries": 2,
    "retry_delay": timedelta(minutes=10),
}

# Local /tmp is shared between tasks only when they run on the same worker
# (e.g. LocalExecutor); with Celery or Kubernetes executors, stage data in GCS instead.
DATA_PATH = "/tmp/ts_data.parquet"


def extract_data(**context):
    """Extract the last 365 days of data from BigQuery."""
    from google.cloud import bigquery

    client = bigquery.Client()
    # Yearly seasonality is hard to estimate from under two years of history, and
    # feature warm-up drops ~90 more rows; consider a longer window.
    query = """
        SELECT *
        FROM `project.dataset.time_series_data`
        WHERE date >= DATE_SUB(CURRENT_DATE(), INTERVAL 365 DAY)
    """
    df = client.query(query).to_dataframe()
    df.to_parquet(DATA_PATH)
    return f"Extracted {len(df)} rows"


def train_models(**context):
    """Engineer features, then train and log the Prophet + LightGBM ensemble."""
    import mlflow
    import pandas as pd

    from src.features.time_features import FeatureConfig, TimeSeriesFeatureEngineer
    from src.models.ensemble import EnsembleConfig, ProphetLightGBMEnsemble

    raw = pd.read_parquet(DATA_PATH)
    # The LightGBM half needs the lag/rolling/calendar features from Step 2
    engineer = TimeSeriesFeatureEngineer(FeatureConfig(target_col="value", date_col="date"))
    df = engineer.create_features(raw)

    ensemble = ProphetLightGBMEnsemble(EnsembleConfig())
    with mlflow.start_run():
        ensemble.fit(df, target_col="value", date_col="date")
        mlflow.prophet.log_model(ensemble.prophet_model, "prophet_model")
        mlflow.lightgbm.log_model(ensemble.lgbm_model, "lgbm_model")
    return "Training complete"


def evaluate_and_register(**context) -> bool:
    """Register the new model only if it beats the current one.

    Returns a bool: under ShortCircuitOperator, False skips the deploy task.
    """
    # DriftDetector is a project-specific helper that this guide does not show
    from src.monitoring.drift import DriftDetector

    detector = DriftDetector()
    if detector.compare_models("current", "new"):
        detector.register_model("new")
        return True
    return False


with DAG(
    "time_series_forecast",
    default_args=default_args,
    description="Automated time series forecasting pipeline",
    schedule="0 2 * * 1",  # Mondays at 02:00 (UTC by default); `schedule` needs Airflow 2.4+
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    extract = PythonOperator(task_id="extract_data", python_callable=extract_data)
    train = PythonOperator(task_id="train_models", python_callable=train_models)
    evaluate = ShortCircuitOperator(task_id="evaluate_register", python_callable=evaluate_and_register)
    deploy = BashOperator(task_id="deploy_model", bash_command="python -m src.deploy.push_model")

    extract >> train >> evaluate >> deploy
```
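evaluate_and_register delegates the comparison to DriftDetector.compare_models, which is not shown. One possible stand-in assumes both models have already produced predictions for the same hold-out window. It compares RMSE and requires a minimum relative improvement, so that noise alone does not trigger a redeploy. should_promote and the 2% threshold are illustrative choices, not part of the original pipeline.

```python
import math
from typing import Sequence


def rmse(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(y_true, y_pred)) / len(y_true))


def should_promote(
    y_true: Sequence[float],
    current_pred: Sequence[float],
    candidate_pred: Sequence[float],
    min_improvement: float = 0.02,
) -> bool:
    """True if the candidate's hold-out RMSE is at least `min_improvement`
    (as a fraction) lower than the current production model's RMSE."""
    current = rmse(y_true, current_pred)
    candidate = rmse(y_true, candidate_pred)
    if current == 0:
        return False  # the current model is already perfect on this window
    return (current - candidate) / current >= min_improvement


actuals = [10.0, 10.0, 10.0, 10.0]
print(should_promote(actuals, [12.0] * 4, [11.0] * 4))   # True: RMSE 2.0 -> 1.0
print(should_promote(actuals, [12.0] * 4, [11.99] * 4))  # False: only ~0.5% better
```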
Step 5: FastAPI Serving
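```python
# src/api/main.py
from contextlib import asynccontextmanager
from typing import List

import pandas as pd
from fastapi import FastAPI
from pydantic import BaseModel

# Hypothetical helper (not shown in this guide): returns a fitted
# ProphetLightGBMEnsemble for the given stage, e.g. from the MLflow Model Registry.
from src.models.registry import load_ensemble


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load the model once at startup and keep it on app.state
    app.state.ensemble = load_ensemble("production")
    yield


app = FastAPI(title="Forecast API", lifespan=lifespan)


class ForecastRequest(BaseModel):
    # Each row needs the date column plus every feature column used in training
    historical_data: List[dict]
    horizon: int = 30  # not used below: predict() scores exactly the rows it receives
    include_confidence: bool = True


class ForecastResponse(BaseModel):
    forecasts: List[dict]
    model_version: str


# A plain `def` endpoint runs in FastAPI's threadpool, so CPU-bound
# prediction does not block the event loop.
@app.post("/api/v1/forecast", response_model=ForecastResponse)
def forecast(request: ForecastRequest):
    df = pd.DataFrame(request.historical_data)
    predictions = app.state.ensemble.predict(df)
    if not request.include_confidence:
        predictions = predictions.drop(columns=["prophet_lower", "prophet_upper"])
    return ForecastResponse(
        forecasts=predictions.to_dict(orient="records"),
        model_version="v1.0.0",  # placeholder; read the real version from the registry
    )
```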
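Once the service is running (for example with uvicorn src.api.main:app), a client posts a JSON body matching ForecastRequest. The sketch below uses only the standard library. The localhost:8000 URL is an assumption (uvicorn's default). The example row is abbreviated, because real rows must include every feature column the model was trained on.

```python
import json
import urllib.request

# Assumption: the service runs locally on uvicorn's default port 8000
API_URL = "http://localhost:8000/api/v1/forecast"


def build_request(rows: list, horizon: int = 30, include_confidence: bool = True) -> dict:
    """Build a JSON body matching ForecastRequest."""
    return {
        "historical_data": rows,
        "horizon": horizon,
        "include_confidence": include_confidence,
    }


def post_forecast(payload: dict, url: str = API_URL, timeout: float = 10.0) -> dict:
    """POST the payload and return the decoded ForecastResponse JSON."""
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Abbreviated row: real rows must also carry every engineered feature column
payload = build_request([{"date": "2024-06-03", "value": 125.0}], horizon=7)
# response = post_forecast(payload)  # requires the API to be running
```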
ℹ️ Always include uncertainty intervals with forecasts. Point predictions alone are insufficient for business decision-making, and Prophet produces intervals out of the box (yhat_lower and yhat_upper).
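Calibration can be checked by measuring how often realized values land inside [prophet_lower, prophet_upper]. Prophet's interval_width defaults to 0.80. Unless it is set explicitly, the intervals are 80% intervals and should cover roughly 80% of actuals. The following is a minimal sketch; interval_coverage is an illustrative helper, not a Prophet API.

```python
from typing import Sequence


def interval_coverage(y_true: Sequence[float], lower: Sequence[float], upper: Sequence[float]) -> float:
    """Fraction of actual values that fall inside [lower, upper] (inclusive)."""
    if not (len(y_true) == len(lower) == len(upper)) or len(y_true) == 0:
        raise ValueError("inputs must be non-empty and of equal length")
    inside = sum(lo <= y <= hi for y, lo, hi in zip(y_true, lower, upper))
    return inside / len(y_true)


# Made-up example: 3 of 4 actuals fall inside their intervals
coverage = interval_coverage([1, 2, 3, 4], [0, 0, 0, 0], [2, 2, 2, 5])
print(coverage)  # 0.75
```

In the pipeline, y_true would be the realized values and lower/upper the prophet_lower and prophet_upper columns returned by predict().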
💡 Monitor forecast accuracy separately for each horizon. Short-term forecasts (1-7 days) are typically much more accurate than long-term ones (30+ days), so set a different alerting threshold per horizon.
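Here is a minimal sketch of per-horizon monitoring. It assumes a backtest table with one row per forecast and columns horizon (days ahead), actual, and forecast. The column names, the 1-7 / 8-30 / 31+ day buckets, and the threshold values are illustrative assumptions.

```python
import numpy as np
import pandas as pd

BINS = [0, 7, 30, np.inf]            # buckets (0, 7], (7, 30], (30, inf)
LABELS = ["1-7d", "8-30d", "31d+"]
THRESHOLDS = {"1-7d": 8.0, "8-30d": 15.0, "31d+": 25.0}  # illustrative MAPE alert levels (%)


def mape_by_horizon(backtest: pd.DataFrame) -> pd.Series:
    """Mean absolute percentage error (%) per horizon bucket.

    Rows with actual == 0 are dropped because MAPE is undefined there.
    """
    df = backtest[backtest["actual"] != 0].copy()
    df["ape"] = 100 * (df["forecast"] - df["actual"]).abs() / df["actual"].abs()
    df["bucket"] = pd.cut(df["horizon"], bins=BINS, labels=LABELS)
    return df.groupby("bucket", observed=False)["ape"].mean()


# Made-up backtest rows
backtest = pd.DataFrame({
    "horizon": [1, 5, 10, 40],
    "actual": [100.0, 100.0, 100.0, 100.0],
    "forecast": [95.0, 105.0, 120.0, 150.0],
})
result = mape_by_horizon(backtest)
breaches = [bucket for bucket, value in result.items() if value > THRESHOLDS[bucket]]
print(breaches)  # ['8-30d', '31d+']
```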
Performance Metrics
| Metric | Target | Description |
|---|---|---|
| MAPE | < 10% | Mean Absolute Percentage Error |
| RMSE | ≥ 20% below baseline | Relative to a simple baseline (e.g., seasonal naive) |
| Coverage | > 90% | Share of actuals inside the 95% prediction interval |
| Retraining Latency | < 30 min | End-to-end pipeline run |
| Inference Latency | < 100ms | Per forecast request |
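The accuracy targets in the table can be computed with a few helpers. This sketch assumes a daily series with weekly seasonality, so a seasonal-naive forecast (repeat the last 7 days) plays the role of the simple baseline. The helper names and that baseline choice are assumptions, not part of the pipeline above. The 20% RMSE target then reads rmse_reduction_vs_baseline(actual, forecast, baseline) >= 0.2.

```python
import numpy as np


def mape(y_true, y_pred) -> float:
    """Mean absolute percentage error in %, skipping zero actuals (MAPE is undefined there)."""
    y_true, y_pred = np.asarray(y_true, dtype=float), np.asarray(y_pred, dtype=float)
    mask = y_true != 0
    return float(np.mean(np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])) * 100)


def rmse(y_true, y_pred) -> float:
    y_true, y_pred = np.asarray(y_true, dtype=float), np.asarray(y_pred, dtype=float)
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))


def seasonal_naive(history, horizon: int, season: int = 7) -> np.ndarray:
    """Seasonal-naive forecast: repeat the last `season` observations cyclically."""
    last_season = np.asarray(history, dtype=float)[-season:]
    return np.array([last_season[h % season] for h in range(horizon)])


def rmse_reduction_vs_baseline(y_true, y_pred, baseline_pred) -> float:
    """Relative RMSE reduction vs the baseline: 0.2 means 20% lower RMSE."""
    return 1 - rmse(y_true, y_pred) / rmse(y_true, baseline_pred)
```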
Interview Talking Points
- Ensemble Strategy: Prophet captures trend and seasonality, LightGBM handles feature interactions and non-linear patterns.
- Feature Engineering: Lag features, rolling statistics, and calendar features are critical for forecasting accuracy.
- Airflow Orchestration: DAGs ensure reliable, auditable, and schedulable ML pipelines.
- Backtesting: Walk-forward validation simulates real-world forecasting conditions.
- Drift Detection: Monitor for distribution shifts in input data and forecast accuracy degradation.
- Business Impact: Connect forecast accuracy to business KPIs like inventory costs and revenue.
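Walk-forward (expanding-window) backtesting can be sketched with plain index arithmetic. Each fold trains only on data before a cutoff and evaluates on the following horizon, which mirrors how the model is used in production. The function name and parameters are illustrative. For Prophet models, prophet.diagnostics.cross_validation implements the same idea with simulated historical cutoffs.

```python
from typing import Iterator, Tuple


def walk_forward_splits(n_obs: int, initial: int, horizon: int, step: int) -> Iterator[Tuple[range, range]]:
    """Yield (train_idx, test_idx) ranges for expanding-window backtesting.

    Each fold trains on observations [0, cutoff) and tests on the next `horizon`
    observations; the cutoff then advances by `step`. Observations must be in
    time order, so no test point ever precedes a training point.
    """
    if min(initial, horizon, step) <= 0:
        raise ValueError("initial, horizon and step must be positive")
    cutoff = initial
    while cutoff + horizon <= n_obs:
        yield range(0, cutoff), range(cutoff, cutoff + horizon)
        cutoff += step


folds = list(walk_forward_splits(n_obs=10, initial=5, horizon=2, step=2))
for train_idx, test_idx in folds:
    print(f"train 0..{train_idx[-1]}  test {list(test_idx)}")
# train 0..4  test [5, 6]
# train 0..6  test [7, 8]
```

With daily data, initial=365, horizon=30, step=30 would give a one-year warm-up followed by monthly cutoffs.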
⚠️ Prophet can overfit with too many holidays or changepoints. Before production deployment, tune changepoint_prior_scale (and, if needed, n_changepoints) and holidays_prior_scale with prophet.diagnostics.cross_validation.
ℹ️ For high-frequency (intraday) data, consider NeuralProphet or DeepAR, which may capture sub-daily patterns better than Prophet.