1、session認證
.....
login(request, user) #登錄成功
# 登錄之后獲取獲取最新的session_key
session_key = request.session.session_key
# 刪除非當前用戶session_key的記錄
for session in Session.objects.filter(~Q(session_key=session_key), expire_date__gte=timezone.now()):
data = session.get_decoded()
if data.get('_auth_user_id', None) == str(request.user.id):
session.delete()
2、jwt認證
如果是JWT認證模式,比較麻煩,個人的解決方法是,每個用戶登錄后,在redis中存儲用戶的jwt_token. key是用戶的id,value是用戶最新的jwt_token. 因為用的django-jwt庫,所以這里定義了自己的LoginJWT, 繼承JSONWebTokenAPIView.
主要代碼改動,只是在登錄后,把最新的jwt_token存入redis cache.
class LoginJWT(JSONWebTokenAPIView):
serializer_class = JSONWebTokenSerializer
def post(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
user = serializer.object.get('user') or request.user
token = serializer.object.get('token')
response_data = jwt_response_payload_handler(token, user, request)
response = Response(response_data)
if api_settings.JWT_AUTH_COOKIE:
expiration = (datetime.utcnow() +
api_settings.JWT_EXPIRATION_DELTA)
response.set_cookie(api_settings.JWT_AUTH_COOKIE,
token,
expires=expiration,
httponly=True)
user.token = token
user.save()
k = "{}".format(user.id)
cache.set(k, token, TOKEN_EXPIRE)
return response
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
如果有推送機制的話,可以在此處添加推送機制,把新用戶登錄的消息推送給登錄過的客戶端。
這里我沒采用推送,而是,在某個經常訪問的DRF API中定義驗證,驗證當前請求帶上來的jwt_token是否是redis中存儲的最新的,如果不是,則返回用戶,請求失敗,請重新登錄的消息。
比如在某個viewsets.GenericViewSet中,authentication_classes包含自定義的myJWTAuth
authentication_classes = (myJWTAuth, SessionAuthentication)
myJWTAuth代碼:
class myJWTAuth(JSONWebTokenAuthentication):
def authenticate(self, request):
"""
Returns a two-tuple of `User` and token if a valid signature has been
supplied using JWT-based authentication. Otherwise returns `None`.
"""
jwt_value = self.get_jwt_value(request)
if jwt_value is None:
return None
try:
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
msg = _('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Error decoding signature.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
user = self.authenticate_credentials(payload)
k = "{}".format(user.id)
jwt_header=request.META.get('HTTP_AUTHORIZATION',None)
try:
jwt_str=jwt_header.split()[1]
except:
jwt_str=""
if cache.has_key(k):
cache_key = cache.get(k)
if jwt_str==cache_key:
cache.set(k, cache_key, TOKEN_EXPIRE)
return (user, jwt_value)
raise exceptions.AuthenticationFailed()
參考連接:https://blog.csdn.net/u014633966/article/details/85414656
