Threading & Multiprocessing in Python
Difficulty: Hard | Companies: Google, Meta, Amazon, Netflix, Stripe
Understanding the GIL
import threading
import multiprocessing
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# GIL Impact Demonstration
def cpu_bound_task(n):
    """Return the sum of the squares of the integers ``0 .. n-1``.

    This is pure-Python arithmetic with no I/O, which is what makes it the
    CPU-bound workload for the GIL demonstrations.
    """
    return sum(i * i for i in range(n))
def io_bound_task(duration=1):
    """Simulate a blocking I/O call by sleeping, then report completion.

    ``time.sleep`` releases the GIL while it waits, so several of these tasks
    can overlap when run in threads.

    Args:
        duration: Seconds to sleep. Defaults to 1, the original fixed delay.

    Returns:
        The string ``"IO complete"``.
    """
    time.sleep(duration)
    return "IO complete"
# Threading - Good for IO-bound tasks
def threading_demo(n=10**7, workers=2):
    """Time ``workers`` CPU-bound tasks run concurrently in a thread pool.

    On a standard (GIL) CPython build only one thread executes Python
    bytecode at a time, so the elapsed time is about the same as running the
    tasks one after another: threads give no speed-up for CPU-bound work.

    Args:
        n: Argument passed to every ``cpu_bound_task`` call.
        workers: Number of threads; one task is submitted per thread.

    Returns:
        The task results, in submission order.
    """
    # perf_counter is monotonic and high-resolution; time.time() can jump
    # when the system clock is adjusted, which would corrupt the interval.
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(cpu_bound_task, n) for _ in range(workers)]
        results = [f.result() for f in futures]
    elapsed = time.perf_counter() - start
    print(f"Threading: {elapsed:.2f}s")  # roughly the sequential time
    return results
# Multiprocessing - Good for CPU-bound tasks
def multiprocessing_demo(n=10**7, workers=2):
    """Time ``workers`` CPU-bound tasks run in a pool of worker processes.

    Each worker process has its own interpreter and its own GIL, so the tasks
    can run in parallel on separate cores.

    On platforms that start processes with "spawn" (Windows, macOS) this must
    be called from under an ``if __name__ == "__main__":`` guard.

    Args:
        n: Argument passed to every ``cpu_bound_task`` call.
        workers: Number of processes; one task is submitted per process.

    Returns:
        The task results, in submission order.
    """
    # perf_counter is monotonic and high-resolution; time.time() can jump
    # when the system clock is adjusted, which would corrupt the interval.
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(cpu_bound_task, n) for _ in range(workers)]
        results = [f.result() for f in futures]
    elapsed = time.perf_counter() - start
    # With at least `workers` free cores: about the time of ONE task.
    print(f"Multiprocessing: {elapsed:.2f}s")
    return results
ℹ️
Use threading for IO-bound tasks (network calls, file I/O). Use multiprocessing for CPU-bound tasks (data processing, computations).
Thread Synchronization
Lock and RLock
import threading
from contextlib import contextmanager
class SafeCounter:
    """Thread-safe counter whose value is guarded by a ``threading.Lock``."""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self, amount=1):
        """Atomically add ``amount`` to the counter and return the new value.

        Returning the updated value lets a caller see the result of its own
        increment; calling ``get_value`` afterwards could already include
        other threads' increments.

        Args:
            amount: Step to add. Defaults to 1.

        Returns:
            The counter value immediately after this increment.
        """
        with self._lock:
            # `+=` is a read-modify-write; the lock makes it indivisible.
            self._value += amount
            return self._value

    def get_value(self):
        """Return the current counter value."""
        with self._lock:
            return self._value
class BankAccount:
    """Thread-safe bank account guarded by a reentrant lock.

    An ``RLock`` is used because ``transfer`` holds both accounts' locks and
    then calls ``withdraw`` / ``deposit``, which acquire the same locks again
    on the same thread.
    """

    def __init__(self, balance: float = 0):
        self._balance = balance
        self._lock = threading.RLock()  # Reentrant lock

    @staticmethod
    def _check_amount(amount: float, action: str) -> None:
        """Raise ``ValueError`` unless ``amount`` is a positive, finite number."""
        # `not amount > 0` (rather than `amount <= 0`) also rejects NaN: every
        # comparison with NaN is False, so NaN would slip past `<= 0` and then
        # turn the balance into NaN.
        if not amount > 0:
            raise ValueError(f"{action} must be positive")
        if amount == float("inf"):
            raise ValueError(f"{action} must be finite")

    @property
    def balance(self):
        """Current balance, read under the account lock."""
        with self._lock:
            return self._balance

    def deposit(self, amount: float):
        """Add ``amount`` to the balance and return the new balance.

        Raises:
            ValueError: If ``amount`` is not a positive, finite number.
        """
        with self._lock:
            self._check_amount(amount, "Deposit")
            self._balance += amount
            return self._balance

    def withdraw(self, amount: float):
        """Subtract ``amount`` from the balance and return the new balance.

        Raises:
            ValueError: If ``amount`` is not a positive, finite number, or if
                it exceeds the current balance.
        """
        with self._lock:
            self._check_amount(amount, "Withdrawal")
            if amount > self._balance:
                raise ValueError("Insufficient funds")
            self._balance -= amount
            return self._balance

    def transfer(self, other: 'BankAccount', amount: float):
        """Move ``amount`` from this account to ``other``.

        Both locks are held for the whole transfer, so no other thread sees
        the money missing from both accounts. If the withdrawal is rejected,
        nothing is deposited.

        Raises:
            ValueError: Propagated from ``withdraw`` (bad amount or
                insufficient funds).
        """
        # Always lock in a consistent order (by id) so two opposite transfers
        # running concurrently cannot each hold one lock and wait on the other.
        first, second = (self, other) if id(self) < id(other) else (other, self)
        with first._lock, second._lock:
            self.withdraw(amount)
            other.deposit(amount)
⚠️
Always acquire locks in a consistent order to prevent deadlocks. Use RLock when a thread may need to acquire the same lock multiple times.
Condition Variables and Semaphores
import threading
import time
from collections import deque
class BoundedBuffer:
    """Fixed-capacity FIFO buffer for the producer-consumer pattern.

    ``put`` blocks while the buffer is full and ``get`` blocks while it is
    empty. The two conditions share ONE lock: a condition may only be
    notified by a thread holding its lock, and ``put`` notifies ``not_empty``
    while ``get`` notifies ``not_full``. Sharing the lock also means the
    deque is only ever touched under a single lock.
    """

    def __init__(self, capacity: int):
        """Create an empty buffer that holds at most ``capacity`` items.

        Raises:
            ValueError: If ``capacity`` is less than 1 (``put`` could never
                succeed on such a buffer).
        """
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self.buffer = deque()
        self.capacity = capacity
        lock = threading.Lock()
        self.not_full = threading.Condition(lock)
        self.not_empty = threading.Condition(lock)

    def put(self, item):
        """Append ``item``, blocking while the buffer is at capacity."""
        with self.not_full:
            # `while`, not `if`: re-check after every wake-up, because another
            # producer may have refilled the slot first.
            while len(self.buffer) >= self.capacity:
                self.not_full.wait()
            self.buffer.append(item)
            self.not_empty.notify()

    def get(self):
        """Remove and return the oldest item, blocking while the buffer is empty."""
        with self.not_empty:
            while len(self.buffer) == 0:
                self.not_empty.wait()
            item = self.buffer.popleft()
            self.not_full.notify()
            return item
class ConnectionPool:
    """Connection pool that caps concurrent checkouts with a semaphore.

    The semaphore counts free slots, so at most ``max_connections``
    connections are checked out at once. Idle connections are kept in
    ``self.connections`` and reused before new ones are created.
    """

    def __init__(self, max_connections: int):
        self.semaphore = threading.Semaphore(max_connections)
        self.connections = []
        self._lock = threading.Lock()
        # Total connections created so far; the source of unique ids. The
        # length of the idle list cannot serve as an id because it is 0
        # whenever every connection is checked out.
        self._created = 0

    def get_connection(self):
        """Get a connection from the pool.

        Blocks until a slot is free, then returns an idle connection if one
        exists and otherwise creates a new one.
        """
        self.semaphore.acquire()
        try:
            with self._lock:
                if self.connections:
                    return self.connections.pop()
                return self._create_connection()
        except BaseException:
            # No connection was handed out, so give the slot back; otherwise
            # each failed creation would permanently shrink the pool.
            self.semaphore.release()
            raise

    def release_connection(self, conn):
        """Return connection to the pool."""
        with self._lock:
            self.connections.append(conn)
        # Release after the append so a thread woken by the semaphore finds
        # the connection already in the idle list.
        self.semaphore.release()

    def _create_connection(self):
        """Create a new database connection with a unique id.

        Must be called with ``self._lock`` held, because it updates the
        creation counter.
        """
        conn_id = self._created
        self._created += 1
        return {"id": conn_id, "active": True}
# Usage
pool = ConnectionPool(max_connections=5)


def worker(worker_id):
    """Borrow a pooled connection, hold it briefly, then hand it back."""
    conn = pool.get_connection()
    try:
        print(f"Worker {worker_id} using connection {conn['id']}")
        time.sleep(0.1)
    finally:
        # Runs even if the work above raises, so the pool slot is returned.
        pool.release_connection(conn)


# Ten workers share a five-slot pool: start them all, then wait for them all.
threads = [
    threading.Thread(target=worker, args=(worker_id,))
    for worker_id in range(10)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
Multiprocessing Patterns
Process Pool for Parallel Processing
from multiprocessing import Pool, Queue, Process, Manager
from functools import partial
import os
def worker_function(data_chunk, multiplier):
    """Multiply every element of ``data_chunk`` by ``multiplier``.

    Returns:
        A ``(pid, scaled)`` tuple: the id of the process that ran the call
        and the list of products, in input order.
    """
    scaled = []
    for value in data_chunk:
        scaled.append(value * multiplier)
    return os.getpid(), scaled
def parallel_map_example():
    """Double the numbers 0..19 in chunks of five using a four-process pool.

    Prints one line per chunk showing which process id handled it.
    """
    data = list(range(20))
    chunk_size = 5

    chunks = []
    for start in range(0, len(data), chunk_size):
        chunks.append(data[start:start + chunk_size])

    # Fix the multiplier up front so the pool only has to supply the chunk.
    double_chunk = partial(worker_function, multiplier=2)
    with Pool(processes=4) as pool:
        results = pool.map(double_chunk, chunks)

    for pid, chunk_result in results:
        print(f"PID {pid}: {chunk_result}")
class ProgressTracker:
    """Track progress across multiple processes.

    Progress values live in a ``Manager`` dict and are guarded by a
    ``Manager`` lock. Both are proxies, so the tracker can be handed to
    worker processes and every process sees the same data.
    """

    def __init__(self):
        self.manager = Manager()
        self.progress = self.manager.dict()
        self.lock = self.manager.Lock()

    def __getstate__(self):
        """Return picklable state: the proxies, without the manager handle.

        The manager object itself cannot be pickled, which would stop the
        tracker from being passed as a ``Pool`` task argument. Workers only
        need the dict and lock proxies, so the copy they receive carries
        ``manager = None``.
        """
        state = self.__dict__.copy()
        state["manager"] = None
        return state

    def update(self, task_id, progress):
        """Record ``progress`` for ``task_id``, replacing any earlier value."""
        with self.lock:
            self.progress[task_id] = progress

    def get_progress(self):
        """Return a plain ``dict`` snapshot of every task's latest progress."""
        with self.lock:
            return dict(self.progress)
def process_with_progress(task_id, data, tracker):
    """Work through ``data`` and report the completed fraction after each item.

    After each item, ``tracker.update(task_id, fraction)`` is called with the
    share of items finished so far, reaching 1.0 on the last item.

    Returns:
        The string ``"Task <task_id> complete"``.
    """
    total = len(data)
    for done, _ in enumerate(data, start=1):
        # Process item
        time.sleep(0.01)
        tracker.update(task_id, done / total)
    return f"Task {task_id} complete"
Inter-Process Communication
from multiprocessing import Process, Queue, Pipe
import time
def producer(queue, n, delay=0.1):
    """Put ``n`` items on ``queue``, then a ``None`` sentinel.

    Args:
        queue: Any object with a ``put(item)`` method.
        n: Number of items to produce.
        delay: Seconds to pause after each item. Defaults to 0.1.

    The sentinel is sent from a ``finally`` block so that a consumer blocked
    on ``queue.get()`` is released even if producing fails part-way.
    """
    try:
        for i in range(n):
            item = {"id": i, "data": f"item_{i}"}
            queue.put(item)
            print(f"Produced: {item}")
            time.sleep(delay)
    finally:
        queue.put(None)  # Sentinel value
def consumer(queue):
    """Print items taken from ``queue`` until the ``None`` sentinel arrives.

    The sentinel is consumed but not printed; anything queued after it is
    left untouched.
    """
    item = queue.get()
    while item is not None:
        print(f"Consumed: {item}")
        item = queue.get()
def _pipe_sender(connection):
    """Send five greeting messages, then a ``None`` sentinel, and close."""
    try:
        for i in range(5):
            connection.send({"message": f"Hello {i}"})
        # Explicit end-of-stream marker: the receiver cannot rely on EOF,
        # because EOF only arrives once EVERY process has closed this end.
        connection.send(None)
    finally:
        connection.close()


def _pipe_receiver(connection):
    """Print received messages until the ``None`` sentinel (or EOF) arrives."""
    try:
        while True:
            try:
                msg = connection.recv()
            except EOFError:
                break
            if msg is None:
                break
            print(f"Received: {msg}")
    finally:
        connection.close()


def pipe_communication():
    """Demonstrate message passing between two processes over a ``Pipe``.

    One child process sends five messages through one end of the pipe and a
    second child prints them as they arrive on the other end. The call
    returns once both children have finished.

    The targets are module-level functions rather than nested ones so they
    can be pickled when processes are started with "spawn" or "forkserver".
    """
    parent_conn, child_conn = Pipe()
    p1 = Process(target=_pipe_sender, args=(parent_conn,))
    p2 = Process(target=_pipe_receiver, args=(child_conn,))
    p1.start()
    p2.start()
    # The children own their copies now; this process uses neither end.
    parent_conn.close()
    child_conn.close()
    p1.join()
    p2.join()
# Shared Memory
from multiprocessing import Array, Value
def _increment_counter(counter, n):
    """Add 1 to the shared ``counter`` ``n`` times."""
    for _ in range(n):
        # `counter.value += 1` is a read-modify-write; take the Value's own
        # lock so concurrent increments are not lost.
        with counter.get_lock():
            counter.value += 1


def _update_array(array, index, value):
    """Store ``value`` at ``index`` of the shared ``array``."""
    array[index] = value


def shared_memory_example():
    """Demonstrate shared memory between processes.

    Two processes each increment a shared integer 1000 times, and a third
    writes one slot of a shared array of doubles. The final counter and
    array contents are printed.

    The targets are module-level functions rather than nested ones so they
    can be pickled when processes are started with "spawn" or "forkserver".

    Returns:
        The final counter value (2000).
    """
    counter = Value('i', 0)  # Shared integer
    array = Array('d', [0.0] * 5)  # Shared array of doubles
    processes = [
        Process(target=_increment_counter, args=(counter, 1000)),
        Process(target=_increment_counter, args=(counter, 1000)),
        Process(target=_update_array, args=(array, 0, 1.5)),
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Final counter: {counter.value}")  # 2000
    print(f"Final array: {list(array)}")
    return counter.value
Thread Safety Patterns
Thread-Local Storage
import threading
from typing import Dict, Any
class ThreadContext:
    """Per-thread key/value store built on ``threading.local``.

    Values set by one thread are invisible to every other thread.
    """

    def __init__(self):
        self._local = threading.local()

    def get(self, key: str, default: Any = None) -> Any:
        """Return the calling thread's value for ``key``, else ``default``."""
        try:
            return getattr(self._local, key)
        except AttributeError:
            return default

    def set(self, key: str, value: Any):
        """Store ``value`` under ``key`` for the calling thread only."""
        setattr(self._local, key, value)

    def clear(self):
        """Drop every value the calling thread has stored."""
        vars(self._local).clear()
# Global context
context = ThreadContext()


def handle_request(request_id):
    """Simulate handling a request using per-thread context.

    The request id and a derived user name are stored in the thread-local
    ``context``, read back after the simulated work, and always cleared at
    the end.
    """
    try:
        context.set("request_id", request_id)
        context.set("user", f"user_{request_id}")
        # Process request
        time.sleep(0.1)
        # Access context
        print(f"Request {context.get('request_id')} handled by {context.get('user')}")
    finally:
        # Clear even when handling fails; otherwise the next request served
        # by this thread would see stale values.
        context.clear()


# Multiple threads with separate contexts
threads = [threading.Thread(target=handle_request, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
Producer-Consumer with Queue
from queue import Queue, Empty
from threading import Thread, Event
class ThreadPool:
    """Simple thread pool: daemon workers pull callables from a shared queue."""

    def __init__(self, num_workers: int):
        """Start ``num_workers`` daemon worker threads.

        Raises:
            ValueError: If ``num_workers`` is less than 1 (submitted tasks
                would never run).
        """
        if num_workers < 1:
            raise ValueError("num_workers must be at least 1")
        self.tasks = Queue()
        self.workers = []
        self.shutdown_event = Event()
        for _ in range(num_workers):
            worker = Thread(target=self._worker_loop, daemon=True)
            worker.start()
            self.workers.append(worker)

    def _worker_loop(self):
        """Worker thread main loop: run queued tasks until shutdown is signalled."""
        while not self.shutdown_event.is_set():
            try:
                # The timeout lets the loop re-check the shutdown flag
                # instead of blocking forever on an empty queue.
                task = self.tasks.get(timeout=0.1)
            except Empty:
                continue
            try:
                task()
            except Exception as e:
                # A failing task is reported but must not kill the worker.
                print(f"Task error: {e}")
            finally:
                self.tasks.task_done()

    def submit(self, task):
        """Submit a task (a zero-argument callable) to the pool."""
        self.tasks.put(task)

    def shutdown(self, wait=True):
        """Shut down the thread pool.

        Args:
            wait: If true (default), block until every queued task has
                finished and all workers have exited. If false, signal the
                workers to stop after their current task; tasks still
                queued are not run.
        """
        if wait:
            # Drain BEFORE signalling shutdown. Workers stop taking tasks
            # once the event is set, so setting it first would leave queued
            # tasks unfinished and make join() block forever.
            self.tasks.join()
        self.shutdown_event.set()
        if wait:
            for worker in self.workers:
                worker.join()
# Usage
def sample_task(task_id):
    """Print which thread is running task ``task_id``, then pause briefly."""
    print(f"Task {task_id} running in {threading.current_thread().name}")
    time.sleep(0.1)


pool = ThreadPool(num_workers=3)
for task_id in range(10):
    # The default argument freezes this iteration's id; a bare closure would
    # see whatever value the loop variable holds when the task finally runs.
    pool.submit(lambda task_id=task_id: sample_task(task_id))
pool.shutdown()
ℹ️
For production systems, consider using concurrent.futures ThreadPoolExecutor or ProcessPoolExecutor instead of manual thread management.
Follow-Up Questions
- Explain the Global Interpreter Lock (GIL) and its impact on Python concurrency.
- When would you use threading vs multiprocessing vs asyncio?
- How do you prevent deadlocks in multi-threaded applications?
- Explain the difference between Lock and RLock.
- How does Python's garbage collector interact with multi-threaded programs?