FastAPI 學習之路(二十九)使用(哈希)密碼和 JWT Bearer 令牌的 OAuth2


既然我們已經有了所有的安全流程,就讓我們來使用 JWT 令牌和安全哈希密碼讓應用程序真正地安全。

        關於 JWT

        

        它是一個將 JSON 對象編碼為密集且沒有空格的長字符串的標准。字符串看起來像這樣:

    

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c



它沒有被加密,因此任何人都可以從字符串內容中還原數據。

但它經過了簽名。因此,當你收到一個由你發出的令牌時,可以校驗令牌是否真的由你發出。

通過這種方式,你可以創建一個有效期為 1 周的令牌。然后當用戶第二天使用令牌重新訪問時,你知道該用戶仍然處於登入狀態。

一周后令牌將會過期,用戶將不會通過認證,必須再次登錄才能獲得一個新令牌。而且如果用戶(或第三方)試圖修改令牌以篡改過期時間,你將因為簽名不匹配而能夠發覺。

        我們看下如何實現?

      一、安裝python-json產生和校驗JWT。

pip install python-jose

 二、使用PassLib處理哈希密碼

        

pip install passlib

 

    還需要安裝bcrypt

 pip install bcrypt

    三、我們看下如何使用,以及思路

創建一個工具函數以哈希來自用戶的密碼。
然后創建另一個工具函數,用於校驗接收的密碼是否與存儲的哈希值匹配。
再創建另一個工具函數用於認證並返回用戶。
創建用於設定 JWT 令牌簽名算法的變量 「ALGORITHM」,並將其設置為 "HS256"。
創建一個設置令牌過期時間的變量。
定義一個將在令牌端點中用於響應的 Pydantic 模型。
創建一個生成新的訪問令牌的工具函數。
get_current_user使用的是 JWT 令牌解碼,接收到的令牌,對其進行校驗,然后返回當前用戶。
如果令牌無效,立即返回一個 HTTP 錯誤。
使用令牌的過期時間創建一個 timedelta 對象。
創建一個真實的 JWT 訪問令牌並返回它。

我們最后看下實現代碼

from fastapi import FastAPI, Depends,status,HTTPException
from pydantic import BaseModel
from typing import Optional
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from datetime import datetime, timedelta
from passlib.context import CryptContext
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
fake_users = {
    "leizi": {
        "username": "leizi",
        "full_name": "leizishuoceshikaifa",
        "email": "leizi@leizi.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False
    }
}
app = FastAPI()


def fake_hash_password(password: str):
    return password

class Token(BaseModel):
    access_token: str
    token_type: str
class TokenData(BaseModel):
    username: Optional[str] = None
class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None
class UserInDB(User):
    hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
    return pwd_context.hash(password)
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
def fake_decode_token(token):
    user = get_user(fake_users, token)
    return user


def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="驗證失敗",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user
def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="已經刪除")
    return current_user
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me")
def read_users_me(current_user: User = Depends(get_current_active_user)):

    return current_user

 我們去測試下,效果,我們去在docs上使用Authorize,默認的密碼是:secret

 

 

 

  我們去填寫下

 

 

 我們去認證下

 

 

 我們看下接口的請求

 

 

 

 我們看下如果不認證呢

 

 

 我們可以看下請求帶的數據

 

 

  這樣就完成了:使用(哈希)密碼和 JWT Bearer 令牌的 OAuth2。注意:接口返回的用戶不應該返回密碼,這個需要在實際中需要屏蔽

文章首發在公眾號,歡迎關注。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM