一、前言
登錄功能是每個系統的基礎,本篇實現了基於token的用戶登錄和請求權限控制。
二、數據庫model
models文件夾下新建user.py,創建ORM實體類
# !/usr/bin/python3 # -*- coding: utf-8 -*- """ @Author : Huguodong @Version : ------------------------------------ @File : user.py @Description : 用戶表 @CreateTime : 2020/3/7 14:45 ------------------------------------ @ModifyTime : """ import hashlib from sqlalchemy import func from db import db from models.BaseModel import BaseModel class User(BaseModel): """ 用戶表 """ __tablename__ = "t_user" id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment="用戶ID") nickname = db.Column(db.String(30), comment="用戶昵稱") user_name = db.Column(db.String(30), comment="登錄賬號") user_type = db.Column(db.Boolean, default=1, comment="用戶類型(1系統用戶") email = db.Column(db.String(50), comment="用戶郵箱") phone = db.Column(db.String(20), comment="手機號") phonenumber = db.Column(db.String(11), comment="手機號碼") sex = db.Column(db.INTEGER, default=1, comment="用戶性別(1男 2女 3未知)") avatar = db.Column(db.String(100), comment="頭像路徑") password = db.Column(db.String(50), comment="密碼") salt = db.Column(db.String(20), comment="鹽加密") status = db.Column(db.INTEGER, default=1, comment="帳號狀態(1正常 2禁用") dept_id = db.Column(db.INTEGER, comment="部門id") del_flag = db.Column(db.INTEGER, default=1, comment="刪除標志(1代表存在 2代表刪除)") login_ip = db.Column(db.String(50), comment="最后登陸IP") login_date = db.Column(db.TIMESTAMP, comment="最后登陸時間", nullable=False, onupdate=func.now()) def check_password(self, passwd): ''' 檢查密碼 :param passwd: :return: 0/1 ''' # 創建md5對象 m = hashlib.md5() b = passwd.encode(encoding='utf-8') m.update(b) str_md5 = m.hexdigest() if self.password == str_md5: return 1 else: return 0
二、創建藍圖
1.permission文件夾下新建藍圖文件user.py
from permission import * user = Blueprint('user', __name__)
2.app.py注冊藍圖
app.register_blueprint(user.user, url_prefix='/api/user')
3.寫一個測試方法
@user.route('/test', methods=["GET"]) def test(): return SUCCESS()
4.瀏覽器輸入http://127.0.0.1:5000/api/user/test
二、實現token方法
用token校驗身份,是前后端交互的常用方式。
它有以下特性:
- 會失效
- 加密
- 可以根據它拿到用戶的信息
關於token的方法寫在utils下的common.py下
1.生成token:生成方式( 內部配置的私鑰+有效期+用戶的id +用戶名+用戶角色列表)
def create_token(user_id, user_name, role_list): ''' 生成token :return: token ''' # 第一個參數是內部的私鑰,這里寫在共用的配置信息里了,如果只是測試可以寫死 # 第二個參數是有效期(秒) s = Serializer(config.SECRET_KEY, expires_in=config.EXPIRES_IN) # 接收用戶id轉換與編碼 token = None try: token = s.dumps({"id": user_id, "name": user_name, "role": role_list}).decode("ascii") except Exception as e: app.logger.error("獲取token失敗:{}".format(e)) return token
2.校驗token:校驗接收到的token,如果成功返回用戶信息,否則返回None
def verify_token(token): ''' 校驗token :param token: :return: 用戶信息 or None ''' # 參數為私有秘鑰,跟上面方法的秘鑰保持一致 s = Serializer(config.SECRET_KEY) try: # 轉換為字典 data = s.loads(token) return data except Exception as e: return None
3.有很多接口是必須登錄才能操作的,最好的方式就是在寫一個裝飾器,添加在需要的api上
def login_required(*role): def decorator(func): @functools.wraps(func) def wrapper(*args, **kw): try: # 在請求頭上拿到token token = request.headers["Authorization"] except Exception as e: # 沒接收的到token,給前端拋出錯誤 return jsonify(code=Code.NO_PARAMETER.value, msg='缺少參數token') s = Serializer(config.SECRET_KEY) try: user = s.loads(token) if role: # 獲取token中的權限列表如果在參數列表中則表示有權限,否則就表示沒有權限 user_role = user['role'] result = [x for x in user_role if x in list(role)] if not result: return jsonify(code=Code.ERR_PERMISSOM.value, msg="權限不夠") except Exception as e: return jsonify(code=Code.LOGIN_TIMEOUT.value, msg="登錄已過期") return func(*args, **kw) return wrapper return decorator
這樣只需在方法上添加裝飾器就能限制用戶訪問
#限制只有admin可以操作 @user.route('/xxx', methods=["POST"]) @login_required(‘admin’) def xxx() return xx
二、用戶登錄
點擊前端頁面的登錄按鈕,會請求登錄接口
persmisson文件夾下的user.py新建登錄方法
@user.route('/login', methods=["POST"]) def login(): ''' 用戶登錄 :return:token ''' res_dir = request.get_json() if res_dir is None: return NO_PARAMETER() # 獲取前端傳過來的參數 username = res_dir.get("username") password = res_dir.get("password") # 校驗參數 if not all([username, password]): return jsonify(code=Code.NOT_NULL.value, msg="用戶名和密碼不能為空") try: user = User.query.filter_by(user_name=username).first() except Exception as e: app.logger.error("login error:{}".format(e)) return jsonify(code=Code.REQUEST_ERROR.value, msg="獲取信息失敗") if user is None or not user.check_password(password) or user.del_flag == 2 or user.status==2 : return jsonify(code=Code.ERR_PWD.value, msg="用戶名或密碼錯誤") # 獲取用戶信息,傳入生成token的方法,並接收返回的token # 獲取用戶角色 user_role = Role.query.join(User_Role, Role.id == User_Role.role_id).join(User, User_Role.user_id == user.id).filter( User.id == user.id).all() role_list = [i.role_key for i in user_role] token = create_token(user.id, user.user_name, role_list) data = {'token': token, 'userId': user.id, 'userName': user.user_name, 'nickname': user.nickname} # 記錄登錄ip將token存入rerdis try: user.login_ip = request.remote_addr user.update() Redis.write(f"token_{user.user_name}", token) except Exception as e: return jsonify(code=Code.UPDATE_DB_ERROR.value, msg="登錄失敗") if token: # 把token返回給前端 return jsonify(code=Code.SUCCESS.value, msg="登錄成功", data=data) else: return jsonify(code=Code.REQUEST_ERROR.value, msg="請求失敗", data=token)
二、用戶注銷
persmisson文件夾下的user.py新建注銷方法
@user.route('/logout', methods=["POST"]) @login_required() def logout(): ''' 注銷方法:redis刪除token :return: ''' try: token = request.headers["Authorization"] user = verify_token(token) if user: key = f"token_{user.get('name')}" redis_token = Redis.read(key) if redis_token: Redis.delete(key) return SUCCESS() else: return AUTH_ERR() except Exception as e: app.logger.error(f"注銷失敗") return REQUEST_ERROR()
三、檢查Token
登錄成功后,系統會請求一個check_token方法,主要是檢查用戶token是否合法
persmisson文件夾下的user.py新建check_token方法
@user.route('/check_token', methods=["POST"]) def check_token(): # 在請求頭上拿到token token = request.headers["Authorization"] user = verify_token(token) if user: key = f"token_{user.get('name')}" redis_token = Redis.read(key) if redis_token == token: return SUCCESS(data=user.get('id')) else: return OTHER_LOGIN() else: return AUTH_ERR()