FastAPI安全系列(二) 基於Password和Bearer Token的OAuth2 .0認證


一、獲取username和password

 后台獲取前台提交的username和password,可以使用FastAPI的安全實用性工具獲取username和password。

 OAuth2規定客戶端必需將username和password字段作為表單數據發送(不可使用JSON)。而且規范了字段必須這樣命名。

from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordRequestForm

app = FastAPI()


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    pass

OAuth2PasswordRequestForm是一個類依賴項,聲明了如下的請求表單:

  • username
  • password
  • 可選scope字段,是一個由空格分隔的字符串組成的大字符串
  • 可選的grant_type字段
  • 可選的client_id字段
  • 可選的 client_secret字段

現在使用表單中的username從(偽)數據庫中獲取用戶數據,如果沒有這個用戶就返回一個錯誤。若是獲取到這個用戶,先進行Pydantic UserInDB模型,然后進行密碼密文校驗。

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

# 偽數據庫
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}


def fake_hash_password(password: str):
    return "fakehashed" + password


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)  # 從數據庫中取出用戶信息
    if not user_dict:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password!")
    user = UserInDB(**user_dict)  # 進行用戶模型校驗
    hashed_password = fake_hash_password(form_data.password)  # 將表單密碼進行hash加密
    if not hashed_password == user.hashed_password:  # 表單加密后的密碼與數據庫中的密碼進行比對
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password!")
    return {"access_token": user.username, "token_type": "bearer"}

最后返回一個JSON數據的token令牌,該JSON對象包含:

  •  token_type   因為實例中使用了Bearer令牌,所以令牌類型為Bearer
  • access_token   令牌字符串

二、獲取活躍用戶信息

這里已經完成了令牌的獲取功能,與之前的獲取當前用戶的功能進行合並:

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

oauth2_schema = OAuth2PasswordBearer(tokenUrl="/token") # 執行生成token的地址 # 偽數據庫
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}


def fake_hash_password(password: str):
    return "fakehashed" + password


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)  # 從數據庫中取出用戶信息
    if not user_dict:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password!")
    user = UserInDB(**user_dict)  # 進行用戶模型校驗
    hashed_password = fake_hash_password(form_data.password)  # 將表單密碼進行hash加密
    if not hashed_password == user.hashed_password:  # 表單加密后的密碼與數據庫中的密碼進行比對
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password!")
    return {"access_token": user.username, "token_type": "bearer"}


def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) def fake_decode_token(token: str): user = get_user(fake_users_db, token) return user async def get_current_user(token: str = Depends(oauth2_schema)): user = fake_decode_token(token) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authenticate credentials", headers={"WWW-Authentiacate": "Bearer"} ) return user


async def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user") return current_user @app.get("/users/me") async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user

在生成token的基礎上進行活躍用戶信息的獲取:

  • 首先需要將生成token的地址進行指向實際生成token的路徑操作函數
  • 獲取活躍用戶請求時就會去獲取認證的token
  • 一旦獲取到token就與正常的API請求一樣

具體的表現是在交互文檔中:

  • 先點擊右上角的Authorize按鈕進行認證,此時后台會生成一個JSON對象,內含access_token字段
  • 前台獲取到 這個access_token,將其存到某處
  • 點擊下面的路徑操作,如獲取活躍用戶的API,會在請求頭中攜帶該token進行認證,以及獲取相應的用戶信息

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM