一、認證
認證請求頭
views.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.authentication import BaseAuthentication
from rest_framework.permissions import BasePermission
from rest_framework.request import Request
from rest_framework import exceptions
token_list = [
'sfsfss123kuf3j123',
'asijnfowerkkf9812',
]
class TestAuthentication(BaseAuthentication):
def authenticate(self, request):
"""
用戶認證,如果驗證成功后返回元組: (用戶,用戶Token)
:param request:
:return:
None,表示跳過該驗證;
如果跳過了所有認證,默認用戶和Token和使用配置文件進行設置
self._authenticator = None
if api_settings.UNAUTHENTICATED_USER:
self.user = api_settings.UNAUTHENTICATED_USER() # 默認值為:匿名用戶
else:
self.user = None
if api_settings.UNAUTHENTICATED_TOKEN:
self.auth = api_settings.UNAUTHENTICATED_TOKEN()# 默認值為:None
else:
self.auth = None
(user,token)表示驗證通過並設置用戶名和Token;
AuthenticationFailed異常
"""
val = request.query_params.get('token')
if val not in token_list:
raise exceptions.AuthenticationFailed("用戶認證失敗")
return ('登錄用戶', '用戶token')
def authenticate_header(self, request):
"""
Return a string to be used as the value of the `WWW-Authenticate`
header in a `401 Unauthenticated` response, or `None` if the
authentication scheme should return `403 Permission Denied` responses.
"""
pass
class TestPermission(BasePermission):
message = "權限驗證失敗"
def has_permission(self, request, view):
"""
判斷是否有權限訪問當前請求
Return `True` if permission is granted, `False` otherwise.
:param request:
:param view:
:return: True有權限;False無權限
"""
if request.user == "管理員":
return True
# GenericAPIView中get_object時調用
def has_object_permission(self, request, view, obj):
"""
視圖繼承GenericAPIView,並在其中使用get_object時獲取對象時,觸發單獨對象權限驗證
Return `True` if permission is granted, `False` otherwise.
:param request:
:param view:
:param obj:
:return: True有權限;False無權限
"""
if request.user == "管理員":
return True
class TestView(APIView):
# 認證的動作是由request.user觸發
authentication_classes = [TestAuthentication, ]
# 權限
# 循環執行所有的權限
permission_classes = [TestPermission, ]
def get(self, request, *args, **kwargs):
# self.dispatch
print(request.user)
print(request.auth)
return Response('GET請求,響應內容')
def post(self, request, *args, **kwargs):
return Response('POST請求,響應內容')
def put(self, request, *args, **kwargs):
return Response('PUT請求,響應內容')
自定義認證功能
class MyAuthtication(BasicAuthentication):
def authenticate(self, request):
token = request.query_params.get('token') #注意是沒有GET的,用query_params表示
if token == 'zxxzzxzc':
return ('uuuuuu','afsdsgdf') #返回user,auth
# raise AuthenticationFailed('認證錯誤') #只要拋出認證錯誤這樣的異常就會去執行下面的函數
raise APIException('認證錯誤')
def authenticate_header(self, request): #認證不成功的時候執行
return 'Basic reala="api"'
class UserView(APIView):
authentication_classes = [MyAuthtication,]
def get(self,request,*args,**kwargs):
print(request.user)
print(request.auth)
return Response('用戶列表')

二、權限
1、需求:Host是匿名用戶和用戶都能訪問 #匿名用戶的request.user = none;User只有注冊用戶能訪問
urls.py
from app03 import views
from django.conf.urls import url
urlpatterns = [
# django rest framework
url('^auth/', views.AuthView.as_view()),
url(r'^hosts/', views.HostView.as_view()),
url(r'^users/', views.UsersView.as_view()),
url(r'^salary/', views.SalaryView.as_view()),
]
views.py
from django.shortcuts import render
from rest_framework.views import APIView #繼承的view
from rest_framework.response import Response #友好的返回
from rest_framework.authentication import BaseAuthentication #認證的類
from rest_framework.authentication import BasicAuthentication
from app01 import models
from rest_framework import exceptions
from rest_framework.permissions import AllowAny #權限在這個類里面
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle
# Create your views here.
# +++++++++++++++認證類和權限類========================
class MyAuthentication(BaseAuthentication):
def authenticate(self, request):
token = request.query_params.get('token')
obj = models.UserInfo.objects.filter(token=token).first()
if obj : #如果認證成功,返回用戶名和auth
return (obj.username,obj)
return None #如果沒有認證成功就不處理,進行下一步
def authenticate_header(self, request):
pass
class MyPermission(object):
message = '無權訪問'
def has_permission(self,request,view): #has_permission里面的self是view視圖對象
if request.user:
return True #如果不是匿名用戶就說明有權限
return False #否則無權限
class AdminPermission(object):
message = '無權訪問'
def has_permission(self, request, view): # has_permission里面的self是view視圖對象
if request.user=='haiyun':
return True # 返回True表示有權限
return False #返回False表示無權限
# +++++++++++++++++++++++++++
class AuthView(APIView):
authentication_classes = [] #認證頁面不需要認證
def get(self,request):
self.dispatch
return '認證列表'
class HostView(APIView):
'''需求:
Host是匿名用戶和用戶都能訪問 #匿名用戶的request.user = none
User只有注冊用戶能訪問
'''
authentication_classes = [MyAuthentication,]
permission_classes = [] #都能訪問就沒必要設置權限了
def get(self,request):
print(request.user)
print(request.auth)
return Response('主機列表')
class UsersView(APIView):
'''用戶能訪問,request.user里面有值'''
authentication_classes = [MyAuthentication,]
permission_classes = [MyPermission,]
def get(self,request):
print(request.user,'111111111')
return Response('用戶列表')
def permission_denied(self, request, message=None):
"""
If request is not permitted, determine what kind of exception to raise.
"""
if request.authenticators and not request.successful_authenticator:
'''如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了'''
raise exceptions.NotAuthenticated(detail='無權訪問')
raise exceptions.PermissionDenied(detail=message)
認證和權限配合使用
class SalaryView(APIView):
'''用戶能訪問'''
message ='無權訪問'
authentication_classes = [MyAuthentication,] #驗證是不是用戶
permission_classes = [MyPermission,AdminPermission,] #再看用戶有沒有權限,如果有權限在判斷有沒有管理員的權限
def get(self,request):
return Response('薪資列表')
def permission_denied(self, request, message=None):
"""
If request is not permitted, determine what kind of exception to raise.
"""
if request.authenticators and not request.successful_authenticator:
'''如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了'''
raise exceptions.NotAuthenticated(detail='無權訪問')
raise exceptions.PermissionDenied(detail=message)
如果遇上這樣的,還可以自定制,參考源碼

def check_permissions(self, request):
"""
Check if the request should be permitted.
Raises an appropriate exception if the request is not permitted.
"""
for permission in self.get_permissions():
#循環每一個permission對象,調用has_permission
#如果False,則拋出異常
#True 說明有權訪問
if not permission.has_permission(request, self):
self.permission_denied(
request, message=getattr(permission, 'message', None)
)
def permission_denied(self, request, message=None):
"""
If request is not permitted, determine what kind of exception to raise.
"""
if request.authenticators and not request.successful_authenticator:
'''如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了'''
raise exceptions.NotAuthenticated()
raise exceptions.PermissionDenied(detail=message)
那么我們可以重寫permission_denied這個方法,如下:
views.py
class UsersView(APIView):
'''用戶能訪問,request.user里面有值'''
authentication_classes = [MyAuthentication,]
permission_classes = [MyPermission,]
def get(self,request):
return Response('用戶列表')
def permission_denied(self, request, message=None):
"""
If request is not permitted, determine what kind of exception to raise.
"""
if request.authenticators and not request.successful_authenticator:
'''如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了'''
raise exceptions.NotAuthenticated(detail='無權訪問')
raise exceptions.PermissionDenied(detail=message)

2. 全局使用
上述操作中均是對單獨視圖進行特殊配置,如果想要對全局進行配置,則需要再配置文件中寫入即可。
settings.py
REST_FRAMEWORK = {
'UNAUTHENTICATED_USER': None,
'UNAUTHENTICATED_TOKEN': None, #將匿名用戶設置為None
"DEFAULT_AUTHENTICATION_CLASSES": [
"app01.utils.MyAuthentication",
],
'DEFAULT_PERMISSION_CLASSES':[
"app03.utils.MyPermission",#設置路徑,
]
}
Views.py
class AuthView(APIView):
authentication_classes = [] #認證頁面不需要認證
def get(self,request):
self.dispatch
return '認證列表'
class HostView(APIView):
'''需求:
Host是匿名用戶和用戶都能訪問 #匿名用戶的request.user = none
User只有注冊用戶能訪問
'''
authentication_classes = [MyAuthentication,]
permission_classes = [] #都能訪問就沒必要設置權限了
def get(self,request):
print(request.user)
print(request.auth)
return Response('主機列表')
class UsersView(APIView):
'''用戶能訪問,request.user里面有值'''
authentication_classes = [MyAuthentication,]
permission_classes = [MyPermission,]
def get(self,request):
print(request.user,'111111111')
return Response('用戶列表')
def permission_denied(self, request, message=None):
"""
If request is not permitted, determine what kind of exception to raise.
"""
if request.authenticators and not request.successful_authenticator:
'''如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了'''
raise exceptions.NotAuthenticated(detail='無權訪問')
raise exceptions.PermissionDenied(detail=message)
class SalaryView(APIView):
'''用戶能訪問'''
message ='無權訪問'
authentication_classes = [MyAuthentication,] #驗證是不是用戶
permission_classes = [MyPermission,AdminPermission,] #再看用戶有沒有權限,如果有權限在判斷有沒有管理員的權限
def get(self,request):
return Response('薪資列表')
def permission_denied(self, request, message=None):
"""
If request is not permitted, determine what kind of exception to raise.
"""
if request.authenticators and not request.successful_authenticator:
'''如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了'''
raise exceptions.NotAuthenticated(detail='無權訪問')
raise exceptions.PermissionDenied(detail=message)
三、限流
1、為什么要限流呢?
答:防爬
2、限制訪問頻率源碼分析
self.check_throttles(request)
self.check_throttles(request)
check_throttles
def check_throttles(self, request):
"""
Check if request should be throttled.
Raises an appropriate exception if the request is throttled.
"""
for throttle in self.get_throttles():
#循環每一個throttle對象,執行allow_request方法
# allow_request:
#返回False,說明限制訪問頻率
#返回True,說明不限制,通行
if not throttle.allow_request(request, self):
self.throttled(request, throttle.wait())
#throttle.wait()表示還要等多少秒就能訪問了
get_throttles
def get_throttles(self):
"""
Instantiates and returns the list of throttles that this view uses.
"""
#返回對象
return [throttle() for throttle in self.throttle_classes]
找到類,可自定制類throttle_classes
throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES
BaseThrottle
class BaseThrottle(object):
"""
Rate throttling of requests.
"""
def allow_request(self, request, view):
"""
Return `True` if the request should be allowed, `False` otherwise.
"""
raise NotImplementedError('.allow_request() must be overridden')
def get_ident(self, request):
"""
Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
if present and number of proxies is > 0. If not use all of
HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
"""
xff = request.META.get('HTTP_X_FORWARDED_FOR')
remote_addr = request.META.get('REMOTE_ADDR')
num_proxies = api_settings.NUM_PROXIES
if num_proxies is not None:
if num_proxies == 0 or xff is None:
return remote_addr
addrs = xff.split(',')
client_addr = addrs[-min(num_proxies, len(addrs))]
return client_addr.strip()
return ''.join(xff.split()) if xff else remote_addr
def wait(self):
"""
Optionally, return a recommended number of seconds to wait before
the next request.
"""
return None
也可以重寫allow_request方法
可自定制返回的錯誤信息throttled
def throttled(self, request, wait):
"""
If request is throttled, determine what kind of exception to raise.
"""
raise exceptions.Throttled(wait)
raise exceptions.Throttled(wait)錯誤信息詳情
class Throttled(APIException):
status_code = status.HTTP_429_TOO_MANY_REQUESTS
default_detail = _('Request was throttled.')
extra_detail_singular = 'Expected available in {wait} second.'
extra_detail_plural = 'Expected available in {wait} seconds.'
default_code = 'throttled'
def __init__(self, wait=None, detail=None, code=None):
if detail is None:
detail = force_text(self.default_detail)
if wait is not None:
wait = math.ceil(wait)
detail = ' '.join((
detail,
force_text(ungettext(self.extra_detail_singular.format(wait=wait),
self.extra_detail_plural.format(wait=wait),
wait))))
self.wait = wait
super(Throttled, self).__init__(detail, code)
下面來看看最簡單的從源碼中分析的示例,這只是舉例說明了一下
urls.py
from django.conf.urls import url
from app04 import views
urlpatterns = [
url('limit/',views.LimitView.as_view()),
]
views.py
from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import exceptions
# from rest_framewor import
# Create your views here.
class MyThrottle(object):
def allow_request(self,request,view):
#返回False,限制
#返回True,不限制
pass
def wait(self):
return 1000
class LimitView(APIView):
authentication_classes = [] #不讓認證用戶
permission_classes = [] #不讓驗證權限
throttle_classes = [MyThrottle, ]
def get(self,request):
# self.dispatch
return Response('控制訪問頻率示例')
def throttled(self, request, wait):
'''可定制方法設置中文錯誤'''
# raise exceptions.Throttled(wait)
class MyThrottle(exceptions.Throttled):
default_detail = '請求被限制'
extra_detail_singular = 'Expected available in {wait} second.'
extra_detail_plural = 'Expected available in {wait} seconds.'
default_code = '還需要再等{wait}秒'
raise MyThrottle(wait)
3、需求:對匿名用戶進行限制,每個用戶一分鍾允許訪問10次(只針對用戶來說)
a、基於用戶IP限制訪問頻率
流程分析:
- 先獲取用戶信息,如果是匿名用戶,獲取IP。如果不是匿名用戶就可以獲取用戶名。
- 獲取匿名用戶IP,在request里面獲取,比如IP= 1.1.1.1。
- 吧獲取到的IP添加到到recode字典里面,需要在添加之前先限制一下。
- 如果時間間隔大於60秒,說明時間久遠了,就把那個時間給剔除 了pop。在timelist列表里面現在留的是有效的訪問時間段。
- 然后判斷他的訪問次數超過了10次沒有,如果超過了時間就return False。
- 美中不足的是時間是固定的,我們改變他為動態的:列表里面最開始進來的時間和當前的時間進行比較,看需要等多久。
具體實現:
views初級版本
from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import exceptions
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限制訪問頻率
import time
# Create your views here.
RECORD = {}
class MyThrottle(BaseThrottle):
def allow_request(self,request,view):
'''對匿名用戶進行限制,每個用戶一分鍾訪問10次 '''
ctime = time.time()
ip = '1.1.1.1'
if ip not in RECORD:
RECORD[ip] = [ctime]
else:
#[152042123,15204212,3152042,123152042123]
time_list = RECORD[ip] #獲取ip里面的值
while True:
val = time_list[-1]#取出最后一個時間,也就是訪問最早的時間
if (ctime-60)>val: #吧時間大於60秒的給剔除了
time_list.pop()
#剔除了之后timelist里面就是有效的時間了,在進行判斷他的訪問次數是不是超過10次
else:
break
if len(time_list) >10:
return False # 返回False,限制
time_list.insert(0, ctime)
return True #返回True,不限制
def wait(self):
ctime = time.time()
first_in_time = RECORD['1.1.1.1'][-1]
wt = 60-(ctime-first_in_time)
return wt
class LimitView(APIView):
authentication_classes = [] #不讓認證用戶
permission_classes = [] #不讓驗證權限
throttle_classes = [MyThrottle, ]
def get(self,request):
# self.dispatch
return Response('控制訪問頻率示例')
def throttled(self, request, wait):
'''可定制方法設置中文錯誤'''
# raise exceptions.Throttled(wait)
class MyThrottle(exceptions.Throttled):
default_detail = '請求被限制'
extra_detail_singular = 'Expected available in {wait} second.'
extra_detail_plural = 'Expected available in {wait} seconds.'
default_code = '還需要再等{wait}秒'
raise MyThrottle(wait)
稍微做了改動
# from django.shortcuts import render
# from rest_framework.views import APIView
# from rest_framework.response import Response
# from rest_framework import exceptions
# from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限制訪問頻率
# import time
# # Create your views here.
# RECORD = {}
# class MyThrottle(BaseThrottle):
#
# def allow_request(self,request,view):
# '''對匿名用戶進行限制,每個用戶一分鍾訪問10次 '''
# ctime = time.time()
# ip = '1.1.1.1'
# if ip not in RECORD:
# RECORD[ip] = [ctime]
# else:
# #[152042123,15204212,3152042,123152042123]
# time_list = RECORD[ip] #獲取ip里面的值
# while True:
# val = time_list[-1]#取出最后一個時間,也就是訪問最早的時間
# if (ctime-60)>val: #吧時間大於60秒的給剔除了
# time_list.pop()
# #剔除了之后timelist里面就是有效的時間了,在進行判斷他的訪問次數是不是超過10次
# else:
# break
# if len(time_list) >10:
# return False # 返回False,限制
# time_list.insert(0, ctime)
# return True #返回True,不限制
#
# def wait(self):
# ctime = time.time()
# first_in_time = RECORD['1.1.1.1'][-1]
# wt = 60-(ctime-first_in_time)
# return wt
#
#
# class LimitView(APIView):
# authentication_classes = [] #不讓認證用戶
# permission_classes = [] #不讓驗證權限
# throttle_classes = [MyThrottle, ]
# def get(self,request):
# # self.dispatch
# return Response('控制訪問頻率示例')
#
# def throttled(self, request, wait):
# '''可定制方法設置中文錯誤'''
# # raise exceptions.Throttled(wait)
# class MyThrottle(exceptions.Throttled):
# default_detail = '請求被限制'
# extra_detail_singular = 'Expected available in {wait} second.'
# extra_detail_plural = 'Expected available in {wait} seconds.'
# default_code = '還需要再等{wait}秒'
# raise MyThrottle(wait)
from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import exceptions
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限制訪問頻率
import time
# Create your views here.
RECORD = {}
class MyThrottle(BaseThrottle):
def allow_request(self,request,view):
'''對匿名用戶進行限制,每個用戶一分鍾訪問10次 '''
ctime = time.time()
self.ip =self.get_ident(request)
if self.ip not in RECORD:
RECORD[self.ip] = [ctime]
else:
#[152042123,15204212,3152042,123152042123]
time_list = RECORD[self.ip] #獲取ip里面的值
while True:
val = time_list[-1]#取出最后一個時間,也就是訪問最早的時間
if (ctime-60)>val: #吧時間大於60秒的給剔除了
time_list.pop()
#剔除了之后timelist里面就是有效的時間了,在進行判斷他的訪問次數是不是超過10次
else:
break
if len(time_list) >10:
return False # 返回False,限制
time_list.insert(0, ctime)
return True #返回True,不限制
def wait(self):
ctime = time.time()
first_in_time = RECORD[self.ip][-1]
wt = 60-(ctime-first_in_time)
return wt
class LimitView(APIView):
authentication_classes = [] #不讓認證用戶
permission_classes = [] #不讓驗證權限
throttle_classes = [MyThrottle, ]
def get(self,request):
# self.dispatch
return Response('控制訪問頻率示例')
def throttled(self, request, wait):
'''可定制方法設置中文錯誤'''
# raise exceptions.Throttled(wait)
class MyThrottle(exceptions.Throttled):
default_detail = '請求被限制'
extra_detail_singular = 'Expected available in {wait} second.'
extra_detail_plural = 'Expected available in {wait} seconds.'
default_code = '還需要再等{wait}秒'
raise MyThrottle(wait)

b、用resetframework內部的限制訪問頻率(利於Django緩存)
源碼分析:
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限制訪問頻率
BaseThrottle相當於一個抽象類
class BaseThrottle(object):
"""
Rate throttling of requests.
"""
def allow_request(self, request, view):
"""
Return `True` if the request should be allowed, `False` otherwise.
"""
raise NotImplementedError('.allow_request() must be overridden')
def get_ident(self, request): #唯一標識
"""
Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
if present and number of proxies is > 0. If not use all of
HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
"""
xff = request.META.get('HTTP_X_FORWARDED_FOR')
remote_addr = request.META.get('REMOTE_ADDR') #獲取IP等
num_proxies = api_settings.NUM_PROXIES
if num_proxies is not None:
if num_proxies == 0 or xff is None:
return remote_addr
addrs = xff.split(',')
client_addr = addrs[-min(num_proxies, len(addrs))]
return client_addr.strip()
return ''.join(xff.split()) if xff else remote_addr
def wait(self):
"""
Optionally, return a recommended number of seconds to wait before
the next request.
"""
return None
SimpleRateThrottle
class SimpleRateThrottle(BaseThrottle):
"""
一個簡單的緩存實現,只需要` get_cache_key() `。被覆蓋。
速率(請求/秒)是由視圖上的“速率”屬性設置的。類。該屬性是一個字符串的形式number_of_requests /期。
周期應該是:(的),“秒”,“M”,“min”,“h”,“小時”,“D”,“一天”。
以前用於節流的請求信息存儲在高速緩存中。
A simple cache implementation, that only requires `.get_cache_key()`
to be overridden.
The rate (requests / seconds) is set by a `rate` attribute on the View
class. The attribute is a string of the form 'number_of_requests/period'.
Period should be one of: ('s', 'sec', 'm', 'min', 'h', 'hour', 'd', 'day')
Previous request information used for throttling is stored in the cache.
"""
cache = default_cache
timer = time.time
cache_format = 'throttle_%(scope)s_%(ident)s'
scope = None
THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES
def __init__(self):
if not getattr(self, 'rate', None):
self.rate = self.get_rate()
self.num_requests, self.duration = self.parse_rate(self.rate)
def get_cache_key(self, request, view):#這個相當於是一個半成品,我們可以來補充它
"""
Should return a unique cache-key which can be used for throttling.
Must be overridden.
May return `None` if the request should not be throttled.
"""
raise NotImplementedError('.get_cache_key() must be overridden')
def get_rate(self):
"""
Determine the string representation of the allowed request rate.
"""
if not getattr(self, 'scope', None):
msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
self.__class__.__name__)
raise ImproperlyConfigured(msg)
try:
return self.THROTTLE_RATES[self.scope]
except KeyError:
msg = "No default throttle rate set for '%s' scope" % self.scope
raise ImproperlyConfigured(msg)
def parse_rate(self, rate):
"""
Given the request rate string, return a two tuple of:
<allowed number of requests>, <period of time in seconds>
"""
if rate is None:
return (None, None)
num, period = rate.split('/')
num_requests = int(num)
duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
return (num_requests, duration)
#1、一進來會先執行他,
def allow_request(self, request, view):
"""
Implement the check to see if the request should be throttled.
On success calls `throttle_success`.
On failure calls `throttle_failure`.
"""
if self.rate is None:
return True
self.key = self.get_cache_key(request, view) #2、執行get_cache_key,這里的self.key就相當於我們舉例ip
if self.key is None:
return True
self.history = self.cache.get(self.key, []) #3、得到的key,默認是一個列表,賦值給了self.history,
# 這時候self.history就是每一個ip對應的訪問記錄
self.now = self.timer()
# Drop any requests from the history which have now passed the
# throttle duration
while self.history and self.history[-1] <= self.now - self.duration:
self.history.pop()
if len(self.history) >= self.num_requests:
return self.throttle_failure()
return self.throttle_success()
def throttle_success(self):
"""
Inserts the current request's timestamp along with the key
into the cache.
"""
self.history.insert(0, self.now)
self.cache.set(self.key, self.history, self.duration)
return True
def throttle_failure(self):
"""
Called when a request to the API has failed due to throttling.
"""
return False
def wait(self):
"""
Returns the recommended next request time in seconds.
"""
if self.history:
remaining_duration = self.duration - (self.now - self.history[-1])
else:
remaining_duration = self.duration
available_requests = self.num_requests - len(self.history) + 1
if available_requests <= 0:
return None
return remaining_duration / float(available_requests)
請求一進來會先執行SimpleRateThrottle這個類的構造方法
__init__
def __init__(self):
if not getattr(self, 'rate', None):
self.rate = self.get_rate() #點進去看到需要些一個scope ,2/m
self.num_requests, self.duration = self.parse_rate(self.rate)
get_rate
def get_rate(self):
"""
Determine the string representation of the allowed request rate.
"""
if not getattr(self, 'scope', None): #檢測必須有scope,沒有就報錯了
msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
self.__class__.__name__)
raise ImproperlyConfigured(msg)
try:
return self.THROTTLE_RATES[self.scope]
except KeyError:
msg = "No default throttle rate set for '%s' scope" % self.scope
raise ImproperlyConfigured(msg)
parse_rate
def parse_rate(self, rate):
"""
Given the request rate string, return a two tuple of:
<allowed number of requests>, <period of time in seconds>
"""
if rate is None:
return (None, None)
num, period = rate.split('/')
num_requests = int(num)
duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
return (num_requests, duration)
allow_request
#2、接下來會先執行他,
def allow_request(self, request, view):
"""
Implement the check to see if the request should be throttled.
On success calls `throttle_success`.
On failure calls `throttle_failure`.
"""
if self.rate is None:
return True
self.key = self.get_cache_key(request, view) #2、執行get_cache_key,這里的self.key就相當於我們舉例ip
if self.key is None:
return True #不限制
# [114521212,11452121211,45212121145,21212114,521212]
self.history = self.cache.get(self.key, []) #3、得到的key,默認是一個列表,賦值給了self.history,
# 這時候self.history就是每一個ip對應的訪問記錄
self.now = self.timer()
# Drop any requests from the history which have now passed the
# throttle duration
while self.history and self.history[-1] <= self.now - self.duration:
self.history.pop()
if len(self.history) >= self.num_requests:
return self.throttle_failure()
return self.throttle_success()
wait
def wait(self):
"""
Returns the recommended next request time in seconds.
"""
if self.history:
remaining_duration = self.duration - (self.now - self.history[-1])
else:
remaining_duration = self.duration
available_requests = self.num_requests - len(self.history) + 1
if available_requests <= 0:
return None
return remaining_duration / float(available_requests)
代碼實現:
views.py
###########用resetframework內部的限制訪問頻率##############
class MySimpleRateThrottle(SimpleRateThrottle):
scope = 'xxx'
def get_cache_key(self, request, view):
return self.get_ident(request) #返回唯一標識IP
class LimitView(APIView):
authentication_classes = [] #不讓認證用戶
permission_classes = [] #不讓驗證權限
throttle_classes = [MySimpleRateThrottle, ]
def get(self,request):
# self.dispatch
return Response('控制訪問頻率示例')
def throttled(self, request, wait):
'''可定制方法設置中文錯誤'''
# raise exceptions.Throttled(wait)
class MyThrottle(exceptions.Throttled):
default_detail = '請求被限制'
extra_detail_singular = 'Expected available in {wait} second.'
extra_detail_plural = 'Expected available in {wait} seconds.'
default_code = '還需要再等{wait}秒'
raise MyThrottle(wait)
記得在settings里面配置
settings.py
REST_FRAMEWORK = {
'UNAUTHENTICATED_USER': None,
'UNAUTHENTICATED_TOKEN': None, #將匿名用戶設置為None
"DEFAULT_AUTHENTICATION_CLASSES": [
"app01.utils.MyAuthentication",
],
'DEFAULT_PERMISSION_CLASSES':[
# "app03.utils.MyPermission",#設置路徑,
],
'DEFAULT_THROTTLE_RATES':{
'xxx':'2/minute' #2分鍾
}
}
#緩存:放在文件
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.filebased.FileBasedCache',
'LOCATION': 'cache', #文件路徑
}
}
4、對匿名用戶進行限制,每個用戶1分鍾允許訪問5次,對於登錄的普通用戶1分鍾訪問10次,VIP用戶一分鍾訪問20次
- 比如首頁可以匿名訪問
- #先認證,只有認證了才知道是不是匿名的,
- #權限登錄成功之后才能訪問, ,index頁面就不需要權限了
- If request.user #判斷登錄了沒有
urls.py
from django.contrib import admin
from django.conf.urls import url, include
from app05 import views
urlpatterns = [
url('index/',views.IndexView.as_view()),
url('manage/',views.ManageView.as_view()),
]
views.py
from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.authentication import BaseAuthentication #認證需要
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限流處理
from rest_framework.permissions import BasePermission
from rest_framework import exceptions
from app01 import models
# Create your views here.
###############3##認證#####################
class MyAuthentcate(BaseAuthentication):
'''檢查用戶是否存在,如果存在就返回user和auth,如果沒有就返回'''
def authenticate(self, request):
token = request.query_params.get('token')
obj = models.UserInfo.objects.filter(token=token).first()
if obj:
return (obj.username,obj.token)
return None #表示我不處理
##################權限#####################
class MyPermission(BasePermission):
message='無權訪問'
def has_permission(self, request, view):
if request.user:
return True #true表示有權限
return False #false表示無權限
class AdminPermission(BasePermission):
message = '無權訪問'
def has_permission(self, request, view):
if request.user=='haiyan':
return True # true表示有權限
return False # false表示無權限
############3#####限流##################3##
class AnonThrottle(SimpleRateThrottle):
scope = 'wdp_anon' #相當於設置了最大的訪問次數和時間
def get_cache_key(self, request, view):
if request.user:
return None #返回None表示我不限制,登錄用戶我不管
#匿名用戶
return self.get_ident(request) #返回一個唯一標識IP
class UserThrottle(SimpleRateThrottle):
scope = 'wdp_user'
def get_cache_key(self, request, view):
#登錄用戶
if request.user:
return request.user
return None #返回NOne表示匿名用戶我不管
##################視圖#####################
#首頁支持匿名訪問,
#無需要登錄就可以訪問
class IndexView(APIView):
authentication_classes = [MyAuthentcate,] #認證判斷他是不是匿名用戶
permission_classes = [] #一般主頁就不需要權限驗證了
throttle_classes = [AnonThrottle,UserThrottle,] #對匿名用戶和普通用戶的訪問限制
def get(self,request):
# self.dispatch
return Response('訪問首頁')
def throttled(self, request, wait):
'''可定制方法設置中文錯誤'''
# raise exceptions.Throttled(wait)
class MyThrottle(exceptions.Throttled):
default_detail = '請求被限制'
extra_detail_singular = 'Expected available in {wait} second.'
extra_detail_plural = 'Expected available in {wait} seconds.'
default_code = '還需要再等{wait}秒'
raise MyThrottle(wait)
#需登錄就可以訪問
class ManageView(APIView):
authentication_classes = [MyAuthentcate, ] # 認證判斷他是不是匿名用戶
permission_classes = [MyPermission,] # 一般主頁就不需要權限驗證了
throttle_classes = [AnonThrottle, UserThrottle, ] # 對匿名用戶和普通用戶的訪問限制
def get(self, request):
# self.dispatch
return Response('管理人員訪問頁面')
def throttled(self, request, wait):
'''可定制方法設置中文錯誤'''
# raise exceptions.Throttled(wait)
class MyThrottle(exceptions.Throttled):
default_detail = '請求被限制'
extra_detail_singular = 'Expected available in {wait} second.'
extra_detail_plural = 'Expected available in {wait} seconds.'
default_code = '還需要再等{wait}秒'
raise MyThrottle(wait)
四、總結
1、認證:就是檢查用戶是否存在;如果存在返回(request.user,request.auth);不存在request.user/request.auth=None
2、權限:進行職責的划分
3、限制訪問頻率
認證
- 類:authenticate/authenticate_header ##驗證不成功的時候執行的
- 返回值:
- return None,
- return (user,auth),
- raise 異常
- 配置:
- 視圖:
class IndexView(APIView):
authentication_classes = [MyAuthentication,]
- 全局:
REST_FRAMEWORK = {
'UNAUTHENTICATED_USER': None,
'UNAUTHENTICATED_TOKEN': None,
"DEFAULT_AUTHENTICATION_CLASSES": [
# "app02.utils.MyAuthentication",
],
}
權限
- 類:has_permission/has_object_permission
- 返回值:
- True、#有權限
- False、#無權限
- exceptions.PermissionDenied(detail="錯誤信息") #異常自己隨意,想拋就拋,錯誤信息自己指定
- 配置:
- 視圖:
class IndexView(APIView):
permission_classes = [MyPermission,]
- 全局:
REST_FRAMEWORK = {
"DEFAULT_PERMISSION_CLASSES": [
# "app02.utils.MyAuthentication",
],
}
限流
- 類:allow_request/wait PS: scope = "wdp_user"
- 返回值:
return True、#不限制
return False #限制
- 配置:
- 視圖:
class IndexView(APIView):
throttle_classes=[AnonThrottle,UserThrottle,]
def get(self,request,*args,**kwargs):
self.dispatch
return Response('訪問首頁')
- 全局
REST_FRAMEWORK = {
"DEFAULT_THROTTLE_CLASSES":[
],
'DEFAULT_THROTTLE_RATES':{
'wdp_anon':'5/minute',
'wdp_user':'10/minute',
}
}
