Introduction
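Structured concurrency provides a way to manage groups of related async tasks, ensuring they complete or fail together. The TaskGroup pattern ensures proper cleanup: if any task in the group fails, the remaining tasks are cancelled, and the group waits for all of them to finish before the error propagates.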
TaskGroup (Python 3.11+)
import asyncio
async def fetch_data(id):
    """Simulate a fetch that takes ``id * 0.5`` seconds, then return a label.

    The parameter name ``id`` is kept for caller compatibility even though
    it shadows the builtin of the same name.
    """
    delay = id * 0.5
    await asyncio.sleep(delay)
    return f"Data {id}"
async def main():
    """Fetch three items concurrently and print each result in order."""
    async with asyncio.TaskGroup() as group:
        pending = [group.create_task(fetch_data(n)) for n in (1, 2, 3)]
    # Leaving the ``async with`` block means every task has finished, so
    # reading the results here cannot fail with "result is not set".
    for finished in pending:
        print(finished.result())
# Start an event loop and run main() until it completes.
asyncio.run(main())
Handling Exceptions
import asyncio
async def risky_task(n):
    """Sleep for ``n`` seconds and report success, except when ``n`` is 2.

    Raises:
        ValueError: immediately (before sleeping) when ``n == 2``.
    """
    if n != 2:
        await asyncio.sleep(n)
        return f"Task {n} done"
    raise ValueError(f"Error in task {n}")
async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(risky_task(1))
tg.create_task(risky_task(2)) # This will fail
tg.create_task(risky_task(3))
except* ValueError as eg:
for exc in eg.exceptions:
print(f"Caught: {exc}")
Nursery Pattern
async def nursery(tasks):
async with asyncio.TaskGroup() as tg:
for task in tasks:
tg.create_task(task)
async def worker(n):
    """Print a start message, sleep ``n`` seconds, then print a done message."""
    # The pause stands in for real work taking ``n`` seconds.
    print(f"Worker {n} started")
    await asyncio.sleep(n)
    print(f"Worker {n} done")
async def main():
    """Start workers 1, 2 and 3 in a nursery and wait for all of them."""
    workers = [worker(n) for n in (1, 2, 3)]
    await nursery(workers)
Spawn and Wait
async def spawn_and_wait(tasks):
results = []
async with asyncio.TaskGroup() as tg:
async def run_with_result(task, idx):
result = await task
results.append((idx, result))
for idx, task in enumerate(tasks):
tg.create_task(run_with_result(task, idx))
return results
async def main():
    """Run three timed sleeps through ``spawn_and_wait`` and print the pairs."""
    sleepers = []
    for seconds in range(3):
        # asyncio.sleep(delay, result) hands back ``result`` once it wakes up.
        sleepers.append(asyncio.sleep(seconds, f"Result {seconds}"))
    print(await spawn_and_wait(sleepers))
Timeout with TaskGroup
import asyncio
async def slow_task():
    """Sleep for ten seconds, then return the string ``"Done"``."""
    # asyncio.sleep returns its second argument when the delay elapses.
    return await asyncio.sleep(10, "Done")
async def main(timeout_seconds=2.0):
    """Run ``slow_task`` in a TaskGroup, giving up after ``timeout_seconds``.

    The original snippet caught a timeout error but never set a deadline,
    so the handler could not run and the call simply blocked until the
    task finished. The group is now wrapped in ``asyncio.timeout()``.

    Args:
        timeout_seconds: Deadline in seconds for the whole group.
            Defaults to 2.0, so calling ``main()`` with no arguments
            still works.
    """
    try:
        # When the deadline passes, asyncio.timeout() cancels this block;
        # the TaskGroup cancels its child tasks and waits for them, and the
        # cancellation is then re-raised here as TimeoutError.
        async with asyncio.timeout(timeout_seconds):
            async with asyncio.TaskGroup() as tg:
                tg.create_task(slow_task())
    except TimeoutError:
        # asyncio.TimeoutError is an alias of the builtin on Python 3.11+.
        print("Tasks timed out")
Practice Problems
- Use TaskGroup for parallel tasks
- Handle multiple exceptions with except*
- Implement nursery pattern
- Add timeout to TaskGroup
- Create task tracking system