1、session認證
..... login(request, user) #登錄成功 # 登錄之后獲取獲取最新的session_key session_key = request.session.session_key # 刪除非當前用戶session_key的記錄 for session in Session.objects.filter(~Q(session_key=session_key), expire_date__gte=timezone.now()): data = session.get_decoded() if data.get('_auth_user_id', None) == str(request.user.id): session.delete()
2、jwt認證
如果是JWT認證模式,比較麻煩,個人的解決方法是,每個用戶登錄后,在redis中存儲用戶的jwt_token. key是用戶的id,value是用戶最新的jwt_token. 因為用的django-jwt庫,所以這里定義了自己的LoginJWT, 繼承JSONWebTokenAPIView.
主要代碼改動,只是在登錄后,把最新的jwt_token存入redis cache.
class LoginJWT(JSONWebTokenAPIView): serializer_class = JSONWebTokenSerializer def post(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) if serializer.is_valid(): user = serializer.object.get('user') or request.user token = serializer.object.get('token') response_data = jwt_response_payload_handler(token, user, request) response = Response(response_data) if api_settings.JWT_AUTH_COOKIE: expiration = (datetime.utcnow() + api_settings.JWT_EXPIRATION_DELTA) response.set_cookie(api_settings.JWT_AUTH_COOKIE, token, expires=expiration, httponly=True) user.token = token user.save() k = "{}".format(user.id) cache.set(k, token, TOKEN_EXPIRE) return response return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
如果有推送機制的話,可以在此處添加推送機制,把新用戶登錄的消息推送給登錄過的客戶端。
這里我沒采用推送,而是,在某個經常訪問的DRF API中定義驗證,驗證當前請求帶上來的jwt_token是否是redis中存儲的最新的,如果不是,則返回用戶,請求失敗,請重新登錄的消息。
比如在某個viewsets.GenericViewSet中,authentication_classes包含自定義的myJWTAuth
authentication_classes = (myJWTAuth, SessionAuthentication)
myJWTAuth代碼:
class myJWTAuth(JSONWebTokenAuthentication): def authenticate(self, request): """ Returns a two-tuple of `User` and token if a valid signature has been supplied using JWT-based authentication. Otherwise returns `None`. """ jwt_value = self.get_jwt_value(request) if jwt_value is None: return None try: payload = jwt_decode_handler(jwt_value) except jwt.ExpiredSignature: msg = _('Signature has expired.') raise exceptions.AuthenticationFailed(msg) except jwt.DecodeError: msg = _('Error decoding signature.') raise exceptions.AuthenticationFailed(msg) except jwt.InvalidTokenError: raise exceptions.AuthenticationFailed() user = self.authenticate_credentials(payload) k = "{}".format(user.id) jwt_header=request.META.get('HTTP_AUTHORIZATION',None) try: jwt_str=jwt_header.split()[1] except: jwt_str="" if cache.has_key(k): cache_key = cache.get(k) if jwt_str==cache_key: cache.set(k, cache_key, TOKEN_EXPIRE) return (user, jwt_value) raise exceptions.AuthenticationFailed()
參考連接:https://blog.csdn.net/u014633966/article/details/85414656