django 項目認證權限實際操作(使用了DRF)


Django REST framework 認證和權限:
1, 認證和權限是不同的
2, 權限控制可以限制用戶對於視圖的訪問和對於具體數據對象的訪問
3, 認證是檢驗是 能否通過強調的是檢驗身份
4, 項目采用的認證是 全局認證 權限代碼 可以隨時更改權限
5, 自定義認證主要是 獲取前端的 token 值 然后進行 token 檢驗!

首先 定義試圖類

from rest_framework.generics import GenericAPIView
from django_test.auth import AsAdminRole
class view(GenericAPIView):
    # 使用的認證方式!
    authentication_classes = (
        JWTAuthentication,
        CookieAuthentication
    )
    # 自定義全局權限
    permission_classes = (AsAdminRole, )

    '''AsAdminRole 是自定義的權限類!項目會自動調用權限檢查!'''
    # 試用裝飾器可以更改權限 view 類重新指定權限為 AsOperatorRole
    @AsOperatorRole   # 相當於 get = AsOperatorRole(get(self, request))
    def get(self,request):
        return HttpResponse( 'hello world!' )
在 setting文件之中配置權限,(也可以寫在試圖里面,這樣不需要寫在配置文件里面)
REST_FRAMEWORK = {
    # 認證和權限放在一起的!在view之中需要設置
    'DEFAULT_AUTHENTICATION_CLASSES': (
        #'rest_framework.authentication.BasicAuthentication',  # 基本認證
        #'rest_framework.authentication.SessionAuthentication',  # session認證
        # 項目采用自定義的認證(全局)
        'antilles.user.plugins.JWTAuthentication'
    ),
    # 權限設置!
    'DEFAULT_PERMISSION_CLASSES': (
        # 僅通過認證的用戶可以通過!
        # 'rest_framework.permissions.IsAuthenticated',
        # 'rest_framework.permissions.AllowAny',  # 允許所有的!
        'django_test.auth.AsAdminRole', # 指定自定義的權限認證
    )
}
登錄生成token:
class SessionView(APIView):
    permission_classes = ()

    def get(self, request):
        user = request.user
        if isinstance(user, AnonymousUser):
            raise AuthenticationFailed(
                detail='Incorrect authentication credentials.'
            )

        role = request.GET.get('role')
        if role is not None and not user.check_role(role):
            raise PermissionDenied(
                detail='Incorrect user role.'
            )

        return Response()

    @json_schema_validate({
        'type': 'object',
        'properties': {
            'user': {
                'type': 'string',
                'minLength': 1
            },
            'pass': {
                'type': 'string',
                'minLength': 1
            },
        },
    })
    def post(self, request):
        '''get auth token
        this api can be used in two method:
            request contains username/password in body, return token
            request contains exists valid token, return a new token
        '''
        if isinstance(request.user, AnonymousUser):
            return self.login(request)
        else:
            return self.renew_token(request)

    def login(self, request):
        try:
            username = request.data['user']
            password = request.data['pass']
            user = User.objects.get(username=username)
        except User.DoesNotExist as e:
            logger.exception('User[ %s ] is not exists', username)
            raise_from(
                LoginFail, e
            )
        else:
            if not user.is_activate():
                raise LoginFail(user)
            success = authenticate(user=user, password=password)
            if not success:
                user.login_fail()
                logger.info('User[ %s ] authenticated failed', username)
                raise LoginFail()
            user.login_success()
            # 登錄返還給前端 token的值
            return Response({'token':  self.build_token(user)})

    def renew_token(self, request):
        user = request.user
        if not user.is_activate():
            raise LoginFail(user)
        return Response({'token':  self.build_token(user)})
    # 登錄生成 token
    def build_token(self, user):
        import jwt
        from cryptography.fernet import Fernet
        from django.utils.timezone import now

        now = now()
        return jwt.encode(
            {
                'id': user.pk,
                'iss': 'antilles-user',
                'sub': user.username,
                'role': user.get_role_display(),
                'iat': now,
                'nbf': now,
                'exp': now + settings.TOKEN_EXPIRE,
                'jti': Fernet.generate_key(),
            },
            settings.SECRET_KEY,
            algorithm=settings.TOKEN_ALGORITHMS
        )
View Code
自定義權限控制類
#coding:utf-8
from abc import ABCMeta, abstractproperty

from django.contrib.auth.models import AnonymousUser
from rest_framework import permissions
from six import add_metaclass

ROLE_ADMIN = 300
ROLE_OPERATOR = 200
ROLE_USER = 100

#角色具有三種權限的角色
USER_ROLES = [
    (ROLE_ADMIN, 'admin'),
    (ROLE_OPERATOR, 'operator'),
    (ROLE_USER, 'user'),
]
#抽象基礎類 ABCMeta!
@add_metaclass(ABCMeta)
class BaseRole(permissions.BasePermission):
    def __new__(cls, func=None):
        # 在此處 func 指的是 get(self,request)
        """
        Overwrite __new__ for use premission class as decorator

        example:
        >>> class TestView(APIView):
        ...     # if use class as decorator, __new__ get a argument func
        ...     @AsUserRole
        ...     def get(self, request):
        ...         pass

        premission defined by decorator will **overwrite** class premission
        """
        if func:        # use as decorator
            func.permission_class = cls  # 增加一個函數屬性值為 AsUserRole
            return func
        else:
            return permissions.BasePermission.__new__(cls)

    @abstractproperty
    def floor(self):
        pass

    # 自定義繼承類需要重寫 has_permission和 has_object_permission方法!
    def has_permission(self, request, view):
        '''此方法必須有返回值(True/false)'''
        if not request.user:
            return False
        # request.user獲取當前的登陸用戶User對象。如果當前用戶沒有登陸,那么request.user將會是我們所說的AnonymousUser對象
        if isinstance(request.user, AnonymousUser):
            return False

        # if defined method permission
        method = getattr(view, request.method.lower())
        # 取出 __new__ 方法增加的屬性值(也是類對象)!是綁定到 方法 method 之中的
        method_permission = getattr(method, 'permission_class', None)
        # 判斷權限級別是否大於自定義權限!權限是按照數據庫role大小來衡量
        if method_permission:
            return request.user.role >= method_permission.floor

        return request.user.role >= self.floor


class AsUserRole(BaseRole):
    floor = ROLE_USER


class AsOperatorRole(BaseRole):
    floor = ROLE_OPERATOR


class AsAdminRole(BaseRole):
    floor = ROLE_ADMIN
View Code
自定義認證類!
import logging

from django.conf import settings
from rest_framework.authentication import (
    BaseAuthentication, get_authorization_header,
)
from rest_framework.exceptions import AuthenticationFailed, PermissionDenied
from six import raise_from

from .managers.pam import auth
from .models import User

logger = logging.getLogger(__name__)

class JWTAuthentication(BaseAuthentication):
    keyword = 'Jwt'
    # 自定義認證需要重寫 authenticate 方法!
    def authenticate(self, request):
        # 由於 header 的頭部是 byte 類型此方法 轉換成 字符串 然后以空格為界 轉換列表
        # AUTHORIZATION這個就是之前服務端返回的,只不過Django會默認加上HTTP
        auth = get_authorization_header(request).split()
        # auth = ["Token", "c4840b5226a65806c586c239345fce66caf12409"]
        if not auth or auth[0].lower() != self.keyword.lower().encode():
            return None

        if len(auth) == 1:
            raise AuthenticationFailed(
                detail='Invalid token header. No credentials provided.'
            )
        elif len(auth) > 2:
            raise AuthenticationFailed(
                detail='Invalid token header.'
                'Token string should not contain spaces.'
            )

        try:
            token = auth[1].decode()# token = c4840b5226a65806c586c239345fce66caf12409
        except UnicodeError:
            raise AuthenticationFailed(
                detail='Invalid token header.'
                'Token string should not contain invalid characters.'
            )
        return self.authenticate_credentials(token)
    
    # token 認證!
    def authenticate_credentials(self, token):
        import jwt
        from jwt import InvalidTokenError
        try:
            payload = jwt.decode(
                token, settings.SECRET_KEY,
                options={
                    'verify_signature': True,
                    'verify_exp': True,
                    'verify_nbf': True,
                    'verify_iat': True,
                    'require_exp': True,
                    'require_nbf': True,
                    'require_iat': True,
                    'require_iss': True,
                    'require_jti': True,
                    'require_role': True,
                    'require_sub': True,
                    'require_mgt': True
                }
            )
            # sub: jwt所面向的用戶
            user = User.objects.get(username=payload['sub'])

            payload_role = User.get_role_value(payload['role'])
            if payload_role > user.role:
                raise PermissionDenied(
                    'Insufficient permission.'
                )

            return user, payload
        except InvalidTokenError as e:
            raise_from(
                AuthenticationFailed, e
            )
        except User.DoesNotExist as e:
            raise_from(
                AuthenticationFailed, e
            )
    #  “jwt” 自定義的字符串 生成的token前面
    def authenticate_header(self, request):
        return self.keyword


class CookieAuthentication(JWTAuthentication):
    def authenticate(self, request):
        token = request.COOKIES.get('token')
        if token is None:
            return None
        token = token.decode()

        return self.authenticate_credentials(token)
View Code
驗證結果

 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM