Database Management
Learn how to work with async SQLAlchemy models, run migrations, and extend the database schema in FastLaunchAPI
Overview
The database architecture in FastLaunchAPI follows a modular, feature-driven approach with full async/await support built on SQLAlchemy 2.0. Each component of your application manages its own database models, which keeps the codebase maintainable and scalable while following modern Python patterns.
Key Features

- **Async SQLAlchemy**: full async/await support with SQLAlchemy 2.0 and the asyncpg driver
- **Alembic Migrations**: automatic schema migrations with version control
- **Modular Design**: models organized by feature for better maintainability
- **Connection Pooling**: optimized async database connections for production use
- **Type Safety**: full type annotations with FastAPI dependency injection
Architecture Structure
```
backend/
├── app/
│   ├── db/
│   │   ├── database.py          # Core async database configuration
│   │   └── __init__.py
│   └── routers/
│       ├── auth/
│       │   ├── models.py        # User authentication models
│       │   └── ...
│       ├── payments/
│       │   ├── models.py        # Payment-related models
│       │   └── ...
│       └── your_feature/
│           ├── models.py        # Your custom models
│           └── ...
├── alembic/
│   ├── env.py                   # Alembic configuration
│   ├── versions/                # Migration files
│   └── alembic.ini              # Alembic settings
└── docker-compose.yml           # Database container setup
```

Database Configuration
The core database configuration is centralized in app/db/database.py using async SQLAlchemy. Let's break down each component:
```python
import os
from typing import Annotated

from dotenv import load_dotenv
from fastapi import Depends
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base

# Load environment variables from .env file
load_dotenv()

# Get database URL from environment (must use an async driver)
SQLALCHEMY_DATABASE_URL = os.getenv("DATABASE_URL")

# Create async SQLAlchemy engine
engine = create_async_engine(SQLALCHEMY_DATABASE_URL, echo=False)

# Create async session factory
# (note: the legacy autocommit parameter was removed in SQLAlchemy 2.0)
SessionLocal = async_sessionmaker(
    autoflush=False,
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Prevents lazy-loading issues after commit
)

# Create Base class for declarative models
Base = declarative_base()

# Async database dependency function
async def get_db():
    """
    Async database dependency that provides database sessions.
    Automatically handles session creation and cleanup.
    """
    async with SessionLocal() as session:
        yield session

# Type annotation for FastAPI dependency injection
db_dependency = Annotated[AsyncSession, Depends(get_db)]
```

Component Explanations
- `create_async_engine`: creates an async engine that manages database connections with async/await support
- `async_sessionmaker`: factory for creating async database sessions
- `AsyncSession`: async version of SQLAlchemy's `Session` class
- `expire_on_commit=False`: prevents SQLAlchemy from expiring all objects after commit, avoiding lazy-loading issues
- `async with SessionLocal()`: async context manager that automatically handles session cleanup
- `async def get_db()`: async dependency function for FastAPI routes
- `db_dependency`: type annotation combining `AsyncSession` with dependency injection
Async Driver Required: When using async SQLAlchemy, you must use an async database driver. For PostgreSQL, use `postgresql+asyncpg://` instead of `postgresql://`.
Setting Up Your Database
Configure Environment Variables
Create a .env file in your project root with your database connection details using async drivers:
```bash
# PostgreSQL with asyncpg (recommended for production)
DATABASE_URL=postgresql+asyncpg://username:password@localhost:5432/your_database

# SQLite with aiosqlite (good for development and testing)
DATABASE_URL=sqlite+aiosqlite:///./app.db

# MySQL with aiomysql (if you prefer MySQL)
DATABASE_URL=mysql+aiomysql://username:password@localhost:3306/your_database
```

Important: Notice the `+asyncpg`, `+aiosqlite`, or `+aiomysql` suffix in the URL. These suffixes select the async drivers required by async SQLAlchemy.
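A forgotten driver suffix is easy to catch before the engine is even created. The following is a minimal sketch (the `has_async_driver` helper is illustrative, not part of the template) that inspects the URL scheme at startup:

```python
def has_async_driver(database_url: str) -> bool:
    """Check that a SQLAlchemy URL names one of the async drivers."""
    scheme = database_url.split("://", 1)[0]  # e.g. "postgresql+asyncpg"
    driver = scheme.partition("+")[2]         # e.g. "asyncpg" ("" if no suffix)
    return driver in {"asyncpg", "aiosqlite", "aiomysql"}

# A sync URL fails the check; an async one passes
print(has_async_driver("postgresql://u:p@localhost/db"))          # False
print(has_async_driver("postgresql+asyncpg://u:p@localhost/db"))  # True
```

Calling this in `database.py` and raising a `ValueError` on failure turns a confusing runtime driver error into a clear startup message.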
Install Required Dependencies
Make sure you have the necessary async database drivers installed:
```bash
# For PostgreSQL (recommended)
pip install asyncpg

# For SQLite
pip install aiosqlite

# For MySQL
pip install aiomysql
```

These should already be in your `requirements.txt`.
Start Database Server (Docker)
If you're using the included Docker setup, start your PostgreSQL database:
```bash
# Start PostgreSQL container
docker-compose up -d postgres

# Verify the container is running
docker ps
```

The Docker configuration in `docker-compose.yml` sets up a PostgreSQL instance:
```yaml
services:
  postgres:
    image: postgres
    container_name: template-postgres
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: template-db
    volumes:
      - postgres-data:/var/lib/postgresql/data
    restart: unless-stopped

# Named volumes must be declared at the top level
volumes:
  postgres-data:
```

Initialize Database Tables
The database tables are automatically created when you run your FastAPI application. In main.py, you'll see:
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from app.db.database import engine
from app.routers.auth import models as auth_models
from app.routers.payments import models as payments_models

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create database tables asynchronously
    async with engine.begin() as conn:
        await conn.run_sync(auth_models.Base.metadata.create_all)
        await conn.run_sync(payments_models.Base.metadata.create_all)
    yield
    # Shutdown: clean up resources
    await engine.dispose()

app = FastAPI(lifespan=lifespan)
```

This ensures all your model tables exist when the application starts.
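Note that `create_all` only creates tables that are missing; it never alters existing tables (that is what Alembic migrations are for). A small sketch, using a synchronous in-memory SQLite engine and a throwaway `Widget` model purely for illustration, shows it is safe to call repeatedly:

```python
from sqlalchemy import create_engine, Column, Integer, String, inspect
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Widget(Base):  # throwaway model for the demo
    __tablename__ = "widgets"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))

engine = create_engine("sqlite://")  # in-memory database, sync for brevity
Base.metadata.create_all(engine)
Base.metadata.create_all(engine)     # second call is a no-op, not an error

print(inspect(engine).has_table("widgets"))  # True
```

The same idempotence applies to the async `conn.run_sync(...create_all)` calls in the lifespan above, which is why restarting the application is always safe.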
Working with Models
Understanding the User Model
Let's examine the built-in User model to understand the patterns and conventions used:
```python
from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.orm import relationship

from ...db.database import Base

class User(Base):
    """User model for authentication"""

    __tablename__ = "users"

    # Primary key with auto-increment
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)

    # Unique user identifiers
    username = Column(String, unique=True)
    email = Column(String, unique=True)

    # Password storage (always hashed, never plain text)
    hashed_password = Column(String)

    # Email verification status
    is_verified = Column(Boolean, default=False)

    # OAuth integration fields
    is_oauth = Column(Boolean, default=False)
    google_sub = Column(String(100), nullable=True, unique=True, index=True)

    # Email verification and password reset tokens
    verification_token = Column(String, nullable=True)
    reset_token = Column(String, nullable=True)

    # Stripe payment integration
    customer_id = Column(String(255), nullable=True)
    plan_id = Column(Integer, nullable=True)
    subscription_id = Column(String(255), nullable=True)
    subscription_status = Column(String(64), nullable=True)
    subscription_last_renew = Column(String, nullable=True)
    subscription_next_renew = Column(String, nullable=True)
```

Model Best Practices
- `__tablename__`: explicitly define table names using plural nouns (`users`, `products`, `orders`)
- Primary keys: always use `id` as the primary key with auto-increment
- Indexes: add `index=True` to frequently queried columns for performance
- Unique constraints: use `unique=True` for fields that must be unique across all records
- Nullable fields: use `nullable=True` for optional fields, `nullable=False` (the default) for required fields
- Default values: set sensible defaults with `default=value`
- String lengths: specify maximum lengths for VARCHAR columns: `String(100)`
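Constraints declared on the model are enforced by the database itself, not just by Python. A self-contained sketch (synchronous, in-memory SQLite, with a throwaway `Tag` model used only for illustration) shows `unique=True` rejecting a duplicate at commit time:

```python
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

class Tag(Base):  # illustrative model, not part of the template
    __tablename__ = "tags"
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(50), unique=True, nullable=False)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Tag(name="python"))
    session.commit()
    session.add(Tag(name="python"))  # violates the unique constraint
    try:
        session.commit()
        print("duplicate accepted")
    except IntegrityError:
        session.rollback()
        print("duplicate rejected")  # this branch runs
```

In the async application the pattern is identical, except the `commit()` calls are awaited and `IntegrityError` is typically translated into an HTTP 409 response.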
Creating New Models
Create the Model File
Create a new model in your feature directory, for example app/routers/blog/models.py:
```python
from datetime import datetime

from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Text
from sqlalchemy.orm import relationship

from app.db.database import Base

class BlogPost(Base):
    """Blog post model for content management"""

    __tablename__ = "blog_posts"

    # Primary key
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)

    # Content fields
    title = Column(String(200), nullable=False, index=True)
    slug = Column(String(200), nullable=False, unique=True, index=True)
    content = Column(Text, nullable=False)
    excerpt = Column(String(500), nullable=True)

    # Status and visibility
    is_published = Column(Boolean, default=False)
    is_featured = Column(Boolean, default=False)

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    published_at = Column(DateTime, nullable=True)

    # Foreign key to User model
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)

    # Relationship to User model
    author = relationship("User", back_populates="blog_posts")

class BlogCategory(Base):
    """Blog category model for organizing posts"""

    __tablename__ = "blog_categories"

    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    name = Column(String(100), nullable=False, unique=True)
    slug = Column(String(100), nullable=False, unique=True, index=True)
    description = Column(Text, nullable=True)
    is_active = Column(Boolean, default=True)

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
```

Update the User Model (if needed)
If you want bidirectional relationships, update the User model:
```python
# In app/routers/auth/models.py
class User(Base):
    # ... existing fields ...

    # Add relationship back to blog posts
    blog_posts = relationship("BlogPost", back_populates="author")
```

Register Models in Main Application
Update your main.py lifespan to include the new models:
```python
from app.routers.auth import models as auth_models
from app.routers.payments import models as payments_models
from app.routers.blog import models as blog_models  # Add this import

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create database tables for all models
    async with engine.begin() as conn:
        await conn.run_sync(auth_models.Base.metadata.create_all)
        await conn.run_sync(payments_models.Base.metadata.create_all)
        await conn.run_sync(blog_models.Base.metadata.create_all)  # Add this
    yield
    await engine.dispose()
```

Update Alembic Configuration
Modify alembic/env.py to include your new models in migrations:
```python
# Import all model modules
from app.routers.auth import models as auth_models
from app.routers.payments import models as payments_models
from app.routers.blog import models as blog_models  # Add this import

# Add all metadata to target
target_metadata = [
    auth_models.Base.metadata,
    payments_models.Base.metadata,
    blog_models.Base.metadata,  # Add this
]
```

Important: You must add every new model module to both `main.py` and `alembic/env.py` for proper table creation and migration support.
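The registration requirement exists because SQLAlchemy only learns about a table when the class definition actually executes; a models module that is never imported contributes nothing to the metadata, and its tables are silently skipped by `create_all` and by Alembic autogenerate. A self-contained sketch:

```python
from sqlalchemy import Column, Integer
from sqlalchemy.orm import declarative_base

Base = declarative_base()

# Nothing is registered yet
print(sorted(Base.metadata.tables))  # []

# Defining the class (which happens as a side effect of importing its
# module) registers the table on Base.metadata:
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)

print(sorted(Base.metadata.tables))  # ['users']
```

This is why the imports in `main.py` and `alembic/env.py` matter even when the imported name is never used directly.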
Database Migrations with Alembic
Alembic handles database schema changes through versioned migrations. The migrations work with both sync and async databases.
Understanding Alembic Configuration
The alembic/env.py file configures how Alembic discovers and manages your models:
```python
import os
from logging.config import fileConfig

from sqlalchemy import engine_from_config, pool
from alembic import context

# Alembic configuration object
config = context.config

# Set up logging from alembic.ini
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Import all your model modules
from app.routers.auth import models as auth_models
from app.routers.payments import models as payments_models

# This tells Alembic about all your models
target_metadata = [
    auth_models.Base.metadata,
    payments_models.Base.metadata,
]

def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode - generates SQL without a database connection"""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online() -> None:
    """Run migrations in 'online' mode - executes against the live database"""
    # Get database URL from environment; migrations run on the sync engine,
    # so strip the async driver suffix from the URL
    db_url = os.getenv("DATABASE_URL")
    if db_url:
        config.set_main_option("sqlalchemy.url", db_url.replace("+asyncpg", ""))

    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
        )
        with context.begin_transaction():
            context.run_migrations()

# Run the appropriate migration mode
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
```

Migrations Use a Sync Engine: Alembic migrations here run on the synchronous SQLAlchemy engine even though the application itself is async. Because the sync engine cannot use the asyncpg driver, `env.py` strips the `+asyncpg` suffix so the URL falls back to the default synchronous driver during migrations.
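If you prefer to run migrations through the async engine itself (and keep asyncpg as the only installed driver), Alembic ships an "async" project template that wraps the synchronous migration body in `connection.run_sync()`. The following is a sketch of the online path modeled on that template; it assumes `target_metadata` is defined earlier in the file as shown above:

```python
# alembic/env.py, online path using the async engine
# (sketch based on Alembic's built-in "async" template)
import asyncio

from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context

def do_run_migrations(connection) -> None:
    # The migration machinery itself is synchronous
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()

async def run_async_migrations() -> None:
    connectable = async_engine_from_config(
        context.config.get_section(context.config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    async with connectable.connect() as connection:
        # Hand Alembic a sync-style connection via run_sync()
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()

def run_migrations_online() -> None:
    asyncio.run(run_async_migrations())
```

Either approach works; the sync-engine version above is simpler, while the async version avoids needing a second driver in production images.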
Creating and Running Migrations
Generate Migration from Model Changes
After creating or modifying models, generate a migration file:
```bash
# Auto-generate a migration with a descriptive message
alembic revision --autogenerate -m "Add blog post and category models"

# For specific changes
alembic revision --autogenerate -m "Add email verification to users"
alembic revision --autogenerate -m "Add indexes to blog posts"
```

Review Generated Migration
Always review the generated migration file in alembic/versions/. Here's an example:
"""Add blog post and category models
Revision ID: abc123def456
Revises: previous_revision
Create Date: 2024-01-15 10:30:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = 'abc123def456'
down_revision = 'previous_revision'
def upgrade():
# Create blog_categories table
op.create_table('blog_categories',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=100), nullable=False),
sa.Column('slug', sa.String(length=100), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name'),
sa.UniqueConstraint('slug')
)
op.create_index('ix_blog_categories_slug', 'blog_categories', ['slug'])
# Create blog_posts table
op.create_table('blog_posts',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('title', sa.String(length=200), nullable=False),
sa.Column('slug', sa.String(length=200), nullable=False),
sa.Column('content', sa.Text(), nullable=False),
sa.Column('author_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['author_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('slug')
)
op.create_index('ix_blog_posts_slug', 'blog_posts', ['slug'])
def downgrade():
op.drop_table('blog_posts')
op.drop_table('blog_categories')Apply Migration to Database
```bash
# Apply all pending migrations
alembic upgrade head

# Apply up to a specific revision
alembic upgrade abc123def456

# Apply one migration at a time
alembic upgrade +1
```

Verify Migration Success
```bash
# Show current migration status
alembic current

# Show migration history
alembic history --verbose

# Show details of the head revision
alembic show head
```

Rolling Back Migrations
```bash
# Roll back to the previous migration
alembic downgrade -1

# Roll back to a specific revision
alembic downgrade abc123def456

# Roll back all migrations (dangerous!)
alembic downgrade base
```

Using Async Database in Your Application
Async Database Operations in Routes
Here's how to use the async database in your FastAPI routes:
```python
from fastapi import Depends, HTTPException
from sqlalchemy import select

from app.db.database import db_dependency
from app.routers.blog.models import BlogPost, BlogCategory

# Assumes the BlogPostCreate/BlogPostUpdate schemas and the
# get_current_user dependency are defined elsewhere in the feature

@app.get("/blog/posts")
async def get_blog_posts(
    db: db_dependency,
    skip: int = 0,
    limit: int = 10,
    published_only: bool = True,
):
    """Get blog posts with pagination and filtering"""
    # Use SQLAlchemy 2.0 select() syntax
    query = select(BlogPost)
    if published_only:
        query = query.where(BlogPost.is_published == True)

    # Execute the async query
    result = await db.execute(query.offset(skip).limit(limit))
    posts = result.scalars().all()
    return posts

@app.get("/blog/posts/{post_id}")
async def get_blog_post(post_id: int, db: db_dependency):
    """Get a single blog post"""
    result = await db.execute(
        select(BlogPost).where(BlogPost.id == post_id)
    )
    post = result.scalar_one_or_none()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    return post

@app.post("/blog/posts")
async def create_blog_post(
    post_data: BlogPostCreate,
    db: db_dependency,
    current_user: User = Depends(get_current_user),
):
    """Create a new blog post"""
    db_post = BlogPost(
        title=post_data.title,
        slug=post_data.slug,
        content=post_data.content,
        excerpt=post_data.excerpt,
        author_id=current_user.id,
    )
    db.add(db_post)
    await db.commit()
    await db.refresh(db_post)
    return db_post

@app.put("/blog/posts/{post_id}")
async def update_blog_post(
    post_id: int,
    post_data: BlogPostUpdate,
    db: db_dependency,
    current_user: User = Depends(get_current_user),
):
    """Update an existing blog post"""
    result = await db.execute(
        select(BlogPost).where(BlogPost.id == post_id)
    )
    post = result.scalar_one_or_none()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    if post.author_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not authorized")

    # Update only the fields the client actually sent
    for field, value in post_data.dict(exclude_unset=True).items():
        setattr(post, field, value)

    await db.commit()
    await db.refresh(post)
    return post

@app.delete("/blog/posts/{post_id}")
async def delete_blog_post(
    post_id: int,
    db: db_dependency,
    current_user: User = Depends(get_current_user),
):
    """Delete a blog post"""
    result = await db.execute(
        select(BlogPost).where(BlogPost.id == post_id)
    )
    post = result.scalar_one_or_none()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    if post.author_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not authorized")

    await db.delete(post)
    await db.commit()
    return {"message": "Post deleted successfully"}
```

SQLAlchemy 2.0 Style: Notice the use of `select()` instead of `db.query()`. This is the modern SQLAlchemy 2.0 syntax that works with async. Also note the `await` on every database operation: `execute()`, `commit()`, `refresh()`, and `delete()`.
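Because `select()` builds a plain statement object, you can construct and inspect queries without any database connection, which is handy when debugging query logic. A self-contained sketch with a throwaway `Post` model (illustrative only):

```python
from sqlalchemy import Column, Integer, String, Boolean, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Post(Base):  # illustrative model
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    title = Column(String(200))
    is_published = Column(Boolean, default=False)

# The statement is just an object; str() renders the SQL it will emit
stmt = (
    select(Post.title)
    .where(Post.is_published == True)
    .order_by(Post.id.desc())
    .limit(5)
)
print(stmt)
```

In the application, the same statement object is what you pass to `await db.execute(stmt)`; building and executing are cleanly separated.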
Advanced Async Query Examples
```python
from datetime import datetime, timedelta

from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import selectinload

# Count queries
result = await db.execute(select(func.count(BlogPost.id)))
post_count = result.scalar()

# Eager-load relationships
result = await db.execute(
    select(BlogPost).options(selectinload(BlogPost.author))
)
posts_with_authors = result.scalars().all()

# Complex filtering
result = await db.execute(
    select(BlogPost).where(
        and_(
            BlogPost.is_published == True,
            BlogPost.created_at >= datetime.utcnow() - timedelta(days=7),
        )
    )
)
recent_posts = result.scalars().all()

# Ordering and limiting
result = await db.execute(
    select(BlogPost)
    .where(BlogPost.is_published == True)
    .order_by(BlogPost.created_at.desc())
    .limit(5)
)
latest_posts = result.scalars().all()

# Joins and aggregations
result = await db.execute(
    select(User.username, func.count(BlogPost.id).label('post_count'))
    .join(BlogPost)
    .group_by(User.username)
)
posts_by_author = result.all()
```

Best Practices and Performance
Async Best Practices
- Always await database operations

  ```python
  # Correct
  await db.commit()
  await db.refresh(user)
  result = await db.execute(query)

  # Wrong - will not work
  db.commit()
  db.refresh(user)
  ```

- Use SQLAlchemy 2.0 syntax

  ```python
  # Modern (recommended)
  result = await db.execute(select(User).where(User.id == 1))
  user = result.scalar_one_or_none()

  # Old style (avoid with async)
  user = db.query(User).filter(User.id == 1).first()
  ```

- Eager-load relationships

  ```python
  # Prevents N+1 queries
  from sqlalchemy.orm import selectinload

  result = await db.execute(
      select(BlogPost).options(selectinload(BlogPost.author))
  )
  ```
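Forgetting `await` fails quietly rather than loudly: calling a coroutine function merely creates a coroutine object, so the operation never runs. A toy sketch with a stand-in coroutine (no database required) makes the difference concrete:

```python
import asyncio

async def fake_commit():
    """Stand-in for an async operation such as db.commit()."""
    return "committed"

async def main():
    # Without await: you get a coroutine object and nothing executes
    pending = fake_commit()
    print(asyncio.iscoroutine(pending))  # True
    pending.close()  # suppress the "never awaited" warning

    # With await: the operation actually runs and returns its result
    print(await fake_commit())  # committed

asyncio.run(main())
```

Python does emit a `RuntimeWarning: coroutine ... was never awaited` at garbage-collection time, so keeping warnings visible in development helps catch these mistakes early.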
Performance Tips
- Use indexes wisely

  ```python
  slug = Column(String(100), index=True)
  created_at = Column(DateTime, index=True)
  ```

- Optimize queries

  ```python
  # Select only the columns you need
  result = await db.execute(select(User.username, User.email))

  # Use pagination
  result = await db.execute(query.offset(skip).limit(limit))
  ```

- Configure connection pooling

  ```python
  # In database.py
  engine = create_async_engine(
      SQLALCHEMY_DATABASE_URL,
      pool_size=10,
      max_overflow=20,
      pool_pre_ping=True,  # Verify connections before use
  )
  ```
Troubleshooting
Common Issues

- "No async driver for postgresql://": use `postgresql+asyncpg://` in your `DATABASE_URL`
- "Cannot use sync operations on async session": use `await` on all database operations
- "Lazy loading is not supported": use eager loading with `selectinload()` or `joinedload()`
- "Database is locked" (SQLite): use `sqlite+aiosqlite:///`, and consider PostgreSQL for production
Debug Mode
Enable SQL logging to see generated queries:
```python
# In database.py
engine = create_async_engine(
    SQLALCHEMY_DATABASE_URL,
    echo=True,  # Logs all SQL queries
)
```

Production Ready: The async SQLAlchemy setup provides excellent performance for modern FastAPI applications, with proper connection pooling and async/await support throughout the stack.