原文:https://www.jianshu.com/p/f34d7e83f0de
看了flask-login的文档,发现除了cookie验证身份外,还提供通过请求参数和头部来验证用户身份的方法。下面通过阅读flask的源码来解读整个过程。
- 需要登录验证的视图函数加上
login_required的装饰器
@app.route('/userinfo') @login_required def user_info(): pass
login_required的源码
def login_required(func): @wraps(func) def decorated_view(*args, **kwargs): if request.method in EXEMPT_METHODS: return func(*args, **kwargs) elif current_app.login_manager._login_disabled: return func(*args, **kwargs) elif not current_user.is_authenticated: return current_app.login_manager.unauthorized() return func(*args, **kwargs) return decorated_view
第一个条件判断请求方法是否在白名单中,是的话不进行登录验证,默认请求是OPTION的话不进行验证。
第二个判断login_manager是否配置为不验证登录。以上都不满足的话则判断当前用户是否是通过登录授权的。
登录授权的本质问题就是能否通过请求所带的cookie或者参数等识别到用户。下来来看如何获取到current_user。
- 获取
current_user。
在flask_login/utils.py中定义了current_user
current_user = LocalProxy(lambda: _get_user())
然后看到_get_user的定义
def _get_user(): if has_request_context() and not hasattr(_request_ctx_stack.top, 'user'): current_app.login_manager._load_user() return getattr(_request_ctx_stack.top, 'user', None)
接着来看login_manager._load_user
def _load_user(self): config = current_app.config if config.get('SESSION_PROTECTION', self.session_protection): deleted = self._session_protection() if deleted: return self.reload_user() if is_missing_user_id: cookie_name = config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME) header_name = config.get('AUTH_HEADER_NAME', AUTH_HEADER_NAME) has_cookie = (cookie_name in request.cookies and session.get('remember') != 'clear') if has_cookie: return self._load_from_cookie(request.cookies[cookie_name]) elif self.request_callback: return self._load_from_request(request) elif header_name in request.headers: return self._load_from_header(request.headers[header_name]) return self.reload_user()
从这个方法的最后可以看到,提供了三种方法加载用户。
第一种是_load_from_cookie,
接着是_load_from_request,
最后是_load_form_header。
第一种小程序不支持,第三种不久后将要被弃用,所以这里主要是用第二种,通过request对象来加载用户。
_load_from_request的定义
def _load_from_request(self, request): user = None if self.request_callback: user = self.request_callback(request) if user is not None: self.reload_user(user=user) app = current_app._get_current_object() user_loaded_from_request.send(app, user=_get_user()) else: self.reload_user()
如果存在request_callback,就调用该方法。看下下面这段代码
def request_loader(self, callback): self.request_callback = callback return callback
这个方法也是一个装饰器,类似于常用的login_manager.load_user装饰器,详细用法可以查看官方文档。
- 自定义授权检查方法
这是我在小程序接口中定义的授权检查方法
@login_manager.request_loader def load_user_from_request(request): login_key = request.headers.get('login_key') if login_key: user_info = current_app.redis.get(login_key) if user_info: # load user # return user pass return None
用户通过小程序授权登录后,自定义一个login_key作为存储用户唯一识别信息的Redis键,将其返回给小程序,之后每次小程序请求都要带上这个key,这样就能完成一次登录校验。
#自定义登录校验装饰器(此方式是在未改变默认is_authenticated方法的时候生效)
def custom_login_required(func):
@wraps(func)
def decorated_view(*args, **kwargs):
from flask_login.config import EXEMPT_METHODS
from flask import current_app
if request.method in EXEMPT_METHODS:
return func(*args, **kwargs)
elif current_app.login_manager._login_disabled:
return func(*args, **kwargs)
elif not current_user.is_authenticated:
# 自定义登录方式
opstore_cookie_name = current_app.config.get('OPSTORE_COOKIE_NAME')
has_opstore_cookie = (opstore_cookie_name in request.cookies)
if not has_opstore_cookie:
return current_app.login_manager.unauthorized()
opstore_cookie_secret_key = current_app.config.get('OPSTORE_COOKIE_SECRET_KEY')
cookie_dict = decrypt_token(opstore_cookie_secret_key, request.cookies[opstore_cookie_name])
if not cookie_dict:
return current_app.login_manager.unauthorized()
current_app.logger.debug('decrypt_token-----------------%s------%s' % cookie_dict, cookie_dict['userid'])
if 'userid' not in cookie_dict:
return current_app.login_manager.unauthorized()
user = User.query.filter(User.id == cookie_dict['userid']).first()
if user:
login_user(user)
return func(*args, **kwargs)
else:
return current_app.login_manager.unauthorized()
current_app.logger.debug('++current_user.is_authenticated----------')
return func(*args, **kwargs)
return decorated_view
#自定义登录校验装饰器使用
@auth.route('/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
'''修改密码'''
pass
