flask-login 三种方法加载用户【转】


原文:https://www.jianshu.com/p/f34d7e83f0de

 

看了flask-login的文档,发现除了cookie验证身份外,还提供通过请求参数和头部来验证用户身份的方法。下面通过阅读flask的源码来解读整个过程。

  • 需要登录验证的视图函数加上login_required的装饰器
@app.route('/userinfo') @login_required def user_info(): pass 

login_required的源码

def login_required(func): @wraps(func) def decorated_view(*args, **kwargs): if request.method in EXEMPT_METHODS: return func(*args, **kwargs) elif current_app.login_manager._login_disabled: return func(*args, **kwargs) elif not current_user.is_authenticated: return current_app.login_manager.unauthorized() return func(*args, **kwargs) return decorated_view 

第一个条件判断请求方法是否在白名单中,是的话不进行登录验证,默认请求是OPTION的话不进行验证。

第二个判断login_manager是否配置为不验证登录。以上都不满足的话则判断当前用户是否是通过登录授权的。

登录授权的本质问题就是能否通过请求所带的cookie或者参数等识别到用户。下来来看如何获取到current_user

  • 获取current_user

在flask_login/utils.py中定义了current_user

current_user = LocalProxy(lambda: _get_user()) 

然后看到_get_user的定义

def _get_user(): if has_request_context() and not hasattr(_request_ctx_stack.top, 'user'): current_app.login_manager._load_user() return getattr(_request_ctx_stack.top, 'user', None) 

接着来看login_manager._load_user

def _load_user(self): config = current_app.config if config.get('SESSION_PROTECTION', self.session_protection): deleted = self._session_protection() if deleted: return self.reload_user() if is_missing_user_id: cookie_name = config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME) header_name = config.get('AUTH_HEADER_NAME', AUTH_HEADER_NAME) has_cookie = (cookie_name in request.cookies and session.get('remember') != 'clear') if has_cookie: return self._load_from_cookie(request.cookies[cookie_name]) elif self.request_callback: return self._load_from_request(request) elif header_name in request.headers: return self._load_from_header(request.headers[header_name]) return self.reload_user() 

从这个方法的最后可以看到,提供了三种方法加载用户。

第一种是_load_from_cookie

接着是_load_from_request

最后是_load_form_header

 

第一种小程序不支持,第三种不久后将要被弃用,所以这里主要是用第二种,通过request对象来加载用户。

_load_from_request的定义

def _load_from_request(self, request): user = None if self.request_callback: user = self.request_callback(request) if user is not None: self.reload_user(user=user) app = current_app._get_current_object() user_loaded_from_request.send(app, user=_get_user()) else: self.reload_user() 

如果存在request_callback,就调用该方法。看下下面这段代码

def request_loader(self, callback): self.request_callback = callback return callback 

这个方法也是一个装饰器,类似于常用的login_manager.load_user装饰器,详细用法可以查看官方文档

  • 自定义授权检查方法

这是我在小程序接口中定义的授权检查方法

@login_manager.request_loader def load_user_from_request(request): login_key = request.headers.get('login_key') if login_key: user_info = current_app.redis.get(login_key) if user_info: # load user # return user pass return None 

用户通过小程序授权登录后,自定义一个login_key作为存储用户唯一识别信息的Redis键,将其返回给小程序,之后每次小程序请求都要带上这个key,这样就能完成一次登录校验。

 

 

#自定义登录校验装饰器(此方式是在未改变默认is_authenticated方法的时候生效)
def custom_login_required(func):
    @wraps(func)
    def decorated_view(*args, **kwargs):
        from flask_login.config import EXEMPT_METHODS
        from flask import current_app
        if request.method in EXEMPT_METHODS:
            return func(*args, **kwargs)
        elif current_app.login_manager._login_disabled:
            return func(*args, **kwargs)
        elif not current_user.is_authenticated:
            # 自定义登录方式
            opstore_cookie_name = current_app.config.get('OPSTORE_COOKIE_NAME')
            has_opstore_cookie = (opstore_cookie_name in request.cookies)
            if not has_opstore_cookie:
                return current_app.login_manager.unauthorized()

            opstore_cookie_secret_key = current_app.config.get('OPSTORE_COOKIE_SECRET_KEY')
            cookie_dict = decrypt_token(opstore_cookie_secret_key, request.cookies[opstore_cookie_name])
            if not cookie_dict:
                return current_app.login_manager.unauthorized()

            current_app.logger.debug('decrypt_token-----------------%s------%s' % cookie_dict, cookie_dict['userid'])
            if 'userid' not in cookie_dict:
                return current_app.login_manager.unauthorized()
            user = User.query.filter(User.id == cookie_dict['userid']).first()
            if user:
                login_user(user)
                return func(*args, **kwargs)
            else:
                return current_app.login_manager.unauthorized()
        current_app.logger.debug('++current_user.is_authenticated----------')

        return func(*args, **kwargs)
    return decorated_view

#自定义登录校验装饰器使用
@auth.route('/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
    '''修改密码'''
    pass

  


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM