The Data Engineering Lifecycle — From Source to Insight

Overview

The data engineering lifecycle is the end-to-end journey data takes from its creation in source systems to its consumption by users, applications, and models. Understanding each stage — and the tools, patterns, and best practices associated with it — is fundamental to building reliable data systems.

┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│ GENERATE │──▶│ INGEST   │──▶│ STORE    │──▶│ PROCESS  │──▶│  SERVE   │──▶│ MONITOR  │
│          │   │          │   │          │   │          │   │          │   │          │
│ Sources  │   │ Pipelines│   │ Lakes    │   │ Clean    │   │ APIs     │   │ Alerts   │
│ Events   │   │ Streams  │   │ Warehouse│   │ Transform│   │ Dashboards│  │ Metrics  │
│ Logs     │   │ CDC      │   │ Marts    │   │ Enrich   │   │ ML       │   │ Logs     │
└──────────┘   └──────────┘   └──────────┘   └──────────┘   └──────────┘   └──────────┘

Stage 1: Data Generation

Data is generated by various source systems across the organization. Understanding the characteristics of each source type informs how you design ingestion and storage.

Source Types

| Source Type | Examples | Characteristics | Challenges |
|---|---|---|---|
| Transactional Databases | PostgreSQL, MySQL, Oracle | Structured, ACID, high consistency | Schema changes, CDC complexity |
| APIs | Stripe, Salesforce, Twitter | Semi-structured, rate-limited | Throttling, pagination, auth |
| Log Files | Apache, Nginx, Application logs | Unstructured, high volume | Parsing, rotation, retention |
| IoT Sensors | Temperature, GPS, Accelerometer | Streaming, time-series, massive scale | Out-of-order events, gaps |
| Flat Files | CSV, Excel, JSON exports | Batch, variable schema | Encoding, format consistency |
| Streaming Events | Clickstream, Transactions | Real-time, ordered, immutable | Backpressure, exactly-once |

Data Characteristics

┌─────────────────────────────────────────────────────────────┐
│                  DATA CHARACTERISTICS                        │
├──────────────┬──────────────────────────────────────────────┤
│ Volume       │ How much data? (GB, TB, PB)                  │
│ Velocity     │ How fast? (batch, micro-batch, streaming)    │
│ Variety      │ What format? (structured, semi, unstructured) │
│ Veracity     │ How trustworthy? (clean, noisy, uncertain)   │
│ Value        │ How useful? (high-value vs low-value)         │
│ Variability  │ How consistent? (stable vs changing schemas)  │
└──────────────┴──────────────────────────────────────────────┘

Stage 2: Data Ingestion

Ingestion is the process of moving data from source systems to your data infrastructure. The choice of ingestion method depends on latency requirements, data volume, and source capabilities.
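
As a rough sketch of how latency requirements and source capabilities might drive that choice, the heuristic below maps a source description to one of the four patterns. The `SourceProfile` fields and the 60-second and one-hour thresholds are illustrative assumptions, not fixed rules, and data volume is left out to keep the example short.

```python
from dataclasses import dataclass


@dataclass
class SourceProfile:
    """Hypothetical description of a source system."""
    max_latency_seconds: float  # how stale the data may be downstream
    supports_change_log: bool   # e.g. a database that exposes a change log
    emits_events: bool          # the source pushes events as they occur


def choose_ingestion_pattern(profile: SourceProfile) -> str:
    """Pick an ingestion pattern using rough, illustrative thresholds."""
    if profile.emits_events and profile.max_latency_seconds < 60:
        return "streaming"
    if profile.supports_change_log and profile.max_latency_seconds < 3600:
        return "cdc"
    if profile.max_latency_seconds < 3600:
        return "micro-batch"
    return "batch"


clickstream = SourceProfile(max_latency_seconds=5, supports_change_log=False, emits_events=True)
orders_db = SourceProfile(max_latency_seconds=300, supports_change_log=True, emits_events=False)
print(choose_ingestion_pattern(clickstream))  # streaming
print(choose_ingestion_pattern(orders_db))    # cdc
```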

Ingestion Patterns

┌─────────────────────────────────────────────────────────────┐
│                  INGESTION PATTERNS                          │
├─────────────────┬───────────────────────────────────────────┤
│                 │                                           │
│   BATCH         │  ┌────┐  ┌────┐  ┌────┐  ┌────┐         │
│   (Scheduled)   │  │ F1 │  │ F2 │  │ F3 │  │ F4 │         │
│                 │  └────┘  └────┘  └────┘  └────┘         │
│                 │  Every hour/day                           │
│                 │                                           │
│   STREAMING     │  ═══════════════════════════════          │
│   (Real-time)   │  Continuous flow of events               │
│                 │                                           │
│   MICRO-BATCH   │  ┌──┐┌──┐┌──┐┌──┐┌──┐┌──┐┌──┐          │
│   (Near-real)   │  │B1││B2││B3││B4││B5││B6││B7│          │
│                 │  └──┘└──┘└──┘└──┘└──┘└──┘└──┘           │
│                 │  Every few minutes                        │
│                 │                                           │
│   CDC           │  ▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓            │
│   (Change)      │  Only changed rows                       │
└─────────────────┴───────────────────────────────────────────┘
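
The micro-batch row in the diagram can be illustrated by grouping a continuous event iterator into small fixed-size batches. This is a simplified sketch: production systems usually cut batches on a time interval, whereas this example cuts on a count so that the result is deterministic. The function name `micro_batches` is an assumption for the example.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def micro_batches(events: Iterable[dict], batch_size: int) -> Iterator[List[dict]]:
    """Group an event stream into lists of at most `batch_size` events."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(events)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return  # stream exhausted
        yield batch


# A generator stands in for an unbounded source here.
event_stream = ({"id": i} for i in range(7))
batches = list(micro_batches(event_stream, batch_size=3))
print([len(batch) for batch in batches])  # [3, 3, 1]
```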

Batch Ingestion

Best for: Historical data, daily reports, non-time-sensitive analytics.

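# Example: Batch ingestion from PostgreSQL to S3 using Python
import os
from io import StringIO

import boto3
import pandas as pd
import psycopg2

def extract_batch(connection_params, query, table_name):
    """Extract data from source and upload to S3 as CSV. Returns the row count."""
    conn = psycopg2.connect(**connection_params)
    try:
        # Execute query and load the result into a DataFrame
        df = pd.read_sql_query(query, conn)
    finally:
        # Always release the connection, even if the query fails
        conn.close()

    # Convert to CSV in memory
    csv_buffer = StringIO()
    df.to_csv(csv_buffer, index=False)

    # Use a compact UTC timestamp so the object key has no spaces or colons
    run_ts = pd.Timestamp.now(tz="UTC").strftime("%Y%m%dT%H%M%SZ")

    # Upload to S3
    s3 = boto3.client('s3')
    s3.put_object(
        Bucket='data-lake',
        Key=f'raw/{table_name}/batch_{run_ts}.csv',
        Body=csv_buffer.getvalue().encode('utf-8')
    )

    return len(df)

# Usage
rows = extract_batch(
    connection_params={
        "host": "source-db.example.com",
        "database": "production",
        "user": "reader",
        # Read the password from the environment instead of hardcoding it
        # (SOURCE_DB_PASSWORD is an illustrative variable name)
        "password": os.environ["SOURCE_DB_PASSWORD"]
    },
    query="SELECT * FROM orders WHERE date >= CURRENT_DATE - 1",
    table_name="orders"
)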

Streaming Ingestion

Best for: Real-time dashboards, fraud detection, live recommendations.

# Example: Streaming ingestion with Apache Kafka (using confluent-kafka)
from confluent_kafka import Producer
import json
import time

def produce_events(topic, events):
    """Produce events to Kafka topic."""
    producer = Producer({
        'bootstrap.servers': 'kafka-broker:9092',
        'client.id': 'data-ingestion'
    })
    
    for event in events:
        producer.produce(
            topic=topic,
            key=str(event['user_id']),
            value=json.dumps(event).encode('utf-8'),
            callback=delivery_callback
        )
        producer.poll(0)
    
    producer.flush()

def delivery_callback(err, msg):
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Event delivered to {msg.topic()} [{msg.partition()}]")

# Usage
events = [
    {"user_id": 1, "action": "click", "timestamp": time.time(), "page": "/home"},
    {"user_id": 2, "action": "purchase", "timestamp": time.time(), "amount": 49.99}
]
produce_events('user_events', events)

Change Data Capture (CDC)

Best for: Keeping downstream systems in sync with source databases without full reloads.

┌──────────┐     ┌───────────┐     ┌──────────┐     ┌──────────┐
│ Source   │────▶│ Debezium/ │────▶│  Kafka   │────▶│ Target   │
│ Database │     │ CDC Tool  │     │  Topic   │     │ Warehouse│
│          │     │           │     │          │     │          │
│ Write    │     │ Read      │     │ Stream   │     │ Apply    │
│ Changes  │     │ Binlog    │     │ Changes  │     │ Changes  │
└──────────┘     └───────────┘     └──────────┘     └──────────┘
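
The "Apply Changes" step at the end of this flow can be sketched as replaying change events against a keyed target. The event shape below (`op`, `before`, `after`) is loosely modeled on Debezium's change event envelope; the `id` primary key and the in-memory dictionary standing in for the warehouse table are assumptions made for the example.

```python
from typing import Dict, List


def apply_change(target: Dict[int, dict], event: dict) -> None:
    """Apply one change event to the target, keyed by primary key `id`."""
    op = event["op"]
    if op in ("c", "u", "r"):       # create, update, or snapshot read
        row = event["after"]
        target[row["id"]] = row     # upsert keeps replays idempotent
    elif op == "d":                 # delete
        target.pop(event["before"]["id"], None)
    else:
        raise ValueError(f"unknown operation: {op!r}")


changes: List[dict] = [
    {"op": "c", "before": None, "after": {"id": 1, "status": "new"}},
    {"op": "c", "before": None, "after": {"id": 2, "status": "new"}},
    {"op": "u", "before": {"id": 1, "status": "new"},
     "after": {"id": 1, "status": "shipped"}},
    {"op": "d", "before": {"id": 2, "status": "new"}, "after": None},
]

target_table: Dict[int, dict] = {}
for change in changes:
    apply_change(target_table, change)

print(target_table)  # {1: {'id': 1, 'status': 'shipped'}}
```

Because inserts and updates are both handled as upserts, replaying the same events after a failure leaves the target in the same state.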

Stage 3: Data Storage

Storage choices depend on data format, access patterns, cost, and query requirements.

Storage Architecture Layers

┌─────────────────────────────────────────────────────────────┐
│                    DATA LAKEHOUSE                            │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              PRESENTATION LAYER                      │    │
│  │  ┌─────────┐  ┌──────────┐  ┌──────────────────┐   │    │
│  │  │ Marts   │  │ Cubes    │  │ Materialized     │   │    │
│  │  │         │  │          │  │ Views            │   │    │
│  │  └─────────┘  └──────────┘  └──────────────────┘   │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              PROCESSING LAYER                        │    │
│  │  ┌─────────┐  ┌──────────┐  ┌──────────────────┐   │    │
│  │  │ Spark   │  │ dbt      │  │ Airflow          │   │    │
│  │  │         │  │          │  │                  │   │    │
│  │  └─────────┘  └──────────┘  └──────────────────┘   │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              STORAGE LAYER                           │    │
│  │  ┌─────────┐  ┌──────────┐  ┌──────────────────┐   │    │
│  │  │ Bronze  │  │ Silver   │  │ Gold             │   │    │
│  │  │ (Raw)   │  │ (Clean)  │  │ (Curated)        │   │    │
│  │  └─────────┘  └──────────┘  └──────────────────┘   │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘

Medallion Architecture (Bronze → Silver → Gold)

| Layer | Purpose | Data Format | Example |
|---|---|---|---|
| Bronze | Raw data, as-is from source | Parquet, JSON, CSV | Raw API responses |
| Silver | Cleaned, deduplicated, validated | Parquet, Delta Lake | Deduplicated orders |
| Gold | Business-level aggregations | Parquet, Delta Lake | Daily sales by region |
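
To make the three layers concrete, here is a small in-memory sketch in plain Python. The sample rows and the helper names `to_silver` and `to_gold` are invented for illustration; a real pipeline would read and write files or tables rather than lists.

```python
from collections import defaultdict
from decimal import Decimal, InvalidOperation
from typing import Dict, List

# Bronze: raw records exactly as received (strings, duplicates, bad values).
bronze = [
    {"order_id": "1", "region": "EU", "amount": "10.50"},
    {"order_id": "1", "region": "EU", "amount": "10.50"},  # duplicate
    {"order_id": "2", "region": "US", "amount": "abc"},    # invalid amount
    {"order_id": "3", "region": "EU", "amount": "4.50"},
]


def to_silver(rows: List[dict]) -> List[dict]:
    """Deduplicate on order_id, validate amounts, and cast types."""
    seen = set()
    silver = []
    for row in rows:
        if row["order_id"] in seen:
            continue
        try:
            amount = Decimal(row["amount"])
        except InvalidOperation:
            continue  # drop rows that fail validation
        seen.add(row["order_id"])
        silver.append({
            "order_id": int(row["order_id"]),
            "region": row["region"],
            "amount": amount,
        })
    return silver


def to_gold(rows: List[dict]) -> Dict[str, Decimal]:
    """Aggregate revenue per region."""
    totals: Dict[str, Decimal] = defaultdict(Decimal)
    for row in rows:
        totals[row["region"]] += row["amount"]
    return dict(totals)


silver = to_silver(bronze)
gold = to_gold(silver)
print(gold)  # {'EU': Decimal('15.00')}
```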

Storage Options Comparison

| Storage Type | Best For | Cost | Query Performance |
|---|---|---|---|
| Data Lake (S3/GCS) | Raw data, archival | Low | Requires processing |
| Data Warehouse (Snowflake/BigQuery) | Analytics, reporting | Medium-High | Excellent |
| Operational DB (PostgreSQL) | Application queries | Medium | High for OLTP |
| Real-time OLAP Database (Druid, ClickHouse) | Real-time analytics | Medium | Very High |
| Feature Store (Feast) | ML features | Low-Medium | High for ML |

Stage 4: Data Processing

Processing transforms raw data into useful information through cleaning, transformation, and enrichment.

Processing Paradigms

┌─────────────────────────────────────────────────────────────┐
│                PROCESSING PARADIGMS                          │
├─────────────────┬───────────────────────────────────────────┤
│ Batch           │ Process large datasets periodically       │
│                 │ Tools: Spark, dbt, SQL                    │
├─────────────────┼───────────────────────────────────────────┤
│ Micro-batch     │ Process small batches every few minutes   │
│                 │ Tools: Spark Structured Streaming         │
├─────────────────┼───────────────────────────────────────────┤
│ Stream          │ Process events as they arrive             │
│                 │ Tools: Kafka Streams, Flink, Spark        │
├─────────────────┼───────────────────────────────────────────┤
│ Request-response│ Process on-demand via API                │
│                 │ Tools: Lambda functions, FastAPI          │
└─────────────────┴───────────────────────────────────────────┘
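
A common building block in the stream paradigm is the tumbling window: events are assigned to fixed, non-overlapping time buckets and aggregated per bucket. The sketch below replays a finite list for clarity; a real stream processor would emit results incrementally and deal with late events. The function name and event fields are assumptions for the example.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def tumbling_window_counts(
    events: List[dict], window_seconds: int
) -> Dict[Tuple[int, str], int]:
    """Count events per (window start, action) using event-time timestamps."""
    counts: Dict[Tuple[int, str], int] = defaultdict(int)
    for event in events:
        # Round the timestamp down to the start of its window.
        window_start = int(event["timestamp"] // window_seconds) * window_seconds
        counts[(window_start, event["action"])] += 1
    return dict(counts)


events = [
    {"timestamp": 0, "action": "click"},
    {"timestamp": 30, "action": "click"},
    {"timestamp": 59, "action": "purchase"},
    {"timestamp": 60, "action": "click"},
    {"timestamp": 125, "action": "click"},
]

window_counts = tumbling_window_counts(events, window_seconds=60)
print(window_counts[(0, "click")])    # 2
print(window_counts[(120, "click")])  # 1
```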

Common Processing Operations

# Example: Data processing operations using PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("DataProcessing") \
    .getOrCreate()

# Read bronze layer data
orders = spark.read.format("delta").load("s3://data-lake/bronze/orders")
customers = spark.read.format("delta").load("s3://data-lake/bronze/customers")

# 1. FILTERING — Remove unwanted records
valid_orders = orders.filter(F.col("status") != "cancelled")

# 2. DEDUPLICATION — Remove duplicate records
deduped_orders = valid_orders.dropDuplicates(["order_id"])

# 3. CLEANING — Standardize data
cleaned_orders = deduped_orders \
    .withColumn("amount", F.col("amount").cast("decimal(10,2)")) \
    .withColumn("order_date", F.to_date("order_date", "yyyy-MM-dd"))

# 4. ENRICHMENT — Join with dimension tables
enriched_orders = cleaned_orders.join(
    customers.select("customer_id", "segment", "region"),
    on="customer_id",
    how="left"
)

# 5. AGGREGATION — Create business metrics
daily_sales = enriched_orders \
    .groupBy("order_date", "region", "segment") \
    .agg(
        F.count("order_id").alias("order_count"),
        F.sum("amount").alias("total_revenue"),
        F.avg("amount").alias("avg_order_value")
    )

# 6. WRITE the aggregated result to the gold layer
daily_sales.write.format("delta").mode("overwrite").save(
    "s3://data-lake/gold/daily_sales"
)

Stage 5: Data Serving

Data serving is the stage where processed data is made available to consumers: dashboards, applications, APIs, and ML models.

Serving Patterns

| Pattern | Latency | Use Case | Example |
|---|---|---|---|
| API Serving | Milliseconds | Application queries | REST API for user profiles |
| Dashboard Serving | Seconds | BI reporting | Tableau dashboard refresh |
| Batch Export | Hours | Offline analysis | CSV export for quarterly report |
| Feature Serving | Milliseconds | ML inference | Feature vector for real-time model |
| Streaming Serving | Milliseconds | Real-time views | Live dashboard updates |

Data Product Design

┌─────────────────────────────────────────────────────────────┐
│                    DATA PRODUCT                               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │  Schema     │  │  SLA        │  │  Access     │         │
│  │  (Contract) │  │  (Quality)  │  │  (Control)  │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
│                                                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │  Docs       │  │  Lineage    │  │  Tests      │         │
│  │             │  │             │  │             │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
│                                                             │
│  Consumers: Dashboards, APIs, ML Models, Analysts          │
└─────────────────────────────────────────────────────────────┘
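
One way to picture the schema, SLA, and access boxes in this diagram is as a small contract object that travels with the dataset. The sketch below is only an illustration: the `DataProduct` class, its fields, and the example roles are hypothetical and not taken from any particular framework.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class DataProduct:
    """Hypothetical contract describing a served dataset."""
    name: str
    schema: Dict[str, type]              # column name -> expected Python type
    freshness_sla_hours: int             # maximum acceptable data age
    allowed_roles: List[str] = field(default_factory=list)

    def schema_violations(self, record: dict) -> List[str]:
        """Return a list of human-readable schema problems for one record."""
        problems = []
        for column, expected in self.schema.items():
            if column not in record:
                problems.append(f"missing column: {column}")
            elif not isinstance(record[column], expected):
                actual = type(record[column]).__name__
                problems.append(f"{column}: expected {expected.__name__}, got {actual}")
        return problems

    def can_read(self, role: str) -> bool:
        """Check whether a consumer role is allowed to read this product."""
        return role in self.allowed_roles


product = DataProduct(
    name="daily_sales",
    schema={"order_date": str, "region": str, "total_revenue": float},
    freshness_sla_hours=24,
    allowed_roles=["analyst"],
)

bad_record = {"order_date": "2024-01-01", "total_revenue": "12.5"}
violations = product.schema_violations(bad_record)
print(violations)
# ['missing column: region', 'total_revenue: expected float, got str']
```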

Stage 6: Data Monitoring

Monitoring ensures data systems remain reliable, performant, and cost-effective.

What to Monitor

| Metric Category | Metrics | Tools |
|---|---|---|
| Pipeline Health | Success rate, duration, SLA | Airflow, Datadog |
| Data Quality | Null rates, freshness, row counts | Great Expectations, dbt tests |
| System Performance | CPU, memory, query time | CloudWatch, Prometheus |
| Cost | Storage used, compute hours, queries | Cloud billing, FinOps tools |
| Security | Access logs, permission changes | CloudTrail, audit logs |
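
The data quality metrics in the table (null rates, freshness, row counts) are simple to compute by hand, which helps when reasoning about what a monitoring tool reports. The following sketch uses plain Python; the thresholds, the `loaded_at` field, and the helper names are illustrative assumptions.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional


def null_rate(rows: List[dict], column: str) -> float:
    """Fraction of rows where `column` is missing or None."""
    if not rows:
        return 0.0
    return sum(1 for row in rows if row.get(column) is None) / len(rows)


def is_fresh(latest: datetime, max_age: timedelta,
             now: Optional[datetime] = None) -> bool:
    """True if the newest record is no older than `max_age`."""
    now = now or datetime.now(timezone.utc)
    return now - latest <= max_age


def run_checks(rows: List[dict], now: datetime) -> Dict[str, bool]:
    """Evaluate each check and report pass (True) or fail (False)."""
    latest = max((row["loaded_at"] for row in rows), default=None)
    return {
        "row_count": len(rows) >= 2,
        "null_rate": null_rate(rows, "region") <= 0.25,
        "freshness": latest is not None
        and is_fresh(latest, timedelta(hours=24), now),
    }


now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
loaded = datetime(2024, 1, 2, 6, 0, tzinfo=timezone.utc)
rows = [
    {"region": "EU", "loaded_at": loaded},
    {"region": None, "loaded_at": loaded},
]

report = run_checks(rows, now)
failed = [name for name, ok in report.items() if not ok]
print(failed)  # ['null_rate']
```

In a pipeline, a non-empty `failed` list would typically fail the task or trigger an alert.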

Monitoring Implementation

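# Example: Data quality monitoring with Great Expectations
# Note: this uses the fluent API found in GX 0.16-0.18 (context.sources);
# later releases changed this entry point, so check the docs for your version.
import great_expectations as gx

context = gx.get_context()

# Load the dataset to validate (assumes a CSV export of the gold table)
validator = context.sources.pandas_default.read_csv("s3://data-lake/gold/daily_sales.csv")

# Define expectations for the dataset
validator.expect_column_values_to_not_be_null("order_date")
validator.expect_column_values_to_be_between("total_revenue", min_value=0, max_value=1000000)
# daily_sales holds one row per (order_date, region, segment) and has no order_id column
validator.expect_compound_columns_to_be_unique(["order_date", "region", "segment"])
validator.expect_table_row_count_to_be_between(min_value=100, max_value=100000)

results = validator.validate()
print(f"Success: {results.success}")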

End-to-End Pipeline Example

# Complete lifecycle example: Daily sales pipeline
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

def extract_orders(**context):
    """Stage 1: Extract from source database."""
    import psycopg2
    import pandas as pd

    conn = psycopg2.connect("postgresql://source-db/orders")
    try:
        df = pd.read_sql("SELECT * FROM orders WHERE date = %s",
                         conn, params=[context['ds']])
    finally:
        conn.close()  # release the connection even if the query fails

    # Writing to an s3:// path requires the s3fs package
    df.to_parquet(f"s3://data-lake/bronze/orders/{context['ds']}.parquet")
    return len(df)

def transform_orders(**context):
    """Stage 2: Transform and clean."""
    import pandas as pd

    df = pd.read_parquet(f"s3://data-lake/bronze/orders/{context['ds']}.parquet")

    # Clean and transform
    cleaned = df.drop_duplicates(subset=['order_id'])
    # Copy after filtering so the column assignment below does not act on a view
    cleaned = cleaned[cleaned['amount'] > 0].copy()
    cleaned['order_date'] = pd.to_datetime(cleaned['order_date'])

    cleaned.to_parquet(f"s3://data-lake/silver/orders/{context['ds']}.parquet")
    return len(cleaned)

def load_to_warehouse(**context):
    """Stage 3: Load to data warehouse."""
    import pandas as pd
    from sqlalchemy import create_engine

    # Placeholder URL; the snowflake:// dialect requires the snowflake-sqlalchemy package
    engine = create_engine("snowflake://warehouse/analytics")
    df = pd.read_parquet(f"s3://data-lake/silver/orders/{context['ds']}.parquet")
    df.to_sql('daily_orders', engine, if_exists='append', index=False)

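def monitor(**context):
    """Stage 4: Validate and monitor."""
    import pandas as pd

    # Minimal quality check: fail the task if the silver output is empty
    df = pd.read_parquet(f"s3://data-lake/silver/orders/{context['ds']}.parquet")
    if df.empty:
        raise ValueError(f"No orders in silver layer for {context['ds']}")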

# DAG definition
# Airflow 2.4+ accepts `schedule`; older releases use `schedule_interval` instead
with DAG('daily_sales_pipeline', default_args=default_args,
         schedule='@daily', catchup=False) as dag:

    extract = PythonOperator(task_id='extract', python_callable=extract_orders)
    transform = PythonOperator(task_id='transform', python_callable=transform_orders)
    load = PythonOperator(task_id='load', python_callable=load_to_warehouse)
    monitor_task = PythonOperator(task_id='monitor', python_callable=monitor)
    
    extract >> transform >> load >> monitor_task

Key Takeaways

  1. The lifecycle has 6 stages: Generate → Ingest → Store → Process → Serve → Monitor
  2. Each stage has specific tools and patterns — choose based on latency, volume, and cost requirements
  3. The Medallion Architecture (Bronze → Silver → Gold) provides a clean separation of concerns
  4. Monitoring is not optional — without it, data issues go unnoticed until they cause business impact
  5. End-to-end ownership — data engineers are responsible for the entire lifecycle, not just individual stages
  6. Design for change — source systems, requirements, and tools evolve; build flexible pipelines

Practice Exercises

  1. Lifecycle mapping: For a "real-time social media analytics" use case, describe each stage of the data lifecycle with specific tools.

  2. Source analysis: Identify 3 data sources in your organization. For each, document: volume, velocity, format, and recommended ingestion pattern.

  3. Pipeline design: Design an end-to-end pipeline for "e-commerce order analytics" including Bronze, Silver, and Gold layers.

  4. Monitoring plan: Create a monitoring checklist for a daily ETL pipeline. Include metrics, thresholds, and alerting channels.

  5. Cost estimation: Estimate monthly costs for storing 1TB of data in S3 vs Snowflake vs PostgreSQL. Include storage and query costs.
