-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
100 lines (79 loc) · 2.65 KB
/
main.py
File metadata and controls
100 lines (79 loc) · 2.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import asyncio
from contextlib import asynccontextmanager
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, Query
from sqlmodel import Session, select
from database import check_db, create_db_and_tables, get_session
from models import Task, TaskCreate, TaskRead, TaskUpdate
@asynccontextmanager
async def lifespan(app: FastAPI):
# The database resource may still be starting — retry a few times
for attempt in range(5):
try:
create_db_and_tables()
break
except Exception:
if attempt < 4:
await asyncio.sleep(2)
else:
raise
yield
app = FastAPI(
title="FastAPI + PostgreSQL",
description="A task management API deployed on Convox.",
lifespan=lifespan,
)
SessionDep = Annotated[Session, Depends(get_session)]
@app.get("/")
def root():
return {
"app": "fastapi-postgres",
"endpoints": ["/health", "/tasks", "/docs"],
}
@app.get("/health")
def health():
try:
check_db()
except Exception:
raise HTTPException(status_code=503, detail="Database unavailable")
return {"status": "healthy"}
@app.post("/tasks", response_model=TaskRead, status_code=201)
def create_task(task: TaskCreate, session: SessionDep):
db_task = Task.model_validate(task)
session.add(db_task)
session.commit()
session.refresh(db_task)
return db_task
@app.get("/tasks", response_model=list[TaskRead])
def list_tasks(
session: SessionDep,
offset: Annotated[int, Query(ge=0)] = 0,
limit: Annotated[int, Query(ge=1, le=100)] = 20,
):
tasks = session.exec(select(Task).offset(offset).limit(limit)).all()
return tasks
@app.get("/tasks/{task_id}", response_model=TaskRead)
def get_task(task_id: int, session: SessionDep):
task = session.get(Task, task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@app.put("/tasks/{task_id}", response_model=TaskRead)
def update_task(task_id: int, task_update: TaskUpdate, session: SessionDep):
task = session.get(Task, task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
for key, value in task_update.model_dump(exclude_unset=True).items():
setattr(task, key, value)
session.add(task)
session.commit()
session.refresh(task)
return task
@app.delete("/tasks/{task_id}")
def delete_task(task_id: int, session: SessionDep):
task = session.get(Task, task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
session.delete(task)
session.commit()
return {"deleted": True}