Introduction
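Generator-based coroutines are generators that use yield as an expression, so they can receive values from the caller through send(). This makes them a good fit for producer-consumer patterns and data pipelines where data flows in both directions between the caller and the coroutine.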
Basic Coroutine
def coroutine_example():
    while True:
        value = yield
        print(f"Received: {value}")

coro = coroutine_example()
next(coro)  # Prime the coroutine
coro.send("Hello")
coro.send("World")
coro.close()
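Forgetting the priming call is a common mistake, since send() with a non-None value on a coroutine that has not started raises TypeError. One way to avoid it is a small decorator that primes the coroutine on creation. The sketch below is illustrative only: the names primed and collector, and the received list, are assumptions introduced for this example.

```python
import functools

def primed(func):
    """Hypothetical helper: create the coroutine and advance it to its first yield."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        coro = func(*args, **kwargs)
        next(coro)  # Run up to the first yield so send() works immediately
        return coro
    return wrapper

@primed
def collector(received):
    # `received` is a caller-supplied list, used here only to make the effect visible
    while True:
        value = yield
        received.append(value)

received = []
coro = collector(received)
coro.send("Hello")  # No explicit next() needed
coro.send("World")
coro.close()
print(received)
```

The trade-off is that a decorated coroutine is already running when the caller receives it, so it should not be primed a second time.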
Producer-Consumer
def producer(target):
    for i in range(10):
        target.send(i)
    target.close()

def consumer():
    while True:
        try:
            value = yield
            print(f"Consumed: {value}")
        except GeneratorExit:
            # close() raises GeneratorExit at the paused yield, not StopIteration
            break

c = consumer()
next(c)  # Prime the consumer
producer(c)
Pipeline with Coroutines
def source(max_val):
    for i in range(max_val):
        yield i

def filter_transform(target):
    while True:
        value = yield
        if value % 2 == 0:  # Keep even values only
            target.send(value * 2)

def sink():
    while True:
        value = yield
        print(f"Sink received: {value}")

# Connect pipeline
sk = sink()
next(sk)
ft = filter_transform(sk)
next(ft)
for i in source(5):
    ft.send(i)
Coroutine with Return Value
def aggregate():
    total = 0
    count = 0
    while True:
        value = yield
        if value is None:  # None is the sentinel that ends the input
            break
        total += value
        count += 1
    # Hand the result back as the value of the final send()
    yield total / count if count > 0 else 0

agg = aggregate()
next(agg)
for i in [10, 20, 30]:
    agg.send(i)
avg = agg.send(None)
print(f"Average: {avg}")
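The example above delivers its result through a final yield. A generator can also use a return statement, in which case the value arrives as the value attribute of the StopIteration raised by the last send(). The following is a minimal sketch of that variant; aggregate_with_return and finish are hypothetical names introduced for this example.

```python
def aggregate_with_return():
    total = 0
    count = 0
    while True:
        value = yield
        if value is None:  # None is the sentinel that ends the input
            break
        total += value
        count += 1
    # The returned value is attached to the StopIteration raised in the caller
    return total / count if count > 0 else 0

def finish(coro):
    """Hypothetical helper: send the None sentinel and return the coroutine's result."""
    try:
        coro.send(None)
    except StopIteration as stop:
        return stop.value
    raise RuntimeError("coroutine did not finish")

agg = aggregate_with_return()
next(agg)
for i in [10, 20, 30]:
    agg.send(i)
avg = finish(agg)
print(f"Average: {avg}")
```

With return, the coroutine is finished once the result has been delivered, whereas the yield-based version stays suspended at its last yield until it is closed or garbage collected.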
Exception Handling
def error_handling_coroutine():
    try:
        while True:
            try:
                value = yield
                print(f"Processed: {value}")
            except ValueError as e:
                # Raised inside the coroutine by throw(); the loop then continues
                print(f"Caught error: {e}")
    except GeneratorExit:
        # Raised at the paused yield by close()
        print("Coroutine closed")

coro = error_handling_coroutine()
next(coro)
coro.send(10)
coro.throw(ValueError("Test error"))
coro.close()
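Once close() has returned, the coroutine is finished, and any further send() raises StopIteration. A try/finally block is an alternative to catching GeneratorExit when the coroutine only needs to run cleanup code. The sketch below illustrates both points; the name echo and the log list are assumptions made for this example.

```python
def echo(log):
    try:
        while True:
            value = yield
            log.append(f"processed {value}")
    finally:
        # Runs when close() raises GeneratorExit at the paused yield
        log.append("cleanup")

log = []
coro = echo(log)
next(coro)
coro.send(1)
coro.close()

try:
    coro.send(2)  # The coroutine has already finished
except StopIteration:
    log.append("send after close raised StopIteration")

print(log)
```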
Practice Problems
- Implement a producer-consumer pair with coroutines
- Create a filter-transform pipeline
- Handle exceptions in coroutines
- Build a coroutine with return values
- Implement a broadcast pattern