基於Python-Flask的權限管理4:基於Token的登錄


一、前言

登錄功能是每個系統的基礎,本篇實現了基於token的用戶登錄和請求權限控制。

二、數據庫model

models文件夾下新建user.py,創建ORM實體類

# !/usr/bin/python3
# -*- coding: utf-8 -*-
"""
@Author         :  Huguodong
@Version        :  
------------------------------------
@File           :  user.py
@Description    :  用戶表
@CreateTime     :  2020/3/7 14:45
------------------------------------
@ModifyTime     :  
"""
import hashlib

from sqlalchemy import func

from db import db
from models.BaseModel import BaseModel


class User(BaseModel):
    """
    用戶表
    """
    __tablename__ = "t_user"
    id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment="用戶ID")
    nickname = db.Column(db.String(30), comment="用戶昵稱")
    user_name = db.Column(db.String(30), comment="登錄賬號")
    user_type = db.Column(db.Boolean, default=1, comment="用戶類型(1系統用戶")
    email = db.Column(db.String(50), comment="用戶郵箱")
    phone = db.Column(db.String(20), comment="手機號")
    phonenumber = db.Column(db.String(11), comment="手機號碼")
    sex = db.Column(db.INTEGER, default=1, comment="用戶性別(1男 2女 3未知)")
    avatar = db.Column(db.String(100), comment="頭像路徑")
    password = db.Column(db.String(50), comment="密碼")
    salt = db.Column(db.String(20), comment="鹽加密")
    status = db.Column(db.INTEGER, default=1, comment="帳號狀態(1正常 2禁用")
    dept_id = db.Column(db.INTEGER, comment="部門id")
    del_flag = db.Column(db.INTEGER, default=1, comment="刪除標志(1代表存在 2代表刪除)")
    login_ip = db.Column(db.String(50), comment="最后登陸IP")
    login_date = db.Column(db.TIMESTAMP, comment="最后登陸時間", nullable=False,
                           onupdate=func.now())

    def check_password(self, passwd):
        '''
        檢查密碼
        :param passwd:
        :return: 0/1
        '''
        # 創建md5對象
        m = hashlib.md5()
        b = passwd.encode(encoding='utf-8')
        m.update(b)
        str_md5 = m.hexdigest()
        if self.password == str_md5:
            return 1
        else:
            return 0

 

二、創建藍圖

1.permission文件夾下新建藍圖文件user.py

from permission import *

user = Blueprint('user', __name__)

 

2.app.py注冊藍圖

app.register_blueprint(user.user, url_prefix='/api/user')

 3.寫一個測試方法

@user.route('/test', methods=["GET"])
def test():
    return SUCCESS()

4.瀏覽器輸入http://127.0.0.1:5000/api/user/test

 

二、實現token方法

用token校驗身份,是前后端交互的常用方式。
它有以下特性:

    • 會失效
    • 加密
    • 可以根據它拿到用戶的信息

 

關於token的方法寫在utils下的common.py下

1.生成token:生成方式( 內部配置的私鑰+有效期+用戶的id +用戶名+用戶角色列表) 

def create_token(user_id, user_name, role_list):
    '''
    生成token
    :return: token
    '''
    # 第一個參數是內部的私鑰,這里寫在共用的配置信息里了,如果只是測試可以寫死
    # 第二個參數是有效期(秒)
    s = Serializer(config.SECRET_KEY, expires_in=config.EXPIRES_IN)
    # 接收用戶id轉換與編碼
    token = None
    try:
        token = s.dumps({"id": user_id, "name": user_name, "role": role_list}).decode("ascii")
    except Exception as e:
        app.logger.error("獲取token失敗:{}".format(e))
    return token

 2.校驗token:校驗接收到的token,如果成功返回用戶信息,否則返回None

def verify_token(token):
    '''
    校驗token
    :param token:
    :return: 用戶信息 or None
    '''
    # 參數為私有秘鑰,跟上面方法的秘鑰保持一致
    s = Serializer(config.SECRET_KEY)
    try:
        # 轉換為字典
        data = s.loads(token)
        return data
    except Exception as e:
        return None

3.有很多接口是必須登錄才能操作的,最好的方式就是在寫一個裝飾器,添加在需要的api上

def login_required(*role):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            try:
                # 在請求頭上拿到token
                token = request.headers["Authorization"]
            except Exception as e:
                # 沒接收的到token,給前端拋出錯誤
                return jsonify(code=Code.NO_PARAMETER.value, msg='缺少參數token')
            s = Serializer(config.SECRET_KEY)
            try:
                user = s.loads(token)
                if role:
                    # 獲取token中的權限列表如果在參數列表中則表示有權限,否則就表示沒有權限
                    user_role = user['role']
                    result = [x for x in user_role if x in list(role)]
                    if not result:
                        return jsonify(code=Code.ERR_PERMISSOM.value, msg="權限不夠")
            except Exception as e:
                return jsonify(code=Code.LOGIN_TIMEOUT.value, msg="登錄已過期")
            return func(*args, **kw)
        return wrapper
    return decorator

這樣只需在方法上添加裝飾器就能限制用戶訪問

#限制只有admin可以操作
@user.route('/xxx', methods=["POST"])
@login_required(‘admin’)
def xxx()
    return xx

二、用戶登錄

點擊前端頁面的登錄按鈕,會請求登錄接口

 

 

 

persmisson文件夾下的user.py新建登錄方法

@user.route('/login', methods=["POST"])
def login():
    '''
    用戶登錄
    :return:token
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    # 獲取前端傳過來的參數
    username = res_dir.get("username")
    password = res_dir.get("password")
    # 校驗參數
    if not all([username, password]):
        return jsonify(code=Code.NOT_NULL.value, msg="用戶名和密碼不能為空")
    try:
        user = User.query.filter_by(user_name=username).first()
    except Exception as e:
        app.logger.error("login error:{}".format(e))
        return jsonify(code=Code.REQUEST_ERROR.value, msg="獲取信息失敗")
    if user is None or not user.check_password(password) or user.del_flag == 2 or user.status==2 :
        return jsonify(code=Code.ERR_PWD.value, msg="用戶名或密碼錯誤")

    # 獲取用戶信息,傳入生成token的方法,並接收返回的token
    # 獲取用戶角色
    user_role = Role.query.join(User_Role, Role.id == User_Role.role_id).join(User,
                                                                              User_Role.user_id == user.id).filter(
        User.id == user.id).all()
    role_list = [i.role_key for i in user_role]
    token = create_token(user.id, user.user_name, role_list)
    data = {'token': token, 'userId': user.id, 'userName': user.user_name, 'nickname': user.nickname}
    # 記錄登錄ip將token存入rerdis
    try:
        user.login_ip = request.remote_addr
        user.update()
        Redis.write(f"token_{user.user_name}", token)

    except Exception as e:
        return jsonify(code=Code.UPDATE_DB_ERROR.value, msg="登錄失敗")
    if token:
        # 把token返回給前端
        return jsonify(code=Code.SUCCESS.value, msg="登錄成功", data=data)
    else:
        return jsonify(code=Code.REQUEST_ERROR.value, msg="請求失敗", data=token)

二、用戶注銷

persmisson文件夾下的user.py新建注銷方法

@user.route('/logout', methods=["POST"])
@login_required()
def logout():
    '''
    注銷方法:redis刪除token
    :return:
    '''
    try:
        token = request.headers["Authorization"]
        user = verify_token(token)
        if user:
            key = f"token_{user.get('name')}"
            redis_token = Redis.read(key)
            if redis_token:
                Redis.delete(key)
            return SUCCESS()
        else:
            return AUTH_ERR()
    except Exception as e:
        app.logger.error(f"注銷失敗")
        return REQUEST_ERROR()

三、檢查Token

登錄成功后,系統會請求一個check_token方法,主要是檢查用戶token是否合法

persmisson文件夾下的user.py新建check_token方法

@user.route('/check_token', methods=["POST"])
def check_token():
    # 在請求頭上拿到token
    token = request.headers["Authorization"]
    user = verify_token(token)
    if user:
        key = f"token_{user.get('name')}"
        redis_token = Redis.read(key)
        if redis_token == token:
            return SUCCESS(data=user.get('id'))
        else:
            return OTHER_LOGIN()
    else:
        return AUTH_ERR()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM