API Patterns
This document defines our standard API patterns and best practices.
Response Time Requirement
RULE: All API endpoints MUST respond in <500ms (95th percentile)
This is non-negotiable. If an operation takes longer, it must be processed asynchronously.
Capture-and-Process-Async Pattern
For any operation that might take >500ms, use this pattern:
- Capture: Immediately save the request data
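- Respond: Return a success (2xx) response quickly — e.g., 200 OK for webhooks, or 201 Created when a resource is created, as in the example below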
- Process: Handle the work asynchronously via background worker
Complete Example: Processing a Large Order
# app/api/v1/endpoints/orders.py
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from sqlalchemy.orm import Session
from app.api import deps
from app.schemas.order import OrderCreate, OrderResponse
from app.models.order import Order
from app.tasks.order_tasks import process_order_task
from app.core.logging import get_logger
logger = get_logger(__name__)
router = APIRouter()
@router.post("/orders", response_model=OrderResponse, status_code=status.HTTP_201_CREATED)
def create_order(
    order_in: OrderCreate,
    db: Session = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_user),
) -> OrderResponse:
    """
    Create a new order and queue it for asynchronous processing.

    The order row (status 'pending') and its item rows are written in a
    single transaction, the background task is queued, and the order is
    returned to the client.

    Args:
        order_in: Order creation data
        db: Database session
        current_user: Authenticated user

    Returns:
        OrderResponse: The created order with status 'pending'

    Example:
        Request::

            POST /api/v1/orders
            {
                "items": [
                    {"product_id": 123, "quantity": 2},
                    {"product_id": 456, "quantity": 1}
                ],
                "shipping_address": {...}
            }

        Response (returned before background processing runs)::

            {
                "id": 789,
                "status": "pending",
                "items": [...],
                "created_at": "2025-01-15T10:30:00Z"
            }
    """
    # NOTE(review): `User` and `OrderItem` are used here but are not among the
    # imports shown for this module — confirm they are imported.
    logger.info(f"Creating order for user {current_user.id}")

    # STEP 1: CAPTURE - Save the order and its items atomically
    order = Order(
        user_id=current_user.id,
        status="pending",  # Initial status
        total_amount=0,  # Will be calculated in background
    )
    db.add(order)
    # flush() emits the INSERT so order.id is available, without committing.
    # Committing here (as before) could leave an order with no items if the
    # second commit failed.
    db.flush()

    # Save order items in the same transaction as the order itself
    db.add_all(
        [
            OrderItem(
                order_id=order.id,
                product_id=item_data.product_id,
                quantity=item_data.quantity,
            )
            for item_data in order_in.items
        ]
    )
    db.commit()
    db.refresh(order)
    logger.info(f"Order {order.id} created, queuing for processing")

    # STEP 2: QUEUE - Queue background processing (only after the commit, so
    # the worker can always find the row)
    process_order_task.delay(order.id)

    # STEP 3: RESPOND - Return immediately
    return order
# app/tasks/order_tasks.py
from celery import Task
from sqlalchemy.orm import Session
from app.worker import celery_app
from app.db.session import SessionLocal
from app.models.order import Order
from app.services.inventory_service import InventoryService
from app.services.payment_service import PaymentService
from app.services.email_service import EmailService
from app.core.logging import get_logger
logger = get_logger(__name__)
@celery_app.task(bind=True, max_retries=3)
def process_order_task(self: Task, order_id: int) -> None:
    """
    Process an order: reserve inventory, charge payment, queue confirmation email.

    The order status is set to "processing" on start and ends as either
    "failed" (with ``failure_reason`` set) or "completed".

    Args:
        order_id: ID of the order to process

    Raises:
        Retry: If the payment step fails and a retry is scheduled
            (countdown 60s, 120s, 240s; max 3 retries)
        Exception: Any other error is logged and re-raised
    """
    # Local import keeps this block self-contained; Retry is the exception
    # that self.retry() raises to signal a scheduled retry.
    from celery.exceptions import Retry

    # NOTE(review): nothing here guards against the same order being processed
    # twice (e.g. a redelivered message) — confirm whether an idempotency
    # check is needed before charging.
    db: Session = SessionLocal()
    try:
        logger.info(f"Processing order {order_id}")

        # Fetch order
        order = db.query(Order).filter(Order.id == order_id).first()
        if not order:
            logger.error(f"Order {order_id} not found")
            return

        # Update status
        order.status = "processing"
        db.commit()

        # Check inventory (may take 1-2 seconds)
        try:
            InventoryService.reserve_inventory(db, order)
        except ValueError as e:
            logger.error(f"Inventory check failed for order {order_id}: {str(e)}")
            order.status = "failed"
            order.failure_reason = "Insufficient inventory"
            db.commit()
            return

        # Process payment (may take 2-5 seconds)
        try:
            payment = PaymentService.charge_order(db, order)
            order.payment_id = payment.id
        except Exception as e:
            logger.error(f"Payment failed for order {order_id}: {str(e)}")
            # Release inventory
            InventoryService.release_inventory(db, order)
            order.status = "failed"
            order.failure_reason = "Payment failed"
            db.commit()
            # Retry with exponential backoff
            raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))

        # Mark as completed
        order.status = "completed"
        db.commit()
        logger.info(f"Order {order_id} processed successfully")

        # Send confirmation email (fire and forget)
        EmailService.send_order_confirmation.delay(order_id)
    except Retry:
        # A scheduled retry is expected control flow; do not log it as an
        # "unexpected error" (the payment failure was already logged above).
        raise
    except Exception as e:
        logger.error(f"Unexpected error processing order {order_id}: {str(e)}")
        raise
    finally:
        db.close()
Webhook Handler Pattern
Webhooks from external services (Shopify, Stripe, etc.) MUST follow this pattern:
Complete Webhook Example
# app/api/v1/endpoints/webhooks.py
import base64
import hashlib
import hmac
from typing import Optional

from fastapi import APIRouter, Depends, Header, HTTPException, Request, status
from sqlalchemy.orm import Session

from app.api import deps
from app.core.config import settings
from app.core.logging import get_logger
from app.models.webhook_event import WebhookEvent
from app.tasks.webhook_tasks import process_shopify_order_webhook
logger = get_logger(__name__)
router = APIRouter()
def verify_shopify_webhook(payload: bytes, hmac_header: str) -> bool:
    """
    Verify Shopify webhook signature.

    Computes HMAC-SHA256 of the raw body with ``settings.SHOPIFY_WEBHOOK_SECRET``
    and compares it, in constant time, with the header value.

    Args:
        payload: Raw request body (must be the exact bytes received)
        hmac_header: X-Shopify-Hmac-SHA256 header value

    Returns:
        bool: True if signature is valid
    """
    digest = hmac.new(
        settings.SHOPIFY_WEBHOOK_SECRET.encode("utf-8"),
        payload,
        hashlib.sha256,
    ).digest()
    # Shopify sends the digest base64-encoded, not hex-encoded, so a
    # hexdigest() comparison can never match a genuine webhook.
    computed_hmac = base64.b64encode(digest)
    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which a forged header could include.
    return hmac.compare_digest(computed_hmac, hmac_header.encode("utf-8"))
@router.post("/webhooks/shopify/orders/create")
async def shopify_order_created(
    request: Request,
    db: Session = Depends(deps.get_db),
    x_shopify_hmac_sha256: Optional[str] = Header(None),
    x_shopify_shop_domain: Optional[str] = Header(None),
) -> dict:
    """
    Receive a Shopify "orders/create" webhook.

    Reads the raw body, checks the HMAC header, stores the payload as a
    pending ``WebhookEvent`` and queues the background task for it.

    Args:
        request: FastAPI request object
        db: Database session
        x_shopify_hmac_sha256: Shopify signature header
        x_shopify_shop_domain: Shop domain header

    Returns:
        dict: ``{"status": "accepted", "webhook_id": <stored event id>}``

    Raises:
        HTTPException 401: If the signature header is missing or invalid
    """

    def _unauthorized(log_message: str, detail: str) -> HTTPException:
        # Log the rejection and build the 401 for the caller to raise.
        logger.warning(log_message)
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
        )

    # STEP 1: Read raw body (the signature is computed over these exact bytes)
    raw_body = await request.body()

    # STEP 2: Verify signature
    if not x_shopify_hmac_sha256:
        raise _unauthorized(
            "Shopify webhook received without HMAC header",
            "Missing HMAC signature",
        )
    signature_ok = verify_shopify_webhook(raw_body, x_shopify_hmac_sha256)
    if not signature_ok:
        raise _unauthorized(
            "Invalid Shopify webhook signature",
            "Invalid signature",
        )

    # STEP 3: Store raw webhook payload
    event = WebhookEvent(
        source="shopify",
        event_type="orders/create",
        shop_domain=x_shopify_shop_domain,
        payload=raw_body.decode('utf-8'),
        status="pending",
    )
    db.add(event)
    db.commit()
    db.refresh(event)
    logger.info(f"Shopify webhook stored: {event.id}")

    # STEP 4: Queue processing
    process_shopify_order_webhook.delay(event.id)

    # STEP 5: Return 200 immediately
    accepted = {"status": "accepted", "webhook_id": event.id}
    return accepted
# app/tasks/webhook_tasks.py
from celery import Task
import json
from app.worker import celery_app
from app.db.session import SessionLocal
from app.models.webhook_event import WebhookEvent
from app.services.shopify_service import ShopifyService
from app.core.logging import get_logger
logger = get_logger(__name__)
@celery_app.task(bind=True, max_retries=5)
def process_shopify_order_webhook(self: Task, webhook_id: int) -> None:
    """
    Process a stored Shopify order webhook asynchronously.

    Loads the ``WebhookEvent``, parses its JSON payload, hands it to
    ``ShopifyService.process_order_created`` and records the outcome in the
    event's ``status`` ("processing", then "completed" or "failed").

    Args:
        webhook_id: ID of the webhook event to process

    Raises:
        Retry: If processing fails and a retry is scheduled
            (countdown 60 * 2**retries seconds; max 5 retries)
        Exception: Any other error is logged and re-raised
    """
    # Local imports keep this block self-contained. `datetime` was previously
    # used without being imported, so every successful run raised NameError,
    # was marked "failed" and retried.
    from datetime import datetime

    from celery.exceptions import Retry

    db = SessionLocal()
    try:
        webhook_event = db.query(WebhookEvent).filter(
            WebhookEvent.id == webhook_id
        ).first()
        if not webhook_event:
            logger.error(f"Webhook event {webhook_id} not found")
            return

        # Parse payload
        try:
            payload = json.loads(webhook_event.payload)
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON in webhook {webhook_id}: {str(e)}")
            webhook_event.status = "failed"
            webhook_event.error_message = "Invalid JSON payload"
            db.commit()
            # A malformed payload will never parse, so do not retry.
            return

        # Update status
        webhook_event.status = "processing"
        db.commit()

        # Process the order (may call Shopify API, update database, etc.)
        try:
            ShopifyService.process_order_created(db, payload)
            webhook_event.status = "completed"
            webhook_event.processed_at = datetime.utcnow()
            db.commit()
            logger.info(f"Webhook {webhook_id} processed successfully")
        except Exception as e:
            logger.error(f"Failed to process webhook {webhook_id}: {str(e)}")
            webhook_event.status = "failed"
            webhook_event.error_message = str(e)
            db.commit()
            # Retry with exponential backoff
            raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))
    except Retry:
        # A scheduled retry is expected control flow; the underlying failure
        # was already logged above, so do not report it as "unexpected".
        raise
    except Exception as e:
        logger.error(f"Unexpected error processing webhook {webhook_id}: {str(e)}")
        raise
    finally:
        db.close()
Pagination Pattern
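Use cursor-based pagination for better performance with large datasets. Note: the example below implements page/offset pagination (`page`, `page_size`); it does not show a cursor-based implementation.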
# app/schemas/pagination.py
from typing import Generic, TypeVar, List, Optional
from pydantic import BaseModel
# Type of the items carried by a paginated response.
T = TypeVar('T')


class PaginatedResponse(BaseModel, Generic[T]):
    """Envelope for one page of results, parameterized by the item type."""

    # Items on the current page
    items: List[T]
    # Number of items matching the query across all pages
    total: int
    # Page number of this response
    page: int
    # Page size used for this response
    page_size: int
    # Whether another page exists after this one
    has_next: bool
# app/api/v1/endpoints/orders.py
@router.get("/orders", response_model=PaginatedResponse[OrderResponse])
def list_orders(
    page: int = 1,
    page_size: int = 20,
    status: Optional[str] = None,
    db: Session = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_user),
) -> PaginatedResponse[OrderResponse]:
    """
    List the current user's orders with pagination and filtering.

    Out-of-range paging values are clamped rather than rejected: ``page``
    is raised to at least 1 and ``page_size`` is kept within 1..100.

    Args:
        page: Page number (1-indexed)
        page_size: Number of items per page (max 100)
        status: Filter by order status
        db: Database session
        current_user: Authenticated user

    Returns:
        PaginatedResponse[OrderResponse]: Paginated list of orders, newest first
    """
    # Clamp paging inputs: page < 1 would produce a negative OFFSET and
    # page_size < 1 a zero/negative LIMIT.
    page = max(page, 1)
    page_size = max(1, min(page_size, 100))

    # Build query (always scoped to the authenticated user)
    query = db.query(Order).filter(Order.user_id == current_user.id)
    if status:
        query = query.filter(Order.status == status)

    # Get total count
    total = query.count()

    # Apply pagination. Order.id is a tiebreaker so rows sharing the same
    # created_at keep a stable position across pages.
    offset = (page - 1) * page_size
    orders = (
        query.order_by(Order.created_at.desc(), Order.id.desc())
        .offset(offset)
        .limit(page_size)
        .all()
    )

    # Check if there's a next page
    has_next = (offset + page_size) < total

    return PaginatedResponse(
        items=orders,
        total=total,
        page=page,
        page_size=page_size,
        has_next=has_next,
    )
Filtering and Sorting Pattern
from typing import Optional
from enum import Enum
class OrderSortBy(str, Enum):
    """Order fields that callers are allowed to sort by.

    Each value is used as an attribute name on the ``Order`` model.
    """

    CREATED_AT = "created_at"
    TOTAL_AMOUNT = "total_amount"
    STATUS = "status"
@router.get("/orders", response_model=List[OrderResponse])
def list_orders(
    status: Optional[str] = None,
    min_amount: Optional[float] = None,
    max_amount: Optional[float] = None,
    sort_by: OrderSortBy = OrderSortBy.CREATED_AT,
    sort_desc: bool = True,
    db: Session = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_user),
) -> List[OrderResponse]:
    """
    Return the current user's orders, optionally filtered and sorted.

    Args:
        status: Keep only orders with this status (ignored when empty)
        min_amount: Lower bound (inclusive) on ``total_amount``
        max_amount: Upper bound (inclusive) on ``total_amount``
        sort_by: Field to sort by
        sort_desc: Sort descending if True, ascending if False
        db: Database session
        current_user: Authenticated user

    Returns:
        List[OrderResponse]: Filtered and sorted orders
    """
    # Collect the conditions first; the user scope always applies.
    conditions = [Order.user_id == current_user.id]
    if status:
        conditions.append(Order.status == status)
    if min_amount is not None:
        conditions.append(Order.total_amount >= min_amount)
    if max_amount is not None:
        conditions.append(Order.total_amount <= max_amount)

    # The enum value is the name of the Order column to sort on.
    column = getattr(Order, sort_by.value)
    ordering = column.desc() if sort_desc else column.asc()

    return db.query(Order).filter(*conditions).order_by(ordering).all()
Rate Limiting Pattern
# app/core/rate_limit.py
from fastapi import HTTPException, status, Request
from typing import Optional
import time
from app.core.redis import redis_client
from app.core.logging import get_logger
logger = get_logger(__name__)
def rate_limit(key: str, max_requests: int, window_seconds: int) -> None:
    """
    Check if request is within rate limit (fixed window counter in Redis).

    Every call increments the counter stored at ``key``. The first call in a
    window starts the expiry; once the count exceeds ``max_requests`` the
    call is rejected until the key expires.

    Args:
        key: Rate limit key (e.g., user ID or IP)
        max_requests: Maximum requests allowed
        window_seconds: Time window in seconds

    Raises:
        HTTPException 429: If rate limit exceeded
    """
    # INCR is atomic and creates the key at 1 if it does not exist. The old
    # GET / SETEX / INCR sequence let concurrent requests slip past the limit,
    # and an INCR on a just-expired key created a counter with no expiry.
    current = redis_client.incr(key)
    if current == 1:
        # First request in window: start the expiry clock
        redis_client.expire(key, window_seconds)

    if current > max_requests:
        logger.warning(f"Rate limit exceeded for {key}")
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=f"Rate limit exceeded. Max {max_requests} requests per {window_seconds} seconds."
        )
# Usage in endpoint
@router.post("/api/v1/orders")
def create_order(
    request: Request,
    order_in: OrderCreate,
    current_user: User = Depends(deps.get_current_user),
) -> OrderResponse:
    """Create order, enforcing a per-user rate limit first."""
    # Rate limit: 10 orders per minute per user
    limit_key = f"orders:user:{current_user.id}"
    rate_limit(limit_key, max_requests=10, window_seconds=60)
    # Continue with order creation...
API Versioning Strategy
All API endpoints MUST be versioned under /api/v1/, /api/v2/, etc.
# app/api/v1/__init__.py
from fastapi import APIRouter
from app.api.v1.endpoints import users, orders, webhooks
# Aggregate router for API v1: one sub-router per resource, each under its
# own URL prefix and tag.
api_router = APIRouter()
api_router.include_router(
    users.router,
    prefix="/users",
    tags=["users"],
)
api_router.include_router(
    orders.router,
    prefix="/orders",
    tags=["orders"],
)
api_router.include_router(
    webhooks.router,
    prefix="/webhooks",
    tags=["webhooks"],
)
# app/main.py
from fastapi import FastAPI
from app.api.v1 import api_router as api_v1_router
app = FastAPI(
    title="Your App API",
)
# Mount every v1 route under the versioned prefix.
app.include_router(
    api_v1_router,
    prefix="/api/v1",
)
# When you need breaking changes, create v2
# from app.api.v2 import api_router as api_v2_router
# app.include_router(api_v2_router, prefix="/api/v2")
Error Response Format
All errors return consistent JSON structure.
# app/schemas/error.py
from pydantic import BaseModel
from typing import Optional, Dict, Any
class ErrorResponse(BaseModel):
    """Shape of the JSON body returned for error responses."""

    # Error message (required)
    detail: str
    # Optional error code string, e.g. "EMAIL_EXISTS"
    error_code: Optional[str] = None
    # Optional mapping with additional error data
    errors: Optional[Dict[str, Any]] = None
# Example error responses:
# 400 Bad Request
{
"detail": "Email already exists",
"error_code": "EMAIL_EXISTS"
}
# 422 Validation Error (automatic from Pydantic)
{
"detail": [
{
"loc": ["body", "email"],
"msg": "value is not a valid email address",
"type": "value_error.email"
}
]
}
# 401 Unauthorized
{
"detail": "Could not validate credentials"
}
# 403 Forbidden
{
"detail": "Only administrators can perform this action"
}
Success Response Format
Consistent success responses.
# Single resource
{
"id": 123,
"email": "user@example.com",
"created_at": "2025-01-15T10:30:00Z"
}
# List of resources
{
"items": [...],
"total": 50,
"page": 1,
"page_size": 20,
"has_next": true
}
# Action completed
{
"status": "success",
"message": "Order cancelled successfully",
"order_id": 456
}
Authentication Pattern
JWT with refresh tokens.
# app/api/v1/endpoints/auth.py
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from app.core.security import create_access_token, create_refresh_token
from app.schemas.auth import Token, TokenRefresh
from app.services.auth_service import AuthService
router = APIRouter()
@router.post("/auth/login", response_model=Token)
def login(
    db: Session = Depends(deps.get_db),
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    """
    Log a user in and issue an access token plus a refresh token.

    The refresh token is also stored in Redis under ``refresh_token:<user id>``
    with a 7-day expiry.

    Args:
        db: Database session
        form_data: Username (email) and password

    Returns:
        Token: Access token and refresh token

    Raises:
        HTTPException 401: If credentials are invalid
    """
    # NOTE(review): `Session`, `deps` and `redis_client` are used here but are
    # not among the imports shown for this module — confirm they are imported.
    account = AuthService.authenticate_user(
        db,
        form_data.username,
        form_data.password,
    )
    if not account:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
        )

    # Create tokens
    access = create_access_token(subject=account.id)
    refresh = create_refresh_token(subject=account.id)

    # Store refresh token in Redis
    refresh_key = f"refresh_token:{account.id}"
    refresh_ttl = timedelta(days=7)
    redis_client.setex(refresh_key, refresh_ttl, refresh)

    return Token(
        access_token=access,
        refresh_token=refresh,
        token_type="bearer",
    )
CORS Configuration
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.config import settings
app = FastAPI()

# Configure CORS: origins come from settings; credentials are allowed, and
# all methods and headers are accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.CORS_ORIGINS,  # ["http://localhost:3000", "https://app.example.com"]
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
Request/Response Logging
# app/middleware/logging.py
import time
from fastapi import Request
from app.core.logging import get_logger
logger = get_logger(__name__)
async def log_requests(request: Request, call_next):
    """
    Log all API requests and responses.

    Emits "Request started" before the request is handled and
    "Request completed" (with status code and duration) afterwards.

    Args:
        request: Incoming request
        call_next: Callable that passes the request on and returns the response

    Returns:
        The response produced by ``call_next``, unchanged
    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # or go negative if the system clock is adjusted mid-request (which
    # time.time() allows).
    start_time = time.perf_counter()
    method = request.method
    path = request.url.path

    # Log request
    logger.info(
        "Request started",
        extra={
            "method": method,
            "path": path,
            "client": request.client.host if request.client else None,
        }
    )

    # Process request
    response = await call_next(request)

    # Calculate duration
    duration = time.perf_counter() - start_time

    # Log response
    logger.info(
        "Request completed",
        extra={
            "method": method,
            "path": path,
            "status_code": response.status_code,
            "duration_ms": round(duration * 1000, 2),
        }
    )
    return response
# Add to app
from app.main import app

# Register log_requests as an "http" middleware on the application.
app.middleware("http")(
    log_requests
)
API Contract Documentation
IMPORTANT: All public API endpoints MUST be documented in docs/api/endpoints.md.
Update the API documentation BEFORE implementing the endpoint.
See ../api/endpoints.md for the template and examples.
Next Steps
- Review `database-patterns.md` for database design
- Check `testing-standards.md` for testing patterns
- See `../03-workflows/feature-development.md` for implementation workflow