Overview
The data engineering lifecycle is the end-to-end journey data takes from its creation in source systems to its consumption by users, applications, and models. Understanding each stage — and the tools, patterns, and best practices associated with it — is fundamental to building reliable data systems.
┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│ GENERATE │──▶│  INGEST  │──▶│  STORE   │──▶│ PROCESS  │──▶│  SERVE   │──▶│ MONITOR  │
│          │   │          │   │          │   │          │   │          │   │          │
│ Sources  │   │ Pipelines│   │ Lakes    │   │ Clean    │   │ APIs     │   │ Alerts   │
│ Events   │   │ Streams  │   │Warehouses│   │ Transform│   │Dashboards│   │ Metrics  │
│ Logs     │   │ CDC      │   │ Marts    │   │ Enrich   │   │ ML       │   │ Logs     │
└──────────┘   └──────────┘   └──────────┘   └──────────┘   └──────────┘   └──────────┘
Stage 1: Data Generation
Data is generated by various source systems across the organization. Understanding the characteristics of each source type informs how you design ingestion and storage.
Source Types
| Source Type | Examples | Characteristics | Challenges |
|---|---|---|---|
| Transactional Databases | PostgreSQL, MySQL, Oracle | Structured, ACID, high consistency | Schema changes, CDC complexity |
| APIs | Stripe, Salesforce, Twitter | Semi-structured, rate-limited | Throttling, pagination, auth |
| Log Files | Apache, Nginx, Application logs | Unstructured, high volume | Parsing, rotation, retention |
| IoT Sensors | Temperature, GPS, Accelerometer | Streaming, time-series, massive scale | Out-of-order events, gaps |
| Flat Files | CSV, Excel, JSON exports | Batch, variable schema | Encoding, format consistency |
| Streaming Events | Clickstream, Transactions | Real-time, ordered, immutable | Backpressure, exactly-once |
Data Characteristics
┌─────────────────────────────────────────────────────────────┐
│ DATA CHARACTERISTICS │
├──────────────┬──────────────────────────────────────────────┤
│ Volume │ How much data? (GB, TB, PB) │
│ Velocity │ How fast? (batch, micro-batch, streaming) │
│ Variety │ What format? (structured, semi, unstructured) │
│ Veracity │ How trustworthy? (clean, noisy, uncertain) │
│ Value │ How useful? (high-value vs low-value) │
│ Variability │ How consistent? (stable vs changing schemas) │
└──────────────┴──────────────────────────────────────────────┘
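Variability (stable vs changing schemas) is the characteristic most likely to break a pipeline silently. As a rough sketch, schema drift between two samples of records could be detected as follows; the helper names and sample records are illustrative assumptions, and a real pipeline would more likely compare against a registered schema.

```python
def infer_schema(records):
    """Map each field name to the set of Python type names seen for it."""
    schema = {}
    for record in records:
        for field, value in record.items():
            schema.setdefault(field, set()).add(type(value).__name__)
    return schema


def diff_schemas(expected, observed):
    """Report added, removed, and type-changed fields between two schemas."""
    shared = expected.keys() & observed.keys()
    return {
        "added": sorted(observed.keys() - expected.keys()),
        "removed": sorted(expected.keys() - observed.keys()),
        "type_changed": sorted(f for f in shared if expected[f] != observed[f]),
    }


# Hypothetical samples taken from the same source on two consecutive days.
yesterday = [{"order_id": 1, "amount": 10.5, "status": "paid"}]
today = [{"order_id": "A-2", "amount": 7.0, "coupon": "SPRING"}]

drift = diff_schemas(infer_schema(yesterday), infer_schema(today))
print(drift)
```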
Stage 2: Data Ingestion
Ingestion is the process of moving data from source systems to your data infrastructure. The choice of ingestion method depends on latency requirements, data volume, and source capabilities.
Ingestion Patterns
┌─────────────────────────────────────────────────────────────┐
│ INGESTION PATTERNS │
├─────────────────┬───────────────────────────────────────────┤
│ │ │
│ BATCH │ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
│ (Scheduled) │ │ F1 │ │ F2 │ │ F3 │ │ F4 │ │
│ │ └────┘ └────┘ └────┘ └────┘ │
│ │ Every hour/day │
│ │ │
│ STREAMING │ ═══════════════════════════════ │
│ (Real-time) │ Continuous flow of events │
│ │ │
│ MICRO-BATCH │ ┌──┐┌──┐┌──┐┌──┐┌──┐┌──┐┌──┐ │
│ (Near-real) │ │B1││B2││B3││B4││B5││B6││B7│ │
│ │ └──┘└──┘└──┘└──┘└──┘└──┘└──┘ │
│ │ Every few minutes │
│ │ │
│ CDC │ ▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓ │
│ (Change) │ Only changed rows │
└─────────────────┴───────────────────────────────────────────┘
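One way to make the choice between these patterns explicit is a small decision helper. The sketch below is only a heuristic: the latency thresholds (5 seconds, 1 hour) and the function name are assumptions chosen for illustration, not established cut-offs.

```python
def choose_ingestion_pattern(max_latency_seconds, source_has_change_log=False):
    """Pick an ingestion pattern from a latency budget (illustrative thresholds)."""
    if max_latency_seconds < 5:
        return "streaming"
    if source_has_change_log and max_latency_seconds < 3600:
        return "cdc"
    if max_latency_seconds < 3600:
        return "micro-batch"
    return "batch"


# Hypothetical workloads: (label, latency budget in seconds, source exposes a change log)
workloads = [
    ("fraud detection", 1, False),
    ("warehouse sync", 300, True),
    ("clickstream rollup", 300, False),
    ("daily report", 86400, False),
]
for label, latency, has_log in workloads:
    print(label, "->", choose_ingestion_pattern(latency, has_log))
```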
Batch Ingestion
Best for: Historical data, daily reports, non-time-sensitive analytics.
# Example: Batch ingestion from PostgreSQL to S3 using Python
from io import StringIO

import boto3
import pandas as pd
import psycopg2


def extract_batch(connection_params, query, table_name):
    """Extract data from source and upload to S3. Returns the row count."""
    conn = psycopg2.connect(**connection_params)
    try:
        # Execute query and load the result into a DataFrame
        df = pd.read_sql_query(query, conn)
    finally:
        # Always release the connection, even if the query fails
        conn.close()

    # Convert to CSV in memory
    csv_buffer = StringIO()
    df.to_csv(csv_buffer, index=False)

    # Upload to S3; format the timestamp so the key has no spaces or colons
    run_ts = pd.Timestamp.now(tz="UTC").strftime("%Y%m%dT%H%M%SZ")
    s3 = boto3.client('s3')
    s3.put_object(
        Bucket='data-lake',
        Key=f'raw/{table_name}/batch_{run_ts}.csv',
        Body=csv_buffer.getvalue()
    )
    return len(df)


# Usage
rows = extract_batch(
    connection_params={
        "host": "source-db.example.com",
        "database": "production",
        "user": "reader",
        "password": "secret"  # placeholder; load real credentials from a secret store
    },
    query="SELECT * FROM orders WHERE date >= CURRENT_DATE - 1",
    table_name="orders"
)
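A common refinement of scheduled batch extraction is to pull only rows changed since the previous run, tracked with a high-watermark. The sketch below uses an in-memory SQLite table so it runs anywhere; the `updated_at` column, the table contents, and the function name are assumptions for illustration.

```python
import sqlite3


def extract_incremental(conn, last_watermark):
    """Return rows updated after last_watermark, plus the new watermark."""
    rows = conn.execute(
        "SELECT order_id, amount, updated_at FROM orders "
        "WHERE updated_at > ? ORDER BY updated_at",
        (last_watermark,),
    ).fetchall()
    # If nothing changed, keep the old watermark so the next run starts from the same point.
    new_watermark = rows[-1][2] if rows else last_watermark
    return rows, new_watermark


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, amount REAL, updated_at TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [
        (1, 20.0, "2024-01-01T08:00:00"),
        (2, 35.5, "2024-01-02T09:30:00"),
        (3, 12.0, "2024-01-03T10:15:00"),
    ],
)

# First run picks up everything after the stored watermark; the second finds nothing new.
first_batch, watermark = extract_incremental(conn, "2024-01-01T23:59:59")
second_batch, watermark_after = extract_incremental(conn, watermark)
conn.close()

print([row[0] for row in first_batch], watermark)
```

ISO 8601 timestamps stored as text compare correctly as strings, which is why a plain `>` works here. In a real pipeline the watermark would be persisted between runs.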
Streaming Ingestion
Best for: Real-time dashboards, fraud detection, live recommendations.
# Example: Streaming ingestion with Apache Kafka (using confluent-kafka)
import json
import time

from confluent_kafka import Producer


def delivery_callback(err, msg):
    """Report the delivery result of each message."""
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Event delivered to {msg.topic()} [{msg.partition()}]")


def produce_events(topic, events):
    """Produce events to Kafka topic."""
    producer = Producer({
        'bootstrap.servers': 'kafka-broker:9092',
        'client.id': 'data-ingestion'
    })
    for event in events:
        producer.produce(
            topic=topic,
            key=str(event['user_id']),  # same key -> same partition, so per-user order is kept
            value=json.dumps(event).encode('utf-8'),
            callback=delivery_callback
        )
        producer.poll(0)  # serve delivery callbacks without blocking
    producer.flush()  # block until all queued messages are delivered


# Usage
events = [
    {"user_id": 1, "action": "click", "timestamp": time.time(), "page": "/home"},
    {"user_id": 2, "action": "purchase", "timestamp": time.time(), "amount": 49.99}
]
produce_events('user_events', events)
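The producer above keys each message by `user_id`. The reason is that keyed messages are routed by hashing the key, so all events for one user land on the same partition and keep their relative order. The sketch below illustrates that idea only: CRC32 is used as a stand-in hash, and real clients use their own hash functions (the Java client, for example, uses murmur2), so actual partition numbers may differ. The `seq` field and partition count are assumptions.

```python
import zlib
from collections import defaultdict


def assign_partition(key: str, num_partitions: int) -> int:
    """Map a message key to a partition with a deterministic hash (stand-in for a real partitioner)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Hypothetical event stream; `seq` records the order in which events were produced.
events = [
    {"user_id": 1, "seq": 0},
    {"user_id": 2, "seq": 1},
    {"user_id": 1, "seq": 2},
    {"user_id": 1, "seq": 3},
]

partitions = defaultdict(list)
for event in events:
    partition = assign_partition(str(event["user_id"]), num_partitions=6)
    partitions[partition].append(event)

# All events for user 1 share one partition and appear there in production order.
user1_partition = assign_partition("1", 6)
user1_seqs = [e["seq"] for e in partitions[user1_partition] if e["user_id"] == 1]
print(user1_seqs)  # [0, 2, 3]
```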
Change Data Capture (CDC)
Best for: Keeping downstream systems in sync with source databases without full reloads.
┌──────────┐ ┌───────────┐ ┌──────────┐ ┌──────────┐
│ Source │────▶│ Debezium/ │────▶│ Kafka │────▶│ Target │
│ Database │ │ CDC Tool │ │ Topic │ │ Warehouse│
│ │ │ │ │ │ │ │
│ Write │ │ Read │ │ Stream │ │ Apply │
│ Changes │ │ Binlog │ │ Changes │ │ Changes │
└──────────┘ └───────────┘ └──────────┘ └──────────┘
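The final "Apply Changes" step can be sketched as replaying change events against a keyed target. The event shape below follows the Debezium-style envelope (`op` of `c`, `u`, `d`, or `r` for snapshot reads, with `before` and `after` row images); the in-memory dictionary standing in for the warehouse table and the `id` key column are assumptions.

```python
def apply_change(table, event, key="id"):
    """Apply one change event to an in-memory table keyed by primary key."""
    op = event["op"]
    if op in ("c", "r", "u"):
        # Creates, snapshot reads, and updates are all upserts of the new row image.
        row = event["after"]
        table[row[key]] = row
    elif op == "d":
        # Deletes carry the old row image; ignore rows that are already gone.
        table.pop(event["before"][key], None)
    else:
        raise ValueError(f"unknown op: {op!r}")


target = {}
changes = [
    {"op": "c", "before": None, "after": {"id": 1, "status": "new"}},
    {"op": "c", "before": None, "after": {"id": 2, "status": "new"}},
    {"op": "u", "before": {"id": 1, "status": "new"}, "after": {"id": 1, "status": "shipped"}},
    {"op": "d", "before": {"id": 2, "status": "new"}, "after": None},
]
for change in changes:
    apply_change(target, change)

print(target)  # {1: {'id': 1, 'status': 'shipped'}}
```

Treating inserts and updates as upserts makes replaying the same event harmless, which matters because change streams are often delivered at least once.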
Stage 3: Data Storage
Storage choices depend on data format, access patterns, cost, and query requirements.
Storage Architecture Layers
┌─────────────────────────────────────────────────────────────┐
│ DATA LAKEHOUSE │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ PRESENTATION LAYER │ │
│ │ ┌─────────┐ ┌──────────┐ ┌──────────────────┐ │ │
│ │ │ Marts │ │ Cubes │ │ Materialized │ │ │
│ │ │ │ │ │ │ Views │ │ │
│ │ └─────────┘ └──────────┘ └──────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ PROCESSING LAYER │ │
│ │ ┌─────────┐ ┌──────────┐ ┌──────────────────┐ │ │
│ │ │ Spark │ │ dbt │ │ Airflow │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ └─────────┘ └──────────┘ └──────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ STORAGE LAYER │ │
│ │ ┌─────────┐ ┌──────────┐ ┌──────────────────┐ │ │
│ │ │ Bronze │ │ Silver │ │ Gold │ │ │
│ │ │ (Raw) │ │ (Clean) │ │ (Curated) │ │ │
│ │ └─────────┘ └──────────┘ └──────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Medallion Architecture (Bronze → Silver → Gold)
| Layer | Purpose | Data Format | Example |
|---|---|---|---|
| Bronze | Raw data, as-is from source | Parquet, JSON, CSV | Raw API responses |
| Silver | Cleaned, deduplicated, validated | Parquet, Delta Lake | Deduplicated orders |
| Gold | Business-level aggregations | Parquet, Delta Lake | Daily sales by region |
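To make the three layers concrete, here is a dependency-free sketch of one record set moving from Bronze to Silver to Gold. The sample rows, field names, and validation rule (amount must parse as a number) are illustrative assumptions; production pipelines would typically do this in Spark or SQL.

```python
from collections import defaultdict

# Bronze: raw records exactly as received, including a duplicate and a bad value.
bronze = [
    {"order_id": "1", "region": "EU", "amount": "20.00", "order_date": "2024-01-01"},
    {"order_id": "1", "region": "EU", "amount": "20.00", "order_date": "2024-01-01"},
    {"order_id": "2", "region": "US", "amount": "oops", "order_date": "2024-01-01"},
    {"order_id": "3", "region": "EU", "amount": "5.50", "order_date": "2024-01-01"},
]


def to_silver(rows):
    """Deduplicate on order_id, drop rows with an unparseable amount, and cast types."""
    seen, silver = set(), []
    for row in rows:
        if row["order_id"] in seen:
            continue
        try:
            amount = float(row["amount"])
        except ValueError:
            continue
        seen.add(row["order_id"])
        silver.append({**row, "amount": amount})
    return silver


def to_gold(rows):
    """Aggregate revenue by (order_date, region)."""
    totals = defaultdict(float)
    for row in rows:
        totals[(row["order_date"], row["region"])] += row["amount"]
    return dict(totals)


silver = to_silver(bronze)
gold = to_gold(silver)
print(len(bronze), len(silver), gold)
```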
Storage Options Comparison
| Storage Type | Best For | Cost | Query Performance |
|---|---|---|---|
| Data Lake (S3/GCS) | Raw data, archival | Low | Requires processing |
| Data Warehouse (Snowflake/BigQuery) | Analytics, reporting | Medium-High | Excellent |
| Operational DB (PostgreSQL) | Application queries | Medium | High for OLTP |
| Real-time OLAP Database (Druid, ClickHouse) | Real-time analytics | Medium | Very High |
| Feature Store (Feast) | ML features | Low-Medium | High for ML |
Stage 4: Data Processing
Processing transforms raw data into useful information through cleaning, transformation, and enrichment.
Processing Paradigms
┌─────────────────────────────────────────────────────────────┐
│ PROCESSING PARADIGMS │
├─────────────────┬───────────────────────────────────────────┤
│ Batch │ Process large datasets periodically │
│ │ Tools: Spark, dbt, SQL │
├─────────────────┼───────────────────────────────────────────┤
│ Micro-batch │ Process small batches every few minutes │
│ │ Tools: Spark Structured Streaming │
├─────────────────┼───────────────────────────────────────────┤
│ Stream │ Process events as they arrive │
│ │ Tools: Kafka Streams, Flink, Spark │
├─────────────────┼───────────────────────────────────────────┤
│ Request-response│ Process on-demand via API │
│ │ Tools: Lambda functions, FastAPI │
└─────────────────┴───────────────────────────────────────────┘
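Micro-batch and stream processing both rely on grouping events into time windows. A minimal sketch of fixed-size (tumbling) windows keyed by event time is shown below; it assumes integer epoch-second timestamps and ignores late or out-of-order data, which real engines handle with watermarks.

```python
from collections import defaultdict


def tumbling_window_counts(events, window_seconds):
    """Count events per fixed-size window, keyed by the window's start time."""
    counts = defaultdict(int)
    for event in events:
        window_start = event["timestamp"] // window_seconds * window_seconds
        counts[window_start] += 1
    return dict(sorted(counts.items()))


# Hypothetical events with timestamps in seconds.
events = [{"timestamp": t} for t in (0, 15, 59, 60, 61, 185)]
window_counts = tumbling_window_counts(events, window_seconds=60)
print(window_counts)  # {0: 3, 60: 2, 180: 1}
```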
Common Processing Operations
# Example: Data processing operations using PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("DataProcessing") \
    .getOrCreate()

# Read bronze layer data
orders = spark.read.format("delta").load("s3://data-lake/bronze/orders")
customers = spark.read.format("delta").load("s3://data-lake/bronze/customers")

# 1. FILTERING: Remove unwanted records
valid_orders = orders.filter(F.col("status") != "cancelled")

# 2. DEDUPLICATION: Remove duplicate records
deduped_orders = valid_orders.dropDuplicates(["order_id"])

# 3. CLEANING: Standardize data
cleaned_orders = deduped_orders \
    .withColumn("amount", F.col("amount").cast("decimal(10,2)")) \
    .withColumn("order_date", F.to_date("order_date", "yyyy-MM-dd"))

# 4. ENRICHMENT: Join with dimension tables
enriched_orders = cleaned_orders.join(
    customers.select("customer_id", "segment", "region"),
    on="customer_id",
    how="left"
)

# 5. AGGREGATION: Create business metrics
daily_sales = enriched_orders \
    .groupBy("order_date", "region", "segment") \
    .agg(
        F.count("order_id").alias("order_count"),
        F.sum("amount").alias("total_revenue"),
        F.avg("amount").alias("avg_order_value")
    )

# 6. WRITE the aggregated result to the gold layer
daily_sales.write.format("delta").mode("overwrite").save(
    "s3://data-lake/gold/daily_sales"
)
Stage 5: Data Serving
Data serving is the final stage where processed data is made available to consumers — dashboards, applications, APIs, and ML models.
Serving Patterns
| Pattern | Latency | Use Case | Example |
|---|---|---|---|
| API Serving | Milliseconds | Application queries | REST API for user profiles |
| Dashboard Serving | Seconds | BI reporting | Tableau dashboard refresh |
| Batch Export | Hours | Offline analysis | CSV export for quarterly report |
| Feature Serving | Milliseconds | ML inference | Feature vector for real-time model |
| Streaming Serving | Milliseconds | Real-time views | Live dashboard updates |
Data Product Design
┌─────────────────────────────────────────────────────────────┐
│ DATA PRODUCT │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Schema │ │ SLA │ │ Access │ │
│ │ (Contract) │ │ (Quality) │ │ (Control) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Docs │ │ Lineage │ │ Tests │ │
│ │ │ │ │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ Consumers: Dashboards, APIs, ML Models, Analysts │
└─────────────────────────────────────────────────────────────┘
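The Schema (Contract) and SLA (Quality) boxes can be expressed as a small, checkable object. The sketch below is one possible shape, not a standard: the `DataContract` class, its fields, and the 24-hour freshness limit are assumptions made for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class DataContract:
    name: str
    schema: dict              # column name -> expected Python type
    max_staleness: timedelta  # freshness SLA

    def violations(self, rows, last_updated, now=None):
        """Return a list of human-readable contract violations (empty if none)."""
        now = now or datetime.now(timezone.utc)
        problems = []
        if now - last_updated > self.max_staleness:
            problems.append("stale: last update exceeds freshness SLA")
        for i, row in enumerate(rows):
            for column, expected_type in self.schema.items():
                if column not in row:
                    problems.append(f"row {i}: missing column {column!r}")
                elif not isinstance(row[column], expected_type):
                    problems.append(f"row {i}: {column!r} is not {expected_type.__name__}")
        return problems


contract = DataContract(
    name="daily_sales",
    schema={"region": str, "total_revenue": float},
    max_staleness=timedelta(hours=24),
)
rows = [{"region": "EU", "total_revenue": 25.5}, {"region": "US"}]
problems = contract.violations(
    rows,
    last_updated=datetime(2024, 1, 2, 6, 0, tzinfo=timezone.utc),
    now=datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc),
)
print(problems)  # ["row 1: missing column 'total_revenue'"]
```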
Stage 6: Data Monitoring
Monitoring ensures data systems remain reliable, performant, and cost-effective.
What to Monitor
| Metric Category | Metrics | Tools |
|---|---|---|
| Pipeline Health | Success rate, duration, SLA | Airflow, Datadog |
| Data Quality | Null rates, freshness, row counts | Great Expectations, dbt tests |
| System Performance | CPU, memory, query time | CloudWatch, Prometheus |
| Cost | Storage used, compute hours, queries | Cloud billing, FinOps tools |
| Security | Access logs, permission changes | CloudTrail, audit logs |
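The data quality metrics in the table (null rates, freshness, row counts) are simple to compute. The sketch below shows one way to derive them and compare them with thresholds; the threshold values, column names, and sample rows are assumptions, and dedicated tools add scheduling, history, and alerting on top of checks like these.

```python
from datetime import datetime, timezone


def quality_metrics(rows, column, timestamp_column, now):
    """Compute row count, null rate for one column, and freshness in hours."""
    row_count = len(rows)
    if row_count == 0:
        return {"row_count": 0, "null_rate": None, "freshness_hours": None}
    nulls = sum(1 for row in rows if row.get(column) is None)
    newest = max(row[timestamp_column] for row in rows)
    return {
        "row_count": row_count,
        "null_rate": nulls / row_count,
        "freshness_hours": (now - newest).total_seconds() / 3600,
    }


def failed_checks(metrics, min_row_count, max_null_rate, max_freshness_hours):
    """Return the names of metrics that breach their thresholds."""
    failures = []
    if metrics["row_count"] < min_row_count:
        failures.append("row_count")
    if metrics["null_rate"] is not None and metrics["null_rate"] > max_null_rate:
        failures.append("null_rate")
    if metrics["freshness_hours"] is not None and metrics["freshness_hours"] > max_freshness_hours:
        failures.append("freshness_hours")
    return failures


loaded = datetime(2024, 1, 2, 0, 0, tzinfo=timezone.utc)
rows = [
    {"total_revenue": 10.0, "loaded_at": loaded},
    {"total_revenue": None, "loaded_at": loaded},
    {"total_revenue": 7.5, "loaded_at": loaded},
    {"total_revenue": 3.0, "loaded_at": loaded},
]
metrics = quality_metrics(
    rows, "total_revenue", "loaded_at",
    now=datetime(2024, 1, 2, 6, 0, tzinfo=timezone.utc),
)
failures = failed_checks(metrics, min_row_count=3, max_null_rate=0.1, max_freshness_hours=24)
print(metrics, failures)
```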
Monitoring Implementation
# Example: Data quality monitoring with Great Expectations
# Note: context.sources is the pre-1.0 fluent API; GX 1.x changed this interface.
import great_expectations as gx

context = gx.get_context()

# Load the dataset as a validator, then define expectations on it
validator = context.sources.pandas_default.read_csv("s3://data-lake/gold/daily_sales.csv")
validator.expect_column_values_to_not_be_null("order_date")
validator.expect_column_values_to_be_between("total_revenue", min_value=0, max_value=1000000)
# daily_sales is aggregated, so it has order_count rather than a per-order order_id
validator.expect_column_values_to_be_between("order_count", min_value=1)
validator.expect_table_row_count_to_be_between(min_value=100, max_value=100000)

results = validator.validate()
print(f"Success: {results.success}")
End-to-End Pipeline Example
# Complete lifecycle example: Daily sales pipeline
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}


def extract_orders(**context):
    """Step 1: Extract from source database into the bronze layer."""
    import pandas as pd
    import psycopg2

    conn = psycopg2.connect("postgresql://source-db/orders")
    try:
        # context['ds'] is the run's logical date as YYYY-MM-DD
        df = pd.read_sql("SELECT * FROM orders WHERE order_date = %s",
                         conn, params=[context['ds']])
    finally:
        conn.close()
    # Writing to s3:// paths with pandas requires the s3fs package
    df.to_parquet(f"s3://data-lake/bronze/orders/{context['ds']}.parquet")
    return len(df)


def transform_orders(**context):
    """Step 2: Transform and clean into the silver layer."""
    import pandas as pd

    df = pd.read_parquet(f"s3://data-lake/bronze/orders/{context['ds']}.parquet")
    # Clean and transform; copy() avoids modifying a view of the filtered frame
    cleaned = df.drop_duplicates(subset=['order_id'])
    cleaned = cleaned[cleaned['amount'] > 0].copy()
    cleaned['order_date'] = pd.to_datetime(cleaned['order_date'])
    cleaned.to_parquet(f"s3://data-lake/silver/orders/{context['ds']}.parquet")
    return len(cleaned)


def load_to_warehouse(**context):
    """Step 3: Load to data warehouse."""
    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine("snowflake://warehouse/analytics")  # placeholder URL
    df = pd.read_parquet(f"s3://data-lake/silver/orders/{context['ds']}.parquet")
    df.to_sql('daily_orders', engine, if_exists='append', index=False)


def monitor(**context):
    """Step 4: Validate and monitor."""
    # Run quality checks
    pass


# DAG definition
with DAG('daily_sales_pipeline', default_args=default_args,
         schedule_interval='@daily',  # Airflow 2.4+ prefers the `schedule` argument
         catchup=False) as dag:
    extract = PythonOperator(task_id='extract', python_callable=extract_orders)
    transform = PythonOperator(task_id='transform', python_callable=transform_orders)
    load = PythonOperator(task_id='load', python_callable=load_to_warehouse)
    monitor_task = PythonOperator(task_id='monitor', python_callable=monitor)

    extract >> transform >> load >> monitor_task
Key Takeaways
- The lifecycle has 6 stages: Generate → Ingest → Store → Process → Serve → Monitor
- Each stage has specific tools and patterns — choose based on latency, volume, and cost requirements
- The Medallion Architecture (Bronze → Silver → Gold) provides a clean separation of concerns
- Monitoring is not optional — without it, data issues go unnoticed until they cause business impact
- End-to-end ownership — data engineers are responsible for the entire lifecycle, not just individual stages
- Design for change — source systems, requirements, and tools evolve; build flexible pipelines
Practice Exercises
- Lifecycle mapping: For a "real-time social media analytics" use case, describe each stage of the data lifecycle with specific tools.
- Source analysis: Identify 3 data sources in your organization. For each, document: volume, velocity, format, and recommended ingestion pattern.
- Pipeline design: Design an end-to-end pipeline for "e-commerce order analytics" including Bronze, Silver, and Gold layers.
- Monitoring plan: Create a monitoring checklist for a daily ETL pipeline. Include metrics, thresholds, and alerting channels.
- Cost estimation: Estimate monthly costs for storing 1TB of data in S3 vs Snowflake vs PostgreSQL. Include storage and query costs.