Database Management

Learn how to work with async SQLAlchemy models, run migrations, and extend the database schema in FastLaunchAPI

Overview

The database architecture in FastLaunchAPI follows a modular, feature-driven approach with full async/await support using SQLAlchemy 2.0's async features. Each component of your application manages its own database models, promoting maintainability, scalability, and modern Python patterns.

Key Features

Async SQLAlchemy

Full async/await support with SQLAlchemy 2.0 and asyncpg driver

Alembic Migrations

Automatic schema migrations with version control

Modular Design

Models organized by feature for better maintainability

Connection Pooling

Optimized async database connections for production use

Type Safety

Full type annotations with FastAPI dependency injection

Architecture Structure

backend/
├── app/
│   ├── db/
│   │   ├── database.py          # Core async database configuration
│   │   └── __init__.py
│   └── routers/
│       ├── auth/
│       │   ├── models.py        # User authentication models
│       │   └── ...
│       ├── payments/
│       │   ├── models.py        # Payment-related models
│       │   └── ...
│       └── your_feature/
│           ├── models.py        # Your custom models
│           └── ...
├── alembic/
│   ├── env.py                   # Alembic configuration
│   ├── versions/                # Migration files
│   └── alembic.ini             # Alembic settings
└── docker-compose.yml          # Database container setup

Database Configuration

The core database configuration is centralized in app/db/database.py using async SQLAlchemy. Let's break down each component:

import os
from dotenv import load_dotenv
from fastapi import Depends
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
from typing import Annotated

# Load environment variables from .env file
load_dotenv()

# Get database URL from environment (must use async driver)
SQLALCHEMY_DATABASE_URL = os.getenv("DATABASE_URL")

# Create async SQLAlchemy engine
engine = create_async_engine(SQLALCHEMY_DATABASE_URL, echo=False)

# Create async session factory
SessionLocal = async_sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False  # Prevents lazy-loading issues after commit
)

# Create Base class for declarative models
Base = declarative_base()

# Async database dependency function
async def get_db():
    """
    Async database dependency that provides database sessions.
    Automatically handles session creation and cleanup.
    """
    async with SessionLocal() as session:
        yield session

# Type annotation for FastAPI dependency injection
db_dependency = Annotated[AsyncSession, Depends(get_db)]

Component Explanations

  • create_async_engine: Creates an async engine that manages database connections with async/await support
  • async_sessionmaker: Factory for creating async database sessions
  • AsyncSession: Async version of SQLAlchemy's Session class
  • expire_on_commit=False: Prevents SQLAlchemy from expiring all objects after commit, avoiding lazy-loading issues
  • async with SessionLocal(): Async context manager that automatically handles session cleanup
  • async def get_db(): Async dependency function for FastAPI routes
  • db_dependency: Type annotation combining AsyncSession with dependency injection
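
To see why the async with block in get_db guarantees cleanup, the sketch below drives the same generator pattern by hand. FakeSession is a hypothetical stand-in for AsyncSession (no database is involved), and the manual __anext__() and aclose() calls only approximate what FastAPI does around a request:

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for AsyncSession; records whether it was closed."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # The real AsyncSession closes itself at this point
        self.closed = True


async def get_db():
    # Same shape as the dependency in database.py
    async with FakeSession() as session:
        yield session


async def simulate_request():
    gen = get_db()
    session = await gen.__anext__()  # before the route runs: the session is handed out
    open_during_request = not session.closed
    await gen.aclose()               # after the response: the generator is finalized
    return open_during_request, session.closed


open_during_request, closed_after = asyncio.run(simulate_request())
print(open_during_request, closed_after)
```

The session stays open while the route would be running and is closed as soon as the generator is finalized, which is why routes never need to close it themselves.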

Async Driver Required: When using async SQLAlchemy, you must use an async database driver. For PostgreSQL, use postgresql+asyncpg:// instead of postgresql://.

Setting Up Your Database

Configure Environment Variables

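Create a .env file in your project root with your database connection details. Set exactly one DATABASE_URL and make sure it names an async driver; the entries below are alternatives, not a list to copy as-is: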

# PostgreSQL with asyncpg (recommended for production)
DATABASE_URL=postgresql+asyncpg://username:password@localhost:5432/your_database

# SQLite with aiosqlite (good for development and testing)
DATABASE_URL=sqlite+aiosqlite:///./app.db

# MySQL with aiomysql (if you prefer MySQL)
DATABASE_URL=mysql+aiomysql://username:password@localhost:3306/your_database

Important: Notice the +asyncpg, +aiosqlite, or +aiomysql suffix in the URL. These specify the async drivers required for async SQLAlchemy.
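
A small startup check can catch a URL that is missing its async driver before the engine is created. The helper below is a hypothetical sketch, not part of FastLaunchAPI; it only recognizes the three drivers named in this guide:

```python
# Async drivers mentioned in this guide
ASYNC_DRIVERS = {"asyncpg", "aiosqlite", "aiomysql"}


def uses_async_driver(database_url: str) -> bool:
    """Return True if the URL scheme names one of the async drivers above."""
    scheme, separator, _ = database_url.partition("://")
    if not separator:
        return False  # not a URL of the form dialect+driver://...
    _, _, driver = scheme.partition("+")
    return driver in ASYNC_DRIVERS


print(uses_async_driver("postgresql+asyncpg://username:password@localhost:5432/your_database"))
print(uses_async_driver("postgresql://username:password@localhost:5432/your_database"))
```

Calling it on the value of DATABASE_URL in database.py and raising a clear error when it returns False gives a more readable failure than the driver error raised later by SQLAlchemy.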

Install Required Dependencies

Make sure you have the necessary async database drivers installed:

# For PostgreSQL (recommended)
pip install asyncpg

# For SQLite
pip install aiosqlite

# For MySQL
pip install aiomysql

These should already be in your requirements.txt.

Start Database Server (Docker)

If you're using the included Docker setup, start your PostgreSQL database:

# Start PostgreSQL container
docker-compose up -d postgres

# Verify the container is running
docker ps

The Docker configuration in docker-compose.yml sets up a PostgreSQL instance:

services:
  postgres:
    image: postgres
    container_name: template-postgres
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: template-db
    volumes:
      - postgres-data:/var/lib/postgresql/data
    restart: unless-stopped

# Named volumes used by a service must also be declared at the top level
volumes:
  postgres-data:

Initialize Database Tables

The database tables are automatically created when you run your FastAPI application. In main.py, you'll see:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.db.database import engine
from app.routers.auth import models as auth_models
from app.routers.payments import models as payments_models

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: Create database tables asynchronously
    async with engine.begin() as conn:
        await conn.run_sync(auth_models.Base.metadata.create_all)
        await conn.run_sync(payments_models.Base.metadata.create_all)

    yield

    # Shutdown: Clean up resources
    await engine.dispose()

app = FastAPI(lifespan=lifespan)

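This ensures all your model tables exist when the application starts. Note that create_all only creates tables that are missing; it does not alter tables that already exist, so changes to existing models need an Alembic migration.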
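
The sketch below illustrates how create_all treats a table that already exists. It uses a sync in-memory SQLite engine and two hand-built MetaData objects (both assumptions made to keep the example self-contained; the lifespan code above uses the async engine and the shared Base):

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, inspect

# Sync in-memory SQLite keeps the sketch short
engine = create_engine("sqlite:///:memory:")

# First version of the table, as it would exist after the first startup
metadata_v1 = MetaData()
Table("users", metadata_v1, Column("id", Integer, primary_key=True))
metadata_v1.create_all(engine)

# Later version of the same table with an extra column
metadata_v2 = MetaData()
Table(
    "users",
    metadata_v2,
    Column("id", Integer, primary_key=True),
    Column("email", String),
)
metadata_v2.create_all(engine)  # the table already exists, so it is left untouched

column_names = [col["name"] for col in inspect(engine).get_columns("users")]
print(column_names)
```

The email column never reaches the database, because create_all skips tables that are already present. Adding it requires a migration.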

Working with Models

Understanding the User Model

Let's examine the built-in User model to understand the patterns and conventions used:

from sqlalchemy.orm import relationship
from ...db.database import Base
from sqlalchemy import Column, Integer, String, Boolean

class User(Base):
    """User model for authentication"""
    __tablename__ = "users"

    # Primary key with auto-increment
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)

    # Unique user identifiers
    username = Column(String, unique=True)
    email = Column(String, unique=True)

    # Password storage (always hashed, never plain text)
    hashed_password = Column(String)

    # Email verification status
    is_verified = Column(Boolean, default=False)

    # OAuth integration fields
    is_oauth = Column(Boolean, default=False)
    google_sub = Column(String(100), nullable=True, unique=True, index=True)

    # Email verification and password reset tokens
    verification_token = Column(String, nullable=True)
    reset_token = Column(String, nullable=True)

    # Stripe payment integration
    customer_id = Column(String(255), nullable=True)
    plan_id = Column(Integer, nullable=True)
    subscription_id = Column(String(255), nullable=True)
    subscription_status = Column(String(64), nullable=True)
    subscription_last_renew = Column(String, nullable=True)
    subscription_next_renew = Column(String, nullable=True)

Model Best Practices

  • __tablename__: Explicitly define table names using plural nouns (users, products, orders)
  • Primary Keys: Always use id as the primary key with auto-increment
  • Indexes: Add index=True to frequently queried columns for performance
  • Unique Constraints: Use unique=True for fields that must be unique across all records
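  • Nullable Fields: Columns are nullable by default (except primary keys), so set nullable=False explicitly for required fields; writing nullable=True on optional fields is not required but documents the intent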
  • Default Values: Set sensible defaults with default=value
  • String Lengths: Specify maximum lengths for VARCHAR columns: String(100)
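
As an illustration of these conventions, here is a minimal sketch. Product is a hypothetical model that does not exist in FastLaunchAPI, and the local Base stands in for the one in app/db/database.py:

```python
from sqlalchemy import Boolean, Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()  # stand-in for app.db.database.Base


class Product(Base):
    """Hypothetical model used only to illustrate the conventions above."""
    __tablename__ = "products"

    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    sku = Column(String(64), unique=True, nullable=False)    # required and unique
    name = Column(String(200), nullable=False, index=True)   # required, often queried
    description = Column(String(500))                        # nullable by default
    is_active = Column(Boolean, default=True, nullable=False)


# Inspect the nullability SQLAlchemy recorded for each column
nullable_by_column = {col.name: col.nullable for col in Product.__table__.c}
print(nullable_by_column)
```

Only description ends up nullable: it is the one column that neither sets nullable=False nor is a primary key.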

Creating New Models

Create the Model File

Create a new model in your feature directory, for example app/routers/blog/models.py:

from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Text
from sqlalchemy.orm import relationship
from datetime import datetime
from app.db.database import Base

class BlogPost(Base):
    """Blog post model for content management"""
    __tablename__ = "blog_posts"

    # Primary key
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)

    # Content fields
    title = Column(String(200), nullable=False, index=True)
    slug = Column(String(200), nullable=False, unique=True, index=True)
    content = Column(Text, nullable=False)
    excerpt = Column(String(500), nullable=True)

    # Status and visibility
    is_published = Column(Boolean, default=False)
    is_featured = Column(Boolean, default=False)

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    published_at = Column(DateTime, nullable=True)

    # Foreign key to User model
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)

    # Relationship to User model
    author = relationship("User", back_populates="blog_posts")

class BlogCategory(Base):
    """Blog category model for organizing posts"""
    __tablename__ = "blog_categories"

    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    name = Column(String(100), nullable=False, unique=True)
    slug = Column(String(100), nullable=False, unique=True, index=True)
    description = Column(Text, nullable=True)
    is_active = Column(Boolean, default=True)

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

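Update the User Model

The author relationship above uses back_populates="blog_posts", so the User model must define the matching relationship. Without it, SQLAlchemy raises an error when it configures the mappers: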

# In app/routers/auth/models.py
class User(Base):
    # ... existing fields ...

    # Add relationship back to blog posts
    blog_posts = relationship("BlogPost", back_populates="author")

Register Models in Main Application

Update your main.py lifespan to include the new models:

from app.routers.auth import models as auth_models
from app.routers.payments import models as payments_models
from app.routers.blog import models as blog_models  # Add this import

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create database tables for all models
    async with engine.begin() as conn:
        await conn.run_sync(auth_models.Base.metadata.create_all)
        await conn.run_sync(payments_models.Base.metadata.create_all)
        await conn.run_sync(blog_models.Base.metadata.create_all)  # Add this

    yield

    await engine.dispose()

Update Alembic Configuration

Modify alembic/env.py to include your new models in migrations:

# Import all model modules so their tables are registered on the shared Base
from app.db.database import Base
from app.routers.auth import models as auth_models
from app.routers.payments import models as payments_models
from app.routers.blog import models as blog_models  # Add this import

# Every model module uses the same Base, so its metadata already covers all tables.
# Listing the same MetaData object more than once makes autogenerate fail
# with a "Duplicate table keys" error.
target_metadata = Base.metadata

Important: You must add every new model module to both main.py and alembic/env.py for proper table creation and migration support.

Database Migrations with Alembic

Alembic handles database schema changes through versioned migrations. It runs separately from the application, so it can manage the schema whether the app itself uses a sync or an async engine.

Understanding Alembic Configuration

The alembic/env.py file configures how Alembic discovers and manages your models:

import os
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context

# Alembic configuration object
config = context.config

# Setup logging from alembic.ini
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Import all your model modules
from app.routers.auth import models as auth_models
from app.routers.payments import models as payments_models

# This tells Alembic about all your models
target_metadata = [auth_models.Base.metadata]

def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode - generates SQL without database connection"""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online():
    """Run migrations in 'online' mode - executes against live database"""
    # Get database URL from environment
    db_url = os.getenv('DATABASE_URL')
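    if db_url:
        # engine_from_config builds a sync engine, which cannot connect through asyncpg.
        # Swap in a sync driver (psycopg2 is an assumption; it must be installed).
        config.set_main_option("sqlalchemy.url", db_url.replace("+asyncpg", "+psycopg2"))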

    # Create engine (note: migrations use sync engine even with async app)
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )
        with context.begin_transaction():
            context.run_migrations()

# Run appropriate migration mode
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

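Migrations Use Sync Engine: This env.py runs migrations through a synchronous SQLAlchemy engine even though the application is async. SQLAlchemy does not switch drivers automatically: a sync engine created from a postgresql+asyncpg:// URL fails on connect with a MissingGreenlet error. Either give Alembic a URL with a sync driver (for example postgresql+psycopg2://) or generate env.py from Alembic's async template (alembic init -t async).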
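
Because env.py builds its engine with engine_from_config, the URL it receives needs to name a sync driver. One way to derive that URL from the application's async DATABASE_URL is sketched below. The helper name to_sync_url and the choice of psycopg2 and pymysql as sync drivers are assumptions, not part of FastLaunchAPI:

```python
# Sync counterparts for the async drivers used in this guide.
# psycopg2 and pymysql are assumptions; use whichever sync driver you have installed.
SYNC_DRIVER_FOR = {
    "asyncpg": "psycopg2",
    "aiomysql": "pymysql",
    "aiosqlite": "",  # SQLite's built-in driver needs no suffix
}


def to_sync_url(database_url: str) -> str:
    """Rewrite an async SQLAlchemy URL so it names a sync driver instead."""
    scheme, separator, rest = database_url.partition("://")
    dialect, _, driver = scheme.partition("+")
    if not separator or driver not in SYNC_DRIVER_FOR:
        return database_url  # already sync, or a driver this sketch does not know
    sync_driver = SYNC_DRIVER_FOR[driver]
    new_scheme = f"{dialect}+{sync_driver}" if sync_driver else dialect
    return f"{new_scheme}://{rest}"


print(to_sync_url("postgresql+asyncpg://username:password@localhost:5432/your_database"))
```

In run_migrations_online, the result would be passed to config.set_main_option("sqlalchemy.url", ...) in place of the raw environment value.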

Creating and Running Migrations

Generate Migration from Model Changes

After creating or modifying models, generate a migration file:

# Auto-generate migration with descriptive message
alembic revision --autogenerate -m "Add blog post and category models"

# For specific changes
alembic revision --autogenerate -m "Add email verification to users"
alembic revision --autogenerate -m "Add indexes to blog posts"

Review Generated Migration

Always review the generated migration file in alembic/versions/. Here's an abridged example (the file Alembic generates will list every column of the model):

"""Add blog post and category models

Revision ID: abc123def456
Revises: previous_revision
Create Date: 2024-01-15 10:30:00.000000
"""
from alembic import op
import sqlalchemy as sa

revision = 'abc123def456'
down_revision = 'previous_revision'

def upgrade():
    # Create blog_categories table
    op.create_table('blog_categories',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(length=100), nullable=False),
        sa.Column('slug', sa.String(length=100), nullable=False),
        sa.Column('description', sa.Text(), nullable=True),
        sa.Column('is_active', sa.Boolean(), nullable=True),
        sa.Column('created_at', sa.DateTime(), nullable=True),
        sa.Column('updated_at', sa.DateTime(), nullable=True),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('name'),
        sa.UniqueConstraint('slug')
    )
    op.create_index('ix_blog_categories_slug', 'blog_categories', ['slug'])

    # Create blog_posts table
    op.create_table('blog_posts',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('title', sa.String(length=200), nullable=False),
        sa.Column('slug', sa.String(length=200), nullable=False),
        sa.Column('content', sa.Text(), nullable=False),
        sa.Column('author_id', sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(['author_id'], ['users.id'], ),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('slug')
    )
    op.create_index('ix_blog_posts_slug', 'blog_posts', ['slug'])

def downgrade():
    op.drop_table('blog_posts')
    op.drop_table('blog_categories')

Apply Migration to Database

# Apply all pending migrations
alembic upgrade head

# Apply to specific revision
alembic upgrade abc123def456

# Apply one migration at a time
alembic upgrade +1

Verify Migration Success

# Show current migration status
alembic current

# Show migration history
alembic history --verbose

# Show details of the latest (head) revision; if it differs from `alembic current`, migrations are pending
alembic show head

Rolling Back Migrations

# Rollback to previous migration
alembic downgrade -1

# Rollback to specific revision
alembic downgrade abc123def456

# Rollback all migrations (dangerous!)
alembic downgrade base

Using Async Database in Your Application

Async Database Operations in Routes

Here's how to use the async database in your FastAPI routes:

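from fastapi import Depends, HTTPException
from sqlalchemy import select
from app.db.database import db_dependency
from app.routers.auth.models import User
from app.routers.blog.models import BlogPost, BlogCategory
# app, get_current_user, BlogPostCreate, and BlogPostUpdate are assumed to be defined elsewhere in your project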

@app.get("/blog/posts")
async def get_blog_posts(
    db: db_dependency,
    skip: int = 0,
    limit: int = 10,
    published_only: bool = True
):
    """Get blog posts with pagination and filtering"""
    # Use SQLAlchemy 2.0 select() syntax
    query = select(BlogPost)

    if published_only:
        query = query.where(BlogPost.is_published == True)

    # Execute async query
    result = await db.execute(query.offset(skip).limit(limit))
    posts = result.scalars().all()

    return posts

@app.get("/blog/posts/{post_id}")
async def get_blog_post(post_id: int, db: db_dependency):
    """Get single blog post"""
    result = await db.execute(
        select(BlogPost).where(BlogPost.id == post_id)
    )
    post = result.scalar_one_or_none()

    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    return post

@app.post("/blog/posts")
async def create_blog_post(
    post_data: BlogPostCreate,
    db: db_dependency,
    current_user: User = Depends(get_current_user)
):
    """Create new blog post"""
    db_post = BlogPost(
        title=post_data.title,
        slug=post_data.slug,
        content=post_data.content,
        excerpt=post_data.excerpt,
        author_id=current_user.id
    )

    db.add(db_post)
    await db.commit()
    await db.refresh(db_post)

    return db_post

@app.put("/blog/posts/{post_id}")
async def update_blog_post(
    post_id: int,
    post_data: BlogPostUpdate,
    db: db_dependency,
    current_user: User = Depends(get_current_user)
):
    """Update existing blog post"""
    result = await db.execute(
        select(BlogPost).where(BlogPost.id == post_id)
    )
    post = result.scalar_one_or_none()

    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    if post.author_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not authorized")

    # Update only the fields the client sent (on Pydantic v2, prefer post_data.model_dump(exclude_unset=True))
    for field, value in post_data.dict(exclude_unset=True).items():
        setattr(post, field, value)

    await db.commit()
    await db.refresh(post)

    return post

@app.delete("/blog/posts/{post_id}")
async def delete_blog_post(
    post_id: int,
    db: db_dependency,
    current_user: User = Depends(get_current_user)
):
    """Delete blog post"""
    result = await db.execute(
        select(BlogPost).where(BlogPost.id == post_id)
    )
    post = result.scalar_one_or_none()

    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    if post.author_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not authorized")

    await db.delete(post)
    await db.commit()

    return {"message": "Post deleted successfully"}

SQLAlchemy 2.0 Style: Notice we use select() instead of db.query(). This is the modern SQLAlchemy 2.0 syntax that works with async. Also note the await on database operations such as execute(), commit(), refresh(), and delete(); db.add() is a regular synchronous call and is not awaited.
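
The routes above reference BlogPostCreate and BlogPostUpdate without defining them. A minimal sketch of what these Pydantic schemas might look like follows; the field lists are assumptions derived from the BlogPost model, and changed_fields is a hypothetical helper that works on both Pydantic v1 and v2:

```python
from typing import Optional

from pydantic import BaseModel


class BlogPostCreate(BaseModel):
    """Hypothetical request body for POST /blog/posts."""
    title: str
    slug: str
    content: str
    excerpt: Optional[str] = None


class BlogPostUpdate(BaseModel):
    """Hypothetical request body for PUT /blog/posts/{post_id}; every field is optional."""
    title: Optional[str] = None
    slug: Optional[str] = None
    content: Optional[str] = None
    excerpt: Optional[str] = None


def changed_fields(update: BlogPostUpdate) -> dict:
    """Return only the fields the client actually sent."""
    # model_dump() is the Pydantic v2 name; dict() is the v1 name
    dump = getattr(update, "model_dump", None) or update.dict
    return dump(exclude_unset=True)


print(changed_fields(BlogPostUpdate(title="New title")))
```

With exclude_unset=True, fields the client omitted are left out entirely instead of arriving as None, so the setattr loop in update_blog_post does not overwrite existing values.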

Advanced Async Query Examples

from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import selectinload
from datetime import datetime, timedelta

# Count queries
result = await db.execute(select(func.count(BlogPost.id)))
post_count = result.scalar()

# Eager loading relationships
result = await db.execute(
    select(BlogPost).options(selectinload(BlogPost.author))
)
posts_with_authors = result.scalars().all()

# Complex filtering
result = await db.execute(
    select(BlogPost).where(
        and_(
            BlogPost.is_published == True,
            BlogPost.created_at >= datetime.utcnow() - timedelta(days=7)
        )
    )
)
recent_posts = result.scalars().all()

# Ordering and limiting (five most recent published posts)
result = await db.execute(
    select(BlogPost)
    .where(BlogPost.is_published == True)
    .order_by(BlogPost.created_at.desc())
    .limit(5)
)
latest_posts = result.scalars().all()

# Joins and aggregations
result = await db.execute(
    select(User.username, func.count(BlogPost.id).label('post_count'))
    .join(BlogPost)
    .group_by(User.username)
)
posts_by_author = result.all()

Best Practices and Performance

Async Best Practices

  1. Always Await Database Operations

    # Correct
    await db.commit()
    await db.refresh(user)
    result = await db.execute(query)
    
    # Wrong: without await the coroutine never runs, so nothing is committed or refreshed
    db.commit()
    db.refresh(user)

  2. Use SQLAlchemy 2.0 Syntax

    # Modern (recommended)
    result = await db.execute(select(User).where(User.id == 1))
    user = result.scalar_one_or_none()
    
    # Legacy Query API (not available on AsyncSession)
    user = db.query(User).filter(User.id == 1).first()

  3. Eager Load Relationships

    # Prevents N+1 queries
    from sqlalchemy.orm import selectinload
    
    result = await db.execute(
        select(BlogPost).options(selectinload(BlogPost.author))
    )

Performance Tips

  1. Use Indexes Wisely

    slug = Column(String(100), index=True)
    created_at = Column(DateTime, index=True)

  2. Optimize Queries

    # Select specific columns
    result = await db.execute(select(User.username, User.email))
    
    # Use pagination
    result = await db.execute(query.offset(skip).limit(limit))

  3. Connection Pooling

    # Configure in database.py
    engine = create_async_engine(
        SQLALCHEMY_DATABASE_URL,
        pool_size=10,
        max_overflow=20,
        pool_pre_ping=True  # Verify connections
    )
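
When sizing the pool, keep in mind that pool_size plus max_overflow is the most connections a single engine will open, and that each worker process creates its own engine. The helper below is a hypothetical sketch of that arithmetic; the worker count of 4 is an example value:

```python
def max_connections(pool_size: int, max_overflow: int, workers: int = 1) -> int:
    """Upper bound on open connections; each worker process has its own pool."""
    return (pool_size + max_overflow) * workers


per_process = max_connections(pool_size=10, max_overflow=20)
four_workers = max_connections(pool_size=10, max_overflow=20, workers=4)
print(per_process, four_workers)
```

With the settings above, four workers could open more connections than PostgreSQL's default max_connections of 100 allows, so the pool values and the server limit should be chosen together.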

Troubleshooting

Common Issues

"No async driver for postgresql://"

  • Solution: Use postgresql+asyncpg:// in your DATABASE_URL

"Cannot use sync operations on async session"

  • Solution: Use await on all database operations

"Lazy loading is not supported"

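  • Solution: Use eager loading with selectinload() or joinedload(). With AsyncSession, accessing a relationship that was not loaded usually surfaces as a MissingGreenlet error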

"Database is locked" (SQLite)

  • Solution: Use sqlite+aiosqlite:/// and consider PostgreSQL for production

Debug Mode

Enable SQL logging to see generated queries:

# In database.py
engine = create_async_engine(
    SQLALCHEMY_DATABASE_URL,
    echo=True  # Logs all SQL queries
)
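
To inspect a single query without turning on logging for the whole engine, a select() statement can also be rendered to SQL directly. The sketch below uses a trimmed-down copy of the BlogPost model and a local Base (both assumptions, to keep it self-contained) and needs no database connection:

```python
from sqlalchemy import Boolean, Column, DateTime, Integer, String, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()  # stand-in for app.db.database.Base


class BlogPost(Base):
    """Trimmed-down copy of the BlogPost model, enough to build a query."""
    __tablename__ = "blog_posts"

    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    is_published = Column(Boolean, default=False)
    created_at = Column(DateTime)


query = (
    select(BlogPost)
    .where(BlogPost.is_published == True)
    .order_by(BlogPost.created_at.desc())
    .limit(5)
)

# str() compiles the statement with a generic dialect; bound values appear as placeholders
sql_text = str(query)
print(sql_text)
```

The printed text is generic SQL, so dialect-specific details may differ from what echo=True shows for the real database.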

Production Ready: With connection pooling configured and async/await used throughout the stack, this async SQLAlchemy setup is a solid base for production FastAPI applications.