# Clase en vídeo: https://youtu.be/_y9qQZXE24A?t=17664
### Users API con autorización OAuth2 JWT ###
import os
from datetime import datetime, timedelta, timezone

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel
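# Signing algorithm for the access token (symmetric HMAC-SHA256, so the same
# SECRET both signs in login() and verifies in auth_user()).
ALGORITHM = "HS256"
# Token lifetime in minutes; login() applies it as timedelta(minutes=...).
ACCESS_TOKEN_DURATION = 1
# HMAC signing key. The literal below is a demo value committed to the
# repository, so it must be treated as public: any holder of the source can
# mint tokens this module accepts. Set JWT_SECRET in the environment to
# override it; the literal stays as the fallback so the tutorial still runs.
SECRET = os.environ.get(
    "JWT_SECRET",
    "201d573bd7d1344d3a3bfce1550b69102fd11be3db6d379508b6cccc58ea230b",
)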
# Every route in this module hangs off /jwtauth (so login() below is served
# at /jwtauth/login) and shares one 404 response description.
router = APIRouter(
    prefix="/jwtauth",
    tags=["jwtauth"],
    responses={status.HTTP_404_NOT_FOUND: {"message": "No encontrado"}}
)
# Extracts the bearer token from the Authorization header for auth_user().
# NOTE(review): tokenUrl only feeds the OpenAPI docs "Authorize" flow and is
# resolved against the app root, not this router's prefix, while the
# password endpoint below lives at /jwtauth/login — confirm the docs flow
# targets the intended route.
oauth2 = OAuth2PasswordBearer(tokenUrl="login")
# Password-hash context: users_db holds bcrypt hashes that login() checks
# with crypt.verify().
crypt = CryptContext(schemes=["bcrypt"])
class User(BaseModel):
    """Public view of an account: the shape the API returns to callers.

    Deliberately carries no password field; UserDB adds it for the
    server-side lookup used by login().
    """
    username: str
    full_name: str
    email: str
    # True makes current_user() reject the account even with a valid token.
    disabled: bool
class UserDB(User):
    """Storage-side record: User plus the stored password hash.

    Only search_user_db()/login() build this model; request handlers return
    the plain User, so the hash never leaves the server.
    """
    # Expected to be a bcrypt hash (the "$2a$12$..." values in users_db),
    # checked with crypt.verify() in login() — not a plaintext password.
    password: str
# In-memory demo user store keyed by username. Each "password" value is a
# bcrypt hash ("$2a$12$" prefix), never the plaintext; login() compares the
# submitted password against it with crypt.verify(). The second account is
# disabled so current_user()'s inactive-user branch can be exercised.
users_db = {
    "mouredev": {
        "username": "mouredev",
        "full_name": "Brais Moure",
        "email": "[email protected]",
        "disabled": False,
        "password": "$2a$12$B2Gq.Dps1WYf2t57eiIKjO4DXC3IUMUXISJF62bSRiFfqMdOI2Xa6"
    },
    "mouredev2": {
        "username": "mouredev2",
        "full_name": "Brais Moure 2",
        "email": "[email protected]",
        "disabled": True,
        "password": "$2a$12$SduE7dE.i3/ygwd0Kol8bOFvEABaoOOlC8JsCSr6wpwB4zl5STU4S"
    }
}
def search_user_db(username: str):
    """Return the users_db entry for *username* as a UserDB, or None if unknown.

    The result includes the stored password hash, so it is meant for
    login() only and must not be returned from a request handler.
    """
    try:
        record = users_db[username]
    except KeyError:
        return None
    return UserDB(**record)
def search_user(username: str):
    """Return the users_db entry for *username* as a plain User, or None.

    Safe to hand back to clients: the User model has no password field.
    """
    try:
        record = users_db[username]
    except KeyError:
        return None
    return User(**record)
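async def auth_user(token: str = Depends(oauth2)):
    """Resolve the bearer token to the User it was issued for.

    Verifies the token with SECRET/ALGORITHM, reads its "sub" claim as the
    username and looks that up in users_db.

    Raises:
        HTTPException: 401 with a WWW-Authenticate header when the token is
            invalid or expired, carries no "sub" claim, or names a username
            that is no longer present in users_db.
    """
    exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Credenciales de autenticación inválidas",
        headers={"WWW-Authenticate": "Bearer"})
    try:
        # Signature and "exp" are checked here; any failure surfaces as JWTError.
        username = jwt.decode(token, SECRET, algorithms=[ALGORITHM]).get("sub")
    except JWTError:
        raise exception
    if username is None:
        raise exception
    user = search_user(username)
    if user is None:
        # A still-valid token for an account that was removed must not flow
        # into current_user(), where `user.disabled` on None would become a 500.
        raise exception
    return user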
async def current_user(user: User = Depends(auth_user)):
    """Gate on the authenticated User being active.

    Hands the user back unchanged when ``disabled`` is False; otherwise
    answers 400 "Usuario inactivo", so a valid token for a blocked account
    still cannot reach the protected handlers.
    """
    if not user.disabled:
        return user
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Usuario inactivo")
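@router.post("/login")
async def login(form: OAuth2PasswordRequestForm = Depends()):
    """Exchange a username/password form for a short-lived bearer JWT.

    Looks the account up in users_db, checks the submitted password against
    the stored bcrypt hash and, on success, returns a token whose "sub" is
    the username and whose "exp" is now + ACCESS_TOKEN_DURATION minutes.

    Raises:
        HTTPException: 400 with one generic message whether the username is
            unknown or the password is wrong, so the response does not
            reveal which usernames exist.
    """
    rejected = HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST, detail="Credenciales incorrectas")
    user = search_user_db(form.username)
    if user is None:
        # Spend roughly one bcrypt verification on this path as well, so an
        # unknown username is not distinguishable from a bad password by
        # response latency.
        crypt.dummy_verify()
        raise rejected
    if not crypt.verify(form.password, user.password):
        raise rejected
    access_token = {
        "sub": user.username,
        "exp": datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_DURATION),
    }
    return {"access_token": jwt.encode(access_token, SECRET, algorithm=ALGORITHM),
            "token_type": "bearer"}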
@router.get("/users/me")
async def me(user: User = Depends(current_user)):
    """Return the caller's own profile.

    The User arrives via current_user() (valid, unexpired token for an
    active account) and is the password-less model, so no hash is included
    in the response.
    """
    return user