This document defines our standard API patterns and best practices.

Response Time Requirement

RULE: All API endpoints MUST respond in <500ms (95th percentile)

This is non-negotiable. If an operation takes longer, it must be processed asynchronously.


Capture-and-Process-Async Pattern

For any operation that might take >500ms, use this pattern:

  1. Capture: Immediately save the request data
  2. Respond: Return 200 OK quickly
  3. Process: Handle the work asynchronously via background worker

Complete Example: Processing a Large Order

  # app/api/v1/endpoints/orders.py
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from sqlalchemy.orm import Session

from app.api import deps
from app.schemas.order import OrderCreate, OrderResponse
from app.models.order import Order
from app.tasks.order_tasks import process_order_task
from app.core.logging import get_logger

logger = get_logger(__name__)
router = APIRouter()


@router.post("/orders", response_model=OrderResponse, status_code=status.HTTP_201_CREATED)
def create_order(
    order_in: OrderCreate,
    db: Session = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_user),
) -> OrderResponse:
    """
    Create a new order and process it asynchronously.

    This endpoint immediately creates an order record with status 'pending',
    returns the order to the client, and queues background processing.

    Args:
        order_in: Order creation data
        db: Database session
        current_user: Authenticated user

    Returns:
        OrderResponse: The created order with status 'pending'

    Example:
        ```python
        POST /api/v1/orders
        {
            "items": [
                {"product_id": 123, "quantity": 2},
                {"product_id": 456, "quantity": 1}
            ],
            "shipping_address": {...}
        }

        Response (< 100ms):
        {
            "id": 789,
            "status": "pending",
            "items": [...],
            "created_at": "2025-01-15T10:30:00Z"
        }
        ```
    """
    logger.info(f"Creating order for user {current_user.id}")

    # STEP 1: CAPTURE - Save order immediately
    order = Order(
        user_id=current_user.id,
        status="pending",  # Initial status
        total_amount=0,  # Will be calculated in background
    )
    db.add(order)
    db.commit()
    db.refresh(order)

    # Save order items
    for item_data in order_in.items:
        order_item = OrderItem(
            order_id=order.id,
            product_id=item_data.product_id,
            quantity=item_data.quantity,
        )
        db.add(order_item)
    db.commit()

    logger.info(f"Order {order.id} created, queuing for processing")

    # STEP 2: QUEUE - Queue background processing
    process_order_task.delay(order.id)

    # STEP 3: RESPOND - Return immediately
    return order
  
  # app/tasks/order_tasks.py
from celery import Task
from sqlalchemy.orm import Session

from app.worker import celery_app
from app.db.session import SessionLocal
from app.models.order import Order
from app.services.inventory_service import InventoryService
from app.services.payment_service import PaymentService
from app.services.email_service import EmailService
from app.core.logging import get_logger

logger = get_logger(__name__)


@celery_app.task(bind=True, max_retries=3)
def process_order_task(self: Task, order_id: int) -> None:
    """
    Process order: validate inventory, charge payment, send confirmation.

    This task runs asynchronously and can take 2-10 seconds.

    Args:
        order_id: ID of the order to process

    Raises:
        Retry: If processing fails (max 3 attempts)
    """
    db: Session = SessionLocal()

    try:
        logger.info(f"Processing order {order_id}")

        # Fetch order
        order = db.query(Order).filter(Order.id == order_id).first()
        if not order:
            logger.error(f"Order {order_id} not found")
            return

        # Update status
        order.status = "processing"
        db.commit()

        # Check inventory (may take 1-2 seconds)
        try:
            InventoryService.reserve_inventory(db, order)
        except ValueError as e:
            logger.error(f"Inventory check failed for order {order_id}: {str(e)}")
            order.status = "failed"
            order.failure_reason = "Insufficient inventory"
            db.commit()
            return

        # Process payment (may take 2-5 seconds)
        try:
            payment = PaymentService.charge_order(db, order)
            order.payment_id = payment.id
        except Exception as e:
            logger.error(f"Payment failed for order {order_id}: {str(e)}")
            # Release inventory
            InventoryService.release_inventory(db, order)
            order.status = "failed"
            order.failure_reason = "Payment failed"
            db.commit()
            # Retry with exponential backoff
            raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))

        # Mark as completed
        order.status = "completed"
        db.commit()

        logger.info(f"Order {order_id} processed successfully")

        # Send confirmation email (fire and forget)
        EmailService.send_order_confirmation.delay(order_id)

    except Exception as e:
        logger.error(f"Unexpected error processing order {order_id}: {str(e)}")
        raise
    finally:
        db.close()
  

Webhook Handler Pattern

Webhooks from external services (Shopify, Stripe, etc.) MUST follow this pattern:

Complete Webhook Example

  # app/api/v1/endpoints/webhooks.py
from fastapi import APIRouter, Request, HTTPException, status, Header
from sqlalchemy.orm import Session
import hmac
import hashlib
from typing import Optional

from app.api import deps
from app.models.webhook_event import WebhookEvent
from app.tasks.webhook_tasks import process_shopify_order_webhook
from app.core.config import settings
from app.core.logging import get_logger

logger = get_logger(__name__)
router = APIRouter()


def verify_shopify_webhook(payload: bytes, hmac_header: str) -> bool:
    """
    Verify Shopify webhook signature.

    Args:
        payload: Raw request body
        hmac_header: X-Shopify-Hmac-SHA256 header value

    Returns:
        bool: True if signature is valid
    """
    computed_hmac = hmac.new(
        settings.SHOPIFY_WEBHOOK_SECRET.encode('utf-8'),
        payload,
        hashlib.sha256
    ).hexdigest()

    return hmac.compare_digest(computed_hmac, hmac_header)


@router.post("/webhooks/shopify/orders/create")
async def shopify_order_created(
    request: Request,
    db: Session = Depends(deps.get_db),
    x_shopify_hmac_sha256: Optional[str] = Header(None),
    x_shopify_shop_domain: Optional[str] = Header(None),
) -> dict:
    """
    Handle Shopify order creation webhook.

    This endpoint receives order data from Shopify, verifies the signature,
    stores the raw payload, and queues processing.

    Args:
        request: FastAPI request object
        db: Database session
        x_shopify_hmac_sha256: Shopify signature header
        x_shopify_shop_domain: Shop domain header

    Returns:
        dict: Success response

    Raises:
        HTTPException 401: If webhook signature is invalid
    """
    # STEP 1: Read raw body
    body = await request.body()

    # STEP 2: Verify signature
    if not x_shopify_hmac_sha256:
        logger.warning("Shopify webhook received without HMAC header")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing HMAC signature"
        )

    if not verify_shopify_webhook(body, x_shopify_hmac_sha256):
        logger.warning("Invalid Shopify webhook signature")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid signature"
        )

    # STEP 3: Store raw webhook payload
    webhook_event = WebhookEvent(
        source="shopify",
        event_type="orders/create",
        shop_domain=x_shopify_shop_domain,
        payload=body.decode('utf-8'),
        status="pending",
    )
    db.add(webhook_event)
    db.commit()
    db.refresh(webhook_event)

    logger.info(f"Shopify webhook stored: {webhook_event.id}")

    # STEP 4: Queue processing
    process_shopify_order_webhook.delay(webhook_event.id)

    # STEP 5: Return 200 immediately
    return {"status": "accepted", "webhook_id": webhook_event.id}
  
  # app/tasks/webhook_tasks.py
from celery import Task
import json

from app.worker import celery_app
from app.db.session import SessionLocal
from app.models.webhook_event import WebhookEvent
from app.services.shopify_service import ShopifyService
from app.core.logging import get_logger

logger = get_logger(__name__)


@celery_app.task(bind=True, max_retries=5)
def process_shopify_order_webhook(self: Task, webhook_id: int) -> None:
    """
    Process Shopify order webhook asynchronously.

    Args:
        webhook_id: ID of the webhook event to process
    """
    db = SessionLocal()

    try:
        webhook_event = db.query(WebhookEvent).filter(
            WebhookEvent.id == webhook_id
        ).first()

        if not webhook_event:
            logger.error(f"Webhook event {webhook_id} not found")
            return

        # Parse payload
        try:
            payload = json.loads(webhook_event.payload)
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON in webhook {webhook_id}: {str(e)}")
            webhook_event.status = "failed"
            webhook_event.error_message = "Invalid JSON payload"
            db.commit()
            return

        # Update status
        webhook_event.status = "processing"
        db.commit()

        # Process the order (may call Shopify API, update database, etc.)
        try:
            ShopifyService.process_order_created(db, payload)
            webhook_event.status = "completed"
            webhook_event.processed_at = datetime.utcnow()
            db.commit()
            logger.info(f"Webhook {webhook_id} processed successfully")

        except Exception as e:
            logger.error(f"Failed to process webhook {webhook_id}: {str(e)}")
            webhook_event.status = "failed"
            webhook_event.error_message = str(e)
            db.commit()

            # Retry with exponential backoff
            raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))

    except Exception as e:
        logger.error(f"Unexpected error processing webhook {webhook_id}: {str(e)}")
        raise
    finally:
        db.close()
  

Pagination Pattern

Use cursor-based pagination for better performance with large datasets.

  # app/schemas/pagination.py
from typing import Generic, TypeVar, List, Optional
from pydantic import BaseModel

T = TypeVar('T')


class PaginatedResponse(BaseModel, Generic[T]):
    """Generic paginated response."""

    items: List[T]
    total: int
    page: int
    page_size: int
    has_next: bool


# app/api/v1/endpoints/orders.py
@router.get("/orders", response_model=PaginatedResponse[OrderResponse])
def list_orders(
    page: int = 1,
    page_size: int = 20,
    status: Optional[str] = None,
    db: Session = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_user),
) -> PaginatedResponse[OrderResponse]:
    """
    List orders with pagination and filtering.

    Args:
        page: Page number (1-indexed)
        page_size: Number of items per page (max 100)
        status: Filter by order status
        db: Database session
        current_user: Authenticated user

    Returns:
        PaginatedResponse[OrderResponse]: Paginated list of orders
    """
    # Limit page size
    page_size = min(page_size, 100)

    # Build query
    query = db.query(Order).filter(Order.user_id == current_user.id)

    if status:
        query = query.filter(Order.status == status)

    # Get total count
    total = query.count()

    # Apply pagination
    offset = (page - 1) * page_size
    orders = query.order_by(Order.created_at.desc()).offset(offset).limit(page_size).all()

    # Check if there's a next page
    has_next = (offset + page_size) < total

    return PaginatedResponse(
        items=orders,
        total=total,
        page=page,
        page_size=page_size,
        has_next=has_next,
    )
  

Filtering and Sorting Pattern

  from typing import Optional
from enum import Enum


class OrderSortBy(str, Enum):
    """Valid sort fields for orders."""
    CREATED_AT = "created_at"
    TOTAL_AMOUNT = "total_amount"
    STATUS = "status"


@router.get("/orders", response_model=List[OrderResponse])
def list_orders(
    status: Optional[str] = None,
    min_amount: Optional[float] = None,
    max_amount: Optional[float] = None,
    sort_by: OrderSortBy = OrderSortBy.CREATED_AT,
    sort_desc: bool = True,
    db: Session = Depends(deps.get_db),
    current_user: User = Depends(deps.get_current_user),
) -> List[OrderResponse]:
    """
    List orders with filtering and sorting.

    Args:
        status: Filter by order status
        min_amount: Minimum order amount
        max_amount: Maximum order amount
        sort_by: Field to sort by
        sort_desc: Sort descending if True, ascending if False
        db: Database session
        current_user: Authenticated user

    Returns:
        List[OrderResponse]: Filtered and sorted orders
    """
    query = db.query(Order).filter(Order.user_id == current_user.id)

    # Apply filters
    if status:
        query = query.filter(Order.status == status)
    if min_amount is not None:
        query = query.filter(Order.total_amount >= min_amount)
    if max_amount is not None:
        query = query.filter(Order.total_amount <= max_amount)

    # Apply sorting
    sort_column = getattr(Order, sort_by.value)
    if sort_desc:
        query = query.order_by(sort_column.desc())
    else:
        query = query.order_by(sort_column.asc())

    return query.all()
  

Rate Limiting Pattern

  # app/core/rate_limit.py
from fastapi import HTTPException, status, Request
from typing import Optional
import time

from app.core.redis import redis_client
from app.core.logging import get_logger

logger = get_logger(__name__)


def rate_limit(key: str, max_requests: int, window_seconds: int) -> None:
    """
    Check if request is within rate limit.

    Args:
        key: Rate limit key (e.g., user ID or IP)
        max_requests: Maximum requests allowed
        window_seconds: Time window in seconds

    Raises:
        HTTPException 429: If rate limit exceeded
    """
    current = redis_client.get(key)

    if current is None:
        # First request in window
        redis_client.setex(key, window_seconds, 1)
        return

    if int(current) >= max_requests:
        logger.warning(f"Rate limit exceeded for {key}")
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=f"Rate limit exceeded. Max {max_requests} requests per {window_seconds} seconds."
        )

    # Increment counter
    redis_client.incr(key)


# Usage in endpoint
@router.post("/api/v1/orders")
def create_order(
    request: Request,
    order_in: OrderCreate,
    current_user: User = Depends(deps.get_current_user),
) -> OrderResponse:
    """Create order with rate limiting."""
    # Rate limit: 10 orders per minute per user
    rate_limit(f"orders:user:{current_user.id}", max_requests=10, window_seconds=60)

    # Continue with order creation...
  

API Versioning Strategy

All API endpoints MUST be versioned under /api/v1/, /api/v2/, etc.

  # app/api/v1/__init__.py
from fastapi import APIRouter

from app.api.v1.endpoints import users, orders, webhooks

api_router = APIRouter()

api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(orders.router, prefix="/orders", tags=["orders"])
api_router.include_router(webhooks.router, prefix="/webhooks", tags=["webhooks"])


# app/main.py
from fastapi import FastAPI
from app.api.v1 import api_router as api_v1_router

app = FastAPI(title="Your App API")

app.include_router(api_v1_router, prefix="/api/v1")

# When you need breaking changes, create v2
# from app.api.v2 import api_router as api_v2_router
# app.include_router(api_v2_router, prefix="/api/v2")
  

Error Response Format

All errors return consistent JSON structure.

  # app/schemas/error.py
from pydantic import BaseModel
from typing import Optional, Dict, Any


class ErrorResponse(BaseModel):
    """Standard error response format."""

    detail: str
    error_code: Optional[str] = None
    errors: Optional[Dict[str, Any]] = None


# Example error responses:

# 400 Bad Request
{
    "detail": "Email already exists",
    "error_code": "EMAIL_EXISTS"
}

# 422 Validation Error (automatic from Pydantic)
{
    "detail": [
        {
            "loc": ["body", "email"],
            "msg": "value is not a valid email address",
            "type": "value_error.email"
        }
    ]
}

# 401 Unauthorized
{
    "detail": "Could not validate credentials"
}

# 403 Forbidden
{
    "detail": "Only administrators can perform this action"
}
  

Success Response Format

Consistent success responses.

  # Single resource
{
    "id": 123,
    "email": "user@example.com",
    "created_at": "2025-01-15T10:30:00Z"
}

# List of resources
{
    "items": [...],
    "total": 50,
    "page": 1,
    "page_size": 20,
    "has_next": true
}

# Action completed
{
    "status": "success",
    "message": "Order cancelled successfully",
    "order_id": 456
}
  

Authentication Pattern

JWT with refresh tokens.

  # app/api/v1/endpoints/auth.py
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm

from app.core.security import create_access_token, create_refresh_token
from app.schemas.auth import Token, TokenRefresh
from app.services.auth_service import AuthService

router = APIRouter()


@router.post("/auth/login", response_model=Token)
def login(
    db: Session = Depends(deps.get_db),
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    """
    Authenticate user and return access + refresh tokens.

    Args:
        db: Database session
        form_data: Username (email) and password

    Returns:
        Token: Access token and refresh token

    Raises:
        HTTPException 401: If credentials are invalid
    """
    user = AuthService.authenticate_user(db, form_data.username, form_data.password)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
        )

    # Create tokens
    access_token = create_access_token(subject=user.id)
    refresh_token = create_refresh_token(subject=user.id)

    # Store refresh token in Redis
    redis_client.setex(
        f"refresh_token:{user.id}",
        timedelta(days=7),
        refresh_token
    )

    return Token(
        access_token=access_token,
        refresh_token=refresh_token,
        token_type="bearer"
    )
  

CORS Configuration

  # app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.core.config import settings

app = FastAPI()

# Configure CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.CORS_ORIGINS,  # ["http://localhost:3000", "https://app.example.com"]
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
  

Request/Response Logging

  # app/middleware/logging.py
import time
from fastapi import Request
from app.core.logging import get_logger

logger = get_logger(__name__)


async def log_requests(request: Request, call_next):
    """Log all API requests and responses."""
    start_time = time.time()

    # Log request
    logger.info(
        "Request started",
        extra={
            "method": request.method,
            "path": request.url.path,
            "client": request.client.host if request.client else None,
        }
    )

    # Process request
    response = await call_next(request)

    # Calculate duration
    duration = time.time() - start_time

    # Log response
    logger.info(
        "Request completed",
        extra={
            "method": request.method,
            "path": request.url.path,
            "status_code": response.status_code,
            "duration_ms": round(duration * 1000, 2),
        }
    )

    return response


# Add to app
from app.main import app
app.middleware("http")(log_requests)
  

API Contract Documentation

IMPORTANT: All public API endpoints MUST be documented in docs/api/endpoints.md.

Update the API documentation BEFORE implementing the endpoint.

See ../api/endpoints.md for the template and examples.


Next Steps