基於Python-Flask的權限管理5:字典管理


一、前言

在系統設計中,我們希望很多可變的內容能夠可配置化,比如有個多選按鈕,顯示某個值得類型(float,int,string,dict),如果有個地方能夠配置這些值,這樣需要修改的話就不用更新前端了。字典管理就很好的實現了這一功能,通過網頁配置,只要添加或修改了某個值,所有的組件內容都會變化。

 

 

二、功能實現

1.實體類ORM

from models.BaseModel import BaseModel
from db import db


class Dict_Data(BaseModel):
    """
    字典數據表
    """
    __tablename__ = "t_dict_data"
    id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment="id")
    dict_id = db.Column(db.Integer, nullable=False, comment="dict_id")
    dict_sort = db.Column(db.Integer, comment="字典排序")
    dict_label = db.Column(db.String(100), comment="字典標簽")
    dict_number = db.Column(db.Integer, comment="字典值")
    dict_value = db.Column(db.String(100), default="", comment="字典鍵值")
    dict_type = db.Column(db.String(100), comment="字典類型")
    dict_value_type = db.Column(db.Integer, comment="標識")
    css_class = db.Column(db.String(100), comment="樣式屬性(其他樣式擴展)")
    list_class = db.Column(db.String(100), comment="表格回顯樣式")
    is_default = db.Column(db.Integer, default=1, comment="是否默認(1是 0否)")
    status = db.Column(db.INTEGER, default=1, comment="狀態(1正常 2停用)")
class Dict_Type(BaseModel):
    """
    字典類型表
    """
    __tablename__ = "t_dict_type"
    id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment="ID")
    dict_name = db.Column(db.String(100), comment="字典名稱")
    dict_type = db.Column(db.String(100), index=True, comment="字典類型")
    dict_value_type = db.Column(db.Integer, comment="標識")
    status = db.Column(db.Integer, default=1, comment="狀態(1正常 2停用)")

2.permission下新建dict.py和dict_data.py,並在app.py注冊藍圖

from permission import *

dict = Blueprint('dict', __name__)
from permission import *

dictData = Blueprint('dictData', __name__)
app.register_blueprint(dict.dict, url_prefix='/api/dict')
app.register_blueprint(dict_data.dictData, url_prefix='/api/dictData')

3.字典數據的增刪改查實現

# !/usr/bin/python3
# -*- coding: utf-8 -*-
"""
@Author         :  Huguodong
@Version        :  
------------------------------------
@File           :  dict_route.py
@Description    :  字典數據
@CreateTime     :  2020/3/14 16:10
------------------------------------
@ModifyTime     :  
"""
from permission import *

dictData = Blueprint('dictData', __name__)


@dictData.route('/index', methods=["POST"])
def index():
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    dict_type = res_dir.get("dict_type")
    dict_id = res_dir.get("dict_id")
    page = res_dir.get("page")
    page_size = res_dir.get("page_size")
    dict_label = res_dir.get("dict_label")
    dict_number = res_dir.get("dict_number")
    status = res_dir.get("status")
    order_column_name = res_dir.get("order_column_name")
    order_type = res_dir.get("order_type")
    if not page or page <= 0:
        page = 1
    if not page_size or page_size <= 0:
        page_size = 10

    if dict_type and dict_id is None:
        dict_data = Dict_Data.query.filter(Dict_Data.dict_type == dict_type).order_by('dict_sort').paginate(page,
                                                                                                            page_size,
                                                                                                            error_out=False)
        data = construct_page_data(dict_data)  # 格式化返回數據
        return jsonify(code=Code.SUCCESS.value, msg="ok", data=data)
    elif dict_id is not None:
        dict_data = Dict_Data.query
        if dict_id != 0:
            dict_data = dict_data.filter(Dict_Data.dict_id == dict_id)
        if dict_label:
            dict_data = dict_data.filter(Dict_Data.dict_label.like("%" + dict_label + "%"))
        if dict_number:
            dict_data = dict_data.filter(Dict_Data.dict_number == int(dict_number))
        if dict_type:
            dict_data = dict_data.filter(Dict_Data.dict_type.like("%" + dict_type + "%"))
        if status is not None:
            dict_data = dict_data.filter(Dict_Data.status.in_((1, 2))) if status == 0 else dict_data.filter(
                Dict_Data.status == status)
        if order_column_name and order_type and order_type.lower() in ['asc', 'desc']:
            dict_data = dict_data.order_by(text(f"{order_column_name} {order_type}"))

        result = dict_data.paginate(page, page_size, error_out=False)
        data = construct_page_data(result)  # 格式化返回數據
        return SUCCESS(data=data)
    else:
        return PARAMETER_ERR()


@dictData.route('/create', methods=["PUT"])
def create():
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    dict_id = res_dir.get("dict_id")
    dict_label = res_dir.get("dict_label")
    dict_number = res_dir.get("dict_number")
    css_class = res_dir.get("css_class")
    list_class = res_dir.get("list_class")
    is_default = res_dir.get("is_default")
    dict_sort = res_dir.get("dict_sort")
    dict_value = res_dir.get("dict_value")
    status = res_dir.get("status")
    remark = res_dir.get("remark")
    if dict_id and dict_label if dict_label is not None else dict_value:
        try:
            token = request.headers["Authorization"]
            user = verify_token(token)
            dict_type = Dict_Type.query.get(dict_id).dict_type
            model = Dict_Data()
            model.dict_id = dict_id
            model.dict_type = dict_type
            model.dict_label = dict_label
            model.dict_number = dict_number
            model.css_class = css_class
            model.list_class = list_class
            model.is_default = is_default
            model.dict_sort = dict_sort
            model.dict_value = dict_value
            model.remark = remark
            model.status = status
            model.create_by = user['name']
            model.save()
            return SUCCESS()
        except Exception as e:
            app.logger.error(f"創建字典數據失敗:{e}")
            return CREATE_ERROR()
    else:
        return NO_PARAMETER()


@dictData.route('/update', methods=["POST", "PUT"])
def update():
    '''
    更新字典數據
    POST方法根據id返回數據
    PUT方法更新
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    if request.method == "POST":
        id = res_dir.get("id")
        if id:
            model = Dict_Data.query.get(id)
            if model:
                dict_data = model_to_dict(model)
                return SUCCESS(dict_data)
            else:
                return ID_NOT_FOUND()
        else:
            PARAMETER_ERR()
    if request.method == "PUT":
        id = res_dir.get("id")
        dict_id = res_dir.get("dict_id")
        dict_label = res_dir.get("dict_label")
        dict_number = res_dir.get("dict_number")
        css_class = res_dir.get("css_class")
        list_class = res_dir.get("list_class")
        is_default = res_dir.get("is_default")
        dict_sort = res_dir.get("dict_sort")
        status = res_dir.get("status")
        remark = res_dir.get("remark")
        if id and dict_id and dict_label and dict_number:
            model = Dict_Data.query.get(id)
            if model:
                token = request.headers["Authorization"]
                user = verify_token(token)
                model.dict_id = dict_id
                model.dict_label = dict_label
                model.dict_number = dict_number
                model.css_class = css_class
                model.list_class = list_class
                model.is_default = is_default
                model.dict_sort = dict_sort
                model.remark = remark
                model.status = status
                model.update_by = user['name']
                try:
                    model.update()
                    return SUCCESS()
                except Exception as e:
                    app.logger.error(f"更新字典數據失敗:{e}")
                    return UPDATE_ERROR()
            else:
                return ID_NOT_FOUND()
        else:
            return NO_PARAMETER()


@dictData.route('/delete', methods=["DELETE"])
def delete():
    '''
    根據id刪除字典數據
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    dict_id = res_dir.get("id")
    if dict_id:
        try:
            model = Dict_Data.query.get(dict_id)
            if model:
                model.delete()
                return SUCCESS()
            else:
                return ID_NOT_FOUND()
        except Exception as e:
            app.logger.error(f"刪除失敗:{e}")
            return DELETE_ERROR()
    else:
        return PARAMETER_ERR()
View Code

4.字典管理的增刪改查實現

# !/usr/bin/python3
# -*- coding: utf-8 -*-
"""
@Author         :  Huguodong
@Version        :  
------------------------------------
@File           :  dict_route.py.py
@Description    :  
@CreateTime     :  2020/3/16 21:45
------------------------------------
@ModifyTime     :  
"""
from permission import *

dict = Blueprint('dict', __name__)


@dict.route('/index', methods=["POST"])
def index():
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    model = Dict_Type.query
    dict_name = res_dir.get("dict_name")
    dict_type = res_dir.get("dict_type")
    page = res_dir.get("page")
    page_size = res_dir.get("page_size")
    status = res_dir.get("status")
    order_column_name = res_dir.get("order_column_name")
    order_type = res_dir.get("order_type")
    try:
        if dict_name:
            model = model.filter(Dict_Type.dict_name.like("%" + dict_name + "%"))
        if dict_type:
            model = model.filter(Dict_Type.dict_type.like("%" + dict_type + "%"))
        if status is not None:
            model = model.filter(Dict_Type.status.in_((1, 2))) if status == 0 else model.filter(
                Dict_Type.status == status)
        if order_column_name and order_type and order_type.lower() in ['asc', 'desc']:
            model = model.order_by(text(f"{order_column_name} {order_type}"))
        if not page or page <= 0:
            page = 1
        if not page_size or page_size <= 0:
            page_size = 10
        result = model.paginate(page, page_size, error_out=False)
        data = construct_page_data(result)
        return SUCCESS(data=data)
    except Exception as e:
        app.logger.error(f"獲取字典失敗:{e}")
        return REQUEST_ERROR()


@dict.route('/update', methods=["POST", "PUT"])
def update():
    '''
    更新字典
    POST方法根據id返回數據
    PUT方法更新
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    if request.method == "POST":
        id = res_dir.get("id")
        if id:
            model = Dict_Type.query.get(id)
            if model:
                data = model_to_dict(model)
                return SUCCESS(data)
            else:
                return ID_NOT_FOUND()
        else:
            PARAMETER_ERR()
    if request.method == "PUT":
        id = res_dir.get("id")
        dict_value_type = res_dir.get("dict_value_type")
        dict_name = res_dir.get("dict_name")
        dict_type = res_dir.get("dict_type")
        status = res_dir.get("status")
        dict_id = res_dir.get("dict_id")
        if id and dict_type and dict_name and dict_value_type:
            model = Dict_Type.query.get(id)
            if model:
                token = request.headers["Authorization"]
                user = verify_token(token)
                model.dict_name = dict_name
                model.dict_type = dict_type
                model.dict_value_type = dict_value_type
                model.status = status
                model.dict_id = dict_id
                model.update_by = user['name']
                try:
                    model.update()
                    return SUCCESS()
                except Exception as e:
                    app.logger.error(f"更新字典失敗:{e}")
                    return UPDATE_ERROR()
            else:
                return ID_NOT_FOUND()
        else:
            return NO_PARAMETER()


@dict.route('/create', methods=["PUT"])
def create():
    '''
    創建字典
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    dict_name = res_dir.get("dict_name")
    dict_type = res_dir.get("dict_type")
    dict_value_type = res_dir.get("dict_value_type")
    remark = res_dir.get("remark")
    status = res_dir.get("status")
    if dict_name and dict_type and dict_value_type:
        try:
            is_exist = Dict_Type.query.filter(Dict_Type.dict_type == dict_type).first()
            if is_exist:
                return CREATE_ERROR(msg="字典類型已存在")
            token = request.headers["Authorization"]
            user = verify_token(token)
            model = Dict_Type()
            model.dict_name = dict_name
            model.dict_type = dict_type
            model.dict_value_type = dict_value_type
            model.remark = remark
            model.status = status
            model.create_by = user['name']
            model.save()
            return SUCCESS()
        except Exception as e:
            app.logger.error(f"新建字典失敗:{e}")
            return CREATE_ERROR()
    else:
        return NO_PARAMETER()


@dict.route('/delete', methods=["DELETE"])
def delete():
    '''
    根據ID刪除字典
    :return:
    '''
    res_dir = request.get_json()
    if res_dir is None:
        return NO_PARAMETER()
    dict_id = res_dir.get("id")
    if dict_id:
        try:
            is_exist = Dict_Data.query.filter(Dict_Data.dict_id == dict_id).first()
            if is_exist:
                return DELETE_ERROR(msg="該字典存在數據,無法刪除!")
            model = Dict_Type.query.get(dict_id)
            if model:
                model.delete()
                return SUCCESS()
            else:
                return ID_NOT_FOUND()
        except Exception as e:
            app.logger.error(f"刪除字典失敗:{e}")
            return DELETE_ERROR()
    else:
        return PARAMETER_ERR()
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM