File I/O & Data Import/Export

Module 1: Foundations

Why This Matters for Data Science

A commonly cited estimate holds that data scientists spend 60-80% of their time on data acquisition and preparation. Reading, writing, and transforming data efficiently across formats is therefore foundational. This tutorial covers file I/O operations, common data formats, and practices for building robust data pipelines.


1. File Handling Basics

1.1 The Problem with Manual File Handling

# DANGEROUS: Manual file handling
f = open('data.txt', 'r')
content = f.read()
# What if an exception occurs here?
f.close()  # File might never be closed!

Definition: Resource Leak

A situation where a program acquires a resource (file handle, memory, network connection) but fails to release it. File handles are finite OS resources; leaking them can cause "too many open files" errors and system instability.

1.2 Context Managers (with statement)

Context managers guarantee proper resource cleanup:

# SAFE: Using context manager
with open('data.txt', 'r') as f:
    content = f.read()
# The file is closed automatically, even if an exception occurs inside the block

The with statement uses Python's context manager protocol (__enter__ and __exit__ methods). The __exit__ method is guaranteed to execute even if an exception occurs, making it the safest way to handle resources.
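
To make the protocol concrete, here is a minimal sketch of a hand-written context manager. The class name `ManagedResource` and its `events` list are hypothetical, introduced only for illustration; the point is that `__exit__` runs even though the body of the `with` block raises.

```python
class ManagedResource:
    """Hypothetical resource that records its lifecycle events."""

    def __init__(self):
        self.events = []

    def __enter__(self):
        # Called when the with block is entered
        self.events.append("acquired")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Called on normal exit and when the block raises
        self.events.append("released")
        return False  # Do not suppress the exception


resource = ManagedResource()
try:
    with resource:
        raise ValueError("simulated failure inside the block")
except ValueError:
    pass

print(resource.events)  # ['acquired', 'released']
```

Returning `False` from `__exit__` lets the exception propagate after cleanup, which is the behavior file objects have as well.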

1.3 File Modes

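Mode  | Description       | Position  | Creates File?
------+-------------------+-----------+------------------------
'r'   | Read (default)    | Beginning | No
'w'   | Write             | Beginning | Yes (overwrites)
'a'   | Append            | End       | Yes
'x'   | Exclusive create  | Beginning | Yes (fails if exists)
'r+'  | Read + Write      | Beginning | No
'w+'  | Write + Read      | Beginning | Yes (overwrites)
'a+'  | Append + Read     | End       | Yes
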
# Demonstrate file modes
import os

# Create a test file
test_file = 'test_file.txt'

# Write mode (creates new file)
with open(test_file, 'w') as f:
    f.write("Line 1: Hello\n")
    f.write("Line 2: World\n")

# Read mode
with open(test_file, 'r') as f:
    content = f.read()
    print(f"Read mode:\n{content}")

# Append mode
with open(test_file, 'a') as f:
    f.write("Line 3: Appended\n")

# Verify append
with open(test_file, 'r') as f:
    content = f.read()
    print(f"After append:\n{content}")

# Clean up
os.remove(test_file)
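
The demonstration above covers 'w', 'r', and 'a'. As a small complementary sketch, the following shows the two modes from the table that are easiest to misread: 'x' (refuses to touch an existing file) and 'r+' (reads and writes without truncating). The file name `exclusive.txt` is an arbitrary choice, and a temporary directory is used so no existing file is affected.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "exclusive.txt")

    # 'x' creates the file only if it does not already exist
    with open(path, "x") as f:
        f.write("first\n")

    # A second 'x' open on the same path raises FileExistsError
    try:
        with open(path, "x") as f:
            f.write("second\n")
        overwritten = True
    except FileExistsError:
        overwritten = False

    # 'r+' opens for reading and writing without truncating the file
    with open(path, "r+") as f:
        original = f.read()          # Position is now at end of file
        f.write("appended via r+\n")

    with open(path, "r") as f:
        final_content = f.read()

print(overwritten)     # False
print(final_content)
```

Using 'x' instead of 'w' is a cheap safeguard in pipelines where silently overwriting an earlier output would be a bug.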

1.4 Reading Strategies

Memory Usage by Reading Strategy

$$M_{\text{readline}} = O(1), \quad M_{\text{readlines}} = O(n), \quad M_{\text{read}} = O(n)$$

Here,

  • $M$ = Memory usage
  • $n$ = Total file size in bytes
  • readline = Reads one line at a time
  • readlines = Reads all lines into a list
  • read = Reads entire file as string

import os

# Create test file with multiple lines
test_file = 'reading_test.txt'
with open(test_file, 'w') as f:
    for i in range(100):
        f.write(f"Line {i+1}: This is test data with number {i}\n")

# Method 1: Read entire file (small files only)
with open(test_file, 'r') as f:
    content = f.read()
print(f"Full read: {len(content)} characters")

# Method 2: Read line by line (memory efficient)
line_count = 0
with open(test_file, 'r') as f:
    for line in f:
        line_count += 1
print(f"Line count: {line_count}")

# Method 3: Read all lines into list
with open(test_file, 'r') as f:
    lines = f.readlines()
print(f"Lines list: {len(lines)} items")

# Method 4: Read in chunks (large files)
chunk_size = 1024
chunks = []
with open(test_file, 'r') as f:
    while True:
        chunk = f.read(chunk_size)
        if not chunk:
            break
        chunks.append(chunk)
print(f"Chunks: {len(chunks)} chunks of ~{chunk_size} chars")

# Clean up
os.remove(test_file)

1.5 Binary vs Text Mode

import os
import struct

# Text mode (default)
text_content = "Hello, World! café"

# Binary mode
binary_content = text_content.encode('utf-8')

print(f"Text type: {type(text_content)}")
print(f"Binary type: {type(binary_content)}")
print(f"Text length: {len(text_content)} chars")
print(f"Binary length: {len(binary_content)} bytes")

# Write binary data
binary_file = 'binary_test.bin'
with open(binary_file, 'wb') as f:
    # Pack an int, a float, and a double as raw bytes (native byte order)
    f.write(struct.pack('i', 42))      # 4 bytes
    f.write(struct.pack('f', 3.14))    # 4 bytes
    f.write(struct.pack('d', 2.718))   # 8 bytes

# Read binary data
with open(binary_file, 'rb') as f:
    int_val = struct.unpack('i', f.read(4))[0]
    float_val = struct.unpack('f', f.read(4))[0]
    double_val = struct.unpack('d', f.read(8))[0]

print(f"\nBinary data:")
print(f"  Integer: {int_val}")
print(f"  Float: {float_val}")
print(f"  Double: {double_val}")

# Clean up
os.remove(binary_file)

Text mode applies platform-specific line ending transformations (\n ↔ \r\n). Binary mode preserves exact bytes. Always use binary mode for non-text data (images, serialized objects, compressed files).
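
The line-ending translation can be observed directly. The sketch below forces the Windows-style translation with an explicit `newline="\r\n"` argument so the result is the same on every platform, then compares what binary mode and text mode read back. The file name `newline_demo.txt` is arbitrary.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "newline_demo.txt")

    # Text mode with newline="\r\n": each "\n" written becomes "\r\n" on disk
    with open(path, "w", newline="\r\n", encoding="utf-8") as f:
        f.write("a\nb\n")

    # Binary mode shows the exact bytes that were stored
    with open(path, "rb") as f:
        raw = f.read()

    # Text mode with the default newline=None translates "\r\n" back to "\n"
    with open(path, "r", encoding="utf-8") as f:
        text = f.read()

print(raw)               # b'a\r\nb\r\n'
print(text == "a\nb\n")  # True
```

If the same translation were applied to an image or a compressed file, every byte sequence that happens to look like a newline would be rewritten, which is why binary mode is required for such data.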


2. CSV: Reading and Writing

2.1 Why CSV is Tricky

CSV files seem simple but have many pitfalls:

  • Delimiter variations (comma, semicolon, tab)
  • Quoting issues (commas within fields)
  • Encoding problems (UTF-8 vs Latin-1)
  • Header row handling
  • Missing values representation
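
Two of the pitfalls above, quoting and delimiter variation, can be reproduced in a few lines. This is a minimal sketch using in-memory buffers; the sample rows are illustrative only.

```python
import csv
import io

rows = [["name", "city"], ["Jane Smith", "Los Angeles, CA"]]

# Write with the default comma delimiter; the writer quotes the field
# that contains a comma
buffer = io.StringIO(newline="")
csv.writer(buffer).writerows(rows)
comma_text = buffer.getvalue()

# Naive splitting on commas breaks the quoted field into extra pieces
naive_fields = comma_text.splitlines()[1].split(",")

# csv.reader honors the quoting and recovers the original two fields
parsed = list(csv.reader(io.StringIO(comma_text, newline="")))

# The same data with a semicolon delimiter needs no quoting at all
buffer = io.StringIO(newline="")
csv.writer(buffer, delimiter=";").writerows(rows)
semicolon_text = buffer.getvalue()

print(len(naive_fields))   # 3
print(parsed[1])           # ['Jane Smith', 'Los Angeles, CA']
print(semicolon_text.splitlines()[1])  # Jane Smith;Los Angeles, CA
```

The takeaway is to parse CSV with a real CSV reader rather than `str.split`, and to state the delimiter explicitly when it is not a comma.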

Definition: Comma-Separated Values

A plain-text format for tabular data in which each line is a record and fields are separated by a delimiter (typically a comma). Despite its simplicity, CSV has no single binding standard (RFC 4180 documents a common dialect but is only informational), which leads to interoperability issues between software implementations.

2.2 Python's csv Module

import csv
import io
import os

# Create sample CSV data
csv_data = """name,age,score,city
John Doe,30,95.5,New York
Jane Smith,25,87.3,"Los Angeles, CA"
Bob Johnson,35,,Chicago
Alice Brown,28,92.0,"New York, NY"
Charlie Wilson,32,88.7,"Dallas, TX"
"""

# Write CSV
csv_file = 'sample.csv'
with open(csv_file, 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    writer.writerow(['name', 'age', 'score', 'city'])  # Header
    writer.writerows([
        ['John Doe', 30, 95.5, 'New York'],
        ['Jane Smith', 25, 87.3, 'Los Angeles, CA'],
        ['Bob Johnson', 35, None, 'Chicago'],
        ['Alice Brown', 28, 92.0, 'New York, NY'],
    ])

# Read CSV
with open(csv_file, 'r', encoding='utf-8') as f:
    reader = csv.DictReader(f)
    print("CSV Content:")
    for row in reader:
        print(f"  {row}")

# Clean up
os.remove(csv_file)

2.3 Pandas CSV Operations

import csv  # Needed for csv.QUOTE_NONNUMERIC below
import pandas as pd
import numpy as np
import os

# Create comprehensive CSV example
data = {
    'id': range(1, 6),
    'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
    'age': [28, 35, 42, 31, 29],
    'salary': [75000, 85000, 92000, 78000, 81000],
    'department': ['Engineering', 'Marketing', 'Engineering', 'HR', 'Marketing'],
    'start_date': ['2020-01-15', '2019-03-22', '2018-07-01', '2021-02-10', '2020-09-05']
}

df = pd.DataFrame(data)

# Write CSV with various options
csv_file = 'employees.csv'

# Basic write
df.to_csv(csv_file, index=False)
print("Basic CSV written")

# Write with specific options
df.to_csv(
    'employees_detailed.csv',
    index=False,
    sep=';',                    # Semicolon separator
    columns=['name', 'salary'], # Select columns
    float_format='%.2f',        # Format floats
    date_format='%Y-%m-%d',
    quoting=csv.QUOTE_NONNUMERIC,
    lineterminator='\n'
)
print("Detailed CSV written")

# Read CSV with various options
print("\nReading CSV:")
df_read = pd.read_csv(csv_file)
print(f"Basic read: {df_read.shape}")
print(df_read.head())

# Read with specific options
df_special = pd.read_csv(
    csv_file,
    index_col='id',            # Use id as index
    usecols=['id', 'name', 'salary'],  # Columns to read (must include index_col)
    dtype={'salary': float},    # Specify dtypes
    na_values=['NA', 'null', ''],  # Custom NA values
    nrows=3                     # Read only first 3 rows
)
print(f"\nSpecial read: {df_special.shape}")
print(df_special)

# Handle different separators
df_semicolon = pd.read_csv('employees_detailed.csv', sep=';')
print(f"\nSemicolon CSV: {df_semicolon.shape}")

# Clean up
os.remove(csv_file)
os.remove('employees_detailed.csv')

2.4 CSV Handling Issues

import pandas as pd
import os

# Issue 1: Different encodings
utf8_data = "name,value\ncafé,100\nnaïve,200"
latin1_data = "name,value\ncafé,100\nnaïve,200"

with open('utf8.csv', 'w', encoding='utf-8') as f:
    f.write(utf8_data)

with open('latin1.csv', 'w', encoding='latin-1') as f:
    f.write(latin1_data)

# Reading different encodings
try:
    df_utf8 = pd.read_csv('utf8.csv', encoding='utf-8')
    print(f"UTF-8 read: {df_utf8['name'].tolist()}")
except UnicodeDecodeError as e:
    print(f"UTF-8 error: {e}")

try:
    df_latin1 = pd.read_csv('latin1.csv', encoding='latin-1')
    print(f"Latin-1 read: {df_latin1['name'].tolist()}")
except UnicodeDecodeError as e:
    print(f"Latin-1 error: {e}")

# Issue 2: Large files - chunked reading
print("\nChunked reading for large files:")
chunk_size = 2
chunks = []
for chunk in pd.read_csv('utf8.csv', chunksize=chunk_size):
    chunks.append(chunk)
    print(f"  Chunk shape: {chunk.shape}")
df_combined = pd.concat(chunks, ignore_index=True)
print(f"Combined shape: {df_combined.shape}")

# Clean up
os.remove('utf8.csv')
os.remove('latin1.csv')

$$M_{\text{chunked}} = O(\text{chunk\_size}), \quad M_{\text{full}} = O(\text{file\_size})$$

Chunked reading loads only chunk_size rows into memory at a time, making it possible to process files larger than available RAM.
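
The memory benefit only holds if each chunk is reduced and then discarded; collecting every chunk in a list and concatenating them rebuilds the full dataset in memory. A minimal sketch of the streaming pattern, using a small in-memory CSV as stand-in data (the column names are illustrative):

```python
import io

import pandas as pd

csv_text = "name,value\na,100\nb,200\nc,300\nd,400\ne,500\n"

total = 0
row_count = 0
# Each iteration holds at most `chunksize` rows in memory
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=2):
    total += int(chunk["value"].sum())
    row_count += len(chunk)

mean_value = total / row_count
print(total, row_count, mean_value)  # 1500 5 300.0
```

Only the running totals survive between iterations, so peak memory stays proportional to the chunk size regardless of the file size.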


3. Excel Files

3.1 Reading Excel

import pandas as pd
import os

# Create Excel file with multiple sheets
excel_file = 'data_report.xlsx'

# Create sample DataFrames
df_sales = pd.DataFrame({
    'month': ['Jan', 'Feb', 'Mar', 'Apr'],
    'revenue': [10000, 12000, 11000, 13000],
    'cost': [7000, 8000, 7500, 8500]
})

df_inventory = pd.DataFrame({
    'product': ['A', 'B', 'C'],
    'quantity': [100, 200, 150],
    'warehouse': ['NYC', 'LA', 'Chicago']
})

# Write to Excel with multiple sheets
with pd.ExcelWriter(excel_file, engine='openpyxl') as writer:
    df_sales.to_excel(writer, sheet_name='Sales', index=False)
    df_inventory.to_excel(writer, sheet_name='Inventory', index=False)
    
    # Add summary sheet
    df_summary = pd.DataFrame({
        'Metric': ['Total Revenue', 'Total Cost', 'Profit'],
        'Value': [df_sales['revenue'].sum(), 
                  df_sales['cost'].sum(),
                  df_sales['revenue'].sum() - df_sales['cost'].sum()]
    })
    df_summary.to_excel(writer, sheet_name='Summary', index=False)

print("Excel file created with 3 sheets")

# Read specific sheets
print("\nReading Excel:")
df_sales_read = pd.read_excel(excel_file, sheet_name='Sales')
print(f"Sales sheet: {df_sales_read.shape}")
print(df_sales_read)

df_inventory_read = pd.read_excel(excel_file, sheet_name='Inventory')
print(f"\nInventory sheet: {df_inventory_read.shape}")
print(df_inventory_read)

# Read all sheets
all_sheets = pd.read_excel(excel_file, sheet_name=None)
print(f"\nAll sheets: {list(all_sheets.keys())}")

# Clean up
os.remove(excel_file)

3.2 Excel-Specific Features

import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment
from openpyxl.utils.dataframe import dataframe_to_rows
import os

# Advanced Excel writing with formatting
def create_styled_excel(filename: str):
    """Create Excel file with professional formatting."""
    
    # Create workbook
    wb = Workbook()
    
    # Sheet 1: Data with formatting
    ws = wb.active
    ws.title = "Sales Data"
    
    # Headers
    headers = ['Product', 'Q1', 'Q2', 'Q3', 'Q4', 'Total']
    header_font = Font(bold=True, color="FFFFFF")
    header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
    
    for col, header in enumerate(headers, 1):
        cell = ws.cell(row=1, column=col, value=header)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = Alignment(horizontal='center')
    
    # Data
    data = [
        ['Widget A', 100, 150, 200, 180],
        ['Widget B', 200, 180, 220, 250],
        ['Widget C', 150, 160, 170, 190],
    ]
    
    for row_idx, row_data in enumerate(data, 2):
        for col_idx, value in enumerate(row_data, 1):
            ws.cell(row=row_idx, column=col_idx, value=value)
        # Formula for total
        ws.cell(row=row_idx, column=6, value=f"=SUM(B{row_idx}:E{row_idx})")
    
    # Adjust column widths
    for col in ws.columns:
        max_length = max(len(str(cell.value or "")) for cell in col)
        ws.column_dimensions[col[0].column_letter].width = max_length + 2
    
    # Sheet 2: Pivot-style summary
    ws2 = wb.create_sheet("Summary")
    ws2['A1'] = "Quarter"
    ws2['B1'] = "Total Sales"
    ws2['A1'].font = Font(bold=True)
    ws2['B1'].font = Font(bold=True)
    
    quarters = ['Q1', 'Q2', 'Q3', 'Q4']
    totals = [sum(row[1] for row in data),
              sum(row[2] for row in data),
              sum(row[3] for row in data),
              sum(row[4] for row in data)]
    
    for i, (q, t) in enumerate(zip(quarters, totals), 2):
        ws2.cell(row=i, column=1, value=q)
        ws2.cell(row=i, column=2, value=t)
    
    # Save
    wb.save(filename)
    print(f"Styled Excel created: {filename}")

# Create the file
create_styled_excel('styled_report.xlsx')

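# Read with pandas. The Total column holds formulas; openpyxl stores no cached
# results, so pandas is expected to show NaN there until Excel recalculates the file.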
df = pd.read_excel('styled_report.xlsx', sheet_name='Sales Data')
print("\nRead styled Excel:")
print(df)

# Clean up
os.remove('styled_report.xlsx')

Excel files (.xlsx) are ZIP archives containing XML files. They support formulas, formatting, charts, and multiple sheets. For large datasets (100K+ rows), consider Parquet instead: it is typically faster to read, smaller on disk, and not subject to Excel's limit of 1,048,576 rows per sheet.
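
The ZIP-of-XML structure is easy to verify. The following sketch saves a one-cell workbook to an in-memory buffer with openpyxl and lists the archive members with the standard `zipfile` module; it assumes openpyxl is installed, as in the examples above.

```python
import io
import zipfile

from openpyxl import Workbook

# Build a tiny workbook and save it to memory instead of disk
wb = Workbook()
wb.active["A1"] = "hello"

buffer = io.BytesIO()
wb.save(buffer)
buffer.seek(0)

# An .xlsx file opens as an ordinary ZIP archive
with zipfile.ZipFile(buffer) as archive:
    names = archive.namelist()

for name in names:
    print(name)
```

The listing includes entries such as `xl/workbook.xml` and one XML file per worksheet under `xl/worksheets/`.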


4. JSON Data

4.1 JSON Structures

Definition: JavaScript Object Notation

A lightweight, text-based data interchange format derived from JavaScript object syntax. JSON represents data as key-value pairs (objects) and ordered lists (arrays). It is language-independent but uses conventions familiar to C-family programmers.
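
Because JSON has only objects, arrays, strings, numbers, booleans, and null, some Python values do not survive a round trip unchanged. A short sketch with Python's `json` module (the sample record is illustrative):

```python
import json

original = {
    "name": "Alice",
    "scores": (95, 87),   # Tuple: JSON has only arrays
    "active": True,       # Becomes true
    "manager": None,      # Becomes null
    1: "integer key",     # JSON object keys are always strings
}

encoded = json.dumps(original)
decoded = json.loads(encoded)

print(encoded)
print(decoded["scores"])   # [95, 87]
print("1" in decoded)      # True
print(1 in decoded)        # False
```

Tuples come back as lists and integer keys come back as strings, so code that compares loaded data with the original structure should account for both conversions.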

import json
import pandas as pd
from typing import Dict, List, Any

# Sample nested JSON
nested_data = {
    "company": "TechCorp",
    "employees": [
        {
            "id": 1,
            "name": "Alice",
            "department": "Engineering",
            "skills": ["Python", "ML", "SQL"],
            "contact": {
                "email": "alice@techcorp.com",
                "phone": "555-0101"
            }
        },
        {
            "id": 2,
            "name": "Bob",
            "department": "Marketing",
            "skills": ["SEO", "Analytics"],
            "contact": {
                "email": "bob@techcorp.com",
                "phone": "555-0102"
            }
        }
    ],
    "metadata": {
        "version": "1.0",
        "last_updated": "2024-01-15"
    }
}

# Write JSON
json_file = 'company_data.json'
with open(json_file, 'w') as f:
    json.dump(nested_data, f, indent=2)

print("JSON written")

# Read JSON
with open(json_file, 'r') as f:
    loaded_data = json.load(f)

print(f"\nLoaded JSON:")
print(f"  Company: {loaded_data['company']}")
print(f"  Employees: {len(loaded_data['employees'])}")

# Flatten nested JSON for pandas
print("\nFlattening nested JSON:")
employees = loaded_data['employees']
df_flat = pd.json_normalize(employees, sep='_')
print(df_flat)

4.2 JSON Processing Patterns

import json
import pandas as pd
from typing import List, Dict, Any

# Pattern 1: Extract specific fields
def extract_json_fields(data: Dict, fields: List[str]) -> Dict:
    """Extract specific fields from nested JSON."""
    result = {}
    for field in fields:
        keys = field.split('.')
        value = data
        for key in keys:
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                value = None
                break
        result[field] = value
    return result

# Example usage
data = {
    "user": {
        "profile": {
            "name": "Alice",
            "age": 30
        },
        "settings": {
            "theme": "dark"
        }
    }
}

fields = ['user.profile.name', 'user.profile.age', 'user.settings.theme']
extracted = extract_json_fields(data, fields)
print("Extracted fields:", extracted)

# Pattern 2: Transform list of JSON objects
def transform_json_list(json_list: List[Dict], 
                        key_mapping: Dict[str, str]) -> List[Dict]:
    """Transform JSON objects with key mapping."""
    transformed = []
    for item in json_list:
        new_item = {}
        for old_key, new_key in key_mapping.items():
            if old_key in item:
                new_item[new_key] = item[old_key]
        transformed.append(new_item)
    return transformed

# Example
json_list = [
    {"first_name": "Alice", "last_name": "Smith", "emp_id": 1},
    {"first_name": "Bob", "last_name": "Jones", "emp_id": 2}
]

mapping = {"first_name": "given_name", "last_name": "family_name", "emp_id": "id"}
result = transform_json_list(json_list, mapping)
print("\nTransformed JSON:")
for item in result:
    print(f"  {item}")

# Pattern 3: Merge multiple JSON files
def merge_json_files(file_paths: List[str]) -> List[Dict]:
    """Merge multiple JSON files containing lists."""
    merged = []
    for path in file_paths:
        with open(path, 'r') as f:
            data = json.load(f)
            if isinstance(data, list):
                merged.extend(data)
            else:
                merged.append(data)
    return merged

4.3 JSON Lines (NDJSON)

import json
import os
import pandas as pd

# JSON Lines format (one JSON object per line)
ndjson_data = """{"name": "Alice", "age": 30, "city": "NYC"}
{"name": "Bob", "age": 25, "city": "LA"}
{"name": "Charlie", "age": 35, "city": "Chicago"}
{"name": "Diana", "age": 28, "city": "Boston"}
"""

# Write NDJSON
ndjson_file = 'people.ndjson'
with open(ndjson_file, 'w') as f:
    f.write(ndjson_data)

# Read NDJSON with pandas
df = pd.read_json(ndjson_file, lines=True)
print("NDJSON loaded:")
print(df)
print(f"\nShape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")
print(f"Memory: {df.memory_usage(deep=True).sum()} bytes")

# Process NDJSON line by line (memory efficient for large files)
print("\nStreaming NDJSON:")
with open(ndjson_file, 'r') as f:
    for i, line in enumerate(f):
        record = json.loads(line)
        print(f"  Line {i}: {record}")

# Clean up
os.remove(ndjson_file)

JSON vs NDJSON

$$\text{JSON}: O(n) \text{ parse}, \quad \text{NDJSON}: O(1) \text{ per line}$$

Here,

  • JSON = Must parse entire document to access any record
  • NDJSON = Can process one record at a time (streaming)
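
The per-line property lends itself to a generator. Below is a minimal sketch; the helper name `iter_ndjson` is hypothetical, and skipping blank lines is an assumption about how tolerant the reader should be.

```python
import io
import json
from typing import Iterator, TextIO


def iter_ndjson(stream: TextIO) -> Iterator[dict]:
    """Yield one parsed record per non-blank line (hypothetical helper)."""
    for line in stream:
        line = line.strip()
        if not line:
            continue  # Tolerate blank lines between records
        yield json.loads(line)


sample = io.StringIO(
    '{"name": "Alice", "age": 30}\n'
    '\n'
    '{"name": "Bob", "age": 25}\n'
)

# Only one record is held in memory at a time
ages = [record["age"] for record in iter_ndjson(sample)]
print(ages)  # [30, 25]
```

The same function works on an open file object, so a multi-gigabyte log can be filtered or aggregated without loading it whole.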

5. SQL Databases

5.1 SQLAlchemy Connection

import pandas as pd
from sqlalchemy import create_engine, text
import os

# Create a file-backed SQLite database (sample.db in the working directory)
engine = create_engine('sqlite:///sample.db')

# Create sample tables
with engine.connect() as conn:
    conn.execute(text("""
        CREATE TABLE employees (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            department TEXT,
            salary REAL,
            hire_date DATE
        )
    """))
    
    conn.execute(text("""
        CREATE TABLE departments (
            id INTEGER PRIMARY KEY,
            name TEXT UNIQUE,
            budget REAL,
            manager TEXT
        )
    """))
    
    # Insert sample data
    conn.execute(text("""
        INSERT INTO employees (id, name, department, salary, hire_date) VALUES
        (1, 'Alice', 'Engineering', 85000, '2020-01-15'),
        (2, 'Bob', 'Marketing', 72000, '2019-03-22'),
        (3, 'Charlie', 'Engineering', 92000, '2018-07-01'),
        (4, 'Diana', 'HR', 68000, '2021-02-10'),
        (5, 'Eve', 'Marketing', 78000, '2020-09-05')
    """))
    
    conn.execute(text("""
        INSERT INTO departments (id, name, budget, manager) VALUES
        (1, 'Engineering', 500000, 'Alice'),
        (2, 'Marketing', 300000, 'Bob'),
        (3, 'HR', 200000, 'Diana')
    """))
    
    conn.commit()

print("Database created with sample data")

5.2 Querying with Pandas

Definition: Parameterized Query

A SQL query where user input is bound as parameters rather than string-concatenated into the query text. This prevents SQL injection attacks by ensuring user input is treated as data, not executable code.
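
The difference is easiest to see side by side. This sketch uses the standard-library `sqlite3` module with an in-memory database; the table contents and the malicious input string are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (name TEXT, department TEXT)")
conn.executemany(
    "INSERT INTO employees VALUES (?, ?)",
    [("Alice", "Engineering"), ("Bob", "Marketing")],
)

malicious = "Engineering' OR '1'='1"

# UNSAFE: the input is spliced into the SQL text and changes the query logic
unsafe_sql = f"SELECT name FROM employees WHERE department = '{malicious}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# SAFE: the input is bound as a parameter and compared as a plain string
safe_rows = conn.execute(
    "SELECT name FROM employees WHERE department = ?", (malicious,)
).fetchall()
conn.close()

print(len(unsafe_rows))  # 2 (every row leaks)
print(safe_rows)         # []
```

With string formatting the injected `OR '1'='1'` matches every row; with a bound parameter no department has that literal name, so nothing is returned.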

# Read SQL queries into DataFrames
query = "SELECT * FROM employees WHERE salary > 70000"
df = pd.read_sql(query, engine)
print("Query result:")
print(df)

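# Parameterized queries (prevent SQL injection); wrapping the SQL in text()
# makes the :dept named-parameter syntax portable across database drivers
param_query = text("SELECT * FROM employees WHERE department = :dept")
df_eng = pd.read_sql(param_query, engine, params={"dept": "Engineering"})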
print(f"\nEngineering employees:")
print(df_eng)

# Aggregation queries
agg_query = """
    SELECT 
        department,
        COUNT(*) as count,
        AVG(salary) as avg_salary,
        MAX(salary) as max_salary
    FROM employees
    GROUP BY department
"""
df_agg = pd.read_sql(agg_query, engine)
print(f"\nDepartment statistics:")
print(df_agg)

# Join queries
join_query = """
    SELECT 
        e.name,
        e.salary,
        d.name as department_name,
        d.budget
    FROM employees e
    JOIN departments d ON e.department = d.name
"""
df_joined = pd.read_sql(join_query, engine)
print(f"\nJoined data:")
print(df_joined)

# Write DataFrame to SQL
new_data = pd.DataFrame({
    'id': [6, 7],
    'name': ['Frank', 'Grace'],
    'department': ['Engineering', 'Marketing'],
    'salary': [95000, 82000],
    'hire_date': ['2022-01-10', '2022-03-15']
})

new_data.to_sql('new_employees', engine, if_exists='replace', index=False)
print(f"\nNew employees written to database")

# Verify
df_verify = pd.read_sql("SELECT * FROM new_employees", engine)
print(df_verify)

5.3 Chunked Reading for Large Databases

# Demonstrate chunked reading (reuses the engine and employees table from above)
chunk_size = 2
chunks = []

for chunk in pd.read_sql("SELECT * FROM employees", engine, chunksize=chunk_size):
    chunks.append(chunk)
    print(f"  Chunk: {chunk.shape[0]} rows")

df_combined = pd.concat(chunks, ignore_index=True)
print(f"Combined: {df_combined.shape[0]} rows")

# Clean up only after the last query, so the employees table still exists above
engine.dispose()
os.remove('sample.db')

6. Parquet: Columnar Storage

6.1 Why Parquet?

Parquet is optimized for analytics:

  • Columnar storage: Only reads needed columns
  • Compression: Much smaller files
  • Predicate pushdown: Filters applied at read time
  • Schema evolution: Add columns without rewriting

Definition: Columnar Storage

A storage format where data is organized by columns rather than rows. This enables: (1) reading only required columns (I/O reduction), (2) better compression (similar values grouped together), and (3) vectorized processing on contiguous column data.
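
A toy illustration in plain Python may help. It is not how Parquet is implemented; it only mirrors the two ideas in the definition: a single column can be read without touching the others, and grouped similar values compress well (shown here with a simple run-length encoding). The sample records are illustrative.

```python
from itertools import groupby

# Row-oriented layout: one record per entry
rows = [
    {"id": 1, "category": "A", "value": 10.0},
    {"id": 2, "category": "A", "value": 20.0},
    {"id": 3, "category": "B", "value": 30.0},
]

# Column-oriented layout: one list per column
columns = {key: [row[key] for row in rows] for key in rows[0]}

# Summing one column touches every record in the row layout...
total_from_rows = sum(row["value"] for row in rows)
# ...but only a single contiguous list in the column layout
total_from_columns = sum(columns["value"])

# Adjacent repeated values collapse into (value, run length) pairs
run_lengths = [
    (value, len(list(group))) for value, group in groupby(columns["category"])
]

print(total_from_rows, total_from_columns)  # 60.0 60.0
print(run_lengths)                          # [('A', 2), ('B', 1)]
```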

6.2 Parquet Operations

import pandas as pd
import numpy as np
import time
import os

# Create sample dataset
np.random.seed(42)
n_rows = 100000

df = pd.DataFrame({
    'id': range(n_rows),
    'category': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
    'value1': np.random.normal(0, 1, n_rows),
    'value2': np.random.uniform(0, 100, n_rows),
    'text': ['Sample text ' + str(i) for i in range(n_rows)]
})

print(f"Original DataFrame:")
print(f"  Rows: {len(df):,}")
print(f"  Memory: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

# Write to different formats for comparison
csv_file = 'data.csv'
parquet_file = 'data.parquet'

# CSV
start = time.perf_counter()
df.to_csv(csv_file, index=False)
csv_time = time.perf_counter() - start
csv_size = os.path.getsize(csv_file) / 1024**2

# Parquet
start = time.perf_counter()
df.to_parquet(parquet_file, index=False)
parquet_time = time.perf_counter() - start
parquet_size = os.path.getsize(parquet_file) / 1024**2

print(f"\nFile comparison:")
print(f"{'Format':<10} {'Size (MB)':<12} {'Write Time':<12} {'Compression':<12}")
print("-" * 46)
print(f"{'CSV':<10} {csv_size:<12.2f} {csv_time:<12.4f} {csv_size/parquet_size:.1f}x")
print(f"{'Parquet':<10} {parquet_size:<12.2f} {parquet_time:<12.4f} 1x")

# Read performance
print(f"\nRead performance:")

start = time.perf_counter()
df_csv = pd.read_csv(csv_file)
csv_read_time = time.perf_counter() - start

start = time.perf_counter()
df_parquet = pd.read_parquet(parquet_file)
parquet_read_time = time.perf_counter() - start

print(f"  CSV read: {csv_read_time:.4f}s")
print(f"  Parquet read: {parquet_read_time:.4f}s")
print(f"  Speedup: {csv_read_time/parquet_read_time:.1f}x faster with Parquet")

# Column selection (Parquet advantage)
print(f"\nColumn selection (reading 2 of 5 columns):")

start = time.perf_counter()
df_csv_cols = pd.read_csv(csv_file, usecols=['id', 'value1'])
csv_cols_time = time.perf_counter() - start

start = time.perf_counter()
df_parquet_cols = pd.read_parquet(parquet_file, columns=['id', 'value1'])
parquet_cols_time = time.perf_counter() - start

print(f"  CSV: {csv_cols_time:.4f}s (reads all columns)")
print(f"  Parquet: {parquet_cols_time:.4f}s (reads only selected)")
print(f"  Speedup: {csv_cols_time/parquet_cols_time:.1f}x")

# Clean up
os.remove(csv_file)
os.remove(parquet_file)

$$R_{\text{compression}} = \frac{\text{CSV\_size}}{\text{Parquet\_size}} \approx 3\text{ to }10$$

Parquet files are typically 3-10x smaller than equivalent CSV files due to columnar encoding and compression algorithms like Snappy, Gzip, or Zstandard.

6.3 Parquet Compression Options

import pandas as pd
import os

# Test different compression algorithms
compressions = {
    'snappy': None,  # Default, fast
    'gzip': '.gz',   # Good compression
    'brotli': '.br', # Best compression
    'lz4': None,     # Fast decompression
    'zstd': None     # Good balance
}

df = pd.DataFrame({
    'id': range(10000),
    'category': ['A', 'B', 'C', 'D'] * 2500,
    'value': range(10000)
})

print("Compression comparison:")
print(f"{'Algorithm':<12} {'Size (bytes)':<15} {'Ratio':<10}")
print("-" * 37)

base_size = None
for comp, ext in compressions.items():
    filename = f'test_{comp}.parquet'
    try:
        df.to_parquet(filename, compression=comp, index=False)
        size = os.path.getsize(filename)
        if base_size is None:
            base_size = size
        ratio = base_size / size if size > 0 else 0
        print(f"{comp:<12} {size:<15,} {ratio:.2f}x")
        os.remove(filename)
    except Exception as e:
        print(f"{comp:<12} Error: {e}")

# Reading compressed Parquet
df.to_parquet('test_zstd.parquet', compression='zstd', index=False)
df_read = pd.read_parquet('test_zstd.parquet')
print(f"\nRead compressed Parquet: {df_read.shape}")
os.remove('test_zstd.parquet')

7. HDF5: Hierarchical Data

7.1 When to Use HDF5

HDF5 is ideal for:

  • Large numerical datasets
  • Hierarchical data structures
  • Multiple arrays in one file
  • Partial reads/writes

Definition: Hierarchical Data Format

A file format designed for storing and organizing large amounts of data. HDF5 files contain two types of objects: groups, which act like directories, and datasets, which act like files holding array data. Data is stored in a hierarchical structure, and metadata (attributes) can be attached to any object.
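
The group/dataset/attribute model can be sketched directly with h5py. This assumes h5py and NumPy are installed; the names `experiment`, `readings`, and `units` are hypothetical, chosen only for illustration.

```python
import os
import tempfile

import h5py
import numpy as np

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "demo.h5")

    with h5py.File(path, "w") as f:
        group = f.create_group("experiment")            # Acts like a directory
        dataset = group.create_dataset("readings", data=np.arange(100.0))
        dataset.attrs["units"] = "volts"                # Metadata on the dataset

    with h5py.File(path, "r") as f:
        dataset = f["experiment/readings"]              # Path-style access
        shape = dataset.shape
        first_five = dataset[:5]                        # Partial read of a slice
        units = dataset.attrs["units"]

print(shape)                # (100,)
print(first_five.tolist())  # [0.0, 1.0, 2.0, 3.0, 4.0]
print(units)                # volts
```

Slicing the dataset reads only the requested elements from disk, which is what makes partial reads of very large arrays practical.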

7.2 HDF5 Operations

import pandas as pd
import numpy as np
import os

# Create sample data
np.random.seed(42)
n = 10000

df1 = pd.DataFrame({
    'x': np.random.normal(0, 1, n),
    'y': np.random.normal(0, 1, n)
})

df2 = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01', periods=n, freq='min'),  # 'T' alias is deprecated in recent pandas
    'value': np.random.uniform(0, 100, n)
})

# Write to HDF5
hdf_file = 'data.h5'

# Store multiple datasets
df1.to_hdf(hdf_file, key='coordinates', mode='w')
df2.to_hdf(hdf_file, key='timeseries', mode='a')

print("HDF5 file created with 2 datasets")

# Read specific dataset
df_coords = pd.read_hdf(hdf_file, key='coordinates')
df_ts = pd.read_hdf(hdf_file, key='timeseries')

print(f"\nCoordinates: {df_coords.shape}")
print(f"Timeseries: {df_ts.shape}")

# List all keys
import h5py
with h5py.File(hdf_file, 'r') as f:
    print(f"\nHDF5 structure:")
    def print_structure(name, obj):
        print(f"  {name}: {type(obj).__name__}")
    f.visititems(print_structure)

# Store with data columns (for efficient queries)
store = pd.HDFStore(hdf_file)
store.append('sensor_data', 
             pd.DataFrame({
                 'sensor_id': np.random.choice(['A', 'B', 'C'], 1000),
                 'reading': np.random.normal(0, 1, 1000),
                 'timestamp': pd.date_range('2024-01-01', periods=1000, freq='s')  # 'S' alias is deprecated in recent pandas
             }),
             data_columns=['sensor_id'])

# Query using data columns
result = store.select('sensor_data', where="sensor_id='A'")
print(f"\nQuery sensor_id='A': {result.shape}")
store.close()

# Clean up
os.remove(hdf_file)

8. API Data Acquisition

8.1 The requests Library

import requests
import json
import time
from typing import Dict, Any, Optional

class APIClient:
    """Base API client with rate limiting and error handling."""
    
    def __init__(self, base_url: str, rate_limit: float = 1.0):
        self.base_url = base_url
        self.session = requests.Session()
        self.last_request_time = 0
        self.rate_limit = rate_limit  # Seconds between requests
    
    def _rate_limit_wait(self):
        """Wait if necessary to respect rate limits."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.rate_limit:
            time.sleep(self.rate_limit - elapsed)
        self.last_request_time = time.time()
    
    def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """Make GET request with error handling."""
        self._rate_limit_wait()
        
        url = f"{self.base_url}/{endpoint}"
        try:
            response = self.session.get(url, params=params, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}
    
    def post(self, endpoint: str, data: Dict) -> Dict[str, Any]:
        """Make POST request."""
        self._rate_limit_wait()
        
        url = f"{self.base_url}/{endpoint}"
        try:
            response = self.session.post(url, json=data, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

# Example: JSONPlaceholder API (fake REST API for testing)
client = APIClient("https://jsonplaceholder.typicode.com", rate_limit=0.5)

# GET request
print("GET request:")
posts = client.get("posts", params={"_limit": 3})
if "error" not in posts:
    for post in posts:
        print(f"  Post {post['id']}: {post['title'][:50]}...")
else:
    print(f"  Error: {posts['error']}")

# GET single resource
print("\nGET single resource:")
post = client.get("posts/1")
if "error" not in post:
    print(f"  Title: {post['title']}")
    print(f"  Body: {post['body'][:100]}...")

8.2 Pagination Patterns

import requests
import pandas as pd
from typing import List, Dict

class PaginatedAPIClient:
    """Handle paginated API responses."""
    
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.session = requests.Session()
    
    def get_all_pages(self, endpoint: str, 
                      page_param: str = '_page',  # JSONPlaceholder-style name; other APIs differ
                      per_page: int = 100,
                      max_pages: int = 10) -> List[Dict]:
        """Fetch all pages of results."""
        all_results = []
        page = 1
        
        while page <= max_pages:
            params = {page_param: page, '_limit': per_page}
            response = self.session.get(
                f"{self.base_url}/{endpoint}",
                params=params
            )
            
            if response.status_code != 200:
                break
            
            data = response.json()
            if not data:
                break
            
            all_results.extend(data)
            page += 1
            
            # Check if we've reached the end
            if len(data) < per_page:
                break
        
        return all_results
    
    def get_paginated_generator(self, endpoint: str,
                                page_param: str = '_page',
                                per_page: int = 100):
        """Generator for memory-efficient pagination."""
        page = 1
        
        while True:
            response = self.session.get(
                f"{self.base_url}/{endpoint}",
                params={'_limit': per_page, page_param: page},
                timeout=10  # never wait indefinitely on a stalled server
            )
            
            if response.status_code != 200:
                break
            
            data = response.json()
            if not data:
                break
            
            yield data
            
            if len(data) < per_page:
                break
            
            page += 1

# Example usage
client = PaginatedAPIClient("https://jsonplaceholder.typicode.com")

# Fetch all posts (limited for demo)
print("Fetching posts with pagination:")
all_posts = client.get_all_pages("posts", per_page=10, max_pages=3)
print(f"  Total posts fetched: {len(all_posts)}")

# Convert to DataFrame
df = pd.DataFrame(all_posts)
print(f"  DataFrame shape: {df.shape}")
print(f"  Columns: {df.columns.tolist()}")
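
The generator variant above is defined but never exercised. The sketch below shows the same stop conditions (an empty page or a short page) being consumed lazily. To stay runnable offline it swaps the HTTP call for a hypothetical in-memory `fake_fetch` function; `iter_pages`, `fake_fetch`, and `RECORDS` are illustrative names, not part of the client above.

```python
from typing import Callable, Dict, Iterator, List


def iter_pages(fetch_page: Callable[[int, int], List[Dict]],
               per_page: int = 100) -> Iterator[List[Dict]]:
    """Yield one page at a time until an empty or short page signals the end."""
    page = 1
    while True:
        data = fetch_page(page, per_page)
        if not data:
            break
        yield data
        if len(data) < per_page:
            break
        page += 1


# Hypothetical stand-in for an HTTP request: 25 fake records served in slices
RECORDS = [{"id": i} for i in range(1, 26)]


def fake_fetch(page: int, per_page: int) -> List[Dict]:
    start = (page - 1) * per_page
    return RECORDS[start:start + per_page]


# Only one page is held in memory at a time
page_sizes = [len(page) for page in iter_pages(fake_fetch, per_page=10)]
print(page_sizes)  # [10, 10, 5]

# Pages are fetched on demand: taking the first page triggers a single fetch
first_page = next(iter_pages(fake_fetch, per_page=10))
print(first_page[0])  # {'id': 1}
```

Because each page is processed and discarded before the next request, this pattern suits result sets that are too large to hold in a single list.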

8.3 Error Handling and Retry Logic

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import time
from typing import Optional

def create_session_with_retry(
    max_retries: int = 3,
    backoff_factor: float = 0.5,
    status_forcelist: tuple = (500, 502, 503, 504)
) -> requests.Session:
    """Create requests session with automatic retries."""
    session = requests.Session()
    
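    retry_strategy = Retry(
        total=max_retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
        # Retry only idempotent methods; automatically retrying POST can
        # duplicate writes. (allowed_methods requires urllib3 >= 1.26.)
        allowed_methods=["HEAD", "GET", "OPTIONS"]
    )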
    
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    
    return session

# Example with error handling
def safe_api_call(
    url: str,
    params: Optional[dict] = None,
    max_retries: int = 3
) -> dict:
    """Make API call with comprehensive error handling."""
    session = create_session_with_retry(max_retries)
    
    try:
        response = session.get(url, params=params, timeout=10)
        response.raise_for_status()
        return {"success": True, "data": response.json()}
    
    except requests.exceptions.Timeout:
        return {"success": False, "error": "Request timed out"}
    
    except requests.exceptions.ConnectionError:
        return {"success": False, "error": "Connection failed"}
    
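    except requests.exceptions.HTTPError as e:
        return {"success": False, "error": f"HTTP {e.response.status_code}"}
    
    except ValueError:
        # response.json() raises a ValueError subclass on malformed JSON
        return {"success": False, "error": "Invalid JSON response"}
    
    except requests.exceptions.RequestException as e:
        # Catch-all for everything else, e.g. RetryError once retries are exhausted
        return {"success": False, "error": f"Request failed: {e}"}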
    
    finally:
        session.close()

# Test
result = safe_api_call("https://jsonplaceholder.typicode.com/posts/1")
if result["success"]:
    print(f"API call successful: {result['data']['title'][:50]}...")
else:
    print(f"API call failed: {result['error']}")

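Exponential backoff (delay = base × 2^attempt) is the standard retry strategy for transient API failures. Doubling the wait after each failure gives an overloaded service time to recover. Backoff alone does not stop many clients from retrying in lockstep, though; adding random jitter to each delay is what spreads the retries out and mitigates the thundering herd problem.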
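
As a minimal sketch of that formula, the helper below computes the delay schedule by hand and applies optional "full jitter". The names `backoff_delays` and `retry_with_backoff` are illustrative, not part of requests or urllib3, and the default exception types are Python built-ins; in practice you would pass the exception classes your HTTP library raises.

```python
import random
import time
from typing import Callable, Iterator, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delays(base: float = 0.5, attempts: int = 4,
                   cap: float = 30.0, jitter: bool = True) -> Iterator[float]:
    """Yield one delay per retry: base * 2**attempt, capped at `cap`."""
    for attempt in range(attempts):
        delay = min(cap, base * 2 ** attempt)
        # "Full jitter": draw uniformly from [0, delay] so that clients
        # which failed together do not all retry at the same instant.
        yield random.uniform(0, delay) if jitter else delay


def retry_with_backoff(
    func: Callable[[], T],
    retries: int = 4,
    base: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func(); on a retryable error, wait and try again up to `retries` times."""
    delays = list(backoff_delays(base=base, attempts=retries))
    for attempt in range(retries + 1):
        try:
            return func()
        except retry_on:
            if attempt == retries:
                raise  # out of retries: surface the last error
            sleep(delays[attempt])
    raise AssertionError("unreachable")


# Deterministic schedule without jitter
print(list(backoff_delays(base=0.5, attempts=4, jitter=False)))
# [0.5, 1.0, 2.0, 4.0]
```

The `sleep` parameter is injectable so the retry loop can be tested without actually waiting.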


9. Format Comparison Table

Format  | Speed     | Compression | Type Safety | Use Case                      | Pandas Support
--------|-----------|-------------|-------------|-------------------------------|----------------
CSV     | Slow      | None        | No          | Simple data, interoperability | read_csv()
Excel   | Slow      | Low         | Partial     | Business reports, formulas    | read_excel()
JSON    | Medium    | None        | Partial     | APIs, nested data             | read_json()
Parquet | Fast      | High        | Yes         | Analytics, big data           | read_parquet()
HDF5    | Fast      | High        | Yes         | Scientific data, arrays       | read_hdf()
Feather | Very Fast | Medium      | Yes         | DataFrame serialization       | read_feather()
Pickle  | Fast      | Medium      | Yes         | Python objects                | read_pickle()
SQL     | Medium    | N/A         | Yes         | Structured data, queries      | read_sql()

Performance Benchmark

import pandas as pd
import numpy as np
import time
import os

# Create benchmark dataset
np.random.seed(42)
n_rows = 100000
n_cols = 10

data = {f'col_{i}': np.random.randn(n_rows) for i in range(n_cols)}
df = pd.DataFrame(data)
df['category'] = np.random.choice(['A', 'B', 'C', 'D'], n_rows)
df['text'] = ['text_' + str(i) for i in range(n_rows)]

print(f"Benchmark dataset: {n_rows:,} rows × {n_cols + 2} columns")
print(f"Memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB\n")

# Benchmark different formats
# (Parquet needs pyarrow or fastparquet, Feather needs pyarrow, HDF5 needs PyTables)
formats = {
    'CSV': lambda: df.to_csv('bench.csv', index=False),
    'JSON': lambda: df.to_json('bench.json', orient='records'),
    'Parquet': lambda: df.to_parquet('bench.parquet', index=False),
    'HDF5': lambda: df.to_hdf('bench.h5', key='data', mode='w'),
    'Pickle': lambda: df.to_pickle('bench.pkl'),
    'Feather': lambda: df.to_feather('bench.feather'),
}

# File extensions and readers are defined once, outside the timing loop
ext = {'CSV': '.csv', 'JSON': '.json', 'Parquet': '.parquet',
       'HDF5': '.h5', 'Pickle': '.pkl', 'Feather': '.feather'}

read_funcs = {
    'CSV': lambda: pd.read_csv('bench.csv'),
    'JSON': lambda: pd.read_json('bench.json'),
    'Parquet': lambda: pd.read_parquet('bench.parquet'),
    'HDF5': lambda: pd.read_hdf('bench.h5'),
    'Pickle': lambda: pd.read_pickle('bench.pkl'),
    'Feather': lambda: pd.read_feather('bench.feather'),
}

results = []
for name, write_func in formats.items():
    # Write
    start = time.perf_counter()
    write_func()
    write_time = time.perf_counter() - start
    
    # Get file size
    size = os.path.getsize(f'bench{ext[name]}') / 1024**2
    
    # Read
    start = time.perf_counter()
    _ = read_funcs[name]()
    read_time = time.perf_counter() - start
    
    results.append({
        'Format': name,
        'Size (MB)': size,
        'Write (s)': write_time,
        'Read (s)': read_time,
        'Total (s)': write_time + read_time
    })

# Display results
results_df = pd.DataFrame(results)
print("Benchmark Results:")
print(results_df.to_string(index=False))

# Clean up
for ext in ['.csv', '.json', '.parquet', '.h5', '.pkl', '.feather']:
    os.remove(f'bench{ext}')

10. Complete Data Pipeline

10.1 Multi-Source Data Integration

import pandas as pd
import numpy as np
import json
import os
from typing import Dict, List, Optional
from datetime import datetime

class DataPipeline:
    """Complete data pipeline for data science projects."""
    
    def __init__(self, data_dir: str = './data'):
        self.data_dir = data_dir
        os.makedirs(data_dir, exist_ok=True)
    
    def load_csv(self, filename: str, **kwargs) -> pd.DataFrame:
        """Load CSV with common preprocessing."""
        filepath = os.path.join(self.data_dir, filename)
        df = pd.read_csv(filepath, **kwargs)
        print(f"Loaded {filename}: {df.shape}")
        return df
    
    def load_excel(self, filename: str, sheet: "int | str" = 0, **kwargs) -> pd.DataFrame:
        """Load Excel with specific sheet."""
        filepath = os.path.join(self.data_dir, filename)
        df = pd.read_excel(filepath, sheet_name=sheet, **kwargs)
        print(f"Loaded {filename} (sheet {sheet}): {df.shape}")
        return df
    
    def load_json(self, filename: str, **kwargs) -> pd.DataFrame:
        """Load JSON (handles both regular and NDJSON)."""
        filepath = os.path.join(self.data_dir, filename)
        
        # Try regular JSON first
        try:
            df = pd.read_json(filepath, **kwargs)
            print(f"Loaded {filename}: {df.shape}")
            return df
        except ValueError:
            # Try NDJSON
            df = pd.read_json(filepath, lines=True, **kwargs)
            print(f"Loaded {filename} (NDJSON): {df.shape}")
            return df
    
    def load_parquet(self, filename: str, columns: Optional[List[str]] = None) -> pd.DataFrame:
        """Load Parquet with optional column selection."""
        filepath = os.path.join(self.data_dir, filename)
        df = pd.read_parquet(filepath, columns=columns)
        print(f"Loaded {filename}: {df.shape}")
        return df
    
    def merge_dataframes(
        self,
        dfs: List[pd.DataFrame],
        on: Optional[str] = None,
        how: str = 'inner'
    ) -> pd.DataFrame:
        """Merge multiple DataFrames."""
        if len(dfs) == 0:
            return pd.DataFrame()
        
        result = dfs[0]
        for df in dfs[1:]:
            if on:
                result = pd.merge(result, df, on=on, how=how)
            else:
                result = pd.merge(result, df, left_index=True, right_index=True, how=how)
        
        print(f"Merged result: {result.shape}")
        return result
    
    def clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply common data cleaning operations."""
        initial_shape = df.shape
        
        # Remove duplicates
        df = df.drop_duplicates()
        
        # Handle missing values
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        categorical_cols = df.select_dtypes(include=['object']).columns
        
        # Fill numeric with median
        df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].median())
        
        # Fill categorical with mode
        for col in categorical_cols:
            if df[col].isna().any():
                df[col] = df[col].fillna(df[col].mode()[0] if not df[col].mode().empty else 'Unknown')
        
        print(f"Cleaned data: {initial_shape} → {df.shape}")
        return df
    
    def save_data(
        self,
        df: pd.DataFrame,
        filename: str,
        format: str = 'parquet'
    ) -> str:
        """Save DataFrame in specified format."""
        filepath = os.path.join(self.data_dir, filename)
        
        if format == 'parquet':
            df.to_parquet(filepath, index=False)
        elif format == 'csv':
            df.to_csv(filepath, index=False)
        elif format == 'json':
            df.to_json(filepath, orient='records', indent=2)
        elif format == 'hdf5':
            df.to_hdf(filepath, key='data', mode='w')
        else:
            raise ValueError(f"Unsupported format: {format}")
        
        size = os.path.getsize(filepath) / 1024**2
        print(f"Saved {filename} ({format}): {size:.2f} MB")
        return filepath


# Example usage
pipeline = DataPipeline('./pipeline_data')

# Create sample data
np.random.seed(42)

# Source 1: Sales data
sales_data = pd.DataFrame({
    'transaction_id': range(1000),
    'product_id': np.random.randint(1, 50, 1000),
    'quantity': np.random.randint(1, 10, 1000),
    'price': np.random.uniform(10, 100, 1000),
    'date': pd.date_range('2024-01-01', periods=1000, freq='h')  # uppercase 'H' is deprecated since pandas 2.2
})

# Source 2: Product data
product_data = pd.DataFrame({
    'product_id': range(1, 51),
    'category': np.random.choice(['Electronics', 'Clothing', 'Food'], 50),
    'brand': [f'Brand_{chr(65 + i % 10)}' for i in range(50)]
})

# Source 3: Customer data
customer_data = pd.DataFrame({
    'customer_id': range(1, 101),
    'name': [f'Customer_{i}' for i in range(1, 101)],
    'segment': np.random.choice(['Premium', 'Regular', 'Basic'], 100)
})

# Save sample data
sales_data.to_parquet('./pipeline_data/sales.parquet', index=False)
product_data.to_csv('./pipeline_data/products.csv', index=False)
customer_data.to_json('./pipeline_data/customers.json', orient='records')

# Run pipeline
print("\n" + "="*60)
print("DATA PIPELINE EXECUTION")
print("="*60 + "\n")

# Load
sales = pipeline.load_parquet('sales.parquet')
products = pipeline.load_csv('products.csv')
customers = pipeline.load_json('customers.json')

# Transform
sales['total'] = sales['quantity'] * sales['price']
sales['customer_id'] = np.random.randint(1, 101, len(sales))

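# Merge: each lookup table joins on its own key, so merge in two steps
# (merging on the index would pair unrelated rows and truncate to 50 rows)
merged = pipeline.merge_dataframes([sales, products], on='product_id', how='left')
merged = pipeline.merge_dataframes([merged, customers], on='customer_id', how='left')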

# Clean
cleaned = pipeline.clean_data(merged)

# Feature engineering
cleaned['day_of_week'] = cleaned['date'].dt.day_name()
cleaned['hour'] = cleaned['date'].dt.hour

# Save results
pipeline.save_data(cleaned, 'processed_data.parquet', format='parquet')
pipeline.save_data(cleaned, 'processed_data.csv', format='csv')

print("\nPipeline completed successfully!")
print(f"Final dataset: {cleaned.shape}")
print(f"Columns: {cleaned.columns.tolist()}")

# Clean up
import shutil
shutil.rmtree('./pipeline_data')

Key Takeaways

Summary: File I/O & Data Import

  1. Context Managers Always: Use with statements for file I/O. They guarantee proper resource cleanup even when exceptions occur.
  2. Format Selection: CSV for small-medium data; Parquet for analytics and large datasets; JSON for APIs and nested data; HDF5 for scientific computing; Excel for business reports.
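  3. Encoding Matters: Always specify encoding='utf-8' for text files. errors='replace' and errors='ignore' keep a read from failing on bad bytes, but both silently alter the data, so use them deliberately.
  4. Performance Optimization: Use chunked reading for large files and select only the columns you need. Because Parquet is columnar, reading a subset of columns can be 10-100x faster than CSV, depending on the dataset and how many columns are skipped.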
  5. Data Quality: Validate data after loading (check shapes, dtypes, nulls); handle missing values explicitly; remove duplicates.
  6. API Best Practices: Implement rate limiting; use retry logic with exponential backoff; handle timeouts gracefully; cache responses when possible.
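
To make the encoding takeaway concrete, here is a small sketch that writes a few Latin-1 bytes to a temporary file and reads them back four ways. The file name `legacy.csv` and the sample text are made up for illustration.

```python
import os
import tempfile

# 'é' encoded as Latin-1 is the single byte 0xE9, which is not valid UTF-8 here
raw = "café,10\n".encode("latin-1")

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "legacy.csv")  # hypothetical file name
    with open(path, "wb") as f:
        f.write(raw)

    # Strict decoding (the default) raises instead of corrupting data
    try:
        with open(path, "r", encoding="utf-8") as f:
            f.read()
        strict_failed = False
    except UnicodeDecodeError:
        strict_failed = True

    # errors='replace' substitutes U+FFFD for the undecodable byte
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        replaced = f.read()

    # errors='ignore' drops the undecodable byte entirely
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        ignored = f.read()

    # Knowing the real encoding recovers the original text
    with open(path, "r", encoding="latin-1") as f:
        correct = f.read()

print(strict_failed)   # True
print(repr(replaced))  # 'caf\ufffd,10\n'
print(repr(ignored))   # 'caf,10\n'
print(repr(correct))   # 'café,10\n'
```

Both error handlers let the read succeed, but only the correct encoding preserves the data, which is why they are best treated as a last resort.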

Practice Exercises

Exercise 1: Multi-Format Reader

Build a universal data reader that:

  1. Auto-detects file format from extension
  2. Handles CSV, Excel, JSON, Parquet
  3. Validates data after loading
  4. Returns standardized DataFrame
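
One possible starting point for step 1 is a lookup table from file extension to pandas reader. This is only a sketch: `READERS`, `detect_reader`, and `read_any` are hypothetical names, the empty-DataFrame check stands in for whatever validation you design, and Excel and Parquet support assume the optional pandas dependencies for those formats are installed.

```python
from pathlib import Path

# Extension -> name of the pandas reader function
READERS = {
    ".csv": "read_csv",
    ".xlsx": "read_excel",
    ".xls": "read_excel",
    ".json": "read_json",
    ".parquet": "read_parquet",
}


def detect_reader(path: str) -> str:
    """Return the pandas reader name for a path, based on its extension."""
    suffix = Path(path).suffix.lower()
    if suffix not in READERS:
        raise ValueError(f"Unsupported file extension: {suffix!r}")
    return READERS[suffix]


def read_any(path: str, **kwargs):
    """Load a file with the matching pandas reader and run a basic check."""
    import pandas as pd  # imported lazily so detect_reader works without pandas

    df = getattr(pd, detect_reader(path))(path, **kwargs)
    if df.empty:
        raise ValueError(f"{path} loaded but contains no rows")
    return df


print(detect_reader("data/Sales.CSV"))  # read_csv
```

Extension-based detection is simple but trusts the file name; a more defensive reader might also inspect the first bytes of the file.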

Exercise 2: Data Quality Pipeline

Create a pipeline that:

  1. Loads data from multiple sources
  2. Validates schema and data types
  3. Handles missing values with configurable strategies
  4. Logs all transformations
  5. Generates a data quality report

Exercise 3: Parquet Migration

Migrate a legacy CSV-based data warehouse to Parquet:

  1. Convert existing CSV files to Parquet
  2. Implement incremental updates
  3. Compare query performance before/after
  4. Document storage savings

Exercise 4: API Data Collection

Build an API client that:

  1. Handles pagination
  2. Implements rate limiting
  3. Caches responses locally
  4. Handles authentication
  5. Stores data in Parquet format
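
For step 3, a local cache can be as simple as one JSON file per request, keyed by a hash of the URL and parameters. The sketch below is one way to approach it, not a complete solution: `FileCache` and `get_or_fetch` are hypothetical names, the `fetch` callable stands in for your real HTTP call, and expiry is based on file modification time.

```python
import hashlib
import json
import time
from pathlib import Path
from typing import Any, Callable, Optional


class FileCache:
    """Tiny JSON-on-disk cache keyed by URL + params (illustrative only)."""

    def __init__(self, cache_dir: str, ttl_seconds: float = 3600):
        self.dir = Path(cache_dir)
        self.dir.mkdir(parents=True, exist_ok=True)
        self.ttl = ttl_seconds

    def _path(self, url: str, params: Optional[dict]) -> Path:
        # sort_keys makes the key independent of parameter order
        key = json.dumps([url, params or {}], sort_keys=True)
        digest = hashlib.sha256(key.encode("utf-8")).hexdigest()
        return self.dir / f"{digest}.json"

    def get_or_fetch(self, url: str,
                     fetch: Callable[[str, Optional[dict]], Any],
                     params: Optional[dict] = None) -> Any:
        path = self._path(url, params)
        # Serve from disk while the cached file is younger than the TTL
        if path.exists() and time.time() - path.stat().st_mtime < self.ttl:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        data = fetch(url, params)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f)
        return data
```

A call such as `cache.get_or_fetch(url, my_fetch, {"_page": 1})` would hit the network only on the first request within the TTL, where `my_fetch` is whatever function performs the actual request and returns JSON-serializable data.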

Exercise 5: Data Pipeline Orchestration

Create a complete data pipeline that:

  1. Extracts data from 3+ sources (CSV, API, database)
  2. Transforms and cleans data
  3. Loads to data warehouse
  4. Generates quality metrics
  5. Sends notifications on failure
