flask-login 三種方法加載用戶【轉】


原文:https://www.jianshu.com/p/f34d7e83f0de

 

看了flask-login的文檔,發現除了cookie驗證身份外,還提供通過請求參數和頭部來驗證用戶身份的方法。下面通過閱讀flask的源碼來解讀整個過程。

  • 需要登錄驗證的視圖函數加上login_required的裝飾器
@app.route('/userinfo') @login_required def user_info(): pass 

login_required的源碼

def login_required(func): @wraps(func) def decorated_view(*args, **kwargs): if request.method in EXEMPT_METHODS: return func(*args, **kwargs) elif current_app.login_manager._login_disabled: return func(*args, **kwargs) elif not current_user.is_authenticated: return current_app.login_manager.unauthorized() return func(*args, **kwargs) return decorated_view 

第一個條件判斷請求方法是否在白名單中,是的話不進行登錄驗證,默認請求是OPTION的話不進行驗證。

第二個判斷login_manager是否配置為不驗證登錄。以上都不滿足的話則判斷當前用戶是否是通過登錄授權的。

登錄授權的本質問題就是能否通過請求所帶的cookie或者參數等識別到用戶。下來來看如何獲取到current_user

  • 獲取current_user

在flask_login/utils.py中定義了current_user

current_user = LocalProxy(lambda: _get_user()) 

然后看到_get_user的定義

def _get_user(): if has_request_context() and not hasattr(_request_ctx_stack.top, 'user'): current_app.login_manager._load_user() return getattr(_request_ctx_stack.top, 'user', None) 

接着來看login_manager._load_user

def _load_user(self): config = current_app.config if config.get('SESSION_PROTECTION', self.session_protection): deleted = self._session_protection() if deleted: return self.reload_user() if is_missing_user_id: cookie_name = config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME) header_name = config.get('AUTH_HEADER_NAME', AUTH_HEADER_NAME) has_cookie = (cookie_name in request.cookies and session.get('remember') != 'clear') if has_cookie: return self._load_from_cookie(request.cookies[cookie_name]) elif self.request_callback: return self._load_from_request(request) elif header_name in request.headers: return self._load_from_header(request.headers[header_name]) return self.reload_user() 

從這個方法的最后可以看到,提供了三種方法加載用戶。

第一種是_load_from_cookie

接着是_load_from_request

最后是_load_form_header

 

第一種小程序不支持,第三種不久后將要被棄用,所以這里主要是用第二種,通過request對象來加載用戶。

_load_from_request的定義

def _load_from_request(self, request): user = None if self.request_callback: user = self.request_callback(request) if user is not None: self.reload_user(user=user) app = current_app._get_current_object() user_loaded_from_request.send(app, user=_get_user()) else: self.reload_user() 

如果存在request_callback,就調用該方法。看下下面這段代碼

def request_loader(self, callback): self.request_callback = callback return callback 

這個方法也是一個裝飾器,類似於常用的login_manager.load_user裝飾器,詳細用法可以查看官方文檔

  • 自定義授權檢查方法

這是我在小程序接口中定義的授權檢查方法

@login_manager.request_loader def load_user_from_request(request): login_key = request.headers.get('login_key') if login_key: user_info = current_app.redis.get(login_key) if user_info: # load user # return user pass return None 

用戶通過小程序授權登錄后,自定義一個login_key作為存儲用戶唯一識別信息的Redis鍵,將其返回給小程序,之后每次小程序請求都要帶上這個key,這樣就能完成一次登錄校驗。

 

 

#自定義登錄校驗裝飾器(此方式是在未改變默認is_authenticated方法的時候生效)
def custom_login_required(func):
    @wraps(func)
    def decorated_view(*args, **kwargs):
        from flask_login.config import EXEMPT_METHODS
        from flask import current_app
        if request.method in EXEMPT_METHODS:
            return func(*args, **kwargs)
        elif current_app.login_manager._login_disabled:
            return func(*args, **kwargs)
        elif not current_user.is_authenticated:
            # 自定義登錄方式
            opstore_cookie_name = current_app.config.get('OPSTORE_COOKIE_NAME')
            has_opstore_cookie = (opstore_cookie_name in request.cookies)
            if not has_opstore_cookie:
                return current_app.login_manager.unauthorized()

            opstore_cookie_secret_key = current_app.config.get('OPSTORE_COOKIE_SECRET_KEY')
            cookie_dict = decrypt_token(opstore_cookie_secret_key, request.cookies[opstore_cookie_name])
            if not cookie_dict:
                return current_app.login_manager.unauthorized()

            current_app.logger.debug('decrypt_token-----------------%s------%s' % cookie_dict, cookie_dict['userid'])
            if 'userid' not in cookie_dict:
                return current_app.login_manager.unauthorized()
            user = User.query.filter(User.id == cookie_dict['userid']).first()
            if user:
                login_user(user)
                return func(*args, **kwargs)
            else:
                return current_app.login_manager.unauthorized()
        current_app.logger.debug('++current_user.is_authenticated----------')

        return func(*args, **kwargs)
    return decorated_view

#自定義登錄校驗裝飾器使用
@auth.route('/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
    '''修改密碼'''
    pass

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM