Performance Optimization in Python
Difficulty: Hard | Companies: Google, Meta, Amazon, Netflix, Stripe
Profiling and Benchmarking
import cProfile
import time
import functools
from typing import Callable, Any
from collections import defaultdict
import tracemalloc
# Simple timing decorator
def timing_decorator(func: Callable) -> Callable:
"""Measure execution time."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
end = time.perf_counter()
print(f"{func.__name__}: {end - start:.6f}s")
return result
return wrapper
# Memory profiling decorator
def memory_profile(func: Callable) -> Callable:
"""Measure memory usage."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
tracemalloc.start()
result = func(*args, **kwargs)
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"{func.__name__} memory: {current / 1024 / 1024:.2f} MB (peak: {peak / 1024 / 1024:.2f} MB)")
return result
return wrapper
# Advanced profiler
class PerformanceProfiler:
"""Comprehensive performance profiler."""
def __init__(self):
self.metrics = defaultdict(list)
def profile(self, func: Callable) -> Callable:
"""Profile function execution."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Time profiling
start_time = time.perf_counter()
# Memory profiling
tracemalloc.start()
result = func(*args, **kwargs)
# Collect metrics
end_time = time.perf_counter()
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
self.metrics[func.__name__].append({
'time': end_time - start_time,
'memory_current': current,
'memory_peak': peak,
'args': str(args)[:100],
'kwargs': str(kwargs)[:100]
})
return result
return wrapper
def get_stats(self, func_name: str = None) -> dict:
"""Get profiling statistics."""
if func_name:
return self._calculate_stats(self.metrics[func_name])
return {name: self._calculate_stats(metrics)
for name, metrics in self.metrics.items()}
def _calculate_stats(self, metrics: list) -> dict:
"""Calculate statistics for a function."""
times = [m['time'] for m in metrics]
memories = [m['memory_current'] for m in metrics]
return {
'calls': len(metrics),
'total_time': sum(times),
'avg_time': sum(times) / len(times) if times else 0,
'min_time': min(times) if times else 0,
'max_time': max(times) if times else 0,
'avg_memory': sum(memories) / len(memories) if memories else 0,
'peak_memory': max(m['memory_peak'] for m in metrics) if metrics else 0
}
def print_report(self):
"""Print profiling report."""
print("\n=== Performance Report ===")
for name, stats in self.get_stats().items():
print(f"\n{name}:")
print(f" Calls: {stats['calls']}")
print(f" Time: {stats['total_time']:.6f}s (avg: {stats['avg_time']:.6f}s)")
print(f" Memory: {stats['avg_memory'] / 1024 / 1024:.2f} MB (peak: {stats['peak_memory'] / 1024 / 1024:.2f} MB)")
# Usage
profiler = PerformanceProfiler()
@profiler.profile
def fibonacci_recursive(n: int) -> int:
"""Recursive fibonacci - slow for large n."""
if n <= 1:
return n
return fibonacci_recursive(n - 1) + fibonacci_recursive(n - 2)
@profiler.profile
def fibonacci_memoized(n: int, memo: dict = None) -> int:
"""Memoized fibonacci - much faster."""
if memo is None:
memo = {}
if n in memo:
return memo[n]
if n <= 1:
return n
memo[n] = fibonacci_memoized(n - 1, memo) + fibonacci_memoized(n - 2, memo)
return memo[n]
# Test
fibonacci_recursive(30)
fibonacci_memoized(30)
profiler.print_report()
βΉοΈ
Always profile before optimizing. Focus on the hot paths that consume most time or memory.
Algorithmic Optimization
from typing import List, Dict
from collections import defaultdict, Counter
import bisect
# Bad: O(n^2) solution
def two_sum_slow(nums: List[int], target: int) -> List[int]:
"""Brute force two sum - O(n^2)."""
for i in range(len(nums)):
for j in range(i + 1, len(nums)):
if nums[i] + nums[j] == target:
return [i, j]
return []
# Good: O(n) solution
def two_sum_fast(nums: List[int], target: int) -> List[int]:
"""Hash map two sum - O(n)."""
num_map = {}
for i, num in enumerate(nums):
complement = target - num
if complement in num_map:
return [num_map[complement], i]
num_map[num] = i
return []
# Bad: O(n^2) for sorted array search
def search_pair_slow(sorted_nums: List[int], target: int) -> List[int]:
"""Brute force pair search in sorted array."""
for i in range(len(sorted_nums)):
for j in range(i + 1, len(sorted_nums)):
if sorted_nums[i] + sorted_nums[j] == target:
return [i, j]
return []
# Good: O(n) with two pointers
def search_pair_fast(sorted_nums: List[int], target: int) -> List[int]:
"""Two pointer pair search in sorted array."""
left, right = 0, len(sorted_nums) - 1
while left < right:
current_sum = sorted_nums[left] + sorted_nums[right]
if current_sum == target:
return [left, right]
elif current_sum < target:
left += 1
else:
right -= 1
return []
# Subarray optimization
def max_subarray_brute(nums: List[int]) -> int:
"""Brute force maximum subarray - O(n^2)."""
max_sum = float('-inf')
for i in range(len(nums)):
current_sum = 0
for j in range(i, len(nums)):
current_sum += nums[j]
max_sum = max(max_sum, current_sum)
return max_sum
def max_subarray_kadane(nums: List[int]) -> int:
"""Kadane's algorithm - O(n)."""
max_sum = current_sum = nums[0]
for num in nums[1:]:
current_sum = max(num, current_sum + num)
max_sum = max(max_sum, current_sum)
return max_sum
# String concatenation optimization
def build_string_slow(parts: List[str]) -> str:
"""Slow string building - O(n^2) due to immutability."""
result = ""
for part in parts:
result += part # Creates new string each time
return result
def build_string_fast(parts: List[str]) -> str:
"""Fast string building - O(n) with join."""
return "".join(parts) # Single allocation
# List operations optimization
def find_duplicates_slow(nums: List[int]) -> List[int]:
"""Find duplicates - O(n^2)."""
duplicates = []
for i in range(len(nums)):
for j in range(i + 1, len(nums)):
if nums[i] == nums[j] and nums[i] not in duplicates:
duplicates.append(nums[i])
return duplicates
def find_duplicates_fast(nums: List[int]) -> List[int]:
"""Find duplicates - O(n) with hash set."""
seen = set()
duplicates = set()
for num in nums:
if num in seen:
duplicates.add(num)
else:
seen.add(num)
return list(duplicates)
Caching Strategies
from functools import lru_cache, wraps
from typing import Callable, Any, Optional
import time
from collections import OrderedDict
import hashlib
import json
# Simple LRU Cache
class LRUCache:
"""LRU Cache with TTL support."""
def __init__(self, maxsize: int = 128, ttl: float = 300):
self.maxsize = maxsize
self.ttl = ttl
self.cache = OrderedDict()
self.timestamps = {}
def get(self, key: str) -> Optional[Any]:
"""Get value from cache."""
if key in self.cache:
# Check TTL
if time.time() - self.timestamps[key] < self.ttl:
# Move to end (most recently used)
self.cache.move_to_end(key)
return self.cache[key]
else:
# Expired
del self.cache[key]
del self.timestamps[key]
return None
def set(self, key: str, value: Any):
"""Set value in cache."""
if key in self.cache:
self.cache.move_to_end(key)
elif len(self.cache) >= self.maxsize:
# Remove oldest
oldest_key = next(iter(self.cache))
del self.cache[oldest_key]
del self.timestamps[oldest_key]
self.cache[key] = value
self.timestamps[key] = time.time()
def invalidate(self, key: str):
"""Remove key from cache."""
if key in self.cache:
del self.cache[key]
del self.timestamps[key]
def clear(self):
"""Clear all cache."""
self.cache.clear()
self.timestamps.clear()
# Function-level caching
def cache_with_ttl(ttl: float = 300):
"""Decorator for caching function results with TTL."""
cache = {}
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
# Create cache key
key = hashlib.md5(
json.dumps((args, kwargs), sort_keys=True, default=str).encode()
).hexdigest()
# Check cache
if key in cache:
cached_time, cached_result = cache[key]
if time.time() - cached_time < ttl:
return cached_result
# Execute and cache
result = func(*args, **kwargs)
cache[key] = (time.time(), result)
return result
wrapper.cache = cache
wrapper.cache_clear = lambda: cache.clear()
return wrapper
return decorator
# Usage
@cache_with_ttl(ttl=60)
def expensive_computation(n: int) -> int:
"""Expensive computation with caching."""
time.sleep(0.1) # Simulate work
return n ** 2
# Built-in LRU cache
@lru_cache(maxsize=128)
def fibonacci_cached(n: int) -> int:
"""Fibonacci with built-in LRU cache."""
if n <= 1:
return n
return fibonacci_cached(n - 1) + fibonacci_cached(n - 2)
# Cache statistics
print(fibonacci_cached.cache_info())
β οΈ
Cache invalidation is one of the hardest problems in computer science. Always consider when cached data becomes stale.
Memory Optimization
import sys
from typing import List, Any
import array
class MemoryOptimizer:
"""Memory optimization techniques."""
@staticmethod
def compare_data_structures():
"""Compare memory usage of different structures."""
n = 100000
# List vs array vs generator
list_data = [i for i in range(n)]
array_data = array.array('i', range(n))
generator_data = (i for i in range(n))
print(f"List: {sys.getsizeof(list_data)} bytes")
print(f"Array: {sys.getsizeof(array_data)} bytes")
print(f"Generator: {sys.getsizeof(generator_data)} bytes")
# Dict vs slots
class RegularClass:
def __init__(self, x, y, z):
self.x = x
self.y = y
self.z = z
class SlotClass:
__slots__ = ['x', 'y', 'z']
def __init__(self, x, y, z):
self.x = x
self.y = y
self.z = z
regular = RegularClass(1, 2, 3)
slotted = SlotClass(1, 2, 3)
print(f"Regular class: {sys.getsizeof(regular) + sys.getsizeof(regular.__dict__)} bytes")
print(f"Slotted class: {sys.getsizeof(slotted)} bytes")
class OptimizedList:
"""Memory-efficient list using array."""
def __init__(self, item_type: str = 'i'):
self.data = array.array(item_type)
def append(self, item):
self.data.append(item)
def __getitem__(self, index):
return self.data[index]
def __len__(self):
return len(self.data)
def __iter__(self):
return iter(self.data)
class ObjectPool:
"""Object pool for reducing allocation overhead."""
def __init__(self, factory, max_size: int = 100):
self.factory = factory
self.max_size = max_size
self.pool = []
def acquire(self):
"""Get object from pool."""
if self.pool:
return self.pool.pop()
return self.factory()
def release(self, obj):
"""Return object to pool."""
if len(self.pool) < self.max_size:
self.pool.append(obj)
def __enter__(self):
return self.acquire()
def __exit__(self, exc_type, exc_val, exc_tb):
self.release(self)
return False
# Memory-efficient data processing
def process_large_file_memory_efficient(filename: str):
"""Process large file without loading entire content."""
with open(filename, 'r') as f:
for line in f: # Generator - one line at a time
yield line.strip()
def chunked_processing(data, chunk_size: int = 1000):
"""Process data in chunks."""
for i in range(0, len(data), chunk_size):
yield data[i:i + chunk_size]
# Usage
MemoryOptimizer.compare_data_structures()
# Object pool example
pool = ObjectPool(lambda: {"data": [], "metadata": {}}, max_size=10)
with pool as obj:
obj["data"].append(1)
print(obj)
Concurrency Optimization
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from typing import List
import time
# IO-bound: Use threading or asyncio
async def fetch_data_async(urls: List[str]) -> List[dict]:
"""Async fetching for IO-bound tasks."""
async def fetch_one(url):
await asyncio.sleep(0.1) # Simulate network delay
return {"url": url, "data": "response"}
tasks = [fetch_one(url) for url in urls]
return await asyncio.gather(*tasks)
def fetch_data_threaded(urls: List[str]) -> List[dict]:
"""Thread-based fetching for IO-bound tasks."""
def fetch_one(url):
time.sleep(0.1) # Simulate network delay
return {"url": url, "data": "response"}
with ThreadPoolExecutor(max_workers=10) as executor:
return list(executor.map(fetch_one, urls))
# CPU-bound: Use multiprocessing
def cpu_intensive_task(n: int) -> int:
"""CPU-intensive computation."""
return sum(i * i for i in range(n))
def parallel_cpu_tasks(numbers: List[int]) -> List[int]:
"""Parallel processing for CPU-bound tasks."""
with ProcessPoolExecutor(max_workers=4) as executor:
return list(executor.map(cpu_intensive_task, numbers))
# Hybrid approach
class AdaptiveExecutor:
"""Automatically choose between threading and processing."""
def __init__(self, io_workers: int = 10, cpu_workers: int = 4):
self.io_executor = ThreadPoolExecutor(max_workers=io_workers)
self.cpu_executor = ProcessPoolExecutor(max_workers=cpu_workers)
def execute_io(self, func, *args, **kwargs):
"""Execute IO-bound task."""
return self.io_executor.submit(func, *args, **kwargs)
def execute_cpu(self, func, *args, **kwargs):
"""Execute CPU-bound task."""
return self.cpu_executor.submit(func, *args, **kwargs)
def shutdown(self):
"""Shutdown both executors."""
self.io_executor.shutdown()
self.cpu_executor.shutdown()
# Benchmarking
def benchmark(func, *args, iterations: int = 100, **kwargs):
"""Benchmark function execution."""
times = []
for _ in range(iterations):
start = time.perf_counter()
func(*args, **kwargs)
end = time.perf_counter()
times.append(end - start)
return {
'avg': sum(times) / len(times),
'min': min(times),
'max': max(times),
'total': sum(times)
}
# Example benchmarks
def sequential_sum(n: int) -> int:
return sum(range(n))
def parallel_sum(n: int) -> int:
chunk_size = n // 4
with ProcessPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(sum, range(i, i + chunk_size))
for i in range(0, n, chunk_size)]
return sum(f.result() for f in futures)
# Compare
seq_stats = benchmark(sequential_sum, 1000000, iterations=10)
par_stats = benchmark(parallel_sum, 1000000, iterations=10)
print(f"Sequential: {seq_stats['avg']:.6f}s")
print(f"Parallel: {par_stats['avg']:.6f}s")
βΉοΈ
Profile first, then optimize. The fastest code is the code that doesn't run.
Follow-Up Questions
-
Explain the difference between threading and multiprocessing.
-
When would you use asyncio over threading?
-
How do you optimize Python code for memory efficiency?
-
What are the common performance pitfalls in Python?
-
Explain the concept of lazy evaluation and its benefits.