Authentication System
Complete JWT authentication with user management, email verification, OAuth integration, and role-based access control
Overview
FastLaunchAPI's authentication system provides everything you need for secure user management:
- JWT Authentication - Stateless tokens with automatic refresh
- OAuth Integration - Google OAuth with extensible provider system
- Email Verification - Secure email verification workflow
- Password Security - Bcrypt hashing with strength validation
- User Management - Complete profile management and updates
- Reset Functionality - Secure password reset with time-limited tokens
The authentication system is designed for production use and covers core security practices such as password hashing, token expiration, and email verification. Endpoints and database access use async/await throughout.
Quick Start
Get authentication working in your application with these simple steps:
Register a New User
curl -X POST "http://localhost:8000/auth/create-user" \
  -H "Content-Type: application/json" \
  -d '{
    "username": "john_doe",
    "email": "[email protected]",
    "password": "securepassword123"
  }'

Verify Email
The user receives an email containing a verification link. Clicking it verifies the account and redirects to your frontend with a confirmation.
Login and Get Tokens
curl -X POST "http://localhost:8000/auth/token" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "username=john_doe&password=securepassword123"

Returns access and refresh tokens for authenticated requests.
Access Protected Routes
curl -X GET "http://localhost:8000/auth/get-user" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"

Core Features
- 🔐 JWT Authentication - Stateless authentication with access and refresh tokens
- 🌐 OAuth Integration - Google OAuth with support for multiple providers
- 📧 Email Verification - Secure email verification with HTML templates
- 🔄 Password Reset - Token-based password reset with time limits
- 👤 User Management - Complete profile management and updates
- 🛡️ Security Features - Bcrypt hashing, token expiration, and validation
Database Schema
The authentication system uses a flexible user model that supports both traditional and OAuth authentication:
from sqlalchemy import Column, Integer, String, Boolean
from app.db.database import Base


class User(Base):
    """User model for authentication"""
    __tablename__ = "users"

    # Core authentication fields
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    username = Column(String, unique=True)
    email = Column(String, unique=True)
    hashed_password = Column(String)
    is_verified = Column(Boolean, default=False)

    # OAuth integration
    is_oauth = Column(Boolean, default=False)
    google_sub = Column(String(100), nullable=True, unique=True, index=True)
    # Add more OAuth providers as needed:
    # facebook_id = Column(String(100), nullable=True, unique=True, index=True)
    # github_id = Column(String(100), nullable=True, unique=True, index=True)

    # Token management
    verification_token = Column(String, nullable=True)
    reset_token = Column(String, nullable=True)

    # Subscription integration (for payments)
    customer_id = Column(String(255), nullable=True)
    plan_id = Column(Integer, nullable=True)
    subscription_id = Column(String(255), nullable=True)
    subscription_status = Column(String(64), nullable=True)
    subscription_last_renew = Column(String, nullable=True)
    subscription_next_renew = Column(String, nullable=True)

Payment Integration Ready: The user model includes subscription fields for seamless integration with the payment system.
API Reference
User Registration
Create User Account
Creates a new user account and sends verification email.
POST /auth/create-user
Content-Type: application/json
{
  "username": "john_doe",
  "email": "[email protected]",
  "password": "securepassword123"
}

Implementation:
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError


@router.post("/create-user", status_code=status.HTTP_201_CREATED)
async def create_user(
    db: db_dependency,
    create_user_request: CreateUserRequest,
    background_tasks: BackgroundTasks
):
    """Register a new user"""
    # Check if username exists
    result = await db.execute(
        select(User).filter(User.username == create_user_request.username)
    )
    existing_user = result.scalar_one_or_none()
    if existing_user:
        raise HTTPException(status_code=400, detail="Username already taken.")

    hashed_password = pwd_context.hash(create_user_request.password)
    try:
        user = User(
            username=create_user_request.username,
            email=create_user_request.email,
            hashed_password=hashed_password
        )
        db.add(user)
        await db.commit()
    except IntegrityError:
        # The username was checked above, so a unique-constraint failure
        # here is reported as a duplicate email
        await db.rollback()
        raise HTTPException(status_code=400, detail="Email already taken.")

    token = generate_verification_token(create_user_request.username)
    background_tasks.add_task(send_verification_email, create_user_request.email, token)
    return {"message": "User created. Check your email to verify."}

Response:
{
  "message": "User created. Check your email to verify."
}

Validation Rules:
- Username must be unique
- Email must be valid and unique
- Password must be at least 8 characters
Error Responses:
- 400 Bad Request - Username or email already taken
- 422 Unprocessable Entity - Invalid data format
Verify Email
Verifies user email address using token from email.
GET /auth/verify-email?token=<verification_token>

Response:
- Redirects to {FRONTEND_URL}/login?detail=email_verified on success
- Redirects to {FRONTEND_URL}/login?detail=already_verified if already verified
The token expires after 30 minutes.
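To make the 30-minute expiry concrete, here is a minimal sketch of a signed, time-limited verification token built with only the Python standard library. It is not the project's `generate_verification_token`; the names `make_verification_token` and `read_verification_token` and the hard-coded `SECRET_KEY` are assumptions for illustration, and a real deployment would more likely rely on an established JWT library.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional

SECRET_KEY = "change-me-to-a-long-random-string"  # assumption: stands in for the SECRET_KEY setting
VERIFICATION_TTL_SECONDS = 30 * 60  # 30 minutes, matching the documented expiry


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url text, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str) -> bytes:
    return hmac.new(SECRET_KEY.encode(), signing_input.encode(), hashlib.sha256).digest()


def make_verification_token(username: str, now: Optional[float] = None) -> str:
    """Build an HS256-signed token whose `exp` claim is 30 minutes ahead."""
    issued = int(time.time() if now is None else now)
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    claims = {"sub": username, "exp": issued + VERIFICATION_TTL_SECONDS}
    payload = _b64url(json.dumps(claims).encode())
    signature = _b64url(_sign(f"{header}.{payload}"))
    return f"{header}.{payload}.{signature}"


def read_verification_token(token: str, now: Optional[float] = None) -> Optional[str]:
    """Return the username if the signature is valid and the token is unexpired."""
    try:
        header, payload, signature = token.split(".")
        expected = _sign(f"{header}.{payload}")
        if not hmac.compare_digest(expected, _b64url_decode(signature)):
            return None
        claims = json.loads(_b64url_decode(payload))
    except (ValueError, TypeError):
        return None  # malformed token
    current = time.time() if now is None else now
    if claims.get("exp", 0) <= current:
        return None  # expired
    return claims.get("sub")
```

The verify endpoint would call something like `read_verification_token` on the `token` query parameter and redirect according to the result. Passing `now` explicitly keeps the expiry logic easy to test.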
Authentication
Login with Credentials
Authenticates user and returns JWT tokens.
POST /auth/token
Content-Type: application/x-www-form-urlencoded
username=john_doe&password=securepassword123

Implementation:
@router.post("/token", response_model=Token)
async def login_for_access_token(
    db: db_dependency,
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
):
    """Login user and return tokens"""
    user = await authenticate_user(form_data.username, form_data.password, db)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid username or password.")
    if not user.is_verified:
        raise HTTPException(status_code=401, detail="Verify your email first.")

    access_token = create_access_token(
        user.username,
        user.id,
        timedelta(days=app_settings.ACCESS_TOKEN_EXPIRATION_DAYS)
    )
    refresh_token = create_refresh_token(
        user.username,
        user.id,
        timedelta(days=app_settings.REFRESH_TOKEN_EXPIRATION_DAYS)
    )
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer"
    }

Response:
{
  "access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...",
  "refresh_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...",
  "token_type": "bearer"
}

Error Responses:
- 401 Unauthorized - Invalid credentials
- 401 Unauthorized - Email not verified
Refresh Tokens
Exchanges refresh token for new access and refresh tokens.
POST /auth/refresh
Content-Type: application/json
{
  "refresh_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9..."
}

Response:
{
  "access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...",
  "refresh_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...",
  "token_type": "bearer"
}

Error Responses:
- 401 Unauthorized - Refresh token expired or invalid
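A client has to replace both stored tokens after each refresh and should retry a failed request only once. The sketch below shows that pattern in Python with an injected transport, so it runs without a server. `TokenSession` and the `Send` callable signature are hypothetical names for illustration and not part of FastLaunchAPI.

```python
from typing import Callable, Dict, Optional, Tuple

# Hypothetical transport: (method, path, headers, json_body) -> (status_code, json_response)
Send = Callable[[str, str, Dict[str, str], Optional[dict]], Tuple[int, dict]]


class TokenSession:
    """Holds a token pair and refreshes it once when a request returns 401."""

    def __init__(self, send: Send, access_token: str, refresh_token: str) -> None:
        self._send = send
        self.access_token = access_token
        self.refresh_token = refresh_token

    def refresh(self) -> bool:
        """Call POST /auth/refresh; return False if the refresh token was rejected."""
        status, body = self._send(
            "POST",
            "/auth/refresh",
            {"Content-Type": "application/json"},
            {"refresh_token": self.refresh_token},
        )
        if status != 200:
            return False
        # Both tokens are replaced on every successful refresh.
        self.access_token = body["access_token"]
        self.refresh_token = body["refresh_token"]
        return True

    def get(self, path: str) -> Tuple[int, dict]:
        """GET with the bearer token, retrying at most once after a refresh."""
        status, body = self._get_once(path)
        if status == 401 and self.refresh():
            status, body = self._get_once(path)
        return status, body

    def _get_once(self, path: str) -> Tuple[int, dict]:
        headers = {"Authorization": f"Bearer {self.access_token}"}
        return self._send("GET", path, headers, None)
```

Because the retry happens at most once, a rejected refresh token surfaces as a plain 401 to the caller, which can then send the user back to the login page.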
OAuth Authentication
Setting up Google OAuth
Before using Google OAuth, you need to obtain credentials from Google Cloud Console:
Get Google OAuth Credentials
- Go to the Google Cloud Console
- Create a new project or select an existing one
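- Enable any Google APIs your app needs (the legacy Google+ API has been shut down; basic sign-in with the openid, email, and profile scopes generally needs no additional API)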
- Go to "Credentials" → "Create Credentials" → "OAuth 2.0 Client ID"
- Configure the consent screen with your app information
- Add authorized redirect URIs:
  - http://localhost:8000/auth/oauth/callback/google (development)
  - https://yourdomain.com/auth/oauth/callback/google (production)
Configure Environment Variables
# Required for Google OAuth
GOOGLE_CLIENT_ID=your_google_client_id_here
GOOGLE_CLIENT_SECRET=your_google_client_secret_here
# URLs for OAuth (redirect URI auto-constructed from BACKEND_URL)
BACKEND_URL=http://localhost:8000
FRONTEND_URL=http://localhost:3000

Auto-generated Redirect URI: The OAuth redirect URI is automatically constructed as {BACKEND_URL}/auth/oauth/callback/{provider}. You don't need to set GOOGLE_REDIRECT_URI manually.
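As an illustration of that construction, a helper along these lines would produce the value to register in the provider console. `build_redirect_uri` is a hypothetical name, and stripping a trailing slash from `BACKEND_URL` is an assumption about how the setting should be normalized.

```python
def build_redirect_uri(backend_url: str, provider: str) -> str:
    """Join BACKEND_URL and the provider name into the OAuth callback URL."""
    # Strip a trailing slash so "http://host/" and "http://host" give the same URI.
    return f"{backend_url.rstrip('/')}/auth/oauth/callback/{provider}"


if __name__ == "__main__":
    print(build_redirect_uri("http://localhost:8000", "google"))
```

The result must match the redirect URI registered with the provider exactly, including scheme and port.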
Google OAuth Flow
Initiate OAuth
GET /auth/oauth/google

Implementation:
@router.get("/oauth/{provider}")
async def login_oauth(provider: str, request: Request):
    """Start OAuth login for given provider"""
    provider_config = oauth_providers.get_provider(provider)
    if not provider_config:
        raise HTTPException(
            status_code=404,
            detail=f"OAuth provider '{provider}' not supported"
        )

    oauth_client = getattr(oauth, provider, None)
    if not oauth_client:
        raise HTTPException(
            status_code=503,
            detail=f"OAuth provider '{provider}' not configured"
        )

    return await oauth_client.authorize_redirect(request, provider_config.redirect_uri)

Redirects to Google's OAuth consent screen where users grant permissions.
Handle Callback
GET /auth/oauth/callback/google?code=<authorization_code>

Implementation:
@router.get("/oauth/callback/{provider}")
async def auth_oauth_callback(provider: str, request: Request, db: db_dependency):
    """Handle OAuth callback and issue tokens"""
    provider_config = oauth_providers.get_provider(provider)
    if not provider_config:
        raise HTTPException(
            status_code=404,
            detail=f"OAuth provider '{provider}' not supported"
        )

    oauth_client = getattr(oauth, provider, None)
    if not oauth_client:
        raise HTTPException(
            status_code=503,
            detail=f"OAuth provider '{provider}' not configured"
        )

    handler = oauth_handlers.get(provider)
    if not handler:
        raise HTTPException(
            status_code=501,
            detail=f"Handler for provider '{provider}' not implemented"
        )

    try:
        token_response = await oauth_client.authorize_access_token(request)
    except OAuthError as e:
        raise HTTPException(status_code=401, detail="Could not validate credentials")

    user = await handler(token_response, db)

    access_token = create_access_token(
        user.username,
        user.id,
        timedelta(days=app_settings.ACCESS_TOKEN_EXPIRATION_DAYS)
    )
    refresh_token = create_refresh_token(
        user.username,
        user.id,
        timedelta(days=app_settings.REFRESH_TOKEN_EXPIRATION_DAYS)
    )
    return RedirectResponse(
        f"{app_settings.FRONTEND_URL}/auth?access_token={access_token}&refresh_token={refresh_token}"
    )

Processes the OAuth response and creates or updates the user account. If a user with the same email already exists, the Google account is linked to it. Otherwise a new user is created.
Receive Tokens
On success, redirects to frontend with tokens:
https://yourapp.com/auth?access_token=<token>&refresh_token=<refresh_token>

OAuth Configuration Required: Google OAuth will only work if the GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET environment variables are properly set.
OAuth Service Functions
The system uses async service functions for OAuth user management:
from typing import Optional

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession


async def get_user_by_oauth_sub(
    provider: str,
    sub: str,
    db: AsyncSession
) -> Optional[User]:
    """Get user by OAuth provider + sub ID"""
    if provider == 'google':
        result = await db.execute(select(User).filter(User.google_sub == str(sub)))
        return result.scalar_one_or_none()
    return None


async def create_user_from_oauth_info(
    provider: str,
    oauth_user: OAuthUserInfo,
    db: AsyncSession
) -> User:
    """Create/update user from OAuth info"""
    result = await db.execute(select(User).filter(User.email == oauth_user.email))
    existing_user = result.scalar_one_or_none()

    if existing_user:
        # Link OAuth account to existing user
        if provider == 'google':
            existing_user.google_sub = oauth_user.sub
        await db.commit()
        return existing_user
    else:
        # Create new user with unique username
        base_username = oauth_user.email.split("@")[0]
        unique_username = await get_unique_username(base_username, db)
        new_user_data = {
            "username": unique_username,
            "email": oauth_user.email,
            "is_oauth": True
        }
        if provider == 'google':
            new_user_data["google_sub"] = oauth_user.sub
        new_user = User(**new_user_data)
        db.add(new_user)
        await db.commit()
        await db.refresh(new_user)
        return new_user

Adding More OAuth Providers
The system is designed to support multiple OAuth providers with dynamic credential loading. Here's how to add Facebook:
Add Environment Variables
# Add to .env
FACEBOOK_CLIENT_ID=your_facebook_client_id
FACEBOOK_CLIENT_SECRET=your_facebook_client_secret

Update Settings
# Add to app/config/settings.py in the OAuth section
self.FACEBOOK_CLIENT_ID: str = os.getenv("FACEBOOK_CLIENT_ID")
self.FACEBOOK_CLIENT_SECRET: str = os.getenv("FACEBOOK_CLIENT_SECRET")

Register Provider
# In oauth_providers.py
from .validators import FacebookUser  # Create this Pydantic model

oauth_providers.register(
    OAuthProviderConfig(
        name='facebook',
        registration_params={
            'api_base_url': 'https://graph.facebook.com/v12.0/',
            'authorize_url': 'https://www.facebook.com/v12.0/dialog/oauth',
            'access_token_url': 'https://graph.facebook.com/v12.0/oauth/access_token',
            'client_kwargs': {'scope': 'email public_profile'}
            # Note: client_id and client_secret are auto-added from settings
        },
        user_model=FacebookUser,
        db_field='facebook_id'
    )
)

Create Callback Handler
# Add handler function in oauth_providers.py
from typing import Any, Dict


async def handle_facebook_callback(
    token_response: Dict[str, Any],
    db: AsyncSession
) -> User:
    """Process Facebook OAuth callback"""
    from .services import get_user_by_oauth_sub, create_user_from_oauth_info

    user_info = token_response.get("userinfo", {})
    facebook_user = FacebookUser(**user_info)
    # Note: the service functions shown earlier only branch on 'google';
    # they need a matching 'facebook' branch that reads and writes User.facebook_id
    user = await get_user_by_oauth_sub('facebook', facebook_user.id, db)
    return user or await create_user_from_oauth_info('facebook', facebook_user, db)


# Register handler
oauth_handlers['facebook'] = handle_facebook_callback

Update User Model
# Add to User model in models.py
facebook_id = Column(String(100), nullable=True, unique=True, index=True)

Then create and run a database migration:
alembic revision --autogenerate -m "Add Facebook OAuth support"
alembic upgrade head

Dynamic Provider System: The OAuth system automatically detects and registers providers based on environment variables. Client credentials are automatically injected into the OAuth configuration.
User Management
Get Current User
Returns authenticated user information.
GET /auth/get-user
Authorization: Bearer <access_token>

Response:
{
  "id": 1,
  "username": "john_doe",
  "email": "[email protected]",
  "is_verified": true,
  "is_oauth": false
}

Update User Profile
Updates user profile information.
PATCH /auth/update-user
Authorization: Bearer <access_token>
Content-Type: application/json
{
  "display_name": "John Doe",
  "bio": "Software developer",
  "niche_preference": "technology",
  "avatar_url": "https://example.com/avatar.jpg"
}

Implementation:
@router.patch("/update-user", response_model=UserUpdateRequest)
async def update_user(
    user: user_dependency,
    db: db_dependency,
    user_update_request: UserUpdateRequest
):
    """Update user profile fields (some fields are restricted)"""
    result = await db.execute(select(User).filter(User.id == user.id))
    user_record = result.scalar_one_or_none()
    if not user_record:
        raise HTTPException(status_code=404, detail="User not found")

    restricted = ['id', 'created_at', 'is_admin', 'hashed_password', 'google_sub', 'email']
    update_data = user_update_request.model_dump(exclude_unset=True)
    for field in restricted:
        if field in update_data:
            raise HTTPException(status_code=400, detail=f"Cannot update '{field}'.")

    for key, value in update_data.items():
        setattr(user_record, key, value)
    await db.commit()
    await db.refresh(user_record)
    return user_record

Protected Fields: Cannot update id, created_at, is_admin, hashed_password, google_sub, or email.
Error Responses:
- 400 Bad Request - Attempting to update protected field
- 404 Not Found - User not found
Pydantic 2.x: This codebase uses Pydantic 2.x. Use model_dump() instead of the deprecated dict() method.
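The protected-field check in the update endpoint can be isolated into a small pure function, which makes it easy to unit test without a database. This is a sketch under that assumption; `validate_update` and `RestrictedFieldError` are hypothetical names, and the endpoint itself raises `HTTPException` rather than a custom error.

```python
from typing import Any, Dict

# Same field names as the `restricted` list in the endpoint
RESTRICTED_FIELDS = frozenset(
    {"id", "created_at", "is_admin", "hashed_password", "google_sub", "email"}
)


class RestrictedFieldError(ValueError):
    """Raised when an update payload touches a protected field."""


def validate_update(update_data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the payload, or raise if it names a protected field."""
    blocked = sorted(RESTRICTED_FIELDS.intersection(update_data))
    if blocked:
        raise RestrictedFieldError(f"Cannot update: {', '.join(blocked)}")
    return dict(update_data)
```

A route handler could catch `RestrictedFieldError` and translate it into a 400 response.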
Password Reset
Request Password Reset
Sends password reset email if user exists.
POST /auth/request-password-reset
Content-Type: application/json
{
  "email": "[email protected]"
}

Response:
{
  "detail": "If the email exists, a reset link has been sent."
}

Security Note: The response is the same whether or not the email exists, which prevents email enumeration.
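The enumeration-safe behavior comes down to returning one fixed response on both code paths. A minimal sketch, assuming a hypothetical `request_password_reset` function that takes the known emails and a mail-sending callable as parameters (the real endpoint queries the database and queues the email in the background):

```python
from typing import Callable, Dict, Iterable

GENERIC_RESPONSE = {"detail": "If the email exists, a reset link has been sent."}


def request_password_reset(
    email: str,
    known_emails: Iterable[str],
    send_reset_email: Callable[[str], None],
) -> Dict[str, str]:
    """Send a reset email only for known addresses, but always answer identically."""
    normalized = email.strip().lower()
    if normalized in {known.lower() for known in known_emails}:
        send_reset_email(normalized)
    # Same body either way, so the caller cannot tell whether the account exists.
    return dict(GENERIC_RESPONSE)
```

Response timing can also leak whether an account exists, which is one reason to send the email from a background task rather than inline.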
Reset Password
Resets user's password using valid token.
POST /auth/reset-password
Content-Type: application/json
{
  "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...",
  "new_password": "newSecurePassword123"
}

Response:
{
  "detail": "Password reset successful"
}

Error Responses:
- 400 Bad Request - Invalid or expired token
- 422 Unprocessable Entity - Password doesn't meet requirements
Security Features
Password Security
- Bcrypt Hashing: All passwords are hashed using bcrypt with automatic salt generation
- Password Strength: Minimum 8 characters required (configurable with Pydantic validators)
- No Plain Text: Passwords are never stored in plain text
from pydantic import BaseModel, EmailStr, field_validator


class CreateUserRequest(BaseModel):
    username: str
    email: EmailStr
    password: str

    @field_validator('password')
    @classmethod
    def password_strength(cls, v):
        """Validate password strength"""
        if len(v) < 8:
            raise ValueError('Password must be at least 8 characters long')
        return v

Token Security
- JWT Tokens: Stateless authentication with configurable expiration
- Token Refresh: Automatic token refresh prevents session interruption
- Token Claims: Tokens include user ID and expiration claims
Default Token Expiration:
- Access Token: 7 days (configurable via ACCESS_TOKEN_EXPIRATION_DAYS)
- Refresh Token: 14 days (configurable via REFRESH_TOKEN_EXPIRATION_DAYS)
Email Security
- Token-based Verification: Email verification uses JWT tokens
- Time-limited Tokens: Verification tokens expire after 30 minutes
- Password Reset: Reset tokens expire after 1 hour
Configuration
Environment Variables
# Core Settings
SECRET_KEY=your-secret-key-min-32-chars-long-random-string-here
ACCESS_TOKEN_EXPIRATION_DAYS=7
REFRESH_TOKEN_EXPIRATION_DAYS=14
# URL Configuration (REQUIRED)
BACKEND_URL=http://localhost:8000
FRONTEND_URL=http://localhost:3000
# OAuth Configuration
GOOGLE_CLIENT_ID=your_google_client_id
GOOGLE_CLIENT_SECRET=your_google_client_secret
# Note: Redirect URI is auto-constructed as {BACKEND_URL}/auth/oauth/callback/{provider}
# Database (use asyncpg driver for async support)
DATABASE_URL=postgresql+asyncpg://user:password@localhost/dbname
# Redis (for Celery background tasks)
REDIS_DSN=redis://:yourpassword@redis:6379/0
# Email Configuration (SendGrid)
SENDGRID_API_KEY=your_sendgrid_api_key
SUPPORT_EMAIL=[email protected]
FROM_EMAIL=[email protected]
COMPANY_NAME="Your Company Name"

Security Best Practices
- Use Strong Secret Keys: Generate cryptographically secure secret keys
- HTTPS Only: Always use HTTPS in production
- Token Expiration: Set appropriate token expiration times
- Rate Limiting: Implement rate limiting on authentication endpoints
- Input Validation: All inputs are validated using Pydantic models
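Rate limiting is listed as a recommendation rather than a built-in feature, so here is one possible starting point: a minimal in-memory sliding-window limiter keyed by client IP or username. `SlidingWindowLimiter` is a hypothetical class for illustration. It keeps state in a single process, so a multi-worker deployment would need a shared store such as Redis instead.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most `max_attempts` calls per key within `window_seconds`."""

    def __init__(
        self,
        max_attempts: int,
        window_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._max = max_attempts
        self._window = window_seconds
        self._clock = clock
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        """Record an attempt for `key` and report whether it is within the limit."""
        now = self._clock()
        hits = self._hits[key]
        # Drop timestamps that have fallen out of the window.
        while hits and now - hits[0] >= self._window:
            hits.popleft()
        if len(hits) >= self._max:
            return False
        hits.append(now)
        return True


if __name__ == "__main__":
    limiter = SlidingWindowLimiter(max_attempts=5, window_seconds=60)
    print([limiter.allow("203.0.113.7") for _ in range(6)])
```

A login endpoint could call `allow()` with the client address before checking credentials and answer with 429 Too Many Requests when it returns False.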
Testing
Example Test Cases
All authentication tests should use async patterns:
import pytest
from httpx import ASGITransport, AsyncClient
from app.main import app

# Recent httpx releases no longer accept AsyncClient(app=...);
# route requests to the ASGI app through ASGITransport instead.


@pytest.mark.asyncio
async def test_create_user():
    """Test user registration"""
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        response = await client.post("/auth/create-user", json={
            "username": "testuser",
            "email": "[email protected]",
            "password": "password123"
        })
        assert response.status_code == 201
        data = response.json()
        assert "message" in data


@pytest.mark.asyncio
async def test_login():
    """Test user login"""
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        # First create and verify user
        # (create_verified_test_user is a project-specific helper, not shown here)
        await create_verified_test_user()
        # Then login
        response = await client.post("/auth/token", data={
            "username": "testuser",
            "password": "password123"
        })
        assert response.status_code == 200
        data = response.json()
        assert "access_token" in data
        assert "refresh_token" in data


@pytest.mark.asyncio
async def test_oauth_google():
    """Test Google OAuth flow"""
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        response = await client.get("/auth/oauth/google", follow_redirects=False)
        assert response.status_code in [302, 307]  # Redirect to Google

Async Testing: All tests must use the @pytest.mark.asyncio decorator and AsyncClient from httpx for testing async endpoints.
Frontend Integration
JavaScript Example
// Login function
async function login(username, password) {
  const response = await fetch("/auth/token", {
    method: "POST",
    headers: {
      "Content-Type": "application/x-www-form-urlencoded",
    },
    // URLSearchParams percent-encodes special characters in the credentials
    body: new URLSearchParams({ username, password }),
  });
  if (response.ok) {
    const data = await response.json();
    localStorage.setItem("access_token", data.access_token);
    localStorage.setItem("refresh_token", data.refresh_token);
    return data;
  }
  throw new Error("Login failed");
}

// Authenticated request (retries at most once after a token refresh)
async function makeAuthenticatedRequest(url, retry = true) {
  const token = localStorage.getItem("access_token");
  const response = await fetch(url, {
    headers: {
      Authorization: `Bearer ${token}`,
    },
  });
  if (response.status === 401 && retry) {
    // Token expired, try to refresh
    await refreshToken();
    return makeAuthenticatedRequest(url, false);
  }
  return response;
}

// Token refresh
async function refreshToken() {
  const refresh_token = localStorage.getItem("refresh_token");
  const response = await fetch("/auth/refresh", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
    },
    body: JSON.stringify({ refresh_token }),
  });
  if (response.ok) {
    const data = await response.json();
    localStorage.setItem("access_token", data.access_token);
    localStorage.setItem("refresh_token", data.refresh_token);
  } else {
    // Refresh failed, redirect to login
    window.location.href = "/login";
  }
}

// Google OAuth login
function loginWithGoogle() {
  window.location.href = "/auth/oauth/google";
}

// Handle OAuth callback
function handleOAuthCallback() {
  const urlParams = new URLSearchParams(window.location.search);
  const accessToken = urlParams.get("access_token");
  const refreshToken = urlParams.get("refresh_token");
  if (accessToken && refreshToken) {
    localStorage.setItem("access_token", accessToken);
    localStorage.setItem("refresh_token", refreshToken);
    window.location.href = "/dashboard";
  }
}

Error Handling
The authentication system provides consistent error responses:
{
  "detail": "Error message description"
}

Common Error Codes
- 400 Bad Request - Invalid input data or business logic error
- 401 Unauthorized - Authentication failed or token invalid
- 403 Forbidden - Access denied (user doesn't have permission)
- 404 Not Found - Resource not found
- 422 Unprocessable Entity - Validation error
Troubleshooting
Common Issues
OAuth Not Working
- Check BACKEND_URL is set correctly
- Verify environment variables are set for the provider
- Verify redirect URIs in provider console match {BACKEND_URL}/auth/oauth/callback/{provider}
- Ensure HTTPS is used in production
- Check that session middleware is configured with a valid SECRET_KEY
Email Verification Not Working
- Check SendGrid API key is valid
- Verify email templates exist in app/email/templates/
- Check spam folder
- Verify FROM_EMAIL is authorized in SendGrid
Token Errors
- Verify SECRET_KEY is consistent and not "TEST"
- Check token expiration settings
- Ensure clocks are synchronized
Session State Mismatch (OAuth)
- Clear browser cookies and try again
- Ensure SECRET_KEY is set to a strong random string (not "TEST")
- Check that SessionMiddleware is properly configured in main.py
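A strong `SECRET_KEY` can be generated with Python's `secrets` module. The sketch below also includes a rough sanity check; `generate_secret_key`, `looks_weak`, the 32-character threshold, and the list of placeholder values are all assumptions for illustration, not project rules.

```python
import secrets

# Placeholder values that should never reach production (illustrative list)
KNOWN_PLACEHOLDERS = {"TEST", "SECRET", "CHANGEME"}


def generate_secret_key(num_bytes: int = 48) -> str:
    """Return a URL-safe random string built from `num_bytes` of entropy."""
    return secrets.token_urlsafe(num_bytes)


def looks_weak(secret_key: str, min_length: int = 32) -> bool:
    """Flag keys that are too short or match a well-known placeholder."""
    return len(secret_key) < min_length or secret_key.strip().upper() in KNOWN_PLACEHOLDERS


if __name__ == "__main__":
    print(generate_secret_key())
```

Generate the key once and store it in the environment. Changing it later invalidates every token and session signed with the old value.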
Debug Mode
Enable debug logging to troubleshoot issues:
import logging
logging.basicConfig(level=logging.DEBUG)

Extending the System
Adding Custom User Fields
- Update the User Model: Add new columns to the User model
- Create Migration: Generate and run database migration
- Update Validators: Add fields to UserUpdateRequest
- Update API: Modify endpoints to handle new fields
Adding New OAuth Providers
The system supports easy addition of new OAuth providers:
- Add Environment Variables: Set {PROVIDER}_CLIENT_ID and {PROVIDER}_CLIENT_SECRET
- Update Settings: Add provider credentials to settings.py
- Register Provider: Add provider configuration to oauth_providers.py
- Create Handler: Implement callback handler function
- Update User Model: Add provider-specific field (e.g., github_id)
The system automatically:
- Detects configured providers based on environment variables
- Constructs redirect URIs from BACKEND_URL
- Injects client credentials into OAuth registration
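To illustrate how environment-based detection could work, the sketch below scans a mapping for `{PROVIDER}_CLIENT_ID` and `{PROVIDER}_CLIENT_SECRET` pairs and skips providers with only one of the two. `detect_oauth_providers` is a hypothetical function; the actual detection logic in FastLaunchAPI is not shown in this document and may differ.

```python
import os
from typing import Dict, Mapping, Optional

ID_SUFFIX = "_CLIENT_ID"
SECRET_SUFFIX = "_CLIENT_SECRET"


def detect_oauth_providers(
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, Dict[str, str]]:
    """Return credentials for every provider that has both an ID and a secret set."""
    env = os.environ if env is None else env
    providers: Dict[str, Dict[str, str]] = {}
    for name, client_id in env.items():
        if not name.endswith(ID_SUFFIX) or not client_id:
            continue
        prefix = name[: -len(ID_SUFFIX)]
        client_secret = env.get(prefix + SECRET_SUFFIX)
        if prefix and client_secret:
            providers[prefix.lower()] = {
                "client_id": client_id,
                "client_secret": client_secret,
            }
    return providers


if __name__ == "__main__":
    print(sorted(detect_oauth_providers()))
```

A provider with a missing secret is left out entirely, which matches the 503 "not configured" response shown in the OAuth endpoints.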
Custom Authentication Logic
The system is designed to be extensible. You can:
- Add custom validation rules using Pydantic validators
- Implement additional security checks
- Add audit logging
- Integrate with external identity providers
Migration Guide
From Session-based Auth
- Install Dependencies: Add JWT and OAuth dependencies
- Update Database: Run migrations to add new user fields
- Update Frontend: Replace session handling with token storage
- Test Integration: Verify all authentication flows work
Upgrading
When upgrading the authentication system:
- Backup Database: Always backup before migrations
- Run Migrations: Apply database schema changes
- Update Environment: Add any new required variables (especially BACKEND_URL)
- Test Thoroughly: Verify all authentication flows