🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Servicesℹ️ About✉️ ContactView Pricing Plansfrom $10

Python Context Managers — Resources Done Right

Python AdvancedContext Managers🟢 Free Lesson

Advertisement

Python Context Managers — Resources Done Right

Context managers ensure proper acquisition and release of resources. The with statement guarantees cleanup even if errors occur.

Learning Objectives

  • Understand the with statement protocol
  • Create context managers with classes and contextlib
  • Use nested and async context managers
  • Apply context managers for locks, transactions, and cleanup
  • Build real-world context managers for databases and file locking

The with Statement

# Without context manager — error-prone
f = open('file.txt', 'r')
try:
    data = f.read()
finally:
    f.close()

# With context manager — safe and clean
with open('file.txt', 'r') as f:
    data = f.read()
# File is automatically closed, even if exception occurs

How It Works

# The with statement calls __enter__ and __exit__
class ManagedResource:
    def __enter__(self):
        print("Entering context")
        return self  # Bound to 'as' variable

    def __exit__(self, exc_type, exc_val, exc_tb):
        print("Exiting context")
        return False  # Don't suppress exceptions

with ManagedResource() as r:
    print("Inside context")
# Entering context
# Inside context
# Exiting context

Multiple Context Managers

# Python 3.1+ supports multiple context managers in a single with statement
with open('input.txt', 'r') as infile, open('output.txt', 'w') as outfile:
    outfile.write(infile.read())

# Python 3.10+ supports parentheses for grouping
with (
    open('input.txt', 'r') as infile,
    open('output.txt', 'w') as outfile,
    open('log.txt', 'a') as log
):
    outfile.write(infile.read())
    log.write("Copied input to output\n")

Class-Based Context Manager

class ManagedResource:
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        print(f"Acquiring {self.name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print(f"Releasing {self.name}")
        if exc_type is not None:
            print(f"Error occurred: {exc_val}")
        return False  # Don't suppress exceptions

with ManagedResource("database") as resource:
    print(f"Using {resource.name}")
# Acquiring database
# Using database
# Releasing database

Exception Handling in exit

class SafeFile:
    def __init__(self, filename, mode):
        self.filename = filename
        self.mode = mode
        self.file = None

    def __enter__(self):
        self.file = open(self.filename, self.mode)
        return self.file

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.file:
            self.file.close()
        if exc_type == ValueError:
            print(f"ValueError handled: {exc_val}")
            return True  # Suppress the exception
        return False  # Let other exceptions propagate

# ValueError is suppressed
with SafeFile('test.txt', 'w') as f:
    raise ValueError("Something went wrong")
# Value error handled, execution continues

# Other exceptions propagate
with SafeFile('test.txt', 'w') as f:
    raise TypeError("Type error")  # Propagates!

exit Parameters Explained

class DetailedContext:
    def __enter__(self):
        print("Entering context")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            print("No exception occurred")
        else:
            print(f"Exception type: {exc_type}")
            print(f"Exception value: {exc_val}")
            print(f"Traceback: {exc_tb}")
        return False

# Without exception
with DetailedContext() as d:
    print("Normal execution")
# Entering context
# Normal execution
# No exception occurred

# With exception
try:
    with DetailedContext() as d:
        raise ValueError("Test error")
except ValueError:
    pass
# Entering context
# Exception type: <class 'ValueError'>
# Exception value: Test error
# Traceback: <traceback object at 0x...>

contextlib Decorator

from contextlib import contextmanager

@contextmanager
def managed_file(filename, mode):
    try:
        f = open(filename, mode)
        yield f
    finally:
        f.close()

with managed_file('test.txt', 'w') as f:
    f.write("Hello, World!")

# The @contextmanager decorator converts a generator into a context manager
# 1. Code before yield runs in __enter__
# 2. yield provides the value for 'as'
# 3. Code after yield runs in __exit__
# 4. Exceptions propagate from yield point

More contextmanager Examples

from contextlib import contextmanager
import time

@contextmanager
def timer(label=""):
    start = time.perf_counter()
    yield
    elapsed = time.perf_counter() - start
    print(f"{label} Elapsed: {elapsed:.4f}s")

with timer("Sort:"):
    sorted(range(1000000, 0, -1))

@contextmanager
def temporary_directory():
    import tempfile
    import shutil
    tmp_dir = tempfile.mkdtemp()
    try:
        yield tmp_dir
    finally:
        shutil.rmtree(tmp_dir)

with temporary_directory() as td:
    # Use temporary directory
    pass
# Directory automatically cleaned up

Context Manager with Return Values

from contextlib import contextmanager

@contextmanager
def database_connection(host, port):
    print(f"Connecting to {host}:{port}")
    connection = {"host": host, "port": port, "connected": True}
    try:
        yield connection
    finally:
        connection["connected"] = False
        print(f"Disconnected from {host}:{port}")

# Use the context manager
with database_connection("localhost", 5432) as conn:
    print(f"Connected: {conn}")
# Connecting to localhost:5432
# Connected: {'host': 'localhost', 'port': 5432, 'connected': True}
# Disconnected from localhost:5432

Common contextlib Utilities

from contextlib import contextmanager, suppress, redirect_stdout, redirect_stderr
import io, os

# suppress — ignore specific exceptions
with suppress(FileNotFoundError):
    os.remove('nonexistent.txt')

# redirect_stdout — capture print output
f = io.StringIO()
with redirect_stdout(f):
    print("captured output")
captured = f.getvalue()
print(f"Got: {captured!r}")  # Got: 'captured output\n'

# redirect_stderr — capture error output
import sys
err_capture = io.StringIO()
with redirect_stderr(err_capture):
    print("error message", file=sys.stderr)

# ExitStack — dynamic number of context managers
from contextlib import ExitStack

def process_files(filenames):
    with ExitStack() as stack:
        files = [stack.enter_context(open(fn)) for fn in filenames]
        for f in files:
            print(f.read()[:100])

# callback — register cleanup functions
with ExitStack() as stack:
    import tempfile
    tmp = tempfile.NamedTemporaryFile(delete=False)
    stack.callback(os.unlink, tmp.name)
    # cleanup guaranteed even if exception occurs

contextlib.suppress Deep Dive

from contextlib import suppress

# Suppress multiple exceptions
with suppress(FileNotFoundError, PermissionError, OSError):
    os.remove('nonexistent.txt')

# Suppress in loop
import random
for i in range(10):
    with suppress(StopIteration):
        value = next(iter([]))

# Practical: safe dictionary access
def safe_dict_access(d, key, default=None):
    with suppress(KeyError):
        return d[key]
    return default

result = safe_dict_access({"a": 1}, "b", "not found")
print(result)  # "not found"

redirect_stdout and redirect_stderr

from contextlib import redirect_stdout, redirect_stderr
import io
import sys

# Capture all output
output_buffer = io.StringIO()
error_buffer = io.StringIO()

with redirect_stdout(output_buffer), redirect_stderr(error_buffer):
    print("This goes to stdout")
    print("This goes to stderr", file=sys.stderr)

stdout_content = output_buffer.getvalue()
stderr_content = error_buffer.getvalue()

print(f"Captured stdout: {stdout_content!r}")
print(f"Captured stderr: {stderr_content!r}")

# Redirect to file
with open('output.txt', 'w') as f:
    with redirect_stdout(f):
        print("This goes to file")

Exit Stack — Dynamic Context Managers

from contextlib import ExitStack

class ConnectionPool:
    def __init__(self):
        self.connections = []

    def get_connection(self, host):
        conn = {"host": host, "active": True}
        self.connections.append(conn)
        return conn

    def release_all(self):
        for conn in self.connections:
            conn["active"] = False
        self.connections.clear()

def dynamic_context_managers(hosts):
    pool = ConnectionPool()

    with ExitStack() as stack:
        # Dynamically add context managers
        connections = []
        for host in hosts:
            conn = pool.get_connection(host)
            connections.append(conn)
            stack.callback(pool.release_all)

        # Use all connections
        for conn in connections:
            print(f"Processing {conn['host']}")

# Nested context managers
with ExitStack() as stack:
    files = [
        stack.enter_context(open(f, 'r'))
        for f in ['file1.txt', 'file2.txt', 'file3.txt']
    ]
    # All files opened, all guaranteed to close

ExitStack Advanced Patterns

from contextlib import ExitStack

# Pattern 1: Dynamic resource management
def process_dynamic_resources(resource_list):
    with ExitStack() as stack:
        resources = []
        for resource in resource_list:
            # Assume resources have context manager protocol
            ctx = stack.enter_context(resource)
            resources.append(ctx)
        
        # Process all resources
        for resource in resources:
            resource.process()

# Pattern 2: Cleanup on error
def safe_operation():
    with ExitStack() as stack:
        # Register cleanup callbacks
        stack.callback(lambda: print("Cleanup 1"))
        stack.callback(lambda: print("Cleanup 2"))
        
        # Simulate work
        raise ValueError("Something went wrong")
# Cleanup 2 (LIFO order)
# Cleanup 1

# Pattern 3: Conditional context managers
def conditional_context(condition):
    with ExitStack() as stack:
        if condition:
            # Only enter context if condition is True
            stack.enter_context(some_context_manager())
        
        # Rest of the code
        print("Operation completed")

Real-World: Database Connection Manager

from contextlib import contextmanager
import time

class DatabaseConnection:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.connected = False
        self.in_transaction = False

    def connect(self):
        print(f"Connecting to {self.host}:{self.port}")
        self.connected = True

    def disconnect(self):
        print(f"Disconnecting from {self.host}:{self.port}")
        self.connected = False

    def begin_transaction(self):
        self.in_transaction = True

    def commit(self):
        self.in_transaction = False
        print("Transaction committed")

    def rollback(self):
        self.in_transaction = False
        print("Transaction rolled back")

    def execute(self, query):
        if not self.connected:
            raise RuntimeError("Not connected")
        print(f"Executing: {query}")
        return {"rows_affected": 1}

@contextmanager
def database_session(host="localhost", port=5432):
    conn = DatabaseConnection(host, port)
    conn.connect()
    try:
        conn.begin_transaction()
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.disconnect()

# Usage
with database_session() as db:
    db.execute("INSERT INTO users (name) VALUES ('Alice')")
    db.execute("INSERT INTO users (name) VALUES ('Bob')")
# Transaction committed, connection closed

# Error handling
try:
    with database_session() as db:
        db.execute("INSERT INTO users (name) VALUES ('Charlie')")
        raise ValueError("Simulated error")
except ValueError:
    pass
# Transaction rolled back, connection closed

Connection Pool Context Manager

from contextlib import contextmanager
import threading

class ConnectionPool:
    def __init__(self, max_connections=10):
        self.connections = []
        self.semaphore = threading.Semaphore(max_connections)
        self.lock = threading.Lock()

    @contextmanager
    def get_connection(self):
        self.semaphore.acquire()
        try:
            conn = self._create_connection()
            yield conn
        finally:
            self._release_connection(conn)
            self.semaphore.release()

    def _create_connection(self):
        # Create actual database connection
        return {"id": len(self.connections), "active": True}

    def _release_connection(self, conn):
        with self.lock:
            conn["active"] = False

# Usage
pool = ConnectionPool(max_connections=5)
with pool.get_connection() as conn:
    # Use connection
    print(f"Using connection {conn['id']}")
# Connection automatically released

Real-World: File Locking

from contextlib import contextmanager
import os
import time

@contextmanager
def file_lock(filepath, timeout=10):
    """Simple file-based locking mechanism."""
    lock_file = filepath + ".lock"
    start_time = time.time()

    while True:
        try:
            # Atomic creation (fails if exists)
            fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            os.write(fd, str(os.getpid()).encode())
            os.close(fd)
            break
        except FileExistsError:
            if time.time() - start_time > timeout:
                raise TimeoutError(f"Could not acquire lock on {filepath}")
            time.sleep(0.1)

    try:
        yield filepath
    finally:
        # Release lock
        if os.path.exists(lock_file):
            os.remove(lock_file)

# Usage
with file_lock("data.json") as f:
    # Only one process can access this at a time
    with open(f, 'w') as file:
        file.write('{"data": "protected"}')

Cross-Process File Locking

import msvcrt
import os
import contextlib

@contextlib.contextmanager
def windows_file_lock(filepath):
    """Windows-specific file locking using msvcrt."""
    lock_fd = None
    try:
        lock_fd = open(filepath + ".lock", 'w')
        msvcrt.locking(lock_fd.fileno(), msvcrt.LK_NBLCK, 1)
        yield filepath
    finally:
        if lock_fd:
            msvcrt.locking(lock_fd.fileno(), msvcrt.LK_UNLCK, 1)
            lock_fd.close()
            os.remove(filepath + ".lock")

# Usage
with windows_file_lock("shared_data.txt") as f:
    # Exclusive access to file
    pass

Async Context Managers

import asyncio

class AsyncDatabase:
    async def connect(self):
        print("Async connecting...")
        await asyncio.sleep(0.1)
        return self

    async def disconnect(self):
        print("Async disconnecting...")
        await asyncio.sleep(0.1)

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.disconnect()
        return False

async def main():
    async with AsyncDatabase() as db:
        print("Using async database")

asyncio.run(main())

# Using contextlib for async
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_timer(label=""):
    start = time.perf_counter()
    yield
    elapsed = time.perf_counter() - start
    print(f"{label} Async elapsed: {elapsed:.4f}s")

Async Context Manager Patterns

import asyncio
from contextlib import asynccontextmanager

# Pattern 1: Async resource cleanup
@asynccontextmanager
async def async_file_handler(filename, mode='r'):
    """Async context manager for file operations."""
    file = await asyncio.to_thread(open, filename, mode)
    try:
        yield file
    finally:
        await asyncio.to_thread(file.close)

# Pattern 2: Async connection pool
@asynccontextmanager
async def async_connection_pool(max_size=10):
    """Async context manager for connection pool."""
    semaphore = asyncio.Semaphore(max_size)
    
    async def get_connection():
        async with semaphore:
            # Create async connection
            connection = {"id": id(semaphore), "active": True}
            yield connection
            connection["active"] = False
    
    yield get_connection

# Pattern 3: Async timeout
@asynccontextmanager
async def async_timeout(seconds):
    """Async context manager with timeout."""
    try:
        yield
    except asyncio.TimeoutError:
        print(f"Operation timed out after {seconds} seconds")

Common Mistakes

# Mistake 1: Forgetting to yield in contextmanager
@contextmanager
def bad_context():
    resource = acquire_resource()
    # Missing yield!  # This will fail

@contextmanager
def good_context():
    resource = acquire_resource()
    try:
        yield resource
    finally:
        release_resource(resource)

# Mistake 2: Not using try/finally in contextmanager
@contextmanager
def risky_context():
    resource = acquire_resource()
    yield resource
    release_resource(resource)  # Not reached if exception in yield!

@contextmanager
def safe_context():
    resource = acquire_resource()
    try:
        yield resource
    finally:
        release_resource(resource)  # Always runs

# Mistake 3: Suppressing all exceptions in __exit__
class BadContext:
    def __exit__(self, exc_type, exc_val, exc_tb):
        return True  # Suppresses ALL exceptions!

class GoodContext:
    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type == ValueError:
            return True  # Only suppress specific exceptions
        return False

# Mistake 4: Not cleaning up on error
def bad_cleanup():
    f = open('file.txt', 'w')
    f.write("data")  # If this fails, f is never closed!

def good_cleanup():
    with open('file.txt', 'w') as f:
        f.write("data")  # File closed even if write fails

Additional Common Mistakes

# Mistake 5: Not returning from __exit__
class IncompleteContext:
    def __exit__(self, exc_type, exc_val, exc_tb):
        # Forgot to return! This will raise TypeError
        pass

# Fix:
class CompleteContext:
    def __exit__(self, exc_type, exc_val, exc_tb):
        return False  # Always return a boolean

# Mistake 6: Modifying __enter__ return value
@contextmanager
def bad_return():
    resource = {"status": "active"}
    yield resource
    resource["status"] = "inactive"  # This modifies the yielded object!

@contextmanager
def good_return():
    resource = {"status": "active"}
    try:
        yield resource.copy()  # Return a copy
    finally:
        pass  # Original resource cleanup

# Mistake 7: Not handling exceptions in contextmanager
@contextmanager
def fragile_context():
    resource = acquire_resource()
    yield resource
    # If yield raises, this code won't run!
    release_resource(resource)

@contextmanager
def robust_context():
    resource = acquire_resource()
    try:
        yield resource
    finally:
        release_resource(resource)  # Always runs

Key Takeaways

  1. Use with for any resource that needs cleanup
  2. __enter__ acquires, __exit__ releases
  3. contextlib.contextmanager simplifies creation
  4. ExitStack handles dynamic numbers of context managers
  5. Context managers guarantee cleanup even on exceptions
  6. Return True from __exit__ to suppress exceptions
  7. Always use try/finally in @contextmanager functions
  8. Use suppress() to ignore specific exceptions cleanly
  9. Async context managers use __aenter__ and __aexit__
  10. Multiple context managers can be combined with ExitStack

Premium Content

Python Context Managers — Resources Done Right

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
💼Interview Prep
📜Certificates
🤝Community Access

Already a member? Log in

Need Expert Python Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement