認證和權限
所謂認證就是檢測用戶登陸與否,通常與權限對應使用。網站中都是通過用戶登錄后由該用戶相應的角色認證以給予對應的權限。
權限是對用戶對網站進行操作的限制,只有在擁有相應權限時才可對網站中某個功能進行操作。權限總是與認證相輔相成。w
自定制認證規則的重點是繼承內置的BaseAuthentication類,重寫其authenticate()方法。
自定制認證方式一:通過url傳參進行認證

from django.conf.urls import url, include from app01.views import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]

from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.authentication import BaseAuthentication from rest_framework.request import Request from rest_framework import exceptions ######偽造的數據庫中存有的token######## token_list = [ 'sfsfss123kuf3j123', 'asijnfowerkkf9812', ] ######自定制的認證規則的類,必須繼承BaseAuthentication##### class TestAuthentication(BaseAuthentication): def authenticate(self, request): """ 用戶認證,如果驗證成功后返回元組: (用戶,用戶Token) :param request: :return: None,表示跳過該驗證; 如果跳過了所有認證,默認用戶和Token和使用配置文件進行設置 self._authenticator = None if api_settings.UNAUTHENTICATED_USER: self.user = api_settings.UNAUTHENTICATED_USER() else: self.user = None if api_settings.UNAUTHENTICATED_TOKEN: self.auth = api_settings.UNAUTHENTICATED_TOKEN() else: self.auth = None (user,token)表示驗證通過並設置用戶名和Token; AuthenticationFailed異常 """ val = request.query_params.get('token') if val not in token_list: raise exceptions.AuthenticationFailed("用戶認證失敗") return ('登錄用戶', '用戶token') def authenticate_header(self, request): """ Return a string to be used as the value of the `WWW-Authenticate` header in a `401 Unauthenticated` response, or `None` if the authentication scheme should return `403 Permission Denied` responses. """ # 驗證失敗時,返回的響應頭WWW-Authenticate對應的值 pass #####視圖函數,必須繼承APIView##### class TestView(APIView): authentication_classes = [TestAuthentication, ]#中括號中寫入定義了認證規則的類 permission_classes = []#這是權限規則,下文會進行詳述 #只有通過了上述的規則,才能以下執行視圖函數 def get(self, request, *args, **kwargs): print(request.user) print(request.auth) return Response('GET請求,響應內容') def post(self, request, *args, **kwargs): return Response('POST請求,響應內容') def put(self, request, *args, **kwargs): return Response('PUT請求,響應內容')
自定制認證方式二:通過請求頭認證

from django.conf.urls import url, include from app01.views import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]

from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.authentication import BaseAuthentication from rest_framework.request import Request from rest_framework import exceptions ######自定制的認證規則的類,必須繼承BaseAuthentication##### class TestAuthentication(BaseAuthentication): def authenticate(self, request): """ 用戶認證,如果驗證成功后返回元組: (用戶,用戶Token) :param request: :return: None,表示跳過該驗證; 如果跳過了所有認證,默認用戶和Token和使用配置文件進行設置 self._authenticator = None if api_settings.UNAUTHENTICATED_USER: self.user = api_settings.UNAUTHENTICATED_USER() else: self.user = None if api_settings.UNAUTHENTICATED_TOKEN: self.auth = api_settings.UNAUTHENTICATED_TOKEN() else: self.auth = None (user,token)表示驗證通過並設置用戶名和Token; AuthenticationFailed異常 """ import base64 auth = request.META.get('HTTP_AUTHORIZATION', b'')#獲取請求頭 if auth: auth = auth.encode('utf-8')#將bytes類型編碼成utf-8 auth = auth.split() if not auth or auth[0].lower() != b'basic': raise exceptions.AuthenticationFailed('驗證失敗') if len(auth) != 2: raise exceptions.AuthenticationFailed('驗證失敗') username, part, password = base64.b64decode(auth[1]).decode('utf-8').partition(':') if username == 'Damon' and password == '123': return ('登錄用戶', '用戶token') else: raise exceptions.AuthenticationFailed('用戶名或密碼錯誤') def authenticate_header(self, request): """ Return a string to be used as the value of the `WWW-Authenticate` header in a `401 Unauthenticated` response, or `None` if the authentication scheme should return `403 Permission Denied` responses. """ return 'Basic realm=api' #####視圖函數,必須繼承APIView##### class TestView(APIView): authentication_classes = [TestAuthentication, ]#中括號中寫入定義了認證規則的類,可放入多個 permission_classes = []#這是權限規則,下文會進行詳述 #只有通過了上述的規則,才能以下執行視圖函數 def get(self, request, *args, **kwargs): print(request.user) print(request.auth) return Response('GET請求,響應內容') def post(self, request, *args, **kwargs): return Response('POST請求,響應內容') def put(self, request, *args, **kwargs): return Response('PUT請求,響應內容')
使用自定制認證和自定制權限

from django.db import models class UserInfo(models.Model): username = models.CharField(max_length=32) password = models.CharField(max_length=64) token = models.CharField(max_length=64,null=True)

from django.conf.urls import url from django.contrib import admin from app02 import views as app02_view urlpatterns = [ # url(r'^admin/', admin.site.urls), url(r'^auth/', app02_view.AuthView.as_view()), url(r'^hosts/', app02_view.HostView.as_view()), url(r'^users/', app02_view.UserView.as_view()), url(r'^salary/', app02_view.SalaryView.as_view()), ]

from django.views import View from rest_framework.views import APIView from rest_framework.authentication import BaseAuthentication from rest_framework import exceptions from rest_framework.response import Response from app02 import models import hashlib import time #####自定制認證規則的類##### class MyAuthentication(BaseAuthentication): def authenticate(self, request): token = request.query_params.get('token') obj = models.UserInfo.objects.filter(token=token).first() if obj: return (obj.username,obj) return None def authenticate_header(self, request): """ Return a string to be used as the value of the `WWW-Authenticate` header in a `401 Unauthenticated` response, or `None` if the authentication scheme should return `403 Permission Denied` responses. """ # return 'Basic realm="api"' pass #####自定制權限的類##### class MyPermission(object): message = "無權訪問" def has_permission(self,request,view): if request.user: return True return False #####自定制管理員權限的類##### class AdminPermission(object): message = "無權訪問" def has_permission(self,request,view): if request.user == 'Damon':#管理員 return True return False #####視圖函數##### class HostView(APIView): """ 匿名用戶和用戶都能訪問 """ authentication_classes = [MyAuthentication,] permission_classes = [] def get(self,request,*args,**kwargs): # 原來request對象,django.core.handlers.wsgi.WSGIRequest # 現在的request對象,rest_framework.request.Request\ self.dispatch print(request.user) # print(request.user) # print(request.auth) return Response('主機列表') class UserView(APIView): """ 普通用戶能訪問 """ authentication_classes = [MyAuthentication, ] permission_classes = [MyPermission,]#自定制普通用戶擁有的權限 def get(self,request,*args,**kwargs): return Response('用戶列表') class SalaryView(APIView): """ 管理員用戶才能訪問 """ authentication_classes = [MyAuthentication, ] permission_classes = [MyPermission,AdminPermission,]#自定制普通用戶和管理員擁有的權限 def get(self,request,*args,**kwargs): self.dispatch return Response('薪資列表') def permission_denied(self, request, message=None): #如果沒有通過認證,並且權限中return False,就會報下面的異常,detail為自定制的錯誤信息 if request.authenticators and not request.successful_authenticator: raise exceptions.NotAuthenticated(detail='無權訪問') raise exceptions.PermissionDenied(detail=message)
設置全局變量
對於認證和權限,我們可新建utils.py文件,將自定制的類寫入該py文件中,之后在settings.py中進行配置,就可快速使用。在views.py中只需寫入相應視圖函數,無需關心中括號即可實現認證和權限的配置,使得views.py文件中的代碼可讀性更高。

REST_FRAMEWORK = { 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None,#將匿名用戶名稱設置為None,默認為Anonymous "DEFAULT_AUTHENTICATION_CLASSES": [ "app02.utils.MyAuthentication",#配置自定制認證類的路徑 ], "DEFAULT_PERMISSION_CLASSES": [ "app02.utils.MyPermission",#配置自定制權限類的路徑 "app02.utils.AdminPermission",#配置自定制權限類的路徑 ], }
限制訪問頻率
建網站的宗旨是為人民服務,供人民訪問。但總有刁民想害朕,如利用機器人爬蟲肆意爬取數據、侵占流量緩存、洪水攻擊等等。所以我們需要對可以訪問網站的用戶進行相應的訪問頻率的限制,以保護我們網站的安全。
訪問的用戶有兩種——登錄用戶、匿名用戶。對於登錄用戶我們可采用唯一標識進行標記,而匿名用戶我們通常采用IP對其進行標記。
a、基於用戶IP限制訪問頻率
流程分析:
- 首先獲取用戶信息,如果是匿名用戶,獲取IP。如果不是匿名用戶獲取其用戶名。
- 在request里面獲取匿名用戶IP(也有可能是代理的IP),如IP= 127.1.1.1。
- 將獲取到的IP添加到到recode字典里面,需要在添加之前先限制一下。
- 如果時間間隔大於60秒(可自定制時長),說明村吃的該用戶最早一次訪問的時間久遠、已失效,將該次訪問的時間pop。在timelist列表里現在留的是有效的訪問時間段。
- 判斷該IP訪問次數是否超過10次,如果超過return False予以限制。
具體實現:重點是繼承BaseThrottle類,重寫其allow_request()和wait()方法

from django.shortcuts import render from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.throttling import BaseThrottle,SimpleRateThrottle from rest_framework import exceptions RECORD = { } #####自定制對訪問頻率限制的類##### class MyThrottle(BaseThrottle): def allow_request(self,request,view): """ # 返回False,限制 # 返回True,通行 :param request: :param view: :return: """ """ 對匿名用戶進行限制:每個用戶1分鍾允許訪問10次 - 獲取用戶IP request 1.1.1.1 """ import time ctime = time.time() ip = self.get_ident() if ip not in RECORD: RECORD[ip] = [ctime,]#該IP是首次訪問,存儲當前訪問時間 else: # [4507862389234,3507862389234,2507862389234,1507862389234,],原先存有的訪問時間 time_list = RECORD[ip] while True: val = time_list[-1]#取出最早訪問的時間 if (ctime-60) > val:#檢測這次訪問時間與最早訪問時間間隔是否超過一分鍾 time_list.pop()#是,則更新列表中最早的訪問時間(確保列表內的訪問時間與此次訪問的時間間隔在一分鍾內) else: break#否,則表示在一分鍾內有多次訪問 if len(time_list) > 10:#檢測一分鍾訪問次數是否超過十次則限制 return False#是則限制 time_list.insert(0,ctime)#不超過十次則將當前時間存入列表 return True def wait(self): import time ctime = time.time() first_in_time = RECORD[self.get_ident()][-1] wt = 60 - (ctime - first_in_time)#動態顯示需要等待的時間60-(當前時間 - 最近一次訪問時間) return wt #####視圖函數##### class LimitView(APIView): authentication_classes = [] permission_classes = [] throttle_classes=[MyThrottle,] def get(self,request,*args,**kwargs): self.dispatch return Response('控制訪問頻率示例') def throttled(self, request, wait): """ If request is throttled, determine what kind of exception to raise. """ #可自定制該方法設置中文的錯誤提示信息 class MyThrottled(exceptions.Throttled): default_detail = '請求被限制.' extra_detail_singular = 'Expected available in {wait} second.' extra_detail_plural = '還需要再等待{wait}秒' raise MyThrottled(wait)
b. 使用配置文件,基於用戶IP限制訪問頻率(利於Django緩存)
源碼分析

class SimpleRateThrottle(BaseThrottle): """ 一個簡單的緩存實現,只需要` get_cache_key() `。被覆蓋。 速率(請求/秒)是由視圖上的“速率”屬性設置的。類。該屬性是一個字符串的形式number_of_requests /期。 周期應該是:(的),“秒”,“M”,“min”,“h”,“小時”,“D”,“一天”。 以前用於節流的請求信息存儲在高速緩存中。 A simple cache implementation, that only requires `.get_cache_key()` to be overridden. The rate (requests / seconds) is set by a `throttle` attribute on the View class. The attribute is a string of the form 'number_of_requests/period'. Period should be one of: ('s', 'sec', 'm', 'min', 'h', 'hour', 'd', 'day') Previous request information used for throttling is stored in the cache. """ cache = default_cache timer = time.time cache_format = 'throttle_%(scope)s_%(ident)s' scope = None THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES def __init__(self): if not getattr(self, 'rate', None): self.rate = self.get_rate()#點進去看到需要些一個scope ,2/m self.num_requests, self.duration = self.parse_rate(self.rate) def get_cache_key(self, request, view):#這個相當於是一個半成品,我們可以來補充它 """ Should return a unique cache-key which can be used for throttling. Must be overridden. May return `None` if the request should not be throttled. """ raise NotImplementedError('.get_cache_key() must be overridden') def get_rate(self): """ Determine the string representation of the allowed request rate. """ if not getattr(self, 'scope', None):#檢測必須有scope,沒有就報錯了 msg = ("You must set either `.scope` or `.rate` for '%s' throttle" % self.__class__.__name__) raise ImproperlyConfigured(msg) try: return self.THROTTLE_RATES[self.scope] except KeyError: msg = "No default throttle rate set for '%s' scope" % self.scope raise ImproperlyConfigured(msg) def parse_rate(self, rate): """ Given the request rate string, return a two tuple of: <allowed number of requests>, <period of time in seconds> """ if rate is None: return (None, None) num, period = rate.split('/')#取配置文件並切分 'wdp':'2/minute' num_requests = int(num)#2 duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]#用於配置文件的信息minute取m,即60秒 return (num_requests, duration) # 2、執行完構造方法后執行, def allow_request(self, request, view): """ Implement the check to see if the request should be throttled. On success calls `throttle_success`. On failure calls `throttle_failure`. """ if self.rate is None: return True self.key = self.get_cache_key(request, view)#3、執行get_cache_key if self.key is None: return True#不限制 self.history = self.cache.get(self.key, [])#4、得到的key,默認是一個列表,賦值給了self.history, # self.history可以理解為每一個ip對應的訪問記錄 self.now = self.timer() # Drop any requests from the history which have now passed the # throttle duration while self.history and self.history[-1] <= self.now - self.duration: self.history.pop() if len(self.history) >= self.num_requests: return self.throttle_failure() return self.throttle_success() def throttle_success(self): """ Inserts the current request's timestamp along with the key into the cache. """ self.history.insert(0, self.now) self.cache.set(self.key, self.history, self.duration) return True def throttle_failure(self): """ Called when a request to the API has failed due to throttling. """ return False def wait(self): """ Returns the recommended next request time in seconds. """ if self.history: remaining_duration = self.duration - (self.now - self.history[-1]) else: remaining_duration = self.duration available_requests = self.num_requests - len(self.history) + 1 if available_requests <= 0: return None return remaining_duration / float(available_requests)
自定制
settings.py中:
REST_FRAMEWORK = { 'DEFAULT_THROTTLE_RATES': { 'test_scope': '10/m',#自定制限制的時間 一分鍾10次 }, }

from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.throttling import BaseThrottle,SimpleRateThrottle from rest_framework import exceptions class MySimpleRateThrottle(SimpleRateThrottle): scope = "test_scope"#scope不可改,字符串需與配置文件相同 def get_cache_key(self, request, view): return self.get_ident(request) class LimitView(APIView): authentication_classes = [] permission_classes = [] throttle_classes=[MySimpleRateThrottle,] def get(self,request,*args,**kwargs): # self.dispatch return Response('控制訪問頻率示例') def throttled(self, request, wait): """ If request is throttled, determine what kind of exception to raise. """ #可自定制該方法設置中文的錯誤提示信息 class MyThrottled(exceptions.Throttled): default_detail = '請求被限制.' extra_detail_singular = 'Expected available in {wait} second.' extra_detail_plural = '還需要再等待{wait}秒'
c.對不同用戶進行不同的限流操作
對匿名用戶每個用戶1分鍾允許訪問5次,對於登錄的普通用戶1分鍾訪問10次,VIP用戶一分鍾訪問20次。
操作流程:
- 首頁可以匿名訪問
- 先認證,只有認證了才知道是不是匿名的,
- 權限登錄成功之后才能訪問, index頁面無需權限即可訪問
- 限流在配置文件中
settings.py 中進行配置
REST_FRAMEWORK = { 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None, 'DEFAULT_THROTTLE_RATES': {#自定制鍵值對 'obj_anon': '10/m',#匿名用戶 'obj_user': '20/m',#登錄用戶 'obj_VIPuser':'20/minute',#VIP用戶 }, }

from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.throttling import BaseThrottle,SimpleRateThrottle from rest_framework.authentication import BaseAuthentication from rest_framework import exceptions from app02 import models class MyAuthentication(BaseAuthentication): #檢測用戶是否登錄 def authenticate(self, request): token = request.query_params.get('token')#登錄用戶有tocken字段 obj = models.UserInfo.objects.filter(token=token).first() if obj: return (obj.username,obj) return None#未登錄用戶不處理 def authenticate_header(self, request): pass class MyPermission(object): message = "無權訪問" def has_permission(self,request,view): if request.user: return True #True表示有權限 return False #False表示無權限 class AdminPermission(object): message = "無權訪問" def has_permission(self,request,view): if request.user == 'DamonVIP':#VIP用戶 return True return False ######對匿名用戶進行限流的類##### class AnonThrottle(SimpleRateThrottle): scope = "obj_anon" def get_cache_key(self, request, view): # 返回None,表示我不限制 # 登錄用戶我不管 if request.user: return None # 匿名用戶 return self.get_ident(request) ######對登錄用戶進行限流的類##### class UserThrottle(SimpleRateThrottle): scope = "obj_user" def get_cache_key(self, request, view): # 登錄用戶 if request.user: return request.user # 匿名用戶我不管 return None ######對VIP用戶進行限流的類##### class VIPUserThrottle(SimpleRateThrottle): scope = "obj_VIPuser" def get_cache_key(self, request, view): # VIP用戶 if request.user=='DamonVIP':#VIP用戶 return request.user # 匿名用戶我不管 return None #####視圖函數##### # 首頁無需登錄就可以訪問 class IndexView(APIView): authentication_classes = [MyAuthentication,]#認證判斷他是不是匿名用戶 permission_classes = [] #主頁無需權限驗證 throttle_classes=[AnonThrottle,UserThrottle,VIPUserThrottle]#對匿名用戶和普通用戶的訪問限制 def get(self,request,*args,**kwargs): # self.dispatch return Response('訪問首頁') def throttled(self, request, wait): '''可定制方法設置中文錯誤''' # raise exceptions.Throttled(wait) class MyThrottle(exceptions.Throttled): default_detail = '請求被限制' extra_detail_singular = 'Expected available in {wait} second.' extra_detail_plural = 'Expected available in {wait} seconds.' default_code = '還需要再等{wait}秒' raise MyThrottle(wait) # 需登錄就可以訪問 class ManageView(APIView): authentication_classes = [MyAuthentication,] permission_classes = [MyPermission,] throttle_classes=[AnonThrottle,UserThrottle,VIPUserThrottle] def get(self,request,*args,**kwargs): # self.dispatch return Response('訪問首頁') # 需登錄就可以訪問 class SalaryView(APIView): authentication_classes = [MyAuthentication,] permission_classes = [MyPermission,] throttle_classes=[AnonThrottle,UserThrottle,VIPUserThrottle] def get(self,request,*args,**kwargs): # self.dispatch return Response('訪問首頁')
設置全局變量
與認證和權限相似,限流操作也可新建utils.py文件,將自定制的類寫入該py文件中,之后在settings.py中進行配置,就可快速使用。在views.py中只需寫入相應視圖函數,無需關心中括號即可實現認證和權限的配置,使得views.py文件中的代碼可讀性更高。

REST_FRAMEWORK = { 'DEFAULT_THROTTLE_CLASSES': [ 'app04.utils.throttles.throttles.MyAnonRateThrottle', 'app04.utils.throttles.throttles.MyUserRateThrottle', ], 'DEFAULT_THROTTLE_RATES': { 'anon': '10/day', 'user': '10/day', 'My_anon': '10/m', 'My_user': '20/m', 'My_VIPuser': '50/m', }, }