app/models.py
class Role(db.Model):
__tablename__ = 'roles'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)
default = db.Column(db.Boolean, default=False, index=True)
permissions = db.Column(db.Integer)
users = db.relationship('User', backref='role', lazy='dynamic')
程序的權限
FOLLOW 關注用戶 0x01
COMMET 在他人文章中發表評論 0x02
WRITE_ARTICLES 寫文章 0x04
MODERATE_COMMENTS 管理他人發表的評論 0x08
ADMINISTER 管理員權限 0x80
class Permission:
FOLLOW = 0x01
COMMENT = 0x02
WRITE_ARTICLES = 0x04
MODERATE_COMMENTS = 0x08
ADMINISTER = 0x80
列出了要支持的用戶角色以及定義角色使用的權限位
用戶角色
匿名 0x00 未登錄的用戶,在程序中只有閱讀權限
用戶 0x07 具有發表文章,發表評論和關注其他用戶的權限。這是新用戶的默認角色
協管員 0x0f 增加審查不當評論的權限
管理員 0xff 具有所有權限,包括修改其他用戶所屬角色的權限
app/models.py:在數據庫中創建角色
class Role(db.Model):
__tablename__ = 'roles'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)
default = db.Column(db.Boolean, default=False, index=True)
permissions = db.Column(db.Integer)
users = db.relationship('User', backref='role', lazy='dynamic')
@staticmethod
def insert_roles():
roles = {
'User': (Permission.FOLLOW |
Permission.COMMENT |
Permission.WRITE_ARTICLES, True),
'Moderator': (Permission.FOLLOW |
Permission.COMMENT |
Permission.WRITE_ARTICLES |
Permission.MODERATE_COMMENTS, False),
'Administrator': (0xff, False)
}
for r in roles:
role = Role.query.filter_by(name=r).first()
if role is None:
role = Role(name=r)
role.permissions = roles[r][0]
role.default = roles[r][1]
db.session.add(role)
db.session.commit()
這個Role表添加了一個靜態方法,執行insert_roles函數會創建三個name,分別是用戶,協管員和管理員

賦予角色:
app/models.py
class User(UserMixin,db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
email = db.Column(db.String(64),unique=True,index=True)
username = db.Column(db.String(64), unique=True,index=True)
role_id = db.Column(db.Integer,db.ForeignKey('roles.id'))
password_hash = db.Column(db.String(128))
def __init__(self,**kwargs):
super(User,self).__init__(**kwargs)
if self.role is None:
if self.email == current_app.config['FLASKY_ADMIN']:
self.role = Role.query.filter_by(permission=0xff).first()
if self.role is None:
self.role = Role.query.filter_by(default=True).first()
User表的方法是:如果用戶沒有設置權限,用戶的郵箱等於配置中設置的管理員郵箱,就設置為管理員權限,用戶的權限為空,則設置為普通用戶
角色驗證
app/models.py:檢查用戶是否有指定的權限
class User(UserMixin,db.Model):
#...
def can(self, permissions):
return self.role is not None and \
(self.role.permissions & permissions) == permissions
def is_administrator(self):
return self.can(Permission.ADMINISTER)
can()方法在請求和賦予角色這兩種權限之間進行位與操作。如果角色中包含請求的所有權限位,則返回True ,表示允許用戶執行此項操作。
is_administrator()方法用來檢車管理員權限
from flask_login import AnonymousUserMixin
class AnonymousUser(AnonymousUserMixin): def can(self,permissions): return False def is_administraror(self): return False login_manager.anonymous_user = AnonymousUser
這個類用來檢查匿名用戶的權限
app/decorators.py:檢查用戶權限的自定義修飾器
from functools import wraps from flask import abort from flask_login import current_user from .models import Permission def permission_required(permission): def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): if not current_user.can(permission): abort(403) return f(*args, **kwargs) return decorated_function return decorator def admin_required(f): return permission_required(Permission.ADMINISTER)(f)
這兩個修飾器都使用了Python標准庫中的functools包,如果用戶不具有指定權限,則返回403錯誤碼,即HTTP“禁止”錯誤。要添加一個403錯誤頁面
以下的例子就是將上面的裝飾器,用在了路由功能里面,針對一些頁面設置了權限
from decorators import admin_required, permission_required
from .models import Permission
@main.route('/admin')
@login_required
@admin_required
def for_admins_only():
return "For administrators!"
@main.route('/moderator')
@login_required
@permission_required(Permission.MODERATE_COMMENTS)
def for_moderators_only():
return "For comment moderators!"
在模板中可能也需要檢查權限,所以Permission 類為所有位定義了常量以便於獲取。為了避免每次調用render_template() 時都多添加一個模板參數,可以使用上下文處理器。上下文處理器能讓變量在所有模板中全局可訪問。
app/main/__init__.py:把Permission類加入模板上下文
@main.app_context_processor
def inject_permissions():
return dict(Permission=Permission)
tests/test_user_models.py:角色和權限的單元測試
#...
def test_roles_and_permissions(self):
Role.insert_roles()
u = User(email='1808863623@qq.com',password='cat')
self.assertTrue(u.can(Permission.WRITE_ARTICLES))
self.assertFalse(u.can(Permission.MODERATE_COMMENTS))
def test_anonymous_user(self):
u = AnonymousUser()
self.assertFalse(u.can(Permission.FOLLOW))
