一、什么是Token?
Token是服務端生成的一串字符串,以作客戶端進行請求的一個令牌,當第一次登錄后,服務器會生成一個Token並將此Token返回給客戶端,以后客戶端只需帶上這個Token前來請求數據即可,無需再次帶上用戶名和密碼。
二、為什么要使用Token?
在很多項目案例中,需要實現賬戶的功能,客戶端所有的功能都基於用戶已登陸的前提下才可以使用。這就要求每次客戶端像服務器請求數據時都要驗證賬戶是否正確,如果正確則按正常方式返回數據,如果錯誤則進行攔截並返回錯誤信息。但是當客戶端頻繁向服務器請求數據的話,每次服務器都要頻繁地查詢數據庫。而Token正是為了減輕服務器的壓力,減少頻繁的查詢數據庫,使服務器更加健壯。並取代傳統使用session的方法來進行驗證。
三、JWT json-web-token
1.三大組成
1,header
格式為字典,元數據格式如下
{'alg':'HS256', 'typ':'JWT'} #alg代表要使用的 算法 #typ表明該token的類別 - 此處必須為 大寫的 JWT
該部分數據需要轉成json串並用base64加密
2,payload
格式為字典,此部分分為公有聲明和私有聲明
公共聲明:JWT提供了內置關鍵字用於描述常見的問題
此部分均為可選項,用戶根據自己需求 按需添加key,常見公共聲明如下:
{'exp':xxx, # Expiration Time 此token的過期時間的時間戳 'iss':xxx,# (Issuer) Claim 指明此token的簽發者 'aud':xxx, #(Audience) Claim 指明此token的 'iat':xxx, # (Issued At) Claim 指明此創建時間的時間戳 'aud':xxx, # (Audience) Claim 指明此token簽發面向群體 }
私有聲明:用戶可根據自己業務需求,添加自定義的key,例如如下:
{'username': 'guoxiaonao'}
公共聲明和私有聲明均在同一個字典中;轉成json串並用base64加密
簽名規則如下:
根據header中的alg確定具體算法,以下用 HS256為例
HS256(自定義的key , base64后的header + '.' + base64后的payload)
解釋:用自定義的key, 對base64后的header + '.' + base64后的payload進行hmac計算
base64(header) + '.' + base64(payload) + '.' + base64(sign)
最終結果如下: b'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VybmFtZSI6Imd1b3hpYW9uYW8iLCJpc3MiOiJnZ2cifQ.Zzg1u55DCBqPRGf9z3-NAn4kbA-MJN83SxyLFfc5mmM'
3,校驗jwt規則
1,解析header, 確認alg
3,獲取payload自定義內容
4,pyjwt
方法 | 參數說明 | 返回值 |
---|---|---|
encode(payload, key, algorithm) | payload: jwt三大組成中的payload,需要組成字典,按需添加公有聲明和私有聲明 例如: {'username': 'guoxiaonao', 'exp': 1562475112} 參數類型: dict | token串 返回類型:bytes |
key : 自定義的加密key 參數類型:str | ||
algorithm: 需要使用的加密算法[HS256, RSA256等] 參數類型:str | ||
decode(token,key,algorithm,) | token: token串 參數類型: bytes/str | payload明文 返回類型:dict |
key : 自定義的加密key ,需要跟encode中的key保持一致 參數類型:str | ||
algorithm: 同encode | ||
issuer: 發布者,若encode payload中添加 'iss' 字段,則可針對該字段校驗 參數類型:str | 若iss校驗失敗,則拋出jwt.InvalidIssuerError | |
audience:簽發的受眾群體,若encode payload中添加'aud'字段,則可針對該字段校驗 參數類型:str | 若aud校驗失敗,則拋出jwt.InvalidAudienceError |
老師手寫的Jwt類,很厲害:

import base64 import copy import hmac import json import time class Jwt(): def __init__(self): pass @staticmethod def encode(payload, key, exp=300): #init header header = {'typ': 'JWT', 'alg': 'HS256'} #separators - 指定序列化后的json串格式, 第一個參數 #指每個鍵值對之間的連接符號,第二個參數指的是每一個鍵值對中鍵和值之間的連接符號 #sort_keys - 將序列化后的字符串進行排序 header_json = json.dumps(header, separators=(',',':'), sort_keys=True) #生成b64 header header_bs = Jwt.b64encode(header_json.encode()) #參數中的 payload {'username': 'aaa'} payload = copy.deepcopy(payload) #添加公有聲明 - exp 且值為未來時間戳 payload['exp'] = int(time.time()) + exp payload_json = json.dumps(payload, separators=(',',':'), sort_keys=True) payload_bs = Jwt.b64encode(payload_json.encode()) #簽名 #判斷傳入的key的類型 if isinstance(key, str): key = key.encode() hm = hmac.new(key, header_bs + b'.' + payload_bs, digestmod='SHA256') hm_bs = Jwt.b64encode(hm.digest()) return header_bs + b'.' + payload_bs + b'.' + hm_bs @staticmethod def b64encode(j_s): #替換生成出來的b64串中的占位符 = return base64.urlsafe_b64encode(j_s).replace(b'=',b'') @staticmethod def b64decode(b64_s): rem = len(b64_s) % 4 if rem > 0: b64_s += b'=' * (4-rem) return base64.urlsafe_b64decode(b64_s) @staticmethod def decode(token, key): #校驗兩次HMAC結果 #檢查exp公有聲明的有效性 #注意 b64 = 要補全 #校驗成功 返回 payload 字典對象, 失敗的話 raise header_b , payload_b , sign = token.split(b'.') if isinstance(key, str): key = key.encode() #比較兩次HMAC結果 hm = hmac.new(key, header_b + b'.' + payload_b, digestmod='SHA256') if sign != Jwt.b64encode(hm.digest()): raise JwtSignError('---sign error !!! ') #獲取payload payload_json = Jwt.b64decode(payload_b) payload = json.loads(payload_json.decode()) #校驗exp是否過期 exp = payload['exp'] now = time.time() if now > exp: #過期 raise JwtExpireError('---The token is expire !!!') return payload class JwtSignError(Exception): def __init__(self, error_msg): self.error_msg = error_msg def __str__(self): return '<JwtSignError is %s>'%(self.error_msg) class JwtExpireError(Exception): def __init__(self, error_msg): self.error_msg = error_msg def __str__(self): return '<JwtExpireError is %s>' % (self.error_msg) if __name__ == '__main__': s = Jwt.encode({'username':'guoxiaonao'}, 'abcde') #time.sleep(2) #res = Jwt.decode(s, 'abcde') print(s)