Python in Data Engineering
Python is the primary programming language for data engineers. It's used for building pipelines, interacting with databases, automating tasks, and processing data at scale.
┌─────────────────────────────────────────────────────────────┐
│ PYTHON IN DATA ENGINEERING │
├─────────────────────────────────────────────────────────────┤
│ Pipeline Orchestration │ Airflow, Dagster, Prefect │
│ Data Processing │ Pandas, PySpark, Dask │
│ Database Interaction │ SQLAlchemy, psycopg2, │
│ │ mysql-connector-python │
│ Cloud Services │ boto3, google-cloud, │
│ │ azure-identity │
│ CLI Tools │ Click, argparse, rich │
│ Scheduling │ APScheduler, croniter │
│ Data Validation │ Great Expectations, Pydantic │
│ File Formats │ pyarrow, fastavro, orjson │
└─────────────────────────────────────────────────────────────┘
Setting Up Your Environment
Virtual Environments
# Create virtual environment
python -m venv venv
# Activate (Windows)
venv\Scripts\activate
# Activate (macOS/Linux)
source venv/bin/activate
# Install dependencies
# (psycopg2-binary is a prebuilt wheel meant for getting started; the psycopg2
#  docs recommend the source package "psycopg2" for production deployments)
pip install pandas sqlalchemy psycopg2-binary boto3 pyarrow
# Freeze dependencies (pins every package installed in the venv, including
# transitive dependencies, at their exact current versions)
pip freeze > requirements.txt
# Install from requirements
pip install -r requirements.txt
Type Hints (Production Best Practice)
from typing import Optional, List, Dict, Tuple
from datetime import datetime
# Type hints improve code clarity and enable static analysis
def process_orders(
    orders: List[Dict],
    start_date: datetime,
    end_date: Optional[datetime] = None,
    batch_size: int = 1000
) -> Tuple[int, List[str]]:
    """Process orders within date range.

    Failures are collected per order instead of aborting the whole run.

    Args:
        orders: Order records. The ``'id'`` key, if present, is used only to
            label error messages.
        start_date: Start of the processing window.
        end_date: End of the processing window, or ``None``.
        batch_size: Intended number of orders per batch.

    Returns:
        Tuple of (processed_count, error_messages)

    Note:
        The body is a placeholder: ``start_date``, ``end_date`` and
        ``batch_size`` are not applied yet.
    """
    processed = 0
    errors = []
    for order in orders:
        try:
            # Process logic here
            processed += 1
        except Exception as e:
            # .get() so a record without 'id' cannot raise a second KeyError
            # inside the handler and escape the per-record error collection.
            order_id = order.get('id', '<unknown>')
            errors.append(f"Order {order_id}: {e}")
    return processed, errors
Pandas for Data Processing
Core Operations
import pandas as pd
import numpy as np
# Reading data
# NOTE: dtype int fails if the 'id' column contains missing values; use 'Int64' then.
df = pd.read_csv('data.csv', dtype={'id': int, 'amount': float})
df = pd.read_parquet('data.parquet')
# `connection` is assumed to be an open SQLAlchemy connection/engine (see below).
df = pd.read_sql("SELECT * FROM orders", connection)
# Basic operations
print(df.shape)       # (rows, columns)
print(df.dtypes)      # Column data types
print(df.head())      # First 5 rows
df.info()             # Prints column dtypes and non-null counts itself (returns None)
print(df.describe())  # Statistical summary
# Selection
df[['col1', 'col2']]             # Select columns
df[df['amount'] > 100]           # Filter rows
df.loc[0:10, ['col1', 'col2']]   # Label-based selection (end label inclusive: 11 rows on a default index)
df.iloc[0:10, 0:3]               # Position-based selection (end exclusive: 10 rows)
# Missing values (these return new objects; assign the result to keep it)
df.isnull().sum()                      # Count nulls per column
df.dropna(subset=['critical_col'])     # Drop rows with nulls
df['col'].fillna(df['col'].mean())     # Fill with mean
df['col'].ffill()                      # Forward fill (fillna(method='ffill') is deprecated since pandas 2.1)
Data Transformation Patterns
# Column transformations
# Element-wise product of two numeric columns.
df['total'] = df['quantity'] * df['unit_price']
# Normalize text; the .str accessor yields NaN for entries that are not strings.
df['category'] = df['category'].str.lower().str.strip()
# Parse to datetime64; unparseable values raise unless errors='coerce' is passed.
df['date'] = pd.to_datetime(df['date'])
# Monthly Period (e.g. 2024-01), convenient as a grouping key.
df['year_month'] = df['date'].dt.to_period('M')
# Apply custom functions
def categorize_amount(amount):
    """Map a numeric amount to a size label.

    Thresholds are exclusive upper bounds checked in order: below 10 is
    'low', below 100 is 'medium', and anything else is 'high'. Because NaN
    fails both comparisons, NaN also maps to 'high'.
    """
    for upper_bound, label in ((10, 'low'), (100, 'medium')):
        if amount < upper_bound:
            return label
    return 'high'
df['amount_category'] = df['amount'].apply(categorize_amount)
# Vectorized operations (faster than apply)
# right=False makes the bins [-inf, 10), [10, 100), [100, inf), matching
# categorize_amount at the boundaries (10 -> 'medium', 100 -> 'high') and for
# values <= 0 (-> 'low'). Remaining differences: NaN and +inf give NaN here
# (categorize_amount returns 'high'), and the result is a Categorical dtype.
df['amount_category'] = pd.cut(
    df['amount'],
    bins=[float('-inf'), 10, 100, float('inf')],
    labels=['low', 'medium', 'high'],
    right=False,
)
# GroupBy operations: one output row per (region, product_category) pair
summary = (
    df.groupby(['region', 'product_category'])
    .agg(
        total_orders=('order_id', 'count'),
        total_revenue=('amount', 'sum'),
        avg_order_value=('amount', 'mean'),
        unique_customers=('customer_id', 'nunique'),
    )
    .reset_index()
)
# Merge operations: left join; validate= raises if customer_id repeats in customers_df
result = orders_df.merge(
    customers_df,
    on='customer_id',
    how='left',
    validate='many_to_one',
)
# Reshape data
# Pivot: rows to columns (one column per quarter, empty cells filled with 0)
pivot_df = df.pivot_table(
    values='revenue',
    index='region',
    columns='quarter',
    aggfunc='sum',
    fill_value=0,
)
# Melt: columns to rows (wide Q1..Q4 columns become quarter/revenue pairs)
melted_df = df.melt(
    id_vars=['product'],
    value_vars=['Q1', 'Q2', 'Q3', 'Q4'],
    var_name='quarter',
    value_name='revenue',
)
Performance Optimization
# Use chunking for large files
def process_large_csv(filepath, chunk_size=100000, group_col='category', value_col='amount'):
    """Sum ``value_col`` per ``group_col`` over a CSV too large to load at once.

    Each chunk is pre-aggregated, and the partial sums are then combined
    with a second groupby. This makes every group appear exactly once in
    the result, even when its rows span several chunks.

    Args:
        filepath: Path (or buffer) accepted by ``pd.read_csv``.
        chunk_size: Rows read per chunk.
        group_col: Column to group by.
        value_col: Numeric column to sum.

    Returns:
        DataFrame indexed by ``group_col`` with a single ``value_col`` column.
    """
    partials = [
        chunk.groupby(group_col).agg({value_col: 'sum'})
        for chunk in pd.read_csv(filepath, chunksize=chunk_size)
    ]
    if not partials:
        # pd.concat([]) raises; return an empty frame with the same layout instead.
        return pd.DataFrame({value_col: []}).rename_axis(group_col)
    # A group can appear in several chunks, so merge the partial sums.
    return pd.concat(partials).groupby(level=0).sum()
# Optimize dtypes to reduce memory
def optimize_dtypes(df, category_threshold=0.5):
    """Downcast integer columns and convert low-cardinality object columns.

    The DataFrame is modified in place and also returned for chaining.

    Integer columns are cast to the smallest of int8/int16/int32 whose range
    holds both the column minimum and maximum. Columns that fit none of them,
    and empty columns, are left unchanged. Object columns whose ratio of
    unique to total values is below ``category_threshold`` become
    ``category``.

    Args:
        df: DataFrame to optimize.
        category_threshold: Exclusive upper bound on the unique/total ratio
            for categorical conversion (default 0.5, i.e. less than 50% unique).

    Returns:
        The same DataFrame object with updated dtypes.
    """
    import numpy as np

    for col in df.select_dtypes(include=[np.integer]).columns:
        col_min, col_max = df[col].min(), df[col].max()
        for candidate in (np.int8, np.int16, np.int32):
            info = np.iinfo(candidate)
            # Check both ends: testing only the max lets large negative values wrap.
            if info.min <= col_min and col_max <= info.max:
                df[col] = df[col].astype(candidate)
                break

    num_total = len(df)
    if num_total:  # the uniqueness ratio is undefined for an empty frame
        for col in df.select_dtypes(include=['object']).columns:
            if df[col].nunique() / num_total < category_threshold:
                df[col] = df[col].astype('category')
    return df
# Parallel processing with Dask
import dask.dataframe as dd
# Builds a lazy task graph over every matching Parquet file; nothing is read yet.
# NOTE(review): reading s3:// paths needs the s3fs package installed — confirm.
ddf = dd.read_parquet('s3://bucket/data/*.parquet')
# .compute() executes the graph and returns an in-memory pandas DataFrame.
result = ddf.groupby('category').agg({'amount': 'sum'}).compute()
SQLAlchemy for Database Operations
Connection Management
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager
import pandas as pd
# Create engine with connection pooling
# NOTE(review): credentials are inlined for illustration only; real deployments
# should load the URL from configuration or a secrets store.
engine = create_engine(
    "postgresql://user:password@host:5432/dbname",
    pool_size=5,         # connections kept open in the pool
    max_overflow=10,     # extra connections allowed beyond pool_size under load
    pool_pre_ping=True, # Verify connections before use
    echo=False           # True would log every emitted SQL statement
)
# Context manager for connections
# Session factory configured once and reused by every get_session() call,
# instead of rebuilding a sessionmaker per call.
SessionLocal = sessionmaker(bind=engine)


@contextmanager
def get_session():
    """Yield an ORM session that commits on success and rolls back on error.

    The session is always closed when the ``with`` block exits. Any
    exception raised inside the block is re-raised after the rollback.
    """
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
# Read data with pandas
with engine.connect() as conn:
    # The bound parameter (:start_date) keeps the value out of the SQL text.
    df = pd.read_sql(
        text("SELECT * FROM orders WHERE date >= :start_date"),
        conn,
        params={"start_date": "2024-01-01"}
    )
# Write data to database
# method='multi' sends up to `chunksize` rows per INSERT ... VALUES statement;
# keep chunksize * column count under PostgreSQL's 65,535 bind-parameter limit.
df.to_sql(
    'processed_orders',
    engine,
    if_exists='append',
    index=False,
    chunksize=1000,
    method='multi'
)
ORM Pattern
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
# NOTE: since SQLAlchemy 1.4, declarative_base is also importable from sqlalchemy.orm.
Base = declarative_base()


class Customer(Base):
    """Customer row; one customer has many ``Order`` rows."""

    __tablename__ = 'customers'

    customer_id = Column(Integer, primary_key=True)
    first_name = Column(String(100), nullable=False)
    last_name = Column(String(100), nullable=False)
    email = Column(String(255), unique=True)
    # Needed by queries that filter on Customer.country (e.g. == 'USA');
    # without this column such a filter raises AttributeError.
    country = Column(String(100))
    # The callable runs per INSERT and stores a naive UTC timestamp.
    # (datetime.utcnow is deprecated since Python 3.12.)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Relationship
    orders = relationship("Order", back_populates="customer")
class Order(Base):
    """Order row linked to a customer through ``customer_id``."""
    __tablename__ = 'orders'
    order_id = Column(Integer, primary_key=True)
    # No nullable=False here, so an order row may have no customer.
    customer_id = Column(Integer, ForeignKey('customers.customer_id'))
    amount = Column(Integer, nullable=False) # Stored as cents
    # Client-side default applied by SQLAlchemy on INSERT when status is not set.
    status = Column(String(50), default='pending')
    # Inverse side of Customer.orders.
    customer = relationship("Customer", back_populates="orders")
# Query with ORM
with get_session() as session:
    # Get customer with orders; .first() returns None when no row matches.
    customer = session.query(Customer).filter_by(customer_id=1).first()
    if customer is None:
        print("Customer 1 not found")
    else:
        print(f"Customer: {customer.first_name} {customer.last_name}")
        # Accessing .orders lazily loads the related rows.
        print(f"Orders: {len(customer.orders)}")
    # Complex query: orders above 10000 cents ($100.00) from customers in the USA
    high_value_orders = (
        session.query(Order)
        .join(Customer)
        .filter(Order.amount > 10000)
        .filter(Customer.country == 'USA')
        .all()
    )
psycopg2 for PostgreSQL
import psycopg2
from psycopg2 import sql
from psycopg2.extras import execute_values
import csv
class PostgresHandler:
    """Small helper around psycopg2 for queries, bulk loads and upserts.

    Every public operation opens its own connection and runs in a single
    transaction, which is committed on success and rolled back on error.
    The connection is closed before the method returns.
    """

    def __init__(self, host, database, user, password, port=5432):
        # Keyword arguments passed straight to psycopg2.connect().
        self.conn_params = {
            'host': host,
            'database': database,
            'user': user,
            'password': password,
            'port': port
        }

    @classmethod
    def from_url(cls, url):
        """Build a handler from a ``postgresql://user:password@host:port/dbname`` URL.

        Percent-encoded user names and passwords are decoded. Missing
        host/database/user/password become ``None``; psycopg2 omits
        None-valued keywords from the DSN, so libpq defaults apply. A missing
        port defaults to 5432. SQLAlchemy-style driver suffixes such as
        ``postgresql+psycopg2://`` are accepted.

        Raises:
            ValueError: if the URL scheme is not a PostgreSQL scheme.
        """
        from urllib.parse import unquote, urlsplit

        parts = urlsplit(url)
        scheme = parts.scheme.split('+', 1)[0]
        if scheme not in ('postgresql', 'postgres'):
            # Do not echo the URL: it may contain a password.
            raise ValueError(f"Not a PostgreSQL URL (scheme {parts.scheme!r})")
        return cls(
            host=parts.hostname,
            database=parts.path.lstrip('/') or None,
            user=unquote(parts.username) if parts.username else None,
            password=unquote(parts.password) if parts.password else None,
            port=parts.port or 5432,
        )

    def get_connection(self):
        """Open a new psycopg2 connection; the caller must close it."""
        return psycopg2.connect(**self.conn_params)

    def _run(self, work):
        """Call ``work(cursor)`` in one transaction and always close the connection.

        psycopg2's ``with connection:`` block only commits or rolls back the
        transaction; it does not close the connection, so the close happens
        explicitly in ``finally``.

        Returns:
            Whatever ``work`` returns.
        """
        conn = self.get_connection()
        try:
            with conn:  # commit on success, rollback on exception
                with conn.cursor() as cur:
                    return work(cur)
        finally:
            conn.close()

    def execute_query(self, query, params=None):
        """Execute a single query.

        Returns:
            All result rows when the statement produces a result set
            (e.g. SELECT), otherwise ``None``.
        """
        def work(cur):
            cur.execute(query, params)
            if cur.description:  # statement produced a result set
                return cur.fetchall()
            return None

        return self._run(work)

    def bulk_insert(self, table, columns, data, page_size=1000):
        """Efficient bulk insert using execute_values.

        Table and column names are quoted as identifiers; row values are
        sent as parameters, ``page_size`` rows per statement.
        """
        query = sql.SQL("INSERT INTO {} ({}) VALUES %s").format(
            sql.Identifier(table),
            sql.SQL(', ').join(map(sql.Identifier, columns))
        )
        self._run(lambda cur: execute_values(cur, query, data, page_size=page_size))

    def copy_from_csv(self, table, filepath, delimiter=','):
        """Fast CSV import using COPY.

        The file's first line is treated as a header (``CSV HEADER``).
        """
        query = sql.SQL("COPY {} FROM STDIN WITH CSV HEADER DELIMITER AS {}").format(
            sql.Identifier(table),
            sql.Literal(delimiter)
        )

        def work(cur):
            with open(filepath, 'r') as f:
                cur.copy_expert(query, f)

        self._run(work)

    def upsert(self, table, columns, data, conflict_columns, page_size=1000):
        """Insert or update on conflict.

        Rows that collide on ``conflict_columns`` have every other column in
        ``columns`` overwritten from the incoming row. When every column is
        a conflict column there is nothing to update, so ``DO NOTHING`` is
        used instead of emitting an empty ``DO UPDATE SET``.

        PostgreSQL requires a unique index or constraint on
        ``conflict_columns``. It also rejects a single statement that would
        update the same row twice, so ``data`` should not contain duplicate
        conflict keys within one page.
        """
        update_columns = [col for col in columns if col not in conflict_columns]
        if update_columns:
            action = sql.SQL("DO UPDATE SET {}").format(
                sql.SQL(', ').join(
                    sql.SQL("{} = EXCLUDED.{}").format(
                        sql.Identifier(col), sql.Identifier(col)
                    )
                    for col in update_columns
                )
            )
        else:
            action = sql.SQL("DO NOTHING")

        insert_query = sql.SQL(
            "INSERT INTO {table} ({columns}) VALUES %s "
            "ON CONFLICT ({conflict_cols}) {action}"
        ).format(
            table=sql.Identifier(table),
            columns=sql.SQL(', ').join(map(sql.Identifier, columns)),
            conflict_cols=sql.SQL(', ').join(map(sql.Identifier, conflict_columns)),
            action=action,
        )
        self._run(lambda cur: execute_values(cur, insert_query, data, page_size=page_size))
# Usage
db = PostgresHandler('localhost', 'mydb', 'user', 'password')
# Bulk insert
# Each tuple is one row, in the same order as the column list passed below.
data = [
    (1, 'John', 'Doe', 'john@example.com'),
    (2, 'Jane', 'Smith', 'jane@example.com')
]
db.bulk_insert('users', ['id', 'first_name', 'last_name', 'email'], data)
# Upsert
# ON CONFLICT (email) needs a unique index/constraint on users.email. Every
# non-conflict column in the list (here including id) is overwritten from the
# incoming row when an email already exists.
db.upsert(
    'users',
    ['id', 'first_name', 'last_name', 'email'],
    data,
    conflict_columns=['email']
)
boto3 for AWS Services
import boto3
from botocore.exceptions import ClientError
import json
from io import BytesIO
class S3Handler:
    """Convenience wrapper around a boto3 S3 client bound to a single bucket."""

    def __init__(self, bucket_name):
        self.bucket = bucket_name
        self.s3 = boto3.client('s3')

    def upload_file(self, local_path, s3_key):
        """Upload a file to S3."""
        self.s3.upload_file(local_path, self.bucket, s3_key)

    def upload_dataframe(self, df, s3_key, format='parquet'):
        """Upload a pandas DataFrame to S3 as Parquet or UTF-8 CSV (without the index).

        Raises:
            ValueError: if ``format`` is neither ``'parquet'`` nor ``'csv'``.
                The check runs before anything is uploaded, so no empty
                object is written.
        """
        if format == 'parquet':
            buffer = BytesIO()
            df.to_parquet(buffer, index=False)
            body = buffer.getvalue()
        elif format == 'csv':
            # Serialize to text and encode explicitly rather than relying on
            # pandas writing CSV into a binary buffer.
            body = df.to_csv(index=False).encode('utf-8')
        else:
            raise ValueError(f"Unsupported format {format!r}; expected 'parquet' or 'csv'")
        self.s3.put_object(
            Bucket=self.bucket,
            Key=s3_key,
            Body=body
        )

    def read_parquet(self, s3_key):
        """Read a Parquet file from S3 into a DataFrame (the whole object is loaded into memory)."""
        response = self.s3.get_object(Bucket=self.bucket, Key=s3_key)
        return pd.read_parquet(BytesIO(response['Body'].read()))

    def list_files(self, prefix=''):
        """List every object key under ``prefix``.

        A single list_objects_v2 call returns at most 1,000 keys, so the
        results are read through the paginator to collect all pages.
        """
        paginator = self.s3.get_paginator('list_objects_v2')
        keys = []
        for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
            keys.extend(obj['Key'] for obj in page.get('Contents', []))
        return keys

    def delete_file(self, s3_key):
        """Delete a file from S3."""
        self.s3.delete_object(Bucket=self.bucket, Key=s3_key)
# Usage
s3 = S3Handler('my-data-lake')
# Upload processed data
df = pd.read_csv('processed.csv')
# The key prefix (gold/, silver/) is just part of the object name; S3 has no real folders.
s3.upload_dataframe(df, 'gold/processed_data/2024-01-01.parquet')
# Read from S3
df = s3.read_parquet('silver/orders/2024-01-01.parquet')
CLI Tools with Click
import click
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@click.group()
@click.option('--verbose', is_flag=True, help='Enable verbose logging')
def cli(verbose):
    """Data Engineering CLI Tool."""
    # Without --verbose, the INFO level configured by basicConfig stays in place.
    if not verbose:
        return
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
@cli.command()
@click.option('--source', required=True, help='Source database connection string')
@click.option('--target', required=True, help='Target table name')
# Pass the callable, not datetime.now(): Click calls it per invocation, whereas
# a value computed at import time goes stale in long-running processes.
@click.option('--date', type=click.DateTime(), default=datetime.now, help='Processing date')
@click.option('--batch-size', default=1000, type=int, help='Batch size for processing')
def extract(source, target, date, batch_size):
    """Extract data from source database."""
    # NOTE: `target` and `batch_size` are accepted but not used by this example yet.
    logger.info(f"Extracting data for {date.date()}")
    # Extraction logic here
    db = PostgresHandler.from_url(source)
    # Plain string with a %s placeholder; the date is bound as a query parameter.
    query = """
        SELECT * FROM orders
        WHERE order_date = %s
    """
    data = db.execute_query(query, (date.date(),)) or []
    logger.info(f"Extracted {len(data)} records")
@cli.command()
@click.argument('input_path')
@click.argument('output_path')
@click.option('--compression', type=click.Choice(['gzip', 'snappy', 'none']), default='snappy')
def transform(input_path, output_path, compression):
    """Transform raw data to clean format."""
    logger.info(f"Transforming {input_path} -> {output_path}")
    df = pd.read_parquet(input_path)
    # Transformations
    df = df.drop_duplicates(subset=['order_id'])
    # .copy() so the column added below is written to a new frame rather than a
    # filtered slice (avoids SettingWithCopyWarning).
    df = df[df['amount'] > 0].copy()
    df['processed_at'] = datetime.now()
    # pandas expects None, not the string 'none', to disable compression.
    df.to_parquet(
        output_path,
        compression=None if compression == 'none' else compression,
        index=False,
    )
    logger.info(f"Transformed {len(df)} records")
@cli.command()
@click.option('--table', required=True, help='Target table name')
@click.option('--data-path', required=True, help='Path to data file')
# Configurable target database; the default keeps the previous hard-coded URL.
@click.option('--target-url', default="postgresql://user:pass@host/db", help='Target database URL')
def load(table, data_path, target_url):
    """Load data into target table."""
    logger.info(f"Loading {data_path} into {table}")
    df = pd.read_parquet(data_path)
    engine = create_engine(target_url)
    try:
        df.to_sql(table, engine, if_exists='append', index=False, chunksize=1000)
    finally:
        # Release pooled connections once the load finishes or fails.
        engine.dispose()
    logger.info(f"Loaded {len(df)} records")


if __name__ == '__main__':
    cli()
Scheduling with APScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
# Runs jobs in the foreground; start() blocks the calling thread.
scheduler = BlockingScheduler()


@scheduler.scheduled_job(CronTrigger(hour=2, minute=0))  # Daily at 2 AM (scheduler timezone)
def daily_etl():
    """Run daily ETL pipeline."""
    logger.info(f"Starting daily ETL at {datetime.now()}")
    try:
        # Extract
        extract_data()
        # Transform
        transform_data()
        # Load
        load_data()
    except Exception as e:
        # logger.exception also records the traceback, which logger.error dropped.
        # The exception is handled here (alert sent), so the scheduler sees the job as finished.
        logger.exception("ETL failed: %s", e)
        send_alert("ETL Pipeline Failed", str(e))
    else:
        logger.info("Daily ETL completed successfully")
# APScheduler sets the unspecified minute/second fields to 0, so this fires at
# 00:00, 06:00, 12:00 and 18:00 in the scheduler's timezone.
@scheduler.scheduled_job(CronTrigger(hour='*/6')) # Every 6 hours
def incremental_sync():
    """Run incremental sync."""
    logger.info("Starting incremental sync")
    # NOTE(review): unlike daily_etl, exceptions are not caught or alerted here;
    # they propagate to the scheduler — confirm that is intended.
    sync_incremental_data()
# Sundays at 03:00 (minute and second default to 0).
@scheduler.scheduled_job(CronTrigger(day_of_week='sun', hour=3)) # Weekly
def weekly_maintenance():
    """Weekly maintenance tasks."""
    logger.info("Running weekly maintenance")
    # Runs sequentially: if the first task raises, clean_old_files() is skipped.
    vacuum_analyze_tables()
    clean_old_files()
if __name__ == '__main__':
    # Blocks the calling thread until the scheduler is shut down.
    scheduler.start()
Key Takeaways
- Virtual environments are mandatory — always isolate project dependencies
- Type hints improve maintainability — use them in all production code
- Pandas is essential — master DataFrame operations, groupby, and merge
- SQLAlchemy provides connection pooling — use it for production database access
- psycopg2 is faster for bulk operations — use COPY and execute_values
- boto3 is the AWS SDK — learn S3, EC2, and Lambda basics
- Click makes great CLI tools — build reusable data engineering scripts
- Schedule with APScheduler or Airflow — automate pipeline execution
Practice Exercises
- Pandas challenge: Write a function that reads a 10GB CSV file in chunks, performs aggregations, and writes the result to Parquet.
- Database handler: Create a PostgreSQL class that supports bulk insert, upsert, and data validation.
- CLI tool: Build a CLI tool that accepts a database connection string and generates a data profile report (column types, nulls, distributions).
- S3 data lake: Create a utility class that organizes S3 files into Bronze/Silver/Gold layers with proper partitioning.
- End-to-end pipeline: Build a complete ETL script that extracts from a database, transforms with pandas, validates with Pydantic, and loads to S3.