基於Python-Flask的權限管理10:用戶管理


一、前言

用戶管理主要管理用戶信息

 

 

二、后端實現

1.ORM類

import hashlib

from sqlalchemy import func

from db import db
from models.BaseModel import BaseModel


class User(BaseModel):
    """
    用戶表
    """
    __tablename__ = "t_user"
    id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment="用戶ID")
    nickname = db.Column(db.String(30), comment="用戶昵稱")
    user_name = db.Column(db.String(30), comment="登錄賬號")
    user_type = db.Column(db.Boolean, default=1, comment="用戶類型(1系統用戶")
    email = db.Column(db.String(50), comment="用戶郵箱")
    phone = db.Column(db.String(20), comment="手機號")
    phonenumber = db.Column(db.String(11), comment="手機號碼")
    sex = db.Column(db.INTEGER, default=1, comment="用戶性別(1男 2女 3未知)")
    avatar = db.Column(db.String(100), comment="頭像路徑")
    password = db.Column(db.String(50), comment="密碼")
    salt = db.Column(db.String(20), comment="鹽加密")
    status = db.Column(db.INTEGER, default=1, comment="帳號狀態(1正常 2禁用")
    dept_id = db.Column(db.INTEGER, comment="部門id")
    del_flag = db.Column(db.INTEGER, default=1, comment="刪除標志(1代表存在 2代表刪除)")
    login_ip = db.Column(db.String(50), comment="最后登陸IP")
    login_date = db.Column(db.TIMESTAMP, comment="最后登陸時間", nullable=False,
                           onupdate=func.now())

    def check_password(self, passwd):
        '''
        檢查密碼
        :param passwd:
        :return: 0/1
        '''
        # 創建md5對象
        m = hashlib.md5()
        b = passwd.encode(encoding='utf-8')
        m.update(b)
        str_md5 = m.hexdigest()
        if self.password == str_md5:
            return 1
        else:
            return 0
from db import db
from models.BaseModel import BaseModel


class User_Post(BaseModel):
    """
    用戶與崗位關聯表
    """
    __tablename__ = "t_user_post"
    id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment="ID")
    user_id = db.Column(db.Integer, comment="用戶ID")
    post_id = db.Column(db.Integer, comment="崗位ID")
    create_by = None
    created_at = None
    update_by = None
    updated_at = None
    remark = None

2.permission下的user.py中實現增刪改查

# !/usr/bin/python3
# -*- coding: utf-8 -*-
"""
@Author         :  Huguodong
@Version        :  
------------------------------------
@File           :  route.py
@Description    :  用戶相關的請求
@CreateTime     :  2020/2/24 21:55
------------------------------------
@ModifyTime     :  
"""
from permission import *

user = Blueprint('user', __name__)


@user.route('/login', methods=["POST"])
def login():
    '''
    用戶登錄
    :return:token
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    # 獲取前端傳過來的參數
    username = res_dir.get("username")
    password = res_dir.get("password")
    # 校驗參數
    if not all([username, password]):
        return jsonify(code=Code.NOT_NULL.value, msg="用戶名和密碼不能為空")
    try:
        user = User.query.filter_by(user_name=username).first()
    except Exception as e:
        app.logger.error("login error:{}".format(e))
        return jsonify(code=Code.REQUEST_ERROR.value, msg="獲取信息失敗")
    if user is None or not user.check_password(password) or user.del_flag == 2 or user.status == 2:
        return jsonify(code=Code.ERR_PWD.value, msg="用戶名或密碼錯誤")

    # 獲取用戶信息,傳入生成token的方法,並接收返回的token
    # 獲取用戶角色
    user_role = Role.query.join(User_Role, Role.id == User_Role.role_id).join(User,
                                                                              User_Role.user_id == user.id).filter(
        User.id == user.id).all()
    role_list = [i.role_key for i in user_role]
    token = create_token(user.id, user.user_name, role_list)
    data = {'token': token, 'userId': user.id, 'userName': user.user_name, 'nickname': user.nickname}
    # 記錄登錄ip將token存入rerdis
    try:
        user.login_ip = request.remote_addr
        user.update()
        Redis.write(f"token_{user.user_name}", token)

    except Exception as e:
        return jsonify(code=Code.UPDATE_DB_ERROR.value, msg="登錄失敗")
    if token:
        # 把token返回給前端
        return jsonify(code=Code.SUCCESS.value, msg="登錄成功", data=data)
    else:
        return jsonify(code=Code.REQUEST_ERROR.value, msg="請求失敗", data=token)


@user.route('/logout', methods=["POST"])
@login_required()
def logout():
    '''
    注銷方法:redis刪除token
    :return:
    '''
    try:
        token = request.headers["Authorization"]
        user = verify_token(token)
        if user:
            key = f"token_{user.get('name')}"
            redis_token = Redis.read(key)
            if redis_token:
                Redis.delete(key)
            return SUCCESS()
        else:
            return AUTH_ERR()
    except Exception as e:
        app.logger.error(f"注銷失敗")
        return REQUEST_ERROR()


@user.route('/check_token', methods=["POST"])
def check_token():
    # 在請求頭上拿到token
    token = request.headers["Authorization"]
    user = verify_token(token)
    if user:
        key = f"token_{user.get('name')}"
        redis_token = Redis.read(key)
        if redis_token == token:
            return SUCCESS(data=user.get('id'))
        else:
            return OTHER_LOGIN()
    else:
        return AUTH_ERR()


@user.route('/index', methods=["POST"])
def index():
    '''
    獲取用戶
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    user_name = res_dir.get("user_name")
    phone = res_dir.get("phone")
    dept_id = res_dir.get("dept_id")
    order_column_name = res_dir.get("order_column_name")
    order_type = res_dir.get("order_type")
    page = res_dir.get("page")
    page_size = res_dir.get("page_size")
    status = res_dir.get("status")
    try:
        model = User.query.filter(User.del_flag == 1)
        if user_name:
            model = model.filter(User.user_name.like("%" + user_name + "%"))
        if phone:
            model = model.filter(User.phone.like("%" + phone + "%"))
        if dept_id:
            # 根據部門id查找該部門下的用戶,包括子部門
            dept = Dept.query.filter_by(id=dept_id).first()
            dept_ids = find_childern(dept)
            model = model.filter(User.dept_id.in_(dept_ids))
        if status is not None:
            model = model.filter(User.status.in_((1, 2))) if status == 0 else model.filter(User.status == status)
        if order_column_name and order_type and order_type.lower() in ['asc', 'desc']:
            model = model.order_by(text(f"{order_column_name} {order_type}"))
        if not page or page <= 0:
            page = 1
        if not page_size or page_size <= 0:
            page_size = 10
        result = model.paginate(page, page_size, error_out=False)
        data = construct_page_data(result)
        return SUCCESS(data=data)
    except Exception as e:
        app.logger.error(f"獲取用戶信息失敗:{e}")
        return REQUEST_ERROR()


@user.route('/update', methods=["POST", "PUT"])
def update():
    '''
    更新角色
    POST方法根據id返回數據
    PUT方法更新
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    if request.method == "POST":
        id = res_dir.get("id")
        if id:
            model = User.query.get(id)
            if model:
                dict_data = model_to_dict(model)
                del dict_data["password"]  # 刪除密碼,不返回前端
                # 獲取角色和崗位
                user_post = User_Post.query.with_entities(User_Post.post_id).filter(User_Post.user_id == id).order_by(
                    "post_id").all()
                user_role = User_Role.query.with_entities(User_Role.role_id).filter(User_Role.user_id == id).order_by(
                    "role_id").all()
                post_list = [str(i[0]) for i in user_post]
                role_list = [str(i[0]) for i in user_role]
                dict_data['user_post'] = ','.join(post_list)
                dict_data['user_role'] = ','.join(role_list)
                return SUCCESS(dict_data)
            else:
                return ID_NOT_FOUND()
        else:
            PARAMETER_ERR()
    if request.method == "PUT":
        id = res_dir.get("id")
        user_name = res_dir.get("user_name")
        phone = res_dir.get("phone")
        dept_id = res_dir.get("dept_id")
        avatar = res_dir.get("avatar")
        email = res_dir.get("email")
        nickname = res_dir.get("nickname")
        status = res_dir.get("status")
        user_role = res_dir.get("user_role")
        user_post = res_dir.get("user_post")
        sex = res_dir.get("sex")
        remark = res_dir.get("remark")
        if id and user_name and dept_id:
            model = User.query.get(id)
            if model:
                try:
                    token = request.headers["Authorization"]
                    user = verify_token(token)
                    model.user_name = user_name
                    model.phone = phone
                    model.dept_id = dept_id
                    model.avatar = avatar
                    model.email = email
                    model.nickname = nickname
                    model.sex = sex
                    model.remark = remark
                    model.status = status
                    model.update_by = user['name']
                    model.update()
                    try:
                        update_post(id, user_post)
                        update_role(id, user_role)
                        return SUCCESS()
                    except Exception as e:
                        app.logger.error(f"更新角色或崗位失敗:{e}")
                        return UPDATE_ERROR(msg="更新角色或崗位失敗")
                except Exception as e:
                    app.logger.error(f"更新用戶失敗:{e}")
                    return UPDATE_ERROR()
            else:
                return ID_NOT_FOUND()
        else:
            return NO_PARAMETER()
    return SUCCESS()


@user.route('/create', methods=["PUT"])
def create():
    '''
    創建用戶
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    user_name = res_dir.get("user_name")
    phone = res_dir.get("phone")
    dept_id = res_dir.get("dept_id")
    avatar = res_dir.get("avatar")
    email = res_dir.get("email")
    nickname = res_dir.get("nickname")
    password = res_dir.get("password")
    status = res_dir.get("status")
    user_role = res_dir.get("user_role")
    user_post = res_dir.get("user_post")
    sex = res_dir.get("sex")
    remark = res_dir.get("remark")
    token = request.headers["Authorization"]
    user = verify_token(token)
    if user_name and dept_id and password and nickname:
        try:
            is_exist = User.query.filter(User.user_name == user_name).first()
            if is_exist:
                return CREATE_ERROR(msg="角色名已存在")
            # 添加用戶
            model = User()
            model.user_name = user_name
            model.phone = phone
            model.dept_id = dept_id
            model.avatar = avatar
            model.email = email
            model.nickname = nickname
            model.password = create_passwd(password)
            model.sex = sex
            model.remark = remark
            model.status = status
            model.create_by = user['name']
            model.save()
            # 添加角色和崗位
            try:
                # 添加角色
                if user_role:
                    role_list = user_role.split(',')
                    insert_list = []
                    for role_id in role_list:
                        insert_list.append({"role_id": role_id, "user_id": model.id})
                    if len(insert_list) > 0:
                        user_role_model = User_Role()
                        user_role_model.save_all(insert_list)
                # 添加崗位
                if user_post:
                    post_list = user_post.split(',')
                    insert_list = []
                    for post_id in post_list:
                        insert_list.append({"post_id": post_id, "user_id": model.id})
                    if len(insert_list) > 0:
                        user_post_model = User_Post()
                        user_post_model.save_all(insert_list)
                return SUCCESS()
            except Exception as e:
                app.logger.error(f"添加角色或崗位失敗:{e}")
                return CREATE_ERROR(msg="添加角色或崗位失敗")
        except Exception as e:
            return CREATE_ERROR()
    else:
        return NO_PARAMETER()


@user.route('/delete', methods=["DELETE"])
def delete():
    '''
    根據id刪除用戶
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    userid = res_dir.get("id")
    if userid:
        try:
            model = User.query.get(userid)
            if model:
                model.del_flag = 2
                model.update()
                return SUCCESS()
            else:
                return ID_NOT_FOUND()
        except Exception as e:
            app.logger.error(f"刪除失敗:{e}")
            return DELETE_ERROR()
    else:
        return PARAMETER_ERR()


def find_childern(dept):
    '''
     獲取部門下的子部門ID元祖
     :param dept:
    :return: tuple
     '''
    dept_ids = [dept.id]
    dept_id = get_dept_by_parentId(dept.id)
    dept_ids += dept_id
    return tuple(dept_ids)


def get_dept_by_parentId(parentId):
    '''
    遞歸查找部門和子部門id
    :param parentId:
    :return:
    '''
    dept_data = Dept.query.filter(Dept.parent_id == parentId).all()
    if len(dept_data) > 0:
        data = []
        for dept in dept_data:
            ids = get_dept_by_parentId(dept.id)
            data += ids
            data += (str(dept.id))
        return data
    return []


def update_post(userid, user_post):
    post_db = User_Post.query.filter(
        User_Post.user_id == userid).all()
    if user_post:  # 如果有菜單
        # 獲取數據庫菜單列表
        db_list = [str(i.post_id) for i in post_db]
        # 獲取傳過來的參數
        par_list = user_post.split(',')
        # 獲取需要刪除和增加的數據
        add_list, less_list = get_diff(db_list, par_list)
        if len(less_list) > 0:
            # 刪除沒有權限的菜單
            for post in post_db:
                if str(post.post_id) in less_list:
                    post.delete()
        if len(add_list) > 0:
            insert_list = []
            for post_id in add_list:
                insert_list.append({"user_id": userid, "post_id": post_id})
            role_menu_model = User_Post()
            role_menu_model.save_all(insert_list)
    else:
        for post in post_db:
            post.delete()


def update_role(userid, user_role):
    role_db = User_Role.query.filter(
        User_Role.user_id == userid).all()
    if user_role:  # 如果有菜單
        # 獲取數據庫菜單列表
        db_list = [str(i.role_id) for i in role_db]
        # 獲取傳過來的參數
        par_list = user_role.split(',')
        # 獲取需要刪除和增加的數據
        add_list, less_list = get_diff(db_list, par_list)
        if len(less_list) > 0:
            # 刪除沒有權限的菜單
            for role in role_db:
                if str(role.role_id) in less_list:
                    role.delete()
        if len(add_list) > 0:
            insert_list = []
            for role_id in add_list:
                insert_list.append({"user_id": userid, "role_id": role_id})
            role_menu_model = User_Role()
            role_menu_model.save_all(insert_list)
    else:
        for role in role_db:
            role.delete()

三、上傳頭像

1.basic文件夾下新建upload.py,並注冊藍圖

import os
import random
import time

from basic import *

upload = Blueprint('upload', __name__)

app.register_blueprint(upload.upload, url_prefix='/api/upload')

2.實現上傳頭像

#允許的后綴名
ALLOWED_EXTENSIONS = ['png', 'jpg', 'jpeg']


def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1] in ALLOWED_EXTENSIONS


@upload.route('/head_image', methods=["POST"])
def head_image():
    file = request.files.get('file')
    if file is None:
        return FILE_NO_FOUND(msg="請選擇頭像!")
    # 格式判斷
    if file.content_type != "image/jpeg" and file.content_type != "image/png":
        return ERROR_FILE_TYPE("請上傳有效圖片文件!")
    # 后綴名判斷
    if not allowed_file(file.filename):
        return ERROR_FILE_TYPE()
    # 大小限制判斷
    if is_over_size(file, 2):
        return OVER_SIZE(msg="頭像文件大小不能超過2MB")
    # 保存文件
    try:
        new_filename = time.strftime('%Y%m%d%H%M%S') + '%d' % random.randint(0, 100)
        new_filename = md5_sum(new_filename)
        new_filename += "." + file.filename.rsplit('.', 1)[1]
        upload_path = os.path.join(UPLOAD_HEAD_FOLDER, new_filename)
        file.save(upload_path)
        image_path = app_url + "/" + upload_path
        return SUCCESS(data=image_path)
    except Exception as e:
        return UPLOAD_FAILD("上傳頭像失敗")



def is_over_size(file, max_size):
    '''
    判斷文件大小是否超出限制
    :param file:
    :param M:
    :return:
    '''
    size = len(file.read())
    file.seek(0)
    if size / (1024 * 1024) > int(max_size):
        return True
    else:
        return False

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM