作者:麥克煎蛋 出處:https://www.cnblogs.com/mazhiyong/ 轉載請保留這段聲明,謝謝!
下面我們模擬用戶登陸的過程,具體講解下登陸驗證的流程,並完善代碼邏輯。
密碼哈希
為了數據安全,我們利用PassLib對入庫的用戶密碼進行加密處理,推薦的加密算法是"Bcrypt"。我們需要安裝依賴包:
pip install passlib
pip install bcrypt
PassLib也可以對密碼進行校驗。
用戶登陸
用戶通過終端發送username
和password
到后端。后端收到數據后,進行一下操作:
1、用戶信息校驗:查詢當前系統是否存在該用戶,以及密碼是否正確。
2、如果用戶存在,則生成JWT token並返回,JWT payload中可以攜帶自定義數據。
from datetime import datetime, timedelta from typing import Optional from fastapi import Depends, FastAPI, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm import jwt from pydantic import BaseModel from passlib.context import CryptContext # to get a string like this run: # openssl rand -hex 32 SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 模擬數據庫數據 fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "johndoe@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, } } class Token(BaseModel): access_token: str token_type: str class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None class UserInDB(User): hashed_password: str pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token") app = FastAPI() # 校驗密碼 def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) # 密碼哈希 def get_password_hash(password): return pwd_context.hash(password) # 模擬從數據庫讀取用戶信息 def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) # 用戶信息校驗:username和password分別校驗 def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return user # 生成token,帶有過期時間 def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt @app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): # 首先校驗用戶信息 user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) # 生成並返回token信息 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": form_data.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}
數據請求
終端獲取到token信息后,必須在后續請求的Authorization
頭信息中帶有Bearer
token,才能被允許訪問。
我們添加一個校驗函數,對請求的合法性進行校驗,讀取token內容解析並進行驗證:
async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except PyJWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if user is None: raise credentials_exception return user
添加數據請求接口,依賴上面的校驗函數:
@app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_user)): return current_user
以下是Postman測試結果:
參照以上業務流程,我們可以實現其他的需要權限校驗的數據請求接口。
個人認為,如果只是單純的提供開發接口,通過JWT的校驗機制已經足以滿足業務需求。