Python for Data Engineers — Essential Libraries and Patterns

Module 1: Foundations · Python Programming · Free Lesson


Python in Data Engineering

Python is the primary general-purpose programming language for most data engineers, typically used alongside SQL. It's used to build pipelines, interact with databases, automate tasks, and process data at scale.

┌───────────────────────────────────────────────────────────────────┐
│                    PYTHON IN DATA ENGINEERING                     │
├──────────────────────────┬────────────────────────────────────────┤
│ Pipeline Orchestration   │ Airflow, Dagster, Prefect              │
│ Data Processing          │ Pandas, PySpark, Dask                  │
│ Database Interaction     │ SQLAlchemy, psycopg2, mysql-connector  │
│ Cloud Services           │ boto3, google-cloud-*, azure-identity  │
│ CLI Tools                │ Click, argparse, rich                  │
│ Scheduling               │ APScheduler, croniter                  │
│ Data Validation          │ Great Expectations, Pydantic           │
│ File Formats             │ pyarrow, fastavro, orjson              │
└──────────────────────────┴────────────────────────────────────────┘

Setting Up Your Environment

Virtual Environments

# Create virtual environment
python -m venv venv

# Activate (Windows)
venv\Scripts\activate

# Activate (macOS/Linux)
source venv/bin/activate

# Install dependencies
pip install pandas sqlalchemy psycopg2-binary boto3 pyarrow

# Freeze dependencies
pip freeze > requirements.txt

# Install from requirements
pip install -r requirements.txt
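A common pitfall is running a script with the system interpreter while believing the venv is active. For environments created with `python -m venv`, `sys.prefix` points at the environment and `sys.base_prefix` points at the base installation, so comparing the two gives a cheap runtime check. This is a minimal sketch, and the helper name `in_virtualenv` is illustrative:

```python
import sys

def in_virtualenv() -> bool:
    """Return True when running inside a venv-created environment.

    Inside a venv, sys.prefix points at the environment directory,
    while sys.base_prefix still points at the base interpreter.
    """
    return sys.prefix != sys.base_prefix

if __name__ == "__main__":
    where = "inside" if in_virtualenv() else "outside"
    print(f"Running {where} a virtual environment: {sys.prefix}")
```

A pipeline entry point could log this value at startup so that misconfigured deployments are easy to spot.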

Type Hints (Production Best Practice)

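from typing import Optional, List, Dict, Tuple
from datetime import datetime

# Type hints document intent and let static checkers (mypy, pyright) catch type errors
def process_orders(
    orders: List[Dict],
    start_date: datetime,
    end_date: Optional[datetime] = None,
    batch_size: int = 1000
) -> Tuple[int, List[str]]:
    """Process orders whose 'order_date' falls within [start_date, end_date].

    Assumes each order dict has 'id' and 'order_date' (datetime) keys.

    Returns:
        Tuple of (processed_count, error_messages)
    """
    processed = 0
    errors: List[str] = []
    end = end_date or datetime.max  # open-ended range when end_date is None

    # Walk the input in batches of batch_size
    for start in range(0, len(orders), batch_size):
        for order in orders[start:start + batch_size]:
            try:
                if start_date <= order['order_date'] <= end:
                    # Process logic here
                    processed += 1
            except Exception as e:
                # .get() avoids a second KeyError when 'id' itself is missing
                errors.append(f"Order {order.get('id', '<unknown>')}: {e}")

    return processed, errors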
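Type hints are more useful when the record shape itself is typed, rather than using a bare `Dict`. The following is a minimal sketch with `typing.TypedDict`. It needs Python 3.9+ for the `list[...]` syntax. The `OrderRecord` fields are a hypothetical schema and are not defined elsewhere in this lesson:

```python
from datetime import datetime
from typing import TypedDict, get_type_hints

class OrderRecord(TypedDict):
    """Hypothetical shape of one order row; adjust the fields to your schema."""
    id: int
    order_date: datetime
    amount: float

def total_amount(orders: list[OrderRecord]) -> float:
    # A checker such as mypy flags callers that pass dicts missing 'amount'
    return sum(order["amount"] for order in orders)

orders: list[OrderRecord] = [
    {"id": 1, "order_date": datetime(2024, 1, 1), "amount": 19.5},
    {"id": 2, "order_date": datetime(2024, 1, 2), "amount": 5.5},
]
print(total_amount(orders))  # 25.0
print(get_type_hints(OrderRecord))
```

At runtime a `TypedDict` is still a plain `dict`, so the benefit comes from static analysis and not from runtime validation. Pydantic covers the runtime side.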

Pandas for Data Processing

Core Operations

import pandas as pd
import numpy as np

# Reading data
df = pd.read_csv('data.csv', dtype={'id': int, 'amount': float})
df = pd.read_parquet('data.parquet')
df = pd.read_sql("SELECT * FROM orders", connection)

# Basic operations
print(df.shape)           # (rows, columns)
print(df.dtypes)          # Column data types
print(df.head())          # First 5 rows
df.info()                 # Column info and non-null counts (prints directly, returns None)
print(df.describe())      # Statistical summary

# Selection
df[['col1', 'col2']]                     # Select columns
df[df['amount'] > 100]                   # Filter rows
df.loc[0:10, ['col1', 'col2']]           # Label-based selection (end label 10 is included)
df.iloc[0:10, 0:3]                       # Position-based selection (end position excluded)

# Missing values (these methods return new objects, so assign the result)
df.isnull().sum()                                  # Count nulls per column
df = df.dropna(subset=['critical_col'])            # Drop rows with nulls in critical_col
df['col'] = df['col'].fillna(df['col'].mean())     # Fill with mean
df['col'] = df['col'].ffill()                      # Forward fill (fillna(method='ffill') is deprecated)

Data Transformation Patterns

# Column transformations
df['total'] = df['quantity'] * df['unit_price']
df['category'] = df['category'].str.lower().str.strip()
df['date'] = pd.to_datetime(df['date'])
df['year_month'] = df['date'].dt.to_period('M')

# Apply custom functions
def categorize_amount(amount):
    if amount < 10:
        return 'low'
    elif amount < 100:
        return 'medium'
    else:
        return 'high'

df['amount_category'] = df['amount'].apply(categorize_amount)

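# Vectorized binning (faster than apply). right=False makes the bins
# [0, 10), [10, 100), [100, inf), matching categorize_amount (10 -> 'medium');
# values below 0 become NaN
df['amount_category'] = pd.cut(
    df['amount'],
    bins=[0, 10, 100, float('inf')],
    labels=['low', 'medium', 'high'],
    right=False
)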
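`pd.cut` builds right-closed bins by default. Without `right=False`, an amount of exactly 10 falls in 'low' and 0 becomes NaN, while `categorize_amount` puts them in 'medium' and 'low'. The boundary rule is easy to pin down in plain Python, and that version also works as a reference for unit-testing the vectorized one. This is a minimal stdlib sketch, and the names `EDGES`, `LABELS`, and `categorize` are illustrative:

```python
from bisect import bisect_right
from typing import Optional

# Interior edges of the bins [0, 10), [10, 100), [100, inf)
EDGES = [10, 100]
LABELS = ["low", "medium", "high"]

def categorize(amount: float) -> Optional[str]:
    """Label an amount using lower-inclusive, upper-exclusive bins."""
    if amount < 0:
        return None  # outside every bin, like the NaN pd.cut produces
    # bisect_right counts edges <= amount, so exactly 10 lands in 'medium'
    return LABELS[bisect_right(EDGES, amount)]

print([categorize(a) for a in [0, 9.99, 10, 99, 100, 250]])
# ['low', 'low', 'medium', 'medium', 'high', 'high']
```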

# GroupBy operations
summary = df.groupby(['region', 'product_category']).agg(
    total_orders=('order_id', 'count'),
    total_revenue=('amount', 'sum'),
    avg_order_value=('amount', 'mean'),
    unique_customers=('customer_id', 'nunique')
).reset_index()

# Merge operations
result = pd.merge(
    orders_df, 
    customers_df, 
    on='customer_id', 
    how='left',
    validate='many_to_one'
)

# Reshape data
# Pivot: rows to columns
pivot_df = df.pivot_table(
    index='region',
    columns='quarter',
    values='revenue',
    aggfunc='sum',
    fill_value=0
)

# Melt: columns to rows
melted_df = pd.melt(
    df, 
    id_vars=['product'], 
    value_vars=['Q1', 'Q2', 'Q3', 'Q4'],
    var_name='quarter', 
    value_name='revenue'
)

Performance Optimization

# Use chunking for large files
def process_large_csv(filepath, chunk_size=100000):
    results = []
    for chunk in pd.read_csv(filepath, chunksize=chunk_size):
        # Aggregate each chunk independently to bound memory use
        processed = chunk.groupby('category').agg({'amount': 'sum'})
        results.append(processed)
    # A category can appear in several chunks, so re-aggregate the partial sums
    return pd.concat(results).groupby(level=0).sum()
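The chunking pattern only works if the per-chunk results are merged at the end, because the same category can appear in many chunks. The same idea in plain Python, without pandas, streams rows with `csv.DictReader`, aggregates each fixed-size chunk, and folds the partial sums into a running total. This sketch assumes columns named `category` and `amount`, and `sum_by_category` is a hypothetical helper:

```python
import csv
import io
from collections import Counter
from itertools import islice

def sum_by_category(lines, chunk_size=100_000):
    """Stream CSV rows in fixed-size chunks and merge per-chunk partial sums."""
    reader = csv.DictReader(lines)
    totals = Counter()
    while True:
        chunk = list(islice(reader, chunk_size))  # at most chunk_size rows in memory
        if not chunk:
            break
        partial = Counter()
        for row in chunk:
            partial[row["category"]] += float(row["amount"])
        totals.update(partial)  # merge: a category may span several chunks
    return dict(totals)

sample = io.StringIO("category,amount\na,1\nb,2\na,3\nb,4\nc,5\n")
print(sum_by_category(sample, chunk_size=2))  # {'a': 4.0, 'b': 6.0, 'c': 5.0}
```

Sums merge cleanly this way. A mean would need its sum and count carried separately and divided only after the last chunk.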

# Optimize dtypes to reduce memory
def optimize_dtypes(df):
    # Downcast each integer column to the smallest signed type that fits
    # both its min and max (checking only the max breaks on negative values)
    for col in df.select_dtypes(include=['integer']).columns:
        df[col] = pd.to_numeric(df[col], downcast='integer')
    
    # Convert low-cardinality string columns to 'category'
    for col in df.select_dtypes(include=['object']).columns:
        num_unique = df[col].nunique()
        num_total = len(df[col])
        if num_total and num_unique / num_total < 0.5:  # Less than 50% unique
            df[col] = df[col].astype('category')
    
    return df
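Integer downcasting depends on one rule: the target type must hold both the column's minimum and its maximum. If only the maximum is checked, large negative values wrap around. The stdlib sketch below applies that rule using NumPy's signed integer ranges. The helper `smallest_int_dtype` is hypothetical; in pandas, `pd.to_numeric(col, downcast='integer')` makes this kind of choice for you:

```python
# Signed integer ranges used by NumPy/pandas (two's complement)
INT_RANGES = [
    ("int8", -2**7, 2**7 - 1),
    ("int16", -2**15, 2**15 - 1),
    ("int32", -2**31, 2**31 - 1),
    ("int64", -2**63, 2**63 - 1),
]

def smallest_int_dtype(col_min: int, col_max: int) -> str:
    """Pick the narrowest signed integer dtype that holds both extremes."""
    for name, lo, hi in INT_RANGES:
        if lo <= col_min and col_max <= hi:
            return name
    raise OverflowError(f"range [{col_min}, {col_max}] exceeds int64")

print(smallest_int_dtype(0, 100))      # int8
print(smallest_int_dtype(-200, 100))   # int16 (a max-only check would pick int8)
print(smallest_int_dtype(0, 40_000))   # int32
```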

# Parallel processing with Dask
import dask.dataframe as dd

ddf = dd.read_parquet('s3://bucket/data/*.parquet')
result = ddf.groupby('category').agg({'amount': 'sum'}).compute()

SQLAlchemy for Database Operations

Connection Management

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager
import pandas as pd

# Create engine with connection pooling
engine = create_engine(
    "postgresql://user:password@host:5432/dbname",
    pool_size=5,
    max_overflow=10,
    pool_pre_ping=True,  # Verify connections before use
    echo=False
)

# Create the session factory once and reuse it
SessionLocal = sessionmaker(bind=engine)

# Context manager: commit on success, roll back on error, always close
@contextmanager
def get_session():
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

# Read data with pandas
with engine.connect() as conn:
    df = pd.read_sql(
        text("SELECT * FROM orders WHERE date >= :start_date"),
        conn,
        params={"start_date": "2024-01-01"}
    )

# Write data to database
df.to_sql(
    'processed_orders',
    engine,
    if_exists='append',
    index=False,
    chunksize=1000,
    method='multi'
)
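The commit/rollback/close shape of `get_session` is clearer when seen on its own. The sketch below applies it to the standard-library `sqlite3` module so that it runs without a database server. `transaction` is a hypothetical helper, and the failing block simulates an error partway through a load to show that its insert is rolled back:

```python
import os
import sqlite3
import tempfile
from contextlib import contextmanager

@contextmanager
def transaction(db_path: str):
    """Commit on success, roll back on any exception, always close."""
    conn = sqlite3.connect(db_path)
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

path = os.path.join(tempfile.mkdtemp(), "demo.db")

with transaction(path) as conn:
    conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL)")
    conn.execute("INSERT INTO orders VALUES (1, 9.5)")

try:
    with transaction(path) as conn:
        conn.execute("INSERT INTO orders VALUES (2, 20.0)")
        raise RuntimeError("simulated failure mid-load")
except RuntimeError:
    pass  # the insert of id=2 was rolled back

with transaction(path) as conn:
    remaining = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(remaining)  # 1
```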

ORM Pattern

from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import declarative_base, relationship  # sqlalchemy.ext.declarative is deprecated

Base = declarative_base()

class Customer(Base):
    __tablename__ = 'customers'
    
    customer_id = Column(Integer, primary_key=True)
    first_name = Column(String(100), nullable=False)
    last_name = Column(String(100), nullable=False)
    email = Column(String(255), unique=True)
    country = Column(String(100))
    created_at = Column(DateTime, default=datetime.utcnow)
    
    # Relationship
    orders = relationship("Order", back_populates="customer")

class Order(Base):
    __tablename__ = 'orders'
    
    order_id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, ForeignKey('customers.customer_id'))
    amount = Column(Integer, nullable=False)  # Stored as cents
    status = Column(String(50), default='pending')
    
    customer = relationship("Customer", back_populates="orders")

# Query with ORM
with get_session() as session:
    # Get customer with orders (get() returns None if the key doesn't exist)
    customer = session.get(Customer, 1)
    if customer is not None:
        print(f"Customer: {customer.first_name} {customer.last_name}")
        print(f"Orders: {len(customer.orders)}")
    
    # Complex query
    high_value_orders = (
        session.query(Order)
        .join(Customer)
        .filter(Order.amount > 10000)  # amount is in cents, so > $100
        .filter(Customer.country == 'USA')
        .all()
    )

psycopg2 for PostgreSQL

import psycopg2
from psycopg2 import sql
from psycopg2.extras import execute_values
from contextlib import closing

class PostgresHandler:
    def __init__(self, host, database, user, password, port=5432):
        self.conn_params = {
            'host': host,
            'database': database,
            'user': user,
            'password': password,
            'port': port
        }
    
    def get_connection(self):
        # `with psycopg2.connect(...) as conn` only ends the transaction; it does
        # not close the connection. closing() guarantees conn.close() on exit.
        return closing(psycopg2.connect(**self.conn_params))
    
    def execute_query(self, query, params=None):
        """Execute a single query."""
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(query, params)
                if cur.description:  # SELECT query
                    return cur.fetchall()
                conn.commit()
    
    def bulk_insert(self, table, columns, data, page_size=1000):
        """Efficient bulk insert using execute_values."""
        query = sql.SQL("INSERT INTO {} ({}) VALUES %s").format(
            sql.Identifier(table),
            sql.SQL(', ').join(map(sql.Identifier, columns))
        )
        
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                execute_values(cur, query, data, page_size=page_size)
                conn.commit()
    
    def copy_from_csv(self, table, filepath, delimiter=','):
        """Fast CSV import using COPY."""
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                with open(filepath, 'r') as f:
                    cur.copy_expert(
                        sql.SQL("COPY {} FROM STDIN WITH (FORMAT csv, HEADER true, DELIMITER {})").format(
                            sql.Identifier(table),
                            sql.Literal(delimiter)
                        ).as_string(conn),  # render the composed query to a string
                        f
                    )
                conn.commit()
    
    def upsert(self, table, columns, data, conflict_columns):
        """Insert or update on conflict."""
        insert_query = sql.SQL("""
            INSERT INTO {table} ({columns})
            VALUES %s
            ON CONFLICT ({conflict_cols})
            DO UPDATE SET {update_clause}
        """).format(
            table=sql.Identifier(table),
            columns=sql.SQL(', ').join(map(sql.Identifier, columns)),
            conflict_cols=sql.SQL(', ').join(map(sql.Identifier, conflict_columns)),
            update_clause=sql.SQL(', ').join(
                sql.SQL("{} = EXCLUDED.{}").format(
                    sql.Identifier(col), sql.Identifier(col)
                ) for col in columns if col not in conflict_columns
            )
        )
        
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                execute_values(cur, insert_query, data, page_size=1000)
                conn.commit()

# Usage
db = PostgresHandler('localhost', 'mydb', 'user', 'password')

# Bulk insert
data = [
    (1, 'John', 'Doe', 'john@example.com'),
    (2, 'Jane', 'Smith', 'jane@example.com')
]
db.bulk_insert('users', ['id', 'first_name', 'last_name', 'email'], data)

# Upsert: conflict columns must be backed by a primary key or unique constraint
db.upsert(
    'users',
    ['id', 'first_name', 'last_name', 'email'],
    data,
    conflict_columns=['id']
)
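The statement that `upsert` builds has the shape `INSERT ... ON CONFLICT (...) DO UPDATE SET col = EXCLUDED.col`. SQLite 3.24 and later accepts the same syntax, so the standard-library `sqlite3` module can run it without a PostgreSQL server. This is a minimal sketch. `build_upsert` is hypothetical and uses plain string formatting for identifiers, so table and column names must be trusted (the class above quotes them with `sql.Identifier`). It also guards against the case where every column is a conflict column, which would otherwise produce an empty `SET` clause:

```python
import sqlite3

def build_upsert(table, columns, conflict_columns):
    """Build an INSERT ... ON CONFLICT DO UPDATE statement with qmark placeholders."""
    updates = [c for c in columns if c not in conflict_columns]
    if not updates:
        raise ValueError("need at least one non-conflict column to update")
    return (
        f"INSERT INTO {table} ({', '.join(columns)}) "
        f"VALUES ({', '.join('?' for _ in columns)}) "
        f"ON CONFLICT ({', '.join(conflict_columns)}) "
        f"DO UPDATE SET {', '.join(f'{c} = excluded.{c}' for c in updates)}"
    )

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, first_name TEXT, email TEXT)")
query = build_upsert("users", ["id", "first_name", "email"], ["id"])

conn.executemany(query, [(1, "John", "john@example.com"), (2, "Jane", "jane@example.com")])
conn.executemany(query, [(1, "Johnny", "johnny@example.com")])  # id=1 conflicts -> update
print(conn.execute("SELECT * FROM users ORDER BY id").fetchall())
# [(1, 'Johnny', 'johnny@example.com'), (2, 'Jane', 'jane@example.com')]
```

In PostgreSQL, a single multi-row `INSERT ... ON CONFLICT DO UPDATE` (as sent by `execute_values`) raises an error if two rows in the same statement share a conflict key, so deduplicate each batch first.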

boto3 for AWS Services

import boto3
import pandas as pd
from io import BytesIO

class S3Handler:
    def __init__(self, bucket_name):
        self.bucket = bucket_name
        self.s3 = boto3.client('s3')
    
    def upload_file(self, local_path, s3_key):
        """Upload a file to S3."""
        self.s3.upload_file(local_path, self.bucket, s3_key)
    
    def upload_dataframe(self, df, s3_key, format='parquet'):
        """Upload a pandas DataFrame to S3 as Parquet or CSV."""
        buffer = BytesIO()
        
        if format == 'parquet':
            df.to_parquet(buffer, index=False)
        elif format == 'csv':
            df.to_csv(buffer, index=False)
        else:
            raise ValueError(f"Unsupported format: {format!r}")
        buffer.seek(0)
        self.s3.put_object(
            Bucket=self.bucket,
            Key=s3_key,
            Body=buffer.getvalue()
        )
    
    def read_parquet(self, s3_key):
        """Read a Parquet file from S3."""
        response = self.s3.get_object(Bucket=self.bucket, Key=s3_key)
        return pd.read_parquet(BytesIO(response['Body'].read()))
    
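    def list_files(self, prefix=''):
        """List all keys under a prefix (one list_objects_v2 call returns at most 1,000 keys)."""
        paginator = self.s3.get_paginator('list_objects_v2')
        keys = []
        for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
            keys.extend(obj['Key'] for obj in page.get('Contents', []))
        return keys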
    
    def delete_file(self, s3_key):
        """Delete a file from S3."""
        self.s3.delete_object(Bucket=self.bucket, Key=s3_key)

# Usage
s3 = S3Handler('my-data-lake')

# Upload processed data
df = pd.read_csv('processed.csv')
s3.upload_dataframe(df, 'gold/processed_data/2024-01-01.parquet')

# Read from S3
df = s3.read_parquet('silver/orders/2024-01-01.parquet')

CLI Tools with Click

import click
import logging
from datetime import datetime

import pandas as pd
from sqlalchemy import create_engine, text

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@click.group()
@click.option('--verbose', is_flag=True, help='Enable verbose logging')
def cli(verbose):
    """Data Engineering CLI Tool."""
    if verbose:
        logging.getLogger().setLevel(logging.DEBUG)

@cli.command()
@click.option('--source', required=True, help='Source database connection string')
@click.option('--target', required=True, help='Target table name')
@click.option('--date', type=click.DateTime(), default=datetime.now(), help='Processing date')
@click.option('--batch-size', default=1000, type=int, help='Batch size for processing')
def extract(source, target, date, batch_size):
    """Extract data from source database."""
    logger.info(f"Extracting data for {date.date()}")
    
    # Extraction logic here; source is a SQLAlchemy URL such as postgresql://user:pass@host/db
    engine = create_engine(source)
    query = text("SELECT * FROM orders WHERE order_date = :order_date")
    with engine.connect() as conn:
        data = pd.read_sql(query, conn, params={"order_date": date.date()})
    
    logger.info(f"Extracted {len(data)} records")

@cli.command()
@click.argument('input_path')
@click.argument('output_path')
@click.option('--compression', type=click.Choice(['gzip', 'snappy', 'none']), default='snappy')
def transform(input_path, output_path, compression):
    """Transform raw data to clean format."""
    logger.info(f"Transforming {input_path} -> {output_path}")
    
    df = pd.read_parquet(input_path)
    
    # Transformations
    df = df.drop_duplicates(subset=['order_id'])
    df = df[df['amount'] > 0]
    df['processed_at'] = datetime.now()
    
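    # pandas expects None (not the string 'none') to disable compression
    df.to_parquet(
        output_path,
        compression=None if compression == 'none' else compression,
        index=False
    )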
    
    logger.info(f"Transformed {len(df)} records")

@cli.command()
@click.option('--table', required=True, help='Target table name')
@click.option('--data-path', required=True, help='Path to data file')
def load(table, data_path):
    """Load data into target table."""
    logger.info(f"Loading {data_path} into {table}")
    
    df = pd.read_parquet(data_path)
    
    engine = create_engine("postgresql://user:pass@host/db")
    df.to_sql(table, engine, if_exists='append', index=False, chunksize=1000)
    
    logger.info(f"Loaded {len(df)} records")

if __name__ == '__main__':
    cli()
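Click is a third-party dependency. For small tools, the standard library's `argparse`, which also appears in the table at the top, can express the same interface. The sketch below covers the `extract` command's options using subparsers. Only argument parsing is shown, and `build_parser` is an illustrative name:

```python
import argparse
from datetime import datetime

def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Data Engineering CLI Tool.")
    parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
    sub = parser.add_subparsers(dest="command", required=True)

    extract = sub.add_parser("extract", help="Extract data from source database.")
    extract.add_argument("--source", required=True, help="Source database connection string")
    extract.add_argument("--target", required=True, help="Target table name")
    extract.add_argument(
        "--date",
        type=datetime.fromisoformat,  # parses values like 2024-01-01
        default=datetime.now(),
        help="Processing date",
    )
    extract.add_argument("--batch-size", type=int, default=1000, help="Batch size for processing")
    return parser

args = build_parser().parse_args(
    ["extract", "--source", "postgresql://localhost/db", "--target", "orders", "--date", "2024-01-01"]
)
print(args.command, args.target, args.date.date(), args.batch_size)
# extract orders 2024-01-01 1000
```

Click's decorators keep options next to the function they configure. The `argparse` version needs no extra install.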

Scheduling with APScheduler

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

scheduler = BlockingScheduler()

# extract_data, transform_data, load_data, send_alert, etc. are your own
# pipeline functions, defined elsewhere
@scheduler.scheduled_job(CronTrigger(hour=2, minute=0))  # Daily at 2 AM
def daily_etl():
    """Run daily ETL pipeline."""
    logger.info(f"Starting daily ETL at {datetime.now()}")
    
    try:
        # Extract
        extract_data()
        # Transform
        transform_data()
        # Load
        load_data()
        logger.info("Daily ETL completed successfully")
    except Exception as e:
        logger.exception("ETL failed")  # logs the full traceback
        send_alert("ETL Pipeline Failed", str(e))

@scheduler.scheduled_job(CronTrigger(hour='*/6'))  # Every 6 hours
def incremental_sync():
    """Run incremental sync."""
    logger.info("Starting incremental sync")
    sync_incremental_data()

@scheduler.scheduled_job(CronTrigger(day_of_week='sun', hour=3))  # Weekly
def weekly_maintenance():
    """Weekly maintenance tasks."""
    logger.info("Running weekly maintenance")
    vacuum_analyze_tables()
    clean_old_files()

if __name__ == '__main__':
    scheduler.start()
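The following simplified stdlib sketch shows the kind of next-fire-time calculation behind `CronTrigger(hour=2, minute=0)` for a daily job. It ignores time zones and DST transitions, which APScheduler handles for you. The helper `next_daily_run` is hypothetical and treats a run time equal to `now` as already passed:

```python
from datetime import datetime, timedelta

def next_daily_run(now: datetime, hour: int = 2, minute: int = 0) -> datetime:
    """Next fire time strictly after `now` for a daily hour:minute schedule (naive time)."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot has passed; use tomorrow's
    return candidate

print(next_daily_run(datetime(2024, 1, 1, 1, 30)))   # 2024-01-01 02:00:00
print(next_daily_run(datetime(2024, 1, 1, 2, 0)))    # 2024-01-02 02:00:00
print(next_daily_run(datetime(2024, 1, 31, 23, 0)))  # 2024-02-01 02:00:00
```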

Key Takeaways

  1. Virtual environments are mandatory — always isolate project dependencies
  2. Type hints improve maintainability — use them in all production code
  3. Pandas is essential — master DataFrame operations, groupby, and merge
  4. SQLAlchemy provides connection pooling — use it for production database access
  5. psycopg2 shines for bulk loads: use COPY and execute_values instead of row-by-row INSERTs
  6. boto3 is the AWS SDK — learn S3, EC2, and Lambda basics
  7. Click makes great CLI tools — build reusable data engineering scripts
  8. Schedule with APScheduler or Airflow — automate pipeline execution

Practice Exercises

  1. Pandas challenge: Write a function that reads a 10GB CSV file in chunks, performs aggregations, and writes the result to Parquet.

  2. Database handler: Create a PostgreSQL class that supports bulk insert, upsert, and data validation.

  3. CLI tool: Build a CLI tool that accepts a database connection string and generates a data profile report (column types, nulls, distributions).

  4. S3 data lake: Create a utility class that organizes S3 files into Bronze/Silver/Gold layers with proper partitioning.

  5. End-to-end pipeline: Build a complete ETL script that extracts from a database, transforms with pandas, validates with Pydantic, and loads to S3.
