CW

Advanced Pandas: Performance and Patterns

Module 2: NumPy & PandasFree Lesson

Advertisement

Advanced Pandas: Performance and Patterns

Master advanced Pandas — performance optimization, multi-index, and production patterns.

Advanced Pandas ArchitectureMultiIndexHierarchical indexingStack / UnstackCross-section slicingNamed levelsWindow FunctionsRolling (fixed window)Expanding (cumulative)EWMA (exponential)Time-aware offsetsPerformanceVectorize over applyAvoid Python loopsChunked processingParquet / PyArrowMethod ChainingFluent API pattern.pipe() for custom opsReadability + debuggabilityMemory OptimizationDtype downcastingCategory columnsSparse arraysProduction PatternsError handlingLogging and profilingSchema validation

MultiIndex (Hierarchical Indexing)

DfMultiIndex

A MultiIndex allows a DataFrame or Series to have multiple levels of indexing on any axis. This enables representation of higher-dimensional data in lower-dimensional structures. Mathematically, a MultiIndex creates a Cartesian product of index levels: textIndex=L1timesL2timescdotstimesLk\\text{Index} = L_1 \\times L_2 \\times \\cdots \\times L_k, where each LiL_i is a level.

import pandas as pd
import numpy as np

# Create MultiIndex from arrays
arrays = [
    ['North', 'North', 'South', 'South'],
    ['Q1', 'Q2', 'Q1', 'Q2']
]
mi = pd.MultiIndex.from_arrays(arrays, names=['region', 'quarter'])
df = pd.DataFrame({
    'revenue': [100, 120, 90, 110],
    'cost': [60, 70, 55, 65]
}, index=mi)
print(df)
#                revenue  cost
# region quarter
# North  Q1          100    60
#        Q2          120    70
# South  Q1           90    55
#        Q2          110    65

# From product
mi2 = pd.MultiIndex.from_product(
    [['A', 'B'], [1, 2, 3]],
    names=['letter', 'number']
)

MultiIndex Selection

# Cross-section: single level
print(df.loc['North'])           # All North data
print(df.loc['North'].loc['Q1']) # North, Q1

# Cross-section with xs()
print(df.xs('Q1', level='quarter'))

# Multiple levels
print(df.loc[('North', 'Q1')])   # Tuple for multiple levels

# Slice with IndexSlice
idx = pd.IndexSlice
print(df.loc[idx['North':'South', 'Q1':'Q2']])

# Drop/swap levels
print(df.droplevel('quarter'))
print(df.swaplevel('region', 'quarter'))

MultiIndex Aggregation

# Aggregate across levels
print(df.groupby(level='region').sum())
print(df.groupby(level='quarter').mean())

# Aggregate on specific level
print(df.groupby(level=['region', 'quarter']).agg({
    'revenue': 'sum',
    'cost': 'mean'
}))

# Unstack (long to wide)
unstacked = df.unstack(level='quarter')
print(unstacked)
#         revenue      cost
# quarter  Q1   Q2     Q1  Q2
# North     100  120    60  70
# South      90  110    55  65

# Stack (wide to long)
restacked = unstacked.stack()

MultiIndex vs SingleIndex with Multiple Columns

MultiIndex stores hierarchical relationships efficiently in memory. SingleIndex with extra columns requires filtering operations that are slower for cross-sectional queries. Use MultiIndex when you frequently slice across hierarchical dimensions.

Window Functions

DfRolling Window

A rolling window computes statistics over a fixed-size sliding window. For window size ww and position tt: textstat(t)=f(xtw+1,ldots,xt)\\text{stat}(t) = f(x_{t-w+1}, \\ldots, x_t). This is fundamental for time series smoothing, moving averages, and volatility calculations.

# Rolling window
df_ts = pd.DataFrame({
    'value': [10, 12, 14, 13, 16, 15, 18, 17, 20, 19]
}, index=pd.date_range('2024-01-01', periods=10, freq='D'))

# Simple moving average (SMA)
df_ts['sma_3'] = df_ts['value'].rolling(window=3).mean()
df_ts['sma_5'] = df_ts['value'].rolling(window=5).mean()

# Rolling statistics
df_ts['rolling_std'] = df_ts['value'].rolling(window=3).std()
df_ts['rolling_min'] = df_ts['value'].rolling(window=3).min()
df_ts['rolling_max'] = df_ts['value'].rolling(window=3).max()

# Custom rolling function
df_ts['rolling_range'] = df_ts['value'].rolling(window=3).apply(
    lambda x: x.max() - x.min(), raw=True
)

# Expanding window (cumulative from start)
df_ts['expanding_mean'] = df_ts['value'].expanding().mean()
df_ts['expanding_sum'] = df_ts['value'].expanding().sum()

# Exponentially weighted moving average (EWMA)
df_ts['ewma'] = df_ts['value'].ewm(span=3).mean()
print(df_ts)

Exponentially Weighted Moving Average

textEWMAt=alphacdotxt+(1alpha)cdottextEWMAt1\\text{EWMA}_t = \\alpha \\cdot x_t + (1 - \\alpha) \\cdot \\text{EWMA}_{t-1}

Here,

  • α\alpha=Smoothing factor: \alpha = \frac{2}{span + 1}
  • xtx_t=Value at time t
  • spanspan=Window parameter — larger span = smoother curve

Rolling Apply with Arguments

# Rolling apply with args
def weighted_mean(window, weights):
    return np.average(window, weights=weights)

weights = [1, 2, 3]  # More weight on recent values
df_ts['weighted_3'] = df_ts['value'].rolling(window=3).apply(
    weighted_mean, args=(weights,), raw=True
)

# Time-based rolling
df_ts['value'].rolling('3D').mean()  # 3-day rolling window

# Centered rolling (for smoothing)
df_ts['sma_3_centered'] = df_ts['value'].rolling(window=3, center=True).mean()

apply() vs Vectorized Operations

DfVectorization

Vectorization applies operations to entire arrays at once using optimized C routines. Vectorized operations are 10-100x faster than equivalent Python loops because they avoid per-element Python overhead (type checking, function calls, reference counting).

# SLOW: apply with lambda
df['result'] = df.apply(lambda row: row['a'] + row['b'], axis=1)

# FAST: vectorized
df['result'] = df['a'] + df['b']

# SLOW: apply for string operation
df['clean'] = df['name'].apply(lambda x: x.strip().lower())

# FAST: vectorized string methods
df['clean'] = df['name'].str.strip().str.lower()

# SLOW: conditional with apply
df['grade'] = df.apply(
    lambda row: 'A' if row['score'] >= 90 else 'B' if row['score'] >= 80 else 'C',
    axis=1
)

# FAST: np.where (vectorized)
df['grade'] = np.where(df['score'] >= 90, 'A',
               np.where(df['score'] >= 80, 'B', 'C'))

When apply Is Acceptable

apply() is appropriate when:

  1. The operation is inherently row-wise (depends on complex logic across multiple columns)
  2. The function cannot be expressed as vectorized operations
  3. You're using it once during preprocessing, not in a hot loop
  4. The dataset is small enough that performance isn't critical

Always check if a vectorized alternative exists first. Use %timeit to benchmark.

Performance Benchmark

import pandas as pd
import numpy as np
import time

n = 1_000_000
df = pd.DataFrame({
    'a': np.random.randn(n),
    'b': np.random.randn(n),
    'c': np.random.choice(['X', 'Y', 'Z'], n)
})

# Method 1: apply (slow)
start = time.time()
df['result_apply'] = df.apply(lambda row: row['a'] * 2 + row['b'], axis=1)
t_apply = time.time() - start

# Method 2: vectorized (fast)
start = time.time()
df['result_vec'] = df['a'] * 2 + df['b']
t_vec = time.time() - start

# Method 3: np.where (fast, conditional)
start = time.time()
df['result_cond'] = np.where(df['a'] > 0, df['a'] + df['b'], df['a'] - df['b'])
t_cond = time.time() - start

print(f"apply:      {t_apply:.3f}s")
print(f"vectorized: {t_vec:.6f}s")
print(f"np.where:   {t_cond:.6f}s")
# Typical output:
# apply:      2.340s
# vectorized: 0.003s
# np.where:   0.004s

Method Chaining

DfMethod Chaining

Method chaining is a programming pattern where multiple methods are called sequentially on the same object, with each method returning an object that the next method can act on. In Pandas, this creates a readable, pipeline-like flow of transformations.

# Without chaining (hard to read)
df1 = df.dropna()
df2 = df1[df1['age'] > 25]
df3 = df2.sort_values('salary', ascending=False)
df4 = df3.head(10)

# With chaining (readable pipeline)
result = (
    df.dropna()
      .query('age > 25')
      .sort_values('salary', ascending=False)
      .head(10)
)

# With pipe() for custom functions
def filter_by_quantile(df, column, lower=0.25, upper=0.75):
    q_low = df[column].quantile(lower)
    q_high = df[column].quantile(upper)
    return df[df[column].between(q_low, q_high)]

def add_features(df):
    df = df.copy()
    df['salary_per_age'] = df['salary'] / df['age']
    df['is_high_earner'] = df['salary'] > df['salary'].median()
    return df

result = (
    df
      .pipe(filter_by_quantile, 'age', 0.25, 0.75)
      .pipe(add_features)
      .sort_values('salary_per_age', ascending=False)
      .head(10)
)

# Advanced: pipe with debugging
def log_shape(df, name="step"):
    print(f"[{name}] Shape: {df.shape}")
    return df

result = (
    df
      .pipe(log_shape, "start")
      .dropna()
      .pipe(log_shape, "after dropna")
      .query('age > 25')
      .pipe(log_shape, "after filter")
)

pipe() for Composability

pipe() enables passing DataFrames through a chain of functions that don't belong to Pandas. It makes the pipeline testable — each function can be unit-tested independently. Use pipe() when you need to intersperse custom logic with Pandas methods.

Memory Optimization

Memory Usage by Dtype (bytes per element)int64: 8 bytesfloat64: 8 bytesobject: 8+ bytescategory: ~1 byteint8: 1 byteExample: 1M rows, 10 columnsint64/object: ~80 MB → optimized: ~10-15 MB5-8x memory reduction with dtype optimization
def optimize_memory(df):
    """Optimize DataFrame memory usage."""
    start_mem = df.memory_usage(deep=True).sum() / 1024**2

    for col in df.columns:
        col_type = df[col].dtype

        if col_type == 'int64':
            if df[col].min() >= -128 and df[col].max() <= 127:
                df[col] = df[col].astype('int8')
            elif df[col].min() >= -32768 and df[col].max() <= 32767:
                df[col] = df[col].astype('int16')
            elif df[col].min() >= -2**31 and df[col].max() <= 2**31-1:
                df[col] = df[col].astype('int32')

        elif col_type == 'float64':
            if df[col].min() >= np.finfo(np.float32).min:
                df[col] = df[col].astype('float32')

        elif col_type == 'object':
            if df[col].nunique() / len(df) < 0.5:
                df[col] = df[col].astype('category')

    end_mem = df.memory_usage(deep=True).sum() / 1024**2
    print(f"Memory: {start_mem:.1f} MB → {end_mem:.1f} MB ({100*(start_mem-end_mem)/start_mem:.1f}% reduction)")

    return df

# Usage
df = optimize_memory(df)

Sparse Arrays

# For DataFrames with many zeros/NaN values
from scipy.sparse import csr_matrix

# Sparse DataFrame
df_sparse = pd.DataFrame({
    'A': pd.arrays.SparseArray([0, 0, 0, 1, 0, 0, 2, 0]),
    'B': pd.arrays.SparseArray([0.0, np.nan, 0.0, 3.14, 0.0, 0.0, 2.71, 0.0])
})
print(df_sparse.memory_usage(deep=True).sum())  # Much less than dense

# Convert to scipy sparse for ML
sparse_matrix = csr_matrix(df.values)

Chunked Processing

# Process large files in chunks
chunk_size = 100_000
results = []

for chunk in pd.read_csv('large_data.csv', chunksize=chunk_size):
    # Process each chunk
    filtered = chunk[chunk['value'] > 0]
    summary = filtered.groupby('category')['value'].sum()
    results.append(summary)

# Combine results
final = pd.concat(results).groupby(level=0).sum()

# Alternative: use pyarrow for faster CSV reading
df = pd.read_csv('large_data.csv', engine='pyarrow')

# Or read in parquet format (columnar, compressed)
df = pd.read_parquet('large_data.parquet', columns=['col1', 'col2'])  # Column pruning

Chunked Processing Trade-offs

Chunking trades memory for complexity. Each chunk is processed independently, so you must handle:

  1. Aggregation: Accumulate results across chunks (requires manual combination)
  2. Filtering: Simple — just concatenate filtered chunks
  3. Transformations: Column-wise transforms work per-chunk; row-wise may need full data
  4. Order-dependent operations: Sorting across chunks requires full data or special handling

Production Patterns

Error Handling

def safe_transform(df, col, func, default=np.nan):
    """Apply function with error handling."""
    try:
        result = func(df[col])
        if len(result) != len(df):
            raise ValueError(f"Length mismatch: {len(result)} != {len(df)}")
        return result
    except Exception as e:
        print(f"Warning: {col}: {e}. Using default.")
        return pd.Series(default, index=df.index)

# Usage
df['result'] = safe_transform(df, 'price', lambda x: x.astype(float))

Schema Validation

def validate_schema(df, schema):
    """Validate DataFrame against expected schema."""
    errors = []

    for col, expected in schema.items():
        if col not in df.columns:
            errors.append(f"Missing column: {col}")
            continue

        if 'dtype' in expected and str(df[col].dtype) != expected['dtype']:
            errors.append(f"{col}: expected {expected['dtype']}, got {df[col].dtype}")

        if 'min' in expected and df[col].min() < expected['min']:
            errors.append(f"{col}: min value {df[col].min()} < {expected['min']}")

        if 'max' in expected and df[col].max() > expected['max']:
            errors.append(f"{col}: max value {df[col].max()} > {expected['max']}")

        if 'not_null' in expected and expected['not_null'] and df[col].isnull().any():
            errors.append(f"{col}: contains null values")

    return errors

# Usage
schema = {
    'age': {'dtype': 'int64', 'min': 0, 'max': 120, 'not_null': True},
    'salary': {'dtype': 'float64', 'min': 0, 'not_null': True},
    'name': {'dtype': 'object', 'not_null': True}
}

errors = validate_schema(df, schema)
if errors:
    raise ValueError(f"Schema validation failed:\n" + "\n".join(errors))

Logging and Profiling

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def profile_dataframe(df, name="DataFrame"):
    """Profile a DataFrame for data quality."""
    logger.info(f"Profiling {name}:")
    logger.info(f"  Shape: {df.shape}")
    logger.info(f"  Memory: {df.memory_usage(deep=True).sum() / 1024**2:.1f} MB")
    logger.info(f"  Missing: {df.isnull().sum().sum()} ({df.isnull().mean().mean()*100:.1f}%)")
    logger.info(f"  Duplicates: {df.duplicated().sum()}")

    for col in df.columns:
        missing_pct = df[col].isnull().mean() * 100
        if missing_pct > 0:
            logger.warning(f"  {col}: {missing_pct:.1f}% missing")

    return df

# Usage in pipeline
result = (
    df
      .pipe(profile_dataframe, "raw data")
      .drop_duplicates()
      .pipe(profile_dataframe, "after dedup")
)

Performance Tips

Pandas Performance Cheat Sheet

PatternSpeedMemoryWhen to Use
Vectorized ops⚡⚡⚡⚡⚡⚡Default choice
np.where / np.select⚡⚡⚡⚡⚡⚡Conditional logic
str methods⚡⚡⚡⚡⚡String operations
apply()⚡⚡⚡Complex row-wise logic
itertuples()⚡⚡⚡When apply is too slow
eval()⚡⚡⚡⚡⚡Complex expressions
Chained indexing⚡⚡⚡Avoid — SettingWithCopyWarning
# Fast conditional selection
conditions = [df['a'] > 0, df['b'] > 0, df['c'] == 'X']
choices = ['positive_a', 'positive_b', 'is_X']
df['category'] = np.select(conditions, choices, default='other')

# eval() for complex expressions (avoids temporary arrays)
df['result'] = df.eval('a * b + c / (a + b + 1)')

# query() for filtering (avoids creating boolean mask)
result = df.query('a > 0 and b > 0 and c == "X"')

# Cache column access
name = df['name']  # Cache reference
result = name.str.strip().str.lower()  # Avoid repeated __getitem__

Key Takeaways

Summary: Advanced Pandas

  1. MultiIndex enables hierarchical indexing for multi-dimensional data — use xs() and IndexSlice for efficient cross-sectional queries
  2. Window functions: rolling() for fixed windows, expanding() for cumulative, ewm() for exponential weighting
  3. Vectorize operations — apply() is 10-100x slower than vectorized equivalents; always check for vectorized alternatives first
  4. Method chaining with pipe() creates readable, testable data pipelines
  5. Memory optimization: downcast dtypes, use category for low-cardinality strings, sparse for many zeros
  6. Chunked processing handles datasets larger than RAM — process in pieces and combine
  7. Production patterns: validate schemas, handle errors gracefully, profile data quality, log transformations
  8. Prefer Parquet over CSV for storage (columnar, compressed, fast reads with column pruning)

Practice Exercise

  1. Create a MultiIndex DataFrame with 3 levels (country, city, year). Compute year-over-year growth rate for each city using groupby() and pct_change().
  2. Implement a rolling window function that computes both SMA and EWMA, then generates buy/sell signals when SMA crosses EWMA.
  3. Benchmark apply() vs vectorized vs np.select() for a conditional column creation with 5 conditions on a 1M row DataFrame. Report the speedup factor.
  4. Build a production-ready ETL pipeline using method chaining: validate schema → clean data → transform → profile → export. Each step should log its results.
  5. Optimize a large DataFrame (10M rows) by downcasting dtypes and converting strings to categories. Measure memory savings and verify data integrity.

Advertisement

Need Expert Data Science Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement