Python File I/O — Advanced Patterns
Advanced file I/O patterns handle large files, concurrent access, and safe writes. These patterns are essential for production applications.
Learning Objectives
- Use pathlib for modern file system operations
- Process large files efficiently with generators
- Work with ZIP, TAR, and GZ archives
- Use temporary files and directories safely
- Apply memory-mapped files for fast random access
- Write atomic files to prevent corruption
pathlib Module Deep Dive
pathlib provides an object-oriented interface to file system paths, replacing os.path.
from pathlib import Path
# Create Path objects (three equivalent ways to build one; the last binding
# is the one the examples below operate on)
p = Path('.')  # Current directory
p = Path.home() / 'documents'  # Using / operator
p = Path('/home/user/documents')  # From an absolute path string
# Common operations (values shown are for '/home/user/documents')
print(p.name)  # 'documents' — final path component
print(p.stem)  # 'documents' — final component without its suffix
print(p.suffix)  # '' — file extension (this path has none)
print(p.parent)  # '/home/user' — parent directory
print(p.exists())  # True/False
print(p.is_dir())  # True/False
print(p.is_file())  # True/False
# Resolve relative paths
p = Path('..') / 'other' / 'file.txt'
print(p.resolve())  # Absolute path
# Read/write (built-in methods); pass the encoding explicitly so the result
# does not depend on the platform's locale
content = p.read_text(encoding='utf-8')  # Read entire file as string
p.write_text('Hello', encoding='utf-8')  # Write string to file
data = p.read_bytes()  # Read as bytes
p.write_bytes(b'Hello')  # Write bytes
# Create directories
p = Path('data/results/2024')
p.mkdir(parents=True, exist_ok=True)  # Create all parents
# Change extension (returns a new Path; nothing on disk is renamed)
new_path = p.with_suffix('.csv')
Globbing and Finding Files
from pathlib import Path
# Find files by pattern (glob returns a lazy generator; nothing is scanned
# until it is iterated)
py_files = Path('.').glob('**/*.py')  # All Python files recursively
txt_files = Path('data').glob('*.txt')  # TXT files in data/
specific = Path('.').glob('test_*.py')  # Test files
# Iterate over results
for f in Path('.').glob('**/*.py'):
    print(f"{f.name}: {f.stat().st_size} bytes")
# rglob (recursive glob, same as **)
for f in Path('.').rglob('*.py'):
    print(f)
# Filter by criteria
# is_file() skips directories and broken symlinks, for which stat() would
# either be meaningless or raise
large_files = [
    f for f in Path('.').rglob('*')
    if f.is_file() and f.stat().st_size > 1_000_000
]
# Most recently modified first
recent_files = sorted(
    Path('.').glob('*.py'),
    key=lambda f: f.stat().st_mtime,
    reverse=True,
)
pathlib vs os.path
| Operation | pathlib | os.path |
|---|---|---|
| Join paths | p / 'file' | os.path.join(p, 'file') |
| Get name | p.name | os.path.basename(p) |
| Get parent | p.parent | os.path.dirname(p) |
| Check exists | p.exists() | os.path.exists(p) |
| Make directory | p.mkdir(parents=True, exist_ok=True) | os.makedirs(p, exist_ok=True) |
| List directory | list(p.iterdir()) | os.listdir(p) |
| Glob | p.glob('*.py') | glob.glob(os.path.join(p, '*.py')) |
Working with ZIP Archives
import zipfile
# Build a new archive from files on disk
with zipfile.ZipFile('archive.zip', mode='w') as zf:
    zf.write('file1.txt')
    zf.write('file2.py')
    # Store the file under a shorter name inside the archive
    zf.write('long/path/to/file.txt', arcname='file.txt')
# Unpack every member into a directory
with zipfile.ZipFile('archive.zip', mode='r') as zf:
    zf.extractall(path='extracted/')
# Unpack a single member
with zipfile.ZipFile('archive.zip', mode='r') as zf:
    zf.extract('file1.txt', path='extracted/')
# Inspect the archive: member names, then per-member sizes
with zipfile.ZipFile('archive.zip', mode='r') as zf:
    print(zf.namelist())
    for info in zf.infolist():
        print(f"{info.filename}: {info.file_size} bytes")
# Append another file to the existing archive
with zipfile.ZipFile('archive.zip', mode='a') as zf:
    zf.write('new_file.txt')
# Read a member's contents straight from the archive, no extraction needed
with zipfile.ZipFile('archive.zip', mode='r') as zf:
    raw = zf.read('file1.txt')
    content = raw.decode('utf-8')
Batch ZIP Processor
import zipfile
from pathlib import Path
def zip_directory(source_dir, output_path):
    """Zip every regular file under a directory, preserving relative paths.

    Args:
        source_dir: Directory whose contents are archived (str or Path).
        output_path: Path of the ZIP file to create; an existing file is
            overwritten.

    Member names are relative to ``source_dir``. Empty directories are not
    stored. The archive itself is skipped, so ``output_path`` may safely live
    inside ``source_dir``.
    """
    source_root = Path(source_dir)
    # Resolve once so the self-inclusion check works for relative paths too.
    archive_path = Path(output_path).resolve()
    with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
        # sorted() gives a deterministic member order across filesystems.
        for file in sorted(source_root.rglob('*')):
            if not file.is_file():
                continue
            # The archive already exists on disk at this point; adding it to
            # itself would store a truncated copy of the file being written.
            if file.resolve() == archive_path:
                continue
            zf.write(file, file.relative_to(source_root))
    print(f"Created {output_path}")
def unzip_all(zip_dir, output_dir):
    """Extract every ``*.zip`` file found directly inside a directory.

    Args:
        zip_dir: Directory searched (non-recursively) for ZIP files.
        output_dir: Destination root (str or Path). Each archive is unpacked
            into its own sub-directory named after the archive's stem.
    """
    # Accept plain strings as well as Path objects for the destination.
    output_root = Path(output_dir)
    for zip_file in sorted(Path(zip_dir).glob('*.zip')):
        with zipfile.ZipFile(zip_file, 'r') as zf:
            zf.extractall(output_root / zip_file.stem)
        print(f"Extracted {zip_file.name}")
# Usage
zip_directory('my_project', 'project_backup.zip')
unzip_all(Path('backups'), Path('restored'))
Working with TAR and GZ Archives
import tarfile
# Create tar.gz
with tarfile.open('archive.tar.gz', 'w:gz') as tar:
    tar.add('file1.txt')
    tar.add('file2.py')
    tar.add('directory/', arcname='dir')  # Add directory with custom name
# Extract tar.gz (only for archives you created or otherwise trust;
# see "Safe extraction" below for untrusted input)
with tarfile.open('archive.tar.gz', 'r:gz') as tar:
    tar.extractall('extracted/')
# List contents
with tarfile.open('archive.tar.gz', 'r:gz') as tar:
    for member in tar.getmembers():
        print(f"{member.name}: {member.size} bytes")
# Extract specific file
with tarfile.open('archive.tar.gz', 'r:gz') as tar:
    tar.extract('file1.txt', 'extracted/')
# Safe extraction (prevent path traversal attacks)
with tarfile.open('archive.tar.gz', 'r:gz') as tar:
    if hasattr(tarfile, 'data_filter'):
        # Extraction filters (Python 3.12+, also in security backports):
        # the 'data' filter refuses absolute paths, '..' traversal and links
        # that point outside the destination directory.
        tar.extractall('extracted/', filter='data')
    else:
        # Manual fallback for interpreters without extraction filters.
        for member in tar.getmembers():
            # Compare whole path components so names like 'a..b' stay legal.
            escapes = (
                member.name.startswith('/')
                or '..' in member.name.split('/')
            )
            # Links can redirect later writes outside the destination, so
            # this conservative fallback rejects them outright.
            if escapes or member.issym() or member.islnk():
                raise ValueError(f"Unsafe path: {member.name}")
        tar.extractall('extracted/')
Tar Format Options
| Mode | Description | Compression |
|---|---|---|
| 'w' | Write, no compression | None |
| 'w:gz' | Write with gzip | gzip |
| 'w:bz2' | Write with bzip2 | bzip2 |
| 'w:xz' | Write with xz | xz |
| 'r' | Read | None |
| 'r:gz' | Read gzip | gzip |
Temporary Files and Directories
import tempfile
import os
# Temporary file (auto-deleted when closed)
# No dir= argument: tempfile picks the platform's temp directory
# (tempfile.gettempdir()), so this also works outside Linux.
with tempfile.NamedTemporaryFile(
    mode='w',
    suffix='.txt',
    prefix='myapp_',
    delete=True,  # Auto-delete on close
) as f:
    f.write("temporary data")
    f.flush()  # Push buffered data to disk so readers of temp_path see it
    temp_path = f.name
    # Process data using temp_path here; the file no longer exists once
    # this block exits
# Temporary directory (all contents deleted)
with tempfile.TemporaryDirectory() as tmpdir:
    # Create files in tmpdir
    path = os.path.join(tmpdir, 'data.txt')
    with open(path, 'w') as f:
        f.write("temp")
# All files in tmpdir are deleted when exiting
# Persistent temporary file (you must delete manually)
tmp = tempfile.NamedTemporaryFile(delete=False)
tmp.close()
# ... use tmp.name ...
os.unlink(tmp.name)  # Delete when done
# Get temp directory location
print(tempfile.gettempdir())  # e.g. '/tmp' on Linux
tempfile Best Practices
import tempfile
import os
# Scratch directory: everything created inside is removed automatically
with tempfile.TemporaryDirectory() as tmpdir:
    data_file = os.path.join(tmpdir, 'data.csv')
    with open(data_file, 'w') as f:
        f.write('col1,col2\n')
    # Process data_file
# Leaving the block above removes tmpdir together with its contents
# For long-running processes, use delete=False and clean up yourself
tmp = tempfile.NamedTemporaryFile(mode='w+b', suffix='.bin', delete=False)
try:
    # The inner context manager closes the handle; the outer finally then
    # removes the file from disk
    with tmp:
        tmp.write(b'binary data')
        tmp.flush()
        # Process...
finally:
    os.unlink(tmp.name)  # Clean up manually
Memory-Mapped Files
Memory mapping lets you access file contents as if they were in memory, but the OS handles loading pages on demand. This is perfect for random access in large files.
import mmap
# Create a memory-mapped file (the file must already exist and be non-empty)
with open('large_file.bin', 'r+b') as f:
    # Map the entire file (0 = entire file). Using the mapping as a context
    # manager guarantees it is closed even if an operation below raises.
    with mmap.mmap(f.fileno(), 0) as mm:
        # Access like a byte array
        print(mm[0:100])  # First 100 bytes
        print(mm[1000:2000])  # Bytes 1000-2000
        # Seek and read
        mm.seek(500)
        data = mm.read(100)  # Read 100 bytes from position 500
        # Write (overwrites the first bytes of the file in place)
        mm.seek(0)
        mm.write(b'HEADER')
        # Find bytes (returns -1 when the term is absent)
        pos = mm.find(b'search_term')
Memory Mapping vs Regular File I/O
| Scenario | Regular File | Memory Map |
|---|---|---|
| Sequential read | Good | Good |
| Random access | Slow (seek) | Fast |
| Small file (less than 1MB) | Good | Overkill |
| Large file (greater than 100MB) | OK | Better |
| Multiple processes reading | Complex | Easy |
| Writing | Standard | Direct modification |
| Max size | Unlimited | Limited by address space |
Practical Memory Mapping Example
import mmap
import struct
def create_index_file(filename, entries):
    """Write a binary index file of fixed-size records.

    Each record is packed with the struct format ``'i20s'``: an integer id
    followed by the UTF-8 encoded name, limited to 20 bytes (shorter names
    are null-padded by ``struct``).

    Args:
        filename: Path of the file to create; an existing file is overwritten.
        entries: Iterable of dicts with an ``'id'`` (int) and ``'name'`` (str).
    """
    record = struct.Struct('i20s')  # Compile the format once for the loop
    with open(filename, 'wb') as f:
        for entry in entries:
            clipped = entry['name'].encode()[:20]
            # Cutting at a byte offset can split a multi-byte character;
            # drop the dangling partial sequence so the stored name always
            # decodes cleanly.
            name_bytes = clipped.decode('utf-8', errors='ignore').encode()
            f.write(record.pack(entry['id'], name_bytes))
def read_entry(filename, entry_size, index):
    """Read one fixed-size record from an index file via a memory map.

    Args:
        filename: Path of an index file made of ``'i20s'`` records.
        entry_size: Size in bytes of one record; must equal
            ``struct.calcsize('i20s')`` for the unpack to succeed.
        index: Zero-based record number.

    Returns:
        A dict ``{'id': int, 'name': str}`` with trailing null padding
        removed from the name.

    Raises:
        struct.error: If the requested record lies outside the file (the
            slice is then shorter than one record).
    """
    offset = index * entry_size
    with open(filename, 'rb') as f:
        # Context manager releases the mapping even if slicing fails.
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            # Slicing copies the bytes, so they stay valid after unmapping.
            data = mm[offset:offset + entry_size]
    id_val, name = struct.unpack('i20s', data)
    return {'id': id_val, 'name': name.decode().rstrip('\x00')}
# Usage
entries = [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}]
create_index_file('index.bin', entries)
entry = read_entry('index.bin', 24, 0) # Read first entry
Large File Processing with Generators
Loading entire files into memory is dangerous for large files. A 10GB log file will crash your program if you try f.read(). Instead, process line by line.
def process_large_file(filename, encoding='utf-8'):
    """Yield the lines of a text file one at a time, without loading it all.

    Args:
        filename: Path of the text file to read.
        encoding: Text encoding used to decode the file. Given explicitly so
            the result does not depend on the platform's locale.

    Yields:
        Each line with leading and trailing whitespace (including the
        newline) removed.
    """
    with open(filename, 'r', buffering=8192, encoding=encoding) as f:
        for line in f:  # The file object reads in buffered chunks
            yield line.strip()
# Memory-efficient line counting
def count_lines(filename):
    """Return the number of lines in *filename*, streaming it line by line."""
    total = 0
    for _ in process_large_file(filename):
        total += 1
    return total
# Memory-efficient word counting
def count_words(filename):
    """Return the number of whitespace-separated words in *filename*."""
    return sum(
        len(text.split())
        for text in process_large_file(filename)
    )
# Process only specific lines
def process_every_nth(filename, n=10):
    """Yield every n-th line of *filename*, starting with the first line."""
    for position, text in enumerate(process_large_file(filename)):
        if position % n:
            # Not a multiple of n: skip this line
            continue
        yield text
# Filter lines
def grep(filename, pattern):
    """Yield the lines of *filename* that contain *pattern* as a substring."""
    yield from (
        text
        for text in process_large_file(filename)
        if pattern in text
    )
Why This Works
When you iterate over a file object (for line in f), Python does NOT read the entire file into memory. Instead:
- It reads a buffer (default 8KB)
- Yields one line at a time
- Reads more from disk when buffer is exhausted
This means you can process gigabyte files with only megabytes of RAM.
Atomic Writes
Atomic writes prevent file corruption. If your program crashes mid-write, the file is either completely written or not written at all — never partially written.
import tempfile
import os
def atomic_write(filename, content):
    """Write text to a file atomically using a temp file plus rename.

    The content is first written to a temporary file in the same directory
    as ``filename`` (same filesystem, so the rename never has to copy),
    flushed and fsynced, and only then moved over the target with
    ``os.replace``. Readers therefore see either the old file or the
    complete new one.

    Args:
        filename: Destination path.
        content: Text to write.

    Raises:
        Whatever the write or rename raises (e.g. ``OSError``,
        ``TypeError`` for non-string content). The temporary file is removed
        before the exception propagates and the target is left untouched.
    """
    dir_name = os.path.dirname(filename) or '.'
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(
            mode='w',
            dir=dir_name,
            delete=False,
            suffix='.tmp',
        ) as tmp:
            tmp_path = tmp.name
            tmp.write(content)
            # Make sure the bytes are on disk before the rename publishes
            # the file; otherwise a power loss could expose an empty file.
            tmp.flush()
            os.fsync(tmp.fileno())
        # Atomic on POSIX; also replaces an existing target on Windows
        os.replace(tmp_path, filename)
    except BaseException:
        # Do not leave stray *.tmp files behind when anything goes wrong.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except FileNotFoundError:
                pass
        raise
# Usage
atomic_write('config.json', '{"key": "value"}')
Why This Matters
Without atomic writes:
- Program starts writing to config.json
- Program crashes at 50% — file is now corrupt
- On restart, program reads corrupt config -> crash loop
With atomic writes:
- Program writes to a temporary .tmp file in the same directory as config.json
- Program crashes — config.json is untouched
- On restart, program reads valid config.json
File Watching
import time
from pathlib import Path
def watch_file(filename, callback, interval=1.0):
    """Poll a file's modification time and call back whenever it changes.

    Runs forever; stop it by raising from ``callback`` or interrupting the
    process.

    Args:
        filename: Path of the file to watch. It may be missing at start; the
            callback then fires once the file appears.
        callback: Called as ``callback(filename)`` after each detected
            change. Exceptions it raises propagate to the caller.
        interval: Seconds to sleep between polls.
    """
    path = Path(filename)
    try:
        last_mtime = path.stat().st_mtime
    except FileNotFoundError:
        # Treat a missing file like the loop does instead of crashing.
        last_mtime = None
    while True:
        try:
            current_mtime = path.stat().st_mtime
        except FileNotFoundError:
            # File is (temporarily) gone, e.g. mid-replace; keep polling.
            pass
        else:
            # Kept outside the try body so a FileNotFoundError raised by the
            # callback itself is not silently swallowed.
            if current_mtime != last_mtime:
                callback(filename)
                last_mtime = current_mtime
        time.sleep(interval)
# Usage
def on_change(filename):
    """Report on stdout that *filename* was modified."""
    message = f"{filename} was modified!"
    print(message)
watch_file('config.json', on_change)
Real-World Example: Batch File Processor
from pathlib import Path
import csv
import json
from datetime import datetime
def process_log_files(log_dir, output_file):
    """Collect ERROR lines from ``*.log`` files into a JSON summary.

    Lines are expected in the form ``timestamp | level | message``. Only
    lines containing ``'ERROR'`` with at least three ``' | '``-separated
    fields are kept; anything after the second separator belongs to the
    message, so messages may themselves contain ``' | '``.

    Args:
        log_dir: Directory searched (non-recursively) for ``*.log`` files.
        output_file: Path of the JSON file to write; it receives a list of
            dicts with the keys ``file``, ``timestamp``, ``level`` and
            ``message``.
    """
    results = []
    file_count = 0  # Counted while iterating instead of re-globbing later
    # sorted() makes the output order independent of the filesystem.
    for log_file in sorted(Path(log_dir).glob('*.log')):
        file_count += 1
        print(f"Processing {log_file.name}...")
        with open(log_file, 'r', encoding='utf-8') as f:
            for line in f:
                if 'ERROR' not in line:
                    continue
                # maxsplit=2 keeps separators inside the message intact.
                parts = line.strip().split(' | ', 2)
                if len(parts) < 3:
                    continue
                timestamp, level, message = parts
                results.append({
                    'file': log_file.name,
                    'timestamp': timestamp,
                    'level': level,
                    'message': message,
                })
    # Write summary
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2)
    print(f"Found {len(results)} errors across {file_count} files")
# Usage
process_log_files('logs/', 'error_summary.json')
Real-World Example: Log Archiver
import gzip
import shutil
from pathlib import Path
from datetime import datetime, timedelta
def archive_old_logs(log_dir, archive_dir, days_old=30):
    """Gzip ``*.log`` files older than ``days_old`` days and delete them.

    Each qualifying file in ``log_dir`` is compressed to
    ``<archive_dir>/<name>.gz`` (the directory is created if needed) and the
    original is then removed. Age is judged by the file's modification time.
    """
    destination = Path(archive_dir)
    destination.mkdir(parents=True, exist_ok=True)
    threshold = datetime.now() - timedelta(days=days_old)
    count = 0
    for source in Path(log_dir).glob('*.log'):
        modified = datetime.fromtimestamp(source.stat().st_mtime)
        if modified >= threshold:
            # Still recent enough: leave it where it is
            continue
        target = destination / f"{source.name}.gz"
        # Stream the log into the gzip file without loading it into memory
        with open(source, 'rb') as raw, gzip.open(target, 'wb') as packed:
            shutil.copyfileobj(raw, packed)
        # Compressed copy is complete, so the original can go
        source.unlink()
        count += 1
        print(f"Archived: {source.name} -> {target.name}")
    print(f"Archived {count} log files")
# Usage
archive_old_logs('logs/', 'archives/', days_old=30)
Common Mistakes
| Mistake | Problem | Solution |
|---|---|---|
| Not closing files | Resource leaks | Use context managers (with) |
| Reading entire large file | Memory overflow | Use generators, line by line |
| Hardcoding paths | Not portable | Use pathlib.Path |
| Not using atomic writes | File corruption on crash | Use temp file + os.replace() |
| Ignoring encoding | Unicode errors | Specify encoding='utf-8' |
| Not handling FileNotFoundError | Crashes on missing files | Use try/except or Path.exists() |
Key Takeaways
- Use pathlib for modern, object-oriented file operations
- Use generators for memory-efficient line-by-line processing
- Memory-mapped files are fast for random access in large files
- Atomic writes prevent corruption on crashes
- Use tempfile for temporary storage (auto-cleanup)
- Work with ZIP and TAR archives using the standard library
- Always close files (use context managers)
- Use the buffering parameter for custom buffer sizes
- Use os.replace() for atomic file replacement
- Prefer pathlib over os.path for new code