Python Concurrency — Threading & Multiprocessing
Concurrency lets a program make progress on several tasks at once. Python offers threading for I/O-bound tasks and multiprocessing for CPU-bound tasks; the GIL (Global Interpreter Lock) determines which approach works best.
Learning Objectives
- Use threading for I/O-bound tasks like network requests and file operations
- Use multiprocessing for CPU-bound tasks like data processing and calculations
- Understand the GIL and its implications for performance
- Apply ThreadPoolExecutor and ProcessPoolExecutor for clean concurrency
- Handle race conditions with locks and thread-safe data structures
- Choose the right concurrency approach for your use case
Threading Basics
Threads share the same memory space, making them lightweight but subject to the GIL:
import threading
import time
def download(url, delay=2):
    """Simulate downloading a file.

    Args:
        url: Label for the pretend resource; it is only echoed in messages.
        delay: Seconds to sleep in place of real network I/O. Defaults to
            the 2-second pause this example has always used.

    Returns:
        The string ``"Data from <url>"``.
    """
    print(f"Starting download from {url}")
    time.sleep(delay)  # Simulate network I/O
    print(f"Completed download from {url}")
    return f"Data from {url}"
# Build one thread per URL, then set them all running
urls = ["url1", "url2", "url3", "url4"]
threads = [threading.Thread(target=download, args=(url,)) for url in urls]
for t in threads:
    t.start()  # download(url) now runs in the background
# Block until every download thread has finished
for t in threads:
    t.join()
print("All downloads completed")
# Total time: ~2 seconds (concurrent) vs ~8 seconds (sequential)
Thread with Return Values
import threading
class ResultThread(threading.Thread):
    """Thread subclass that keeps what its callable returned.

    Read ``result`` after ``join()``. It is ``None`` until ``run`` has
    assigned it, and remains ``None`` if the callable raised.
    """

    def __init__(self, target, args=()):
        threading.Thread.__init__(self)
        self.result = None
        self.target, self.args = target, args

    def run(self):
        """Invoke the stored callable and record its return value."""
        func, positional = self.target, self.args
        self.result = func(*positional)
def compute(n, delay=1):
    """Return ``n * n`` after a simulated pause.

    The pause is a sleep, so it stands in for waiting rather than for real
    CPU work: threads running this function overlap their waits.

    Args:
        n: Number to square.
        delay: Seconds to sleep before returning. Defaults to 1.

    Returns:
        The square of ``n``.
    """
    time.sleep(delay)
    return n * n
# Usage: square 0..4 on five threads and collect the results in thread order
threads = [ResultThread(target=compute, args=(value,)) for value in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
results = []
for t in threads:
    results.append(t.result)
print(results) # [0, 1, 4, 9, 16]
ThreadPoolExecutor
The concurrent.futures module provides a high-level interface for running concurrent tasks:
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
def fetch_url(url):
    """GET *url* (10 second timeout) and summarize the response.

    Returns a dict holding the requested 'url', the HTTP 'status' code and
    the 'length' of the response text.
    """
    resp = requests.get(url, timeout=10)
    summary = {'url': url}
    summary['status'] = resp.status_code
    summary['length'] = len(resp.text)
    return summary
urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
    "https://httpbin.org/headers",
    "https://httpbin.org/delay/1"
]
# Basic usage with map: results are yielded in the same order as urls
with ThreadPoolExecutor(max_workers=5) as executor:
    results = [summary for summary in executor.map(fetch_url, urls)]
for result in results:
    print(f"{result['url']}: {result['status']}")
Handling Results as They Complete
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
def download_file(url):
    """Fetch *url* (30 second timeout) and return ``(url, body_bytes)``.

    An error status is raised by ``raise_for_status()``.
    """
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()
    payload = resp.content
    return url, payload
urls = ["https://example.com/file1.zip", "https://example.com/file2.zip"]
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit every download up front, remembering which URL each future is for
    future_to_url = {}
    for url in urls:
        future_to_url[executor.submit(download_file, url)] = url
    # Handle each download as soon as it finishes, whatever the order
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            url, content = future.result()
        except Exception as e:
            print(f"Failed to download {url}: {e}")
        else:
            print(f"Downloaded {url}: {len(content)} bytes")
Error Handling in ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
def risky_operation(url):
    """GET *url* (5 second timeout) and return the decoded JSON body.

    May fail: an error status is raised by ``raise_for_status()``.
    """
    resp = requests.get(url, timeout=5)
    resp.raise_for_status()
    parsed = resp.json()
    return parsed
urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/status/404", # This will fail
    "https://httpbin.org/status/500", # This will also fail
]
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {}
    for url in urls:
        futures[executor.submit(risky_operation, url)] = url
    for future in as_completed(futures):
        url = futures[future]
        # result() re-raises whatever the worker raised, so handle it here
        try:
            result = future.result(timeout=10)
        except requests.exceptions.HTTPError as e:
            print(f"HTTP error for {url}: {e}")
        except requests.exceptions.Timeout:
            print(f"Timeout for {url}")
        except Exception as e:
            print(f"Unexpected error for {url}: {e}")
        else:
            print(f"Success: {url}")
Multiprocessing
Each process has its own memory space and Python interpreter, bypassing the GIL:
from multiprocessing import Pool, cpu_count
import math
def compute_heavy(n):
    """CPU-intensive computation: add up ``i! % 1000`` for i in 0..n-1."""
    total = 0
    for i in range(n):
        total += math.factorial(i) % 1000
    return total
if __name__ == '__main__':
    numbers = [5000, 6000, 7000, 8000, 9000, 10000]
    # Sequential execution: one input after another in this process
    start = time.time()
    sequential_results = list(map(compute_heavy, numbers))
    print(f"Sequential: {time.time() - start:.2f}s")
    # Parallel execution: Pool() with no argument uses all available CPU cores
    start = time.time()
    with Pool() as pool:
        parallel_results = pool.map(compute_heavy, numbers)
    print(f"Parallel: {time.time() - start:.2f}s")
ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor, as_completed
import os
def cpu_bound_task(n):
    """CPU-intensive task: return the sum of ``i * i`` for i in 0..n-1."""
    return sum(i * i for i in range(n))
if __name__ == '__main__':
    numbers = [10**6, 2*10**6, 3*10**6, 4*10**6]
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Map each future back to the input it was submitted with
        futures = {}
        for n in numbers:
            futures[executor.submit(cpu_bound_task, n)] = n
        # Report each task as it finishes, not in submission order
        for future in as_completed(futures):
            n = futures[future]
            result = future.result()
            print(f"Task {n}: result = {result}")
Sharing Data Between Processes
from multiprocessing import Process, Value, Array, Manager
def increment_counter(counter):
    """Add 1 to a shared ``multiprocessing.Value`` 1000 times without losing updates.

    ``counter.value += 1`` is a read followed by a separate write, so two
    processes can read the same value and one increment is lost. Holding
    the Value's own lock around the read-modify-write prevents that.

    Args:
        counter: A synchronized ``multiprocessing.Value`` (the default kind,
            created with a lock) so that ``get_lock()`` is available.
    """
    for _ in range(1000):
        with counter.get_lock():
            counter.value += 1
def process_shared_list(shared_list):
    """Append the current process's PID to *shared_list* five times."""
    pid = os.getpid()
    for _ in range(5):
        shared_list.append(pid)
if __name__ == '__main__':
    # Shared integer: a Value with typecode 'i', starting at 0
    counter = Value('i', 0)
    processes = []
    for _ in range(4):
        processes.append(Process(target=increment_counter, args=(counter,)))
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Counter: {counter.value}") # 4000 only if no increment was lost to a race
    # Shared list via Manager
    with Manager() as manager:
        shared_list = manager.list()
        processes = []
        for _ in range(3):
            processes.append(Process(target=process_shared_list, args=(shared_list,)))
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        # Copy the proxy into a plain list while the manager is still running
        print(f"Shared list: {list(shared_list)}")
GIL — Global Interpreter Lock
The GIL is a mutex that allows only one thread to execute Python bytecode at a time:
import threading
import multiprocessing
import time
def cpu_bound(n=10**7):
    """CPU-intensive task: sum the squares of 0..n-1 in a pure-Python loop.

    The explicit loop is deliberate: it keeps the interpreter busy running
    bytecode, which is what the GIL comparison needs.

    Args:
        n: Number of loop iterations. Defaults to 10**7, the workload this
            example has always used.

    Returns:
        The sum of ``i * i`` for i in ``range(n)``.
    """
    total = 0
    for i in range(n):
        total += i * i
    return total
# The benchmark sits behind the __main__ guard because it starts processes:
# under the "spawn" start method each child re-imports this module, and
# unguarded top-level code would try to launch the benchmark again.
if __name__ == '__main__':
    # CPU-bound with threading (NO speedup due to GIL)
    start = time.time()
    threads = [threading.Thread(target=cpu_bound) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"Threading: {time.time() - start:.2f}s")
    # CPU-bound with multiprocessing (SPEEDUP from multiple cores)
    start = time.time()
    processes = [multiprocessing.Process(target=cpu_bound) for _ in range(4)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Multiprocessing: {time.time() - start:.2f}s")
# I/O-bound with threading (SPEEDUP despite GIL)
def io_bound(delay=1):
    """Simulate a blocking I/O call by sleeping.

    Args:
        delay: Seconds to sleep. Defaults to 1.

    Returns:
        Always ``True``.
    """
    time.sleep(delay) # I/O operation releases GIL
    return True
start = time.time()
# Ten threads all wait at the same time instead of one after another
threads = []
for _ in range(10):
    threads.append(threading.Thread(target=io_bound))
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"I/O threading: {time.time() - start:.2f}s") # ~1s, not 10s
When to Use What
# USE THREADING FOR:
# - Network requests (HTTP, API calls)
# - File I/O (reading/writing multiple files)
# - Database queries
# - Any I/O-bound task
# - When you need shared state between tasks
# USE MULTIPROCESSING FOR:
# - Mathematical computations
# - Data processing (pandas, numpy operations)
# - Image/video processing
# - Any CPU-bound task
# - When GIL is the bottleneck
# USE ASYNCIO FOR:
# - High-concurrency network operations (thousands of connections)
# - WebSocket servers
# - When you need fine-grained control over task switching
# - When thread overhead is too high
Race Conditions and Thread Safety
When multiple threads access shared data, race conditions can occur:
import threading
import time
# Unsafe: Race condition
counter = 0


def unsafe_increment():
    """Bump the global ``counter`` 100000 times with NO synchronization.

    The read and the write-back are separate statements, so another thread
    can change ``counter`` in between and have its update overwritten.
    """
    global counter
    for _ in range(100000):
        snapshot = counter
        counter = snapshot + 1 # Not atomic!
threads = []
for _ in range(5):
    threads.append(threading.Thread(target=unsafe_increment))
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Expected: 500000, Got: {counter}") # Can be less when updates are lost to the race
Locks for Thread Safety
import threading
counter = 0
lock = threading.Lock()


def safe_increment():
    """Bump the global ``counter`` 100000 times, holding ``lock`` for each update."""
    global counter
    for _ in range(100000):
        # The lock makes the read-modify-write one indivisible step
        with lock:
            counter = counter + 1
threads = []
for _ in range(5):
    threads.append(threading.Thread(target=safe_increment))
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Expected: 500000, Got: {counter}") # Always 500000
RLock for Reentrant Locking
import threading
class Counter:
    """Counter whose updates are guarded by a reentrant lock."""

    def __init__(self):
        self._count = 0
        # RLock rather than Lock: increment_many holds it while calling
        # increment, which acquires it again on the same thread.
        self._lock = threading.RLock()

    def increment(self):
        """Add one to the count while holding the lock."""
        with self._lock:
            self._count = self._count + 1

    def increment_many(self, times):
        """Call ``increment()`` *times* times as one locked batch."""
        with self._lock:
            for _step in range(times):
                self.increment() # Re-acquires the lock this thread already holds
counter = Counter()
threads = []
for _ in range(5):
    threads.append(threading.Thread(target=counter.increment_many, args=(1000,)))
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Count: {counter._count}") # 5000
Thread-Safe Data Structures
import queue
import threading
import time
# Queue is thread-safe by default
task_queue = queue.Queue()


def producer():
    """Put ten tasks on ``task_queue``, 0.1 s apart, then a ``None`` stop signal."""
    for i in range(10):
        task_queue.put(f"task_{i}")
        print(f"Produced: task_{i}")
        time.sleep(0.1)
    task_queue.put(None) # Signal to stop


def consumer():
    """Consume tasks from ``task_queue`` until the ``None`` stop signal arrives.

    Every ``get()`` is matched by a ``task_done()``, including the one that
    fetched the stop signal. Without that match the queue's unfinished-task
    count never reaches zero and ``task_queue.join()`` would block forever.
    """
    while True:
        task = task_queue.get()
        try:
            if task is None:
                break
            print(f"Consumed: {task}")
        finally:
            # Runs on the break path too, so the sentinel is accounted for
            task_queue.task_done()
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
# Start the producer first, then the consumer; wait for both in that order
for worker in (producer_thread, consumer_thread):
    worker.start()
for worker in (producer_thread, consumer_thread):
    worker.join()
Real-World Examples
Example 1: Parallel Download Manager
import requests
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
import time
class DownloadManager:
    """Download multiple files in parallel.

    Successful downloads accumulate in ``results`` across calls to
    ``download_all``.
    """

    def __init__(self, max_workers=5):
        """Remember the thread-pool size and start with no results."""
        self.max_workers = max_workers
        self.results = []

    def download_file(self, url, output_dir='.'):
        """Download a single file into *output_dir*.

        The file is named after the last path segment of the URL, or
        'download' when the URL has none. Two URLs with the same last
        segment therefore write to the same file.

        Args:
            url: Address to fetch.
            output_dir: Existing directory to write into.

        Returns:
            A dict with 'url', 'filepath', 'size' (bytes written), 'time'
            (seconds elapsed) and 'speed' (bytes per second, 0 when no
            time elapsed).

        Raises:
            requests.exceptions.HTTPError: For an error status, via
                ``raise_for_status()``.
        """
        filename = os.path.basename(urlparse(url).path) or 'download'
        filepath = os.path.join(output_dir, filename)
        start_time = time.time()
        downloaded = 0
        # A streamed response keeps its connection open until it is closed,
        # so the with-block releases it even if the status check or a write
        # fails part-way through.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
                    downloaded += len(chunk)
        elapsed = time.time() - start_time
        return {
            'url': url,
            'filepath': filepath,
            'size': downloaded,
            'time': elapsed,
            'speed': downloaded / elapsed if elapsed > 0 else 0
        }

    def download_all(self, urls, output_dir='.'):
        """Download all URLs in parallel.

        Creates *output_dir* if needed. A failed download is reported with
        ``print`` and skipped; it does not stop the others.

        Args:
            urls: Iterable of addresses to fetch.
            output_dir: Directory to write the files into.

        Returns:
            ``self.results``: the accumulated result dicts, in completion
            order.
        """
        os.makedirs(output_dir, exist_ok=True)
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_url = {
                executor.submit(self.download_file, url, output_dir): url
                for url in urls
            }
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    result = future.result()
                    self.results.append(result)
                    print(f"Downloaded: {result['filepath']} ({result['size']} bytes)")
                except Exception as e:
                    print(f"Failed to download {url}: {e}")
        return self.results
# Usage: fetch three files into ./downloads with up to five worker threads
urls = [
    "https://example.com/file1.zip",
    "https://example.com/file2.zip",
    "https://example.com/file3.zip",
]
manager = DownloadManager(max_workers=5)
results = manager.download_all(urls, output_dir='./downloads')
Example 2: Parallel Data Processing
import csv
from concurrent.futures import ProcessPoolExecutor
import os
def process_chunk(chunk):
    """Transform every row of *chunk* (runs in a separate process).

    Each row yields a dict with the upper-cased 'name', the doubled numeric
    'value' and the 'category' chosen by ``categorize``.
    """
    return [
        {
            'name': row['name'].upper(),
            'value': float(row['value']) * 2,
            'category': categorize(row['value']),
        }
        for row in chunk
    ]
def categorize(value):
    """Bucket *value* as 'low' (< 10), 'medium' (< 100) or 'high'."""
    number = float(value)
    if number < 10:
        return 'low'
    if number < 100:
        return 'medium'
    return 'high'
def parallel_csv_processing(input_file, chunk_size=1000):
    """Process a CSV file in parallel chunks.

    The whole file is read into memory as lists of row dicts before any
    processing starts; each list is handed to ``process_chunk`` in a
    worker process.

    Args:
        input_file: Path of a CSV file with a header row.
        chunk_size: Maximum number of rows per chunk.

    Returns:
        One flat list of processed rows, in file order. Empty when the
        file has no data rows.
    """
    # Read file into chunks
    chunks = []
    current_chunk = []
    # newline='' leaves line-ending handling to the csv module, so quoted
    # fields containing newlines are parsed correctly.
    with open(input_file, 'r', newline='') as f:
        for row in csv.DictReader(f):
            current_chunk.append(row)
            if len(current_chunk) >= chunk_size:
                chunks.append(current_chunk)
                current_chunk = []
    if current_chunk:
        chunks.append(current_chunk)
    # Nothing to process: skip starting a process pool
    if not chunks:
        return []
    # Process chunks in parallel; map() keeps results in chunk order
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(process_chunk, chunks))
    # Flatten results
    return [item for chunk_result in results for item in chunk_result]
Example 3: Thread Pool with Progress Tracking
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
class ProgressTracker:
    """Track progress of concurrent tasks and print a one-line status."""

    def __init__(self, total):
        self.start_time = time.time()
        self.lock = threading.Lock()
        self.total = total
        self.completed = 0

    def update(self):
        """Count one more finished task and redraw the progress line."""
        with self.lock:
            self.completed += 1
            done, total = self.completed, self.total
            progress = done / total * 100
            elapsed = time.time() - self.start_time
            # Tasks finished per second so far
            if elapsed > 0:
                rate = done / elapsed
            else:
                rate = 0
            # Estimated seconds remaining at that rate
            if rate > 0:
                eta = (total - done) / rate
            else:
                eta = 0
            # The leading \r redraws the same terminal line each time
            print(f"\rProgress: {progress:.1f}% ({self.completed}/{self.total}) "
                  f"ETA: {eta:.1f}s", end='', flush=True)
def task(n, delay=0.5):
    """Simulate work: pause, then return ``n * n``.

    Args:
        n: Number to square.
        delay: Seconds to sleep before returning. Defaults to 0.5.

    Returns:
        The square of ``n``.
    """
    time.sleep(delay)
    return n * n
# Usage: run 20 tasks on 4 threads, redrawing the progress line as each finishes
numbers = [*range(20)]
tracker = ProgressTracker(len(numbers))
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {}
    for n in numbers:
        futures[executor.submit(task, n)] = n
    for future in as_completed(futures):
        result = future.result()
        tracker.update()
print(f"\nCompleted in {time.time() - tracker.start_time:.2f}s")
Common Mistakes
| Mistake | Problem | Solution |
|---|---|---|
| Using threading for CPU-bound | No speedup due to GIL | Use multiprocessing for CPU-bound tasks |
| Not joining threads | Zombie threads, resource leaks | Always join threads after start |
| Sharing mutable state | Race conditions | Use locks, queues, or thread-local data |
| Creating too many threads | Memory overhead, context switching | Use ThreadPoolExecutor with limits |
| Not using `if __name__ == '__main__'` | Multiprocessing issues on Windows | Always guard multiprocessing code |
| Ignoring daemon threads | Program hangs on exit | Set daemon=True for background threads |
Best Practices
# 1. Use ThreadPoolExecutor for I/O-bound tasks
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(process_url, urls))
# 2. Use ProcessPoolExecutor for CPU-bound tasks
# (guarded, because worker processes may re-import this module)
from concurrent.futures import ProcessPoolExecutor
if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(cpu_intensive, data))
# 3. Use queue.Queue for producer-consumer patterns
import queue
q = queue.Queue()
# Producer: q.put(item)
# Consumer: item = q.get()
# 4. Use thread-local data for thread-specific state
import threading
local = threading.local()
local.connection = create_connection()
# An attribute set on a threading.local object exists only in the thread
# that set it, so the assignment above covers just the current thread.
# Other threads get their own connection lazily through this helper.
def get_connection():
    """Return the calling thread's connection, creating it on first use."""
    if not hasattr(local, "connection"):
        local.connection = create_connection()
    return local.connection
# 5. Set daemon threads for background tasks
t = threading.Thread(target=background_task, daemon=True)
t.start()
# 6. Use as_completed for better responsiveness
for future in as_completed(futures):
    result = future.result()
    process(result)
Key Takeaways
- Use `ThreadPoolExecutor` for I/O-bound tasks (network, file, database)
- Use `ProcessPoolExecutor` for CPU-bound tasks (math, data processing)
- The GIL limits threading to one core for Python bytecode — use multiprocessing to bypass it
- Always use locks when sharing mutable state between threads
- Queue is thread-safe by default — use it for producer-consumer patterns
- Use `as_completed()` to process results as they become available
- Always guard multiprocessing code with `if __name__ == '__main__'`