Context Managers: enter, exit, contextlib, async
Resource management patterns in Python
Interview Question
"Explain context managers in Python. How do __enter__ and __exit__ work? How do you create context managers using contextlib? Show async context managers."
Difficulty: Medium | Frequently asked at Google, Meta, Amazon
Theoretical Foundation
What is a Context Manager?
A context manager manages resources by ensuring proper setup and teardown, even when exceptions occur.
# Without context manager
file = open('data.txt', 'r')
try:
data = file.read()
finally:
file.close() # Must remember to close!
# With context manager
with open('data.txt', 'r') as file:
data = file.read() # Auto-closed when block exits
ℹ️
Key Concept: Context managers guarantee cleanup code runs, preventing resource leaks.
Class-Based Context Managers
Basic Implementation
class FileManager:
"""Context manager for file operations."""
def __init__(self, filename, mode):
self.filename = filename
self.mode = mode
self.file = None
def __enter__(self):
"""Setup: called when entering with block."""
print(f"Opening {self.filename}")
self.file = open(self.filename, self.mode)
return self.file
def __exit__(self, exc_type, exc_val, exc_tb):
"""Teardown: called when exiting with block."""
print(f"Closing {self.filename}")
if self.file:
self.file.close()
# Return False to propagate exceptions
# Return True to suppress exceptions
if exc_type is not None:
print(f"Exception occurred: {exc_val}")
return False
# Usage
with FileManager('test.txt', 'w') as f:
f.write("Hello, World!")
print("File written")
# File is automatically closed here
Output:
Opening test.txt
File written
Closing test.txt
Exception Handling
class SafeResource:
"""Context manager with exception handling."""
def __init__(self, name):
self.name = name
self.resource = None
def __enter__(self):
print(f"Acquiring {self.name}")
self.resource = {"name": self.name, "active": True}
return self.resource
def __exit__(self, exc_type, exc_val, exc_tb):
print(f"Releasing {self.name}")
if self.resource:
self.resource["active"] = False
# Handle specific exceptions
if exc_type == ValueError:
print(f"ValueError caught: {exc_val}")
return True # Suppress exception
if exc_type == TypeError:
print(f"TypeError caught: {exc_val}")
return False # Let exception propagate
return False
# Usage - no exception
print("Test 1: No exception")
with SafeResource("database") as res:
print(f"Using {res['name']}")
print()
# Usage - ValueError (suppressed)
print("Test 2: ValueError (suppressed)")
with SafeResource("database") as res:
raise ValueError("Invalid value")
print("Continues after ValueError")
print()
# Usage - TypeError (not suppressed)
print("Test 3: TypeError (not suppressed)")
try:
with SafeResource("database") as res:
raise TypeError("Type error")
except TypeError as e:
print(f"Caught outside: {e}")
Output:
Test 1: No exception
Acquiring database
Using database
Releasing database
Test 2: ValueError (suppressed)
Acquiring database
Releasing database
ValueError caught: Invalid value
Continues after ValueError
Test 3: TypeError (not suppressed)
Acquiring database
Releasing database
TypeError caught: Type error
Caught outside: Type error
⚠️
Exception Handling: Return True from __exit__ to suppress exceptions, False to propagate them.
contextlib Module
@contextmanager Decorator
from contextlib import contextmanager
import time
@contextmanager
def timer(label):
"""Timer context manager using generator."""
start = time.time()
print(f"Starting timer: {label}")
try:
yield # Control passes to with block
finally:
# Cleanup code runs here
elapsed = time.time() - start
print(f"{label} took {elapsed:.3f}s")
# Usage
with timer("Operation"):
time.sleep(0.1)
print("Doing work...")
# Output:
# Starting timer: Operation
# Doing work...
# Operation took 0.100s
contextmanager with Return Value
from contextlib import contextmanager
@contextmanager
def database_connection(db_name):
"""Database connection context manager."""
print(f"Connecting to {db_name}")
connection = {"name": db_name, "connected": True}
try:
yield connection # Return value to 'as' clause
finally:
print(f"Disconnecting from {db_name}")
connection["connected"] = False
# Usage
with database_connection("mydb") as conn:
print(f"Connected to {conn['name']}")
print(f"Status: {conn['connected']}")
# Output:
# Connecting to mydb
# Connected to mydb
# Status: True
# Disconnecting from mydb
Nested Context Managers
from contextlib import contextmanager
@contextmanager
def open_file(filename, mode):
print(f"Opening {filename}")
f = open(filename, mode)
try:
yield f
finally:
f.close()
print(f"Closed {filename}")
@contextmanager
def temporary_directory():
import tempfile
import shutil
dir_path = tempfile.mkdtemp()
print(f"Created temp dir: {dir_path}")
try:
yield dir_path
finally:
shutil.rmtree(dir_path)
print(f"Removed temp dir: {dir_path}")
# Nested context managers
with temporary_directory() as tmp_dir:
with open_file(f"{tmp_dir}/test.txt", 'w') as f:
f.write("Hello!")
print(f"Wrote to {f.name}")
# Or using contextlib.ExitStack for dynamic nesting
from contextlib import ExitStack
with ExitStack() as stack:
files = []
for i in range(3):
f = stack.enter_context(open(f"file_{i}.txt", 'w'))
files.append(f)
f.write(f"Content {i}")
contextlib.suppress
from contextlib import suppress
# Suppress specific exceptions
with suppress(FileNotFoundError):
open("nonexistent.txt")
print("This won't print")
print("Continues after suppressed exception")
# Equivalent to:
try:
open("nonexistent.txt")
except FileNotFoundError:
pass
Output:
Continues after suppressed exception
contextlib.redirect_stdout
from contextlib import redirect_stdout
from io import StringIO
# Redirect stdout
f = StringIO()
with redirect_stdout(f):
print("This goes to string buffer")
print("Not to console")
output = f.getvalue()
print(f"Captured: {output}")
# Redirect to file
with open("output.txt", "w") as file:
with redirect_stdout(file):
print("This goes to file")
Output:
Captured: This goes to string buffer
Not to console
💡
Interview Tip: contextlib provides shortcuts for common context manager patterns.
Async Context Managers
Basic Async Context Manager
import asyncio
class AsyncDatabase:
"""Async context manager for database."""
def __init__(self, db_name):
self.db_name = db_name
self.connection = None
async def __aenter__(self):
"""Async setup."""
print(f"Connecting to {self.db_name}...")
await asyncio.sleep(0.1) # Simulate async connection
self.connection = {"name": self.db_name, "connected": True}
return self.connection
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async teardown."""
print(f"Disconnecting from {self.db_name}...")
await asyncio.sleep(0.1) # Simulate async disconnection
if self.connection:
self.connection["connected"] = False
return False
# Usage
async def main():
async with AsyncDatabase("mydb") as conn:
print(f"Connected to {conn['name']}")
await asyncio.sleep(0.1) # Simulate async work
print(f"Status: {conn['connected']}")
asyncio.run(main())
Output:
Connecting to mydb...
Connected to mydb
Status: True
Disconnecting from mydb...
contextlib.asynccontextmanager
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def async_timer(label):
"""Async timer context manager."""
start = time.time()
print(f"Starting {label}")
try:
yield
finally:
elapsed = time.time() - start
print(f"{label} took {elapsed:.3f}s")
@asynccontextmanager
async def async_resource(name):
"""Async resource management."""
print(f"Acquiring {name}")
resource = {"name": name, "active": True}
try:
yield resource
finally:
print(f"Releasing {name}")
resource["active"] = False
# Usage
async def main():
async with async_timer("Async Operation"):
async with async_resource("database") as res:
await asyncio.sleep(0.1)
print(f"Using {res['name']}")
asyncio.run(main())
Output:
Starting Async Operation
Acquiring database
Using database
Releasing database
Async Operation took 0.101s
Async ExitStack
import asyncio
from contextlib import AsyncExitStack
async def async_operation(name):
print(f"Starting {name}")
await asyncio.sleep(0.1)
print(f"Finished {name}")
async def main():
async with AsyncExitStack() as stack:
# Dynamically add async context managers
tasks = []
for i in range(3):
task = asyncio.create_task(async_operation(f"task_{i}"))
tasks.append(task)
await asyncio.gather(*tasks)
asyncio.run(main())
Advanced Patterns
Connection Pool
import asyncio
from typing import List, Optional
from contextlib import asynccontextmanager
class ConnectionPool:
"""Async connection pool."""
def __init__(self, max_size: int = 10):
self.max_size = max_size
self.connections: List[dict] = []
self.available: asyncio.Queue = asyncio.Queue()
self.lock = asyncio.Lock()
async def create_connection(self) -> dict:
"""Create new connection."""
await asyncio.sleep(0.01) # Simulate connection time
return {"id": len(self.connections), "active": True}
@asynccontextmanager
async def get_connection(self):
"""Get connection from pool."""
connection = None
async with self.lock:
if self.available.empty() and len(self.connections) < self.max_size:
connection = await self.create_connection()
self.connections.append(connection)
if connection is None:
connection = await self.available.get()
try:
yield connection
finally:
await self.available.put(connection)
# Usage
async def main():
pool = ConnectionPool(max_size=3)
async with pool.get_connection() as conn:
print(f"Using connection {conn['id']}")
await asyncio.sleep(0.1)
print(f"Connection returned to pool")
asyncio.run(main())
Retry Context Manager
import asyncio
from contextlib import asynccontextmanager
from typing import Type
@asynccontextmanager
async def retry(max_attempts: int = 3, delay: float = 1.0,
exceptions: tuple = (Exception,)):
"""Async retry context manager."""
last_exception = None
for attempt in range(max_attempts):
try:
yield
return # Success, exit
except exceptions as e:
last_exception = e
if attempt < max_attempts - 1:
print(f"Attempt {attempt + 1} failed: {e}")
await asyncio.sleep(delay)
raise last_exception
# Usage
async def unreliable_operation():
import random
if random.random() < 0.7:
raise ValueError("Random failure")
print("Success!")
async def main():
try:
async with retry(max_attempts=5, delay=0.1):
await unreliable_operation()
except ValueError as e:
print(f"Final failure: {e}")
asyncio.run(main())
Transaction Context Manager
from contextlib import contextmanager
from typing import Any, Generator
class Transaction:
"""Transaction context manager."""
def __init__(self, connection):
self.connection = connection
self.operations = []
def __enter__(self):
print("Starting transaction")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
print(f"Rolling back due to: {exc_val}")
self.rollback()
else:
print("Committing transaction")
self.commit()
return False
def execute(self, operation: str, data: Any = None):
self.operations.append({"op": operation, "data": data})
print(f"Queued: {operation}")
def commit(self):
for op in self.operations:
print(f"Executing: {op['op']}")
self.operations.clear()
def rollback(self):
self.operations.clear()
print("Rollback complete")
# Usage
class MockConnection:
pass
# Successful transaction
print("Test 1: Successful transaction")
with Transaction(MockConnection()) as t:
t.execute("INSERT INTO users VALUES (1, 'Alice')")
t.execute("UPDATE users SET name = 'Bob' WHERE id = 1")
print()
# Failed transaction
print("Test 2: Failed transaction")
try:
with Transaction(MockConnection()) as t:
t.execute("INSERT INTO users VALUES (2, 'Charlie')")
raise ValueError("Constraint violation")
except ValueError as e:
print(f"Caught: {e}")
Output:
Test 1: Successful transaction
Starting transaction
Queued: INSERT INTO users VALUES (1, 'Alice')
Queued: UPDATE users SET name = 'Bob' WHERE id = 1
Committing transaction
Executing: INSERT INTO users VALUES (1, 'Alice')
Executing: UPDATE users SET name = 'Bob' WHERE id = 1
Test 2: Failed transaction
Starting transaction
Queued: INSERT INTO users VALUES (2, 'Charlie')
Rolling back due to: Constraint violation
Rollback complete
Caught: Constraint violation
ℹ️
Pattern: Transaction context managers ensure atomic operations with automatic rollback on failure.
Real-World Examples
File Processing
import os
import tempfile
import shutil
from contextlib import contextmanager
@contextmanager
def processed_files(directory: str, pattern: str):
"""Process files in a temporary directory."""
temp_dir = tempfile.mkdtemp()
print(f"Created temp dir: {temp_dir}")
try:
# Create temporary files
temp_files = []
for i in range(3):
temp_file = os.path.join(temp_dir, f"{pattern}_{i}.txt")
with open(temp_file, 'w') as f:
f.write(f"Content {i}")
temp_files.append(temp_file)
yield temp_files
finally:
# Cleanup
shutil.rmtree(temp_dir)
print(f"Cleaned up temp dir")
# Usage
with processed_files("/tmp", "data") as files:
for file in files:
with open(file, 'r') as f:
print(f"Read: {f.read()}")
Cache with TTL
import time
from contextlib import contextmanager
from typing import Any, Dict, Optional
class TTLCache:
"""Cache with time-to-live expiration."""
def __init__(self, ttl: float = 60.0):
self.ttl = ttl
self.cache: Dict[str, tuple] = {}
@contextmanager
def cache(self, key: str, compute_fn):
"""Cache context manager."""
# Check if cached and valid
if key in self.cache:
value, timestamp = self.cache[key]
if time.time() - timestamp < self.ttl:
print(f"Cache hit: {key}")
yield value
return
# Compute and cache
print(f"Cache miss: {key}")
value = compute_fn()
self.cache[key] = (value, time.time())
yield value
# Usage
cache = TTLCache(ttl=2.0)
def expensive_computation():
time.sleep(0.1)
return 42
# First call - cache miss
with cache("result", expensive_computation) as value:
print(f"Value: {value}")
# Second call - cache hit
with cache("result", expensive_computation) as value:
print(f"Value: {value}")
# After TTL expires
time.sleep(2.5)
with cache("result", expensive_computation) as value:
print(f"Value: {value}")
Output:
Cache miss: result
Value: 42
Cache hit: result
Value: 42
Cache miss: result
Value: 42
Complexity Analysis
Performance Impact
| Pattern | Overhead | Use Case |
|---|---|---|
| Class-based | Low | Complex state |
| @contextmanager | Minimal | Simple generators |
| Async | Event loop | Async I/O |
| ExitStack | Low | Dynamic nesting |
Memory Usage
import sys
class RegularClass:
def __enter__(self):
return self
def __exit__(self, *args):
pass
from contextlib import contextmanager
@contextmanager
def regular_contextmanager():
yield
# Memory comparison
regular = RegularClass()
print(f"Class-based: {sys.getsizeof(regular)} bytes")
print(f"contextmanager: {sys.getsizeof(regular_contextmanager)} bytes")
Interview Tips
Common Follow-up Questions
-
"When should you use context managers?"
- Resource management (files, connections)
- Setup/teardown operations
- Exception-safe cleanup
- Transaction boundaries
-
"What's the difference between
__exit__returning True vs False?"True: Suppress exceptionFalse: Propagate exception- Always be explicit about exception handling
-
"How do async context managers differ?"
- Use
__aenter__and__aexit__ - Can await in setup/teardown
- Use
async withsyntax
- Use
Code Review Tips
# BAD: Forgetting cleanup
class BadResource:
def __enter__(self):
self.resource = acquire_resource()
return self.resource
def __exit__(self, *args):
pass # Never cleans up!
# GOOD: Always cleanup
class GoodResource:
def __enter__(self):
self.resource = acquire_resource()
return self.resource
def __exit__(self, *args):
release_resource(self.resource)
# BAD: Suppressing all exceptions
def __exit__(self, *args):
return True # Hides bugs!
# GOOD: Specific exception handling
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type == ExpectedException:
return True # Intentionally suppress
return False # Let others propagate
⚠️
Common Mistake: Suppressing all exceptions without logging. Always log exceptions you suppress.
Summary
| Type | Use Case | Complexity |
|---|---|---|
| Class-based | Complex state management | Medium |
| @contextmanager | Simple resource management | Low |
| Async | Async I/O operations | Medium |
| ExitStack | Dynamic nesting | Low |
Best Practices
- Always clean up resources in
__exit__ - Be explicit about exception handling
- Use contextlib for simple cases
- Log suppressed exceptions
- Keep context managers focused
- Use async for I/O-bound
ℹ️
Key Takeaway: Context managers ensure proper resource management and exception safety. Use them for any setup/teardown pattern.
Practice Problems
- Database Connection: Create a context manager for database connections
- File Processing: Build a context manager for temporary file processing
- Cache System: Implement a TTL cache with context manager
- Transaction: Create a transaction context manager with rollback
- Async Pool: Build an async connection pool
Further Reading
- Python Docs:
contextlibmodule - PEP 343: The
withstatement - Async Context Managers: PEP 525
- Books: "Python Cookbook" by David Beazley
Remember: Context managers are essential for writing robust, resource-safe Python code.