File I/O & Data Import/Export
Why This Matters for Data Science
Data scientists are often estimated to spend 60-80% of their time on data acquisition and preparation (figures from practitioner surveys vary). Understanding how to efficiently read, write, and transform data across different formats is therefore foundational. This tutorial provides a thorough exploration of file I/O operations, data formats, and best practices for building robust data pipelines.
1. File Handling Basics
1.1 The Problem with Manual File Handling
```python
# DANGEROUS: Manual file handling
f = open('data.txt', 'r')
content = f.read()
# What if an exception occurs here?
f.close()  # File might never be closed!
```
Definition: Resource Leak
A situation where a program acquires a resource (file handle, memory, network connection) but fails to release it. File handles are finite OS resources; leaking them can cause "too many open files" errors and system instability.
1.2 Context Managers (with statement)
Context managers guarantee proper resource cleanup:
```python
# SAFE: Using context manager
with open('data.txt', 'r') as f:
    content = f.read()
# File is automatically closed, even if an exception occurs
```
The with statement uses Python's context manager protocol (__enter__ and __exit__ methods). The __exit__ method is guaranteed to execute even if an exception occurs, making it the safest way to handle resources.
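To make the protocol concrete, here is a minimal sketch of a hand-written context manager. The class name Tracked is hypothetical and exists only for illustration; it records when __enter__ and __exit__ run, showing that __exit__ still executes when the body raises, and that returning False lets the exception continue to the caller.

```python
class Tracked:
    """Toy context manager (hypothetical) that records protocol calls."""

    def __init__(self):
        self.events = []

    def __enter__(self):
        self.events.append("enter")
        return self  # this object is bound to the name after `as`

    def __exit__(self, exc_type, exc_value, traceback):
        # Runs on normal exit AND when the with-body raises
        label = exc_type.__name__ if exc_type else "no error"
        self.events.append(f"exit ({label})")
        return False  # False means: do not suppress the exception


tracker = Tracked()
try:
    with tracker:
        tracker.events.append("body")
        raise ValueError("simulated failure")
except ValueError:
    tracker.events.append("caught outside")

print(tracker.events)
# ['enter', 'body', 'exit (ValueError)', 'caught outside']
```

A file object's __exit__ does essentially the same thing, except that its cleanup step closes the file.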
1.3 File Modes
| Mode | Description | Position | Creates File? |
|---|---|---|---|
| 'r' | Read (default) | Beginning | No |
| 'w' | Write | Beginning | Yes (overwrites) |
| 'a' | Append | End | Yes |
| 'x' | Exclusive create | Beginning | Yes (fails if exists) |
| 'r+' | Read + Write | Beginning | No |
| 'w+' | Write + Read | Beginning | Yes (overwrites) |
| 'a+' | Append + Read | End | Yes |
```python
# Demonstrate file modes
import os

# Create a test file
test_file = 'test_file.txt'

# Write mode (creates new file)
with open(test_file, 'w') as f:
    f.write("Line 1: Hello\n")
    f.write("Line 2: World\n")

# Read mode
with open(test_file, 'r') as f:
    content = f.read()
print(f"Read mode:\n{content}")

# Append mode
with open(test_file, 'a') as f:
    f.write("Line 3: Appended\n")

# Verify append
with open(test_file, 'r') as f:
    content = f.read()
print(f"After append:\n{content}")

# Clean up
os.remove(test_file)
```
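The example above does not exercise the 'x' and 'r+' modes from the table. The sketch below, using a hypothetical scratch file in a temporary directory, shows that 'x' refuses to open an existing file and that 'r+' keeps existing content instead of truncating it. The explicit seek before writing is a defensive habit when switching from reading to writing on the same handle.

```python
import os
import tempfile

# Hypothetical scratch file in a fresh temporary directory
path = os.path.join(tempfile.mkdtemp(), "modes_demo.txt")

# 'x' creates the file...
with open(path, "x") as f:
    f.write("original\n")

# ...but raises FileExistsError if the file already exists
try:
    with open(path, "x") as f:
        f.write("never written\n")
    x_failed = False
except FileExistsError:
    x_failed = True

# 'r+' opens for reading and writing WITHOUT truncating
with open(path, "r+") as f:
    first = f.read()
    f.seek(0, os.SEEK_END)  # seek explicitly before switching from reading to writing
    f.write("added\n")

with open(path) as f:
    final = f.read()

print(x_failed, repr(first), repr(final))
# True 'original\n' 'original\nadded\n'
os.remove(path)
```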
1.4 Reading Strategies
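Memory Usage by Reading Strategy

$$M_{\text{line}} = O(1), \qquad M_{\text{readlines}} = O(N), \qquad M_{\text{read}} = O(N)$$

Here,
- M = memory usage
- N = total file size in bytes
- M_line: iterating with for line in f, which holds one line at a time (constant with respect to N; bounded by the longest line plus a fixed read buffer)
- M_readlines: f.readlines(), which reads all lines into a list
- M_read: f.read(), which reads the entire file as one string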
```python
import os

# Create test file with multiple lines
test_file = 'reading_test.txt'
with open(test_file, 'w') as f:
    for i in range(100):
        f.write(f"Line {i+1}: This is test data with number {i}\n")

# Method 1: Read entire file (small files only)
with open(test_file, 'r') as f:
    content = f.read()
print(f"Full read: {len(content)} characters")

# Method 2: Read line by line (memory efficient)
line_count = 0
with open(test_file, 'r') as f:
    for line in f:
        line_count += 1
print(f"Line count: {line_count}")

# Method 3: Read all lines into list
with open(test_file, 'r') as f:
    lines = f.readlines()
print(f"Lines list: {len(lines)} items")

# Method 4: Read in chunks (large files)
chunk_size = 1024
chunks = []
with open(test_file, 'r') as f:
    while True:
        chunk = f.read(chunk_size)  # in text mode, chunk_size counts characters
        if not chunk:
            break
        # Stored here only to count them; real code would process and discard each chunk
        chunks.append(chunk)
print(f"Chunks: {len(chunks)} chunks of up to {chunk_size} chars")

# Clean up
os.remove(test_file)
```
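The while-loop in Method 4 can be written more compactly with the two-argument form of iter(), which keeps calling a function until it returns a sentinel value. The generator name iter_chunks is hypothetical; it opens the file in binary mode so that the end-of-file sentinel is b"", and only one chunk is in memory at a time.

```python
import os
import tempfile
from functools import partial


def iter_chunks(path, chunk_size=1024):
    """Yield successive byte chunks of a file (hypothetical helper)."""
    with open(path, "rb") as f:
        # iter(callable, sentinel) calls f.read(chunk_size) until it returns b"" (EOF)
        yield from iter(partial(f.read, chunk_size), b"")


# Usage: count newline bytes without loading the whole file at once
path = os.path.join(tempfile.mkdtemp(), "chunks_demo.txt")
with open(path, "w") as f:
    for i in range(100):
        f.write(f"Line {i + 1}\n")

newlines = sum(chunk.count(b"\n") for chunk in iter_chunks(path, chunk_size=64))
print(newlines)  # 100
os.remove(path)
```

The file is closed automatically once the generator is exhausted, because the with block lives inside it.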
1.5 Binary vs Text Mode
```python
import os
import struct

# Text mode works with str (decoded characters)
text_content = "Hello, World! café"
# Binary mode works with bytes (encoded characters)
binary_content = text_content.encode('utf-8')
print(f"Text type: {type(text_content)}")
print(f"Binary type: {type(binary_content)}")
print(f"Text length: {len(text_content)} chars")
print(f"Binary length: {len(binary_content)} bytes")  # 'é' takes 2 bytes in UTF-8

# Write binary data
binary_file = 'binary_test.bin'
with open(binary_file, 'wb') as f:
    # Pack numbers into fixed-size binary representations
    f.write(struct.pack('i', 42))     # int: 4 bytes
    f.write(struct.pack('f', 3.14))   # single-precision float: 4 bytes
    f.write(struct.pack('d', 2.718))  # double-precision float: 8 bytes

# Read binary data back (unpack always returns a tuple)
with open(binary_file, 'rb') as f:
    int_val = struct.unpack('i', f.read(4))[0]
    float_val = struct.unpack('f', f.read(4))[0]  # 3.140000104904175: float32 precision loss
    double_val = struct.unpack('d', f.read(8))[0]

print("\nBinary data:")
print(f"  Integer: {int_val}")
print(f"  Float: {float_val}")
print(f"  Double: {double_val}")

# Clean up
os.remove(binary_file)
```
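Text mode decodes bytes into str using an encoding (pass encoding= explicitly, since the default may depend on the platform locale) and translates line endings: on write, \n becomes os.linesep (\r\n on Windows), and on read, \r\n and \r are normalized to \n. Binary mode preserves exact bytes. Always use binary mode for non-text data (images, serialized objects, compressed files).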
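The line-ending translation can be observed directly by writing in text mode and reading the same file back in binary mode. This sketch forces Windows-style endings with newline="\r\n" so that it behaves the same on every platform; the scratch file is hypothetical.

```python
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "newlines_demo.txt")

# newline="\r\n" translates every "\n" written into "\r\n", on any OS
with open(path, "w", encoding="utf-8", newline="\r\n") as f:
    f.write("a\nb\n")

with open(path, "rb") as f:
    raw = f.read()  # exact bytes on disk

with open(path, "r", encoding="utf-8") as f:
    text = f.read()  # default newline=None: "\r\n" is normalized back to "\n"

print(raw, repr(text))  # b'a\r\nb\r\n' 'a\nb\n'
os.remove(path)
```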
2. CSV: Reading and Writing
2.1 Why CSV is Tricky
CSV files seem simple but have many pitfalls:
- Delimiter variations (comma, semicolon, tab)
- Quoting issues (commas within fields)
- Encoding problems (UTF-8 vs Latin-1)
- Header row handling
- Missing values representation
Definition: Comma-Separated Values
A plain text format for tabular data where each line is a record and fields are separated by a delimiter (typically a comma). Despite its simplicity, CSV has no universally followed standard (RFC 4180 documents a common variant, but many tools deviate from it), which leads to interoperability issues between different software implementations.
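Two of the pitfalls listed above, quoting and delimiter variations, can be handled with the standard csv module. This sketch shows how the default writer quotes a field that contains a comma and doubles embedded quotes, and how csv.Sniffer can guess the delimiter from a sample. The delimiters argument restricts the guess to plausible candidates; Sniffer is heuristic and can guess wrong on small or irregular samples.

```python
import csv
import io

# Quoting: fields containing the delimiter or quote character are wrapped in quotes,
# and embedded quotes are doubled (default QUOTE_MINIMAL, "\r\n" line terminator)
buf = io.StringIO()
csv.writer(buf).writerow(["Los Angeles, CA", 'He said "hi"', 42])
line = buf.getvalue()
print(repr(line))  # '"Los Angeles, CA","He said ""hi""",42\r\n'

# Reading it back recovers the original fields (all as strings)
fields = next(csv.reader(io.StringIO(line)))
print(fields)  # ['Los Angeles, CA', 'He said "hi"', '42']

# Delimiter detection from a sample of the file
sample = "name;city\nAlice;Paris\nBob;Berlin\n"
dialect = csv.Sniffer().sniff(sample, delimiters=";,\t")
print(dialect.delimiter)  # ;
```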
2.2 Python's csv Module
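```python
import csv
import io
import os

# Sample CSV text: fields containing commas are quoted; an empty field marks a missing value
csv_data = """name,age,score,city
John Doe,30,95.5,New York
Jane Smith,25,87.3,"Los Angeles, CA"
Bob Johnson,35,,Chicago
Alice Brown,28,92.0,"New York, NY"
Charlie Wilson,32,88.7,"Dallas, TX"
"""

# Parse CSV text from memory (io.StringIO wraps a string as a file-like object)
for row in csv.reader(io.StringIO(csv_data)):
    print(row)  # each row is a list of strings; quoted commas stay inside one field

# Write CSV (newline='' lets the csv module control line endings)
csv_file = 'sample.csv'
with open(csv_file, 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    writer.writerow(['name', 'age', 'score', 'city'])  # Header
    writer.writerows([
        ['John Doe', 30, 95.5, 'New York'],
        ['Jane Smith', 25, 87.3, 'Los Angeles, CA'],  # quoted automatically
        ['Bob Johnson', 35, None, 'Chicago'],         # None is written as an empty field
        ['Alice Brown', 28, 92.0, 'New York, NY'],
    ])

# Read CSV (DictReader maps each row to the header names; all values are strings)
with open(csv_file, 'r', newline='', encoding='utf-8') as f:
    reader = csv.DictReader(f)
    print("CSV Content:")
    for row in reader:
        print(f"  {row}")

# Clean up
os.remove(csv_file)
```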
2.3 Pandas CSV Operations
```python
import csv
import os

import pandas as pd

# Create comprehensive CSV example
data = {
    'id': range(1, 6),
    'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
    'age': [28, 35, 42, 31, 29],
    'salary': [75000, 85000, 92000, 78000, 81000],
    'department': ['Engineering', 'Marketing', 'Engineering', 'HR', 'Marketing'],
    'start_date': ['2020-01-15', '2019-03-22', '2018-07-01', '2021-02-10', '2020-09-05']
}
df = pd.DataFrame(data)

# Write CSV with various options
csv_file = 'employees.csv'
# Basic write
df.to_csv(csv_file, index=False)
print("Basic CSV written")

# Write with specific options
df.to_csv(
    'employees_detailed.csv',
    index=False,
    sep=';',                       # Semicolon separator
    columns=['name', 'salary'],    # Select columns
    float_format='%.2f',           # Format floats
    date_format='%Y-%m-%d',        # Applies to datetime columns only
    quoting=csv.QUOTE_NONNUMERIC,  # Quote all non-numeric fields
    lineterminator='\n'            # Called line_terminator before pandas 1.5
)
print("Detailed CSV written")

# Read CSV with various options
print("\nReading CSV:")
df_read = pd.read_csv(csv_file)
print(f"Basic read: {df_read.shape}")
print(df_read.head())

# Read with specific options
df_special = pd.read_csv(
    csv_file,
    index_col='id',                    # Use id as index
    usecols=['id', 'name', 'salary'],  # Only read these columns (must include index_col)
    dtype={'salary': float},           # Specify dtypes
    na_values=['NA', 'null', ''],      # Custom NA values
    nrows=3                            # Read only first 3 rows
)
print(f"\nSpecial read: {df_special.shape}")
print(df_special)

# Handle different separators
df_semicolon = pd.read_csv('employees_detailed.csv', sep=';')
print(f"\nSemicolon CSV: {df_semicolon.shape}")

# Clean up
os.remove(csv_file)
os.remove('employees_detailed.csv')
```
2.4 CSV Handling Issues
```python
import os

import pandas as pd

# Issue 1: Different encodings
# The same text, saved with two different encodings
text_data = "name,value\ncafé,100\nnaïve,200\n"
with open('utf8.csv', 'w', encoding='utf-8') as f:
    f.write(text_data)
with open('latin1.csv', 'w', encoding='latin-1') as f:
    f.write(text_data)

# Reading the Latin-1 file as UTF-8 fails: 'é' is the single byte 0xE9 in Latin-1,
# which is not a valid UTF-8 sequence in this position
try:
    pd.read_csv('latin1.csv', encoding='utf-8')
except UnicodeDecodeError as e:
    print(f"UTF-8 error: {e}")

# Reading each file with the encoding it was written in works
df_utf8 = pd.read_csv('utf8.csv', encoding='utf-8')
print(f"UTF-8 read: {df_utf8['name'].tolist()}")
df_latin1 = pd.read_csv('latin1.csv', encoding='latin-1')
print(f"Latin-1 read: {df_latin1['name'].tolist()}")

# Issue 2: Large files - chunked reading
print("\nChunked reading for large files:")
chunk_size = 2
chunks = []
for chunk in pd.read_csv('utf8.csv', chunksize=chunk_size):
    chunks.append(chunk)
    print(f"  Chunk shape: {chunk.shape}")
df_combined = pd.concat(chunks, ignore_index=True)
print(f"Combined shape: {df_combined.shape}")

# Clean up
os.remove('utf8.csv')
os.remove('latin1.csv')
```
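Chunked reading loads only chunk_size rows into memory at a time, which makes it possible to process files larger than available RAM, provided each chunk is processed or aggregated and then discarded. Concatenating every chunk, as the demo above does, rebuilds the full DataFrame in memory.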
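A sketch of the memory-friendly pattern: aggregate each chunk and keep only a small running result. The in-memory CSV text stands in for a large file, and the column names are hypothetical.

```python
import io

import pandas as pd

# Small in-memory CSV standing in for a file too large to load at once
csv_text = "department,salary\nEng,100\nHR,50\nEng,120\nHR,70\nOps,90\n"

totals = {}
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=2):
    # Aggregate this chunk, fold it into the running totals, then let the chunk go
    for dept, subtotal in chunk.groupby("department")["salary"].sum().items():
        totals[dept] = totals.get(dept, 0) + int(subtotal)

print(totals)  # {'Eng': 220, 'HR': 120, 'Ops': 90}
```

Peak memory is then roughly one chunk plus the size of totals, independent of the file length.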
3. Excel Files
3.1 Reading Excel
```python
import pandas as pd
import os

# Create Excel file with multiple sheets
excel_file = 'data_report.xlsx'

# Create sample DataFrames
df_sales = pd.DataFrame({
    'month': ['Jan', 'Feb', 'Mar', 'Apr'],
    'revenue': [10000, 12000, 11000, 13000],
    'cost': [7000, 8000, 7500, 8500]
})
df_inventory = pd.DataFrame({
    'product': ['A', 'B', 'C'],
    'quantity': [100, 200, 150],
    'warehouse': ['NYC', 'LA', 'Chicago']
})

# Write to Excel with multiple sheets
with pd.ExcelWriter(excel_file, engine='openpyxl') as writer:
    df_sales.to_excel(writer, sheet_name='Sales', index=False)
    df_inventory.to_excel(writer, sheet_name='Inventory', index=False)
    # Add summary sheet
    df_summary = pd.DataFrame({
        'Metric': ['Total Revenue', 'Total Cost', 'Profit'],
        'Value': [df_sales['revenue'].sum(),
                  df_sales['cost'].sum(),
                  df_sales['revenue'].sum() - df_sales['cost'].sum()]
    })
    df_summary.to_excel(writer, sheet_name='Summary', index=False)
print("Excel file created with 3 sheets")

# Read specific sheets
print("\nReading Excel:")
df_sales_read = pd.read_excel(excel_file, sheet_name='Sales')
print(f"Sales sheet: {df_sales_read.shape}")
print(df_sales_read)
df_inventory_read = pd.read_excel(excel_file, sheet_name='Inventory')
print(f"\nInventory sheet: {df_inventory_read.shape}")
print(df_inventory_read)

# Read all sheets
all_sheets = pd.read_excel(excel_file, sheet_name=None)
print(f"\nAll sheets: {list(all_sheets.keys())}")

# Clean up
os.remove(excel_file)
```
3.2 Excel-Specific Features
```python
import os

import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Alignment, Font, PatternFill


# Advanced Excel writing with formatting
def create_styled_excel(filename: str):
    """Create Excel file with professional formatting."""
    # Create workbook
    wb = Workbook()

    # Sheet 1: Data with formatting
    ws = wb.active
    ws.title = "Sales Data"

    # Headers
    headers = ['Product', 'Q1', 'Q2', 'Q3', 'Q4', 'Total']
    header_font = Font(bold=True, color="FFFFFF")
    header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
    for col, header in enumerate(headers, 1):
        cell = ws.cell(row=1, column=col, value=header)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = Alignment(horizontal='center')

    # Data
    data = [
        ['Widget A', 100, 150, 200, 180],
        ['Widget B', 200, 180, 220, 250],
        ['Widget C', 150, 160, 170, 190],
    ]
    for row_idx, row_data in enumerate(data, 2):
        for col_idx, value in enumerate(row_data, 1):
            ws.cell(row=row_idx, column=col_idx, value=value)
        # Formula for total (stored as text; Excel evaluates it when the file is opened)
        ws.cell(row=row_idx, column=6, value=f"=SUM(B{row_idx}:E{row_idx})")

    # Adjust column widths
    for col in ws.columns:
        max_length = max(len(str(cell.value or "")) for cell in col)
        ws.column_dimensions[col[0].column_letter].width = max_length + 2

    # Sheet 2: Pivot-style summary
    ws2 = wb.create_sheet("Summary")
    ws2['A1'] = "Quarter"
    ws2['B1'] = "Total Sales"
    ws2['A1'].font = Font(bold=True)
    ws2['B1'].font = Font(bold=True)
    quarters = ['Q1', 'Q2', 'Q3', 'Q4']
    totals = [sum(row[q] for row in data) for q in range(1, 5)]
    for i, (q, t) in enumerate(zip(quarters, totals), 2):
        ws2.cell(row=i, column=1, value=q)
        ws2.cell(row=i, column=2, value=t)

    # Save
    wb.save(filename)
    print(f"Styled Excel created: {filename}")


# Create the file
create_styled_excel('styled_report.xlsx')

# Read with pandas. openpyxl saves formulas without computed results, so the
# 'Total' column typically reads back as NaN until Excel recalculates and saves the file
df = pd.read_excel('styled_report.xlsx', sheet_name='Sales Data')
print("\nRead styled Excel:")
print(df)

# Clean up
os.remove('styled_report.xlsx')
```
Excel files (.xlsx) are ZIP archives containing XML files. They support formulas, formatting, charts, and multiple sheets. For large datasets (100K+ rows), consider Parquet instead: it is typically faster to read and write, produces smaller files, and is not bound by Excel's limit of 1,048,576 rows per sheet.
4. JSON Data
4.1 JSON Structures
Definition: JavaScript Object Notation
A lightweight, text-based data interchange format derived from JavaScript object syntax. JSON represents data as key-value pairs (objects) and ordered lists (arrays). It is language-independent but uses conventions familiar to C-family programmers.
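Because JSON only has objects, arrays, strings, numbers, booleans, and null, Python-specific types do not survive a round trip unchanged. This sketch shows int keys becoming strings, a tuple becoming a list, and a date that must be converted by a default hook. The hook name encode_extra is hypothetical; json.dumps calls it for any object it cannot serialize natively.

```python
import json
from datetime import date


def encode_extra(obj):
    """Hypothetical fallback encoder: ISO-format dates, reject anything else."""
    if isinstance(obj, date):  # datetime is a subclass of date, so it is covered too
        return obj.isoformat()
    raise TypeError(f"{type(obj).__name__} is not JSON serializable")


record = {1: (10, 20), "when": date(2024, 1, 15)}
text = json.dumps(record, default=encode_extra)
print(text)  # {"1": [10, 20], "when": "2024-01-15"}

# After the round trip: str key, list instead of tuple, plain string instead of date
restored = json.loads(text)
print(restored)  # {'1': [10, 20], 'when': '2024-01-15'}
```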
```python
import json
import os

import pandas as pd

# Sample nested JSON
nested_data = {
    "company": "TechCorp",
    "employees": [
        {
            "id": 1,
            "name": "Alice",
            "department": "Engineering",
            "skills": ["Python", "ML", "SQL"],
            "contact": {
                "email": "alice@techcorp.com",
                "phone": "555-0101"
            }
        },
        {
            "id": 2,
            "name": "Bob",
            "department": "Marketing",
            "skills": ["SEO", "Analytics"],
            "contact": {
                "email": "bob@techcorp.com",
                "phone": "555-0102"
            }
        }
    ],
    "metadata": {
        "version": "1.0",
        "last_updated": "2024-01-15"
    }
}

# Write JSON
json_file = 'company_data.json'
with open(json_file, 'w', encoding='utf-8') as f:
    json.dump(nested_data, f, indent=2)
print("JSON written")

# Read JSON
with open(json_file, 'r', encoding='utf-8') as f:
    loaded_data = json.load(f)
print("\nLoaded JSON:")
print(f"  Company: {loaded_data['company']}")
print(f"  Employees: {len(loaded_data['employees'])}")

# Flatten nested JSON for pandas: nested dicts become columns such as
# contact_email, while list values (skills) stay as Python lists in one column
print("\nFlattening nested JSON:")
employees = loaded_data['employees']
df_flat = pd.json_normalize(employees, sep='_')
print(df_flat)

# Clean up
os.remove(json_file)
```
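To see what the flattening step does for nested dictionaries, here is a minimal pure-Python sketch. The function name flatten is hypothetical and it is not pandas' implementation; it joins nested keys with sep and leaves list values untouched, similar to how json_normalize treats the skills list when no record_path is given.

```python
def flatten(obj, parent_key="", sep="_"):
    """Hypothetical helper: flatten nested dicts into one level of keys."""
    items = {}
    for key, value in obj.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            items.update(flatten(value, new_key, sep=sep))  # recurse into nested dicts
        else:
            items[new_key] = value  # scalars and lists are kept as-is
    return items


employee = {
    "id": 1,
    "name": "Alice",
    "skills": ["Python", "ML"],
    "contact": {"email": "alice@techcorp.com", "phone": "555-0101"},
}
flat = flatten(employee)
print(flat)
# {'id': 1, 'name': 'Alice', 'skills': ['Python', 'ML'],
#  'contact_email': 'alice@techcorp.com', 'contact_phone': '555-0101'}
```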
4.2 JSON Processing Patterns
```python
import json
import pandas as pd
from typing import List, Dict, Any


# Pattern 1: Extract specific fields
def extract_json_fields(data: Dict, fields: List[str]) -> Dict:
    """Extract specific fields from nested JSON."""
    result = {}
    for field in fields:
        keys = field.split('.')
        value = data
        for key in keys:
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                value = None
                break
        result[field] = value
    return result


# Example usage
data = {
    "user": {
        "profile": {
            "name": "Alice",
            "age": 30
        },
        "settings": {
            "theme": "dark"
        }
    }
}
fields = ['user.profile.name', 'user.profile.age', 'user.settings.theme']
extracted = extract_json_fields(data, fields)
print("Extracted fields:", extracted)


# Pattern 2: Transform list of JSON objects
def transform_json_list(json_list: List[Dict],
                        key_mapping: Dict[str, str]) -> List[Dict]:
    """Transform JSON objects with key mapping."""
    transformed = []
    for item in json_list:
        new_item = {}
        for old_key, new_key in key_mapping.items():
            if old_key in item:
                new_item[new_key] = item[old_key]
        transformed.append(new_item)
    return transformed


# Example
json_list = [
    {"first_name": "Alice", "last_name": "Smith", "emp_id": 1},
    {"first_name": "Bob", "last_name": "Jones", "emp_id": 2}
]
mapping = {"first_name": "given_name", "last_name": "family_name", "emp_id": "id"}
result = transform_json_list(json_list, mapping)
print("\nTransformed JSON:")
for item in result:
    print(f"  {item}")


# Pattern 3: Merge multiple JSON files
def merge_json_files(file_paths: List[str]) -> List[Dict]:
    """Merge multiple JSON files containing lists."""
    merged = []
    for path in file_paths:
        with open(path, 'r') as f:
            data = json.load(f)
            if isinstance(data, list):
                merged.extend(data)
            else:
                merged.append(data)
    return merged
```
4.3 JSON Lines (NDJSON)
```python
import json
import os

import pandas as pd

# JSON Lines format (one JSON object per line)
ndjson_data = """{"name": "Alice", "age": 30, "city": "NYC"}
{"name": "Bob", "age": 25, "city": "LA"}
{"name": "Charlie", "age": 35, "city": "Chicago"}
{"name": "Diana", "age": 28, "city": "Boston"}
"""

# Write NDJSON
ndjson_file = 'people.ndjson'
with open(ndjson_file, 'w', encoding='utf-8') as f:
    f.write(ndjson_data)

# Read NDJSON with pandas
df = pd.read_json(ndjson_file, lines=True)
print("NDJSON loaded:")
print(df)
print(f"\nShape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")
print(f"Memory: {df.memory_usage(deep=True).sum()} bytes")

# Process NDJSON line by line (memory efficient for large files)
print("\nStreaming NDJSON:")
with open(ndjson_file, 'r', encoding='utf-8') as f:
    for i, line in enumerate(f):
        if not line.strip():  # tolerate blank lines
            continue
        record = json.loads(line)
        print(f"  Line {i}: {record}")

# Clean up
os.remove(ndjson_file)
```
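JSON vs NDJSON (memory needed before a single record can be processed)

$$M_{\text{JSON}} = O(N), \qquad M_{\text{NDJSON}} = O(1)$$

Here,
- M = memory needed, N = total file size in bytes
- M_JSON: a standard JSON document must be parsed in its entirety before any record can be accessed
- M_NDJSON: NDJSON can be processed one record at a time (streaming), so memory depends on the largest record rather than on N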
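A minimal sketch of streaming NDJSON with only the standard library. The helper names write_ndjson and read_ndjson are hypothetical. json.dumps without indent emits no raw newlines (newlines inside strings are escaped as \n), which is what keeps each record on exactly one line.

```python
import json
import os
import tempfile


def write_ndjson(path, records):
    """Hypothetical helper: write one compact JSON object per line."""
    with open(path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")


def read_ndjson(path):
    """Hypothetical helper: yield records one at a time."""
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            if line.strip():  # skip blank lines
                yield json.loads(line)


path = os.path.join(tempfile.mkdtemp(), "events.ndjson")
write_ndjson(path, ({"id": i, "value": i * i} for i in range(5)))
total = sum(rec["value"] for rec in read_ndjson(path))
print(total)  # 30
os.remove(path)
```

Both the writer and the reader consume iterables lazily, so neither side ever materializes the full dataset.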
5. SQL Databases
5.1 SQLAlchemy Connection
```python
import os

import pandas as pd
from sqlalchemy import create_engine, text

# Create a file-backed SQLite database (sample.db in the working directory);
# create_engine('sqlite://') would give a purely in-memory database instead
engine = create_engine('sqlite:///sample.db')

# Create sample tables
with engine.connect() as conn:
    conn.execute(text("""
        CREATE TABLE employees (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            department TEXT,
            salary REAL,
            hire_date DATE
        )
    """))
    conn.execute(text("""
        CREATE TABLE departments (
            id INTEGER PRIMARY KEY,
            name TEXT UNIQUE,
            budget REAL,
            manager TEXT
        )
    """))
    # Insert sample data
    conn.execute(text("""
        INSERT INTO employees (id, name, department, salary, hire_date) VALUES
        (1, 'Alice', 'Engineering', 85000, '2020-01-15'),
        (2, 'Bob', 'Marketing', 72000, '2019-03-22'),
        (3, 'Charlie', 'Engineering', 92000, '2018-07-01'),
        (4, 'Diana', 'HR', 68000, '2021-02-10'),
        (5, 'Eve', 'Marketing', 78000, '2020-09-05')
    """))
    conn.execute(text("""
        INSERT INTO departments (id, name, budget, manager) VALUES
        (1, 'Engineering', 500000, 'Alice'),
        (2, 'Marketing', 300000, 'Bob'),
        (3, 'HR', 200000, 'Diana')
    """))
    # SQLAlchemy 2.0 "commit as you go": changes are rolled back unless committed
    conn.commit()
print("Database created with sample data")
```
5.2 Querying with Pandas
Definition: Parameterized Query
A SQL query where user input is bound as parameters rather than string-concatenated into the query text. This prevents SQL injection attacks by ensuring user input is treated as data, not executable code.
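The difference is easy to demonstrate with Python's built-in sqlite3 module, which uses ? placeholders. In this sketch the malicious input (a hypothetical value) turns the string-concatenated query into one that matches every row, while the parameterized version compares it literally and matches nothing.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (name TEXT, department TEXT)")
conn.executemany(
    "INSERT INTO employees VALUES (?, ?)",
    [("Alice", "Engineering"), ("Bob", "Marketing"), ("Diana", "HR")],
)

user_input = "x' OR '1'='1"  # hypothetical malicious value

# UNSAFE: the input becomes part of the SQL text, so its OR clause is executed
unsafe_sql = f"SELECT name FROM employees WHERE department = '{user_input}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# SAFE: the input is bound as a parameter and treated purely as data
safe_rows = conn.execute(
    "SELECT name FROM employees WHERE department = ?", (user_input,)
).fetchall()

print(len(unsafe_rows), len(safe_rows))  # 3 0
conn.close()
```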
```python
# Continues from 5.1: reuses engine, text, pd, and os

# Read SQL queries into DataFrames
query = "SELECT * FROM employees WHERE salary > 70000"
df = pd.read_sql(query, engine)
print("Query result:")
print(df)

# Parameterized queries (prevent SQL injection)
# Wrapping the SQL in text() lets SQLAlchemy bind :dept portably across database drivers
param_query = text("SELECT * FROM employees WHERE department = :dept")
df_eng = pd.read_sql(param_query, engine, params={"dept": "Engineering"})
print("\nEngineering employees:")
print(df_eng)

# Aggregation queries
agg_query = """
SELECT
    department,
    COUNT(*) as count,
    AVG(salary) as avg_salary,
    MAX(salary) as max_salary
FROM employees
GROUP BY department
"""
df_agg = pd.read_sql(agg_query, engine)
print("\nDepartment statistics:")
print(df_agg)

# Join queries
join_query = """
SELECT
    e.name,
    e.salary,
    d.name as department_name,
    d.budget
FROM employees e
JOIN departments d ON e.department = d.name
"""
df_joined = pd.read_sql(join_query, engine)
print("\nJoined data:")
print(df_joined)

# Write DataFrame to SQL
new_data = pd.DataFrame({
    'id': [6, 7],
    'name': ['Frank', 'Grace'],
    'department': ['Engineering', 'Marketing'],
    'salary': [95000, 82000],
    'hire_date': ['2022-01-10', '2022-03-15']
})
new_data.to_sql('new_employees', engine, if_exists='replace', index=False)
print("\nNew employees written to database")

# Verify
df_verify = pd.read_sql("SELECT * FROM new_employees", engine)
print(df_verify)
```
5.3 Chunked Reading for Large Databases
```python
# Demonstrate chunked reading (reuses engine from 5.1);
# with chunksize set, read_sql returns an iterator of DataFrames
chunk_size = 2
chunks = []
for chunk in pd.read_sql("SELECT * FROM employees", engine, chunksize=chunk_size):
    chunks.append(chunk)
    print(f"  Chunk: {chunk.shape[0]} rows")
df_combined = pd.concat(chunks, ignore_index=True)
print(f"Combined: {df_combined.shape[0]} rows")

# Clean up: close pooled connections, then delete the database file
engine.dispose()
os.remove('sample.db')
```
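The same idea exists one level lower in the Python DB-API: cursor.fetchmany(n) returns at most n rows per call. This sketch uses the standard sqlite3 module and a hypothetical readings table. Note that, depending on the database driver, a result set may still be buffered client-side even when you fetch it in batches, so check your driver's documentation for server-side cursor options.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE readings (id INTEGER PRIMARY KEY, value REAL)")
conn.executemany("INSERT INTO readings (value) VALUES (?)",
                 [(float(i),) for i in range(10)])

cursor = conn.execute("SELECT id, value FROM readings ORDER BY id")
batch_sizes = []
running_total = 0.0
while True:
    batch = cursor.fetchmany(4)  # at most 4 rows held in Python at a time
    if not batch:
        break
    batch_sizes.append(len(batch))
    running_total += sum(value for _, value in batch)

print(batch_sizes, running_total)  # [4, 4, 2] 45.0
conn.close()
```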
6. Parquet: Columnar Storage
6.1 Why Parquet?
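Parquet is optimized for analytics:
- Columnar storage: reads only the columns a query needs
- Compression: per-column encoding plus compression usually yields much smaller files than CSV
- Predicate pushdown: readers can use per-row-group statistics (such as min/max) to skip data that cannot match a filter
- Schema evolution: files with added columns can be read alongside older files without rewriting existing data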
Definition: Columnar Storage
A storage format where data is organized by columns rather than rows. This enables: (1) reading only required columns (I/O reduction), (2) better compression (similar values grouped together), and (3) vectorized processing on contiguous column data.
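A toy illustration of points (1) and (2), under the simplifying assumption that a table is just a list of Python tuples. It is not how Parquet is implemented (Parquet combines dictionary encoding, run-length/bit-packing, and general-purpose codecs), but it shows why contiguous column values make single-column reads cheap and repeated values easy to compress. The helper run_length_encode is hypothetical.

```python
from itertools import groupby


def run_length_encode(values):
    """Hypothetical helper: collapse runs of equal values into (value, count)."""
    return [(v, len(list(group))) for v, group in groupby(values)]


rows = [("A", 1.0), ("A", 2.0), ("A", 3.0), ("B", 4.0), ("B", 5.0)]

# Row-oriented layout: values of different columns are interleaved
row_layout = [value for row in rows for value in row]

# Column-oriented layout: each column's values are contiguous
category_col = [r[0] for r in rows]
value_col = [r[1] for r in rows]

# (1) An aggregate over one column touches only that column
mean_value = sum(value_col) / len(value_col)

# (2) Repeated neighbouring values compress well
encoded = run_length_encode(category_col)

print(row_layout[:4])  # ['A', 1.0, 'A', 2.0]
print(encoded)         # [('A', 3), ('B', 2)]
print(mean_value)      # 3.0
```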
6.2 Parquet Operations
```python
import os
import time

import numpy as np
import pandas as pd

# Create sample dataset
np.random.seed(42)
n_rows = 100000
df = pd.DataFrame({
    'id': range(n_rows),
    'category': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
    'value1': np.random.normal(0, 1, n_rows),
    'value2': np.random.uniform(0, 100, n_rows),
    'text': ['Sample text ' + str(i) for i in range(n_rows)]
})
print("Original DataFrame:")
print(f"  Rows: {len(df):,}")
print(f"  Memory: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

# Write to different formats for comparison (Parquet needs pyarrow or fastparquet)
csv_file = 'data.csv'
parquet_file = 'data.parquet'

# CSV
start = time.perf_counter()
df.to_csv(csv_file, index=False)
csv_time = time.perf_counter() - start
csv_size = os.path.getsize(csv_file) / 1024**2

# Parquet
start = time.perf_counter()
df.to_parquet(parquet_file, index=False)
parquet_time = time.perf_counter() - start
parquet_size = os.path.getsize(parquet_file) / 1024**2

print("\nFile comparison:")
print(f"{'Format':<10} {'Size (MB)':<12} {'Write (s)':<12} {'Size vs Parquet':<16}")
print("-" * 50)
print(f"{'CSV':<10} {csv_size:<12.2f} {csv_time:<12.4f} {csv_size/parquet_size:.1f}x")
print(f"{'Parquet':<10} {parquet_size:<12.2f} {parquet_time:<12.4f} 1.0x")

# Read performance (single runs, so timings are indicative only)
print("\nRead performance:")
start = time.perf_counter()
df_csv = pd.read_csv(csv_file)
csv_read_time = time.perf_counter() - start
start = time.perf_counter()
df_parquet = pd.read_parquet(parquet_file)
parquet_read_time = time.perf_counter() - start
print(f"  CSV read: {csv_read_time:.4f}s")
print(f"  Parquet read: {parquet_read_time:.4f}s")
print(f"  Speedup: {csv_read_time/parquet_read_time:.1f}x faster with Parquet")

# Column selection (Parquet advantage)
print("\nColumn selection (reading 2 of 5 columns):")
start = time.perf_counter()
df_csv_cols = pd.read_csv(csv_file, usecols=['id', 'value1'])
csv_cols_time = time.perf_counter() - start
start = time.perf_counter()
df_parquet_cols = pd.read_parquet(parquet_file, columns=['id', 'value1'])
parquet_cols_time = time.perf_counter() - start
print(f"  CSV: {csv_cols_time:.4f}s (still has to parse every line of the file)")
print(f"  Parquet: {parquet_cols_time:.4f}s (reads only the selected columns)")
print(f"  Speedup: {csv_cols_time/parquet_cols_time:.1f}x")

# Clean up
os.remove(csv_file)
os.remove(parquet_file)
```
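Parquet files are typically several times smaller than equivalent CSV files (3-10x is a common range, though the ratio depends heavily on the data) because columnar encodings such as dictionary and run-length encoding are combined with compression codecs like Snappy, Gzip, or Zstandard.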
6.3 Parquet Compression Options
```python
import os

import pandas as pd

# Parquet compression codecs (availability depends on how pyarrow was built)
compressions = [
    'snappy',  # pandas/pyarrow default: fast, moderate compression
    'gzip',    # higher compression, slower
    'brotli',  # often the highest ratio, slower to write
    'lz4',     # very fast compression and decompression
    'zstd',    # good balance of ratio and speed
]

df = pd.DataFrame({
    'id': range(10000),
    'category': ['A', 'B', 'C', 'D'] * 2500,
    'value': range(10000)
})

print("Compression comparison (ratio relative to snappy):")
print(f"{'Algorithm':<12} {'Size (bytes)':<15} {'Ratio':<10}")
print("-" * 37)
base_size = None
for comp in compressions:
    filename = f'test_{comp}.parquet'
    try:
        df.to_parquet(filename, compression=comp, index=False)
        size = os.path.getsize(filename)
        if base_size is None:
            base_size = size  # first codec (snappy) is the baseline
        ratio = base_size / size if size > 0 else 0
        print(f"{comp:<12} {size:<15,} {ratio:.2f}x")
    except Exception as e:
        print(f"{comp:<12} Error: {e}")
    finally:
        if os.path.exists(filename):
            os.remove(filename)

# Reading compressed Parquet (the codec is detected automatically)
df.to_parquet('test_zstd.parquet', compression='zstd', index=False)
df_read = pd.read_parquet('test_zstd.parquet')
print(f"\nRead compressed Parquet: {df_read.shape}")
os.remove('test_zstd.parquet')
```
7. HDF5: Hierarchical Data
7.1 When to Use HDF5
HDF5 is ideal for:
- Large numerical datasets
- Hierarchical data structures
- Multiple arrays in one file
- Partial reads/writes
Definition: Hierarchical Data Format
A file format designed for storing and organizing large amounts of data. HDF5 files contain two types of objects: groups (analogous to directories) and datasets (analogous to files, holding typed multidimensional arrays). Data is stored in a hierarchical structure, and metadata (attributes) can be attached to any object.
7.2 HDF5 Operations
```python
import os

import h5py
import numpy as np
import pandas as pd

# pandas HDF5 support requires PyTables (the "tables" package);
# h5py is used below only to inspect the file's internal structure

# Create sample data
np.random.seed(42)
n = 10000
df1 = pd.DataFrame({
    'x': np.random.normal(0, 1, n),
    'y': np.random.normal(0, 1, n)
})
df2 = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01', periods=n, freq='min'),  # 'T' alias is deprecated
    'value': np.random.uniform(0, 100, n)
})

# Write to HDF5
hdf_file = 'data.h5'
# Store multiple datasets
df1.to_hdf(hdf_file, key='coordinates', mode='w')
df2.to_hdf(hdf_file, key='timeseries', mode='a')
print("HDF5 file created with 2 datasets")

# Read specific dataset
df_coords = pd.read_hdf(hdf_file, key='coordinates')
df_ts = pd.read_hdf(hdf_file, key='timeseries')
print(f"\nCoordinates: {df_coords.shape}")
print(f"Timeseries: {df_ts.shape}")

# List all groups and datasets
with h5py.File(hdf_file, 'r') as f:
    print("\nHDF5 structure:")
    def print_structure(name, obj):
        print(f"  {name}: {type(obj).__name__}")
    f.visititems(print_structure)

# Store in table format with data columns (enables on-disk queries)
with pd.HDFStore(hdf_file) as store:
    store.append('sensor_data',
                 pd.DataFrame({
                     'sensor_id': np.random.choice(['A', 'B', 'C'], 1000),
                     'reading': np.random.normal(0, 1, 1000),
                     'timestamp': pd.date_range('2024-01-01', periods=1000, freq='s')
                 }),
                 data_columns=['sensor_id'])
    # Query using data columns: only matching rows are loaded
    result = store.select('sensor_data', where="sensor_id='A'")
    print(f"\nQuery sensor_id='A': {result.shape}")

# Clean up
os.remove(hdf_file)
```
8. API Data Acquisition
8.1 The requests Library
```python
import time
from typing import Any, Dict, Optional

import requests


class APIClient:
    """Base API client with rate limiting and error handling."""

    def __init__(self, base_url: str, rate_limit: float = 1.0):
        self.base_url = base_url
        self.session = requests.Session()
        self.last_request_time = float('-inf')  # so the first request never waits
        self.rate_limit = rate_limit  # Minimum seconds between requests

    def _rate_limit_wait(self):
        """Wait if necessary to respect rate limits."""
        # time.monotonic() is unaffected by system clock adjustments
        elapsed = time.monotonic() - self.last_request_time
        if elapsed < self.rate_limit:
            time.sleep(self.rate_limit - elapsed)
        self.last_request_time = time.monotonic()

    def get(self, endpoint: str, params: Optional[Dict] = None) -> Any:
        """Make GET request; returns parsed JSON, or {"error": ...} on failure."""
        self._rate_limit_wait()
        url = f"{self.base_url}/{endpoint}"
        try:
            response = self.session.get(url, params=params, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def post(self, endpoint: str, data: Dict) -> Any:
        """Make POST request with a JSON body."""
        self._rate_limit_wait()
        url = f"{self.base_url}/{endpoint}"
        try:
            response = self.session.post(url, json=data, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}


# Example: JSONPlaceholder API (fake REST API for testing)
client = APIClient("https://jsonplaceholder.typicode.com", rate_limit=0.5)

# GET request (returns a list on success, an error dict on failure)
print("GET request:")
posts = client.get("posts", params={"_limit": 3})
if isinstance(posts, list):
    for post in posts:
        print(f"  Post {post['id']}: {post['title'][:50]}...")
else:
    print(f"  Error: {posts['error']}")

# GET single resource
print("\nGET single resource:")
post = client.get("posts/1")
if "error" not in post:
    print(f"  Title: {post['title']}")
    print(f"  Body: {post['body'][:100]}...")
else:
    print(f"  Error: {post['error']}")
```
8.2 Pagination Patterns
```python
from typing import Dict, Iterator, List

import pandas as pd
import requests


class PaginatedAPIClient:
    """Handle paginated API responses (page-number pagination)."""

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.session = requests.Session()

    def get_all_pages(self, endpoint: str,
                      page_param: str = 'page',
                      per_page: int = 100,
                      max_pages: int = 10) -> List[Dict]:
        """Fetch all pages of results (up to max_pages)."""
        all_results = []
        page = 1
        while page <= max_pages:
            # '_limit' is JSONPlaceholder's page-size parameter; other APIs name it differently
            params = {page_param: page, '_limit': per_page}
            response = self.session.get(
                f"{self.base_url}/{endpoint}",
                params=params,
                timeout=30
            )
            if response.status_code != 200:
                break
            data = response.json()
            if not data:
                break
            all_results.extend(data)
            page += 1
            # A short page means we've reached the end
            if len(data) < per_page:
                break
        return all_results

    def get_paginated_generator(self, endpoint: str,
                                page_param: str = 'page',
                                per_page: int = 100) -> Iterator[List[Dict]]:
        """Generator for memory-efficient pagination: yields one page at a time."""
        page = 1
        while True:
            response = self.session.get(
                f"{self.base_url}/{endpoint}",
                params={page_param: page, '_limit': per_page},
                timeout=30
            )
            if response.status_code != 200:
                break
            data = response.json()
            if not data:
                break
            yield data
            if len(data) < per_page:
                break
            page += 1


# Example usage
client = PaginatedAPIClient("https://jsonplaceholder.typicode.com")

# Fetch all posts (limited for demo); JSONPlaceholder's page parameter is '_page'
print("Fetching posts with pagination:")
all_posts = client.get_all_pages("posts", page_param='_page', per_page=10, max_pages=3)
print(f"  Total posts fetched: {len(all_posts)}")

# Convert to DataFrame
df = pd.DataFrame(all_posts)
print(f"  DataFrame shape: {df.shape}")
print(f"  Columns: {df.columns.tolist()}")
```
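The stop condition shared by both methods (stop on an empty page or a page shorter than per_page) can be tested without any network access by passing the page-fetching function in as a parameter. In this sketch paginate and fake_fetch are hypothetical names, and fake_fetch stands in for an HTTP call by slicing a local list.

```python
from typing import Callable, Iterator, List


def paginate(fetch_page: Callable[[int, int], List[dict]],
             per_page: int, max_pages: int = 100) -> Iterator[dict]:
    """Yield items page by page until a short or empty page signals the end."""
    for page in range(1, max_pages + 1):
        items = fetch_page(page, per_page)
        yield from items
        if len(items) < per_page:  # short (or empty) page: last page reached
            break


# Stand-in for the HTTP call: 25 fake records served in pages
DATA = [{"id": i} for i in range(1, 26)]
calls = []


def fake_fetch(page: int, per_page: int) -> List[dict]:
    calls.append(page)
    start = (page - 1) * per_page
    return DATA[start:start + per_page]


ids = [item["id"] for item in paginate(fake_fetch, per_page=10)]
print(len(ids), ids[-1], calls)  # 25 25 [1, 2, 3]
```

If the total happens to be an exact multiple of per_page, one extra request returns an empty page, which also ends the loop.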
8.3 Error Handling and Retry Logic
```python
from typing import Optional

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def create_session_with_retry(
    max_retries: int = 3,
    backoff_factor: float = 0.5,
    status_forcelist: tuple = (500, 502, 503, 504)
) -> requests.Session:
    """Create requests session with automatic retries."""
    session = requests.Session()
    retry_strategy = Retry(
        total=max_retries,
        backoff_factor=backoff_factor,  # sleep between retries grows exponentially
        status_forcelist=status_forcelist,
        # Idempotent methods only: retrying POST can repeat side effects.
        # (allowed_methods needs urllib3 >= 1.26; older versions call it method_whitelist)
        allowed_methods=["HEAD", "GET", "OPTIONS"]
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


# Example with error handling
def safe_api_call(
    url: str,
    params: Optional[dict] = None,
    max_retries: int = 3
) -> dict:
    """Make API call with comprehensive error handling."""
    session = create_session_with_retry(max_retries)
    try:
        response = session.get(url, params=params, timeout=10)
        response.raise_for_status()
        return {"success": True, "data": response.json()}
    except requests.exceptions.Timeout:
        return {"success": False, "error": "Request timed out"}
    except requests.exceptions.ConnectionError:
        return {"success": False, "error": "Connection failed"}
    except requests.exceptions.HTTPError as e:
        return {"success": False, "error": f"HTTP {e.response.status_code}"}
    except ValueError:
        return {"success": False, "error": "Invalid JSON response"}
    except requests.exceptions.RequestException as e:
        # e.g. RetryError once retries for status_forcelist codes are exhausted
        return {"success": False, "error": str(e)}
    finally:
        session.close()


# Test
result = safe_api_call("https://jsonplaceholder.typicode.com/posts/1")
if result["success"]:
    print(f"API call successful: {result['data']['title'][:50]}...")
else:
    print(f"API call failed: {result['error']}")
```
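Exponential backoff (delay = base × 2^attempt) is a widely used retry strategy for transient API failures; urllib3's Retry derives its sleep times from backoff_factor in this exponential form. Backoff reduces pressure on a struggling server, and adding random jitter to each delay keeps many clients from retrying in lockstep (the thundering herd problem).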
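A sketch of computing such a schedule by hand, for cases where you write your own retry loop instead of relying on urllib3. The function names are hypothetical; the cap bounds the longest wait, and "full jitter" here means drawing each actual delay uniformly from [0, delay].

```python
import random


def backoff_delays(base: float, attempts: int, cap: float = 30.0):
    """Deterministic exponential schedule: base * 2**attempt, capped at cap."""
    return [min(cap, base * 2 ** attempt) for attempt in range(attempts)]


def full_jitter(delays, rng=random):
    """Randomize each delay uniformly in [0, delay] so clients spread out."""
    return [rng.uniform(0, d) for d in delays]


schedule = backoff_delays(base=0.5, attempts=5)
print(schedule)  # [0.5, 1.0, 2.0, 4.0, 8.0]

# Seeded generator only to make the demo reproducible
jittered = full_jitter(schedule, rng=random.Random(0))
print(all(0 <= j <= d for j, d in zip(jittered, schedule)))  # True
```

In a retry loop you would call time.sleep(jittered[attempt]) before each new attempt.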
9. Format Comparison Table
| Format | Speed | Compression | Type Safety | Use Case | Pandas Support |
|---|---|---|---|---|---|
| CSV | Slow | None (can be gzipped externally) | No | Simple data, interoperability | read_csv() |
| Excel | Slow | Low (ZIP container) | Partial | Business reports, formulas | read_excel() |
| JSON | Medium | None (can be gzipped externally) | Partial (no date type) | APIs, nested data | read_json() |
| Parquet | Fast | High | Yes | Analytics, big data | read_parquet() |
| HDF5 | Fast | Optional (configurable) | Yes | Scientific data, arrays | read_hdf() |
| Feather | Very Fast | Medium | Yes | DataFrame serialization | read_feather() |
| Pickle | Fast | None by default | Yes | Python objects (trusted sources only: unpickling can execute code) | read_pickle() |
| SQL | Medium | N/A | Yes | Structured data, queries | read_sql() |
Performance Benchmark
```python
import os
import time

import numpy as np
import pandas as pd

# Create benchmark dataset
np.random.seed(42)
n_rows = 100000
n_cols = 10
data = {f'col_{i}': np.random.randn(n_rows) for i in range(n_cols)}
df = pd.DataFrame(data)
df['category'] = np.random.choice(['A', 'B', 'C', 'D'], n_rows)
df['text'] = ['text_' + str(i) for i in range(n_rows)]
print(f"Benchmark dataset: {n_rows:,} rows × {n_cols + 2} columns")
print(f"Memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB\n")

# Each format maps to (file path, writer, reader).
# Parquet and Feather need pyarrow; HDF5 needs PyTables ("tables").
formats = {
    'CSV': ('bench.csv', lambda p: df.to_csv(p, index=False), pd.read_csv),
    'JSON': ('bench.json',
             lambda p: df.to_json(p, orient='records'),
             lambda p: pd.read_json(p, orient='records')),
    'Parquet': ('bench.parquet', lambda p: df.to_parquet(p, index=False), pd.read_parquet),
    'HDF5': ('bench.h5',
             lambda p: df.to_hdf(p, key='data', mode='w'),
             lambda p: pd.read_hdf(p, key='data')),
    'Pickle': ('bench.pkl', df.to_pickle, pd.read_pickle),
    'Feather': ('bench.feather', df.to_feather, pd.read_feather),
}

results = []
for name, (path, write_func, read_func) in formats.items():
    # Write (single run, so timings are indicative only)
    start = time.perf_counter()
    write_func(path)
    write_time = time.perf_counter() - start

    # Get file size
    size = os.path.getsize(path) / 1024**2

    # Read
    start = time.perf_counter()
    _ = read_func(path)
    read_time = time.perf_counter() - start

    results.append({
        'Format': name,
        'Size (MB)': size,
        'Write (s)': write_time,
        'Read (s)': read_time,
        'Total (s)': write_time + read_time
    })

    # Clean up
    os.remove(path)

# Display results
results_df = pd.DataFrame(results)
print("Benchmark Results:")
print(results_df.to_string(index=False))
```
10. Complete Data Pipeline
10.1 Multi-Source Data Integration
import pandas as pd
import numpy as np
import os
from typing import List, Optional, Union
class DataPipeline:
    """Complete data pipeline for data science projects."""
    def __init__(self, data_dir: str = './data'):
        self.data_dir = data_dir
        os.makedirs(data_dir, exist_ok=True)
    def load_csv(self, filename: str, **kwargs) -> pd.DataFrame:
        """Load a CSV file from the data directory."""
        filepath = os.path.join(self.data_dir, filename)
        df = pd.read_csv(filepath, **kwargs)
        print(f"Loaded {filename}: {df.shape}")
        return df
    def load_excel(self, filename: str, sheet: Union[str, int] = 0, **kwargs) -> pd.DataFrame:
        """Load one Excel sheet, selected by name or zero-based position."""
        filepath = os.path.join(self.data_dir, filename)
        df = pd.read_excel(filepath, sheet_name=sheet, **kwargs)
        print(f"Loaded {filename} (sheet {sheet}): {df.shape}")
        return df
    def load_json(self, filename: str, **kwargs) -> pd.DataFrame:
        """Load JSON (handles both regular and NDJSON)."""
        filepath = os.path.join(self.data_dir, filename)
        # Try regular JSON first
        try:
            df = pd.read_json(filepath, **kwargs)
            print(f"Loaded {filename}: {df.shape}")
            return df
        except ValueError:
            # Regular parsing fails on NDJSON (one object per line), so retry with lines=True
            df = pd.read_json(filepath, lines=True, **kwargs)
            print(f"Loaded {filename} (NDJSON): {df.shape}")
            return df
    def load_parquet(self, filename: str, columns: Optional[List[str]] = None) -> pd.DataFrame:
        """Load Parquet with optional column selection."""
        filepath = os.path.join(self.data_dir, filename)
        df = pd.read_parquet(filepath, columns=columns)
        print(f"Loaded {filename}: {df.shape}")
        return df
    def merge_dataframes(
        self,
        dfs: List[pd.DataFrame],
        on: Optional[str] = None,
        how: str = 'inner'
    ) -> pd.DataFrame:
        """Merge DataFrames left to right on a shared key column (or on the index if on is None)."""
        if len(dfs) == 0:
            return pd.DataFrame()
        result = dfs[0]
        for df in dfs[1:]:
            if on:
                result = pd.merge(result, df, on=on, how=how)
            else:
                # Index-aligned join: only meaningful when the indexes identify the same entities
                result = pd.merge(result, df, left_index=True, right_index=True, how=how)
        print(f"Merged result: {result.shape}")
        return result
    def clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply common data cleaning operations."""
        initial_shape = df.shape
        # Remove duplicates; .copy() gives an independent frame for the assignments below
        df = df.drop_duplicates().copy()
        # Handle missing values
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        categorical_cols = df.select_dtypes(include=['object']).columns
        # Fill numeric columns with their median
        df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].median())
        # Fill categorical columns with their mode ('Unknown' if the column is entirely missing)
        for col in categorical_cols:
            if df[col].isna().any():
                mode = df[col].mode()
                df[col] = df[col].fillna(mode.iloc[0] if not mode.empty else 'Unknown')
        print(f"Cleaned data: {initial_shape} → {df.shape}")
        return df
    def save_data(
        self,
        df: pd.DataFrame,
        filename: str,
        format: str = 'parquet'
    ) -> str:
        """Save DataFrame in the specified format and return the file path."""
        filepath = os.path.join(self.data_dir, filename)
        if format == 'parquet':
            df.to_parquet(filepath, index=False)
        elif format == 'csv':
            df.to_csv(filepath, index=False)
        elif format == 'json':
            df.to_json(filepath, orient='records', indent=2)
        elif format == 'hdf5':
            df.to_hdf(filepath, key='data', mode='w')
        else:
            raise ValueError(f"Unsupported format: {format}")
        size = os.path.getsize(filepath) / 1024**2
        print(f"Saved {filename} ({format}): {size:.2f} MB")
        return filepath
# Example usage
pipeline = DataPipeline('./pipeline_data')
# Create sample data
np.random.seed(42)
# Source 1: Sales data
sales_data = pd.DataFrame({
    'transaction_id': range(1000),
    'product_id': np.random.randint(1, 50, 1000),
    'quantity': np.random.randint(1, 10, 1000),
    'price': np.random.uniform(10, 100, 1000),
    # Hourly timestamps: 'h' replaces the 'H' alias deprecated in pandas 2.2
    'date': pd.date_range('2024-01-01', periods=1000, freq='h')
})
# Source 2: Product data
product_data = pd.DataFrame({
    'product_id': range(1, 51),
    'category': np.random.choice(['Electronics', 'Clothing', 'Food'], 50),
    'brand': [f'Brand_{chr(65 + i % 10)}' for i in range(50)]
})
# Source 3: Customer data
customer_data = pd.DataFrame({
    'customer_id': range(1, 101),
    'name': [f'Customer_{i}' for i in range(1, 101)],
    'segment': np.random.choice(['Premium', 'Regular', 'Basic'], 100)
})
# Save sample data
sales_data.to_parquet('./pipeline_data/sales.parquet', index=False)
product_data.to_csv('./pipeline_data/products.csv', index=False)
customer_data.to_json('./pipeline_data/customers.json', orient='records')
# Run pipeline
print("\n" + "="*60)
print("DATA PIPELINE EXECUTION")
print("="*60 + "\n")
# Load
sales = pipeline.load_parquet('sales.parquet')
products = pipeline.load_csv('products.csv')
customers = pipeline.load_json('customers.json')
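# Transform
sales['total'] = sales['quantity'] * sales['price']
# Simulate a customer for each transaction (the sample sales data has no customer_id yet)
sales['customer_id'] = np.random.randint(1, 101, len(sales))
# Merge each source on its shared key; an index-based merge (on=None) would
# pair rows by position and attach unrelated products and customers
merged = pipeline.merge_dataframes([sales, products], on='product_id')
merged = pipeline.merge_dataframes([merged, customers], on='customer_id')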
# Clean
cleaned = pipeline.clean_data(merged)
# Feature engineering
cleaned['day_of_week'] = cleaned['date'].dt.day_name()
cleaned['hour'] = cleaned['date'].dt.hour
# Save results
pipeline.save_data(cleaned, 'processed_data.parquet', format='parquet')
pipeline.save_data(cleaned, 'processed_data.csv', format='csv')
print("\nPipeline completed successfully!")
print(f"Final dataset: {cleaned.shape}")
print(f"Columns: {cleaned.columns.tolist()}")
# Clean up
import shutil
shutil.rmtree('./pipeline_data')
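Join mistakes, such as merging unrelated tables on their index, fail silently, so it can help to make pd.merge check its own assumptions. The sketch below uses two tiny made-up tables and two real merge parameters: validate, which raises pandas.errors.MergeError when the key cardinality differs from what you declared, and indicator, which adds a _merge column marking each row as both, left_only, or right_only.

```python
import pandas as pd

# Illustrative tables: product_id 99 has no matching product
sales = pd.DataFrame({"transaction_id": [1, 2, 3], "product_id": [10, 11, 99]})
products = pd.DataFrame({"product_id": [10, 11], "category": ["Food", "Clothing"]})

# validate='many_to_one': each product_id must be unique in `products`
# indicator=True: adds a '_merge' column describing where each row came from
checked = pd.merge(sales, products, on="product_id", how="left",
                   validate="many_to_one", indicator=True)
unmatched = checked[checked["_merge"] == "left_only"]
print(f"{len(unmatched)} sales row(s) have no matching product")

# A duplicated product key violates the declared many-to-one relationship
dup_products = pd.concat([products, products.iloc[[0]]])
rejected = False
try:
    pd.merge(sales, dup_products, on="product_id", validate="many_to_one")
except pd.errors.MergeError as err:
    rejected = True
    print(f"Merge rejected: {err}")
```

In a pipeline like the one above, the same two parameters could be passed through merge_dataframes so that a bad key fails loudly instead of silently duplicating or dropping rows.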
Key Takeaways
Summary: File I/O & Data Import
- Context Managers Always: Use with statements for file I/O. They guarantee proper resource cleanup even when exceptions occur.
- Format Selection: CSV for small to medium data; Parquet for analytics and large datasets; JSON for APIs and nested data; HDF5 for scientific computing; Excel for business reports.
- Encoding Matters: Always specify encoding='utf-8' for text files; handle decoding errors with errors='replace' or errors='ignore' in open() (pd.read_csv takes encoding_errors= instead).
- Performance Optimization: Use chunked reading for large files and select only the columns you need; because Parquet is columnar, reading a subset of columns can be 10-100x faster than parsing the equivalent CSV.
- Data Quality: Validate data after loading (check shapes, dtypes, nulls); handle missing values explicitly; remove duplicates.
- API Best Practices: Implement rate limiting; use retry logic with exponential backoff; handle timeouts gracefully; cache responses when possible.
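Chunked reading and column selection combine naturally: passing chunksize to pd.read_csv returns an iterator of DataFrames, and usecols skips parsing columns you never use. The sketch below aggregates a synthetic in-memory CSV (standing in for a large file; the column names are made up) chunk by chunk, so only one 2,000-row parsed chunk is held at a time.

```python
import io

import pandas as pd

# Synthetic CSV standing in for a file too large to load at once
csv_text = "id,region,amount,notes\n" + "\n".join(
    f"{i},{'north' if i % 2 else 'south'},{i * 1.5},row {i}" for i in range(1, 10_001)
)

totals = {}
# usecols skips the unneeded 'notes' column; chunksize yields 2,000-row DataFrames
for chunk in pd.read_csv(io.StringIO(csv_text), usecols=["region", "amount"], chunksize=2_000):
    partial = chunk.groupby("region")["amount"].sum()
    # Combine each chunk's partial sums into the running totals
    for region, amount in partial.items():
        totals[region] = totals.get(region, 0.0) + amount

print(totals)
```

This pattern works for aggregations that can be merged across chunks (sums, counts, min/max); statistics such as medians need either the full column or an approximate algorithm.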
Practice Exercises
Exercise 1: Multi-Format Reader
Build a universal data reader that:
- Auto-detects file format from extension
- Handles CSV, Excel, JSON, Parquet
- Validates data after loading
- Returns a standardized DataFrame
Exercise 2: Data Quality Pipeline
Create a pipeline that:
- Loads data from multiple sources
- Validates schema and data types
- Handles missing values with configurable strategies
- Logs all transformations
- Generates a data quality report
Exercise 3: Parquet Migration
Migrate a legacy CSV-based data warehouse to Parquet:
- Convert existing CSV files to Parquet
- Implement incremental updates
- Compare query performance before/after
- Document storage savings
Exercise 4: API Data Collection
Build an API client that:
- Handles pagination
- Implements rate limiting
- Caches responses locally
- Handles authentication
- Stores data in Parquet format
Exercise 5: Data Pipeline Orchestration
Create a complete data pipeline that:
- Extracts data from 3+ sources (CSV, API, database)
- Transforms and cleans data
- Loads the results into a data warehouse
- Generates quality metrics
- Sends notifications on failure