VUE Flask登錄的初探-JWT的探索


上回簡單實現了基於JWT的登錄,並且留下了一些問題,jwt天生的弊端。本次用某些邏輯解決jwt的弊端

先列舉jwt可能遇到的問題:

1.注銷問題,當客戶端注銷登錄后,token在有效期內依然有效,實際上從服務端無法讓token失效
2.修改密碼,當用戶修改了密碼,按常規需要讓前次token失效。
3.續簽問題,jwt雖然有超時機制,但沒有實現自動續簽。

為了解決上述三個問題,我考慮用如下手段:

1.注銷問題,我將對每個用戶,維護一個sessionID,作為此用戶本次會話的有效ID,當注銷用戶后,服務端將此sessionID 刪除。於是服務端就能主動控制token的有效性

2.修改密碼問題,我將對用戶ID做一個替代ID,此ID生成邏輯是由用戶真實ID和用戶密碼加密產生,由此當用戶修改密碼后,替代ID將自動被更新,由此,客戶端的TokenID,無法在服務端查詢到,於是失效。

3.續簽問題,解決此事,只能不停刷新和下發token。

以上方案都擴大服務器資源開銷。

下面是代碼。

前端代碼,

1.對login.vue代碼做了微調。添加了修改密碼的按鈕

2.對request.js,添加了response攔截器,token的保存到cookie邏輯,遷移到此處

 

<template>
  <div class='login'>
    <h1>{{ titleMsg }}</h1>
    <el-form ref="loginForm" :model="loginData" label-width="100px">
      <el-form-item label="用戶名" prop="username" :rules="[{required: true, message: '用戶名不能為空'}]">
        <el-input ref="username" v-model="loginData.username" autocomplete="off"></el-input>
      </el-form-item>
      <el-form-item label="密碼" prop="password" :rules="[{required: true, message: '密碼不能為空'}]">
        <el-input type="password" v-model="loginData.password" autocomplete="off"></el-input>
      </el-form-item>      
      <el-form-item>
        <el-button type="primary" @click="loginForm('loginForm')">提交</el-button>
        <el-button @click="resetForm('loginForm')">重置</el-button>
      </el-form-item>
      <el-form-item>
        <el-button @click="testForm()">測試</el-button>
        <el-button @click="logoutForm()">登出</el-button>
      </el-form-item>
      <el-form-item>
        <el-button @click="changePws()">修改密碼為456</el-button>
      </el-form-item>
    </el-form>
  </div>
</template>

<script>
import qs from 'qs'
import service from '../utils/request'
export default {
  name: 'loginForm',
  data() {
    return {
      titleMsg: '歡迎來到旗幟世界',
      loginData: {
        username: '',
        password: ''
      }
    }
  },
  methods: {
    loginForm(formName) {
      this.$refs[formName].validate((valid) => {
         if (valid) {
           service({url: '/login',method: 'post',data: qs.stringify(this.loginData)})
             .then(response => {
               const { data } = response
               //Cookies.set('Authorization',data.data.token)
               alert('submit!!!' +'\n'+ data.msg)
             })
             .catch(error => {
               console.log(error)
             })
         } else {
           console.log('illegad submit!!');
           return false;
        }
      })
    },
    testForm() {
      service({url: '/first',method: 'get'})
        .then(response => {
          const { data } = response
          alert('firstPage!!!' +'\n'+ data.data.tips)
        })
        .catch(error => {
          console.log(error)
        })
    },
    logoutForm() {
      service({url: '/logout',method: 'get'})
        .then(response => {
          const { data } = response
          alert('logout!!!' +'\n'+ data.data.tips)
        })
        .catch(error => {
          console.log(error)
        })
    },
    changePws() {
      service({url: '/changepws',method: 'get'})
        .then(response => {
          const { data } = response
          alert('changepws!!!' +'\n'+ data.data)
        })
        .catch(error => {
          console.log(error)
        })
    },
  }
}
</script>
Login.vue
import axios from 'axios'
import Cookies from 'js-cookie'


/****** 創建axios實例 ******/
const service = axios.create({
  baseURL: 'http://localhost:5000',  // api的base_url
  timeout: 5000  // 請求超時時間
})

service.interceptors.request.use(
  config => {
    config.headers['Authorization'] = Cookies.get('Authorization')
    return config
  },
  error => {
    console.log(error)
    return Promise.reject(error)
  }
)

/****** respone攔截器==>對響應做處理 ******/
service.interceptors.response.use(
  response => {
    const { data } = response    
    if (isJSON(data.data)) {
      let jsonObj = JSON.parse(JSON.stringify(data.data))
      if (Object.prototype.hasOwnProperty.call(jsonObj,"token")){
        console.log('update token')
        Cookies.set('Authorization',jsonObj.token)
      }
    } else {
      console.log('not')
    }
    return response
  },
  error => {
    console.log(error);
    return Promise.reject(error)
  }
)

function isJSON(str) {
  let jsonData = JSON.stringify(str)
  try {
    if (typeof JSON.parse(jsonData) == "object") {
      return true;
    } else {
      return false;
    }
  } 
  catch(e) {
    return false;
  }
}

export default service;
request.js

后端代碼,改動還是蠻大,

1.login.py 將更多無關邏輯下發給其他py文件,保持login.py的純粹性,只解決登錄相關問題

2.user.py 用戶操作的所有代碼,均由此單元負責。

3.jwt_token.py 僅僅是將超時時間,改為入參。

import time
import json
from flask import Blueprint,request
from flask_login import LoginManager,login_user,logout_user,login_required,current_user
from user import UserLogin,Operaters

login_page = Blueprint('login_page',__name__)

login_manager = LoginManager()
login_manager.login_view = None

@login_page.record_once
def on_load(state):
  login_manager.init_app(state.app)

# @login_manager.user_loader
# def load_user(user_id):
#   return User.get(user_id)

@login_manager.request_loader
def load_user_from_request(request):
  token = request.headers.get('Authorization')
  if token == None:
    return None

  payload = UserLogin.verfiyToken(token)
  if payload != None:
    alternativeID = payload['data']['alternativeID']
    sessionID = payload['data']['sessionID']
    user = UserLogin.queryUser(alternativeID=alternativeID,sessionID=sessionID)
  else:
    user = None
  return user


@login_page.route('/first')
@login_required
def firstPage():
  returnData = {'code': 0, 'msg': 'success', 'data': {'token':current_user.token,'tips':'First Blood(來自' + current_user.userName +')'}}
  return returnData,200

@login_page.route('/login', methods=['POST'])
def login():
  if request.method == 'POST':
    username = request.form['username']
    password = request.form['password']
    user = UserLogin.queryUser(userName = username)
    if (user != None) and (user.verifyPassword(password)) :
      login_user(user)
      returnData = {'code': 0, 'msg': 'success', 'data': {'token':user.token}}
      return json.dumps(returnData),200
    else :
      returnData = {'code': 1, 'msg': 'failed', 'data': {'tips':'username or password is not correct'} }
      return json.dumps(returnData),200  

@login_page.route('/logout') 
@login_required
def logout():
  userName = current_user.userName
  alternativeID = current_user.alternativeID
  sessionID = current_user.sessionID
  UserLogin.dropSessionID(alternativeID,sessionID)
  logout_user()
  returnData = {'code': 0, 'msg': 'success', 'data': {'tips':'Bye ' + userName} }
  return json.dumps(returnData),200  


@login_page.route('/changepws') 
@login_required
def changePws():
  user = UserLogin.queryUser(userID = current_user.id)  
  user.changePws()
  returnData = {'code': 0, 'msg': 'success', 'data': {'tips':'password was changed'} }
  return json.dumps(returnData),200  
login.py
import uuid
from flask_login import UserMixin
from werkzeug.security import check_password_hash,generate_password_hash
from jwt_token import genToken,verfiyToken
from datetime import datetime,timedelta

Operaters = [
    {
        "id": 1,
        "name": "admin",
        "password": generate_password_hash('123'),
        "alternativeID": generate_password_hash('1'+generate_password_hash('123')),
        "sessionIDs": []
    },
    {
        "id": 2,
        "name": "李四",
        "password": generate_password_hash('123'),
        "alternativeID": generate_password_hash('2'+generate_password_hash('123')),
        "sessionIDs": []
    },    
]

class UserLogin(UserMixin):
    def __init__(self,operater):
        self.id = operater.get("id")
        self.userName = operater.get("name")
        self.passwordHash = operater.get("password")
        self.alternativeID = operater.get("alternativeID")
        self.sessionID = None
        self.token = None
        self.oper = operater

        exp = datetime.utcnow() + timedelta(seconds=10)
        self.genSessionID(exp)
        self.genToken(exp)

        clearOvertimeSeesionID(operater)

    def genSessionID(self,exp,sessionID=None):
        if sessionID == None:
            self.sessionID = str(uuid.uuid4())            
            self.oper["sessionIDs"].append({'id':self.sessionID,'exp':exp})
        else:            
            self.sessionID = sessionID

    
    def verifyPassword(self,password):
        if self.passwordHash is None:
            return False
        return check_password_hash(self.passwordHash,password)

    def get_id(self):
        return self.id

    def get(user_id):
        if not user_id:
            return None
        for oper in Operaters:
            if str(oper.get('id')) == str(user_id) :
                return User(oper)
        return None

    def clearOvertimeSeesionID(operater):        
        for userSessionID in operater["sessionIDs"][::-1]:
            if userSessionID['exp'] < datetime.utcnow() :
                operater["sessionIDs"].remove(userSessionID)
    
    @staticmethod
    def queryUser(**kwargs):
        if 'userID' in kwargs:
            return UserLogin.queryUserByID( kwargs['userID'])
        elif 'userName' in kwargs:
            return UserLogin.queryUserByName(kwargs['userName'])
        elif ('alternativeID' in kwargs) and ('sessionID' in kwargs):
            return UserLogin.queryUserBySessionID(kwargs['alternativeID'],kwargs['sessionID'])
        else:
            return None
    

    @staticmethod
    def queryUserByID(userID):
        for oper in Operaters:
            if (oper.get('id') == userID) :
                user = UserLogin(oper)
                return user
        return None

    @staticmethod
    def queryUserByName(username):
        for oper in Operaters:
            if (oper.get('name') == username) :
                user = UserLogin(oper)
                return user
        return None

    @staticmethod
    def queryUserBySessionID(alternativeID,sessionID):
        exists = False
        for oper in Operaters:
            if (oper.get('alternativeID') == alternativeID) :
                sessionIDs = oper["sessionIDs"]
                exists = True
                break
        
        if exists:
            exists = False
            for userSessionID in sessionIDs:
                if (userSessionID['id'] == sessionID) :
                    exists = True
                    break

        if exists: 
            user = UserLogin(oper)
            return user
        else:
            return None

    @staticmethod
    def dropSessionID(alternativeID,sessionID):
        exists = False
        for oper in Operaters:
            if (oper.get('alternativeID') == alternativeID) :
                sessionIDs = oper["sessionIDs"]
                exists = True
                break
        
        if exists :
            exists = False
            for userSessionID in sessionIDs:
                if (userSessionID['id'] == sessionID) :
                    exists = True
                    break
        if exists :
            sessionIDs.remove(userSessionID)

    @staticmethod
    def verfiyToken(token):
        verfiyed = verfiyToken(token)
        if verfiyed :
            return True        

        for oper in Operaters:
            clearOvertimeSeesionID(oper)

        return verfiyed


    def changePws(self):
        for oper in Operaters:
            if (oper.get('id') == self.id) :              
                oper['password'] = generate_password_hash('456')
                oper['alternativeID'] = generate_password_hash('1'+generate_password_hash('456'))
                return true
        return False

    def genToken(self,exp):
        token = genToken(exp,{'alternativeID':self.alternativeID,'sessionID':self.sessionID})
        self.token = token
        return token

    
user.py
import jwt
from jwt import PyJWTError
from datetime import datetime,timedelta

SECRECT_KEY = b'\x92R!\x8e\xc6\x9c\xb3\x89#\xa6\x0c\xcb\xf6\xcb\xd7\xbc'


def genToken(exp,data):
  payload = {
    'exp': exp,
    'data': data 
    }
  token = jwt.encode(payload,key= SECRECT_KEY,algorithm= 'HS256')
  return bytes.decode(token)

def verfiyToken(tokenStr):
  try:
    tokenBytes =  tokenStr.encode('utf-8')
    payload = jwt.decode(tokenBytes,key= SECRECT_KEY,algorithm= 'HS256')
    return payload
  except PyJWTError as e:
    print("jwt驗證失敗: %s" % e)
    return None
jwt_token.py

 

題外話,學習python已經有幾個月了,這個語言還是蠻有趣,相對來說,沒有那么多約束,所以代碼看起來比較隨性。未來的日子,需要逐漸找出合適python的項目結構以及格式規范。

下一個話題,將步入數據庫方面,我會把用戶字典創建到數據庫中,並用python+sql操作用戶表。前端+后端+數據庫,如此以來一套基本的項目就搭建完畢。

再有空就可以把緩存機制搭建起來。甚至是docker。現在程序員需要的知識量越來越繁雜,多而不精。這樣真的好么?我沒有答案。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM