Backend Architecture
Colony's backend follows a modular, domain-driven architecture using FastAPI with clear separation of concerns and industry best practices.
Architecture Overview
The backend is built using FastAPI with a domain-driven design approach, where each business domain (auth, expenses, cycles, etc.) is organized as a self-contained module with its own models, schemas, services, and routes.
Key Principles
- Domain-Driven Design: Each business domain is a separate module
- Separation of Concerns: Clear boundaries between layers (routes, services, data)
- Dependency Injection: Clean dependency management
- Type Safety: Full type annotations with Pydantic
- Testing: Comprehensive test coverage per domain
Project Structure
backend/
├── alembic/ # Database migrations
│ ├── versions/
│ └── env.py
├── app/ # Main application source
│ ├── auth/ # Authentication domain
│ │ ├── router.py # Auth API routes
│ │ ├── schemas.py # Pydantic request/response models
│ │ ├── models.py # SQLAlchemy database models
│ │ ├── dependencies.py # Auth-specific dependencies
│ │ ├── service.py # Authentication business logic
│ │ ├── config.py # Auth configuration (JWT settings)
│ │ ├── constants.py # Auth constants and error codes
│ │ ├── exceptions.py # Auth-specific exceptions
│ │ └── utils.py # Auth utility functions
│ ├── payment_methods/ # Payment methods domain
│ │ ├── router.py
│ │ ├── schemas.py
│ │ ├── models.py
│ │ ├── dependencies.py
│ │ ├── service.py
│ │ ├── constants.py
│ │ ├── exceptions.py
│ │ └── utils.py
│ ├── expense_templates/ # Expense templates domain
│ │ ├── router.py
│ │ ├── schemas.py
│ │ ├── models.py
│ │ ├── dependencies.py
│ │ ├── service.py
│ │ ├── constants.py
│ │ ├── exceptions.py
│ │ └── utils.py
│ ├── cycles/ # Cycle management domain
│ │ ├── router.py
│ │ ├── schemas.py
│ │ ├── models.py
│ │ ├── dependencies.py
│ │ ├── service.py
│ │ ├── constants.py
│ │ ├── exceptions.py
│ │ └── utils.py
│ ├── expenses/ # Individual expenses domain
│ │ ├── router.py
│ │ ├── schemas.py
│ │ ├── models.py
│ │ ├── dependencies.py
│ │ ├── service.py
│ │ ├── constants.py
│ │ ├── exceptions.py
│ │ └── utils.py
│ ├── reports/ # Analytics and reporting domain
│ │ ├── router.py
│ │ ├── schemas.py
│ │ ├── service.py
│ │ ├── constants.py
│ │ ├── exceptions.py
│ │ └── utils.py
│ ├── exchange_rates/ # Currency exchange domain
│ │ ├── client.py # External API client
│ │ ├── schemas.py
│ │ ├── models.py
│ │ ├── service.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── exceptions.py
│ │ └── utils.py
│ ├── config.py # Global application configuration
│ ├── database.py # Database connection and session management
│ ├── models.py # Global database models and base classes
│ ├── exceptions.py # Global exception classes
│ ├── dependencies.py # Global dependencies (DB session, current user)
│ ├── pagination.py # Pagination utilities
│ └── main.py # FastAPI application factory
├── tests/ # Test suite organized by domain
│ ├── auth/
│ ├── payment_methods/
│ ├── expense_templates/
│ ├── cycles/
│ ├── expenses/
│ ├── reports/
│ ├── exchange_rates/
│ ├── conftest.py # Pytest configuration and fixtures
│ └── utils.py # Test utilities
├── .env # Environment variables
├── alembic.ini # Alembic configuration
├── pyproject.toml # Project configuration and dependencies
└── README.md
Domain Architecture
Each domain follows a consistent internal structure with clear responsibilities:
File Responsibilities
router.py
Contains all HTTP endpoints for the domain. Handles request/response, validation, and delegates business logic to services.
# Example: app/cycles/router.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app.dependencies import get_db, get_current_user
from app.auth.models import User
from . import service, schemas
router = APIRouter(prefix="/cycles", tags=["cycles"])
@router.post("/", response_model=schemas.CycleResponse)
async def create_cycle(
cycle_data: schemas.CycleCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
return await service.create_cycle(db, cycle_data, current_user.id)
schemas.py
Pydantic models for request/response validation and serialization.
# Example: app/cycles/schemas.py
from datetime import date
from decimal import Decimal
from typing import Optional
from pydantic import BaseModel
class CycleCreate(BaseModel):
name: str
start_date: date
end_date: date
income_amount: Decimal
generate_from_templates: bool = True
class CycleResponse(BaseModel):
id: str
name: str
start_date: date
end_date: date
income_amount: Decimal
status: str
created_at: datetime
class Config:
from_attributes = True
models.py
SQLAlchemy database models representing the domain's data structure.
# Example: app/cycles/models.py
from sqlalchemy import Column, String, Date, DECIMAL, ForeignKey
from sqlalchemy.dialects.postgresql import UUID, ENUM
from sqlalchemy.orm import relationship
from app.models import BaseModel, CycleStatus
class Cycle(BaseModel):
__tablename__ = "cycles"
name = Column(String(100), nullable=False)
user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
start_date = Column(Date, nullable=False)
end_date = Column(Date, nullable=False)
income_amount = Column(DECIMAL(10, 2), default=0)
status = Column(ENUM(CycleStatus), default=CycleStatus.DRAFT)
# Relationships
user = relationship("User", back_populates="cycles")
expenses = relationship("CycleExpense", back_populates="cycle")
service.py
Contains the domain's business logic, data processing, and complex operations.
# Example: app/cycles/service.py
from sqlalchemy.orm import Session
from typing import List
from . import models, schemas
from ..expense_templates import service as template_service
from ..expenses import service as expense_service
async def create_cycle(
db: Session,
cycle_data: schemas.CycleCreate,
user_id: str
) -> models.Cycle:
"""Create a new cycle and optionally generate expenses from templates."""
# Create cycle
cycle = models.Cycle(
name=cycle_data.name,
user_id=user_id,
start_date=cycle_data.start_date,
end_date=cycle_data.end_date,
income_amount=cycle_data.income_amount
)
db.add(cycle)
db.flush() # Get cycle ID
# Generate expenses from templates if requested
if cycle_data.generate_from_templates:
await _generate_expenses_from_templates(db, cycle)
db.commit()
return cycle
dependencies.py
Domain-specific dependencies for dependency injection.
# Example: app/cycles/dependencies.py
from fastapi import Depends, HTTPException
from sqlalchemy.orm import Session
from app.dependencies import get_db, get_current_user
from app.auth.models import User
from . import service
from .constants import ErrorCode
async def get_cycle_by_id(
cycle_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Dependency to get cycle by ID and verify ownership."""
cycle = await service.get_cycle_by_id(db, cycle_id, current_user.id)
if not cycle:
raise HTTPException(
status_code=404,
detail=ErrorCode.CYCLE_NOT_FOUND
)
return cycle
constants.py
Domain-specific constants, error codes, and enums.
# Example: app/cycles/constants.py
from enum import Enum
class ErrorCode:
CYCLE_NOT_FOUND = "CYCLE_NOT_FOUND"
CYCLE_ALREADY_COMPLETED = "CYCLE_ALREADY_COMPLETED"
INVALID_CYCLE_DATES = "INVALID_CYCLE_DATES"
CYCLE_NAME_EXISTS = "CYCLE_NAME_EXISTS"
class CycleStatus(str, Enum):
DRAFT = "draft"
ACTIVE = "active"
COMPLETED = "completed"
# Business constants
MAX_CYCLE_DURATION_DAYS = 45
DEFAULT_CYCLE_DURATION_DAYS = 42 # 6 weeks
exceptions.py
Domain-specific exception classes.
# Example: app/cycles/exceptions.py
from app.exceptions import AppException
from .constants import ErrorCode
class CycleNotFoundException(AppException):
def __init__(self):
super().__init__(
error_code=ErrorCode.CYCLE_NOT_FOUND,
message="Cycle not found",
status_code=404
)
class CycleAlreadyCompletedException(AppException):
def __init__(self):
super().__init__(
error_code=ErrorCode.CYCLE_ALREADY_COMPLETED,
message="Cannot modify completed cycle",
status_code=409
)
utils.py
Domain-specific utility functions that don't contain business logic.
# Example: app/cycles/utils.py
from datetime import date, timedelta
from typing import Tuple
def calculate_cycle_dates(start_date: date) -> Tuple[date, date]:
"""Calculate 6-week cycle end date from start date."""
end_date = start_date + timedelta(days=42) # 6 weeks
return start_date, end_date
def format_cycle_name(start_date: date, end_date: date) -> str:
"""Generate a descriptive cycle name from dates."""
return f"{start_date.strftime('%B %Y')} Cycle"
Global Architecture Components
app/main.py
Application factory that initializes FastAPI with all domain routers.
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config import settings
from app.auth.router import router as auth_router
from app.cycles.router import router as cycles_router
from app.expenses.router import router as expenses_router
# ... other routers
def create_app() -> FastAPI:
app = FastAPI(
title="Colony API",
version="1.0.0",
description="Personal expense management API"
)
# Middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.ALLOWED_HOSTS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include domain routers
app.include_router(auth_router, prefix="/api/v1")
app.include_router(cycles_router, prefix="/api/v1")
app.include_router(expenses_router, prefix="/api/v1")
return app
app = create_app()
app/database.py
Database connection and session management.
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.config import settings
engine = create_engine(settings.DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
"""Database session dependency."""
db = SessionLocal()
try:
yield db
finally:
db.close()
app/models.py
Base model classes and global models.
from sqlalchemy import Column, DateTime, Boolean
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
import uuid
Base = declarative_base()
class BaseModel(Base):
"""Base model with common fields."""
__abstract__ = True
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
active = Column(Boolean, default=True)
app/dependencies.py
Global dependencies used across domains.
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer
from sqlalchemy.orm import Session
from app.database import get_db
from app.auth import service as auth_service
security = HTTPBearer()
async def get_current_user(
token: str = Depends(security),
db: Session = Depends(get_db)
):
"""Get current authenticated user from JWT token."""
user = await auth_service.get_user_from_token(db, token.credentials)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials"
)
return user
Cross-Domain Communication
When domains need to interact, they import services explicitly:
# In app/cycles/service.py
from app.expense_templates import service as template_service
from app.expenses import service as expense_service
from app.exchange_rates import service as exchange_service
async def create_cycle_with_expenses(db: Session, cycle_data: schemas.CycleCreate):
# Get user's templates
templates = await template_service.get_active_templates(db, cycle_data.user_id)
# Generate expenses for each template
for template in templates:
expenses = await expense_service.generate_from_template(
db, template, cycle_data.start_date, cycle_data.end_date
)
# Convert currencies if needed
for expense in expenses:
if expense.currency != "USD":
usd_amount = await exchange_service.convert_to_usd(
expense.amount, expense.currency
)
expense.amount_usd = usd_amount
Data Flow Architecture
graph TD
A[HTTP Request] --> B[Router Layer]
B --> C[Pydantic Validation]
C --> D[Dependency Injection]
D --> E[Service Layer]
E --> F[Database Models]
F --> G[SQLAlchemy ORM]
G --> H[PostgreSQL]
E --> I[External Services]
I --> J[Exchange Rate APIs]
E --> K[Cross-Domain Services]
K --> L[Other Domain Services]
H --> G
G --> F
F --> E
E --> M[Response Schemas]
M --> N[HTTP Response]
Key Architecture Benefits
1. Modularity
- Each domain is self-contained
- Easy to add new features without affecting other domains
- Clear boundaries and responsibilities
2. Testability
- Each layer can be tested independently
- Mock dependencies easily
- Domain-specific test suites
3. Maintainability
- Consistent structure across domains
- Easy to locate and modify code
- Clear separation of concerns
4. Scalability
- Domains can be extracted to microservices if needed
- Independent development by different teams
- Horizontal scaling capabilities
5. Type Safety
- Full type annotations
- Pydantic validation
- SQLAlchemy type mapping
Authentication & Security
JWT Authentication
The application uses JSON Web Tokens (JWT) for stateless authentication:
# app/auth/config.py
class AuthSettings(BaseSettings):
SECRET_KEY: str = "your-secret-key-here" # Change in production
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
class Config:
env_file = ".env"
env_prefix = "AUTH_"
Token Flow
- User authenticates via
/auth/loginwith email/password - System returns JWT token with user claims
- Client includes token in
Authorization: Bearer <token>header - System validates token on protected endpoints
Password Security
- Passwords hashed using Argon2ID algorithm
- Minimum 8 characters required
- Salt automatically generated per password
# app/auth/utils.py
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
ph = PasswordHasher()
def hash_password(password: str) -> str:
"""Hash password using Argon2ID."""
return ph.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify password against hash."""
try:
ph.verify(hashed_password, plain_password)
return True
except VerifyMismatchError:
return False
Authentication Dependencies
Global authentication handled via dependency injection:
# app/dependencies.py
from app.auth.dependencies import get_current_active_user
# Use in any protected endpoint
@router.get("/protected")
async def protected_endpoint(
current_user: CurrentActiveUser # Automatically validates JWT
):
return {"user": current_user.email}
Error Handling Architecture
Exception Hierarchy
Structured exception handling with consistent error responses:
# app/exceptions.py
class AppException(Exception):
"""Base application exception."""
def __init__(self, error_code: str, message: str, status_code: int = 400, details: dict = None):
self.error_code = error_code
self.message = message
self.status_code = status_code
self.details = details or {}
super().__init__(self.message)
# Domain-specific exceptions inherit from AppException
class UserNotFoundException(AppException):
def __init__(self):
super().__init__(
error_code="USER_NOT_FOUND",
message="User not found",
status_code=404
)
Global Exception Handlers
Centralized error handling for consistent API responses:
# app/exceptions.py
from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse
async def app_exception_handler(request: Request, exc: AppException):
"""Handle application-specific exceptions."""
return JSONResponse(
status_code=exc.status_code,
content={
"success": False,
"error": {
"code": exc.error_code,
"message": exc.message,
"details": exc.details
}
}
)
async def validation_exception_handler(request: Request, exc: ValueError):
"""Handle validation errors."""
return JSONResponse(
status_code=422,
content={
"success": False,
"error": {
"code": "VALIDATION_ERROR",
"message": str(exc),
"details": {}
}
}
)
Error Response Format
All errors follow a consistent structure:
{
"success": false,
"error": {
"code": "USER_NOT_FOUND",
"message": "User not found",
"details": {
"user_id": "123e4567-e89b-12d3-a456-426614174000"
}
}
}
Database Architecture
Connection Management
PostgreSQL connection using SQLAlchemy with connection pooling:
# app/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.config import settings
# Connection pool configuration
engine = create_engine(
settings.DATABASE_URL,
pool_pre_ping=True, # Verify connections before use
pool_recycle=300, # Recycle connections every 5 minutes
pool_size=10, # Connection pool size
max_overflow=20 # Additional connections if needed
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Migration Strategy
Database versioning using Alembic:
# Generate migration
alembic revision --autogenerate -m "add user table"
# Apply migrations
alembic upgrade head
# Rollback
alembic downgrade -1
Model Conventions
Consistent database modeling patterns:
# app/models.py
import uuid
from datetime import datetime
from sqlalchemy import Column, DateTime, Boolean, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class BaseModel(Base):
"""Base model with common fields and behaviors."""
__abstract__ = True
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
active = Column(Boolean, default=True, nullable=False)
def to_dict(self) -> dict:
"""Convert model to dictionary."""
return {c.name: getattr(self, c.name) for c in self.__table__.columns}
Validation & Serialization
Pydantic Models
Type-safe request/response handling:
# Domain schemas follow naming conventions
class UserCreate(BaseModel): # For POST requests
email: EmailStr
password: str
first_name: Optional[str]
class UserUpdate(BaseModel): # For PUT/PATCH requests
first_name: Optional[str]
last_name: Optional[str]
class UserResponse(BaseModel): # For API responses
id: str
email: str
first_name: Optional[str]
active: bool
created_at: datetime
class Config:
from_attributes = True # Enable SQLAlchemy model conversion
Custom Validators
Domain-specific validation logic:
from pydantic import validator
from decimal import Decimal
class ExpenseCreate(BaseModel):
amount: Decimal
description: str
@validator('amount')
def amount_must_be_positive(cls, v):
if v <= 0:
raise ValueError('Amount must be positive')
if v > Decimal('999999.99'):
raise ValueError('Amount too large')
return v
@validator('description')
def description_length(cls, v):
if len(v.strip()) < 1:
raise ValueError('Description cannot be empty')
return v.strip()
Testing Architecture
Test Organization
Tests mirror the domain structure:
tests/
├── conftest.py # Global test fixtures
├── test_auth/ # Authentication tests
│ ├── test_service.py # Business logic tests
│ ├── test_router.py # API endpoint tests
│ └── test_models.py # Database model tests
├── test_cycles/
└── utils.py # Test utilities
Test Database
Isolated test database for each test session:
# tests/conftest.py
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.database import Base, get_db
from app.main import app
# Test database
TEST_DATABASE_URL = "postgresql://test_user:test_pass@localhost/test_colony"
test_engine = create_engine(TEST_DATABASE_URL)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=test_engine)
@pytest.fixture(scope="session")
def test_db():
"""Create test database tables."""
Base.metadata.create_all(bind=test_engine)
yield
Base.metadata.drop_all(bind=test_engine)
@pytest.fixture
def db_session(test_db):
"""Provide database session for tests."""
db = TestingSessionLocal()
try:
yield db
finally:
db.close()
@pytest.fixture
def client(db_session):
"""FastAPI test client with database override."""
def override_get_db():
yield db_session
app.dependency_overrides[get_db] = override_get_db
yield TestClient(app)
app.dependency_overrides.clear()
Test Patterns
Consistent testing approaches:
# tests/test_auth/test_service.py
import pytest
from app.auth import service
from app.auth.exceptions import UserAlreadyExistsException
class TestAuthService:
def test_create_user_success(self, db_session):
"""Test successful user creation."""
user_data = {
"email": "test@example.com",
"password": "securepass123"
}
user = service.create_user(db_session, user_data)
assert user.email == "test@example.com"
assert user.active is True
assert user.id is not None
def test_create_user_duplicate_email(self, db_session):
"""Test duplicate email handling."""
user_data = {
"email": "test@example.com",
"password": "securepass123"
}
# Create first user
service.create_user(db_session, user_data)
# Attempt to create duplicate
with pytest.raises(UserAlreadyExistsException):
service.create_user(db_session, user_data)
Configuration Management
Environment-based Config
Secure configuration using Pydantic settings:
# app/config.py
from pydantic_settings import BaseSettings
from typing import List
class Settings(BaseSettings):
# Application
APP_NAME: str = "Colony API"
VERSION: str = "1.0.0"
DEBUG: bool = False
# Database
DATABASE_URL: str
# Security
SECRET_KEY: str
ALLOWED_HOSTS: List[str] = ["*"]
# External APIs
EXCHANGE_RATE_API_KEY: str
EXCHANGE_RATE_API_URL: str = "https://api.exchangerate-api.com/v4/latest"
# Logging
LOG_LEVEL: str = "INFO"
class Config:
env_file = ".env"
case_sensitive = True
settings = Settings()
Environment Files
Development and production configurations:
# .env.development
DATABASE_URL=postgresql://colony_user:colony_pass@localhost:5432/colony_dev
SECRET_KEY=development-secret-key
DEBUG=true
LOG_LEVEL=DEBUG
# .env.production
DATABASE_URL=postgresql://user:pass@prod-db:5432/colony_prod
SECRET_KEY=${COLONY_SECRET_KEY}
DEBUG=false
LOG_LEVEL=INFO
ALLOWED_HOSTS=["api.colony.app"]
Logging & Monitoring
Structured Logging
Consistent logging across all domains:
# app/logging.py
import logging
import json
from typing import Any, Dict
class JSONFormatter(logging.Formatter):
"""JSON log formatter for structured logging."""
def format(self, record: logging.LogRecord) -> str:
log_data: Dict[str, Any] = {
"timestamp": self.formatTime(record),
"level": record.levelname,
"module": record.module,
"message": record.getMessage(),
}
if hasattr(record, 'user_id'):
log_data['user_id'] = record.user_id
if record.exc_info:
log_data['exception'] = self.formatException(record.exc_info)
return json.dumps(log_data)
# Usage in services
logger = logging.getLogger(__name__)
async def create_user(db: Session, user_data: dict):
logger.info("Creating new user", extra={"email": user_data["email"]})
try:
# ... business logic
logger.info("User created successfully", extra={"user_id": str(user.id)})
return user
except Exception as e:
logger.error("Failed to create user", extra={"error": str(e)})
raise
Performance Considerations
Database Query Optimization
- Use
select_related()for foreign key relationships - Implement pagination for large datasets
- Add database indexes for frequently queried fields
- Use database-level constraints for data integrity
Caching Strategy
- Redis for session data and frequently accessed lookups
- Application-level caching for exchange rates
- Query result caching for expensive reports
Async Operations
- Use
async/awaitfor I/O operations - Background tasks for non-critical operations (emails, reports)
- Connection pooling for database and external API calls
This architecture provides a robust foundation for building a maintainable, scalable, and secure expense management API while following FastAPI and Python best practices.