JWT
- JSON Web Tokens
- 它是一個將 JSON 對象編碼為密集且沒有空格的長字符串的標准
- 使用 JWT token 和安全密碼 hash 使應用程序真正安全
JWT 小栗子
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
- 它還沒有加密,因此任何人都可以從該字符串中恢復信息
- 但是已經加簽了,因此,當收到發出的 token 時,可以驗證是否實際發出了它
- 創建一個有效期為 1 周的 token,然后當用戶第二天帶着 token 回來時,知道該用戶仍然登錄到系統中
- 一周后,令牌將過期,用戶將無法獲得授權,必須重新登錄以獲取新的 token
- 如果用戶(或第三方)試圖修改 token 以更改過期時間,將能夠發現它,因為簽名不匹配
前提
需要安裝 python-jose 來在 Python 中生成和驗證 JWT token
pip install python-jose
pip install cryptography
JWT 流程
- 前端登錄提交用戶名、密碼
- 后端拿到用戶名、密碼進行驗證,如果沒問題,則返回 token
- 前端訪問需要認證的 url 時攜帶 token
- 后端拿到 token 進行驗證
- 驗證通過返回用戶信息及訪問的 url 信息
hash 密碼
前提
- 數據庫存儲的密碼不能是明文的,需要加密
- PassLib 是一個用於處理哈希密碼的包
- 推薦的算法是 「Bcrypt」
pip install passlib
pip install bcrypt
包含的功能
- hash 密碼
- 驗證 hash 密碼是否一致
- 通過用戶名、密碼驗證用戶
hash 密碼
# 導入 CryptContext from passlib.context import CryptContext pwd_context = CryptContext(schemes=['bcrypt'], deprecated="auto") # 密碼加密 def hash_password(password: str) -> str: return pwd_context.hash(password)
驗證 hash 密碼是否一致
# 驗證密碼 def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password)
通過用戶名、密碼驗證用戶
# 模擬從數據庫中根據用戶名查找用戶 def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) # 根據用戶名、密碼來驗證用戶 def authenticate_user(db, username: str, password: str): # 1、通過用戶名模擬去數據庫查找用戶 user = get_user(db, username) if not user: # 2、用戶不存在 return False if not verify_password(password, user.hashed_password): # 3、密碼驗證失敗 return False # 4、驗證通過,返回用戶信息 return user
處理 JWT token
生成用於簽名 JWT token 的隨機密鑰
在命令行敲
> openssl rand -hex 32
dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c
常量池
方便后續復用
# 常量池 # 通過 openssl rand -hex 32 生成的隨機密鑰 SECRET_KEY = "dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c" # 加密算法 ALGORITHM = "HS256" # 過期時間,分鍾 ACCESS_TOKEN_EXPIRE_MINUTES = 30
創建生成 JWT token 需要用的 Pydantic Model
其實不創建也沒事,但這里為了規范和數據校驗功能,還是建吧
# 返回給客戶端的 Token Model class Token(BaseModel): access_token: str token_type: str class TokenData(BaseModel): username: Optional[str] = None
生成 JWT token
# 導入 JWT 相關庫 from jose import JWTError, jwt # 用戶名、密碼驗證成功后,生成 token def create_access_token( data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) # 加密 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt
修改 get_current_user
獲取 token 后解碼並獲取用戶
# 導入 JWT 相關庫 from jose import JWTError, jwt # 根據當前用戶的 token 獲取用戶,token 已失效則返回錯誤碼 async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: # 1、解碼收到的 token payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM) # 2、拿到 username username: str = payload.get("sub") if not username: # 3、若 token 失效,則返回錯誤碼 raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception # 4、獲取用戶 user = get_user(fake_users_db, username=token_data.username) if not user: raise credentials_exception # 5、返回用戶 return user
修改獲取 token 的路徑操作函數
# OAuth2 獲取 token 的請求路徑 @app.post("/token", response_model=Token) async def login(form_data: OAuth2PasswordRequestForm = Depends()): # 1、獲取客戶端傳過來的用戶名、密碼 username = form_data.username password = form_data.password # 2、驗證用戶 user = authenticate_user(fake_users_db, username, password) if not user: # 3、驗證失敗,返回錯誤碼 raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) # 4、生成 token access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) # 5、返回 JSON 響應 return {"access_token": access_token, "token_type": "bearer"}
sub 的是什么?
- JWT 規范中有一個 sub key,子健
- 它是可選的,這里的作用是通過用戶名設置用戶標識
- 子健應該在整個應用程序中具有唯一的標識符,並且它應該是一個字符串
完整的代碼
#!usr/bin/env python # -*- coding:utf-8 _*- """ # author: 小菠蘿測試筆記 # blog: https://www.cnblogs.com/poloyy/ # time: 2021/10/6 12:05 下午 # file: 49_bearer.py """ from typing import Optional import uvicorn from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel from datetime import datetime, timedelta # 導入 CryptContext from passlib.context import CryptContext # 導入 JWT 相關庫 from jose import JWTError, jwt # 常量池 # 通過 openssl rand -hex 32 生成的隨機密鑰 SECRET_KEY = "dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c" # 加密算法 ALGORITHM = "HS256" # 過期時間,分鍾 ACCESS_TOKEN_EXPIRE_MINUTES = 30 # 模擬數據庫 fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "johndoe@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, } } # 返回給客戶端的 User Model,不需要包含密碼 class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None # 繼承 User,用於密碼驗證,所以要包含密碼 class UserInDB(User): hashed_password: str # 獲取 token 路徑操作函數的響應模型 class Token(BaseModel): access_token: str token_type: str class TokenData(BaseModel): username: Optional[str] = None # 實例對象池 app = FastAPI() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") pwd_context = CryptContext(schemes=['bcrypt'], deprecated="auto") # 密碼加密 def hash_password(password: str) -> str: return pwd_context.hash(password) # 驗證密碼 def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) # 模擬從數據庫中根據用戶名查找用戶 def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) # 根據用戶名、密碼來驗證用戶 def authenticate_user(db, username: str, password: str): # 1、通過用戶名模擬去數據庫查找用戶 user = get_user(db, username) if not user: # 2、用戶不存在 return False if not verify_password(password, user.hashed_password): # 3、密碼驗證失敗 return False # 4、驗證通過,返回用戶信息 return user # 用戶名、密碼驗證成功后,生成 token def create_access_token( data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) # 加密 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt # OAuth2 獲取 token 的請求路徑 @app.post("/token", response_model=Token) async def login(form_data: OAuth2PasswordRequestForm = Depends()): # 1、獲取客戶端傳過來的用戶名、密碼 username = form_data.username password = form_data.password # 2、驗證用戶 user = authenticate_user(fake_users_db, username, password) if not user: # 3、驗證失敗,返回錯誤碼 raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) # 4、生成 token access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) # 5、返回 JSON 響應 return {"access_token": access_token, "token_type": "bearer"} # 根據當前用戶的 token 獲取用戶,token 已失效則返回錯誤碼 async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: # 1、解碼收到的 token payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM) # 2、拿到 username username: str = payload.get("sub") if not username: # 3、若 token 失效,則返回錯誤碼 raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception # 4、獲取用戶 user = get_user(fake_users_db, username=token_data.username) if not user: raise credentials_exception # 5、返回用戶 return user # 判斷用戶是否活躍,活躍則返回,不活躍則返回錯誤碼 async def get_current_active_user(user: User = Depends(get_current_user)): if user.disabled: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid User") return user # 獲取當前用戶信息 @app.get("/user/me") async def read_user(user: User = Depends(get_current_active_user)): return user # 正常的請求 @app.get("/items/") async def read_items(token: str = Depends(oauth2_scheme)): return {"token": token} if __name__ == '__main__': uvicorn.run(app="49_bearer:app", reload=True, host="127.0.0.1", port=8080)
請求結果