權限管理是一個很常見的功能模塊,本文基於RBAC模型針對於多用戶,多角色,多權限的場景,介紹一種Flask權限管理方案。
Flask系列文章:
本文將在開發初探的代碼基礎上進行重構。
介紹
在本文所述場景中,具體的權限管理是:權限和角色關聯,給用戶添加角色,用戶即擁有角色的權限,也就是基於角色的權限控制。當然,若需要基於用戶的權限控制也是可以的,只需要修改下相關數據結構即可。
具體的權限驗證采用了位運算,將權限值用十六進制表示,每個角色擁有一個權限總值,當判斷該角色是否有特定權限時:
In [1]: permission = 0X02
In [2]: permissions = 0X0D
In [3]: print((permissions & permission) == permission)
False
In [4]: permissions = 0X07
In [5]: print((permissions & permission) == permission)
True
返回值為True表示擁有該權限,False為沒有該權限,原理與位運算的原理有關。
0x07 = 0x01 + 0x02 + 0x04
轉換為二進制數值可以看做是:0111 = 0001 + 0010 + 0100
按照位運算,運算符&(按位與)相應位都為1,則該位為1,否則為0,那么權限總值和權限值執行按位與運算,結果恆為權限值時才能得出擁有該權限。
實現
創建
首先,針對以上場景,我們創建數據表。
用戶
創建用戶表,保存用戶信息和對應的角色:
class User(db.Model):
"""
用戶表
"""
__tablename__ = "user"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(128), unique=True)
email = db.Column(db.String(128))
password = db.Column(db.String(128))
role_id = db.Column(db.Integer)
def __init__(self, name, email, password):
self.name = name
self.email = email
self.password = bcrypt_sha256.encrypt(str(password))
權限
創建權限類,賦予每種操作權限值,這里舉例用戶管理和更新權限:
class Permissions:
"""
權限類
"""
USER_MANAGE = 0X01
UPDATE_PERMISSION = 0x02
角色
需要創建角色表結構,我們暫定兩種角色:普通用戶和管理員,並初始化角色和權限。
class Role(db.Model):
"""
角色表
"""
__tablename__ = "role"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(128), unique=True, commit="角色名")
permissions = db.Column(db.Integer, commit="權限總值")
@staticmethod
def init_role():
role_name_list = ['user', 'admin']
roles_permission_map = {
'user': [Permissions.USER_MANAGE],
'admin': [Permissions.USER_MANAGE, Permissions.UPDATE_PERMISSION]
}
try:
for role_name in role_name_list:
role = Role.query.filter_by(name=role_name).first()
if not role:
role = Role(name=role_name)
role.reset_permissions()
for permission in roles_permission_map[role_name]:
role.add_permission(permission)
db.session.add(role)
db.session.commit()
except:
db.session.rollback()
db.session.close()
def reset_permissions(self):
self.permissions = 0
def has_permission(self, permission):
return self.permissions & permission == permission
def add_permission(self, permission):
if not self.has_permission(permission):
self.permissions += permission
隨着應用更新,權限值會不斷增加,角色對應的權限值隨之增大,為了保證每次更新同步到表,可以在flask應用初始化時添加:
Role.init_role()
這樣,我們就賦予了每個角色其擁有的權限值。
重啟應用,可以看到role表:
鑒權
前期數據准備妥當了,接下來就是鑒權。
為了保證訪問的安全性,需要對接口和權限進行關聯綁定,我嘗試過兩種方案:
1. 裝飾器
封裝裝飾器,對接口視圖函數進行裝飾,裝飾器傳入權限值作為參數,在裝飾器中根據用戶角色的權限和權限值進行對比,判斷該用戶是否有該接口的訪問權限。
剛開始我是用這種方式的,小型應用接口不多的場景下使用還好,但隨着應用愈來愈復雜,賦權操作就有點繁瑣。
2. 接口賦權
這是我在裝飾器之后想到的一種方式,在大型應用接口比較多的情況下比較推薦,而且這種方式耦合度低,易於擴展。
具體操作:首先,將接口地址和權限關聯,接口比較多的話,推薦用藍圖,基本上保證一個藍圖中的接口是一個權限,這樣操作會簡單一些,然后,在應用初始化時將接口地址和權限入庫,這樣可以保證每次重啟應用后數據都是最新的,最后,當用戶登錄時,會根據用戶角色和請求的地址判斷其是否有權限訪問。
以上兩種方式,今天以裝飾器鑒權舉例說明。
首先,創建鑒權裝飾器:
from functools import wraps
from flask import session, abort
from app.models import db, Users, Role
Permission_code = [0X01, 0X02]
def permission_can(current_user, permission):
"""
檢測用戶是否有特定權限
:param current_user
:param permission
:return:
"""
role_id = current_user.role_id
role = db.session.query(Role).filter_by(id=role_id).first()
return (role.permissions & permission) == permission
def permission_required(permission):
"""
權限認證裝飾器
:param permission:
:return:
"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
try:
current_user = Users.query.filter_by(id=session.get('user_id')).first()
if not current_user and permission_can(current_user, permission):
abort(403)
return f(*args, **kwargs)
except:
abort(403)
return decorated_function
return decorator
其中,用到了flask session,獲取當前登錄用戶的user_id,根據當前用戶的角色判斷其是否擁有該權限permission。
然后在視圖函數上添加該裝飾器,就可以鑒權了。舉例用戶管理功能:
@user.route('/user-manage', methods=['POST', 'GET'])
@permission_required(Permissions.USER_MANAGE)
def user_manage():
"""
用戶管理
:return:
"""
if request.method == 'POST':
# 處理...
ret_data = dict(code=0, ret_msg='user manage')
else:
# 數據處理 ...
ret_data = dict(code=0, ret_msg='user list')
return jsonify(ret_data)
最后,分別構造請求,訪問接口測試:
import requests
session = requests.Session()
# login
login_url = 'http://0.0.0.0:9001/login'
login_data = dict(user='test', pwd='pwd')
login_request = session.post(login_url, json=login_data)
print(login_request.json())
# user_manage
user_manage_url = 'http://0.0.0.0:9001/user-manage'
login_request = session.post(user_manage_url)
print(login_request.json())
# permission_manege
permission_manage_url = 'http://0.0.0.0:9001/permission-manage'
login_request = session.post(permission_manage_url)
print(login_request.json())
具體代碼見 my github
以上。