flask刷新token


我們在做前后端分離的項目中,最常用的都是使用token認證。

登錄后將用戶信息,過期時間以及私鑰一起加密生成token,但是比較頭疼的就是token過期刷新的問題,因為用戶在登錄后,如果在使用過程中,突然提示token過期了,需要重新登錄,會覺得很奇怪。

我使用的方式,是在解析token的時候,如果token過期了,但是還在可刷新的時間范圍內,我們應該自動刷新token,並將新的token返回給用戶。

但是如果前端采用異步請求,同時發來了多個接口的話,我們不可能對每個請求的token都進行刷新。

我的解決方案是,將過期但還在刷新范圍的token存入redis,同時設置token的過期時間為可刷新時間,過了可刷新時間,token就會被自動刪除

當前端多個請求過來時,會對請求帶來的token進行驗證,分三種情況:

  1)如果token已經過了刷新時間,則拋出異常。

  2)如果token不在redis中,表示剛剛過期,還沒有進行刷新token操作,需要刷新token。

  3)如果token在redis中,則權限默認通過。

下面上代碼:

  1)為了給token加上可刷新時間,需要重寫TimedJSONWebSignatureSerializer 的make_header和loads方法

from itsdangerous import TimedJSONWebSignatureSerializer as Serializer_
import redis
r = redis.Redis(host="127.0.0.1", port=6379,db=0)
class Serializer(Serializer_):
    def __init__(self, secret_key, expires_in=None, **kwargs):
        self.expires_in = expires_in
        super(Serializer, self).__init__(secret_key, expires_in, **kwargs)

    def make_header(self, header_fields):
        header = JSONWebSignatureSerializer.make_header(self, header_fields)
        iat = self.now()
        exp = iat + self.expires_in
        refresh_exp = iat+current_app.config["REFRESH_TIME"]
        header["iat"] = iat
        header["exp"] = exp
        header["refresh_exp"] = refresh_exp
        return header

    def loads(self, s, salt=None, return_header=False):
        payload, header = JSONWebSignatureSerializer.loads(
            self, s, salt, return_header=True
        )

        if "exp" not in header:
            raise BadSignature("Missing expiry date", payload=payload)

        int_date_error = BadHeader("Expiry date is not an IntDate", payload=payload)
        try:
            header["exp"] = int(header["exp"])
        except ValueError:
            raise int_date_error
        if header["exp"] < 0:
            raise int_date_error
        now = self.now()
        if header["exp"] < now:
            if header["refresh_exp"]<now:
                # 已經過了可刷新時間,直接拋出異常
                raise SignatureExpired(
                    "Signature expired",
                    payload=payload,
                    date_signed=self.get_issue_date(header),
                )
            else:
                # TODO 增加判斷,看是否有存儲在redis中,如果有存儲過,表示token已經被刷新過了,直接放行即可。
                if r.get(s):
                    return payload
                pxt = header["refresh_exp"] - now
                if pxt>0:
                    r.set(s, header["exp"], px = pxt)
                # 還在可刷新時間內
                # 生成新的token返回給前端
                serializer = Serializer(current_app.config["SECRET_KEY"], expires_in=self.expires_in)
                # 調用serializer的dumps方法將uid和type寫入生成token
                token = serializer.dumps(payload)
                res = make_response()
                res.headers["Authorization"] = token
                res.set_cookie("authorization",token.decode("ascii"))
                return payload, token
        if return_header:
            return payload, header
        return payload 

  2)認證權限

auth = HTTPBasicAuth()
user = namedtuple("User",["uid","type","scope"])

@auth.verify_password
def check_authorization(token, pwd):
    user_info = check_auth_token(token)
    if not user_info:
        return False
    else:
        if isinstance(user_info, tuple):
            user_info_ = user_info[0]
            token = user_info[1]
        else:
            user_info_ = user_info
        g.user = user_info_
        return True if not token else token


def check_auth_token(token):
    serialzer = Serializer(current_app.config["SECRET_KEY"])
    try:
        s = serialzer.loads(token)
    except BadSignature:
        raise AuthFailed(msg="token is invalid", error_code=1004)
    except SignatureExpired:
        raise AuthFailed(msg="token is expired", error_code=1004)
    token = ""
    if isinstance(s, tuple):
        u_info = s[0]
        token = s[1]
    else:
        u_info = s
    uid = u_info["uid"]
    type = u_info["type"]
    scope = u_info["scope"]
    return user(uid, type, scope), token

  3)生成新的token后,將新的token放入response的header中,前端人員從response header中去取authorization

@api.router("/get/<int:uid>")
@auth.login_required
def get_user(uid, token=None):
    user = User.query.get_or_404(uid)
    res_json = jsonify(user)
    res = make_response(res_json)
    res.headers["Authorization"] = token
    return res

  

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM