Python Generators — Lazy Evaluation & Memory Efficiency
Generators produce values one at a time instead of building entire lists in memory. They are essential for processing large datasets, streams, and pipelines.
Learning Objectives
- Write generator functions with `yield`
- Create generator expressions for concise lazy evaluation
- Build generator pipelines for data processing
- Understand the `send()`, `throw()`, and `close()` methods
- Master `yield from` for delegation
- Apply generators for real-world data processing
- Build generator-based context managers
Generator Basics — yield vs return
# yield pauses the function and hands a value back to the caller
# return exits the function completely
def simple_generator():
    """Yield 1, 2 and 3, printing a trace message before each value.

    The last print runs only when a fourth value is requested, right
    before the generator finishes.
    """
    print("First yield")
    yield 1
    print("Between yields")
    yield 2
    print("Last yield")
    yield 3
    print("Generator complete")


gen = simple_generator()
print(next(gen))  # Prints "First yield", returns 1
print(next(gen))  # Prints "Between yields", returns 2
print(next(gen))  # Prints "Last yield", returns 3
try:
    next(gen)  # Prints "Generator complete", raises StopIteration
except StopIteration:
    # A bare next() on an exhausted generator raises; a for loop would
    # swallow this for you. Catch it so the script keeps running.
    pass
# State is preserved between calls
def counter(start=0):
    """Yield start, start + 1, start + 2, ... without end."""
    n = start
    while True:
        yield n
        n += 1  # the local survives the pause at yield


c = counter(10)
print(next(c))  # 10
print(next(c))  # 11
print(next(c))  # 12
How yield Works Internally
# yield is like a breakpoint that returns a value
# and pauses execution until next() is called again
def demo_yield():
    """Yield "A" then "B", printing a step marker before each resumption ends."""
    print("Step 1")
    yield "A"
    print("Step 2")
    yield "B"
    print("Step 3")


gen = demo_yield()
print("Before first next")
result1 = next(gen)  # Prints "Step 1", returns "A"
print(f"Got: {result1}")
result2 = next(gen)  # Prints "Step 2", returns "B"
print(f"Got: {result2}")
try:
    result3 = next(gen)  # Prints "Step 3", raises StopIteration
except StopIteration:
    # There is no third value: the body ran to its end instead of yielding.
    print("Generator finished")
else:
    print(f"Got: {result3}")
# Generator state machine
def state_machine():
    """Walk IDLE -> RUNNING -> DONE, yielding a status after each transition.

    Yields "started", then "finished". The next resumption finds the DONE
    state and returns, which ends the iteration.
    """
    state = "IDLE"
    while True:
        if state == "IDLE":
            print("Processing...")
            state = "RUNNING"
            yield "started"
        elif state == "RUNNING":
            print("Completing...")
            state = "DONE"
            yield "finished"
        else:
            return  # StopIteration
yield vs return Comparison
| Feature | return | yield |
|---|---|---|
| Exits function | Yes | No (pauses) |
| Can resume | No | Yes |
| Returns value | Yes | Yes (to caller) |
| Preserves state | No | Yes |
| Creates generator | No | Yes |
| Can be called multiple times | N/A | Each next() resumes |
Generator Functions
# Fibonacci generator
def fibonacci():
    """Yield the Fibonacci numbers 0, 1, 1, 2, 3, ... without end."""
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b


# fibonacci() never finishes on its own, so the loop needs an explicit exit
for i, num in enumerate(fibonacci()):
    if i >= 10:
        break
    print(num, end=" ")  # 0 1 1 2 3 5 8 13 21 34
# Range generator
def my_range(start, stop=None, step=1):
    """Yield values from start towards stop (exclusive) in increments of step.

    Called with one argument, that argument is the exclusive upper bound
    and counting starts at 0. A step of 0 yields nothing, because neither
    half of the loop condition can hold.
    """
    if stop is None:
        stop = start
        start = 0
    while (step > 0 and start < stop) or (step < 0 and start > stop):
        yield start
        start += step


print(list(my_range(5)))  # [0, 1, 2, 3, 4]
print(list(my_range(1, 10, 2)))  # [1, 3, 5, 7, 9]
print(list(my_range(10, 0, -3)))  # [10, 7, 4, 1]
# Windowed average generator
def windowed_average(data, window_size):
    """Yield the mean of the most recent window_size values of data.

    One average is produced per input value; until window_size values
    have been seen, the average covers everything seen so far.

    Raises:
        ValueError: if window_size is smaller than 1 (raised when the
            generator is first advanced).
    """
    from collections import deque

    if window_size < 1:
        raise ValueError("window_size must be at least 1")
    # deque(maxlen=...) drops the oldest value in O(1); list.pop(0) is O(n).
    window = deque(maxlen=window_size)
    for value in data:
        window.append(value)
        yield sum(window) / len(window)


data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
for avg in windowed_average(data, 3):
    print(f"{avg:.2f}", end=" ")
# 1.00 1.50 2.00 3.00 4.00 5.00 6.00 7.00 8.00 9.00
Generator Expressions
# Like list comprehension but with parentheses
squares_list = [x**2 for x in range(1000000)] # ~8 MB for the list's references alone on a 64-bit build
squares_gen = (x**2 for x in range(1000000)) # A small fixed-size object (~200 bytes); nothing is computed yet
# Consume lazily
total = sum(x**2 for x in range(1000)) # No list created
first_big = next(x for x in range(1000000) if x > 999990) # 999991 — stops at the first match
# Generator expressions in function calls (the extra parentheses may be
# dropped when the generator expression is the only argument)
list_of_squares = list(x**2 for x in range(10))
max_val = max(x**2 for x in range(100))
any_negative = any(x < 0 for x in [1, 2, -3, 4])
all_positive = all(x > 0 for x in [1, 2, 3, 4])
# Nested generator expressions (the for clauses read left to right, outer loop first)
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
flat = (x for row in matrix for x in row)
print(list(flat)) # [1, 2, 3, 4, 5, 6, 7, 8, 9]
# Conditional generator
def process_only_positive(numbers):
    """Return a lazy generator of n * 2 for every n in numbers with n > 0."""
    return (n * 2 for n in numbers if n > 0)


result = list(process_only_positive([-5, 3, -1, 8, 0, 4]))
# [6, 16, 8]
# Generator expression vs list comprehension performance
import sys
import time
# Memory
# NOTE: sys.getsizeof is shallow — for the list it counts the list object
# itself, not the integer objects it refers to.
list_size = sys.getsizeof([x**2 for x in range(10000)])
gen_size = sys.getsizeof((x**2 for x in range(10000)))
print(f"List: {list_size:,} bytes, Generator: {gen_size} bytes")
print(f"List is {list_size/gen_size:.0f}x larger")
# Speed for single-pass operations
# Single-run wall-clock timings are noisy; use timeit for a reliable comparison.
start = time.perf_counter()
total = sum([x**2 for x in range(1_000_000)])
list_time = time.perf_counter() - start
start = time.perf_counter()
total = sum(x**2 for x in range(1_000_000))
gen_time = time.perf_counter() - start
print(f"List sum: {list_time:.4f}s, Generator sum: {gen_time:.4f}s")
Generator Expression Syntax Reference
| Pattern | Syntax | Example |
|---|---|---|
| Basic | (expr for x in iter) | (x**2 for x in range(10)) |
| Filtered | (expr for x in iter if cond) | (x for x in range(10) if x % 2 == 0) |
| Nested | (expr for x in outer for y in inner) | (x for row in matrix for x in row) |
| Conditional | (a if cond else b for x in iter) | ("even" if x%2==0 else "odd" for x in range(5)) |
Generator Pipelines
def read_large_file(path):
    """Stage 1: Read lines from file.

    Yields each line with leading/trailing whitespace (including the
    newline) stripped. The file is opened when the generator is first
    advanced and closed when it finishes or is closed.
    """
    with open(path, 'r') as f:
        for line in f:
            yield line.strip()
def filter_comments(lines):
    """Stage 2: Remove comment lines.

    Yields every line that does not start with '#'. Lines are expected
    to be stripped already; an indented '#' is not treated as a comment.
    """
    for line in lines:
        if not line.startswith('#'):
            yield line
def parse_csv(lines):
    """Stage 3: Parse CSV lines into dicts.

    The first line is the comma-separated header; every following line
    becomes a dict mapping header fields to that line's values. Accepts
    any iterable of strings. Empty input yields nothing.
    """
    rows = iter(lines)  # lets callers pass a list, not only an iterator
    header = next(rows, None)
    if header is None:
        # A bare next() on empty input would raise StopIteration, which
        # Python converts to RuntimeError inside a generator (PEP 479).
        return
    fields = header.split(',')
    for line in rows:
        values = line.split(',')
        yield dict(zip(fields, values))
def filter_by_field(records, field, value):
    """Stage 4: Filter records by field value.

    Yields each record whose record.get(field) equals value; records
    without the field are skipped unless value is None.
    """
    for record in records:
        if record.get(field) == value:
            yield record
def transform(records, func):
    """Stage 5: Apply transformation.

    Lazily yields func(record) for each record, in input order.
    """
    for record in records:
        yield func(record)
# Pipeline — processes data lazily end-to-end
# lines = read_large_file('data.csv')
# non_comments = filter_comments(lines)
# records = parse_csv(non_comments)
# active = filter_by_field(records, 'status', 'active')
# formatted = transform(active, lambda r: f"{r['name']}: {r['email']}")
#
# for item in formatted:
# print(item) # Process one record at a time, minimal memory
Real Pipeline: Log Processing
def tail_file(path):
    """Simulate tail -f for log files.

    Seeks to the end of the file, then yields each newly appended line
    (stripped). Never returns: when no new line is available it sleeps
    0.1 s and polls again.
    """
    import time
    with open(path, 'r') as f:
        f.seek(0, 2)  # Go to end
        while True:
            line = f.readline()
            if line:
                yield line.strip()
            else:
                time.sleep(0.1)
def parse_log_line(line):
    """Extract timestamp, level and message from a log line.

    The line is split on single spaces into at most three parts. Returns
    a dict with 'timestamp', 'level' and 'message' (empty string when
    absent), or None when the line has fewer than two parts.
    """
    parts = line.split(' ', 2)
    if len(parts) < 2:
        # Explicit guard instead of catching the IndexError parts[1] would raise.
        return None
    return {
        'timestamp': parts[0],
        'level': parts[1],
        'message': parts[2] if len(parts) > 2 else '',
    }
def filter_errors(records):
    """Keep only error-level records.

    Falsy records (such as the None produced for an unparsable line)
    are skipped.
    """
    for record in records:
        if record and record['level'] == 'ERROR':
            yield record
def format_alert(record):
    """Format an error record (needs 'timestamp' and 'message' keys) as an alert line."""
    return f"ALERT: [{record['timestamp']}] {record['message']}"
# Full pipeline
# log_lines = tail_file('/var/log/app.log')
# parsed = (parse_log_line(line) for line in log_lines)
# errors = filter_errors(parsed)
# for error in errors:
# send_alert(format_alert(error))
# Batched pipeline
def batched(iterable, n):
    """Batch iterable into chunks of size n.

    Yields lists of up to n items; only the last list may be shorter.
    """
    from itertools import islice
    it = iter(iterable)
    while True:
        batch = list(islice(it, n))
        if not batch:  # source exhausted
            break
        yield batch
# for batch in batched(errors, 10):
# send_batch_alert(batch)
yield From for Delegation
def flatten(nested_list):
    """Recursively flatten nested lists.

    Only list instances are descended into; tuples, strings and other
    iterables are yielded unchanged.
    """
    for item in nested_list:
        if isinstance(item, list):
            yield from flatten(item)
        else:
            yield item


data = [1, [2, 3], [4, [5, 6]], 7]
print(list(flatten(data)))  # [1, 2, 3, 4, 5, 6, 7]
def chain(*iterables):
    """Chain multiple iterables together, yielding their items in argument order."""
    for iterable in iterables:
        yield from iterable


result = list(chain([1, 2], [3, 4], [5, 6]))
# [1, 2, 3, 4, 5, 6]
# yield from with return values (subgenerator results)
def accumulate():
    """Running-total coroutine.

    Yields the current total and adds each value sent in. Sending None
    ends the loop and returns the final total.
    """
    total = 0
    while True:
        value = yield total
        if value is None:
            break
        total += value
    return total  # Final value


def sub_accumulator():
    """Delegate to accumulate() and pass its return value on to the caller."""
    # `yield from` evaluates to the subgenerator's return value. Without
    # this explicit return it would be dropped and callers would see None.
    return (yield from accumulate())


# Delegating generators
def caller():
    """Drive sub_accumulator() with 10 and 20 and return the final total (30)."""
    acc = sub_accumulator()
    next(acc)  # prime: run to the first yield
    acc.send(10)  # 10
    acc.send(20)  # 30
    try:
        acc.send(None)  # Triggers StopIteration with return value
    except StopIteration as e:
        return e.value  # 30
# yield from with chained generators
def generator_a():
    """Yield 1 and 2, then return the string "A done"."""
    yield 1
    yield 2
    return "A done"


def generator_b():
    """Yield 3, everything from generator_a(), its return value, then 4."""
    yield 3
    result = yield from generator_a()  # receives generator_a's return value
    yield result  # "A done"
    yield 4


print(list(generator_b()))  # [3, 1, 2, 'A done', 4]
yield from Reference
| Use Case | Pattern | Example |
|---|---|---|
| Flatten nested | yield from flatten(item) | [1,[2,3]] -> [1,2,3] |
| Chain iterables | yield from iterable | [1,2] + [3,4] |
| Delegate to sub-gen | result = yield from sub_gen() | Capture return value |
| Forward all values | yield from gen() | Passthrough generator |
Generator Send, Throw, and Close
# send() — send values INTO the generator
def accumulator():
    """Yield a running total of the values sent in; sending None ends it."""
    total = 0
    while True:
        value = yield total  # send(x) makes this expression evaluate to x
        if value is None:
            break
        total += value


acc = accumulator()
next(acc)  # Prime the generator
print(acc.send(10))  # 10
print(acc.send(20))  # 30
print(acc.send(5))  # 35
# throw() — inject exception into generator
def safe_generator():
    """Yield "running" until an exception is thrown in.

    A ValueError or TypeError is caught and reported with one final
    yield; the generator finishes on the resumption after that.
    """
    try:
        while True:
            yield "running"
    except ValueError:
        yield "caught ValueError"
    except TypeError:
        yield "caught TypeError"


gen = safe_generator()
print(next(gen))  # "running"
print(gen.throw(ValueError))  # "caught ValueError"
# close() — terminate generator (raises GeneratorExit)
def controlled_gen():
    """Yield "alive" forever; print a cleanup message when closed."""
    try:
        while True:
            yield "alive"
    except GeneratorExit:
        print("Cleaning up...")
        return  # finish without yielding again, as close() requires


gen = controlled_gen()
print(next(gen))  # "alive"
gen.close()  # Prints "Cleaning up..."
# Real example: coroutine-like task
def data_fetcher():
    """Coroutine that answers every URL sent in with a "Fetched: ..." string.

    Prime it with next() first. Sending None ends it; the collected
    results are then available as StopIteration.value.
    """
    results = []
    response = None
    while True:
        # One yield per round trip: it hands back the previous response
        # and receives the next URL. With two separate yields every
        # second send() would be swallowed and return None.
        url = yield response
        if url is None:
            break
        results.append(f"data from {url}")
        response = f"Fetched: {url}"
    return results


fetcher = data_fetcher()
next(fetcher)
print(fetcher.send("https://api.example.com"))  # "Fetched: https://api.example.com"
# Coroutine pipeline with send()
def pipeline_stage(name):
    """Coroutine that replies to each sent item with "<name>: processed(<item>)".

    The priming next() yields None; sending None ends the stage.
    """
    result = None
    while True:
        data = yield result
        if data is None:
            break
        result = f"{name}: processed({data})"


# Chain stages
stage1 = pipeline_stage("Stage1")
stage2 = pipeline_stage("Stage2")
next(stage1)
next(stage2)
result1 = stage1.send("raw_data")  # "Stage1: processed(raw_data)"
result2 = stage2.send(result1)  # "Stage2: processed(Stage1: processed(raw_data))"
print(result2)
Generator Methods Reference
| Method | Description | Example |
|---|---|---|
| `send(value)` | Send value to generator's yield expression | `gen.send(10)` |
| `throw(type)` | Inject exception into generator | `gen.throw(ValueError)` |
| `throw(type, val)` | Inject exception with value (this multi-argument form is deprecated since Python 3.12; prefer `gen.throw(ValueError("msg"))`) | `gen.throw(ValueError, "msg")` |
| `close()` | Raise GeneratorExit, terminate generator | `gen.close()` |
Generator-Based Context Managers
import contextlib
# Using @contextmanager decorator
@contextlib.contextmanager
def managed_resource(name):
    """Context manager that announces acquiring and releasing `name`.

    The release message is printed even when the with-body raises,
    because the yield sits inside try/finally.
    """
    print(f"Acquiring {name}")
    try:
        yield name  # Value is bound to 'as' variable
    finally:
        print(f"Releasing {name}")


with managed_resource("database") as res:
    print(f"Using {res}")
# Acquiring database
# Using database
# Releasing database
# Generator-based file context manager
@contextlib.contextmanager
def open_file(path, mode='r'):
    """Open path with the given mode and close it when the with-block ends."""
    f = open(path, mode)
    try:
        yield f
    finally:
        f.close()  # runs on normal exit and on exceptions alike
# Writes test.txt in the current working directory
with open_file('test.txt', 'w') as f:
    f.write("Hello, World!")
# Error handling in generator context manager
@contextlib.contextmanager
def error_handler():
    """Context manager that prints and suppresses ValueError and TypeError.

    The exception raised in the with-body is re-raised at the yield;
    catching it there (without re-raising) suppresses it. Any other
    exception type propagates unchanged.
    """
    try:
        yield
    except ValueError as e:
        print(f"Caught ValueError: {e}")
    except TypeError as e:
        print(f"Caught TypeError: {e}")


with error_handler():
    raise ValueError("Something went wrong")
# Caught ValueError: Something went wrong
# Nested context managers
@contextlib.contextmanager
def database_connection(host, port):
    """Yield a dict standing in for a connection; mark it disconnected on exit."""
    print(f"Connecting to {host}:{port}")
    conn = {"host": host, "port": port, "connected": True}
    try:
        yield conn
    finally:
        conn["connected"] = False
        print("Disconnected")


@contextlib.contextmanager
def transaction(conn):
    """Yield conn; print a commit on success, or a rollback before re-raising."""
    print("Starting transaction")
    try:
        yield conn
        print("Committing transaction")
    except BaseException:
        # Deliberately broad (the same reach as a bare except): roll back
        # on any failure, including KeyboardInterrupt, then re-raise.
        print("Rolling back transaction")
        raise


with database_connection("localhost", 5432) as conn:
    with transaction(conn) as tx:
        print(f"Executing query on {tx['host']}")
Memory Efficiency Comparison
import sys
import time
# Memory comparison
# sys.getsizeof is shallow: the list figure excludes the integer objects it holds.
list_data = [x ** 2 for x in range(1_000_000)]
gen_data = (x ** 2 for x in range(1_000_000))
print(f"List size: {sys.getsizeof(list_data):,} bytes") # roughly 8 MB; exact figure varies by Python version
print(f"Generator size: {sys.getsizeof(gen_data)} bytes") # a couple hundred bytes, independent of the range length
# Speed comparison for sum
# The timings below are illustrative only — measure on your own machine;
# a single run is noisy and either form can come out ahead.
start = time.perf_counter()
list_sum = sum([x ** 2 for x in range(1_000_000)])
list_time = time.perf_counter() - start
start = time.perf_counter()
gen_sum = sum(x ** 2 for x in range(1_000_000))
gen_time = time.perf_counter() - start
print(f"List sum: {list_time:.4f}s") # e.g. ~0.08s
print(f"Generator sum: {gen_time:.4f}s") # e.g. ~0.06s
# When generators win:
# 1. Large datasets that don't fit in memory
# 2. Chained transformations (pipelines)
# 3. Infinite sequences
# 4. Single-pass processing
# Memory profile for large data
def process_with_list(data):
    """Load all into memory, then process.

    Builds the full list of doubled values before summing it — kept this
    way on purpose as the memory-hungry half of the comparison.
    """
    processed = [x * 2 for x in data]
    return sum(processed)
def process_with_generator(data):
    """Process lazily, one item at a time.

    Sums the doubled values without building an intermediate list.
    """
    return sum(x * 2 for x in data)
# Both produce same result, but generator uses constant memory
| Approach | Memory | Speed | Reusable | Use When |
|---|---|---|---|---|
| List comprehension | High | Fast | Yes | Need to iterate multiple times |
| Generator expression | Minimal | Lazy | No | Single-pass, large data |
| Generator function | Minimal | Lazy | No | Complex generation logic |
| `itertools` | Minimal | Lazy | No | Standard iteration patterns |
Real-World: Lazy File Reader and Data Pipeline
def lazy_csv_reader(filepath, delimiter=','):
    """Memory-efficient CSV reader for large files.

    Reads the header from the first line, then yields one dict per data
    line. Lines whose field count differs from the header are skipped
    with a printed warning. Fields are split naively on the delimiter;
    quoted fields are not understood.
    """
    with open(filepath, 'r') as f:
        header = f.readline().strip().split(delimiter)
        # Data starts on line 2 of the file, hence the enumerate offset.
        for line_num, line in enumerate(f, 2):
            fields = line.strip().split(delimiter)
            if len(fields) == len(header):
                yield dict(zip(header, fields))
            else:
                print(f"Warning: line {line_num} has {len(fields)} fields, expected {len(header)}")
def batch_records(records, batch_size=1000):
    """Group records into batches.

    Yields lists of batch_size records; a final shorter list carries
    whatever remains.
    """
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) >= batch_size:
            yield batch
            batch = []  # fresh list: the yielded one now belongs to the consumer
    if batch:
        yield batch
def transform_record(record):
    """Apply transformations to a record.

    Returns a new dict: 'id' as int, 'name' stripped and title-cased,
    'email' lower-cased, and 'active' True when the optional 'status'
    field equals "active" (case-insensitive).

    Raises:
        KeyError: if 'id', 'name' or 'email' is missing.
        ValueError: if 'id' is not an integer string.
    """
    return {
        'id': int(record['id']),
        'name': record['name'].strip().title(),
        'email': record['email'].lower(),
        'active': record.get('status', '').lower() == 'active',
    }
# Usage for processing millions of records
# records = lazy_csv_reader('users.csv')
# transformed = (transform_record(r) for r in records if r.get('email'))
# for batch in batch_records(transformed, batch_size=500):
# bulk_insert_to_database(batch)
# Real-world: Infinite data stream processor
def infinite_counter(start=0, step=1):
    """Infinite counter generator: yields start, start + step, start + 2 * step, ..."""
    n = start
    while True:
        yield n
        n += step
def take(n, iterable):
    """Take first n items from iterable.

    Returns a lazy islice iterator; fewer than n items are produced when
    the iterable is shorter.
    """
    from itertools import islice
    return islice(iterable, n)
def window(iterable, size):
    """Sliding window over iterable.

    Yields tuples of `size` consecutive items, advancing one item at a
    time. Yields nothing when the iterable has fewer than `size` items.
    """
    from collections import deque
    from itertools import islice
    it = iter(iterable)
    # Fill the first window with islice rather than repeated next():
    # a StopIteration escaping next() inside a generator becomes a
    # RuntimeError (PEP 479) when the input is too short.
    window_deque = deque(islice(it, size), maxlen=size)
    if len(window_deque) < size:
        return
    yield tuple(window_deque)
    for item in it:
        window_deque.append(item)  # maxlen drops the oldest item
        yield tuple(window_deque)
# Usage
# NOTE: this rebinds the module-level name `counter`, which earlier in the
# file refers to the counter() generator function.
counter = infinite_counter(1, 2)
first_10 = list(take(10, counter)) # take() bounds the otherwise endless stream
print(first_10) # [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
# Real-world: Data ETL pipeline
def extract(source):
    """Extract data from source: lazily yield each item of `source` unchanged."""
    for item in source:
        yield item
def transform(items):
    """Transform each item.

    For every item (a mapping with 'id' and 'value') yields a new dict:
    'key' is the upper-cased id, 'value' is doubled, 'processed' is True.
    """
    for item in items:
        yield {
            'key': item['id'].upper(),
            'value': item['value'] * 2,
            'processed': True,
        }
def load(items, batch_size=100):
    """Load items in batches.

    Yields lists of up to batch_size items; only the last list may be
    shorter.
    """
    from itertools import islice
    it = iter(items)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:  # source exhausted
            break
        yield batch
# Full ETL pipeline
# source = read_from_database()
# extracted = extract(source)
# transformed = transform(extracted)
# for batch in load(transformed):
# write_to_warehouse(batch)
Common Mistakes
# Mistake 1: Forgetting generators are single-pass
def my_gen():
    """Yield 1, then 2."""
    yield 1
    yield 2


g = my_gen()
list(g)  # [1, 2]
list(g)  # [] — empty!


# Fix: recreate or cache
def cached_gen():
    """Run my_gen() to completion and return an iterator over the cached values.

    The returned iterator is itself single-pass; keep the list if several
    passes are needed.
    """
    data = list(my_gen())
    return iter(data)
# Mistake 2: Trying to index a generator
# gen = (x for x in range(10))
# gen[5] # TypeError!
# Fix: convert to list or use islice
from itertools import islice
gen = (x for x in range(10))
# islice has to consume items 0-4 to reach index 5; they cannot be revisited.
print(list(islice(gen, 5, 6))) # [5]
# Mistake 3: Generator expression scope confusion
# Only the outermost iterable (range(3) here) is evaluated when the generator
# expression is created; the element expression runs lazily and looks x up
# each time a value is produced.
x = 10
gen = (x for _ in range(3))
x = 20
print(list(gen)) # [20, 20, 20] — x is read when the generator runs, not when it was created
# Mistake 4: Not priming generators before send()
def my_gen():
    """Receive one value via send() and yield it back as "Got: <value>"."""
    value = yield
    yield f"Got: {value}"


g = my_gen()
# g.send("hello") # TypeError: can't send non-None value to a just-started generator
next(g)  # Prime the generator
g.send("hello")  # Works: "Got: hello"
# Mistake 5: Memory leak with large generators
def big_gen():
    """Yield i * i for i in range(10_000_000), one value at a time."""
    for i in range(10_000_000):
        yield i * i


# list() pulls every value out of the generator and keeps them all alive at once
# large = list(big_gen()) # 80+ MB!
# Fix: process and discard
# (process() stands for your own per-item handler; it is not defined in this
# file, so the loop is left commented out like the other placeholder usages)
# for val in big_gen():
#     process(val) # the previous val can be freed on each iteration
# Mistake 6: Not handling GeneratorExit in close()
def resource_gen():
    """Yield "resource" forever; print a cleanup message when closed."""
    try:
        while True:
            yield "resource"
    except GeneratorExit:
        # Cleanup code here
        print("Resource cleaned up")
        # Don't yield here — close() raises RuntimeError if the generator
        # yields another value after receiving GeneratorExit
# Mistake 7: Using return value in generator
def gen_with_return():
    """Yield 1 and 2, then return 3 (delivered via StopIteration.value)."""
    yield 1
    yield 2
    return 3  # Value is NOT yielded — available as StopIteration.value


g = gen_with_return()
print(next(g))  # 1
print(next(g))  # 2
try:
    next(g)
except StopIteration as e:
    print(e.value)  # 3
Key Takeaways
- Generators produce values lazily — one at a time, not all at once
- Generator expressions use `()` not `[]` — they create generator objects
- `yield from` delegates to sub-generators — enables composition and passes the sub-generator's return value back
- Pipelines chain generators for memory-efficient data processing — each stage processes one item
- Use generators for large files, streams, and infinite sequences — constant memory usage
- `send()` sends values into generators, `throw()` injects exceptions, `close()` terminates
- Generators are single-use — recreate them for multiple passes over the same data
- `@contextlib.contextmanager` enables generator-based context managers with try/finally
- Generator state is preserved between `next()` calls — local variables survive across yields
- `StopIteration.value` captures the return value of a generator — useful for final results