
Context Managers: __enter__, __exit__, contextlib, async

Resource management patterns in Python

Interview Question

"Explain context managers in Python. How do __enter__ and __exit__ work? How do you create context managers using contextlib? Show async context managers."

Difficulty: Medium | Frequently asked at Google, Meta, Amazon


Theoretical Foundation

What is a Context Manager?

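A context manager is an object that defines __enter__ and __exit__. The with statement calls __enter__ before the block and __exit__ after it, so setup and teardown both run even when the block raises an exception.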

# Without context manager
file = open('data.txt', 'r')
try:
    data = file.read()
finally:
    file.close()  # Must remember to close!

# With context manager
with open('data.txt', 'r') as file:
    data = file.read()  # Auto-closed when block exits

ℹ️

Key Concept: Context managers guarantee cleanup code runs, preventing resource leaks.
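
To make the guarantee concrete, the sketch below runs the same body twice: once under a with statement and once through a hand-written sequence that approximates what the with statement does. The Tracker class is a hypothetical helper introduced only for this illustration, and the manual version is a simplification of the real protocol.

```python
import sys


class Tracker:
    """Hypothetical context manager that records the calls it receives."""

    def __init__(self):
        self.events = []

    def __enter__(self):
        self.events.append("enter")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.events.append("exit")
        return False  # never suppress


# The with statement...
with Tracker() as a:
    a.events.append("body")

# ...behaves roughly like this manual sequence.
manager = Tracker()
b = manager.__enter__()
try:
    b.events.append("body")
except BaseException:
    # Hand the active exception to __exit__; re-raise unless it returns truthy.
    if not manager.__exit__(*sys.exc_info()):
        raise
else:
    manager.__exit__(None, None, None)

print(a.events)  # ['enter', 'body', 'exit']
print(b.events)  # ['enter', 'body', 'exit']
```

Both paths record the same three events, which is why the with form is preferred: the try/except bookkeeping is written once, inside the protocol, instead of at every call site.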


Class-Based Context Managers

Basic Implementation

class FileManager:
    """Context manager for file operations."""
    
    def __init__(self, filename, mode):
        self.filename = filename
        self.mode = mode
        self.file = None
    
    def __enter__(self):
        """Setup: called when entering with block."""
        print(f"Opening {self.filename}")
        self.file = open(self.filename, self.mode)
        return self.file
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Teardown: called when exiting with block."""
        print(f"Closing {self.filename}")
        if self.file:
            self.file.close()
        
        # Return False to propagate exceptions
        # Return True to suppress exceptions
        if exc_type is not None:
            print(f"Exception occurred: {exc_val}")
        return False

# Usage
with FileManager('test.txt', 'w') as f:
    f.write("Hello, World!")
    print("File written")
# File is automatically closed here

Output:

Opening test.txt
File written
Closing test.txt

Exception Handling

class SafeResource:
    """Context manager with exception handling."""
    
    def __init__(self, name):
        self.name = name
        self.resource = None
    
    def __enter__(self):
        print(f"Acquiring {self.name}")
        self.resource = {"name": self.name, "active": True}
        return self.resource
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        print(f"Releasing {self.name}")
        if self.resource:
            self.resource["active"] = False
        
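        # Handle specific exceptions. issubclass also matches subclasses;
        # exc_type is None when the block exits without an exception.
        if exc_type is not None and issubclass(exc_type, ValueError):
            print(f"ValueError caught: {exc_val}")
            return True  # Suppress exception
        
        if exc_type is not None and issubclass(exc_type, TypeError):
            print(f"TypeError caught: {exc_val}")
            return False  # Let exception propagate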
        
        return False

# Usage - no exception
print("Test 1: No exception")
with SafeResource("database") as res:
    print(f"Using {res['name']}")
print()

# Usage - ValueError (suppressed)
print("Test 2: ValueError (suppressed)")
with SafeResource("database") as res:
    raise ValueError("Invalid value")
print("Continues after ValueError")
print()

# Usage - TypeError (not suppressed)
print("Test 3: TypeError (not suppressed)")
try:
    with SafeResource("database") as res:
        raise TypeError("Type error")
except TypeError as e:
    print(f"Caught outside: {e}")

Output:

Test 1: No exception
Acquiring database
Using database
Releasing database

Test 2: ValueError (suppressed)
Acquiring database
Releasing database
ValueError caught: Invalid value
Continues after ValueError

Test 3: TypeError (not suppressed)
Acquiring database
Releasing database
TypeError caught: Type error
Caught outside: Type error

⚠️

Exception Handling: Return a truthy value from __exit__ to suppress the exception. Return False or None (the result when there is no return statement) to propagate it.
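
A small sketch of what __exit__ actually receives in each case may help. Recorder is a hypothetical class written for this illustration; it stores the three arguments and returns whatever suppress flag it was given.

```python
class Recorder:
    """Hypothetical helper that stores what __exit__ receives."""

    def __init__(self, suppress):
        self.suppress = suppress
        self.seen = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        message = str(exc_val) if exc_val is not None else None
        self.seen = (exc_type, message, exc_tb is not None)
        return self.suppress


# Clean exit: all three arguments are None.
clean = Recorder(suppress=False)
with clean:
    pass

# Truthy return value: the exception stops here.
swallowed = Recorder(suppress=True)
with swallowed:
    raise ValueError("bad input")

# Falsy return value: the exception continues to the caller.
propagated = Recorder(suppress=False)
try:
    with propagated:
        raise ValueError("bad input")
    outcome = "suppressed"
except ValueError:
    outcome = "propagated"

print(clean.seen)      # (None, None, False)
print(swallowed.seen)  # (<class 'ValueError'>, 'bad input', True)
print(outcome)         # propagated
```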


contextlib Module

@contextmanager Decorator

from contextlib import contextmanager
import time

@contextmanager
def timer(label):
    """Timer context manager using generator."""
    start = time.perf_counter()  # monotonic clock, suited to measuring intervals
    print(f"Starting timer: {label}")
    
    try:
        yield  # Control passes to with block
    finally:
        # Cleanup code runs here
        elapsed = time.perf_counter() - start
        print(f"{label} took {elapsed:.3f}s")

# Usage
with timer("Operation"):
    time.sleep(0.1)
    print("Doing work...")

# Output:
# Starting timer: Operation
# Doing work...
# Operation took 0.100s (exact value varies from run to run)

contextmanager with Return Value

from contextlib import contextmanager

@contextmanager
def database_connection(db_name):
    """Database connection context manager."""
    print(f"Connecting to {db_name}")
    connection = {"name": db_name, "connected": True}
    
    try:
        yield connection  # Return value to 'as' clause
    finally:
        print(f"Disconnecting from {db_name}")
        connection["connected"] = False

# Usage
with database_connection("mydb") as conn:
    print(f"Connected to {conn['name']}")
    print(f"Status: {conn['connected']}")

# Output:
# Connecting to mydb
# Connected to mydb
# Status: True
# Disconnecting from mydb
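
The try/finally around yield in these generators is what makes cleanup reliable. The following sketch, with two hypothetical managers named unsafe and safe, shows what happens to the code after yield when the with block raises.

```python
from contextlib import contextmanager

log = []


@contextmanager
def unsafe(name):
    log.append(f"open {name}")
    yield
    log.append(f"close {name}")  # skipped if the body raises


@contextmanager
def safe(name):
    log.append(f"open {name}")
    try:
        yield
    finally:
        log.append(f"close {name}")  # runs whether or not the body raises


for manager in (unsafe, safe):
    try:
        with manager(manager.__name__):
            raise RuntimeError("boom")
    except RuntimeError:
        pass

print(log)  # ['open unsafe', 'open safe', 'close safe']
```

The exception is re-raised inside the generator at the yield, so any statement after an unprotected yield never executes.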

Nested Context Managers

from contextlib import contextmanager

@contextmanager
def open_file(filename, mode):
    print(f"Opening {filename}")
    f = open(filename, mode)
    try:
        yield f
    finally:
        f.close()
        print(f"Closed {filename}")

@contextmanager
def temporary_directory():
    import tempfile
    import shutil
    
    dir_path = tempfile.mkdtemp()
    print(f"Created temp dir: {dir_path}")
    try:
        yield dir_path
    finally:
        shutil.rmtree(dir_path)
        print(f"Removed temp dir: {dir_path}")

# Nested context managers
with temporary_directory() as tmp_dir:
    with open_file(f"{tmp_dir}/test.txt", 'w') as f:
        f.write("Hello!")
        print(f"Wrote to {f.name}")

# Or using contextlib.ExitStack for dynamic nesting
from contextlib import ExitStack

with ExitStack() as stack:
    files = []
    for i in range(3):
        f = stack.enter_context(open(f"file_{i}.txt", 'w'))
        files.append(f)
        f.write(f"Content {i}")
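
ExitStack unwinds in reverse order of registration, and it also accepts plain functions through callback. The sketch below uses a hypothetical resource manager that writes to a list so the order is visible.

```python
from contextlib import ExitStack, contextmanager

order = []


@contextmanager
def resource(name):
    """Hypothetical resource that logs when it is entered and exited."""
    order.append(f"enter {name}")
    try:
        yield name
    finally:
        order.append(f"exit {name}")


with ExitStack() as stack:
    names = [stack.enter_context(resource(n)) for n in ("a", "b", "c")]
    # A plain function can be registered as cleanup too.
    stack.callback(order.append, "callback")
    order.append("body")

print(names)  # ['a', 'b', 'c']
print(order)
# ['enter a', 'enter b', 'enter c', 'body',
#  'callback', 'exit c', 'exit b', 'exit a']
```

The callback was registered last, so it runs first; the resources then exit from c back to a.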

contextlib.suppress

from contextlib import suppress

# Suppress specific exceptions
with suppress(FileNotFoundError):
    open("nonexistent.txt")
    print("This won't print")
print("Continues after suppressed exception")

# Equivalent to:
try:
    open("nonexistent.txt")
except FileNotFoundError:
    pass

Output:

Continues after suppressed exception

contextlib.redirect_stdout

from contextlib import redirect_stdout
from io import StringIO

# Redirect stdout
f = StringIO()
with redirect_stdout(f):
    print("This goes to string buffer")
    print("Not to console")

output = f.getvalue()
print(f"Captured: {output}")

# Redirect to file
with open("output.txt", "w") as file:
    with redirect_stdout(file):
        print("This goes to file")

Output:

Captured: This goes to string buffer
Not to console

💡

Interview Tip: contextlib covers the common patterns, so you rarely need a full class: @contextmanager, ExitStack, suppress, and redirect_stdout are all shown above.
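
Two more contextlib helpers come up often and fit the same theme: closing, which adapts any object with a close() method, and nullcontext, a do-nothing manager for optional resources. The Connection class and write_report function here are hypothetical names used only for this sketch.

```python
import io
from contextlib import closing, nullcontext


class Connection:
    """Hypothetical resource with close() but no __enter__/__exit__."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


# closing() calls conn.close() when the block exits.
conn = Connection()
with closing(conn) as c:
    same_object = c is conn
print(same_object, conn.closed)  # True True


def write_report(text, stream=None):
    """Write to the caller's stream if given, else to a private buffer."""
    # nullcontext leaves a caller-owned stream open; our own buffer is closed.
    ctx = nullcontext(stream) if stream is not None else io.StringIO()
    with ctx as out:
        out.write(text)
        return out.getvalue()


shared = io.StringIO()
print(write_report("hello", shared))  # hello
print(shared.closed)                  # False
print(write_report("standalone"))     # standalone
```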


Async Context Managers

Basic Async Context Manager

import asyncio

class AsyncDatabase:
    """Async context manager for database."""
    
    def __init__(self, db_name):
        self.db_name = db_name
        self.connection = None
    
    async def __aenter__(self):
        """Async setup."""
        print(f"Connecting to {self.db_name}...")
        await asyncio.sleep(0.1)  # Simulate async connection
        self.connection = {"name": self.db_name, "connected": True}
        return self.connection
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async teardown."""
        print(f"Disconnecting from {self.db_name}...")
        await asyncio.sleep(0.1)  # Simulate async disconnection
        if self.connection:
            self.connection["connected"] = False
        return False

# Usage
async def main():
    async with AsyncDatabase("mydb") as conn:
        print(f"Connected to {conn['name']}")
        await asyncio.sleep(0.1)  # Simulate async work
        print(f"Status: {conn['connected']}")

asyncio.run(main())

Output:

Connecting to mydb...
Connected to mydb
Status: True
Disconnecting from mydb...
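
The benefit of awaiting inside __aenter__ and __aexit__ shows up when several managers run at once. In this sketch, AsyncSession and use are hypothetical names and the short sleeps stand in for network I/O; both sessions open before either one closes.

```python
import asyncio


class AsyncSession:
    """Hypothetical async resource; the sleeps simulate network latency."""

    def __init__(self, name, events):
        self.name = name
        self.events = events

    async def __aenter__(self):
        await asyncio.sleep(0.01)
        self.events.append(f"open {self.name}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await asyncio.sleep(0.01)
        self.events.append(f"close {self.name}")
        return False


async def use(name, events):
    async with AsyncSession(name, events) as session:
        await asyncio.sleep(0.01)  # simulated work
        return session.name


async def main():
    events = []
    # gather runs both coroutines concurrently and keeps result order.
    results = await asyncio.gather(use("a", events), use("b", events))
    return results, events


results, events = asyncio.run(main())
print(results)  # ['a', 'b']
print(events)   # both "open" entries appear before any "close" entry
```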

contextlib.asynccontextmanager

import asyncio
import time
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_timer(label):
    """Async timer context manager."""
    start = time.perf_counter()
    print(f"Starting {label}")
    
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f"{label} took {elapsed:.3f}s")

@asynccontextmanager
async def async_resource(name):
    """Async resource management."""
    print(f"Acquiring {name}")
    resource = {"name": name, "active": True}
    
    try:
        yield resource
    finally:
        print(f"Releasing {name}")
        resource["active"] = False

# Usage
async def main():
    async with async_timer("Async Operation"):
        async with async_resource("database") as res:
            await asyncio.sleep(0.1)
            print(f"Using {res['name']}")

asyncio.run(main())

Output:

Starting Async Operation
Acquiring database
Using database
Releasing database
Async Operation took 0.101s (approximate)

Async ExitStack

import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

@asynccontextmanager
async def async_resource(name):
    print(f"Acquiring {name}")
    await asyncio.sleep(0.1)
    try:
        yield name
    finally:
        print(f"Releasing {name}")

async def main():
    async with AsyncExitStack() as stack:
        # Dynamically enter async context managers
        resources = []
        for i in range(3):
            res = await stack.enter_async_context(async_resource(f"resource_{i}"))
            resources.append(res)
        print(f"Using {resources}")
    # On exit, the resources are released in reverse order

asyncio.run(main())

Advanced Patterns

Connection Pool

import asyncio
from typing import List
from contextlib import asynccontextmanager

class ConnectionPool:
    """Async connection pool."""
    
    def __init__(self, max_size: int = 10):
        self.max_size = max_size
        self.connections: List[dict] = []
        self.available: asyncio.Queue = asyncio.Queue()
        self.lock = asyncio.Lock()
    
    async def create_connection(self) -> dict:
        """Create new connection."""
        await asyncio.sleep(0.01)  # Simulate connection time
        return {"id": len(self.connections), "active": True}
    
    @asynccontextmanager
    async def get_connection(self):
        """Get connection from pool."""
        connection = None
        
        async with self.lock:
            if self.available.empty() and len(self.connections) < self.max_size:
                connection = await self.create_connection()
                self.connections.append(connection)
        
        if connection is None:
            connection = await self.available.get()
        
        try:
            yield connection
        finally:
            await self.available.put(connection)

# Usage
async def main():
    pool = ConnectionPool(max_size=3)
    
    async with pool.get_connection() as conn:
        print(f"Using connection {conn['id']}")
        await asyncio.sleep(0.1)
    
    print("Connection returned to pool")

asyncio.run(main())

Retry Pattern

import asyncio
import random

# Note: a context manager cannot re-run the body of its with block. A
# generator decorated with @asynccontextmanager must yield exactly once,
# so looping over `yield` raises RuntimeError ("generator didn't stop
# after athrow()") as soon as the first attempt fails. Retry logic
# therefore has to wrap a callable instead.
async def retry(operation, max_attempts: int = 3, delay: float = 1.0,
                exceptions: tuple = (Exception,)):
    """Call an async operation, retrying on the given exceptions."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except exceptions as e:
            if attempt == max_attempts:
                raise  # Out of attempts: propagate the last exception
            print(f"Attempt {attempt} failed: {e}")
            await asyncio.sleep(delay)

# Usage
async def unreliable_operation():
    if random.random() < 0.7:
        raise ValueError("Random failure")
    print("Success!")

async def main():
    try:
        await retry(unreliable_operation, max_attempts=5, delay=0.1)
    except ValueError as e:
        print(f"Final failure: {e}")

asyncio.run(main())

Transaction Context Manager

from typing import Any

class Transaction:
    """Transaction context manager."""
    
    def __init__(self, connection):
        self.connection = connection
        self.operations = []
    
    def __enter__(self):
        print("Starting transaction")
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            print(f"Rolling back due to: {exc_val}")
            self.rollback()
        else:
            print("Committing transaction")
            self.commit()
        return False
    
    def execute(self, operation: str, data: Any = None):
        self.operations.append({"op": operation, "data": data})
        print(f"Queued: {operation}")
    
    def commit(self):
        for op in self.operations:
            print(f"Executing: {op['op']}")
        self.operations.clear()
    
    def rollback(self):
        self.operations.clear()
        print("Rollback complete")

# Usage
class MockConnection:
    pass

# Successful transaction
print("Test 1: Successful transaction")
with Transaction(MockConnection()) as t:
    t.execute("INSERT INTO users VALUES (1, 'Alice')")
    t.execute("UPDATE users SET name = 'Bob' WHERE id = 1")
print()

# Failed transaction
print("Test 2: Failed transaction")
try:
    with Transaction(MockConnection()) as t:
        t.execute("INSERT INTO users VALUES (2, 'Charlie')")
        raise ValueError("Constraint violation")
except ValueError as e:
    print(f"Caught: {e}")

Output:

Test 1: Successful transaction
Starting transaction
Queued: INSERT INTO users VALUES (1, 'Alice')
Queued: UPDATE users SET name = 'Bob' WHERE id = 1
Committing transaction
Executing: INSERT INTO users VALUES (1, 'Alice')
Executing: UPDATE users SET name = 'Bob' WHERE id = 1

Test 2: Failed transaction
Starting transaction
Queued: INSERT INTO users VALUES (2, 'Charlie')
Rolling back due to: Constraint violation
Rollback complete
Caught: Constraint violation

ℹ️

Pattern: Transaction context managers ensure atomic operations with automatic rollback on failure.
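
The standard library's sqlite3 module follows this same pattern: a Connection used in a with statement commits when the block succeeds and rolls back when it raises (it does not close the connection). The table and rows below are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

# Success: the block exits normally, so the connection commits.
with conn:
    conn.execute("INSERT INTO users VALUES (1, 'Alice')")

# Failure: the exception triggers a rollback, then propagates.
try:
    with conn:
        conn.execute("INSERT INTO users VALUES (2, 'Charlie')")
        raise ValueError("Constraint violation")
except ValueError:
    pass

rows = conn.execute("SELECT id, name FROM users").fetchall()
print(rows)  # [(1, 'Alice')]
conn.close()  # the with block does not close the connection
```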


Real-World Examples

File Processing

import os
import tempfile
import shutil
from contextlib import contextmanager

@contextmanager
def processed_files(directory: str, pattern: str):
    """Create sample files in a temporary directory under `directory`."""
    temp_dir = tempfile.mkdtemp(dir=directory)
    print(f"Created temp dir: {temp_dir}")
    
    try:
        # Create temporary files
        temp_files = []
        for i in range(3):
            temp_file = os.path.join(temp_dir, f"{pattern}_{i}.txt")
            with open(temp_file, 'w') as f:
                f.write(f"Content {i}")
            temp_files.append(temp_file)
        
        yield temp_files
        
    finally:
        # Cleanup
        shutil.rmtree(temp_dir)
        print("Cleaned up temp dir")

# Usage
with processed_files("/tmp", "data") as files:
    for file in files:
        with open(file, 'r') as f:
            print(f"Read: {f.read()}")

Cache with TTL

import time
from contextlib import contextmanager
from typing import Any, Dict, Optional

class TTLCache:
    """Cache with time-to-live expiration."""
    
    def __init__(self, ttl: float = 60.0):
        self.ttl = ttl
        # Named _store so it does not shadow the cached() method below
        self._store: Dict[str, tuple] = {}
    
    @contextmanager
    def cached(self, key: str, compute_fn):
        """Yield the cached value for key, recomputing if missing or expired."""
        # Check if cached and valid
        if key in self._store:
            value, timestamp = self._store[key]
            if time.time() - timestamp < self.ttl:
                print(f"Cache hit: {key}")
                yield value
                return
        
        # Compute and cache
        print(f"Cache miss: {key}")
        value = compute_fn()
        self._store[key] = (value, time.time())
        yield value

# Usage
cache = TTLCache(ttl=2.0)

def expensive_computation():
    time.sleep(0.1)
    return 42

# First call - cache miss
with cache.cached("result", expensive_computation) as value:
    print(f"Value: {value}")

# Second call - cache hit
with cache.cached("result", expensive_computation) as value:
    print(f"Value: {value}")

# After TTL expires
time.sleep(2.5)
with cache.cached("result", expensive_computation) as value:
    print(f"Value: {value}")

Output:

Cache miss: result
Value: 42
Cache hit: result
Value: 42
Cache miss: result
Value: 42

Complexity Analysis

Performance Impact

Pattern          Overhead                                          Use Case
Class-based      Low (two method calls)                            Complex state
@contextmanager  Slightly higher (generator plus wrapper object)   Simple generators
Async            Event loop scheduling                             Async I/O
ExitStack        Low                                               Dynamic nesting
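
These overheads are small in absolute terms, and the quickest way to check them on a given machine is a micro-benchmark. The sketch below uses timeit with two hypothetical no-op managers; the numbers depend on the interpreter and hardware, so no particular result is implied.

```python
import timeit
from contextlib import contextmanager


class ClassBased:
    """No-op class-based manager (hypothetical, for measurement only)."""

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        return False


@contextmanager
def generator_based():
    """No-op generator-based manager (hypothetical, for measurement only)."""
    yield


def use_class():
    with ClassBased():
        pass


def use_generator():
    with generator_based():
        pass


runs = 100_000
class_time = timeit.timeit(use_class, number=runs)
generator_time = timeit.timeit(use_generator, number=runs)

print(f"Class-based:     {class_time / runs * 1e6:.3f} microseconds per use")
print(f"@contextmanager: {generator_time / runs * 1e6:.3f} microseconds per use")
```

For managers that guard real I/O, either cost is usually negligible next to the work inside the block.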

Memory Usage

import sys

class RegularClass:
    def __enter__(self):
        return self
    def __exit__(self, *args):
        pass

from contextlib import contextmanager

@contextmanager
def regular_contextmanager():
    yield

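# Memory comparison (sys.getsizeof is shallow: it excludes referenced objects)
regular = RegularClass()
generator_based = regular_contextmanager()  # call the factory to get the manager object

print(f"Class-based: {sys.getsizeof(regular)} bytes")
print(f"contextmanager: {sys.getsizeof(generator_based)} bytes")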

Interview Tips

Common Follow-up Questions

  1. "When should you use context managers?"

    • Resource management (files, connections)
    • Setup/teardown operations
    • Exception-safe cleanup
    • Transaction boundaries
  2. "What's the difference between __exit__ returning True vs False?"

    • True: Suppress exception
    • False: Propagate exception
    • Always be explicit about exception handling
  3. "How do async context managers differ?"

    • Use __aenter__ and __aexit__
    • Can await in setup/teardown
    • Use async with syntax

Code Review Tips

# BAD: Forgetting cleanup
class BadResource:
    def __enter__(self):
        self.resource = acquire_resource()
        return self.resource
    
    def __exit__(self, *args):
        pass  # Never cleans up!

# GOOD: Always cleanup
class GoodResource:
    def __enter__(self):
        self.resource = acquire_resource()
        return self.resource
    
    def __exit__(self, *args):
        release_resource(self.resource)

# BAD: Suppressing all exceptions
def __exit__(self, *args):
    return True  # Hides bugs!

# GOOD: Specific exception handling
def __exit__(self, exc_type, exc_val, exc_tb):
    if exc_type is not None and issubclass(exc_type, ExpectedException):
        return True  # Intentionally suppress
    return False  # Let others propagate

⚠️

Common Mistake: Suppressing all exceptions without logging. Always log exceptions you suppress.
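
One way to follow this advice is a variant of contextlib.suppress that logs before swallowing. This is a minimal sketch: suppress_and_log and the logger name are hypothetical, not part of the standard library.

```python
import logging
from contextlib import contextmanager

logger = logging.getLogger("suppress_demo")  # hypothetical logger name


@contextmanager
def suppress_and_log(*exceptions):
    """Like contextlib.suppress, but records what it swallowed."""
    try:
        yield
    except exceptions as exc:
        # Not re-raising here is what suppresses the exception.
        logger.warning("Suppressed %s: %s", type(exc).__name__, exc)


with suppress_and_log(ZeroDivisionError):
    1 / 0

reached = True  # execution continues after the suppressed error
print(reached)  # True
```

Exceptions that are not listed still propagate, so unexpected failures remain visible.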


Summary

Type             Use Case                     Complexity
Class-based      Complex state management     Medium
@contextmanager  Simple resource management   Low
Async            Async I/O operations         Medium
ExitStack        Dynamic nesting              Low

Best Practices

  1. Always clean up resources in __exit__
  2. Be explicit about exception handling
  3. Use contextlib for simple cases
  4. Log suppressed exceptions
  5. Keep context managers focused
  6. Use async context managers when setup or teardown is I/O-bound

ℹ️

Key Takeaway: Context managers ensure proper resource management and exception safety. Use them for any setup/teardown pattern.


Practice Problems

  1. Database Connection: Create a context manager for database connections
  2. File Processing: Build a context manager for temporary file processing
  3. Cache System: Implement a TTL cache with context manager
  4. Transaction: Create a transaction context manager with rollback
  5. Async Pool: Build an async connection pool

Further Reading

  • Python Docs: contextlib module
  • PEP 343: The with statement
  • Async Context Managers: PEP 492 (async with)
  • Books: "Python Cookbook" by David Beazley

Remember: Context managers are essential for writing robust, resource-safe Python code.
