#!/usr/bin/python3
# -*- coding:utf-8 -*-
# __author__ = '__Jack__'
# chapter06.py -- forked from liaogx/fastapi-tutorial
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# Router that collects every endpoint defined in this chapter.
app06 = APIRouter()

# --- OAuth2 password flow and FastAPI's OAuth2PasswordBearer ---
#
# OAuth2PasswordBearer is a class that takes a URL as its argument: the client
# sends ``username`` and ``password`` to that URL and receives a token back.
# OAuth2PasswordBearer does not create the path operation for that URL; it only
# declares which URL clients should use to request a token.
# When a request arrives, FastAPI checks its Authorization header. If the
# header is missing, or its value is not a Bearer token, the response is
# 401 (UNAUTHORIZED).

# URL clients use to request a token: http://127.0.0.1:8000/chapter06/token
oauth2_schema = OAuth2PasswordBearer(tokenUrl="/chapter06/token")
@app06.get("/oauth2_password_bearer")
async def oauth2_password_bearer(token: str = Depends(oauth2_schema)):
    # Echoes back the bearer token that the ``oauth2_schema`` dependency
    # supplied for this request.
    body = {"token": token}
    return body
# --- OAuth2 authentication based on Password and Bearer token ---

# In-memory stand-in for a user table, keyed by username. Each record carries
# the fields that ``UserInDB`` is built from.
# NOTE(review): the email values look like an obfuscation placeholder rather
# than real addresses -- confirm the intended values.
fake_users_db = {
    "john snow": {
        "username": "john snow",
        "full_name": "John Snow",
        "email": "[email protected]",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "[email protected]",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}
def fake_hash_password(password: str):
    """Return the demo "hash" of *password*.

    The result is simply the text ``fakehashed`` followed by the password;
    it is a placeholder, not a real hash.
    """
    prefix = "fakehashed"
    return prefix + password
class User(BaseModel):
    # User fields without the password hash; only ``username`` is required,
    # the rest default to None.
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    # A truthy value makes the *_active_user dependencies reject the account.
    disabled: Optional[bool] = None
class UserInDB(User):
    # Stored form of a user: every ``User`` field plus the password hash.
    hashed_password: str
@app06.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # Token endpoint of the simplified flow: checks the submitted credentials
    # against ``fake_users_db`` and returns the username itself as the token.
    # An unknown user and a wrong password yield the same 400 response.
    bad_credentials = HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Incorrect username or password",
    )

    record = fake_users_db.get(form_data.username)
    if not record:
        raise bad_credentials

    account = UserInDB(**record)
    if fake_hash_password(form_data.password) != account.hashed_password:
        raise bad_credentials

    return {"access_token": account.username, "token_type": "bearer"}
def get_user(db, username: str):
    """Look up *username* in *db* and wrap the stored record in ``UserInDB``.

    Returns ``None`` when *username* is not a key of *db*.
    """
    if username not in db:
        return None
    return UserInDB(**db[username])
def fake_decode_token(token: str):
    """Treat *token* as a username and fetch that user from ``fake_users_db``.

    No decoding takes place; the result is whatever ``get_user`` returns.
    """
    return get_user(fake_users_db, token)
async def get_current_user(token: str = Depends(oauth2_schema)):
    """Dependency that resolves the request's bearer token to a user.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``fake_decode_token`` finds no user for the token.
    """
    resolved = fake_decode_token(token)
    if resolved:
        return resolved
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        # Per the OAuth2 spec, a failed authentication returns "WWW-Authenticate" in the headers.
        headers={"WWW-Authenticate": "Bearer"},
    )
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Dependency that lets only non-disabled users through.

    Raises:
        HTTPException: 400 "Inactive user" when ``current_user.disabled`` is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
@app06.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    # Returns the user produced by ``get_current_active_user`` unchanged.
    me = current_user
    return me
# --- OAuth2 with Password (and hashing), Bearer with JWT tokens:
#     authentication based on JSON Web Tokens ---

# Replace the "john snow" record so that its stored password is a
# ``$2b$``-style hash instead of the fake "fakehashed..." value.
fake_users_db["john snow"] = {
    "username": "john snow",
    "full_name": "John Snow",
    "email": "[email protected]",
    "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
    "disabled": False,
}
# Key used to sign and verify the JWTs issued below.
# A key committed to source control should not protect real tokens, so the
# JWT_SECRET_KEY environment variable takes precedence when it is set to a
# non-empty value; the literal is kept only as the development fallback.
# Generate a fresh key with: openssl rand -hex 32
SECRET_KEY = (
    os.environ.get("JWT_SECRET_KEY")
    or "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
)
ALGORITHM = "HS256"  # JWT signing algorithm
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # access-token lifetime, in minutes
class Token(BaseModel):
    """Token returned to the user."""

    access_token: str
    token_type: str
# Password hashing/verification context configured with the bcrypt scheme.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)

# Rebinds ``oauth2_schema`` to the JWT token endpoint. Dependencies declared
# earlier in the file already captured the previous object in their default
# arguments; only the definitions after this line use this one.
oauth2_schema = OAuth2PasswordBearer(tokenUrl="/chapter06/jwt/token")
def verity_password(plain_password: str, hashed_password: str):
    """Check a plain-text password against its stored hash.

    Delegates to ``pwd_context.verify`` and returns its result.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def jwt_get_user(db, username: str):
    """Return the ``UserInDB`` built from ``db[username]``.

    Returns ``None`` when *username* is not a key of *db*.
    """
    return UserInDB(**db[username]) if username in db else None
def jwt_authenticate_user(db, username: str, password: str):
    """Authenticate *username* with *password* against *db*.

    Returns the matching user when the user exists and the password verifies
    against the stored hash; returns ``False`` in every other case.
    """
    candidate = jwt_get_user(db=db, username=username)
    if candidate and verity_password(
        plain_password=password,
        hashed_password=candidate.hashed_password,
    ):
        return candidate
    return False
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT from *data* plus an ``exp`` (expiry) claim.

    Args:
        data: Claims to embed. The dict is copied, so the caller's object is
            left untouched.
        expires_delta: Lifetime of the token counted from now. ``None`` means
            the default of 15 minutes. An explicit zero ``timedelta`` is
            honoured rather than being mistaken for "not provided".

    Returns:
        The token produced by ``jwt.encode`` using ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Compare against None explicitly: timedelta(0) is falsy and would
    # otherwise silently receive the 15-minute default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)

    # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated as of
    # Python 3.12 and yields a naive value.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode["exp"] = expire
    return jwt.encode(claims=to_encode, key=SECRET_KEY, algorithm=ALGORITHM)
@app06.post("/jwt/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # JWT token endpoint: authenticates the submitted form against
    # ``fake_users_db`` and, on success, issues a token whose "sub" claim is
    # the username and whose lifetime is ACCESS_TOKEN_EXPIRE_MINUTES.
    account = jwt_authenticate_user(
        db=fake_users_db,
        username=form_data.username,
        password=form_data.password,
    )
    if not account:
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    token = create_access_token(
        data={"sub": account.username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": token, "token_type": "bearer"}
async def jwt_get_current_user(token: str = Depends(oauth2_schema)):
    """Dependency that resolves a JWT bearer token to a user.

    The token is decoded with ``SECRET_KEY``/``ALGORITHM`` and its "sub" claim
    is looked up in ``fake_users_db``.

    Raises:
        HTTPException: 401 "Could not validate credentials" when decoding
            raises ``JWTError``, when the token has no "sub" claim, or when
            no user exists for that claim.
    """
    unauthorized = HTTPException(
        status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token=token, key=SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized

    subject = claims.get("sub")
    if subject is None:
        raise unauthorized

    account = jwt_get_user(db=fake_users_db, username=subject)
    if account is None:
        raise unauthorized
    return account
async def jwt_get_current_active_user(current_user: User = Depends(jwt_get_current_user)):
    """Dependency that lets only non-disabled JWT-authenticated users through.

    Raises:
        HTTPException: 400 "Inactive user" when ``current_user.disabled`` is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
@app06.get("/jwt/users/me")
async def jwt_read_users_me(current_user: User = Depends(jwt_get_current_active_user)):
    # Returns the user produced by ``jwt_get_current_active_user`` unchanged.
    me = current_user
    return me