DRF 認證、權限、限制


認證:

  定義一個用戶表和一個保存用戶的Token表

# ======================day96=======================

class UserInfo(models.Model):
    username = models.CharField(max_length=16,unique=True)
    password = models.CharField(max_length=32)

    type = models.SmallIntegerField(
        choices=((1,"普通用戶"),(2,"VIP用戶")),
        default=1
    )

class Token(models.Model):
    token = models.CharField(max_length=128)
    user = models.OneToOneField(to="UserInfo",on_delete=models.CASCADE)

 

  定義一個登陸視圖:

# 生成Token的函數
def get_token_code(username):
    """
    根據用戶名和時間戳來生成永不相同的token隨機字符串
    :param username: 字符串格式的用戶名
    :return: 字符串格式的Token
    """

    import time
    import hashlib

    timestamp = str(time.time())
    m = hashlib.md5(username.encode("utf-8"))
    # md5 要傳入字節類型的數據
    m.update(timestamp.encode("utf-8"))
    return m.hexdigest()  # 將生成的隨機字符串返回

# 登陸視圖

class LoginView(APIView):
    '''
    登陸檢測試圖。
    1,接收用戶發過來的用戶名和密碼數據
    2,校驗用戶密碼是否正確
        - 成功就返回登陸成功,然后發Token
        - 失敗就返回錯誤提示
    '''

    def post(self,request):
        res = {"code":0}
        # 從post 里面取數據
        print(request.data)
        username = request.data.get("username")
        password = request.data.get("password")
        # 去數據庫查詢
        user_obj = models.UserInfo.objects.filter(
            username = username,
            password = password
        ).first()
        if user_obj:
            # 登陸成功
            # 生成Token
            token = get_token_code(username)
            # 將token保存
            # 用user = user_obj 這個條件去Token表里查詢。
            # 如果又記錄就更新default里傳的參數,沒有記錄就用default里傳的參數創建一條數據。
            models.Token.objects.update_or_create(defaults={"token":token},user = user_obj)
            # 將token返回給用戶
            res["token"] = token
        else:
            # 登陸失敗
            res["code"] = 1
            res["error"] = "用戶名或密碼錯誤"
        return Response(res)

 

新建一個utils文件夾 下面放一些組件:

 

  定義一個MyAuth認證類:

"""
這里放自定義的認證類
"""
from rest_framework.authentication import BaseAuthentication
from app01 import models
from rest_framework.exceptions import AuthenticationFailed


class MyAuth(BaseAuthentication):

    def authenticate(self, request):
        # print(request.method)
        if request.method in ["POST","PUT","DELETE"]:
            # 如果請求是post,put,delete三種類型時
            # 獲取隨用戶請求發來的token隨機碼
            token = request.data.get("token")
            # 然后去數據庫查詢有沒有這個token
            token_obj = models.Token.objects.filter(token=token).first()
            if token_obj:
                # 如果存在,則說明驗證通過,以元組形式返回用戶對象和token
                return token_obj.user,token
            else:
                # 不存在就直接拋錯
                raise AuthenticationFailed("無效的token")
        else:
        # 這一步的else 是為了當用戶是get請求時也可獲取數據,並不需要驗證token.
            return None,None

 

視圖級別認證:

class CommentViewSet(ModelViewSet):
    queryset = models.Comment.objects.all()
    serializer_class = app01_serializers.CommentSerializer
    authentication_classes = [MyAuth,]
    permission_classes = [MyPermission,]

全局級別認證:需要在settings.py文件設置:

# REST FRAMEWORK 相關的配置

REST_FRAMEWORK = {
    # 關於認證的全局配置
    # "DEFAULT_AUTHENTICATION_CALSSES": ["app01.utils.auth.MyAuth",],
    # "DEFAULT_PERMISSION_CLASSES" : ["app01.utils.permission.MyPermission"],
    # "DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.MyThrottle",],
    "DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.VisitThrottle",],
    "DEFAULT_THROTTLE_RATES":{
        "XXX":"5/m",
    }
}

 

權限:

  只有VIP用戶才能看的內容:

  自定義權限類:

'''
自定義的權限類
'''

from rest_framework.permissions import BasePermission

class MyPermission(BasePermission):
    message = "sorry,您沒有權限"
    def has_permission(self, request, view):
        # 內置封裝的方法
        '''
        判斷該用戶有沒有權限
        '''
        # 判斷用戶是不是VIP用戶
        # 如果是VIP用戶就返回True
        # 如果是普通用戶就返會Flase

        if request.method in ["POST","PUT","DELETE"]:
            # print(111)
            print(request.user.username)
            print(request.user.type)
            print(type(request.user.type))
            if request.user.type == 2:   # 是VIP用戶
                print(2222)
                return True
            else:
                return False
        else:
            return True

    def has_object_permission(self, request, view, obj):
        # 用來判斷針對的obj權限:
        # 例如:是不是某一個人的評論
        '''
        只有評論人是自己才能刪除選定的評論
        '''
        if request.method in ["PUT","DELETE"]:
            print(obj.user.username)
            print(request.user.username)
            if obj.user == request.user:
                # 表示當前評論對象的用戶就是登陸用戶
                return True
            else:
                return False
        else:
            return True

 

視圖級別配置:

class CommentViewSet(ModelViewSet):
    queryset = models.Comment.objects.all()
    serializer_class = app01_serializers.CommentSerializer
    authentication_classes = [MyAuth,]
    permission_classes = [MyPermission,]

全局級別配置:

# REST FRAMEWORK 相關的配置

REST_FRAMEWORK = {
    # 關於認證的全局配置
    # "DEFAULT_AUTHENTICATION_CALSSES": ["app01.utils.auth.MyAuth",],
    # "DEFAULT_PERMISSION_CLASSES" : ["app01.utils.permission.MyPermission"],
    # "DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.MyThrottle",],
    "DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.VisitThrottle",],
    "DEFAULT_THROTTLE_RATES":{
        "XXX":"5/m",
    }
}

 

限制:

  自定義限制類:

'''
自定義的訪問限制類
'''

from rest_framework.throttling import BaseThrottle,SimpleRateThrottle
import time

# =============================
# DIC = {}
#
# class MyThrottle(BaseThrottle):
#     def allow_request(self, request, view):
#         '''
#         返回True就放行,返回False表示被限制了
#         '''
#
#         # 獲取當前訪問的ip地址
#         ip = request.META.get("REMOTE_ADDR")
#
#         # 獲取當前時間
#         now = time.time()
#
#         # 判斷當前ip是否有訪問記錄
#         if ip not in DIC:
#             DIC[ip] = []    # 如果沒有訪問記錄初始化一個空的訪問歷史列表
#
#         # 高端操作
#         history = DIC[ip]
#         # 當當前ip存在訪問記錄,且現在的訪問時間比最初的一次訪問時間大於10秒
#         while history and now - history[-1] > 10:
#             history.pop()   # 刪掉歷史列表中的最后一個記錄
#         # 判斷最近一分鍾的訪問次數是否超過了閾值(3次)
#         if len(history)>=3:
#             return False
#         else:
#             # 把這一次的訪問時間加到訪問歷史列表的第一位
#             DIC[ip].insert(0,now)
#             return True


# ==============================
# 以上代碼等同於一下代碼
class VisitThrottle(SimpleRateThrottle):
    scope = 'XXX'

    def get_cache_key(self, request, view):
        return self.get_ident(request)   # 求當前訪問的IP

 

視圖級別:

from app01.utils.auth import MyAuth
from app01.utils.permission import MyPermission
from app01.utils.throttle import SimpleRateThrottle
# from app01.utils.throttle import
class CommentViewSet(ModelViewSet):
    queryset = models.Comment.objects.all()
    serializer_class = app01_serializers.CommentSerializer
    authentication_classes = [MyAuth,]
    permission_classes = [MyPermission,]

 

全局級別:

# REST FRAMEWORK 相關的配置

REST_FRAMEWORK = {
    # 關於認證的全局配置
    # "DEFAULT_AUTHENTICATION_CALSSES": ["app01.utils.auth.MyAuth",],
    # "DEFAULT_PERMISSION_CLASSES" : ["app01.utils.permission.MyPermission"],
    # "DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.MyThrottle",],
    "DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.VisitThrottle",],
    "DEFAULT_THROTTLE_RATES":{
        "XXX":"5/m",
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM