認證:
定義一個用戶表和一個保存用戶的Token表
# ======================day96======================= class UserInfo(models.Model): username = models.CharField(max_length=16,unique=True) password = models.CharField(max_length=32) type = models.SmallIntegerField( choices=((1,"普通用戶"),(2,"VIP用戶")), default=1 ) class Token(models.Model): token = models.CharField(max_length=128) user = models.OneToOneField(to="UserInfo",on_delete=models.CASCADE)
定義一個登陸視圖:
# 生成Token的函數 def get_token_code(username): """ 根據用戶名和時間戳來生成永不相同的token隨機字符串 :param username: 字符串格式的用戶名 :return: 字符串格式的Token """ import time import hashlib timestamp = str(time.time()) m = hashlib.md5(username.encode("utf-8")) # md5 要傳入字節類型的數據 m.update(timestamp.encode("utf-8")) return m.hexdigest() # 將生成的隨機字符串返回 # 登陸視圖 class LoginView(APIView): ''' 登陸檢測試圖。 1,接收用戶發過來的用戶名和密碼數據 2,校驗用戶密碼是否正確 - 成功就返回登陸成功,然后發Token - 失敗就返回錯誤提示 ''' def post(self,request): res = {"code":0} # 從post 里面取數據 print(request.data) username = request.data.get("username") password = request.data.get("password") # 去數據庫查詢 user_obj = models.UserInfo.objects.filter( username = username, password = password ).first() if user_obj: # 登陸成功 # 生成Token token = get_token_code(username) # 將token保存 # 用user = user_obj 這個條件去Token表里查詢。 # 如果又記錄就更新default里傳的參數,沒有記錄就用default里傳的參數創建一條數據。 models.Token.objects.update_or_create(defaults={"token":token},user = user_obj) # 將token返回給用戶 res["token"] = token else: # 登陸失敗 res["code"] = 1 res["error"] = "用戶名或密碼錯誤" return Response(res)
新建一個utils文件夾 下面放一些組件:
定義一個MyAuth認證類:
""" 這里放自定義的認證類 """ from rest_framework.authentication import BaseAuthentication from app01 import models from rest_framework.exceptions import AuthenticationFailed class MyAuth(BaseAuthentication): def authenticate(self, request): # print(request.method) if request.method in ["POST","PUT","DELETE"]: # 如果請求是post,put,delete三種類型時 # 獲取隨用戶請求發來的token隨機碼 token = request.data.get("token") # 然后去數據庫查詢有沒有這個token token_obj = models.Token.objects.filter(token=token).first() if token_obj: # 如果存在,則說明驗證通過,以元組形式返回用戶對象和token return token_obj.user,token else: # 不存在就直接拋錯 raise AuthenticationFailed("無效的token") else: # 這一步的else 是為了當用戶是get請求時也可獲取數據,並不需要驗證token. return None,None
視圖級別認證:
class CommentViewSet(ModelViewSet): queryset = models.Comment.objects.all() serializer_class = app01_serializers.CommentSerializer authentication_classes = [MyAuth,] permission_classes = [MyPermission,]
全局級別認證:需要在settings.py文件設置:
# REST FRAMEWORK 相關的配置 REST_FRAMEWORK = { # 關於認證的全局配置 # "DEFAULT_AUTHENTICATION_CALSSES": ["app01.utils.auth.MyAuth",], # "DEFAULT_PERMISSION_CLASSES" : ["app01.utils.permission.MyPermission"], # "DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.MyThrottle",], "DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.VisitThrottle",], "DEFAULT_THROTTLE_RATES":{ "XXX":"5/m", } }
權限:
只有VIP用戶才能看的內容:
自定義權限類:
''' 自定義的權限類 ''' from rest_framework.permissions import BasePermission class MyPermission(BasePermission): message = "sorry,您沒有權限" def has_permission(self, request, view): # 內置封裝的方法 ''' 判斷該用戶有沒有權限 ''' # 判斷用戶是不是VIP用戶 # 如果是VIP用戶就返回True # 如果是普通用戶就返會Flase if request.method in ["POST","PUT","DELETE"]: # print(111) print(request.user.username) print(request.user.type) print(type(request.user.type)) if request.user.type == 2: # 是VIP用戶 print(2222) return True else: return False else: return True def has_object_permission(self, request, view, obj): # 用來判斷針對的obj權限: # 例如:是不是某一個人的評論 ''' 只有評論人是自己才能刪除選定的評論 ''' if request.method in ["PUT","DELETE"]: print(obj.user.username) print(request.user.username) if obj.user == request.user: # 表示當前評論對象的用戶就是登陸用戶 return True else: return False else: return True
視圖級別配置:
class CommentViewSet(ModelViewSet): queryset = models.Comment.objects.all() serializer_class = app01_serializers.CommentSerializer authentication_classes = [MyAuth,] permission_classes = [MyPermission,]
全局級別配置:
# REST FRAMEWORK 相關的配置 REST_FRAMEWORK = { # 關於認證的全局配置 # "DEFAULT_AUTHENTICATION_CALSSES": ["app01.utils.auth.MyAuth",], # "DEFAULT_PERMISSION_CLASSES" : ["app01.utils.permission.MyPermission"], # "DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.MyThrottle",], "DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.VisitThrottle",], "DEFAULT_THROTTLE_RATES":{ "XXX":"5/m", } }
限制:
自定義限制類:
''' 自定義的訪問限制類 ''' from rest_framework.throttling import BaseThrottle,SimpleRateThrottle import time # ============================= # DIC = {} # # class MyThrottle(BaseThrottle): # def allow_request(self, request, view): # ''' # 返回True就放行,返回False表示被限制了 # ''' # # # 獲取當前訪問的ip地址 # ip = request.META.get("REMOTE_ADDR") # # # 獲取當前時間 # now = time.time() # # # 判斷當前ip是否有訪問記錄 # if ip not in DIC: # DIC[ip] = [] # 如果沒有訪問記錄初始化一個空的訪問歷史列表 # # # 高端操作 # history = DIC[ip] # # 當當前ip存在訪問記錄,且現在的訪問時間比最初的一次訪問時間大於10秒 # while history and now - history[-1] > 10: # history.pop() # 刪掉歷史列表中的最后一個記錄 # # 判斷最近一分鍾的訪問次數是否超過了閾值(3次) # if len(history)>=3: # return False # else: # # 把這一次的訪問時間加到訪問歷史列表的第一位 # DIC[ip].insert(0,now) # return True # ============================== # 以上代碼等同於一下代碼 class VisitThrottle(SimpleRateThrottle): scope = 'XXX' def get_cache_key(self, request, view): return self.get_ident(request) # 求當前訪問的IP
視圖級別:
from app01.utils.auth import MyAuth from app01.utils.permission import MyPermission from app01.utils.throttle import SimpleRateThrottle # from app01.utils.throttle import class CommentViewSet(ModelViewSet): queryset = models.Comment.objects.all() serializer_class = app01_serializers.CommentSerializer authentication_classes = [MyAuth,] permission_classes = [MyPermission,]
全局級別:
# REST FRAMEWORK 相關的配置 REST_FRAMEWORK = { # 關於認證的全局配置 # "DEFAULT_AUTHENTICATION_CALSSES": ["app01.utils.auth.MyAuth",], # "DEFAULT_PERMISSION_CLASSES" : ["app01.utils.permission.MyPermission"], # "DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.MyThrottle",], "DEFAULT_THROTTLE_CLASSES" : ["app01.utils.throttle.VisitThrottle",], "DEFAULT_THROTTLE_RATES":{ "XXX":"5/m", } }
