FastAPI 安全機制(二) 基於OAuth2和JWT的Token認證機制(一)生成token


作者:麥克煎蛋   出處:https://www.cnblogs.com/mazhiyong/ 轉載請保留這段聲明,謝謝!

 

JWT簡介

基於JWT的Token認證機制實現(一)概念 

基於JWT的Token認證機制實現(二)認證過程

OAuth2PasswordBearer

OAuth2PasswordBearer是接收URL作為參數的一個類:客戶端會向該URL發送usernamepassword參數,然后得到一個token值。

OAuth2PasswordBearer並不會創建相應的URL路徑操作,只是指明了客戶端用來獲取token的目標URL。

當請求到來的時候,FastAPI會檢查請求的Authorization頭信息,如果沒有找到Authorization頭信息,或者頭信息的內容不是Bearer token,它會返回401狀態碼(UNAUTHORIZED)。

我們可以看一下OAuth2PasswordBearer的源碼:

class OAuth2PasswordBearer(OAuth2):
    def __init__(
        self,
        tokenUrl: str,
        scheme_name: str = None,
        scopes: dict = None,
        auto_error: bool = True,
    ):
        if not scopes:
            scopes = {}
        flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes})
        super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error)

    async def __call__(self, request: Request) -> Optional[str]:
        authorization: str = request.headers.get("Authorization")
        scheme, param = get_authorization_scheme_param(authorization) if not authorization or scheme.lower() != "bearer": if self.auto_error:
                raise HTTPException(
                    status_code=HTTP_401_UNAUTHORIZED,
                    detail="Not authenticated",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            else:
                return None
        return param

 

我們需要安裝PyJWT來產生和校驗JWT token。

pip install pyjwt

我們也需要安裝python-multipart,因為OAuth2需要通過表單數據來發送usernamepassword信息。

pip install python-multipart

獲取token的代碼示例如下:

from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
import jwt
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30


class Token(BaseModel):
    access_token: str
    token_type: str


app = FastAPI()

# oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")


# 生成token
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt
# 請求接口 @app.post(
"/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": "test"}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}

 

可以使用postman或者交互式文檔來測試這個接口。

 

如果單純作為API使用來獲取token值,下面這行代碼暫時是用不到的:

# oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM