🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Servicesℹ️ About✉️ ContactView Pricing Plansfrom $10

Python File I/O — Advanced Patterns

Python Standard LibraryFile I/O🟢 Free Lesson

Advertisement

Python File I/O — Advanced Patterns

Advanced file I/O patterns handle large files, concurrent access, and safe writes. These patterns are essential for production applications.

Learning Objectives

  • Use pathlib for modern file system operations
  • Process large files efficiently with generators
  • Work with ZIP, TAR, and GZ archives
  • Use temporary files and directories safely
  • Apply memory-mapped files for fast random access
  • Write atomic files to prevent corruption

pathlib Module Deep Dive

pathlib provides an object-oriented interface to file system paths, replacing os.path.

from pathlib import Path

# Create Path objects (three equivalent ways; the last assignment is the one
# used by the examples below)
p = Path('.')                     # Current directory
p = Path.home() / 'documents'     # Using / operator
p = Path('/home/user/documents')  # From an absolute path string

# Common operations (results shown for /home/user/documents)
print(p.name)       # 'documents' — final path component
print(p.stem)       # 'documents' — final component without its suffix
print(p.suffix)     # '' — file extension (empty: the name has no dot)
print(p.parent)     # '/home/user' — parent directory
print(p.exists())   # True/False
print(p.is_dir())   # True/False
print(p.is_file())  # True/False

# Resolve relative paths
p = Path('..') / 'other' / 'file.txt'
print(p.resolve())  # Absolute path

# Read/write (built-in methods). Write before reading so the example does not
# fail with FileNotFoundError on a fresh checkout.
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text('Hello', encoding='utf-8')     # Write string to file
content = p.read_text(encoding='utf-8')     # Read entire file as string
p.write_bytes(b'Hello')                     # Write bytes
data = p.read_bytes()                       # Read as bytes

# Create directories
p = Path('data/results/2024')
p.mkdir(parents=True, exist_ok=True)  # Create all parents

# Change extension (meaningful on a file path, not on the directory itself)
new_path = (p / 'report.txt').with_suffix('.csv')  # data/results/2024/report.csv

Globbing and Finding Files

from pathlib import Path

# Find files by pattern (glob returns a lazy generator, not a list)
py_files = Path('.').glob('**/*.py')        # All Python files recursively
txt_files = Path('data').glob('*.txt')       # TXT files in data/
specific = Path('.').glob('test_*.py')       # Test files

# Iterate over results
for f in Path('.').glob('**/*.py'):
    print(f"{f.name}: {f.stat().st_size} bytes")

# rglob (recursive glob, same as **)
for f in Path('.').rglob('*.py'):
    print(f)

# Filter by criteria
# is_file() keeps directories out of the result and skips broken symlinks,
# on which stat() would raise FileNotFoundError.
large_files = [
    f for f in Path('.').rglob('*')
    if f.is_file() and f.stat().st_size > 1_000_000
]
# reverse=True puts the most recently modified file first.
recent_files = sorted(
    Path('.').glob('*.py'),
    key=lambda f: f.stat().st_mtime,
    reverse=True,
)

pathlib vs os.path

Operation | pathlib | os.path
Join paths | p / 'file' | os.path.join(p, 'file')
Get name | p.name | os.path.basename(p)
Get parent | p.parent | os.path.dirname(p)
Check exists | p.exists() | os.path.exists(p)
Make directory | p.mkdir(parents=True) | os.makedirs(p)
List directory | list(p.iterdir()) | os.listdir(p)
Glob | p.glob('*.py') | glob.glob(p + '/*.py')

Working with ZIP Archives

import zipfile

# Build a new archive (mode 'w' replaces any existing archive.zip)
with zipfile.ZipFile('archive.zip', mode='w') as archive:
    archive.write('file1.txt')
    archive.write('file2.py')
    # Store the file under a shorter name inside the archive
    archive.write('long/path/to/file.txt', arcname='file.txt')

# Unpack every member into extracted/
with zipfile.ZipFile('archive.zip', mode='r') as archive:
    archive.extractall(path='extracted/')

# Unpack a single member
with zipfile.ZipFile('archive.zip', mode='r') as archive:
    archive.extract('file1.txt', path='extracted/')

# Inspect the archive: member names first, then per-member sizes
with zipfile.ZipFile('archive.zip', mode='r') as archive:
    print(archive.namelist())
    for entry in archive.infolist():
        print(f"{entry.filename}: {entry.file_size} bytes")

# Append to an existing archive (mode 'a')
with zipfile.ZipFile('archive.zip', mode='a') as archive:
    archive.write('new_file.txt')

# Read a member straight into memory, without extracting it to disk
with zipfile.ZipFile('archive.zip', mode='r') as archive:
    raw = archive.read('file1.txt')
    content = raw.decode('utf-8')

Batch ZIP Processor

import zipfile
from pathlib import Path

def zip_directory(source_dir, output_path):
    """Zip every regular file under *source_dir* into *output_path*.

    Members are stored with paths relative to *source_dir* and are added in
    sorted order so repeated runs produce the same member order. Empty
    directories are not stored.

    Args:
        source_dir: Directory to archive (str or Path).
        output_path: Archive file to create (str or Path); overwritten if it
            already exists.
    """
    source_root = Path(source_dir)
    # Resolved once so the self-inclusion check below works regardless of how
    # the two paths were spelled (relative, absolute, via symlink).
    archive_path = Path(output_path).resolve()

    with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
        for file in sorted(source_root.rglob('*')):
            if not file.is_file():
                continue
            # The archive is created before the walk starts, so when it lives
            # inside source_dir it would otherwise be added to itself.
            if file.resolve() == archive_path:
                continue
            zf.write(file, file.relative_to(source_root))
    print(f"Created {output_path}")

def unzip_all(zip_dir, output_dir):
    """Extract every ``*.zip`` file found directly in *zip_dir*.

    Each archive is unpacked into its own sub-directory of *output_dir*, named
    after the archive without its ``.zip`` suffix.

    Args:
        zip_dir: Directory to scan for archives (str or Path); not recursive.
        output_dir: Directory that receives one sub-directory per archive
            (str or Path); created on demand by the extraction.
    """
    # Coerce once: the / operator below is not defined for plain strings.
    destination_root = Path(output_dir)
    for zip_file in Path(zip_dir).glob('*.zip'):
        with zipfile.ZipFile(zip_file, 'r') as zf:
            zf.extractall(destination_root / zip_file.stem)
        print(f"Extracted {zip_file.name}")

# Usage
zip_directory('my_project', 'project_backup.zip')
unzip_all(Path('backups'), Path('restored'))

Working with TAR and GZ Archives

import tarfile

# Create tar.gz
with tarfile.open('archive.tar.gz', 'w:gz') as tar:
    tar.add('file1.txt')
    tar.add('file2.py')
    tar.add('directory/', arcname='dir')  # Add directory with custom name

# Extract tar.gz (only for archives you created or otherwise trust)
with tarfile.open('archive.tar.gz', 'r:gz') as tar:
    tar.extractall('extracted/')

# List contents
with tarfile.open('archive.tar.gz', 'r:gz') as tar:
    for member in tar.getmembers():
        print(f"{member.name}: {member.size} bytes")

# Extract specific file
with tarfile.open('archive.tar.gz', 'r:gz') as tar:
    tar.extract('file1.txt', 'extracted/')

# Safe extraction (prevent path traversal attacks)
with tarfile.open('archive.tar.gz', 'r:gz') as tar:
    if hasattr(tarfile, 'data_filter'):
        # Built-in extraction filter: rejects absolute paths, '..' escapes,
        # links pointing outside the destination, and device files.
        tar.extractall('extracted/', filter='data')
    else:
        # Older interpreters without extraction filters: validate manually.
        import os

        dest_root = os.path.realpath('extracted/')
        for member in tar.getmembers():
            target = os.path.realpath(os.path.join(dest_root, member.name))
            # Compare resolved paths instead of searching for '..' as a
            # substring, which would also reject names such as 'notes..txt'.
            if os.path.commonpath([dest_root, target]) != dest_root:
                raise ValueError(f"Unsafe path: {member.name}")
            # Links and device nodes can redirect later writes outside dest.
            if member.issym() or member.islnk() or member.isdev():
                raise ValueError(f"Unsafe path: {member.name}")
        tar.extractall('extracted/')

Tar Format Options

Mode | Description | Compression
'w' | Write, no compression | None
'w:gz' | Write with gzip | gzip
'w:bz2' | Write with bzip2 | bzip2
'w:xz' | Write with xz | xz
'r' | Read | None
'r:gz' | Read gzip | gzip

Temporary Files and Directories

import tempfile
import os

# Temporary file (auto-deleted when closed)
with tempfile.NamedTemporaryFile(
    mode='w',
    suffix='.txt',
    prefix='myapp_',
    dir=None,     # None = platform default (tempfile.gettempdir()); '/tmp' is not portable
    delete=True,  # Auto-delete on close
) as f:
    f.write("temporary data")
    f.flush()  # Push buffered text to disk so anything opening temp_path sees it
    temp_path = f.name
    # Process data using temp_path (the file is gone once this block exits)

# Temporary directory (all contents deleted)
with tempfile.TemporaryDirectory() as tmpdir:
    # Create files in tmpdir
    path = os.path.join(tmpdir, 'data.txt')
    with open(path, 'w') as f:
        f.write("temp")
    # All files in tmpdir are deleted when exiting

# Persistent temporary file (you must delete manually)
tmp = tempfile.NamedTemporaryFile(delete=False)
tmp.close()
# ... use tmp.name ...
os.unlink(tmp.name)  # Delete when done

# Get temp directory location
print(tempfile.gettempdir())  # e.g. '/tmp' on Linux

tempfile Best Practices

import tempfile
import os

# Prefer TemporaryDirectory: a single context manager removes the directory
# and everything created inside it.
with tempfile.TemporaryDirectory() as workdir:
    data_file = os.path.join(workdir, 'data.csv')
    with open(data_file, 'w') as out:
        out.write('col1,col2\n')
    # Process data_file here; it disappears together with workdir on exit.

# When the file must outlive a `with` block (long-running work), opt out of
# auto-deletion and clean up explicitly.
tmp = tempfile.NamedTemporaryFile(mode='w+b', suffix='.bin', delete=False)
try:
    payload = b'binary data'
    tmp.write(payload)
    tmp.flush()
    # Process...
finally:
    # Close first, then remove the file by name.
    tmp.close()
    os.unlink(tmp.name)

Memory-Mapped Files

Memory mapping lets you access file contents as if they were in memory, but the OS handles loading pages on demand. This is perfect for random access in large files.

import mmap

# Create a memory-mapped file ('r+b' = read/write without truncating)
with open('large_file.bin', 'r+b') as f:
    # Length 0 maps the entire file. The nested `with` guarantees the mapping
    # is closed even if one of the operations below raises.
    with mmap.mmap(f.fileno(), 0) as mm:
        # Access like a byte array
        print(mm[0:100])       # First 100 bytes
        print(mm[1000:2000])   # Bytes 1000-1999 (end index is exclusive)

        # Seek and read
        mm.seek(500)
        data = mm.read(100)    # Read 100 bytes from position 500

        # Write (overwrites in place)
        mm.seek(0)
        mm.write(b'HEADER')
        mm.flush()             # Push the modified pages back to the file

        # Find bytes
        pos = mm.find(b'search_term')  # -1 when the bytes are not present

Memory Mapping vs Regular File I/O

Scenario | Regular File | Memory Map
Sequential read | Good | Good
Random access | Slow (seek) | Fast
Small file (less than 1MB) | Good | Overkill
Large file (greater than 100MB) | OK | Better
Multiple processes reading | Complex | Easy
Writing | Standard | Direct modification
Max size | Unlimited | Limited by address space

Practical Memory Mapping Example

import mmap
import struct

def create_index_file(filename, entries):
    """Write one fixed-width binary record per entry to *filename*.

    Each record is packed with the struct format ``'i20s'``: the entry's
    ``'id'`` as an int followed by its ``'name'``, encoded and cut to at most
    20 bytes (struct pads shorter names with NUL bytes). The file is written
    with ordinary buffered I/O; records can later be read back by offset.

    Args:
        filename: Path of the index file to create or overwrite.
        entries: Iterable of dicts with ``'id'`` and ``'name'`` keys.
    """
    record = struct.Struct('i20s')
    with open(filename, 'wb') as out:
        for item in entries:
            ident = item['id']
            # Truncation happens on the encoded bytes, not on characters.
            raw_name = item['name'].encode()[:20]
            out.write(record.pack(ident, raw_name))

def read_entry(filename, entry_size, index):
    """Read one fixed-width record from an index file via a memory map.

    Args:
        filename: Path of a file made of consecutive ``'i20s'`` records.
        entry_size: Size of one record in bytes (``struct.calcsize('i20s')``).
        index: Zero-based record number to read.

    Returns:
        A dict ``{'id': int, 'name': str}``; trailing NUL padding is stripped
        from the name.

    Raises:
        struct.error: If *index* lies outside the file, so fewer than
            *entry_size* bytes are available at that offset.
        ValueError: If the file is empty (an empty file cannot be mapped).
    """
    with open(filename, 'rb') as f:
        # The context manager closes the mapping even when slicing raises;
        # previously a failure before the explicit close() leaked it.
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            offset = index * entry_size
            # Slicing copies the bytes out, so `data` stays valid after close.
            data = mm[offset:offset + entry_size]
    id_val, name = struct.unpack('i20s', data)
    return {'id': id_val, 'name': name.decode().rstrip('\x00')}

# Usage
entries = [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}]
create_index_file('index.bin', entries)
entry = read_entry('index.bin', 24, 0)  # Read first entry

Large File Processing with Generators

Loading entire files into memory is dangerous for large files. A 10GB log file will crash your program if you try f.read(). Instead, process line by line.

def process_large_file(filename):
    """Lazily yield each line of *filename* with surrounding whitespace removed.

    The file is opened in text mode with an 8192-byte buffer when iteration
    starts and is closed once the generator is exhausted or closed, so only
    one line at a time is held by this function.
    """
    with open(filename, 'r', buffering=8192) as source:
        # Iterating the file object pulls one line at a time from the buffer.
        yield from map(str.strip, source)

# Memory-efficient line counting
def count_lines(filename):
    """Return the number of lines in *filename*, streaming through it once."""
    total = 0
    for _ in process_large_file(filename):
        total += 1
    return total

# Memory-efficient word counting
def count_words(filename):
    """Return the number of whitespace-separated words in *filename*."""
    return sum(
        len(text.split())
        for text in process_large_file(filename)
    )

# Process only specific lines
def process_every_nth(filename, n=10):
    """Yield lines 0, n, 2n, ... (zero-based) of *filename*."""
    for position, text in enumerate(process_large_file(filename)):
        if position % n:
            # Non-zero remainder: not a multiple of n, skip it.
            continue
        yield text

# Filter lines
def grep(filename, pattern):
    """Yield the stripped lines of *filename* that contain *pattern*.

    *pattern* is matched as a plain substring, not as a regular expression.
    """
    yield from (
        text
        for text in process_large_file(filename)
        if pattern in text
    )

Why This Works

When you iterate over a file object (for line in f), Python does NOT read the entire file into memory. Instead:

  1. It reads one buffer-sized chunk from disk (io.DEFAULT_BUFFER_SIZE by default; the example above passes buffering=8192 explicitly)
  2. Yields one line at a time
  3. Reads more from disk when buffer is exhausted

This means you can process gigabyte files with only megabytes of RAM.


Atomic Writes

Atomic writes prevent file corruption. If your program crashes mid-write, the file is either completely written or not written at all — never partially written.

import tempfile
import os

def atomic_write(filename, content):
    """Write *content* to *filename* atomically using temp file + rename.

    The text is written to a temporary file in the same directory, flushed
    and fsynced, then moved over *filename* with ``os.replace``. Readers see
    either the old file or the complete new one.

    Args:
        filename: Destination path (str or path-like).
        content: Text to write.

    Raises:
        Any exception from writing or replacing is re-raised after the
        temporary file has been removed; *filename* is left untouched.
    """
    # Same directory => same filesystem, which os.replace needs to be atomic.
    dir_name = os.path.dirname(filename) or '.'
    tmp_path = None

    try:
        with tempfile.NamedTemporaryFile(
            mode='w',
            dir=dir_name,
            delete=False,
            suffix='.tmp'
        ) as tmp:
            # Record the name first so cleanup works if write() fails.
            tmp_path = tmp.name
            tmp.write(content)
            # Make sure the data is on disk before it can replace the target;
            # otherwise a crash right after the rename could leave an empty file.
            tmp.flush()
            os.fsync(tmp.fileno())

        # Atomic on most filesystems (POSIX)
        os.replace(tmp_path, filename)
    except BaseException:
        # Do not leave orphaned *.tmp files behind on failure.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
        raise

# Usage
atomic_write('config.json', '{"key": "value"}')

Why This Matters

Without atomic writes:

  1. Program starts writing to config.json
  2. Program crashes at 50% — file is now corrupt
  3. On restart, program reads corrupt config -> crash loop

With atomic writes:

  1. Program writes to a temporary file (a random name ending in .tmp) in the same directory as config.json
  2. Program crashes — config.json is untouched
  3. On restart, program reads valid config.json

File Watching

import time
from pathlib import Path

def watch_file(filename, callback, interval=1.0):
    """Poll *filename* forever and call ``callback(filename)`` when its mtime changes.

    The file must exist when watching starts (the first stat is not guarded).
    Afterwards a FileNotFoundError raised during a poll cycle is ignored and
    polling continues. This function never returns.

    Args:
        filename: Path of the file to watch.
        callback: Callable invoked with *filename* after each detected change.
        interval: Seconds to sleep between polls.
    """
    def _mtime():
        return Path(filename).stat().st_mtime

    seen = _mtime()

    while True:
        try:
            latest = _mtime()
            if latest != seen:
                callback(filename)
                # Only remembered after the callback completed.
                seen = latest
        except FileNotFoundError:
            # File is temporarily missing (e.g. being replaced); retry later.
            pass
        time.sleep(interval)

# Usage
def on_change(filename):
    print(f"{filename} was modified!")

watch_file('config.json', on_change)

Real-World Example: Batch File Processor

from pathlib import Path
import csv
import json
from datetime import datetime

def process_log_files(log_dir, output_file):
    """Collect ERROR lines from every ``*.log`` file in *log_dir* into a JSON summary.

    A line is recorded when it contains the text ``ERROR`` and has the shape
    ``timestamp | level | message``. Anything after the second separator
    belongs to the message, so messages containing `` | `` are kept whole.
    Files are processed in sorted name order so the summary is reproducible.

    Args:
        log_dir: Directory to scan (str or Path); not recursive.
        output_file: Path of the JSON file to write (a list of dicts with the
            keys ``file``, ``timestamp``, ``level`` and ``message``).
    """
    results = []
    file_count = 0

    for log_file in sorted(Path(log_dir).glob('*.log')):
        # Counted here instead of globbing the directory a second time later.
        file_count += 1
        print(f"Processing {log_file.name}...")

        with open(log_file, 'r') as f:
            for line in f:
                if 'ERROR' not in line:
                    continue
                # maxsplit=2: keep ' | ' sequences inside the message intact.
                parts = line.strip().split(' | ', 2)
                if len(parts) < 3:
                    continue
                timestamp, level, message = parts
                results.append({
                    'file': log_file.name,
                    'timestamp': timestamp,
                    'level': level,
                    'message': message
                })

    # Write summary
    with open(output_file, 'w') as f:
        json.dump(results, f, indent=2)

    print(f"Found {len(results)} errors across {file_count} files")

# Usage
process_log_files('logs/', 'error_summary.json')

Real-World Example: Log Archiver

import gzip
import shutil
from pathlib import Path
from datetime import datetime, timedelta

def archive_old_logs(log_dir, archive_dir, days_old=30):
    """Gzip ``*.log`` files older than *days_old* days into *archive_dir*.

    A log qualifies when its modification time is earlier than now minus
    *days_old* days. It is compressed to ``<name>.gz`` inside *archive_dir*
    (created if missing) and the original is then deleted.

    Args:
        log_dir: Directory to scan for ``*.log`` files; not recursive.
        archive_dir: Destination directory for the ``.gz`` files.
        days_old: Age threshold in days.
    """
    archive_dir = Path(archive_dir)
    archive_dir.mkdir(parents=True, exist_ok=True)

    cutoff = datetime.now() - timedelta(days=days_old)
    archived = 0

    for log_file in Path(log_dir).glob('*.log'):
        modified = datetime.fromtimestamp(log_file.stat().st_mtime)
        if modified >= cutoff:
            # Still recent enough: leave it in place.
            continue

        archive_path = archive_dir / f"{log_file.name}.gz"
        # Stream the copy so large logs are never held fully in memory.
        with open(log_file, 'rb') as f_in, gzip.open(archive_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)

        # The compressed copy is closed at this point; drop the original.
        log_file.unlink()
        archived += 1
        print(f"Archived: {log_file.name} -> {archive_path.name}")

    print(f"Archived {archived} log files")

# Usage
archive_old_logs('logs/', 'archives/', days_old=30)

Common Mistakes

Mistake | Problem | Solution
Not closing files | Resource leaks | Use context managers (with)
Reading entire large file | Memory overflow | Use generators, line by line
Hardcoding paths | Not portable | Use pathlib.Path
Not using atomic writes | File corruption on crash | Use temp file + os.replace()
Ignoring encoding | Unicode errors | Specify encoding='utf-8'
Not handling FileNotFoundError | Crashes on missing files | Use try/except or Path.exists()

Key Takeaways

  1. Use pathlib for modern, object-oriented file operations
  2. Use generators for memory-efficient line-by-line processing
  3. Memory-mapped files are fast for random access in large files
  4. Atomic writes prevent corruption on crashes
  5. Use tempfile for temporary storage (auto-cleanup)
  6. Work with ZIP and TAR archives using standard library
  7. Always close files (use context managers)
  8. Use buffering parameter for custom buffer sizes
  9. Use os.replace() for atomic file replacement
  10. Prefer pathlib over os.path for new code

Premium Content

Python File I/O — Advanced Patterns

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
💼Interview Prep
📜Certificates
🤝Community Access

Already a member? Log in

Need Expert Python Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement