基於Python-Flask的權限管理8:部門管理


一、前言

部門管理主要管理公司部門信息

 

 

 

一、后端實現

1.ORM類

from models.BaseModel import BaseModel
from db import db


class Dept(BaseModel):
    """
    部門表
    """
    __tablename__ = "t_dept"

    id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment="部門ID")
    parent_id = db.Column(db.Integer, comment="父部門id")
    dept_name = db.Column(db.String(30), comment="部門名稱")
    order_num = db.Column(db.Integer, comment="顯示順序")
    leader = db.Column(db.String(20), comment="負責人")
    phone = db.Column(db.String(11), comment="聯系電話")
    email = db.Column(db.String(20), comment="郵箱")
    status = db.Column(db.Integer, default=1, comment="部門狀態(1正常 2停用)")

2.permission下新建dept.py並注冊藍圖

from permission import *

dept = Blueprint('dept', __name__)

app.register_blueprint(dept.dept, url_prefix='/api/dept')

3.增刪改查實現

# !/usr/bin/python3
# -*- coding: utf-8 -*-

from permission import *

dept = Blueprint('dept', __name__)


@dept.route('/findall', methods=["POST"])
def find_all():
    '''
    獲取部門信息
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    dept_name = res_dir.get("dept_name")
    page = res_dir.get("page")
    page_size = res_dir.get("page_size")
    status = res_dir.get("status")
    if dept_name:
        # 部門名不為空,直接根據條件查詢數據
        try:
            model = Dept.query.filter(Dept.dept_name.like("%" + dept_name + "%"))
            if status is not None:
                model = model.filter(Dept.status.in_((1, 2))) if status == 0 else model.filter(Dept.status == status)
            if not page or page <= 0:
                page = 1
            if not page_size or page_size <= 0:
                page_size = 10
            result = model.order_by("order_num").paginate(page, page_size, error_out=False)
            data = construct_page_data(result)
            return SUCCESS(data=data)
        except Exception as e:
            app.logger.error(f"獲取崗位信息失敗:{e}")
            return REQUEST_ERROR()
    else:
        # 部門為空,獲取全部部門
        data = constructDeptTrees()  # 獲取菜單樹
        return jsonify(code=Code.SUCCESS.value, msg="ok", data=data)


@dept.route('/update', methods=["POST", "PUT"])
def update():
    '''
        更新崗位
        POST方法根據id返回數據
        PUT方法更新
        :return:
        '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    if request.method == "POST":
        id = res_dir.get("id")
        if id:
            model = Dept.query.get(id)
            if model:
                data = model_to_dict(model)
                return SUCCESS(data=data)
            else:
                return ID_NOT_FOUND()
        else:
            PARAMETER_ERR()
    if request.method == "PUT":
        id = res_dir.get("id")
        dept_name = res_dir.get("dept_name")
        email = res_dir.get("email")
        leader = res_dir.get("leader")
        order_num = res_dir.get("order_num")
        parent_id = res_dir.get("parent_id")
        phone = res_dir.get("phone")
        remark = res_dir.get("remark")
        status = res_dir.get("status")

        if id and dept_name:
            model = Dept.query.get(id)
            if model:
                try:
                    token = request.headers["Authorization"]
                    user = verify_token(token)
                    model.dept_name = dept_name
                    model.parent_id = parent_id
                    model.leader = leader
                    model.email = email
                    model.order_num = order_num
                    model.phone = phone
                    model.status = status
                    model.remark = remark
                    model.update_by = user['name']
                    model.update()
                    return SUCCESS()
                except Exception as e:
                    app.logger.error(f"更新部門失敗:{e}")
                    return UPDATE_ERROR()
            else:
                return ID_NOT_FOUND()
        else:
            return NO_PARAMETER()


@dept.route('/create', methods=["PUT"])
def create():
    '''
    創建部門
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    dept_name = res_dir.get("dept_name")
    email = res_dir.get("email")
    leader = res_dir.get("leader")
    order_num = res_dir.get("order_num")
    parent_id = res_dir.get("parent_id")
    phone = res_dir.get("phone")
    remark = res_dir.get("remark")
    status = res_dir.get("status")
    token = request.headers["Authorization"]
    user = verify_token(token)
    if dept_name:
        try:
            is_exist = Dept.query.filter(Dept.dept_name == dept_name, Dept.parent_id == parent_id).first()
            if is_exist:
                return CREATE_ERROR(msg="同級別該部門名稱已存在")
            model = Dept()
            model.dept_name = dept_name
            model.parent_id = parent_id
            model.leader = leader
            model.email = email
            model.order_num = order_num
            model.phone = phone
            model.remark = remark
            model.status = status
            model.create_by = user['name']
            model.save()
            return SUCCESS()
        except Exception as e:
            app.logger.error(f"新建部門失敗:{e}")
            return CREATE_ERROR()
    else:
        return NO_PARAMETER()


@dept.route('/delete', methods=["DELETE"])
def delete():
    '''
        根據ID刪除崗位
        :return:
        '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    id = res_dir.get("id")
    if id:
        try:
            parent = Dept.query.filter_by(parent_id=id).all()
            if parent:
                return DELETE_ERROR(msg="該部門下存在子部門,無法刪除!")
            role = Role_Dept.query.filter_by(dept_id=id).all()
            if role:
                return DELETE_ERROR(msg="該部門已與角色關聯,無法刪除!")
            model = Dept.query.get(id)
            if model:
                model.delete()
                return SUCCESS()
            else:
                return ID_NOT_FOUND()
        except Exception as e:
            app.logger.error(f"刪除崗位失敗:{e}")
            return DELETE_ERROR()
    else:
        return PARAMETER_ERR()


def constructDeptTrees(parentId=0):
    '''
    通過遞歸實現根據父ID查找子部門
    1.根據父ID獲取該部門下的子部門
    2.遍歷子部門,繼續向下獲取,直到最小部門
    3.如果沒有遍歷到,返回空的數組,有返回權限列表
    :param parentId:
    :return:dict
    '''
    dept_data = Dept.query.filter(Dept.parent_id == parentId).order_by('order_num').all()
    dept_dict = model_to_dict(dept_data)
    if len(dept_dict) > 0:
        data = []
        for dept in dept_dict:
            dept['children_list'] = constructDeptTrees(dept['id'])
            data.append(dept)
        return data
    return []

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM