一、前言
部門管理主要管理公司部門信息
二、後端實現
1.ORM類
from models.BaseModel import BaseModel
from db import db


class Dept(BaseModel):
    """
    Department table (ORM model mapped to ``t_dept``).

    Departments form a hierarchy through ``parent_id``, which holds the ``id``
    of the parent department.
    """
    # NOTE(review): the request handlers in this article also assign `remark`,
    # `create_by` and `update_by` on Dept instances; those are presumably
    # declared on BaseModel -- confirm.
    __tablename__ = "t_dept"
    # Auto-incrementing primary key.
    id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment="部門ID")
    # id of the parent department.
    parent_id = db.Column(db.Integer, comment="父部門id")
    dept_name = db.Column(db.String(30), comment="部門名稱")
    # Display order; the listing queries sort by this column.
    order_num = db.Column(db.Integer, comment="顯示順序")
    leader = db.Column(db.String(20), comment="負責人")
    phone = db.Column(db.String(11), comment="聯系電話")
    # NOTE(review): 20 characters may be too short for real e-mail addresses -- confirm.
    email = db.Column(db.String(20), comment="郵箱")
    # Department status: 1 = normal, 2 = disabled; defaults to 1.
    status = db.Column(db.Integer, default=1, comment="部門狀態(1正常 2停用)")
2.permission下新建dept.py並注冊藍圖
# In permission/dept.py: create the blueprint that the department routes attach to.
from permission import *

dept = Blueprint('dept', __name__)

# In the application setup code (presumably where `app` is created -- that file
# is not shown here): mount the blueprint under the /api/dept URL prefix.
# `dept.dept` refers to the `dept` blueprint object inside the `dept` module.
app.register_blueprint(dept.dept, url_prefix='/api/dept')
3.增刪改查實現
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Department management endpoints: list/tree query, fetch, update, create, delete."""
from permission import *

dept = Blueprint('dept', __name__)


@dept.route('/findall', methods=["POST"])
def find_all():
    """
    Query departments.

    Reads ``dept_name``, ``page``, ``page_size`` and ``status`` from the JSON
    body. When ``dept_name`` is given, returns a paginated list of departments
    whose name contains it; otherwise returns the whole department tree.

    :return: response produced by the project's response helpers
    """
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()

    dept_name = res_dir.get("dept_name")
    if not dept_name:
        # No name filter: return the complete department tree.
        data = constructDeptTrees()
        return jsonify(code=Code.SUCCESS.value, msg="ok", data=data)

    page = res_dir.get("page")
    page_size = res_dir.get("page_size")
    status = res_dir.get("status")
    try:
        query = Dept.query.filter(Dept.dept_name.like("%" + dept_name + "%"))
        if status is not None:
            if status == 0:
                # status 0 means "any status": match both defined values.
                query = query.filter(Dept.status.in_((1, 2)))
            else:
                query = query.filter(Dept.status == status)
        # Fall back to defaults for missing or non-positive paging values.
        if not page or page <= 0:
            page = 1
        if not page_size or page_size <= 0:
            page_size = 10
        # Keyword arguments work on Flask-SQLAlchemy 2.x and 3.x
        # (3.x made paginate() keyword-only).
        result = query.order_by(Dept.order_num).paginate(
            page=page, per_page=page_size, error_out=False
        )
        data = construct_page_data(result)
        return SUCCESS(data=data)
    except Exception as e:
        app.logger.error(f"獲取部門信息失敗:{e}")
        return REQUEST_ERROR()


@dept.route('/update', methods=["POST", "PUT"])
def update():
    """
    Fetch or update a department.

    POST returns the department identified by ``id`` in the JSON body.
    PUT overwrites the department's fields with the values in the JSON body;
    ``id`` and ``dept_name`` are required.

    :return: response produced by the project's response helpers
    """
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()

    dept_id = res_dir.get("id")

    if request.method == "POST":
        if not dept_id:
            # The original omitted this `return`, so the view returned None
            # and Flask answered with a 500 instead of the parameter error.
            return PARAMETER_ERR()
        model = Dept.query.get(dept_id)
        if not model:
            return ID_NOT_FOUND()
        return SUCCESS(data=model_to_dict(model))

    # The route only accepts POST and PUT, so this is the PUT path.
    dept_name = res_dir.get("dept_name")
    if not (dept_id and dept_name):
        return NO_PARAMETER()

    model = Dept.query.get(dept_id)
    if not model:
        return ID_NOT_FOUND()

    try:
        token = request.headers["Authorization"]
        user = verify_token(token)
        model.dept_name = dept_name
        model.parent_id = res_dir.get("parent_id")
        model.leader = res_dir.get("leader")
        model.email = res_dir.get("email")
        model.order_num = res_dir.get("order_num")
        model.phone = res_dir.get("phone")
        model.status = res_dir.get("status")
        model.remark = res_dir.get("remark")
        model.update_by = user['name']
        model.update()
        return SUCCESS()
    except Exception as e:
        app.logger.error(f"更新部門失敗:{e}")
        return UPDATE_ERROR()


@dept.route('/create', methods=["PUT"])
def create():
    """
    Create a department.

    ``dept_name`` is required. Creation is rejected when a department with the
    same name already exists under the same ``parent_id``.

    :return: response produced by the project's response helpers
    """
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()

    dept_name = res_dir.get("dept_name")
    parent_id = res_dir.get("parent_id")

    token = request.headers["Authorization"]
    user = verify_token(token)

    if not dept_name:
        return NO_PARAMETER()

    try:
        is_exist = Dept.query.filter(
            Dept.dept_name == dept_name, Dept.parent_id == parent_id
        ).first()
        if is_exist:
            return CREATE_ERROR(msg="同級別該部門名稱已存在")

        model = Dept()
        model.dept_name = dept_name
        model.parent_id = parent_id
        model.leader = res_dir.get("leader")
        model.email = res_dir.get("email")
        model.order_num = res_dir.get("order_num")
        model.phone = res_dir.get("phone")
        model.remark = res_dir.get("remark")
        model.status = res_dir.get("status")
        model.create_by = user['name']
        model.save()
        return SUCCESS()
    except Exception as e:
        app.logger.error(f"新建部門失敗:{e}")
        return CREATE_ERROR()


@dept.route('/delete', methods=["DELETE"])
def delete():
    """
    Delete a department by ``id``.

    Deletion is refused when the department still has child departments or is
    referenced by a ``Role_Dept`` row.

    :return: response produced by the project's response helpers
    """
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()

    dept_id = res_dir.get("id")
    if not dept_id:
        return PARAMETER_ERR()

    try:
        # Only existence matters here, so fetch at most one row.
        if Dept.query.filter_by(parent_id=dept_id).first():
            return DELETE_ERROR(msg="該部門下存在子部門,無法刪除!")
        if Role_Dept.query.filter_by(dept_id=dept_id).first():
            return DELETE_ERROR(msg="該部門已與角色關聯,無法刪除!")

        model = Dept.query.get(dept_id)
        if not model:
            return ID_NOT_FOUND()
        model.delete()
        return SUCCESS()
    except Exception as e:
        app.logger.error(f"刪除部門失敗:{e}")
        return DELETE_ERROR()


def constructDeptTrees(parentId=0):
    """
    Recursively build the department tree below ``parentId``.

    1. Load the departments whose ``parent_id`` equals ``parentId``, ordered
       by ``order_num``.
    2. For each of them, recurse to collect its own children into the
       ``children_list`` key.
    3. Return an empty list when there are no children.

    :param parentId: id of the parent department; 0 is the default root value
    :return: list of department dicts, each carrying a ``children_list``
    """
    dept_data = Dept.query.filter(Dept.parent_id == parentId).order_by(Dept.order_num).all()
    data = []
    for node in model_to_dict(dept_data):
        node['children_list'] = constructDeptTrees(node['id'])
        data.append(node)
    return data