Database Patterns: SQLAlchemy, Connection Pooling, Transactions
Database patterns and ORM optimization in Python
Interview Question
"Explain database patterns in Python. How does SQLAlchemy work? What is connection pooling and why is it important? How do you handle transactions properly?"
Difficulty: Hard | Frequently asked at Google, Meta, Amazon
Theoretical Foundation
Why Database Patterns Matter
# Without patterns:
# 1. Connection leaks
# 2. SQL injection
# 3. N+1 query problems
# 4. Transaction management issues
# 5. No connection reuse
# With patterns:
# 1. Connection pooling
# 2. ORM for safe queries
# 3. Eager/lazy loading
# 4. ACID transactions
# 5. Session management
ℹ️
Key Concept: Proper database patterns ensure security, performance, and data integrity.
SQLAlchemy Basics
Model Definition
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
from datetime import datetime
Base = declarative_base()
# Models
class User(Base):
    """Application user; owns zero or more posts.

    ``balance`` holds the account balance that ``transfer_money`` debits and
    credits.  ``create_all`` does not alter existing tables, so a database
    created before this column was added needs a migration.
    """

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    # Read and written by transfer_money(); new users start with nothing.
    balance = Column(Float, nullable=False, default=0.0)
    created_at = Column(DateTime, default=datetime.now)

    # Relationships
    # lazy="select": posts are fetched by a separate SELECT on first access.
    posts = relationship("Post", back_populates="author", lazy="select")

    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}')>"
class Post(Base):
    """Post written by one user and labelled with any number of tags."""

    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    content = Column(String(1000))
    # Foreign key to the owning row in ``users``.
    author_id = Column(Integer, ForeignKey('users.id'))
    created_at = Column(DateTime, default=datetime.now)

    # Many-to-one side of User.posts.
    author = relationship("User", back_populates="posts")
    # Many-to-many through the ``post_tags`` association table.
    tags = relationship("Tag", secondary="post_tags", back_populates="posts")
class Tag(Base):
    """Label that can be attached to many posts."""

    __tablename__ = 'tags'

    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True)

    # Reverse side of Post.tags, joined through ``post_tags``.
    posts = relationship("Post", secondary="post_tags", back_populates="tags")
# Association table
from sqlalchemy import Table

# Link table for the Post <-> Tag many-to-many relationship.  The two foreign
# keys together form the primary key, so the same (post, tag) pair cannot be
# stored twice.
post_tags = Table(
    'post_tags',
    Base.metadata,
    Column('post_id', Integer, ForeignKey('posts.id'), primary_key=True),
    Column('tag_id', Integer, ForeignKey('tags.id'), primary_key=True),
)

# Create database
# echo=True logs every emitted SQL statement.
engine = create_engine('sqlite:///app.db', echo=True)
# Creates any tables registered on Base.metadata that do not exist yet.
Base.metadata.create_all(engine)
Basic CRUD Operations
from sqlalchemy.orm import Session
# Create session
SessionLocal = sessionmaker(bind=engine)
# CRUD Operations
def create_user(name: str, email: str) -> User:
    """Insert a new user row and return the persisted object.

    The session is opened and closed inside this call, so the returned
    object is no longer attached to a session.
    """
    with SessionLocal() as db:
        new_user = User(name=name, email=email)
        db.add(new_user)
        db.commit()
        # Reload the row so generated values such as the id are populated
        # before the session goes away.
        db.refresh(new_user)
        return new_user
def get_user(user_id: int) -> User:
    """Look up a user by primary key.

    Returns the matching user, or ``None`` when no row has that id.
    """
    with SessionLocal() as db:
        by_id = db.query(User).filter(User.id == user_id)
        return by_id.first()
def update_user(user_id: int, **kwargs) -> User:
    """Assign the given attributes to a user and persist them.

    Each keyword argument is set as an attribute on the loaded user.
    Returns the refreshed user, or ``None`` when the id does not exist
    (in which case nothing is written).
    """
    with SessionLocal() as db:
        target = db.query(User).filter(User.id == user_id).first()
        if not target:
            return target
        for field, new_value in kwargs.items():
            setattr(target, field, new_value)
        db.commit()
        db.refresh(target)
        return target
def delete_user(user_id: int) -> bool:
    """Remove a user by primary key.

    Returns ``True`` when a row was found and deleted, ``False`` otherwise.
    """
    with SessionLocal() as db:
        doomed = db.query(User).filter(User.id == user_id).first()
        if not doomed:
            return False
        db.delete(doomed)
        db.commit()
        return True
# Usage
# Walks through the four CRUD helpers in order; each step consumes the
# object returned by the previous one.
user = create_user("Alice", "alice@example.com")
print(f"Created: {user}")
user = get_user(user.id)  # re-read the row by its generated primary key
print(f"Got: {user}")
user = update_user(user.id, name="Alice Smith")
print(f"Updated: {user}")
deleted = delete_user(user.id)  # True when a row was found and removed
print(f"Deleted: {deleted}")
💡
Interview Tip: Always use context managers (with statements) for database sessions to ensure proper cleanup.
Connection Pooling
Why Connection Pooling?
# Without connection pooling:
# 1. Create new connection for each request
# 2. Overhead of TCP handshake, authentication
# 3. Limited by database max connections
# 4. Connection leaks if not properly closed
# With connection pooling:
# 1. Reuse existing connections
# 2. Reduced overhead
# 3. Better resource utilization
# 4. Automatic connection management
SQLAlchemy Connection Pooling
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
# Engine with connection pooling
# NOTE(review): the URL embeds placeholder credentials; real deployments
# would presumably load it from configuration - confirm.
engine = create_engine(
    "postgresql://user:pass@localhost/dbname",
    poolclass=QueuePool,
    pool_size=20, # Connections kept open in the pool
    max_overflow=10, # Extra connections allowed beyond pool_size (30 total here)
    pool_timeout=30, # Seconds to wait for a free connection before giving up
    pool_recycle=1800, # Replace connections older than 30 minutes
    pool_pre_ping=True, # Test each connection on checkout before using it
    echo=True # Log SQL queries
)
# Monitor pool
from sqlalchemy import event
@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, connection_rec, connection_proxy):
    """Pool listener: print the identity of each connection handed out."""
    conn_id = id(dbapi_conn)
    print(f"Connection checked out: {conn_id}")
@event.listens_for(engine, "checkin")
def on_checkin(dbapi_conn, connection_rec):
    """Pool listener: print the identity of each connection given back."""
    conn_id = id(dbapi_conn)
    print(f"Connection returned: {conn_id}")
# Usage with pool
from sqlalchemy import text

with engine.connect() as conn:
    # SQLAlchemy 2.0 rejects plain strings here; textual SQL must be wrapped
    # in text().
    result = conn.execute(text("SELECT 1"))
    print(result.fetchone())

# Pool statistics
pool = engine.pool
print(f"Pool size: {pool.size()}")
print(f"Checked out: {pool.checkedout()}")
print(f"Overflow: {pool.overflow()}")
Custom Connection Pool
import queue
import threading
from contextlib import contextmanager
from typing import Any, Generator
import sqlite3
class SimpleConnectionPool:
    """Fixed-size pool of SQLite connections that can be shared by threads.

    All ``max_size`` connections are opened eagerly and parked in a queue.
    A borrower takes one with :meth:`get_connection` and the connection is
    put back when the ``with`` block exits.

    Args:
        db_path: Path handed to ``sqlite3.connect``.
        max_size: Number of connections opened and kept in the pool.
        timeout: Seconds ``get_connection`` waits for a free connection.
    """

    def __init__(self, db_path: str, max_size: int = 10, timeout: float = 5):
        self.db_path = db_path
        self.max_size = max_size
        self.timeout = timeout
        self.pool = queue.Queue(maxsize=max_size)
        self.lock = threading.Lock()
        self._initialize_pool()

    def _initialize_pool(self):
        """Open ``max_size`` connections and place them in the queue."""
        for _ in range(self.max_size):
            # Connections are created here but used by whichever thread
            # borrows them, so sqlite3's same-thread check must be off.  The
            # queue hands each connection to one borrower at a time.
            conn = sqlite3.connect(self.db_path, check_same_thread=False)
            self.pool.put(conn)

    @contextmanager
    def get_connection(self) -> Generator[sqlite3.Connection, None, None]:
        """Borrow a connection for the duration of a ``with`` block.

        Raises:
            queue.Empty: No connection became free within ``timeout`` seconds.
        """
        conn = self.pool.get(timeout=self.timeout)
        try:
            yield conn
        except BaseException:
            # Don't hand a half-finished transaction to the next borrower.
            conn.rollback()
            raise
        finally:
            self.pool.put(conn)

    def close_all(self):
        """Close every connection currently parked in the pool.

        Connections that are borrowed at the time of the call are not
        closed; they go back into the queue when their borrower finishes.
        """
        while True:
            try:
                # get_nowait avoids blocking if another thread drains the
                # queue between an emptiness check and the get.
                conn = self.pool.get_nowait()
            except queue.Empty:
                break
            conn.close()
# Usage
pool = SimpleConnectionPool("app.db", max_size=5)


def query_user(user_id: int):
    """Fetch the ``users`` row with the given id over a pooled connection.

    The id is passed as a bound parameter, never interpolated into the SQL.
    Returns whatever ``fetchone()`` yields (``None`` when no row matches).
    """
    sql = "SELECT * FROM users WHERE id = ?"
    with pool.get_connection() as conn:
        row = conn.execute(sql, (user_id,)).fetchone()
        return row
# Thread-safe usage
import concurrent.futures
def fetch_users(user_ids):
    """Run ``query_user`` for every id on a five-worker thread pool.

    Returns the results in the same order as ``user_ids``.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Submit everything first so the lookups overlap, then collect.
        pending = []
        for uid in user_ids:
            pending.append(executor.submit(query_user, uid))
        rows = []
        for future in pending:
            rows.append(future.result())
        return rows
⚠️
Important: Always configure connection pooling in production to prevent connection exhaustion.
Transaction Management
Basic Transactions
from sqlalchemy.orm import Session
from contextlib import contextmanager
@contextmanager
def get_session():
    """Yield a session whose work is committed on success.

    If the ``with`` body (or the commit itself) raises, the transaction is
    rolled back and the exception propagates unchanged.  The session is
    closed in every case.
    """
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        # Bare raise keeps the original exception and traceback intact.
        raise
    finally:
        session.close()
def transfer_money(from_user_id: int, to_user_id: int, amount: float):
    """Move ``amount`` from one user's balance to another's atomically.

    Requires ``User`` to expose a numeric ``balance`` attribute.  Both
    balance changes are committed together when the ``get_session`` block
    exits, or rolled back together if anything raises.

    Raises:
        ValueError: ``amount`` is not positive, either user does not exist,
            or the sender's balance is smaller than ``amount``.
    """
    if amount <= 0:
        raise ValueError("Transfer amount must be positive")

    with get_session() as session:
        # Lock both rows for the rest of the transaction so concurrent
        # transfers cannot overwrite each other's balance.  Ordering by id
        # makes every transfer take its locks in the same order.  Backends
        # without SELECT ... FOR UPDATE support are expected to ignore the
        # hint - NOTE(review): confirm for the target database.
        locked_users = (
            session.query(User)
            .filter(User.id.in_([from_user_id, to_user_id]))
            .order_by(User.id)
            .with_for_update()
            .all()
        )
        users_by_id = {user.id: user for user in locked_users}
        from_user = users_by_id.get(from_user_id)
        to_user = users_by_id.get(to_user_id)
        if not from_user or not to_user:
            raise ValueError("User not found")
        if from_user.balance < amount:
            raise ValueError("Insufficient funds")

        # Update balances
        from_user.balance -= amount
        to_user.balance += amount
        # Commit happens automatically when exiting the context manager.
# Usage
# Report the outcome of a sample transfer instead of letting it crash.
try:
    transfer_money(1, 2, 100.0)
except Exception as e:
    print(f"Transfer failed: {e}")
else:
    print("Transfer successful")
Nested Transactions
from sqlalchemy.orm import Session
from contextlib import contextmanager
class TransactionManager:
    """Track savepoints opened on a session so they can be unwound in order.

    ``savepoints`` is a stack: the last entry is the innermost savepoint.
    """

    def __init__(self, session: Session):
        self.session = session
        self.savepoints = []

    def begin(self):
        """Begin a transaction unless the session already has one.

        A session that has already started a transaction raises if
        ``begin()`` is called again, so this is a no-op in that case.
        """
        if not self.session.in_transaction():
            self.session.begin()

    def savepoint(self):
        """Open a nested transaction (SAVEPOINT), remember it and return it."""
        savepoint = self.session.begin_nested()
        self.savepoints.append(savepoint)
        return savepoint

    def commit(self):
        """Release outstanding savepoints, then commit the session.

        Savepoints are released innermost-first and removed from the stack,
        so a later ``rollback()`` never targets a finished savepoint.
        Savepoints that are no longer active are skipped.
        """
        while self.savepoints:
            savepoint = self.savepoints.pop()
            if savepoint.is_active:
                savepoint.commit()
        self.session.commit()

    def rollback(self):
        """Roll back to the innermost savepoint, or the whole transaction.

        With at least one tracked savepoint only that savepoint is undone;
        with none, the session's transaction is rolled back.
        """
        if self.savepoints:
            savepoint = self.savepoints.pop()
            savepoint.rollback()
        else:
            self.session.rollback()
# Usage
def complex_operation():
    """Create a user and, if possible, a first post for that user.

    The post is written inside a savepoint.  If writing it fails with a
    database error, only the savepoint is rolled back and the user is still
    committed.  Any other failure aborts the whole transaction and is
    re-raised.
    """
    from sqlalchemy.exc import SQLAlchemyError

    with SessionLocal() as session:
        tx = TransactionManager(session)
        tx.begin()
        try:
            # First operation
            user = User(name="Alice", email="alice@example.com")
            session.add(user)

            # Create savepoint
            tx.savepoint()
            try:
                # Second operation (might fail)
                post = Post(title="Hello", author=user)
                session.add(post)
                # Flush now so a failing INSERT surfaces while the savepoint
                # is still the innermost transaction.
                session.flush()
            except SQLAlchemyError:
                # Undo only the post; the user added before the savepoint
                # is kept.
                tx.rollback()

            tx.commit()
        except Exception:
            # Abort everything, not just the innermost savepoint.
            session.rollback()
            raise
Optimistic Locking
from sqlalchemy import Column, Integer, DateTime, func
from sqlalchemy.orm import Session
from datetime import datetime
class OptimisticLockMixin:
    """Adds a version counter and a last-modified timestamp to a model."""

    # Starts at 1 for a new row.
    version = Column(Integer, nullable=False, default=1)
    # Filled in on insert and set again whenever the row is updated.
    updated_at = Column(DateTime, onupdate=datetime.now, default=datetime.now)
class User(OptimisticLockMixin, Base):
    """User model protected by a version column (optimistic locking)."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(100))

    def update_with_lock(self, session: Session, **kwargs):
        """Apply ``kwargs`` to this row only if nobody changed it meanwhile.

        Issues a single UPDATE filtered on the version this object was
        loaded with, bumping the version by one.  On success the session is
        committed and this object is reloaded from the database.

        Raises:
            ValueError: The stored version no longer matches, i.e. the row
                was modified (or deleted) since this object was loaded.
        """
        # Get current version
        current_version = self.version
        new_version = current_version + 1

        # Run the guarded UPDATE *before* touching any attribute on self.
        # Assigning first would mark this object dirty; the query's
        # autoflush would then write the new version unguarded and the
        # version check below could never match.
        rows_updated = session.query(User).filter(
            User.id == self.id,
            User.version == current_version,
        ).update(
            {**kwargs, 'version': new_version},
            # The object is reloaded explicitly below.
            synchronize_session=False,
        )
        if rows_updated == 0:
            raise ValueError("Record was modified by another user")

        session.commit()
        # Bring the in-memory object in line with the committed row.
        session.refresh(self)
# Usage
def update_user_concurrently(user_id: int):
    """Rename a user through the optimistic-lock update path.

    Raises:
        ValueError: The user does not exist, or (from ``update_with_lock``)
            the row was changed by someone else in the meantime.
    """
    with SessionLocal() as session:
        user = session.query(User).filter(User.id == user_id).first()
        if user is None:
            raise ValueError("User not found")
        user.update_with_lock(session, name="Updated Name")
💡
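Interview Tip: Optimistic locking suits read-heavy, low-contention workloads where conflicts are rare; pessimistic locking (e.g. SELECT ... FOR UPDATE) suits write-heavy, high-contention workloads where conflicts are expected.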
Advanced Patterns
Query Optimization
from sqlalchemy.orm import Session, joinedload, selectinload
from sqlalchemy import select
# N+1 Problem Example
def get_users_with_posts_bad():
    """Anti-pattern demo: one query for the users plus one more per user."""
    with SessionLocal() as db:
        everyone = db.query(User).all()  # 1 query
        for person in everyone:
            # One extra round trip per user - this is the N in "N+1".
            own_posts = db.query(Post).filter(Post.author_id == person.id)
            post_count = len(own_posts.all())
            print(f"{person.name}: {post_count} posts")
# Eager Loading Solutions
def get_users_with_posts_good(strategy: str = "selectin"):
    """Print each user's post count using eager loading (no N+1).

    Args:
        strategy: ``"selectin"`` (default) loads posts with a second query
            using an IN clause; ``"joined"`` loads them in the same query
            via a JOIN.

    Raises:
        ValueError: ``strategy`` is not one of the supported names.
    """
    loaders = {"joined": joinedload, "selectin": selectinload}
    try:
        loader = loaders[strategy]
    except KeyError:
        raise ValueError(f"Unknown loading strategy: {strategy!r}") from None

    with SessionLocal() as session:
        # Only the requested strategy is executed; running both would load
        # the whole result set twice.
        users = session.query(User).options(loader(User.posts)).all()
        for user in users:
            print(f"{user.name}: {len(user.posts)} posts")
# Query Optimization Examples
def optimized_queries():
    """Run one example of each query-optimization technique in turn."""
    from sqlalchemy import exists

    with SessionLocal() as db:
        # 1. Ask only for the columns that are needed.
        users = db.query(User.id, User.name).all()

        # 2. Let the database do the filtering.
        name_filter = User.name.like('A%')
        users = db.query(User).filter(name_filter).all()

        # 3. Look up by a column that should be indexed in the DB.
        email_filter = User.email == 'alice@example.com'
        users = db.query(User).filter(email_filter).first()

        # 4. Page through results instead of loading everything.
        page = db.query(User).offset(0).limit(10).all()

        # 5. Existence check instead of counting rows.
        has_users = db.query(exists().select_from(User)).scalar()

        # 6. Update several rows by primary key in one bulk call.
        renames = [
            {'id': 1, 'name': 'New Name'},
            {'id': 2, 'name': 'Another Name'}
        ]
        db.bulk_update_mappings(User, renames)
        db.commit()
Repository Pattern
from abc import ABC, abstractmethod
from typing import List, Optional
from sqlalchemy.orm import Session
# Abstract Repository
class UserRepository(ABC):
    """Storage-agnostic interface for loading and persisting users."""

    @abstractmethod
    def get_by_id(self, user_id: int) -> Optional[User]:
        """Return the user with this id, or ``None``."""

    @abstractmethod
    def get_all(self) -> List[User]:
        """Return every stored user."""

    @abstractmethod
    def create(self, user: User) -> User:
        """Persist a new user and return it."""

    @abstractmethod
    def update(self, user: User) -> User:
        """Persist changes to an existing user and return it."""

    @abstractmethod
    def delete(self, user_id: int) -> bool:
        """Delete the user with this id; report whether one was removed."""
# SQLAlchemy Implementation
class SQLAlchemyUserRepository(UserRepository):
    """``UserRepository`` backed by a SQLAlchemy session.

    Every mutating method commits the session it was given.
    """

    def __init__(self, session: Session):
        self.session = session

    def get_by_id(self, user_id: int) -> Optional[User]:
        """Return the user with this id, or ``None`` when there is none."""
        return self.session.query(User).filter(User.id == user_id).first()

    def get_all(self) -> List[User]:
        """Return all users."""
        return self.session.query(User).all()

    def create(self, user: User) -> User:
        """Insert ``user``, commit, and return it with generated values."""
        self.session.add(user)
        self.session.commit()
        self.session.refresh(user)
        return user

    def update(self, user: User) -> User:
        """Merge ``user``'s state into the session, commit, and return it.

        ``merge`` returns the instance that actually belongs to the session;
        that instance is returned rather than the argument, which may be a
        detached copy.
        """
        merged = self.session.merge(user)
        self.session.commit()
        return merged

    def delete(self, user_id: int) -> bool:
        """Delete by id; ``True`` if a row was removed, else ``False``."""
        user = self.get_by_id(user_id)
        if user:
            self.session.delete(user)
            self.session.commit()
            return True
        return False


# Usage
with SessionLocal() as session:
    repo = SQLAlchemyUserRepository(session)
    user = repo.get_by_id(1)
    users = repo.get_all()
Unit of Work Pattern
from typing import List
from sqlalchemy.orm import Session
class UnitOfWork:
    """Collect new, changed and deleted objects and persist them together.

    Objects are only recorded by the ``register_*`` methods; nothing reaches
    the session until :meth:`commit`.  Registering the same object twice in
    one category has no additional effect.
    """

    def __init__(self, session: Session):
        self.session = session
        self._new_objects: List = []
        self._dirty_objects: List = []
        self._deleted_objects: List = []

    def register_new(self, obj):
        """Record ``obj`` for insertion on the next commit."""
        if obj not in self._new_objects:
            self._new_objects.append(obj)

    def register_dirty(self, obj):
        """Record ``obj`` as modified so it is merged on the next commit."""
        if obj not in self._dirty_objects:
            self._dirty_objects.append(obj)

    def register_deleted(self, obj):
        """Record ``obj`` for deletion on the next commit."""
        if obj not in self._deleted_objects:
            self._deleted_objects.append(obj)

    def commit(self):
        """Apply all recorded changes in one transaction.

        Order: inserts, then merges, then deletes, then the session commit.
        If any step raises, the session is rolled back and the exception
        propagates; the recorded objects are kept so the caller can retry
        or call :meth:`rollback` to discard them.
        """
        try:
            # Insert new objects
            for obj in self._new_objects:
                self.session.add(obj)
            # Update dirty objects
            for obj in self._dirty_objects:
                self.session.merge(obj)
            # Delete objects
            for obj in self._deleted_objects:
                self.session.delete(obj)
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise
        self._clear_tracking()

    def rollback(self):
        """Roll back the session and forget everything recorded so far."""
        self.session.rollback()
        self._clear_tracking()

    def _clear_tracking(self):
        """Empty the three tracking lists."""
        self._new_objects.clear()
        self._dirty_objects.clear()
        self._deleted_objects.clear()
# Usage
def complex_business_operation():
    """Create one user and rename another in a single unit of work.

    Raises:
        ValueError: The user with id 1 does not exist; nothing is committed.
    """
    with SessionLocal() as session:
        uow = UnitOfWork(session)

        # Track changes
        new_user = User(name="Alice", email="alice@example.com")
        uow.register_new(new_user)

        existing_user = session.query(User).filter(User.id == 1).first()
        if existing_user is None:
            # Fail clearly instead of an AttributeError on None below.
            raise ValueError("User not found")
        existing_user.name = "Updated"
        uow.register_dirty(existing_user)

        # Commit all changes atomically
        uow.commit()
ℹ️
Pattern: Repository and Unit of Work patterns separate business logic from data access.
Performance Optimization
Query Performance
import time
from sqlalchemy.orm import Session
def benchmark_queries():
    """Time four query shapes against ``User`` and print the results.

    Uses ``time.perf_counter``, a monotonic high-resolution clock, so the
    measurements are not disturbed by system clock adjustments.
    """
    # Imported up front so the import cost is not counted in any timing.
    from sqlalchemy import exists

    with SessionLocal() as session:
        # Bad: Load all
        start = time.perf_counter()
        users = session.query(User).all()
        print(f"Load all: {len(users)} users in {time.perf_counter()-start:.3f}s")

        # Good: Filter early
        start = time.perf_counter()
        users = session.query(User).filter(User.name.like('A%')).all()
        print(f"Filtered: {len(users)} users in {time.perf_counter()-start:.3f}s")

        # Good: Select specific columns
        start = time.perf_counter()
        users = session.query(User.id, User.name).all()
        print(f"Select columns: {len(users)} users in {time.perf_counter()-start:.3f}s")

        # Good: Use exists
        start = time.perf_counter()
        has_users = session.query(exists().select_from(User)).scalar()
        print(f"Exists check: {has_users} in {time.perf_counter()-start:.3f}s")
# Connection pooling performance
def connection_pool_performance():
    """Time 100 trivial queries, each on a freshly opened session."""
    import time

    from sqlalchemy import text

    # SQLAlchemy 2.0 rejects plain strings in execute(); build the textual
    # statement once and reuse it.
    probe = text("SELECT 1")

    start = time.perf_counter()
    for _ in range(100):
        with SessionLocal() as session:
            session.execute(probe)
    print(f"100 queries: {time.perf_counter()-start:.3f}s")
Caching
from functools import lru_cache
from typing import Optional
class UserRepositoryWithCache:
    """User lookup with a per-instance, in-memory cache keyed by user id.

    Only users that were found are cached; a lookup that finds nothing goes
    to the database again next time.  The cache has no size limit and no
    expiry, and cached objects came from ``session``, so an instance should
    not outlive the session it was built with.
    """

    def __init__(self, session: Session):
        self.session = session
        self.cache = {}

    def get_by_id(self, user_id: int) -> Optional[User]:
        """Return the user with this id, from cache when available."""
        # Check cache first
        if user_id in self.cache:
            return self.cache[user_id]
        # Query database
        user = self.session.query(User).filter(User.id == user_id).first()
        if user:
            self.cache[user_id] = user
        return user

    def invalidate_cache(self, user_id: int):
        """Drop the cached entry for one user, if present."""
        self.cache.pop(user_id, None)

    def clear_cache(self):
        """Drop every cached entry."""
        self.cache.clear()


# Usage
# The repository needs an open session; scope it with a context manager so
# it is always closed.
with SessionLocal() as session:
    repo = UserRepositoryWithCache(session)
    user = repo.get_by_id(1)  # Database query
    user = repo.get_by_id(1)  # Cache hit (if user 1 exists)
Interview Tips
Common Follow-up Questions
- "When would you use raw SQL vs ORM?"
  - ORM: Most cases — type safety, maintainability
  - Raw SQL: Complex queries, performance optimization
  - Hybrid: ORM for CRUD, raw SQL for complex queries
- "How do you prevent N+1 queries?"
  - Eager loading (joinedload, selectinload)
  - Subquery loading
  - Batch loading
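- "What's the difference between eager and lazy loading?"
  - Eager: Load related objects up front, together with the parent query
  - Lazy: Load related objects on first attribute access (can cause N+1)
  - Select (`lazy="select"`): SQLAlchemy's default lazy strategy — it emits a separate SELECT the first time the relationship is accessed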
Code Review Tips
# BAD: N+1 queries
users = session.query(User).all()
for user in users:
    posts = user.posts  # Lazy load: one extra query per user!

# GOOD: Eager loading
users = session.query(User).options(joinedload(User.posts)).all()

# BAD: No session management
user = session.query(User).first()
session.close()  # Skipped entirely if the query above raises

# GOOD: Context manager
with SessionLocal() as session:
    user = session.query(User).first()

# BAD: No transaction handling
session.add(user)
session.commit()  # What if commit fails?

# GOOD: Transaction handling
try:
    session.add(user)
    session.commit()
except BaseException:
    # Roll back on any failure, then let the error propagate.
    session.rollback()
    raise
⚠️
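Common Mistake: Iterating over a lazily loaded relationship in a loop issues one extra query per parent row — the N+1 problem — which often goes unnoticed until production data volumes.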
Summary
| Pattern | Purpose | Use Case |
|---|---|---|
| Repository | Data access abstraction | Clean architecture |
| Unit of Work | Transaction management | Complex operations |
| Connection Pool | Resource reuse | High concurrency |
| Eager Loading | Query optimization | Related data |
| Optimistic Locking | Concurrency control | Read-heavy apps |
Best Practices
- Use connection pooling in production
- Eager load related data
- Handle transactions properly
- Use context managers for sessions
- Monitor query performance
- Implement caching for read-heavy data
ℹ️
Key Takeaway: Proper database patterns ensure security, performance, and maintainability.
Practice Problems
- Repository Pattern: Implement a complete repository for a domain entity
- Connection Pool: Build a custom connection pool
- Transaction Manager: Create a transaction manager with savepoints
- Query Optimizer: Optimize N+1 queries in existing code
- Caching Layer: Implement a caching repository
Further Reading
- SQLAlchemy Docs: https://docs.sqlalchemy.org/
- Database Design: Normalization, indexing strategies
- Performance: Query optimization, EXPLAIN plans
- Patterns: Repository, Unit of Work, Data Mapper
Remember: Database patterns are essential for building scalable, maintainable applications.