一、准備
1、python-jose
JavaScript對象簽名和加密(JOSE)技術。
- JSON Web Signatures(JWS)
- JSON Web Encryption(JWE)
- JSON Web Key(JWK)
- JSON Web Algorithms(JWA)
使用各種算法對內容進行加密和簽名,其中JSON Web Signatures是對JSON編碼對象進行簽名,然后將其表示為緊湊的URL安全字符串。支持的算法有:
Algorithm Value | Digital Signature or MAC Algorithm |
HS256 | HMAC using SHA-256 hash algorithm |
HS384 | HMAC using SHA-384 hash algorithm |
HS512 | HMAC using SHA-512 hash algorithm |
RS256 | RSASSA using SHA-256 hash algorithm |
RS384 | RSASSA using SHA-384 hash algorithm |
RS512 | RSASSA using SHA-512 hash algorithm |
ES256 | ECDSA using SHA-256 hash algorithm |
ES384 | ECDSA using SHA-384 hash algorithm |
ES512 | ECDSA using SHA-512 hash algorithm |
在使用前先進行安裝包:
pip install python-jose
然后使用:
>>> from jose import jwt
>>> token = jwt.encode({"key":"value"},"secret",algorithm="HS256")
>>> token
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJrZXkiOiJ2YWx1ZSJ9.FG-8UppwHaFp1LgRYQQeS6EDQF7_6-bMFegNucHjmWg'
>>> jwt.decode(token,"secret",algorithms=["HS256"])
{'key': 'value'}
2、passlib
passlib是一個用於python2 & 3的密碼散列包,它提供了30多種密碼散列算法,同時也是一個管理現有密碼散列的框架。
passlib可以大致分為四類:
- Password Hashes
- Password Contexts
- Two-Factor Authentication
- Application Helpers
使用這個模塊,需要先安裝passlib:
pip install passlib
然后這里以Password Hashes為例:
# 加密
>>> from passlib.hash import pbkdf2_sha256
>>> hash = pbkdf2_sha256.hash("password")
>>> hash
'$pbkdf2-sha256$29000$8l4rpTRmDGEsRUhJac05Bw$eajW7PFThCDyQ2DiCbVIeaMw6pF/bLXj5XkLlI3dOY0'
# 進行驗證
>>> pbkdf2_sha256.verify("password", hash)
True
二、Hash Password和JWT Bearer Token認證
(一)流程
- 客戶端發送用戶名和密碼到生成token的路徑操作
- 服務器路徑操作函數生成對應的JWT Token
- 返回JWT Token到客戶端
- 客戶端發送請求,並且請求頭中攜帶對應的Token
- 服務端檢查JWT Token,並且從Token中得到用戶信息
- 服務端將用戶信息返回到客戶端
(二)獲取jwt token
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

app = FastAPI()

# In-memory stand-in for a user table, keyed by username.
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}

# NOTE(review): the signing key is hard-coded for the demo; load it from
# configuration or the environment before using this outside a tutorial.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTE = 30


class User(BaseModel):
    """User fields that do not include the password hash."""

    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    """User record as stored in the database, including the password hash."""

    hashed_password: str


class Token(BaseModel):
    """Token returned to the user."""

    access_token: str
    token_type: str


# Clients obtain their bearer token from the /jwt/token endpoint below.
oauth2_schema = OAuth2PasswordBearer(tokenUrl="/jwt/token")
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verity_password(plain_password: str, hashed_password: str):
    """Check a plain-text password against its stored hash."""
    return pwd_context.verify(plain_password, hashed_password)


def jwt_get_user(db, username: str):
    """Return the ``UserInDB`` stored under ``username``, or ``None`` if absent."""
    if username not in db:
        return None
    return UserInDB(**db[username])


def jwt_authenticate_user(db, username: str, password: str):
    """Look the user up by the submitted username and verify the password.

    Returns:
        The ``UserInDB`` instance on success, ``False`` when the user does
        not exist or the password does not match.
    """
    user = jwt_get_user(db=db, username=username)
    if not user:
        return False
    if not verity_password(plain_password=password, hashed_password=user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. The dict is copied, so the caller's object is
            not modified.
        expires_delta: Token lifetime. Falls back to 15 minutes only when it
            is omitted (``None``); an explicit zero delta is honoured.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)
    to_encode = data.copy()
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value.
    to_encode.update({"exp": datetime.now(timezone.utc) + expires_delta})
    return jwt.encode(claims=to_encode, key=SECRET_KEY, algorithm=ALGORITHM)


@app.post("/jwt/token", response_model=Token)
async def login_jwt_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange a username/password form for a bearer JWT.

    Raises:
        HTTPException: 401 when the credentials are rejected.
    """
    user = jwt_authenticate_user(
        db=fake_users_db,
        username=form_data.username,
        password=form_data.password,
    )
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTE)
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires,
    )
    return {"access_token": access_token, "token_type": "bearer"}
在交互文檔中輸入用戶名和密碼獲取jwt token:
(三)獲取活躍用戶
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

app = FastAPI()

# In-memory stand-in for a user table, keyed by username.
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}

# NOTE(review): the signing key is hard-coded for the demo; load it from
# configuration or the environment before using this outside a tutorial.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTE = 30


class User(BaseModel):
    """User fields that do not include the password hash."""

    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    """User record as stored in the database, including the password hash."""

    hashed_password: str


class Token(BaseModel):
    """Token returned to the user."""

    access_token: str
    token_type: str


# Clients obtain their bearer token from the /jwt/token endpoint below.
oauth2_schema = OAuth2PasswordBearer(tokenUrl="/jwt/token")
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verity_password(plain_password: str, hashed_password: str):
    """Check a plain-text password against its stored hash."""
    return pwd_context.verify(plain_password, hashed_password)


def jwt_get_user(db, username: str):
    """Return the ``UserInDB`` stored under ``username``, or ``None`` if absent."""
    if username not in db:
        return None
    return UserInDB(**db[username])


def jwt_authenticate_user(db, username: str, password: str):
    """Look the user up by the submitted username and verify the password.

    Returns:
        The ``UserInDB`` instance on success, ``False`` when the user does
        not exist or the password does not match.
    """
    user = jwt_get_user(db=db, username=username)
    if not user:
        return False
    if not verity_password(plain_password=password, hashed_password=user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. The dict is copied, so the caller's object is
            not modified.
        expires_delta: Token lifetime. Falls back to 15 minutes only when it
            is omitted (``None``); an explicit zero delta is honoured.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)
    to_encode = data.copy()
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value.
    to_encode.update({"exp": datetime.now(timezone.utc) + expires_delta})
    return jwt.encode(claims=to_encode, key=SECRET_KEY, algorithm=ALGORITHM)


@app.post("/jwt/token", response_model=Token)
async def login_jwt_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange a username/password form for a bearer JWT.

    Raises:
        HTTPException: 401 when the credentials are rejected.
    """
    user = jwt_authenticate_user(
        db=fake_users_db,
        username=form_data.username,
        password=form_data.password,
    )
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTE)
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires,
    )
    return {"access_token": access_token, "token_type": "bearer"}


async def jwt_get_current_user(token: str = Depends(oauth2_schema)):
    """Resolve the bearer token from the request into a stored user.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or names a user that is not in ``fake_users_db``.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    # Only the decode call can raise JWTError, so keep the try body to that.
    try:
        payload = jwt.decode(token=token, key=SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as err:
        raise credentials_exception from err
    username = payload.get("sub")
    if not username:
        raise credentials_exception
    user = jwt_get_user(db=fake_users_db, username=username)
    if not user:
        raise credentials_exception
    return user


async def jwt_get_current_active_user(current_user: User = Depends(jwt_get_current_user)):
    """Return the authenticated user unless the account is disabled.

    Raises:
        HTTPException: 400 when ``current_user.disabled`` is truthy.
    """
    if current_user.disabled:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user",
        )
    return current_user


@app.get("/jwt/users/me")
async def jwt_read_users_me(current_user: User = Depends(jwt_get_current_active_user)):
    """Return the currently authenticated, active user."""
    return current_user