【FastAPI 學習 八】JWT token認證登陸


JWT token認證登陸

前一篇博客講述了獲取和驗證請求參數, 這一篇就實踐下,演示一個最基礎的JWT認證,我公司是用了兩個token方式驗證,一個請求token,一個刷新token,請求token過期時間短,專門用於請求數據,刷新token專門用於刷新過期請求token用的。

jwt官網 https://jwt.io/

如果還有不懂JWT的,就需要好好看看JWT的知識了,JWT認證目前是前后端分離中非常流行的一種認證方式: 由三段組成 第一段通常是加密算法,第二段是你存儲的自定義信息(未加密任何人可以去https://jwt.io/看到數據) 第三段是 第一段和第二段生成的簽名參數確保token沒有被修改

** 更多關於FastAPI的文章,請關注個人網站 https://www.charmcode.cn/**

生成Token

依賴庫

python 目前有好幾個庫實現jwt驗證

  • python-jose
  • pyjwt
  • jwcrypto
  • authlib (ps:有幸在PyCon2019上海見過此庫作者 github

這里不做對比演示,就隨便選一個

pip install python-jose

簡單的演示

from datetime import datetime, timedelta

from jose import jwt

# 加密密鑰 這個很重要千萬不能泄露了
SECRET_KEY = "kkkkk"

# 設置過期時間 現在時間 + 有效時間    示例5分鍾
expire = datetime.utcnow() + timedelta(minutes=5)

# exp 是固定寫法必須得傳  sub和uid是自己存的值
to_encode = {"exp": expire, "sub": str(123), "uid": "12345"}

# 生成token 
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm="HS256")
print(encoded_jwt) 
# eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTU1MDg5MzQsInN1YiI6IjEyMyIsInVpZCI6IjEyMzQ1In0.lttAYe808lVQgGhL9NXei2bbC1LIGs-SS0l6qfU_QxU

可以復制去 https://jwt.io/ 解出來看看
在這里插入圖片描述

解密token

payload = jwt.decode(
            "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTU1MDg5MzQsInN1YiI6IjEyMyIsInVpZCI6IjEyMzQ1In0.lttAYe808lVQgGhL9NXei2bbC1LIGs-SS0l6qfU_QxU",
            SECRET_KEY, algorithms="HS256"
        )
print(payload)
# {'exp': 1595508934, 'sub': '123', 'uid': '12345'}

正確的解密方式

上述方式是token什么都是正確的時候,而且還沒有過期,就會正常解出來。現在加上常見的異常捕獲。

from jose.exceptions import ExpiredSignatureError, JWTError
try:
    payload = jwt.decode(
                "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTU1MDk0ODQsInN1YiI6IjEyMyIsInVpZCI6IjEyMzQ1In0.deulPSOPfON-lfbXtvQfTfc-DwqvFoQqv7Y1BhMecBw",
                SECRET_KEY, algorithms="HS256"
            )
    print(payload)
# 當然兩個異常捕獲也可以寫在一起,不區分
except ExpiredSignatureError as e:
    print("token過期")
except JWTError as e:
    print("token驗證失敗")

在FastAPI中實現JWT認證登陸

上述的jwt加密解密的過程搞清楚了,這一步就很簡單了
首先創建一個security.py文件專門進行加密解密的

from datetime import datetime, timedelta
from typing import Any, Union, Optional
from jose import jwt
from fastapi import Header
# 導入配置文件
from setting import config

ALGORITHM = "HS256"


def create_access_token(
    subject: Union[str, Any], expires_delta: timedelta = None
) -> str:
    """
    # 生成token
    :param subject: 保存到token的值
    :param expires_delta: 過期時間
    :return:
    """
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(
            minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES
        )
    to_encode = {"exp": expire, "sub": str(subject)}
    encoded_jwt = jwt.encode(to_encode, config.SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


def check_jwt_token(
     token: Optional[str] = Header(...)
) -> Union[str, Any]:
    """
    解析驗證 headers中為token的值 當然也可以用 Header(..., alias="Authentication") 或者 alias="X-token"
    :param token:
    :return:
    """

    try:
        payload = jwt.decode(
            token,
            config.SECRET_KEY, algorithms=[ALGORITHM]
        )
        return payload
    except (jwt.JWTError, jwt.ExpiredSignatureError, AttributeError):
        # 拋出自定義異常, 然后捕獲統一響應
        raise custom_exc.TokenAuthError(err_desc="access token fail")

上面一定定義好了, 加密和解密token的方式,這一步來登陸生成token

# 從剛剛定義好jwt的文件導入生成方法
from security import create_access_token
from pydantic import BaseModel


class UserInfo(BaseModel):
    username: str
    password: str


@router.post("/login/access-token", summary="用戶登錄認證")
async def login_access_token(
        *,
        db: Session = Depends(deps.get_db),
        user_info: UserInfo,
) -> Any:
    """
    用戶登錄
    :param db:
    :param user_info:
    :return:
    """

    # 驗證用戶賬號密碼是否正確
     user = curd_user.authenticate(db, email=user_info.username, password=user_info.password)
    if not user:
        logger.info(f"用戶郵箱認證錯誤: email{user_info.username} password:{user_info.password}")
        return response_code.resp_500(message="用戶名或者密碼錯誤")
    elif not curd_user.is_active(user):
        return response_code.resp_500(message="用戶郵箱未激活")
    
    # 如果用戶正確通過 則生成token
    # 設置過期時間
    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    # 登錄token 只存放了user.id
    return response_code.resp_200(data={
        "token": create_access_token(user.id, expires_delta=access_token_expires),
    })

驗證token

這一步是使用 from fastapi import Depends 來驗證 headers中的token
在上面security.py文件中有定義取headers中的token參數

from typing import Any, Union

from fastapi import Depends
# 從剛剛定義好jwt的文件導入解密方法
from security import check_jwt_token

@router.get("/user/info", summary="獲取用戶信息", response_model=user.UserInfo)
async def get_user_info(
	token_data: Union[str, Any] = Depends(check_jwt_token)  
) -> Any:
    """
    獲取用戶信息
    :param token_data:
    :return:
    """
    print(token_data)
    # 這個狀態能響應說明token驗證通過
    return response_code.resp_200(data={
        "username": "用戶信息"
    })

所以正確的請求方式應該是這樣的,在headers中攜帶token字段, 再次重述也可以再check_jwt_token方法中給token取別名,最常見的如Authentication
為什么不在check_jwt_token參數中直接寫Authentication 了?
因為參數寫成大寫字母開頭不符合python 編程pep8規范,還有就是X-Token的這種,變量不支持-符號,所以寫成別名。

import requests

res = requests.get("http://127.0.0.1:8000/user/info", headers={
    "token": "xxxx",
    "content-type": "application/json"
})

總結

熟悉了前半部分的jwt token生成與解密的方式,就可以在任何Python框架(Django,Flask,Tornado,Sanic, Bottle等等)里面套用,很多封裝的擴展,本質也是這個,一般不喜歡用擴展。

jwt認證其實非常簡單,搞清楚加密解密的過程,原理稍微懂點就可以了,
后面結合redis, 可以完成單點登陸等操作。
還有就是token續簽的問題,比如你正寫博客了,博客還沒寫完token失效了,結果提交的時候token認證失敗了,就需要token心跳檢測續簽了。

完整代碼GitHub地址

見個人網站 https://www.charmcode.cn/article/2020-07-23_fastapi_jwt


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM