Database Patterns
This document defines our database design standards and best practices for PostgreSQL.
When to Use PostgreSQL
Answer: 99% of the time.
Use PostgreSQL as your default choice unless you have a specific reason not to:
- ✅ User data, orders, products, transactions
- ✅ Application configuration
- ✅ Audit logs and events
- ✅ JSON documents (PostgreSQL has excellent JSON support)
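A minimal sketch of that JSON support (hypothetical events table; db is a SQLAlchemy Session as in the examples later in this document): JSONB columns can be queried and indexed much like regular columns.
from sqlalchemy import Column, Integer, Index
from sqlalchemy.dialects.postgresql import JSONB
from app.db.base_class import Base

class Event(Base):
    __tablename__ = "events"

    id = Column(Integer, primary_key=True)
    payload = Column(JSONB, nullable=False)

    __table_args__ = (
        # GIN index speeds up JSONB containment queries (@>)
        Index("ix_events_payload", "payload", postgresql_using="gin"),
    )

# Containment query: events whose payload includes {"type": "signup"}
signup_events = db.query(Event).filter(
    Event.payload.contains({"type": "signup"})
).all()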
When NOT to use PostgreSQL:
- ❌ Ephemeral data (use Redis)
- ❌ File storage (use S3)
- ❌ Full-text search at scale (use Elasticsearch, but start with PostgreSQL)
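"Start with PostgreSQL" for search can be as simple as the sketch below (assumes the Product model defined later in this document); for modest data volumes this is often enough before introducing Elasticsearch.
from sqlalchemy import func

matches = (
    db.query(Product)
    .filter(
        func.to_tsvector("english", Product.name)
        .op("@@")(func.plainto_tsquery("english", "wireless mouse"))
    )
    .all()
)

# For larger tables, index the same expression, e.g.:
# CREATE INDEX ix_products_name_fts ON products USING gin (to_tsvector('english', name));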
Database Design Rules
RULE 1: Always Use dbdiagram.io Before Coding
Mandatory: Design your database schema visually first.
- Go to dbdiagram.io
- Design your schema
- Review with team
- Export SQL or use as reference
- Create Alembic migration
Example dbdiagram.io syntax:
Table users {
id integer [primary key, increment]
email varchar(255) [unique, not null]
hashed_password varchar(255) [not null]
full_name varchar(100) [not null]
is_active boolean [default: true]
is_admin boolean [default: false]
created_at timestamp [default: `now()`]
updated_at timestamp
indexes {
email [unique]
created_at
}
}
Table orders {
id integer [primary key, increment]
user_id integer [not null, ref: > users.id]
status varchar(50) [not null, default: 'pending']
total_amount decimal(10,2) [not null, default: 0]
payment_id integer [ref: > payments.id]
created_at timestamp [default: `now()`]
updated_at timestamp
indexes {
user_id
status
(user_id, status) [name: 'idx_user_status']
created_at
}
}
Table order_items {
id integer [primary key, increment]
order_id integer [not null, ref: > orders.id]
product_id integer [not null, ref: > products.id]
quantity integer [not null]
unit_price decimal(10,2) [not null]
created_at timestamp [default: `now()`]
indexes {
order_id
product_id
}
}
Table products {
id integer [primary key, increment]
name varchar(255) [not null]
sku varchar(100) [unique, not null]
price decimal(10,2) [not null]
inventory_count integer [not null, default: 0]
is_active boolean [default: true]
created_at timestamp [default: `now()`]
updated_at timestamp
indexes {
sku [unique]
is_active
}
}
RULE 2: Index All Foreign Keys
Every foreign key MUST have an index.
# BAD - No index
class Order(Base):
__tablename__ = "orders"
user_id = Column(Integer, ForeignKey("users.id"))
# GOOD - Indexed foreign key
class Order(Base):
__tablename__ = "orders"
user_id = Column(Integer, ForeignKey("users.id"), index=True)
RULE 3: Index Fields in WHERE Clauses
If you filter by a field frequently, index it.
# If you query: SELECT * FROM orders WHERE status = 'pending'
# Then you need:
class Order(Base):
status = Column(String(50), index=True)
# If you query: SELECT * FROM products WHERE is_active = true
# Then you need:
class Product(Base):
is_active = Column(Boolean, default=True, index=True)
RULE 4: Index Fields in ORDER BY
If you sort by a field frequently, index it.
# If you query: SELECT * FROM orders ORDER BY created_at DESC
# Then you need:
class Order(Base):
created_at = Column(DateTime, default=datetime.utcnow, index=True)
RULE 5: Composite Indexes for Common Queries
Create composite indexes for frequently-used filter combinations.
from sqlalchemy import Index
class Order(Base):
__tablename__ = "orders"
user_id = Column(Integer, ForeignKey("users.id"))
status = Column(String(50))
# Composite index for common query: WHERE user_id = ? AND status = ?
__table_args__ = (
Index('idx_user_status', 'user_id', 'status'),
)
SQLAlchemy Model Pattern
Complete example with all best practices:
# app/models/order.py
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, Numeric, Boolean, ForeignKey, Index
from sqlalchemy.orm import relationship
from app.db.base_class import Base
class Order(Base):
"""
Order model representing customer orders.
Relationships:
- user: Many-to-one with User
- items: One-to-many with OrderItem
- payment: One-to-one with Payment
"""
__tablename__ = "orders"
# Primary key
id = Column(Integer, primary_key=True, index=True)
# Foreign keys (always indexed)
user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True)
payment_id = Column(Integer, ForeignKey("payments.id"), nullable=True, index=True)
# Attributes
status = Column(
String(50),
nullable=False,
default="pending",
index=True, # Frequently filtered
)
total_amount = Column(Numeric(10, 2), nullable=False, default=0)
shipping_address = Column(String(500), nullable=True)
notes = Column(String(1000), nullable=True)
# Soft delete
is_deleted = Column(Boolean, default=False, index=True)
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow, nullable=False, index=True)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=True)
# Relationships
user = relationship("User", back_populates="orders")
items = relationship("OrderItem", back_populates="order", cascade="all, delete-orphan")
payment = relationship("Payment", back_populates="order", uselist=False)
# Composite indexes for common queries
__table_args__ = (
Index('idx_user_status', 'user_id', 'status'),
Index('idx_status_created', 'status', 'created_at'),
)
def __repr__(self) -> str:
return f"<Order(id={self.id}, user_id={self.user_id}, status={self.status})>"
Migration Pattern (Alembic)
Creating a Migration
# Auto-generate migration from model changes
alembic revision --autogenerate -m "Add orders table"
# Create empty migration for manual SQL
alembic revision -m "Add custom index"
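Typical commands to apply and roll back migrations (run from the directory containing alembic.ini):
# Apply all pending migrations
alembic upgrade head

# Roll back the most recent migration
alembic downgrade -1

# Show the current revision and full history
alembic current
alembic history --verbose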
Migration Template
# migrations/versions/001_add_orders_table.py
"""Add orders table
Revision ID: 001
Revises:
Create Date: 2025-01-15 10:30:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic
revision = '001'
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
"""
Apply migration: Create orders table.
"""
# Create table
op.create_table(
'orders',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('status', sa.String(length=50), nullable=False, server_default='pending'),
sa.Column('total_amount', sa.Numeric(precision=10, scale=2), nullable=False, server_default='0'),
sa.Column('shipping_address', sa.String(length=500), nullable=True),
sa.Column('is_deleted', sa.Boolean(), nullable=False, server_default='false'),
sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.text('now()')),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
# Create indexes
op.create_index('ix_orders_id', 'orders', ['id'])
op.create_index('ix_orders_user_id', 'orders', ['user_id'])
op.create_index('ix_orders_status', 'orders', ['status'])
op.create_index('ix_orders_created_at', 'orders', ['created_at'])
op.create_index('ix_orders_is_deleted', 'orders', ['is_deleted'])
# Composite indexes
op.create_index('idx_user_status', 'orders', ['user_id', 'status'])
op.create_index('idx_status_created', 'orders', ['status', 'created_at'])
# Foreign keys
op.create_foreign_key(
'fk_orders_user_id',
'orders',
'users',
['user_id'],
['id'],
ondelete='CASCADE'
)
def downgrade() -> None:
"""
Rollback migration: Drop orders table.
"""
op.drop_table('orders')
Data Migration Example
"""Migrate order statuses
Revision ID: 002
"""
from alembic import op
import sqlalchemy as sa
def upgrade() -> None:
"""
Migrate old status values to new format.
"""
# Update status values
op.execute("""
UPDATE orders
SET status = 'completed'
WHERE status = 'complete'
""")
op.execute("""
UPDATE orders
SET status = 'cancelled'
WHERE status IN ('canceled', 'void')
""")
def downgrade() -> None:
"""
Rollback status migration.
"""
op.execute("""
UPDATE orders
SET status = 'complete'
WHERE status = 'completed'
""")
Connection Pooling Configuration
# app/db/session.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool
from app.core.config import settings
# Configure connection pool
engine = create_engine(
settings.DATABASE_URL,
poolclass=QueuePool,
pool_size=5, # Number of connections to keep open
max_overflow=10, # Additional connections when pool is full
pool_pre_ping=True, # Verify connections before using
pool_recycle=3600, # Recycle connections after 1 hour
echo=False, # Set True for SQL query logging (development only)
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
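A common session-per-request pattern built on SessionLocal (a sketch assuming FastAPI-style dependency injection; adapt to your framework's request lifecycle):
# app/db/session.py (continued)
from typing import Generator

def get_db() -> Generator:
    """Yield a database session and always close it when the request is done."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()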
Transaction Handling
Basic Transaction
from sqlalchemy.orm import Session
def create_order_with_items(db: Session, order_data: OrderCreate) -> Order:
"""
Create order and items in a transaction.
Args:
db: Database session
order_data: Order creation data
Returns:
Order: Created order with items
"""
try:
# Create order
order = Order(user_id=order_data.user_id, status="pending")
db.add(order)
db.flush() # Get order.id without committing
# Create order items
for item_data in order_data.items:
item = OrderItem(
order_id=order.id,
product_id=item_data.product_id,
quantity=item_data.quantity,
)
db.add(item)
# Commit transaction
db.commit()
db.refresh(order)
return order
except Exception as e:
# Rollback on any error
db.rollback()
logger.error(f"Failed to create order: {str(e)}")
raise
Explicit Transaction
from sqlalchemy.orm import Session
def transfer_inventory(db: Session, from_warehouse: int, to_warehouse: int, product_id: int, quantity: int) -> None:
"""
Transfer inventory between warehouses atomically.
"""
# Begin explicit transaction
with db.begin():
# Deduct from source warehouse
source = db.query(Warehouse).filter(
Warehouse.id == from_warehouse,
Warehouse.product_id == product_id
).with_for_update().first()
        if source is None or source.quantity < quantity:
            raise ValueError("Insufficient inventory in source warehouse")
source.quantity -= quantity
# Add to destination warehouse
dest = db.query(Warehouse).filter(
Warehouse.id == to_warehouse,
Warehouse.product_id == product_id
).with_for_update().first()
        if dest is None:
            raise ValueError("Destination warehouse record not found")
        dest.quantity += quantity
# Commit happens automatically on context exit
Soft Delete Pattern
Never hard-delete important data. Use soft deletes.
# Model
class Product(Base):
__tablename__ = "products"
id = Column(Integer, primary_key=True)
name = Column(String(255), nullable=False)
is_deleted = Column(Boolean, default=False, index=True)
deleted_at = Column(DateTime, nullable=True)
# Service methods
class ProductService:
@staticmethod
def soft_delete(db: Session, product_id: int) -> Product:
"""Soft delete a product."""
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise ValueError("Product not found")
product.is_deleted = True
product.deleted_at = datetime.utcnow()
db.commit()
return product
@staticmethod
def get_active_products(db: Session) -> List[Product]:
"""Get all active (non-deleted) products."""
return db.query(Product).filter(Product.is_deleted == False).all()
@staticmethod
def restore(db: Session, product_id: int) -> Product:
"""Restore a soft-deleted product."""
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise ValueError("Product not found")
product.is_deleted = False
product.deleted_at = None
db.commit()
return product
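One interaction to watch (a PostgreSQL-specific sketch): a plain unique constraint still counts soft-deleted rows, so a deleted product would block reusing its SKU. A partial unique index enforces uniqueness only among live rows; use it instead of unique=True on the column.
from sqlalchemy import Boolean, Column, Index, Integer, String, text
from app.db.base_class import Base

class Product(Base):
    __tablename__ = "products"

    id = Column(Integer, primary_key=True)
    sku = Column(String(100), nullable=False)  # no unique=True; see index below
    is_deleted = Column(Boolean, default=False, nullable=False)

    __table_args__ = (
        # Unique only where is_deleted = false
        Index(
            "uq_products_sku_not_deleted",
            "sku",
            unique=True,
            postgresql_where=text("is_deleted = false"),
        ),
    )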
Timestamp Fields
Always include created_at and updated_at on tables.
from datetime import datetime
from sqlalchemy import Column, DateTime
class BaseModel(Base):
"""Base model with timestamp fields."""
__abstract__ = True
created_at = Column(DateTime, default=datetime.utcnow, nullable=False, index=True)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# All models inherit
class User(BaseModel):
__tablename__ = "users"
# ... other fields
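If you want the database itself to stamp rows (so inserts that bypass the ORM still get timestamps), server-side defaults are a possible variant of the base class above; TimestampedBase is a hypothetical name, and this is a sketch rather than a change to the standard.
from sqlalchemy import Column, DateTime, func

class TimestampedBase(Base):
    """Variant base class using server-side timestamp defaults."""
    __abstract__ = True

    created_at = Column(DateTime, server_default=func.now(), nullable=False, index=True)
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())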
Querying Patterns
Prevent N+1 Queries
Use joinedload or selectinload to eager load relationships.
from sqlalchemy.orm import joinedload, selectinload
# BAD - N+1 query problem
orders = db.query(Order).filter(Order.user_id == user_id).all()
for order in orders:
print(order.user.email) # Triggers a query for EACH order
# GOOD - Eager load with joinedload
orders = db.query(Order).options(
joinedload(Order.user)
).filter(Order.user_id == user_id).all()
for order in orders:
print(order.user.email) # No additional queries
# GOOD - Eager load with selectinload (better for one-to-many)
users = db.query(User).options(
selectinload(User.orders)
).all()
for user in users:
for order in user.orders: # No additional queries
print(order.id)
Filtering
# Simple filter
active_users = db.query(User).filter(User.is_active == True).all()
# Multiple conditions (AND)
recent_orders = db.query(Order).filter(
Order.user_id == user_id,
Order.status == "completed",
Order.created_at >= start_date
).all()
# OR conditions
from sqlalchemy import or_
pending_or_processing = db.query(Order).filter(
or_(
Order.status == "pending",
Order.status == "processing"
)
).all()
# IN clause
statuses = ["pending", "processing", "completed"]
orders = db.query(Order).filter(Order.status.in_(statuses)).all()
# LIKE clause
users = db.query(User).filter(User.email.like("%@example.com")).all()
Pagination
def get_orders_paginated(db: Session, page: int, page_size: int) -> List[Order]:
"""
Get paginated orders.
Args:
db: Database session
page: Page number (1-indexed)
page_size: Number of items per page
Returns:
List[Order]: Page of orders
"""
offset = (page - 1) * page_size
orders = db.query(Order)\
.order_by(Order.created_at.desc())\
.offset(offset)\
.limit(page_size)\
.all()
return orders
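If the API needs to report a total count or number of pages, the same pattern can be combined with a count query (a hypothetical helper, not part of the template above):
from typing import List, Tuple

from sqlalchemy import func

def get_orders_page_with_total(db: Session, page: int, page_size: int) -> Tuple[List[Order], int]:
    """Return one page of orders plus the total row count for pagination metadata."""
    total = db.query(func.count(Order.id)).scalar()
    offset = (page - 1) * page_size
    items = (
        db.query(Order)
        .order_by(Order.created_at.desc())
        .offset(offset)
        .limit(page_size)
        .all()
    )
    return items, total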
Aggregations
from sqlalchemy import func
# Count
total_orders = db.query(func.count(Order.id)).scalar()
# Sum
total_revenue = db.query(func.sum(Order.total_amount))\
.filter(Order.status == "completed")\
.scalar()
# Average
avg_order_value = db.query(func.avg(Order.total_amount)).scalar()
# Group by
order_counts_by_status = db.query(
Order.status,
func.count(Order.id).label('count')
).group_by(Order.status).all()
for status, count in order_counts_by_status:
print(f"{status}: {count}")
Common PostgreSQL Data Types
from sqlalchemy import (
Column,
Integer, # Whole numbers
BigInteger, # Large whole numbers
String, # Variable-length text
Text, # Unlimited text
Boolean, # True/False
DateTime, # Date and time
Date, # Date only
Time, # Time only
Numeric, # Decimal numbers (for money)
Float, # Floating-point numbers
JSON, # JSON data (stored as JSON)
ARRAY, # PostgreSQL arrays
)
from sqlalchemy.dialects.postgresql import (
UUID, # UUID type
JSONB, # JSON with indexing
INET, # IP address
)
# Examples
class Product(Base):
__tablename__ = "products"
id = Column(Integer, primary_key=True)
name = Column(String(255), nullable=False)
description = Column(Text, nullable=True)
price = Column(Numeric(10, 2), nullable=False) # e.g., 999999.99
is_active = Column(Boolean, default=True)
stock_count = Column(Integer, default=0)
    meta = Column("metadata", JSONB, nullable=True)  # Arbitrary JSON; attribute renamed because "metadata" is reserved on declarative models
tags = Column(ARRAY(String), nullable=True) # Array of strings
created_at = Column(DateTime, default=datetime.utcnow)
Performance Tips
1. Use EXPLAIN ANALYZE
# Check query performance
from sqlalchemy import text
query = db.query(Order).filter(Order.status == "pending")
# Get query execution plan
explain = db.execute(
text("EXPLAIN ANALYZE " + str(query.statement.compile(compile_kwargs={"literal_binds": True})))
).fetchall()
for row in explain:
print(row[0])
2. Add Indexes Based on Slow Queries
# Enable slow query log in PostgreSQL
# Add to postgresql.conf:
log_min_duration_statement = 1000 # Log queries > 1 second
# View slow queries in logs and add appropriate indexes
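If the pg_stat_statements extension is installed and preloaded, you can also find slow statements directly from the database (a sketch; the column names below are for PostgreSQL 13+):
from sqlalchemy import text

slowest = db.execute(text("""
    SELECT query, calls, mean_exec_time
    FROM pg_stat_statements
    ORDER BY mean_exec_time DESC
    LIMIT 10
""")).fetchall()

for query, calls, mean_ms in slowest:
    print(f"{mean_ms:.1f} ms avg over {calls} calls: {query[:80]}")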
3. Use Database Constraints
from sqlalchemy import CheckConstraint, UniqueConstraint
class Product(Base):
__tablename__ = "products"
    sku = Column(String(100), nullable=False)
    price = Column(Numeric(10, 2), nullable=False)
stock_count = Column(Integer, nullable=False)
__table_args__ = (
CheckConstraint('price >= 0', name='price_positive'),
CheckConstraint('stock_count >= 0', name='stock_positive'),
UniqueConstraint('sku', name='uq_product_sku'),
)
4. Batch Operations
# BAD - Multiple individual inserts
for item_data in items:
item = OrderItem(**item_data)
db.add(item)
db.commit() # Slow!
# GOOD - Batch insert
items_to_create = [OrderItem(**item_data) for item_data in items]
db.bulk_save_objects(items_to_create)
db.commit() # One commit for all
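For very large batches, a Core-level insert with a list of dicts uses executemany and skips ORM object construction entirely (a sketch, assuming SQLAlchemy 1.4+ and that each item_data is a plain dict):
from sqlalchemy import insert

rows = [dict(item_data) for item_data in items]
db.execute(insert(OrderItem), rows)  # One INSERT, executemany under the hood
db.commit()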
Database Schema Example
Complete example for an e-commerce system:
# app/models/user.py
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String(255), unique=True, nullable=False, index=True)
hashed_password = Column(String(255), nullable=False)
full_name = Column(String(100), nullable=False)
is_active = Column(Boolean, default=True, index=True)
is_admin = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow, index=True)
updated_at = Column(DateTime, onupdate=datetime.utcnow)
# Relationships
orders = relationship("Order", back_populates="user")
# app/models/product.py
class Product(Base):
__tablename__ = "products"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(255), nullable=False)
sku = Column(String(100), unique=True, nullable=False, index=True)
price = Column(Numeric(10, 2), nullable=False)
inventory_count = Column(Integer, default=0, nullable=False)
is_active = Column(Boolean, default=True, index=True)
created_at = Column(DateTime, default=datetime.utcnow, index=True)
updated_at = Column(DateTime, onupdate=datetime.utcnow)
# app/models/order.py
class Order(Base):
__tablename__ = "orders"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)
status = Column(String(50), default="pending", nullable=False, index=True)
total_amount = Column(Numeric(10, 2), default=0, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow, index=True)
updated_at = Column(DateTime, onupdate=datetime.utcnow)
# Relationships
user = relationship("User", back_populates="orders")
items = relationship("OrderItem", back_populates="order")
__table_args__ = (
Index('idx_user_status', 'user_id', 'status'),
)
# app/models/order_item.py
class OrderItem(Base):
__tablename__ = "order_items"
id = Column(Integer, primary_key=True, index=True)
order_id = Column(Integer, ForeignKey("orders.id"), nullable=False, index=True)
product_id = Column(Integer, ForeignKey("products.id"), nullable=False, index=True)
quantity = Column(Integer, nullable=False)
unit_price = Column(Numeric(10, 2), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
# Relationships
order = relationship("Order", back_populates="items")
product = relationship("Product")
Next Steps
- Review testing-standards.md for database testing patterns
- See ../04-templates/migration-template.sql for the migration template
- Check ../05-runbooks/common-issues.md for database troubleshooting