FastAPI安全系列(三) 基於Hash Password和JWT Bearer Token的OAuth2.0認證


一、准備

1、python-jose

JavaScript對象簽名和加密(JOSE)技術。

  • JSON Web Signatures(JWS)
  • JSON Web Encryption(JWE)
  • JSON Web Key(JWK)
  • JSON Web Algorithms(JWA)

使用各種算法對內容進行加密和簽名,其中JSON Web Signatures是對JSON編碼對象進行簽名,然后將其編成緊湊的URL安全字符串。支持的算法有:

Algorithm Value Digital Signature or MAC Algorithm
HS256 HMAC using SHA-256 hash algorithm
HS384 HMAC using SHA-384 hash algorithm
HS512 HMAC using SHA-512 hash algorithm
RS256 RSASSA using SHA-256 hash algorithm
RS384 RSASSA using SHA-384 hash algorithm
RS512 RSASSA using SHA-512 hash algorithm
ES256 ECDSA using SHA-256 hash algorithm
ES384 ECDSA using SHA-384 hash algorithm
ES512 ECDSA using SHA-512 hash algorithm

在使用前先進行安裝包:

pip install python-jose

然后使用:

>>> from jose import jwt
>>> token = jwt.encode({"key":"value"},"secret",algorithm="HS256")
>>> token
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJrZXkiOiJ2YWx1ZSJ9.FG-8UppwHaFp1LgRYQQeS6EDQF7_6-bMFegNucHjmWg'
>>> jwt.decode(token,"secret",algorithms=["HS256"])
{'key': 'value'}

2、passlib

  passlib是一個用於python2 & 3的密碼散列包,它提供了超過30多種密碼散列算法,同時也是一個管理現有密碼散列的框架。

  passlib可以大致分為四類:

  • Password Hashes
  • Password Contexts
  • Two-Factor Authentication
  • Application Helpers

使用這個模塊,需要先安裝passlib:

pip install passlib

然后這里以Password Hashes為例:

# 加密
>>> from passlib.hash import pbkdf2_sha256
>>> hash = pbkdf2_sha256.hash("password")
>>> hash
'$pbkdf2-sha256$29000$8l4rpTRmDGEsRUhJac05Bw$eajW7PFThCDyQ2DiCbVIeaMw6pF/bLXj5XkLlI3dOY0'

# 進行驗證
>>> pbkdf2_sha256.verify("password", hash)
True

二、Hash Password和JWT Bearer Token認證

(一)流程

  •  客戶端發送用戶名和密碼到生成token的路徑操作
  • 服務器路徑操作函數生成對應的JWT Token
  • 返回JWT Token到客戶端
  • 客戶端發送請求,並且請求頭中攜帶對應的Token
  • 服務端檢查JWT Token,並且從Token中得到用戶信息
  • 服務端將用戶信息返回到客戶端

(二)獲取jwt token

from pydantic import BaseModel
from fastapi import FastAPI, Depends, HTTPException, status
from typing import Optional
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from datetime import datetime, timedelta
from jose import jwt, JWTError

# Application instance that the route decorators in this listing attach to.
app = FastAPI()

# In-memory stand-in for a user table: maps a username to that user's record.
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        # Stored password hash in bcrypt "$2b$" format; the plaintext is never kept.
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}

# Key handed to jwt.encode when signing tokens.
# NOTE(review): hard-coded for the demo only; load it from the environment or a
# secret store in any real deployment.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# JWT signing algorithm name passed to python-jose.
ALGORITHM = "HS256"
# Lifetime, in minutes, that the login route gives to issued access tokens.
ACCESS_TOKEN_EXPIRE_MINUTE = 30


# User profile model; only ``username`` is required, every other field
# defaults to None.
class User(BaseModel):
    # Login name of the user.
    username: str
    # Optional contact e-mail address.
    email: Optional[str] = None
    # Optional display name.
    full_name: Optional[str] = None
    # Optional flag marking the account as disabled.
    disabled: Optional[bool] = None


# Storage-side user model: all ``User`` fields plus the stored password hash.
class UserInDB(User):
    # Hash of the user's password, as kept in the user store.
    hashed_password: str


class Token(BaseModel):
    """Token payload returned to the user."""
    # The encoded JWT string.
    access_token: str
    # Token scheme label; the login route sets this to "bearer".
    token_type: str


# Bearer-token dependency; tokenUrl names the path of the login route that
# issues tokens.
oauth2_schema = OAuth2PasswordBearer(tokenUrl="/jwt/token")
# passlib context configured with bcrypt as its only hashing scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verity_password(plain_password: str, hashed_password: str):
    """Check a plaintext password against a stored hash.

    Delegates to the module-level passlib context and returns whatever its
    ``verify`` call reports.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match


def jwt_get_user(db, username: str):
    """Look up *username* in *db* and wrap its record in a ``UserInDB``.

    Returns ``None`` when *username* is not a key of *db*.
    """
    if username not in db:
        return None
    record = db[username]
    return UserInDB(**record)


def jwt_authenticate_user(db, username: str, password: str):
    """Fetch the user named *username* from *db* and verify *password*.

    Returns the user object when the user exists and the password matches
    the stored hash; returns ``False`` in every other case.
    """
    candidate = jwt_get_user(db=db, username=username)
    if candidate and verity_password(
        plain_password=password,
        hashed_password=candidate.hashed_password,
    ):
        return candidate
    return False


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT from *data* plus an ``exp`` (expiry) claim.

    Args:
        data: Claims to embed in the token. A shallow copy is taken, so the
            caller's dict is not modified.
        expires_delta: Lifetime of the token. When ``None``, the token
            expires 15 minutes after creation.

    Returns:
        The encoded token produced by ``jwt.encode`` using ``SECRET_KEY``
        and ``ALGORITHM``.
    """
    # Local import: the file header only imports datetime and timedelta.
    from datetime import timezone

    # Compare against None rather than truthiness: timedelta(0) is falsy, and
    # an explicit zero lifetime must not be silently replaced by the default.
    lifetime = expires_delta if expires_delta is not None else timedelta(minutes=15)
    # datetime.utcnow() is deprecated (Python 3.12+) and returns a naive value;
    # an aware UTC timestamp denotes the same instant unambiguously.
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode.update({"exp": expire})
    return jwt.encode(claims=to_encode, key=SECRET_KEY, algorithm=ALGORITHM)


# Login route: exchanges the submitted form credentials for a bearer JWT.
# Responds with 401 when authentication fails.
@app.post("/jwt/token", response_model=Token)
async def login_jwt_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = jwt_authenticate_user(
        db=fake_users_db,
        username=form_data.username,
        password=form_data.password,
    )
    if user:
        # The username travels in the "sub" claim of the issued token.
        issued_token = create_access_token(
            data={"sub": user.username},
            expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTE),
        )
        return {"access_token": issued_token, "token_type": "bearer"}
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )

在交互文檔中輸入用戶名和密碼獲取jwt token:

(三)獲取活躍用戶

from pydantic import BaseModel
from fastapi import FastAPI, Depends, HTTPException, status
from typing import Optional
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from datetime import datetime, timedelta
from jose import jwt, JWTError

# Application instance that the route decorators in this listing attach to.
app = FastAPI()

# In-memory stand-in for a user table: maps a username to that user's record.
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        # Stored password hash in bcrypt "$2b$" format; the plaintext is never kept.
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}

# Key handed to jwt.encode / jwt.decode for signing and verifying tokens.
# NOTE(review): hard-coded for the demo only; load it from the environment or a
# secret store in any real deployment.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# JWT signing algorithm name passed to python-jose.
ALGORITHM = "HS256"
# Lifetime, in minutes, that the login route gives to issued access tokens.
ACCESS_TOKEN_EXPIRE_MINUTE = 30


# User profile model; only ``username`` is required, every other field
# defaults to None.
class User(BaseModel):
    # Login name of the user.
    username: str
    # Optional contact e-mail address.
    email: Optional[str] = None
    # Optional display name.
    full_name: Optional[str] = None
    # Optional flag marking the account as disabled.
    disabled: Optional[bool] = None


# Storage-side user model: all ``User`` fields plus the stored password hash.
class UserInDB(User):
    # Hash of the user's password, as kept in the user store.
    hashed_password: str


class Token(BaseModel):
    """Token payload returned to the user."""
    # The encoded JWT string.
    access_token: str
    # Token scheme label; the login route sets this to "bearer".
    token_type: str


# Bearer-token dependency; tokenUrl names the path of the login route that
# issues tokens.
oauth2_schema = OAuth2PasswordBearer(tokenUrl="/jwt/token")
# passlib context configured with bcrypt as its only hashing scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verity_password(plain_password: str, hashed_password: str):
    """Check a plaintext password against a stored hash.

    Delegates to the module-level passlib context and returns whatever its
    ``verify`` call reports.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match


def jwt_get_user(db, username: str):
    """Look up *username* in *db* and wrap its record in a ``UserInDB``.

    Returns ``None`` when *username* is not a key of *db*.
    """
    if username not in db:
        return None
    record = db[username]
    return UserInDB(**record)


def jwt_authenticate_user(db, username: str, password: str):
    """Fetch the user named *username* from *db* and verify *password*.

    Returns the user object when the user exists and the password matches
    the stored hash; returns ``False`` in every other case.
    """
    candidate = jwt_get_user(db=db, username=username)
    if candidate and verity_password(
        plain_password=password,
        hashed_password=candidate.hashed_password,
    ):
        return candidate
    return False


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT from *data* plus an ``exp`` (expiry) claim.

    Args:
        data: Claims to embed in the token. A shallow copy is taken, so the
            caller's dict is not modified.
        expires_delta: Lifetime of the token. When ``None``, the token
            expires 15 minutes after creation.

    Returns:
        The encoded token produced by ``jwt.encode`` using ``SECRET_KEY``
        and ``ALGORITHM``.
    """
    # Local import: the file header only imports datetime and timedelta.
    from datetime import timezone

    # Compare against None rather than truthiness: timedelta(0) is falsy, and
    # an explicit zero lifetime must not be silently replaced by the default.
    lifetime = expires_delta if expires_delta is not None else timedelta(minutes=15)
    # datetime.utcnow() is deprecated (Python 3.12+) and returns a naive value;
    # an aware UTC timestamp denotes the same instant unambiguously.
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode.update({"exp": expire})
    return jwt.encode(claims=to_encode, key=SECRET_KEY, algorithm=ALGORITHM)


# Login route: exchanges the submitted form credentials for a bearer JWT.
# Responds with 401 when authentication fails.
@app.post("/jwt/token", response_model=Token)
async def login_jwt_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = jwt_authenticate_user(
        db=fake_users_db,
        username=form_data.username,
        password=form_data.password,
    )
    if user:
        # The username travels in the "sub" claim of the issued token.
        issued_token = create_access_token(
            data={"sub": user.username},
            expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTE),
        )
        return {"access_token": issued_token, "token_type": "bearer"}
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )


async def jwt_get_current_user(token: str = Depends(oauth2_schema)):
    """Resolve the request's bearer token to a stored user.

    Args:
        token: Bearer token supplied through the ``oauth2_schema`` dependency.

    Returns:
        The user object that ``jwt_get_user`` builds for the token's ``sub``
        claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names a user that is not in ``fake_users_db``.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    # Only the decode call can raise JWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(token=token, key=SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception

    username = payload.get("sub")
    if not username:
        raise credentials_exception

    user = jwt_get_user(db=fake_users_db, username=username)
    if not user:
        raise credentials_exception
    return user


async def jwt_get_current_active_user(current_user: User = Depends(jwt_get_current_user)):
    """Return the authenticated user unless the account is disabled.

    Args:
        current_user: User resolved by the ``jwt_get_current_user`` dependency.

    Returns:
        ``current_user`` unchanged.

    Raises:
        HTTPException: 400 with detail "Inactive user" when
            ``current_user.disabled`` is truthy.
    """
    if current_user.disabled:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user",
        )
    return current_user


# Route returning the profile of the currently authenticated, active user.
# The dependency chain yields a UserInDB instance; response_model=User filters
# the response to the public fields so hashed_password is not sent to clients.
@app.get("/jwt/users/me", response_model=User)
async def jwt_read_users_me(current_user: User = Depends(jwt_get_current_active_user)):
    return current_user

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM