原文:https://www.jianshu.com/p/f34d7e83f0de
看了flask-login的文檔,發現除了cookie驗證身份外,還提供通過請求參數和頭部來驗證用戶身份的方法。下面通過閱讀flask的源碼來解讀整個過程。
- 需要登錄驗證的視圖函數加上
login_required的裝飾器
@app.route('/userinfo') @login_required def user_info(): pass
login_required的源碼
def login_required(func): @wraps(func) def decorated_view(*args, **kwargs): if request.method in EXEMPT_METHODS: return func(*args, **kwargs) elif current_app.login_manager._login_disabled: return func(*args, **kwargs) elif not current_user.is_authenticated: return current_app.login_manager.unauthorized() return func(*args, **kwargs) return decorated_view
第一個條件判斷請求方法是否在白名單中,是的話不進行登錄驗證,默認請求是OPTION的話不進行驗證。
第二個判斷login_manager是否配置為不驗證登錄。以上都不滿足的話則判斷當前用戶是否是通過登錄授權的。
登錄授權的本質問題就是能否通過請求所帶的cookie或者參數等識別到用戶。下來來看如何獲取到current_user。
- 獲取
current_user。
在flask_login/utils.py中定義了current_user
current_user = LocalProxy(lambda: _get_user())
然后看到_get_user的定義
def _get_user(): if has_request_context() and not hasattr(_request_ctx_stack.top, 'user'): current_app.login_manager._load_user() return getattr(_request_ctx_stack.top, 'user', None)
接着來看login_manager._load_user
def _load_user(self): config = current_app.config if config.get('SESSION_PROTECTION', self.session_protection): deleted = self._session_protection() if deleted: return self.reload_user() if is_missing_user_id: cookie_name = config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME) header_name = config.get('AUTH_HEADER_NAME', AUTH_HEADER_NAME) has_cookie = (cookie_name in request.cookies and session.get('remember') != 'clear') if has_cookie: return self._load_from_cookie(request.cookies[cookie_name]) elif self.request_callback: return self._load_from_request(request) elif header_name in request.headers: return self._load_from_header(request.headers[header_name]) return self.reload_user()
從這個方法的最后可以看到,提供了三種方法加載用戶。
第一種是_load_from_cookie,
接着是_load_from_request,
最后是_load_form_header。
第一種小程序不支持,第三種不久后將要被棄用,所以這里主要是用第二種,通過request對象來加載用戶。
_load_from_request的定義
def _load_from_request(self, request): user = None if self.request_callback: user = self.request_callback(request) if user is not None: self.reload_user(user=user) app = current_app._get_current_object() user_loaded_from_request.send(app, user=_get_user()) else: self.reload_user()
如果存在request_callback,就調用該方法。看下下面這段代碼
def request_loader(self, callback): self.request_callback = callback return callback
這個方法也是一個裝飾器,類似於常用的login_manager.load_user裝飾器,詳細用法可以查看官方文檔。
- 自定義授權檢查方法
這是我在小程序接口中定義的授權檢查方法
@login_manager.request_loader def load_user_from_request(request): login_key = request.headers.get('login_key') if login_key: user_info = current_app.redis.get(login_key) if user_info: # load user # return user pass return None
用戶通過小程序授權登錄后,自定義一個login_key作為存儲用戶唯一識別信息的Redis鍵,將其返回給小程序,之后每次小程序請求都要帶上這個key,這樣就能完成一次登錄校驗。
#自定義登錄校驗裝飾器(此方式是在未改變默認is_authenticated方法的時候生效)
def custom_login_required(func):
@wraps(func)
def decorated_view(*args, **kwargs):
from flask_login.config import EXEMPT_METHODS
from flask import current_app
if request.method in EXEMPT_METHODS:
return func(*args, **kwargs)
elif current_app.login_manager._login_disabled:
return func(*args, **kwargs)
elif not current_user.is_authenticated:
# 自定義登錄方式
opstore_cookie_name = current_app.config.get('OPSTORE_COOKIE_NAME')
has_opstore_cookie = (opstore_cookie_name in request.cookies)
if not has_opstore_cookie:
return current_app.login_manager.unauthorized()
opstore_cookie_secret_key = current_app.config.get('OPSTORE_COOKIE_SECRET_KEY')
cookie_dict = decrypt_token(opstore_cookie_secret_key, request.cookies[opstore_cookie_name])
if not cookie_dict:
return current_app.login_manager.unauthorized()
current_app.logger.debug('decrypt_token-----------------%s------%s' % cookie_dict, cookie_dict['userid'])
if 'userid' not in cookie_dict:
return current_app.login_manager.unauthorized()
user = User.query.filter(User.id == cookie_dict['userid']).first()
if user:
login_user(user)
return func(*args, **kwargs)
else:
return current_app.login_manager.unauthorized()
current_app.logger.debug('++current_user.is_authenticated----------')
return func(*args, **kwargs)
return decorated_view
#自定義登錄校驗裝飾器使用
@auth.route('/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
'''修改密碼'''
pass
