Django REST framework 認證和權限:
1, 認證和權限是不同的
2, 權限控制可以限制用戶對於視圖的訪問和對於具體數據對象的訪問
3, 認證是檢驗用戶能否通過,強調的是檢驗身份
4, 項目采用的認證是全局認證,權限在代碼中設置,可以隨時更改權限
5, 自定義認證主要是 獲取前端的 token 值 然后進行 token 檢驗!
首先定義視圖類
from django.http import HttpResponse
from rest_framework.generics import GenericAPIView

# JWTAuthentication is imported from the path configured in the
# REST_FRAMEWORK settings ('antilles.user.plugins.JWTAuthentication').
# NOTE(review): CookieAuthentication is assumed to live in the same module --
# confirm the import path for your project.
from antilles.user.plugins import CookieAuthentication, JWTAuthentication
from django_test.auth import AsAdminRole, AsOperatorRole


class view(GenericAPIView):
    """Example view showing per-view authentication and permission settings."""

    # Authentication classes used by this view.
    authentication_classes = (JWTAuthentication, CookieAuthentication)

    # Permission for the whole view. AsAdminRole is a custom permission class;
    # the framework runs the permission check automatically.
    permission_classes = (AsAdminRole,)

    # Using a role class as a decorator re-assigns the permission for this
    # single handler to AsOperatorRole. It is equivalent to writing
    # ``get = AsOperatorRole(get)`` after the method definition: the role
    # class tags the function and returns it unchanged.
    @AsOperatorRole
    def get(self, request):
        """Return a plain greeting response."""
        return HttpResponse('hello world!')
在 settings 文件之中配置權限(也可以寫在視圖里面,這樣就不需要寫在配置文件里面)
REST_FRAMEWORK = {
    # Authentication and permission defaults are configured together here.
    'DEFAULT_AUTHENTICATION_CLASSES': (
        # 'rest_framework.authentication.BasicAuthentication',    # basic auth
        # 'rest_framework.authentication.SessionAuthentication',  # session auth
        # The project uses its own authentication class globally.
        # The trailing comma is required: without it the parentheses hold a
        # plain string instead of a one-element tuple.
        'antilles.user.plugins.JWTAuthentication',
    ),
    # Permission settings.
    'DEFAULT_PERMISSION_CLASSES': (
        # Only authenticated users are let through:
        # 'rest_framework.permissions.IsAuthenticated',
        # Everyone is let through:
        # 'rest_framework.permissions.AllowAny',
        'django_test.auth.AsAdminRole',  # custom role-based permission class
    ),
}
登錄生成token:

class SessionView(APIView):
    """Session endpoint: GET checks the current login, POST issues a token."""

    # No permission classes: each handler inspects request.user itself.
    permission_classes = ()

    def get(self, request):
        """Check that the request is authenticated and, optionally, its role.

        Raises AuthenticationFailed for an anonymous user, and
        PermissionDenied when a ``role`` query parameter is given and
        ``user.check_role(role)`` is falsy. Returns an empty Response
        otherwise.
        """
        user = request.user
        if isinstance(user, AnonymousUser):
            raise AuthenticationFailed(
                detail='Incorrect authentication credentials.'
            )

        role = request.GET.get('role')
        if role is not None and not user.check_role(role):
            raise PermissionDenied(
                detail='Incorrect user role.'
            )

        return Response()

    @json_schema_validate({
        'type': 'object',
        'properties': {
            'user': {
                'type': 'string',
                'minLength': 1
            },
            'pass': {
                'type': 'string',
                'minLength': 1
            },
        },
    })
    def post(self, request):
        """Get an auth token.

        This API can be used in two ways:
        - the request carries username/password in its body: log in and
          return a token;
        - the request already carries a valid token: return a new token.
        """
        if isinstance(request.user, AnonymousUser):
            return self.login(request)
        return self.renew_token(request)

    def login(self, request):
        """Log in with the ``user``/``pass`` fields of the request body.

        Returns a Response holding the new token. Raises LoginFail when a
        field is missing, the user does not exist, the user is not
        activated, or authentication fails.
        """
        try:
            username = request.data['user']
            password = request.data['pass']
        except KeyError as e:
            # The schema on post() does not list these fields as required,
            # so either may be missing; report it as a failed login instead
            # of letting the KeyError escape.
            raise_from(LoginFail, e)

        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist as e:
            logger.exception('User[ %s ] is not exists', username)
            raise_from(LoginFail, e)

        if not user.is_activate():
            raise LoginFail(user)

        if not authenticate(user=user, password=password):
            user.login_fail()
            logger.info('User[ %s ] authenticated failed', username)
            raise LoginFail()

        user.login_success()
        # Hand the token back to the front end.
        return Response({'token': self.build_token(user)})

    def renew_token(self, request):
        """Issue a fresh token for the already authenticated request user.

        Raises LoginFail when the user is not activated.
        """
        user = request.user
        if not user.is_activate():
            raise LoginFail(user)

        return Response({'token': self.build_token(user)})

    def build_token(self, user):
        """Build a signed JWT for ``user``.

        The token is signed with settings.SECRET_KEY using
        settings.TOKEN_ALGORITHMS and expires settings.TOKEN_EXPIRE after
        the time of issue.
        """
        import jwt
        from cryptography.fernet import Fernet
        from django.utils.timezone import now

        # Keep the timestamp under its own name rather than rebinding the
        # imported ``now`` function.
        issued_at = now()
        return jwt.encode(
            {
                'id': user.pk,
                'iss': 'antilles-user',
                'sub': user.username,
                'role': user.get_role_display(),
                'iat': issued_at,
                'nbf': issued_at,
                'exp': issued_at + settings.TOKEN_EXPIRE,
                # generate_key() returns bytes, which the JSON encoder
                # rejects on Python 3; the key is base64 text, so decode it.
                'jti': Fernet.generate_key().decode('ascii'),
            },
            settings.SECRET_KEY,
            algorithm=settings.TOKEN_ALGORITHMS
        )
自定義權限控制類

# coding: utf-8
from abc import ABCMeta, abstractproperty

from django.contrib.auth.models import AnonymousUser
from rest_framework import permissions
from six import add_metaclass

ROLE_ADMIN = 300
ROLE_OPERATOR = 200
ROLE_USER = 100

# The three roles a user can have, as (value, name) pairs.
USER_ROLES = [
    (ROLE_ADMIN, 'admin'),
    (ROLE_OPERATOR, 'operator'),
    (ROLE_USER, 'user'),
]


# Abstract base class (ABCMeta): concrete roles must define ``floor``.
@add_metaclass(ABCMeta)
class BaseRole(permissions.BasePermission):
    """Permission that lets a user through when ``user.role >= floor``.

    A subclass can be listed in ``permission_classes`` or used as a
    decorator on a single handler method of a view.
    """

    def __new__(cls, func=None):
        """Overwrite __new__ so the permission class works as a decorator.

        example:
        >>> class TestView(APIView):
        ...     # if the class is used as a decorator, __new__ gets the
        ...     # decorated function as the argument ``func``
        ...     @AsUserRole
        ...     def get(self, request):
        ...         pass

        A permission defined by decorator will **overwrite** the class
        permission.
        """
        if func:
            # Used as a decorator: tag the function with this role class and
            # return the function itself, not an instance.
            func.permission_class = cls
            return func
        return permissions.BasePermission.__new__(cls)

    @abstractproperty
    def floor(self):
        """Minimum role value required; concrete roles set it as an int."""
        pass

    def has_permission(self, request, view):
        """Return True when the request user's role reaches the floor.

        Always returns a bool. Anonymous or missing users are rejected. A
        role attached to the handler method by decorator takes precedence
        over this instance's own floor.
        """
        if not request.user:
            return False

        # request.user is the logged-in User object; when nobody is logged
        # in it is an AnonymousUser.
        if isinstance(request.user, AnonymousUser):
            return False

        # DRF checks permissions before it resolves the handler, so the view
        # may not implement this HTTP method at all. Default to None so DRF
        # can answer 405 instead of this lookup raising AttributeError.
        handler = getattr(view, request.method.lower(), None)
        # Role class attached to the handler by __new__, if any.
        method_permission = getattr(handler, 'permission_class', None)

        # Roles are compared by their numeric value.
        if method_permission is not None:
            return request.user.role >= method_permission.floor
        return request.user.role >= self.floor


class AsUserRole(BaseRole):
    """Requires at least the 'user' role."""
    floor = ROLE_USER


class AsOperatorRole(BaseRole):
    """Requires at least the 'operator' role."""
    floor = ROLE_OPERATOR


class AsAdminRole(BaseRole):
    """Requires the 'admin' role."""
    floor = ROLE_ADMIN
自定義認證類!

import logging

from django.conf import settings
from rest_framework.authentication import (
    BaseAuthentication, get_authorization_header,
)
from rest_framework.exceptions import AuthenticationFailed, PermissionDenied
from six import raise_from

from .managers.pam import auth
from .models import User

logger = logging.getLogger(__name__)


class JWTAuthentication(BaseAuthentication):
    """Authenticate a request from a JWT in the Authorization header.

    The expected header form is ``Authorization: Jwt <token>``.
    """

    keyword = 'Jwt'

    def authenticate(self, request):
        """Extract the token from the Authorization header and verify it.

        Returns None when the header is absent or uses another keyword, so
        that other authentication classes can be tried. Returns a
        ``(user, payload)`` pair on success. Raises AuthenticationFailed
        for a malformed header.
        """
        # The header value is bytes; split() breaks it on whitespace,
        # e.g. [b'Jwt', b'<token>'].
        parts = get_authorization_header(request).split()

        if not parts or parts[0].lower() != self.keyword.lower().encode():
            return None

        if len(parts) == 1:
            raise AuthenticationFailed(
                detail='Invalid token header. No credentials provided.'
            )
        elif len(parts) > 2:
            raise AuthenticationFailed(
                detail='Invalid token header. '
                       'Token string should not contain spaces.'
            )

        try:
            token = parts[1].decode()
        except UnicodeError:
            raise AuthenticationFailed(
                detail='Invalid token header. '
                       'Token string should not contain invalid characters.'
            )

        return self.authenticate_credentials(token)

    def authenticate_credentials(self, token):
        """Verify ``token`` and return ``(user, payload)``.

        Raises AuthenticationFailed when the token is invalid, lacks the
        ``sub``/``role`` claims, or names an unknown user. Raises
        PermissionDenied when the role in the token is higher than the
        user's current role.
        """
        import jwt
        from jwt import InvalidTokenError

        try:
            payload = jwt.decode(
                token,
                settings.SECRET_KEY,
                # Pin the accepted algorithm to the one used for signing so
                # the token header cannot choose it.
                algorithms=[settings.TOKEN_ALGORITHMS],
                # NOTE(review): which of these options are honoured depends
                # on the installed PyJWT version -- confirm.
                options={
                    'verify_signature': True,
                    'verify_exp': True,
                    'verify_nbf': True,
                    'verify_iat': True,
                    'require_exp': True,
                    'require_nbf': True,
                    'require_iat': True,
                    'require_iss': True,
                    'require_jti': True,
                    'require_role': True,
                    'require_sub': True,
                    'require_mgt': True
                }
            )
        except InvalidTokenError as e:
            raise_from(AuthenticationFailed, e)

        try:
            # sub: the user the token was issued for.
            username = payload['sub']
            role_name = payload['role']
        except KeyError as e:
            # A validly signed token without these claims is still unusable.
            raise_from(AuthenticationFailed, e)

        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist as e:
            raise_from(AuthenticationFailed, e)

        # Reject the token if it claims a higher role than the user has now.
        payload_role = User.get_role_value(role_name)
        if payload_role > user.role:
            raise PermissionDenied(
                'Insufficient permission.'
            )

        return user, payload

    def authenticate_header(self, request):
        """Return the keyword ('Jwt') that prefixes the token in the header."""
        return self.keyword


class CookieAuthentication(JWTAuthentication):
    """Authenticate a request from a JWT stored in the ``token`` cookie."""

    def authenticate(self, request):
        """Verify the ``token`` cookie; return None when it is absent."""
        token = request.COOKIES.get('token')
        if token is None:
            return None

        # Cookie values are text on Python 3, where str has no decode();
        # only decode when the value really is bytes.
        if isinstance(token, bytes):
            token = token.decode()

        return self.authenticate_credentials(token)
驗證結果