Async/Await & Event Loop in Python
Difficulty: Hard | Companies: Google, Meta, Amazon, Netflix, Stripe
Coroutine Fundamentals
import asyncio
from typing import Coroutine, Any
import time
# Basic coroutine
async def fetch_data(url: str) -> dict:
    """Pretend to download ``url`` and return a canned payload.

    Announces the request on stdout, suspends for one second to stand in
    for network latency, then returns a dict with the requested ``url``
    and a fixed ``"data"`` value.
    """
    print(f"Starting fetch from {url}")
    simulated_latency = 1  # seconds of pretend network delay
    await asyncio.sleep(simulated_latency)
    payload = dict(url=url, data="response")
    return payload
# Running coroutines
async def main():
    """Time two fetches run back to back, then the same two run concurrently.

    Each ``fetch_data`` call suspends for about one second, so the
    sequential pass takes roughly two seconds while the ``gather`` pass
    overlaps the waits and takes roughly one.
    """
    # Sequential execution
    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by a system clock adjustment the way a time.time() difference can.
    start = time.perf_counter()
    await fetch_data("api.example.com/1")
    await fetch_data("api.example.com/2")
    print(f"Sequential: {time.perf_counter() - start:.2f}s")  # ~2s

    # Concurrent execution
    start = time.perf_counter()
    await asyncio.gather(
        fetch_data("api.example.com/1"),
        fetch_data("api.example.com/2"),
    )
    print(f"Concurrent: {time.perf_counter() - start:.2f}s")  # ~1s


# Run the async function
asyncio.run(main())
ℹ️
asyncio.run() is the entry point for running async code. It creates a new event loop, runs the coroutine until it completes, and then closes the loop.
Event Loop Deep Dive
import asyncio
from asyncio import Queue
import random
class EventLoopDemo:
    """Producer/consumer demo built on ``asyncio.Queue``.

    ``None`` is the shutdown sentinel. A consumer that receives it puts it
    back for the next consumer and exits, so a single sentinel stops every
    consumer. For that reason the sentinel must only be enqueued after *all*
    producers have finished; ``run_producer_consumer`` takes care of this.
    """

    def __init__(self):
        # Not read or reassigned anywhere in this class; kept so existing
        # code that inspects the attribute keeps working.
        self.loop = None

    async def producer(self, queue: Queue, name: str, *, send_sentinel: bool = True):
        """Put five items named ``{name}-{i}`` on ``queue``, pausing between them.

        Args:
            queue: Queue to publish to.
            name: Prefix used for the produced items.
            send_sentinel: When true (the default), enqueue the ``None``
                sentinel after the last item. Pass ``False`` when several
                producers share the queue and something else signals
                shutdown once all of them are done.
        """
        for i in range(5):
            item = f"{name}-{i}"
            await queue.put(item)
            print(f"Produced: {item}")
            await asyncio.sleep(random.uniform(0.1, 0.5))
        if send_sentinel:
            await queue.put(None)  # Sentinel

    async def consumer(self, queue: Queue, name: str):
        """Consume items from ``queue`` until the ``None`` sentinel arrives.

        Every successful ``get()`` is matched by exactly one ``task_done()``,
        including the one that returned the sentinel, so ``queue.join()``
        stays usable on this queue.
        """
        while True:
            item = await queue.get()
            try:
                if item is None:
                    await queue.put(None)  # Pass sentinel to other consumers
                    break
                print(f"{name} consumed: {item}")
                await asyncio.sleep(random.uniform(0.1, 0.3))
            finally:
                queue.task_done()

    async def _run_producers(self, queue: Queue, count: int):
        """Run ``count`` producers to completion, then enqueue one sentinel."""
        await asyncio.gather(
            *(
                self.producer(queue, f"Producer-{i}", send_sentinel=False)
                for i in range(count)
            )
        )
        # Only now is it safe to tell the consumers to stop: nothing else
        # will be produced after this point.
        await queue.put(None)

    async def run_producer_consumer(self):
        """Run two producers and three consumers over one bounded queue.

        Returns once every produced item has been consumed and all
        consumers have seen the sentinel.
        """
        queue = Queue(maxsize=10)
        consumers = [
            self.consumer(queue, f"Consumer-{i}")
            for i in range(3)
        ]
        await asyncio.gather(self._run_producers(queue, 2), *consumers)
# Custom event loop integration
async def custom_event_loop_demo():
    """Run a background task and report its result through a done-callback.

    A task is scheduled on the running event loop, a callback is attached
    that prints the task's result once it finishes, and the coroutine then
    waits for the task to complete.
    """
    # Schedule a callback
    def callback(future):
        print(f"Callback result: {future.result()}")

    # Create task
    async def background_task():
        await asyncio.sleep(1)
        return "Task complete"

    # create_task() already schedules on the running loop, so no explicit
    # loop handle is needed here.
    task = asyncio.create_task(background_task())
    task.add_done_callback(callback)
    await task


asyncio.run(custom_event_loop_demo())
Async Context Managers
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator
# Async context manager class
class AsyncDatabaseConnection:
    """Simulated database handle meant to be used with ``async with``.

    Entering the context fills in ``connection``; leaving it resets
    ``connection`` to ``None``. Exceptions raised inside the ``async with``
    body are never suppressed.
    """

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.connection = None  # populated by __aenter__

    async def __aenter__(self):
        print(f"Connecting to {self.connection_string}")
        await asyncio.sleep(0.1)  # stand-in for the connection handshake
        self.connection = dict(status="connected", string=self.connection_string)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing connection")
        await asyncio.sleep(0.05)
        self.connection = None
        # A false return value lets any in-flight exception propagate.
        return False

    async def execute(self, query: str):
        """Return a canned result string for ``query``.

        Raises:
            RuntimeError: If called while no connection is open.
        """
        if self.connection:
            await asyncio.sleep(0.05)
            return f"Result for: {query}"
        raise RuntimeError("Not connected")
# Function-based async context manager
@asynccontextmanager
async def async_file_handler(filename: str, mode: str = 'r'):
    """Yield a dict that stands in for an open file, closing it on exit.

    The yielded dict has the keys ``name``, ``mode`` and ``closed``.
    ``closed`` is ``False`` inside the ``async with`` body and is set to
    ``True`` when the context exits, whether or not the body raised.
    """
    print(f"Opening file: {filename}")
    await asyncio.sleep(0.01)  # pretend to open the file asynchronously
    handle = dict(name=filename, mode=mode, closed=False)
    try:
        yield handle
    finally:
        print(f"Closing file: {filename}")
        await asyncio.sleep(0.01)
        handle.update(closed=True)
# Usage
async def async_context_demo():
    """Exercise both async context managers defined above, one after the other."""
    # Class-based: the connection object is its own context manager.
    database = AsyncDatabaseConnection("postgresql://localhost/db")
    async with database as db:
        print(await db.execute("SELECT * FROM users"))

    # Function-based: built with @asynccontextmanager.
    async with async_file_handler("data.txt", "w") as f:
        print(f"Writing to {f['name']}")


asyncio.run(async_context_demo())
Async Generators and Iterators
import asyncio
from typing import AsyncIterator, AsyncGenerator
# Async iterator
class AsyncCounter:
    """Async iterator that counts from ``start`` up to, but excluding, ``stop``.

    The object is its own iterator, so it can be consumed only once.
    """

    def __init__(self, start: int = 0, stop: int = 10):
        self.current, self.stop = start, stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current < self.stop:
            await asyncio.sleep(0.1)  # stand-in for real async work per item
            emitted = self.current
            self.current = emitted + 1
            return emitted
        raise StopAsyncIteration
# Async generator
async def async_range(start: int, stop: int) -> AsyncGenerator[int, None]:
    """Asynchronously yield the integers of ``range(start, stop)``.

    A short sleep precedes every value so that each step is a real
    suspension point.
    """
    numbers = range(start, stop)
    for number in numbers:
        await asyncio.sleep(0.01)
        yield number
async def async_filter(predicate, async_iterable) -> AsyncGenerator:
    """Yield the items of ``async_iterable`` for which ``predicate`` is truthy.

    ``predicate`` is called synchronously and its return value is tested
    directly; it is not awaited.
    """
    async for candidate in async_iterable:
        if not predicate(candidate):
            continue
        yield candidate
async def async_map(function, async_iterable) -> AsyncGenerator:
    """Yield ``function(item)`` for every item of ``async_iterable``.

    ``function`` is called synchronously; whatever it returns is yielded
    as-is and is not awaited.
    """
    async for element in async_iterable:
        mapped = function(element)
        yield mapped
# Usage
async def async_iteration_demo():
    """Print values from an async generator, an async iterator and a pipeline.

    The last part chains ``async_range`` through ``async_map`` and
    ``async_filter`` and prints the doubled values that are even.
    """
    # Async generator
    async for num in async_range(0, 5):
        print(num, end=" ")
    print()

    # Async iterator
    counter = AsyncCounter(0, 5)
    async for num in counter:
        print(num, end=" ")
    print()

    # Composing async operations
    # async_map and async_filter call their callable synchronously and never
    # await the result, so these helpers must be plain functions. As
    # coroutine functions they would hand back un-awaited coroutine objects:
    # the pipeline would print those objects and the always-truthy predicate
    # would filter nothing.
    def double(x):
        return x * 2

    def is_even(x):
        return x % 2 == 0

    doubled = async_map(double, async_range(0, 10))
    evens = async_filter(is_even, doubled)
    async for num in evens:
        print(num, end=" ")
    print()  # end the line, like the two loops above


asyncio.run(async_iteration_demo())
⚠️
Async generators cannot be consumed with a regular `for` clause in a list comprehension. Use an async comprehension inside a coroutine instead: [x async for x in async_gen]
Concurrent Async Operations
import asyncio
from asyncio import TaskGroup
from typing import List, Any
import random
# Gathering tasks
async def fetch_with_timeout(url: str, timeout: float) -> dict:
"""Fetch with timeout protection."""
try:
async with asyncio.timeout(timeout):
await asyncio.sleep(random.uniform(0.1, 2.0))
return {"url": url, "status": "success"}
except asyncio.TimeoutError:
return {"url": url, "status": "timeout"}
async def concurrent_fetch_demo():
    """Fetch five URLs at once with ``gather`` and print each outcome.

    ``return_exceptions=True`` makes ``gather`` hand back raised exceptions
    as results, so they are printed with an ``Error:`` prefix instead of
    aborting the whole batch.
    """
    urls = (
        "api.example.com/1",
        "api.example.com/2",
        "api.example.com/3",
        "api.example.com/4",
        "api.example.com/5",
    )

    # Basic gather: every fetch gets the same one-second budget.
    results = await asyncio.gather(
        *(fetch_with_timeout(url, 1.0) for url in urls),
        return_exceptions=True,
    )

    # Process results
    for outcome in results:
        print(f"Error: {outcome}" if isinstance(outcome, Exception) else outcome)
# TaskGroup for structured concurrency
async def task_group_demo():
    """Run three fetches inside a ``TaskGroup`` and print their results.

    The ``async with`` block does not exit until every task created in the
    group has finished, so the results can be read right after it.
    """
    endpoints = ("api/1", "api/2", "api/3")
    async with TaskGroup() as group:
        tasks = [
            group.create_task(fetch_with_timeout(endpoint, 1.0))
            for endpoint in endpoints
        ]

    # All tasks guaranteed to be complete here
    for finished in tasks:
        print(finished.result())
# Semaphore for rate limiting
async def rate_limited_fetch(url: str, semaphore: asyncio.Semaphore):
    """Simulate fetching ``url`` while holding one slot of ``semaphore``.

    The slot is acquired before the simulated half-second request starts
    and is always released afterwards, even if the body is interrupted.
    """
    await semaphore.acquire()
    try:
        print(f"Fetching {url}")
        await asyncio.sleep(0.5)
        return {"url": url, "status": "success"}
    finally:
        semaphore.release()
async def rate_limiting_demo():
    """Issue ten simulated requests with at most three in flight at a time."""
    semaphore = asyncio.Semaphore(3)  # Max 3 concurrent requests
    requests = (
        rate_limited_fetch(f"api.example.com/{i}", semaphore)
        for i in range(10)
    )
    results = await asyncio.gather(*requests)
    print(f"Completed {len(results)} requests")


# Only the gather-based demo is executed by this snippet.
asyncio.run(concurrent_fetch_demo())
Async Patterns and Best Practices
import asyncio
from dataclasses import dataclass
from typing import Callable, Any
from functools import wraps
# Async retry decorator
def async_retry(max_attempts: int = 3, delay: float = 1.0):
    """Build a decorator that retries an async function with exponential backoff.

    The wrapped coroutine function is awaited up to ``max_attempts`` times.
    After a failed attempt that is not the last one, the wrapper sleeps for
    ``delay * 2 ** attempt`` seconds (``attempt`` starting at 0) and tries
    again. If the final attempt fails, its exception propagates unchanged.

    Args:
        max_attempts: Total number of attempts, including the first.
        delay: Base delay in seconds before the first retry.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
    """
    # With zero attempts there would be no exception to re-raise, so reject
    # that up front instead of failing later with an unrelated TypeError.
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        # Out of attempts: re-raise in place so the original
                        # traceback is preserved.
                        raise
                    await asyncio.sleep(delay * (2 ** attempt))
        return wrapper
    return decorator
# Async caching
class AsyncLRUCache:
    """Least-recently-used result cache for coroutine functions.

    Use an instance as a decorator. Results are keyed by the call's
    positional arguments and sorted keyword arguments only, so one instance
    should decorate one function. Arguments must be hashable.

    Concurrent calls with the same arguments that all miss the cache each
    run the wrapped function; the cache ends up holding a single entry for
    that key.

    Attributes:
        maxsize: Maximum number of cached results. Zero or a negative value
            disables caching.
        cache: Mapping of key to result, ordered from least to most
            recently used.
    """

    def __init__(self, maxsize: int = 128):
        self.maxsize = maxsize
        # A plain dict keeps insertion order, so it doubles as the recency
        # list: the first key is always the least recently used one.
        self.cache = {}

    @property
    def order(self):
        """Cached keys from least to most recently used (a fresh list)."""
        return list(self.cache)

    def __call__(self, func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            if key in self.cache:
                # Re-insert to move the key to the most-recent end.
                self.cache[key] = self.cache.pop(key)
                return self.cache[key]

            result = await func(*args, **kwargs)
            if self.maxsize <= 0:
                return result

            # Another call with the same key may have stored a result while
            # this one was awaiting; drop it so the key is never counted twice.
            self.cache.pop(key, None)
            while len(self.cache) >= self.maxsize:
                del self.cache[next(iter(self.cache))]
            self.cache[key] = result
            return result

        def cache_clear():
            """Forget every cached result."""
            self.cache.clear()

        wrapper.cache_clear = cache_clear
        return wrapper
# Usage
@async_retry(max_attempts=3, delay=0.5)
async def unreliable_operation():
    """Fail with ``ConnectionError`` about 70% of the time, else return ``"Success"``."""
    import random
    succeeded = random.random() >= 0.7
    if not succeeded:
        raise ConnectionError("Service unavailable")
    return "Success"
@AsyncLRUCache(maxsize=100)
async def expensive_computation(n: int) -> int:
    """Return ``n`` squared after a short simulated delay."""
    simulated_cost = 0.1  # seconds of pretend expensive work
    await asyncio.sleep(simulated_cost)
    squared = n ** 2
    return squared
async def best_practices_demo():
    """Show ``asyncio.gather`` and ``asyncio.wait`` on the cached computation.

    The first part gathers ten computations and prints the list of results.
    The second part waits up to half a second on five tasks, prints the
    results of those that finished, and cancels any that did not.
    """
    # Use asyncio.gather for concurrent operations
    tasks = [expensive_computation(i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(results)

    # Use asyncio.wait for more control
    tasks = [asyncio.create_task(expensive_computation(i)) for i in range(5)]
    done, pending = await asyncio.wait(tasks, timeout=0.5)
    for task in done:
        print(f"Completed: {task.result()}")

    # asyncio.wait() does not cancel anything when the timeout expires, so
    # stop the stragglers explicitly instead of leaving them running.
    for task in pending:
        task.cancel()
    if pending:
        # Let the cancellations finish; return_exceptions keeps the
        # resulting CancelledError from propagating out of the demo.
        await asyncio.gather(*pending, return_exceptions=True)


asyncio.run(best_practices_demo())
Follow-Up Questions
- Explain the difference between asyncio and threading for I/O-bound tasks.
- How does the event loop handle blocking calls in async code?
- What is structured concurrency and why is it important?
- How do you test async code effectively?
- Explain the concept of backpressure in async systems.