PyJWT是一個Python庫,用來編碼/解碼JWT(JSON Web Token)的。
1:安裝PyJWT
2: 直接上代碼了:
import datetime, jwt, time from app.dao.userDao import UserDao from flask import jsonify from .. import common class Auth(): @staticmethod def encode_auth_token(user_id, login_time): """ 生成認證Token :param user_id: int :param login_time: int(timestamp) :return: string """ try: payload = { 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, seconds=10), 'iat': datetime.datetime.utcnow(), 'iss': 'ken', 'data': { 'id':user_id, 'login_time': login_time } } return jwt.encode( payload, 'secret', algorithm='HS256' ) except Exception as e: return e @staticmethod def decode_auth_token(auth_token): """ 驗證Token :param auth_token: :return: integer|string """ try: payload = jwt.decode(auth_token, 'secret', options= {'verify_exp':False}) if ('data' in payload and 'id' in payload['data']): return payload else: raise jwt.InvalidTokenError except jwt.ExpiredSignatureError: return "Token過期" except jwt.InvalidTokenError: return "無效的Token" def authenticate(self, username, password): """ 用戶登錄,登錄成功返回token,寫將登錄時間寫入數據庫;登錄失敗返回失敗原因 :param password: :return: json """ userDao = UserDao() user = userDao.search(username) if (user is None): return jsonify(common.falseReturn('', '找不到用戶')) else: if (user.password == password): login_time = int(time.time()) token = self.encode_auth_token(user.username, login_time) return jsonify(common.trueReturn(token.decode(), '登陸成功')) else: return jsonify(common.falseReturn('', '密碼不正確')) def identify(self, request): """ 用戶鑒權 :return: list """ auth_header = request.headers.get('Authorization') if (auth_header): auth_tokenArr = auth_header.split(" ") if (not auth_tokenArr or auth_tokenArr[0]!= 'jwt' or len(auth_tokenArr) != 2 ): result = common.falseReturn('','請傳遞正確的驗證頭信息') else: auth_token = auth_tokenArr[1] payload = self.decode_auth_token(auth_token) if not isinstance(payload, str): userDao = UserDao() user = userDao.search(payload['data']['id']) if (user is None): result = common.falseReturn('', '找不到該用戶信息') else: result = common.trueReturn('', '請求成功') else: result = common.falseReturn('', payload) else: result = common.falseReturn('','沒有提供認證token') return result
代碼說明:
authenticate: 根據用戶名/密碼,到DB中進行校驗,如果是合法的用戶名/密碼,調用encode_auth_token生成token返回;username加入到token的payload中。
encode_auth_token: 生成token的函數,payload可以存儲一些不敏感的信息,比如用戶名等,但是不能存密碼;還有指定簽名算法和秘鑰。
identify: 用戶的請求需要攜帶token信息,這個函數對request進行校驗,調用decode_auth_token完成的校驗。
decode_auth_token: 調用jwt.decode進行token校驗(要指定秘鑰,秘鑰和生成token的秘鑰一樣)
3: 攔截所有的請求,都進行token校驗
@app.before_request
def before_request():
Auth.identify(Auth,request )