DRF 三大認證的配置及使用方法


三大認證

一、身份認證

1、身份認證配置

1.1 全局配置身份認證模塊

身份認證組件一般都配置在全局settings中。

# settings.py
# drf框架自定義配置
REST_FRAMEWORK = {
    # 認證組件
    'DEFAULT_AUTHENTICATION_CLASSES': [
        # 'rest_framework.authentication.SessionAuthentication',
        # 'rest_framework.authentication.BasicAuthentication'
        'rest_framework_jwt.authentication.JSONWebTokenAuthentication'
    ],
}

1.2 局部配置身份認證模塊

在視圖類中用authentication_classes類屬性配置身份認證模塊:

# views.py
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
class UserListViewSet(mixins.ListModelMixin, GenericViewSet):
    authentication_classes = [JSONWebTokenAuthentication]

    queryset = models.User.objects.filter(is_active=True).all()
    serializer_class = serializers.UserModelSerializer

2、drf提供的身份認證類(了解)

其中BaseAuthentication是用來自定義身份認證類需要繼承的基類。

其他的類是drf默認提前寫好的幾種身份認證類,可以直接使用:

BasicAuthentication

SessionAuthentication

TokenAuthentication

def get_authorization_header(request):
    # 前台在請求頭用authorization攜帶認證字符串token給后台
    auth = request.META.get('HTTP_AUTHORIZATION', b'')
    # auth == token字符串  將其編碼成二進制
    if isinstance(auth, str):
        # Work around django test client oddness
        auth = auth.encode(HTTP_HEADER_ENCODING)
    return auth

class BasicAuthentication(BaseAuthentication):
    www_authenticate_realm = 'api'

    def authenticate(self, request):
        auth = get_authorization_header(request).split()
        # auth按空格拆分,拆分的列表結果長度為2才合法

        if not auth or auth[0].lower() != b'basic':
            # 沒有token,認證方法直接返回None,代表游客(匿名用戶)
            return None
        # 可以推論出auth拆分的結果為2份,結構為:['basic','token字符串']
        if len(auth) == 1:
            msg = _('Invalid basic header. No credentials provided.')
            raise exceptions.AuthenticationFailed(msg)
        elif len(auth) > 2:
            msg = _('Invalid basic header. Credentials string should not contain spaces.')
            raise exceptions.AuthenticationFailed(msg)
            # 結論:提交了token,格式有誤,拋異常,代表非法用戶

        try:
            # 反解token(auth是被拆分的列表,0是頭,1是token)
            auth_parts = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(':')
        except (TypeError, UnicodeDecodeError, binascii.Error):
            msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
            raise exceptions.AuthenticationFailed(msg)
            # 結論:提交了token,格式有誤,拋異常,代表非法用戶

        userid, password = auth_parts[0], auth_parts[2]
        return self.authenticate_credentials(userid, password, request)
        # 結論:提交了token,解析成功,返回(user,None)組成的元組,代表合法用戶
        # 元組0位user會被儲存到request.user中,
        # 元組1位token會被存儲到request.auth中,通常可以不用保存,所以可以用None填充

    def authenticate_credentials(self, userid, password, request=None):
        """
        Authenticate the userid and password against username and password
        with optional request for context.
        """
        credentials = {
            get_user_model().USERNAME_FIELD: userid,
            'password': password
        }
        user = authenticate(request=request, **credentials)

        if user is None:
            raise exceptions.AuthenticationFailed(_('Invalid username/password.'))

        if not user.is_active:
            raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

        return (user, None)

    def authenticate_header(self, request):
        return 'Basic realm="%s"' % self.www_authenticate_realm


class SessionAuthentication(BaseAuthentication):
    def authenticate(self, request):
        # Get the session-based user from the underlying HttpRequest object
        user = getattr(request._request, 'user', None)

        # Unauthenticated, CSRF validation not required
        if not user or not user.is_active:
            return None

        self.enforce_csrf(request)

        # CSRF passed with authenticated user
        return (user, None)

class TokenAuthentication(BaseAuthentication):
    keyword = 'Token'
    model = None

    def authenticate(self, request):
        auth = get_authorization_header(request).split()

        if not auth or auth[0].lower() != self.keyword.lower().encode():
            return None

        if len(auth) == 1:
            msg = _('Invalid token header. No credentials provided.')
            raise exceptions.AuthenticationFailed(msg)
        elif len(auth) > 2:
            msg = _('Invalid token header. Token string should not contain spaces.')
            raise exceptions.AuthenticationFailed(msg)

        try:
            token = auth[1].decode()
        except UnicodeError:
            msg = _('Invalid token header. Token string should not contain invalid characters.')
            raise exceptions.AuthenticationFailed(msg)

        return self.authenticate_credentials(token)

3、rf-jwt提供的身份認證類(常用)

rf-jwt為我們提供了其已經寫好的身份認證類:JSONWebTokenAuthentication

特點:

前端在向后端發送請求時需攜帶的token格式為:

{'Authorization':'jwt abc.def.xyz'}  // token需以jwt開頭

后端如何配置:

from rest_framework_jwt.authentication import JSONWebTokenAuthentication

class UserListViewSet(ListModelMixin,GenericViewSet):
    # ---------------三大認證配置---------------我是分割線
    # authentication_classes = [authentications.MyAuthentication]
    authentication_classes = [JSONWebTokenAuthentication]

    # -----------------正常邏輯-----------------我是分割線
    queryset = models.User.objects.filter(is_active=True).all()
    serializer_class = serializers.UserModelSerializer

4、自定義身份認證類(需要自定義簽發token時用)

  1. 如果使用session認證,drf默認提供了SessionAuthentication
  2. 如果使用drf-jwt認證框架,drf-jwt框架提供了JSONWebTokenAuthentication
  3. 如果是自定義簽發與校驗token,才需要將校驗token的算法封裝到自定義的認證類中

如何自定義身份認證類:

  1. 繼承BaseAuthentication續寫Authentication身份認證類:
  2. 重寫authenticate方法;
  • 從請求頭中拿到前台提交的token(一般從HTTP_AUTHORIZATION中拿,也可以與前台約定)

    • 如果設置了反爬等措施,校驗一下反爬(頭 token)
  • 沒有token,返回None,代表游客

  • 有token,進入校驗

    • 不通過:拋AuthenticationFailed異常,代表非法用戶
    • 通過:返回 (user, token),代表合法用戶
from rest_framework.authentication import BaseAuthentication

class MyAuthentication(BaseAuthentication):
    def authenticate(self, request):
        # 不設置任何身份認證規則,直接全部通過,進入下一輪認證(權限認證)
        pass

5、自定義簽發token及多方式登陸

重點:

  1. token只能由 登錄接口 簽發;
  2. 登錄接口也是APIView的子類,使用一定會進行'身份認證'和'權限認證'組件的校驗。

結論:不管系統默認、或是全局settings配置的是何認證與權限組件,登錄接口不用參與任何認證與權限的校驗所以,登錄接口一定要進行'身份認證'與'權限認證'的局部禁用

# views.py
from rest_framework.views import APIView
class LoginAPIView(APIView):
    authentication_classes = []
    pagination_class = []
    permission_classes = []
    def post(self,request,*args,**kwargs):
        user_ser = serializers.LoginModelSerializer(data=request.data)
        user_ser.is_valid(raise_exception=True)
        return APIResponse(results={
            'username':user_ser.content.get('user').username,
            'token':user_ser.content.get('token')
        })
    
# serializers.py
from rest_framework_jwt.serializers import jwt_payload_handler,jwt_encode_handler
class LoginModelSerializer(serializers.ModelSerializer):
    username = serializers.CharField(
        max_length=64,
        min_length=3
    )
    password = serializers.CharField(
        max_length=64,
        min_length=3
    )

    class Meta:
        model = models.User
        fields = ['username','password']

    # 在全局鈎子中完成token的簽發
    def validate(self, attrs):
        # 先從model表中查出user對象
        user = self._validate_user(attrs)
        # 將user對象包裝進載荷中
        payload = jwt_payload_handler(user)
        # 將載荷簽發入token
        token = jwt_encode_handler(payload)
        # 將對象和token儲存進serializer對象中,就可以在視圖類中調用
        self.content = {
            'user':user,
            'token':token
        }
        return attrs

    def _validate_user(self,attrs):
        username = attrs.get('username')
        password = attrs.get('password')
        # 多方式登陸
        if re.match(r'.*@.*',username):
            user = models.User.objects.filter(email=username).first()  # type:models.User
        elif re.match(r'^1[3-9][0-9]{9}$',username):
            user = models.User.objects.filter(mobile=username).first()  # type:models.User
        else:
            user = models.User.objects.filter(username=username).first()  # type:models.User

        if not user or not user.check_password(password):
            raise serializers.ValidationError({'message':'用戶信息系有誤!請重新登錄!'})

        return user

二、權限認證

1、權限認證配置

1.1 全局配置權限認證模塊

一般權限認證不做全局配置,因為每個功能對應不同的權限,不好配。

1.2 局部配置權限認證模塊

使用permission_classes類屬性配置:

from rest_framework.permissions import IsAdminUser,IsAuthenticated,IsAuthenticatedOrReadOnly,AllowAny

class UserViewSet(ViewSet):
    # 配置django默認提供的權限認證模塊
    permission_classes = [IsAuthenticated]

2、drf提供的權限認證類

drf默認提供了幾種權限認證模塊:

from rest_framework.permissions import IsAuthenticated, IsAdminUser, AllowAny, IsAuthenticatedOrReadOnly

- AllowAny                  |  游客和登錄用戶有全權限
- IsAuthenticated           |  只有登錄用戶有全權限
- IsAdminUser               |  只有后台用戶(admin用戶)有全權限
- IsAuthenticatedOrReadOnly |  游客有讀權限,登錄用戶有全權限

3、自定義權限認證類

如果有特殊需要,需要自定義權限類

如:只有superuser有權限、只有vip用戶有權限、只有某ip網段用戶有權限、只有某個視圖及其子類有權限

如何自定義權限類:

  1. 繼承BasePermission續寫permission類;
  2. 重寫has_permission方法;
    • 根據需求,request和view的輔助,制定權限規則判斷條件
    • 如果條件通過,返回True
    • 如果條件不通過,返回False
# permission.py
from rest_framework.permissions import BasePermission
# VIP用戶權限
class VIPUserPermission(BasePermission):
    def has_permission(self, request, view):
        for group in request.user.groups.all():
            # 存在於vip組即有權限
            if group.name.lower() == 'vip':
                return True
        # 否則沒有權限
        return False
    
# views.py
class UserViewSet(ViewSet):
    # 局部配置哪些權限可以執行此操作
    permission_classes = [permissions.VIPUserPermission]

    def retrieve(self,request,*args,**kwargs):
        return APIResponse(results={
            'username':request.user.username,
            'email':request.user.email,
            'mobile':request.user.mobile,
            'create_time':request.user.date_joined,
        })

三、節流認證(頻率認證)

1、節流認證配置

需全局與局部配置配合。

局部配置節流模式,全局配置節流模式的流量。

1.1 全局配置節流認證模塊

# settings.py
REST_FRAMEWORK = {
    # 頻率組件:頻率類一般做局部配置,但是頻率調節在settings中配置
    'DEFAULT_THROTTLE_RATES': {
        'user': '5/min',
        'anon': '3/min',
        'mobile': '1/min'
    },
}

1.2 局部配置節流認證模塊

使用throttle_classes類屬性配置:

from rest_framework.viewsets import ViewSet
class UserViewSet(ViewSet):
    
    throttle_classes = []

2、drf提供的節流認證類

drf默認提供了幾種節流認證模式:

from rest_framework.throttling import AnonRateThrottle,UserRateThrottle,ScopedRateThrottle

- AnonRateThrottle      |    scope = 'anon'
- UserRateThrottle      |    scope = 'user'
- ScopedRateThrottle    |    scope_attr = 'throttle_scope'

3、自定義節流認證類

如果有特殊需要,需要自定義頻率類.

如:對ip進行限次、對電話進行限制、對視圖某些信息進行限次.

如何自定義:

  1. 繼承SimpleRateThrottle節流基類續寫throrrle類;
  2. 設置scope字符串類屬性,同時在settings中進行drf配置DEFAULT_THROTTLE_RATES
    • eg: DEFAULT_THROTTLE_RATES = {'mobile': '1/min'}
  3. 重寫get_catch_key方法
    • 返回與限制條件有關的字符串,表示限制
    • 返回None,表示不限制
# throttles.py
from rest_framework.throttling import SimpleRateThrottle
class ModileRateThrottle(SimpleRateThrottle):
    scope = 'mobile'
    def get_cache_key(self, request, view):
        # 游客和沒有手機號的用戶不做限制
        if not request.user.is_authenticated or request.user.mobile:
            return None
        # 設置用戶唯一識別碼
        return self.cache_format % {
            'scope':self.scope,
            'ident':request.user.mobile
        }
    
# views.py
from rest_framework.viewsets import ViewSet
class UserViewSet(ViewSet):
    # throttle_classes = [UserRateThrottle]
    throttle_classes = [throttles.ModileRateThrottle]

    def retrieve(self,request,*args,**kwargs):
        return APIResponse(results={
            'username':request.user.username,
            'email':request.user.email,
            'mobile':request.user.mobile,
            'create_time':request.user.date_joined,
        })


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM