🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Servicesℹ️ About✉️ ContactView Pricing Plansfrom $10

Python Concurrency — Threading & Multiprocessing

Python AdvancedConcurrency🟢 Free Lesson

Advertisement

Python Concurrency — Threading & Multiprocessing

Concurrency lets you do multiple things at once. Python offers threading for I/O-bound tasks, multiprocessing for CPU-bound tasks, and the GIL (Global Interpreter Lock) affects which approach works best.

Learning Objectives

  • Use threading for I/O-bound tasks like network requests and file operations
  • Use multiprocessing for CPU-bound tasks like data processing and calculations
  • Understand the GIL and its implications for performance
  • Apply ThreadPoolExecutor and ProcessPoolExecutor for clean concurrency
  • Handle race conditions with locks and thread-safe data structures
  • Choose the right concurrency approach for your use case

Threading Basics

Threads share the same memory space, making them lightweight but subject to the GIL:

import threading
import time

def download(url):
    """Simulate downloading a file."""
    print(f"Starting download from {url}")
    time.sleep(2)  # Simulate network I/O
    print(f"Completed download from {url}")
    return f"Data from {url}"

# Create and start threads
urls = ["url1", "url2", "url3", "url4"]
threads = []

for url in urls:
    t = threading.Thread(target=download, args=(url,))
    threads.append(t)
    t.start()  # Start the thread

# Wait for all threads to complete
for t in threads:
    t.join()

print("All downloads completed")
# Total time: ~2 seconds (concurrent) vs ~8 seconds (sequential)

Thread with Return Values

import threading

class ResultThread(threading.Thread):
    """Thread that stores return value."""

    def __init__(self, target, args=()):
        super().__init__()
        self.target = target
        self.args = args
        self.result = None

    def run(self):
        self.result = self.target(*self.args)

def compute(n):
    """Simulate CPU work."""
    time.sleep(1)
    return n * n

# Usage
threads = [ResultThread(target=compute, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

results = [t.result for t in threads]
print(results)  # [0, 1, 4, 9, 16]

ThreadPoolExecutor

The concurrent.futures module provides a high-level interface for running concurrent tasks:

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def fetch_url(url):
    """Fetch a URL and return status code."""
    response = requests.get(url, timeout=10)
    return {'url': url, 'status': response.status_code, 'length': len(response.text)}

urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
    "https://httpbin.org/headers",
    "https://httpbin.org/delay/1"
]

# Basic usage with map
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(fetch_url, urls))
    for result in results:
        print(f"{result['url']}: {result['status']}")

Handling Results as They Complete

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def download_file(url):
    """Download file and return its content."""
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return url, response.content

urls = ["https://example.com/file1.zip", "https://example.com/file2.zip"]

with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit all tasks
    future_to_url = {
        executor.submit(download_file, url): url
        for url in urls
    }

    # Process results as they complete
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            url, content = future.result()
            print(f"Downloaded {url}: {len(content)} bytes")
        except Exception as e:
            print(f"Failed to download {url}: {e}")

Error Handling in ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def risky_operation(url):
    """Operation that might fail."""
    response = requests.get(url, timeout=5)
    response.raise_for_status()
    return response.json()

urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/status/404",  # This will fail
    "https://httpbin.org/status/500",  # This will also fail
]

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(risky_operation, url): url for url in urls}

    for future in as_completed(futures):
        url = futures[future]
        try:
            result = future.result(timeout=10)
            print(f"Success: {url}")
        except requests.exceptions.HTTPError as e:
            print(f"HTTP error for {url}: {e}")
        except requests.exceptions.Timeout:
            print(f"Timeout for {url}")
        except Exception as e:
            print(f"Unexpected error for {url}: {e}")

Multiprocessing

Each process has its own memory space and Python interpreter, bypassing the GIL:

from multiprocessing import Pool, cpu_count
import math

def compute_heavy(n):
    """CPU-intensive computation."""
    return sum(math.factorial(i) % 1000 for i in range(n))

if __name__ == '__main__':
    # Use all available CPU cores
    numbers = [5000, 6000, 7000, 8000, 9000, 10000]

    # Sequential execution
    start = time.time()
    sequential_results = [compute_heavy(n) for n in numbers]
    print(f"Sequential: {time.time() - start:.2f}s")

    # Parallel execution with multiprocessing
    start = time.time()
    with Pool() as pool:
        parallel_results = pool.map(compute_heavy, numbers)
    print(f"Parallel: {time.time() - start:.2f}s")

ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor, as_completed
import os

def cpu_bound_task(n):
    """CPU-intensive task."""
    result = 0
    for i in range(n):
        result += i * i
    return result

if __name__ == '__main__':
    numbers = [10**6, 2*10**6, 3*10**6, 4*10**6]

    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = {
            executor.submit(cpu_bound_task, n): n
            for n in numbers
        }

        for future in as_completed(futures):
            n = futures[future]
            result = future.result()
            print(f"Task {n}: result = {result}")

Sharing Data Between Processes

from multiprocessing import Process, Value, Array, Manager

def increment_counter(counter):
    """Increment shared counter."""
    for _ in range(1000):
        counter.value += 1

def process_shared_list(shared_list):
    """Append to shared list."""
    for i in range(5):
        shared_list.append(os.getpid())

if __name__ == '__main__':
    # Shared value (atomic operations)
    counter = Value('i', 0)
    processes = [
        Process(target=increment_counter, args=(counter,))
        for _ in range(4)
    ]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print(f"Counter: {counter.value}")  # Should be 4000

    # Shared list via Manager
    with Manager() as manager:
        shared_list = manager.list()
        processes = [
            Process(target=process_shared_list, args=(shared_list,))
            for _ in range(3)
        ]

        for p in processes:
            p.start()
        for p in processes:
            p.join()

        print(f"Shared list: {list(shared_list)}")

GIL — Global Interpreter Lock

The GIL is a mutex that allows only one thread to execute Python bytecode at a time:

import threading
import multiprocessing
import time

def cpu_bound():
    """CPU-intensive task."""
    total = 0
    for i in range(10**7):
        total += i * i
    return total

# CPU-bound with threading (NO speedup due to GIL)
start = time.time()
threads = [threading.Thread(target=cpu_bound) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Threading: {time.time() - start:.2f}s")

# CPU-bound with multiprocessing (SPEEDUP from multiple cores)
start = time.time()
processes = [multiprocessing.Process(target=cpu_bound) for _ in range(4)]
for p in processes:
    p.start()
for p in processes:
    p.join()
print(f"Multiprocessing: {time.time() - start:.2f}s")

# I/O-bound with threading (SPEEDUP despite GIL)
def io_bound():
    time.sleep(1)  # I/O operation releases GIL
    return True

start = time.time()
threads = [threading.Thread(target=io_bound) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"I/O threading: {time.time() - start:.2f}s")  # ~1s, not 10s

When to Use What

# USE THREADING FOR:
# - Network requests (HTTP, API calls)
# - File I/O (reading/writing multiple files)
# - Database queries
# - Any I/O-bound task
# - When you need shared state between tasks

# USE MULTIPROCESSING FOR:
# - Mathematical computations
# - Data processing (pandas, numpy operations)
# - Image/video processing
# - Any CPU-bound task
# - When GIL is the bottleneck

# USE ASYNCIO FOR:
# - High-concurrency network operations (thousands of connections)
# - WebSocket servers
# - When you need fine-grained control over task switching
# - When thread overhead is too high

Race Conditions and Thread Safety

When multiple threads access shared data, race conditions can occur:

import threading
import time

# Unsafe: Race condition
counter = 0

def unsafe_increment():
    global counter
    for _ in range(100000):
        temp = counter
        counter = temp + 1  # Not atomic!

threads = [threading.Thread(target=unsafe_increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Expected: 500000, Got: {counter}")  # Usually less due to race condition

Locks for Thread Safety

import threading

counter = 0
lock = threading.Lock()

def safe_increment():
    global counter
    for _ in range(100000):
        with lock:  # Only one thread can execute this block at a time
            counter += 1

threads = [threading.Thread(target=safe_increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Expected: 500000, Got: {counter}")  # Always 500000

RLock for Reentrant Locking

import threading

class Counter:
    def __init__(self):
        self._lock = threading.RLock()  # Reentrant lock
        self._count = 0

    def increment(self):
        with self._lock:
            self._count += 1

    def increment_many(self, times):
        with self._lock:
            for _ in range(times):
                self.increment()  # Can acquire the same lock again

counter = Counter()
threads = [threading.Thread(target=counter.increment_many, args=(1000,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Count: {counter._count}")  # 5000

Thread-Safe Data Structures

import queue
import threading
import time

# Queue is thread-safe by default
task_queue = queue.Queue()

def producer():
    for i in range(10):
        task_queue.put(f"task_{i}")
        print(f"Produced: task_{i}")
        time.sleep(0.1)
    task_queue.put(None)  # Signal to stop

def consumer():
    while True:
        task = task_queue.get()
        if task is None:
            break
        print(f"Consumed: {task}")
        task_queue.task_done()

producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

Real-World Examples

Example 1: Parallel Download Manager

import requests
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
import time

class DownloadManager:
    """Download multiple files in parallel."""

    def __init__(self, max_workers=5):
        self.max_workers = max_workers
        self.results = []

    def download_file(self, url, output_dir='.'):
        """Download a single file."""
        filename = os.path.basename(urlparse(url).path) or 'download'
        filepath = os.path.join(output_dir, filename)

        start_time = time.time()
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()

        total_size = int(response.headers.get('content-length', 0))
        downloaded = 0

        with open(filepath, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
                downloaded += len(chunk)

        elapsed = time.time() - start_time
        return {
            'url': url,
            'filepath': filepath,
            'size': downloaded,
            'time': elapsed,
            'speed': downloaded / elapsed if elapsed > 0 else 0
        }

    def download_all(self, urls, output_dir='.'):
        """Download all URLs in parallel."""
        os.makedirs(output_dir, exist_ok=True)

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_url = {
                executor.submit(self.download_file, url, output_dir): url
                for url in urls
            }

            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    result = future.result()
                    self.results.append(result)
                    print(f"Downloaded: {result['filepath']} ({result['size']} bytes)")
                except Exception as e:
                    print(f"Failed to download {url}: {e}")

        return self.results

# Usage
manager = DownloadManager(max_workers=5)
urls = [
    "https://example.com/file1.zip",
    "https://example.com/file2.zip",
    "https://example.com/file3.zip",
]
results = manager.download_all(urls, output_dir='./downloads')

Example 2: Parallel Data Processing

import csv
from concurrent.futures import ProcessPoolExecutor
import os

def process_chunk(chunk):
    """Process a chunk of data (runs in separate process)."""
    results = []
    for row in chunk:
        # Simulate processing
        processed = {
            'name': row['name'].upper(),
            'value': float(row['value']) * 2,
            'category': categorize(row['value'])
        }
        results.append(processed)
    return results

def categorize(value):
    v = float(value)
    if v < 10:
        return 'low'
    elif v < 100:
        return 'medium'
    else:
        return 'high'

def parallel_csv_processing(input_file, chunk_size=1000):
    """Process large CSV file in parallel chunks."""
    # Read file into chunks
    chunks = []
    current_chunk = []

    with open(input_file, 'r') as f:
        reader = csv.DictReader(f)
        for row in reader:
            current_chunk.append(row)
            if len(current_chunk) >= chunk_size:
                chunks.append(current_chunk)
                current_chunk = []
        if current_chunk:
            chunks.append(current_chunk)

    # Process chunks in parallel
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(process_chunk, chunks))

    # Flatten results
    all_results = [item for chunk_result in results for item in chunk_result]
    return all_results

Example 3: Thread Pool with Progress Tracking

import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

class ProgressTracker:
    """Track progress of concurrent tasks."""

    def __init__(self, total):
        self.total = total
        self.completed = 0
        self.lock = threading.Lock()
        self.start_time = time.time()

    def update(self):
        with self.lock:
            self.completed += 1
            progress = self.completed / self.total * 100
            elapsed = time.time() - self.start_time
            rate = self.completed / elapsed if elapsed > 0 else 0
            eta = (self.total - self.completed) / rate if rate > 0 else 0

            print(f"\rProgress: {progress:.1f}% ({self.completed}/{self.total}) "
                  f"ETA: {eta:.1f}s", end='', flush=True)

def task(n):
    """Simulate work."""
    time.sleep(0.5)
    return n * n

# Usage
numbers = list(range(20))
tracker = ProgressTracker(len(numbers))

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(task, n): n for n in numbers}

    for future in as_completed(futures):
        result = future.result()
        tracker.update()

print(f"\nCompleted in {time.time() - tracker.start_time:.2f}s")

Common Mistakes

MistakeProblemSolution
Using threading for CPU-boundNo speedup due to GILUse multiprocessing for CPU-bound tasks
Not joining threadsZombie threads, resource leaksAlways join threads after start
Sharing mutable stateRace conditionsUse locks, queues, or thread-local data
Creating too many threadsMemory overhead, context switchingUse ThreadPoolExecutor with limits
Not using if __name__ == '__main__'Multiprocessing issues on WindowsAlways guard multiprocessing code
Ignoring daemon threadsProgram hangs on exitSet daemon=True for background threads

Best Practices

# 1. Use ThreadPoolExecutor for I/O-bound tasks
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(process_url, urls))

# 2. Use ProcessPoolExecutor for CPU-bound tasks
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
    results = list(executor.map(cpu_intensive, data))

# 3. Use queue.Queue for producer-consumer patterns
import queue

q = queue.Queue()
# Producer: q.put(item)
# Consumer: item = q.get()

# 4. Use thread-local data for thread-specific state
import threading

local = threading.local()
local.connection = create_connection()

# 5. Set daemon threads for background tasks
t = threading.Thread(target=background_task, daemon=True)
t.start()

# 6. Use as_completed for better responsiveness
for future in as_completed(futures):
    result = future.result()
    process(result)

Key Takeaways

  1. Use ThreadPoolExecutor for I/O-bound tasks (network, file, database)
  2. Use ProcessPoolExecutor for CPU-bound tasks (math, data processing)
  3. The GIL limits threading to one core for Python bytecode — use multiprocessing to bypass it
  4. Always use locks when sharing mutable state between threads
  5. Queue is thread-safe by default — use it for producer-consumer patterns
  6. Use as_completed() to process results as they become available
  7. Always guard multiprocessing code with if __name__ == '__main__'

Premium Content

Python Concurrency — Threading & Multiprocessing

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
💼Interview Prep
📜Certificates
🤝Community Access

Already a member? Log in

Need Expert Python Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement