一、認證和授權
a. 用戶url傳入的token認證

from django.conf.urls import url, include from web.viewsimport TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]

#!/usr/bin/env python # -*- coding:utf-8 -*- from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.authentication import BaseAuthentication from rest_framework.permissions import BasePermission from rest_framework.request import Request from rest_framework import exceptions token_list = [ 'sfsfss123kuf3j123', 'asijnfowerkkf9812', ] class TestAuthentication(BaseAuthentication): def authenticate(self, request): """ 用戶認證,如果驗證成功后返回元組: (用戶,用戶Token) :param request: :return: None,表示跳過該驗證; 如果跳過了所有認證,默認用戶和Token和使用配置文件進行設置 self._authenticator = None if api_settings.UNAUTHENTICATED_USER: self.user = api_settings.UNAUTHENTICATED_USER() # 默認值為:匿名用戶 else: self.user = None if api_settings.UNAUTHENTICATED_TOKEN: self.auth = api_settings.UNAUTHENTICATED_TOKEN()# 默認值為:None else: self.auth = None (user,token)表示驗證通過並設置用戶名和Token; AuthenticationFailed異常 """ val = request.query_params.get('token') if val not in token_list: raise exceptions.AuthenticationFailed("用戶認證失敗") return ('登錄用戶', '用戶token') def authenticate_header(self, request): """ Return a string to be used as the value of the `WWW-Authenticate` header in a `401 Unauthenticated` response, or `None` if the authentication scheme should return `403 Permission Denied` responses. """ # 驗證失敗時,返回的響應頭WWW-Authenticate對應的值 pass class TestView(APIView): # 認證的動作是由request.user觸發 authentication_classes = [TestAuthentication, ] # 權限 # 循環執行所有的權限 permission_classes = [TestPermission, ] def get(self, request, *args, **kwargs): # self.dispatch print(request.user) print(request.auth) return Response('GET請求,響應內容') def post(self, request, *args, **kwargs): return Response('POST請求,響應內容') def put(self, request, *args, **kwargs): return Response('PUT請求,響應內容') views.py
b. 請求頭認證

from django.conf.urls import url, include from web.viewsimport TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]

#!/usr/bin/env python # -*- coding:utf-8 -*- from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.authentication import BaseAuthentication from rest_framework.request import Request from rest_framework import exceptions token_list = [ 'sfsfss123kuf3j123', 'asijnfowerkkf9812', ] class TestAuthentication(BaseAuthentication): def authenticate(self, request): """ 用戶認證,如果驗證成功后返回元組: (用戶,用戶Token) :param request: :return: None,表示跳過該驗證; 如果跳過了所有認證,默認用戶和Token和使用配置文件進行設置 self._authenticator = None if api_settings.UNAUTHENTICATED_USER: self.user = api_settings.UNAUTHENTICATED_USER() else: self.user = None if api_settings.UNAUTHENTICATED_TOKEN: self.auth = api_settings.UNAUTHENTICATED_TOKEN() else: self.auth = None (user,token)表示驗證通過並設置用戶名和Token; AuthenticationFailed異常 """ import base64 auth = request.META.get('HTTP_AUTHORIZATION', b'') if auth: auth = auth.encode('utf-8') auth = auth.split() if not auth or auth[0].lower() != b'basic': raise exceptions.AuthenticationFailed('驗證失敗') if len(auth) != 2: raise exceptions.AuthenticationFailed('驗證失敗') username, part, password = base64.b64decode(auth[1]).decode('utf-8').partition(':') if username == 'alex' and password == '123': return ('登錄用戶', '用戶token') else: raise exceptions.AuthenticationFailed('用戶名或密碼錯誤') def authenticate_header(self, request): """ Return a string to be used as the value of the `WWW-Authenticate` header in a `401 Unauthenticated` response, or `None` if the authentication scheme should return `403 Permission Denied` responses. """ return 'Basic realm=api' class TestView(APIView): authentication_classes = [TestAuthentication, ] permission_classes = [] def get(self, request, *args, **kwargs): print(request.user) print(request.auth) return Response('GET請求,響應內容') def post(self, request, *args, **kwargs): return Response('POST請求,響應內容') def put(self, request, *args, **kwargs): return Response('PUT請求,響應內容')
c. 多個認證規則

from django.conf.urls import url, include from web.views.s2_auth import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]

#!/usr/bin/env python # -*- coding:utf-8 -*- from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.authentication import BaseAuthentication from rest_framework.request import Request from rest_framework import exceptions token_list = [ 'sfsfss123kuf3j123', 'asijnfowerkkf9812', ] class Test1Authentication(BaseAuthentication): def authenticate(self, request): """ 用戶認證,如果驗證成功后返回元組: (用戶,用戶Token) :param request: :return: None,表示跳過該驗證; 如果跳過了所有認證,默認用戶和Token和使用配置文件進行設置 self._authenticator = None if api_settings.UNAUTHENTICATED_USER: self.user = api_settings.UNAUTHENTICATED_USER() # 默認值為:匿名用戶 else: self.user = None if api_settings.UNAUTHENTICATED_TOKEN: self.auth = api_settings.UNAUTHENTICATED_TOKEN()# 默認值為:None else: self.auth = None (user,token)表示驗證通過並設置用戶名和Token; AuthenticationFailed異常 """ import base64 auth = request.META.get('HTTP_AUTHORIZATION', b'') if auth: auth = auth.encode('utf-8') else: return None print(auth,'xxxx') auth = auth.split() if not auth or auth[0].lower() != b'basic': raise exceptions.AuthenticationFailed('驗證失敗') if len(auth) != 2: raise exceptions.AuthenticationFailed('驗證失敗') username, part, password = base64.b64decode(auth[1]).decode('utf-8').partition(':') if username == 'alex' and password == '123': return ('登錄用戶', '用戶token') else: raise exceptions.AuthenticationFailed('用戶名或密碼錯誤') def authenticate_header(self, request): """ Return a string to be used as the value of the `WWW-Authenticate` header in a `401 Unauthenticated` response, or `None` if the authentication scheme should return `403 Permission Denied` responses. """ # return 'Basic realm=api' pass class Test2Authentication(BaseAuthentication): def authenticate(self, request): """ 用戶認證,如果驗證成功后返回元組: (用戶,用戶Token) :param request: :return: None,表示跳過該驗證; 如果跳過了所有認證,默認用戶和Token和使用配置文件進行設置 self._authenticator = None if api_settings.UNAUTHENTICATED_USER: self.user = api_settings.UNAUTHENTICATED_USER() # 默認值為:匿名用戶 else: self.user = None if api_settings.UNAUTHENTICATED_TOKEN: self.auth = api_settings.UNAUTHENTICATED_TOKEN()# 默認值為:None else: self.auth = None (user,token)表示驗證通過並設置用戶名和Token; AuthenticationFailed異常 """ val = request.query_params.get('token') if val not in token_list: raise exceptions.AuthenticationFailed("用戶認證失敗") return ('登錄用戶', '用戶token') def authenticate_header(self, request): """ Return a string to be used as the value of the `WWW-Authenticate` header in a `401 Unauthenticated` response, or `None` if the authentication scheme should return `403 Permission Denied` responses. """ pass class TestView(APIView): authentication_classes = [Test1Authentication, Test2Authentication] permission_classes = [] def get(self, request, *args, **kwargs): print(request.user) print(request.auth) return Response('GET請求,響應內容') def post(self, request, *args, **kwargs): return Response('POST請求,響應內容') def put(self, request, *args, **kwargs): return Response('PUT請求,響應內容') views.py
d. 認證和權限

from django.conf.urls import url, include from web.views import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]

#!/usr/bin/env python # -*- coding:utf-8 -*- from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.authentication import BaseAuthentication from rest_framework.permissions import BasePermission from rest_framework.request import Request from rest_framework import exceptions token_list = [ 'sfsfss123kuf3j123', 'asijnfowerkkf9812', ] class TestAuthentication(BaseAuthentication): def authenticate(self, request): """ 用戶認證,如果驗證成功后返回元組: (用戶,用戶Token) :param request: :return: None,表示跳過該驗證; 如果跳過了所有認證,默認用戶和Token和使用配置文件進行設置 self._authenticator = None if api_settings.UNAUTHENTICATED_USER: self.user = api_settings.UNAUTHENTICATED_USER() # 默認值為:匿名用戶 else: self.user = None if api_settings.UNAUTHENTICATED_TOKEN: self.auth = api_settings.UNAUTHENTICATED_TOKEN()# 默認值為:None else: self.auth = None (user,token)表示驗證通過並設置用戶名和Token; AuthenticationFailed異常 """ val = request.query_params.get('token') if val not in token_list: raise exceptions.AuthenticationFailed("用戶認證失敗") return ('登錄用戶', '用戶token') def authenticate_header(self, request): """ Return a string to be used as the value of the `WWW-Authenticate` header in a `401 Unauthenticated` response, or `None` if the authentication scheme should return `403 Permission Denied` responses. """ pass class TestPermission(BasePermission): message = "權限驗證失敗" def has_permission(self, request, view): """ 判斷是否有權限訪問當前請求 Return `True` if permission is granted, `False` otherwise. :param request: :param view: :return: True有權限;False無權限 """ if request.user == "管理員": return True # GenericAPIView中get_object時調用 def has_object_permission(self, request, view, obj): """ 視圖繼承GenericAPIView,並在其中使用get_object時獲取對象時,觸發單獨對象權限驗證 Return `True` if permission is granted, `False` otherwise. :param request: :param view: :param obj: :return: True有權限;False無權限 """ if request.user == "管理員": return True class TestView(APIView): # 認證的動作是由request.user觸發 authentication_classes = [TestAuthentication, ] # 權限 # 循環執行所有的權限 permission_classes = [TestPermission, ] def get(self, request, *args, **kwargs): # self.dispatch print(request.user) print(request.auth) return Response('GET請求,響應內容') def post(self, request, *args, **kwargs): return Response('POST請求,響應內容') def put(self, request, *args, **kwargs): return Response('PUT請求,響應內容')
e. 全局使用
上述操作中均是對單獨視圖進行特殊配置,如果想要對全局進行配置,則需要再配置文件中寫入即可。

REST_FRAMEWORK = { 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None, "DEFAULT_AUTHENTICATION_CLASSES": [ "web.utils.TestAuthentication", ], "DEFAULT_PERMISSION_CLASSES": [ "web.utils.TestPermission", ], }

from django.conf.urls import url, include from web.views import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ] urls.py

#!/usr/bin/env python # -*- coding:utf-8 -*- from rest_framework.views import APIView from rest_framework.response import Response class TestView(APIView): def get(self, request, *args, **kwargs): # self.dispatch print(request.user) print(request.auth) return Response('GET請求,響應內容') def post(self, request, *args, **kwargs): return Response('POST請求,響應內容') def put(self, request, *args, **kwargs): return Response('PUT請求,響應內容')
g.自定義認證工作

1 # 2 class MyAuthtication(BasicAuthentication): 3 def authenticate(self, request): 4 token = request.query_params.get('token') #注意是沒有GET的,用query_params表示 5 if token == 'zxxzzxzc': 6 return ('uuuuuu','afsdsgdf') #返回user,auth 7 # raise AuthenticationFailed('認證錯誤') #只要拋出認證錯誤這樣的異常就會去執行下面的函數 8 raise APIException('認證錯誤') 9 def authenticate_header(self, request): #認證不成功的時候執行 10 return 'Basic reala="api"' 11 12 class UserView(APIView): 13 authentication_classes = [MyAuthtication,] 14 def get(self,request,*args,**kwargs): 15 print(request.user) 16 print(request.auth) 17 return Response('用戶列表')
二、權限
1、需求:Host是匿名用戶和用戶都能訪問 #匿名用戶的request.user = none;User只有注冊用戶能訪問

1 from app03 import views 2 from django.conf.urls import url 3 urlpatterns = [ 4 # django rest framework 5 url('^auth/', views.AuthView.as_view()), 6 url(r'^hosts/', views.HostView.as_view()), 7 url(r'^users/', views.UsersView.as_view()), 8 url(r'^salary/', views.SalaryView.as_view()), 9 ]

1 from django.shortcuts import render 2 from rest_framework.views import APIView #繼承的view 3 from rest_framework.response import Response #友好的返回 4 from rest_framework.authentication import BaseAuthentication #認證的類 5 from rest_framework.authentication import BasicAuthentication 6 from app01 import models 7 from rest_framework import exceptions 8 from rest_framework.permissions import AllowAny #權限在這個類里面 9 from rest_framework.throttling import BaseThrottle,SimpleRateThrottle 10 # Create your views here. 11 # +++++++++++++++認證類和權限類======================== 12 class MyAuthentication(BaseAuthentication): 13 def authenticate(self, request): 14 token = request.query_params.get('token') 15 obj = models.UserInfo.objects.filter(token=token).first() 16 if obj : #如果認證成功,返回用戶名和auth 17 return (obj.username,obj) 18 return None #如果沒有認證成功就不處理,進行下一步 19 20 def authenticate_header(self, request): 21 pass 22 23 class MyPermission(object): 24 message = '無權訪問' 25 def has_permission(self,request,view): #has_permission里面的self是view視圖對象 26 if request.user: 27 return True #如果不是匿名用戶就說明有權限 28 return False #否則無權限 29 30 class AdminPermission(object): 31 message = '無權訪問' 32 def has_permission(self, request, view): # has_permission里面的self是view視圖對象 33 if request.user=='haiyun': 34 return True # 返回True表示有權限 35 return False #返回False表示無權限 36 37 # +++++++++++++++++++++++++++ 38 class AuthView(APIView): 39 authentication_classes = [] #認證頁面不需要認證 40 41 def get(self,request): 42 self.dispatch 43 return '認證列表' 44 45 class HostView(APIView): 46 '''需求: 47 Host是匿名用戶和用戶都能訪問 #匿名用戶的request.user = none 48 User只有注冊用戶能訪問 49 ''' 50 authentication_classes = [MyAuthentication,] 51 permission_classes = [] #都能訪問就沒必要設置權限了 52 def get(self,request): 53 print(request.user) 54 print(request.auth) 55 return Response('主機列表') 56 57 class UsersView(APIView): 58 '''用戶能訪問,request.user里面有值''' 59 authentication_classes = [MyAuthentication,] 60 permission_classes = [MyPermission,] 61 def get(self,request): 62 print(request.user,'111111111') 63 return Response('用戶列表') 64 65 def permission_denied(self, request, message=None): 66 """ 67 If request is not permitted, determine what kind of exception to raise. 68 """ 69 if request.authenticators and not request.successful_authenticator: 70 '''如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了''' 71 raise exceptions.NotAuthenticated(detail='無權訪問') 72 raise exceptions.PermissionDenied(detail=message)

1 class SalaryView(APIView): 2 '''用戶能訪問''' 3 message ='無權訪問' 4 authentication_classes = [MyAuthentication,] #驗證是不是用戶 5 permission_classes = [MyPermission,AdminPermission,] #再看用戶有沒有權限,如果有權限在判斷有沒有管理員的權限 6 def get(self,request): 7 return Response('薪資列表') 8 9 def permission_denied(self, request, message=None): 10 """ 11 If request is not permitted, determine what kind of exception to raise. 12 """ 13 if request.authenticators and not request.successful_authenticator: 14 '''如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了''' 15 raise exceptions.NotAuthenticated(detail='無權訪問') 16 raise exceptions.PermissionDenied(detail=message)
如果遇上下面這樣的情況,是因為沒有通過認證,並且權限中return False了,可以自定制錯誤信息為中文,參考源碼
def check_permissions(self, request): """ Check if the request should be permitted. Raises an appropriate exception if the request is not permitted. """ for permission in self.get_permissions(): #循環每一個permission對象,調用has_permission #如果False,則拋出異常 #True 說明有權訪問 if not permission.has_permission(request, self): self.permission_denied( request, message=getattr(permission, 'message', None) )
def permission_denied(self, request, message=None): """ If request is not permitted, determine what kind of exception to raise. """ if request.authenticators and not request.successful_authenticator: '''如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了''' raise exceptions.NotAuthenticated() raise exceptions.PermissionDenied(detail=message)
那么我們可以重寫permission_denied這個方法,如下:

1 class UsersView(APIView): 2 '''用戶能訪問,request.user里面有值''' 3 authentication_classes = [MyAuthentication,] 4 permission_classes = [MyPermission,] 5 def get(self,request): 6 return Response('用戶列表') 7 8 def permission_denied(self, request, message=None): 9 """ 10 If request is not permitted, determine what kind of exception to raise. 11 """ 12 if request.authenticators and not request.successful_authenticator: 13 '''如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了''' 14 raise exceptions.NotAuthenticated(detail='無權訪問') 15 raise exceptions.PermissionDenied(detail=message)
2. 全局使用
上述操作中均是對單獨視圖進行特殊配置,如果想要對全局進行配置,則需要再配置文件中寫入即可。

1 REST_FRAMEWORK = { 2 'UNAUTHENTICATED_USER': None, 3 'UNAUTHENTICATED_TOKEN': None, #將匿名用戶設置為None 4 "DEFAULT_AUTHENTICATION_CLASSES": [ 5 "app01.utils.MyAuthentication", 6 ], 7 'DEFAULT_PERMISSION_CLASSES':[ 8 "app03.utils.MyPermission",#設置路徑, 9 ] 10 }

1 class AuthView(APIView): 2 authentication_classes = [] #認證頁面不需要認證 3 4 def get(self,request): 5 self.dispatch 6 return '認證列表' 7 8 class HostView(APIView): 9 '''需求: 10 Host是匿名用戶和用戶都能訪問 #匿名用戶的request.user = none 11 User只有注冊用戶能訪問 12 ''' 13 authentication_classes = [MyAuthentication,] 14 permission_classes = [] #都能訪問就沒必要設置權限了 15 def get(self,request): 16 print(request.user) 17 print(request.auth) 18 return Response('主機列表') 19 20 class UsersView(APIView): 21 '''用戶能訪問,request.user里面有值''' 22 authentication_classes = [MyAuthentication,] 23 permission_classes = [MyPermission,] 24 def get(self,request): 25 print(request.user,'111111111') 26 return Response('用戶列表') 27 28 def permission_denied(self, request, message=None): 29 """ 30 If request is not permitted, determine what kind of exception to raise. 31 """ 32 if request.authenticators and not request.successful_authenticator: 33 '''如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了''' 34 raise exceptions.NotAuthenticated(detail='無權訪問') 35 raise exceptions.PermissionDenied(detail=message) 36 37 38 class SalaryView(APIView): 39 '''用戶能訪問''' 40 message ='無權訪問' 41 authentication_classes = [MyAuthentication,] #驗證是不是用戶 42 permission_classes = [MyPermission,AdminPermission,] #再看用戶有沒有權限,如果有權限在判斷有沒有管理員的權限 43 def get(self,request): 44 return Response('薪資列表') 45 46 def permission_denied(self, request, message=None): 47 """ 48 If request is not permitted, determine what kind of exception to raise. 49 """ 50 if request.authenticators and not request.successful_authenticator: 51 '''如果沒有通過認證,並且權限中return False了,就會報下面的這個異常了''' 52 raise exceptions.NotAuthenticated(detail='無權訪問') 53 raise exceptions.PermissionDenied(detail=message)
三、用戶訪問次數/頻率限制
1、為什么要限流呢
答:
- - 第一點:爬蟲,反爬
- - 第二點:控制 API 訪問次數
- - 登錄用戶的用戶名可以做標識
- 匿名用戶可以參考 ip,但是 ip可以加代理。
2、限制訪問頻率源碼分析

1 self.check_throttles(request)

1 def check_throttles(self, request): 2 """ 3 Check if request should be throttled. 4 Raises an appropriate exception if the request is throttled. 5 """ 6 for throttle in self.get_throttles(): 7 #循環每一個throttle對象,執行allow_request方法 8 # allow_request: 9 #返回False,說明限制訪問頻率 10 #返回True,說明不限制,通行 11 if not throttle.allow_request(request, self): 12 self.throttled(request, throttle.wait()) 13 #throttle.wait()表示還要等多少秒就能訪問了

1 def get_throttles(self): 2 """ 3 Instantiates and returns the list of throttles that this view uses. 4 """ 5 #返回對象 6 return [throttle() for throttle in self.throttle_classes]

1 throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES

1 class BaseThrottle(object): 2 """ 3 Rate throttling of requests. 4 """ 5 6 def allow_request(self, request, view): 7 """ 8 Return `True` if the request should be allowed, `False` otherwise. 9 """ 10 raise NotImplementedError('.allow_request() must be overridden') 11 12 def get_ident(self, request): 13 """ 14 Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR 15 if present and number of proxies is > 0. If not use all of 16 HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR. 17 """ 18 xff = request.META.get('HTTP_X_FORWARDED_FOR') 19 remote_addr = request.META.get('REMOTE_ADDR') 20 num_proxies = api_settings.NUM_PROXIES 21 22 if num_proxies is not None: 23 if num_proxies == 0 or xff is None: 24 return remote_addr 25 addrs = xff.split(',') 26 client_addr = addrs[-min(num_proxies, len(addrs))] 27 return client_addr.strip() 28 29 return ''.join(xff.split()) if xff else remote_addr 30 31 def wait(self): 32 """ 33 Optionally, return a recommended number of seconds to wait before 34 the next request. 35 """ 36 return None


1 def throttled(self, request, wait): 2 """ 3 If request is throttled, determine what kind of exception to raise. 4 """ 5 raise exceptions.Throttled(wait)

1 class Throttled(APIException): 2 status_code = status.HTTP_429_TOO_MANY_REQUESTS 3 default_detail = _('Request was throttled.') 4 extra_detail_singular = 'Expected available in {wait} second.' 5 extra_detail_plural = 'Expected available in {wait} seconds.' 6 default_code = 'throttled' 7 8 def __init__(self, wait=None, detail=None, code=None): 9 if detail is None: 10 detail = force_text(self.default_detail) 11 if wait is not None: 12 wait = math.ceil(wait) 13 detail = ' '.join(( 14 detail, 15 force_text(ungettext(self.extra_detail_singular.format(wait=wait), 16 self.extra_detail_plural.format(wait=wait), 17 wait)))) 18 self.wait = wait 19 super(Throttled, self).__init__(detail, code)
3.方法
a. 基於用戶IP限制訪問頻率

from django.conf.urls import url, include from web.views import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]

#!/usr/bin/env python # -*- coding:utf-8 -*- import time from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import exceptions from rest_framework.throttling import BaseThrottle from rest_framework.settings import api_settings # 保存訪問記錄 RECORD = { '用戶IP': [12312139, 12312135, 12312133, ] } class TestThrottle(BaseThrottle): ctime = time.time def get_ident(self, request): """ 根據用戶IP和代理IP,當做請求者的唯一IP Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR if present and number of proxies is > 0. If not use all of HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR. """ xff = request.META.get('HTTP_X_FORWARDED_FOR') remote_addr = request.META.get('REMOTE_ADDR') num_proxies = api_settings.NUM_PROXIES if num_proxies is not None: if num_proxies == 0 or xff is None: return remote_addr addrs = xff.split(',') client_addr = addrs[-min(num_proxies, len(addrs))] return client_addr.strip() return ''.join(xff.split()) if xff else remote_addr def allow_request(self, request, view): """ 是否仍然在允許范圍內 Return `True` if the request should be allowed, `False` otherwise. :param request: :param view: :return: True,表示可以通過;False表示已超過限制,不允許訪問 """ # 獲取用戶唯一標識(如:IP) # 允許一分鍾訪問10次 num_request = 10 time_request = 60 now = self.ctime() ident = self.get_ident(request) self.ident = ident if ident not in RECORD: RECORD[ident] = [now, ] return True history = RECORD[ident] while history and history[-1] <= now - time_request: history.pop() if len(history) < num_request: history.insert(0, now) return True def wait(self): """ 多少秒后可以允許繼續訪問 Optionally, return a recommended number of seconds to wait before the next request. """ last_time = RECORD[self.ident][0] now = self.ctime() return int(60 + last_time - now) class TestView(APIView): throttle_classes = [TestThrottle, ] def get(self, request, *args, **kwargs): # self.dispatch print(request.user) print(request.auth) return Response('GET請求,響應內容') def post(self, request, *args, **kwargs): return Response('POST請求,響應內容') def put(self, request, *args, **kwargs): return Response('PUT請求,響應內容') def throttled(self, request, wait): """ 訪問次數被限制時,定制錯誤信息 """ class Throttled(exceptions.Throttled): default_detail = '請求被限制.' extra_detail_singular = '請 {wait} 秒之后再重試.' extra_detail_plural = '請 {wait} 秒之后再重試.' raise Throttled(wait)
b. 基於用戶IP顯示訪問頻率(利於Django緩存)

REST_FRAMEWORK = { 'DEFAULT_THROTTLE_RATES': { 'test_scope': '10/m', }, }

from django.conf.urls import url, include from web.views import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]

#!/usr/bin/env python # -*- coding:utf-8 -*- from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import exceptions from rest_framework.throttling import SimpleRateThrottle class TestThrottle(SimpleRateThrottle): # 配置文件定義的顯示頻率的Key scope = "test_scope" def get_cache_key(self, request, view): """ Should return a unique cache-key which can be used for throttling. Must be overridden. May return `None` if the request should not be throttled. """ if not request.user: ident = self.get_ident(request) else: ident = request.user return self.cache_format % { 'scope': self.scope, 'ident': ident } class TestView(APIView): throttle_classes = [TestThrottle, ] def get(self, request, *args, **kwargs): # self.dispatch print(request.user) print(request.auth) return Response('GET請求,響應內容') def post(self, request, *args, **kwargs): return Response('POST請求,響應內容') def put(self, request, *args, **kwargs): return Response('PUT請求,響應內容') def throttled(self, request, wait): """ 訪問次數被限制時,定制錯誤信息 """ class Throttled(exceptions.Throttled): default_detail = '請求被限制.' extra_detail_singular = '請 {wait} 秒之后再重試.' extra_detail_plural = '請 {wait} 秒之后再重試.' raise Throttled(wait)
c. view中限制請求頻率

REST_FRAMEWORK = { 'DEFAULT_THROTTLE_RATES': { 'xxxxxx': '10/m', }, }

from django.conf.urls import url, include from web.views import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]

#!/usr/bin/env python # -*- coding:utf-8 -*- from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import exceptions from rest_framework.throttling import ScopedRateThrottle # 繼承 ScopedRateThrottle class TestThrottle(ScopedRateThrottle): def get_cache_key(self, request, view): """ Should return a unique cache-key which can be used for throttling. Must be overridden. May return `None` if the request should not be throttled. """ if not request.user: ident = self.get_ident(request) else: ident = request.user return self.cache_format % { 'scope': self.scope, 'ident': ident } class TestView(APIView): throttle_classes = [TestThrottle, ] # 在settings中獲取 xxxxxx 對應的頻率限制值 throttle_scope = "xxxxxx" def get(self, request, *args, **kwargs): # self.dispatch print(request.user) print(request.auth) return Response('GET請求,響應內容') def post(self, request, *args, **kwargs): return Response('POST請求,響應內容') def put(self, request, *args, **kwargs): return Response('PUT請求,響應內容') def throttled(self, request, wait): """ 訪問次數被限制時,定制錯誤信息 """ class Throttled(exceptions.Throttled): default_detail = '請求被限制.' extra_detail_singular = '請 {wait} 秒之后再重試.' extra_detail_plural = '請 {wait} 秒之后再重試.' raise Throttled(wait)
d. 匿名時用IP限制+登錄時用Token限制

REST_FRAMEWORK = { 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None, 'DEFAULT_THROTTLE_RATES': { 'luffy_anon': '10/m', 'luffy_user': '20/m', }, }

from django.conf.urls import url, include from web.views.s3_throttling import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]

#!/usr/bin/env python # -*- coding:utf-8 -*- from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.throttling import SimpleRateThrottle class LuffyAnonRateThrottle(SimpleRateThrottle): """ 匿名用戶,根據IP進行限制 """ scope = "luffy_anon" def get_cache_key(self, request, view): # 用戶已登錄,則跳過 匿名頻率限制 if request.user: return None return self.cache_format % { 'scope': self.scope, 'ident': self.get_ident(request) } class LuffyUserRateThrottle(SimpleRateThrottle): """ 登錄用戶,根據用戶token限制 """ scope = "luffy_user" def get_ident(self, request): """ 認證成功時:request.user是用戶對象;request.auth是token對象 :param request: :return: """ # return request.auth.token return "user_token" def get_cache_key(self, request, view): """ 獲取緩存key :param request: :param view: :return: """ # 未登錄用戶,則跳過 Token限制 if not request.user: return None return self.cache_format % { 'scope': self.scope, 'ident': self.get_ident(request) } class TestView(APIView): throttle_classes = [LuffyUserRateThrottle, LuffyAnonRateThrottle, ] def get(self, request, *args, **kwargs): # self.dispatch print(request.user) print(request.auth) return Response('GET請求,響應內容') def post(self, request, *args, **kwargs): return Response('POST請求,響應內容') def put(self, request, *args, **kwargs): return Response('PUT請求,響應內容')
e. 全局使用

REST_FRAMEWORK = { 'DEFAULT_THROTTLE_CLASSES': [ 'api.utils.throttles.throttles.LuffyAnonRateThrottle', 'api.utils.throttles.throttles.LuffyUserRateThrottle', ], 'DEFAULT_THROTTLE_RATES': { 'anon': '10/day', 'user': '10/day', 'luffy_anon': '10/m', 'luffy_user': '20/m', }, }
下面來看看最簡單的從源碼中分析的示例,這只是舉例說明了一下

1 from django.conf.urls import url 2 from app04 import views 3 urlpatterns = [ 4 url('limit/',views.LimitView.as_view()), 5 6 ]

1 from django.shortcuts import render 2 from rest_framework.views import APIView 3 from rest_framework.response import Response 4 from rest_framework import exceptions 5 # from rest_framewor import 6 # Create your views here. 7 class MyThrottle(object): 8 def allow_request(self,request,view): 9 #返回False,限制 10 #返回True,不限制 11 pass 12 def wait(self): 13 return 1000 14 15 16 class LimitView(APIView): 17 authentication_classes = [] #不讓認證用戶 18 permission_classes = [] #不讓驗證權限 19 throttle_classes = [MyThrottle, ] 20 def get(self,request): 21 # self.dispatch 22 return Response('控制訪問頻率示例') 23 24 def throttled(self, request, wait): 25 '''可定制方法設置中文錯誤''' 26 # raise exceptions.Throttled(wait) 27 class MyThrottle(exceptions.Throttled): 28 default_detail = '請求被限制' 29 extra_detail_singular = 'Expected available in {wait} second.' 30 extra_detail_plural = 'Expected available in {wait} seconds.' 31 default_code = '還需要再等{wait}秒' 32 raise MyThrottle(wait)
需求:對匿名用戶進行限制,每個用戶一分鍾允許訪問10次(只針對用戶來說)
a、基於用戶IP限制訪問頻率
流程分析:
- 先獲取用戶信息,如果是匿名用戶,獲取IP。如果不是匿名用戶就可以獲取用戶名。
- 獲取匿名用戶IP,在request里面獲取,比如IP= 1.1.1.1。
- 吧獲取到的IP添加到到recode字典里面,需要在添加之前先限制一下。
- 如果時間間隔大於60秒,說明時間久遠了,就把那個時間給剔除 了pop。在timelist列表里面現在留的是有效的訪問時間段。
- 然后判斷他的訪問次數超過了10次沒有,如果超過了時間就return False。
- 美中不足的是時間是固定的,我們改變他為動態的:列表里面最開始進來的時間和當前的時間進行比較,看需要等多久。
具體實現:

1 from django.shortcuts import render 2 from rest_framework.views import APIView 3 from rest_framework.response import Response 4 from rest_framework import exceptions 5 from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限制訪問頻率 6 import time 7 # Create your views here. 8 RECORD = {} 9 class MyThrottle(BaseThrottle): 10 11 def allow_request(self,request,view): 12 '''對匿名用戶進行限制,每個用戶一分鍾訪問10次 ''' 13 ctime = time.time() 14 ip = '1.1.1.1' 15 if ip not in RECORD: 16 RECORD[ip] = [ctime] 17 else: 18 #[152042123,15204212,3152042,123152042123] 19 time_list = RECORD[ip] #獲取ip里面的值 20 while True: 21 val = time_list[-1]#取出最后一個時間,也就是訪問最早的時間 22 if (ctime-60)>val: #吧時間大於60秒的給剔除了 23 time_list.pop() 24 #剔除了之后timelist里面就是有效的時間了,在進行判斷他的訪問次數是不是超過10次 25 else: 26 break 27 if len(time_list) >10: 28 return False # 返回False,限制 29 time_list.insert(0, ctime) 30 return True #返回True,不限制 31 32 def wait(self): 33 ctime = time.time() 34 first_in_time = RECORD['1.1.1.1'][-1] 35 wt = 60-(ctime-first_in_time) 36 return wt 37 38 39 class LimitView(APIView): 40 authentication_classes = [] #不讓認證用戶 41 permission_classes = [] #不讓驗證權限 42 throttle_classes = [MyThrottle, ] 43 def get(self,request): 44 # self.dispatch 45 return Response('控制訪問頻率示例') 46 47 def throttled(self, request, wait): 48 '''可定制方法設置中文錯誤''' 49 # raise exceptions.Throttled(wait) 50 class MyThrottle(exceptions.Throttled): 51 default_detail = '請求被限制' 52 extra_detail_singular = 'Expected available in {wait} second.' 53 extra_detail_plural = 'Expected available in {wait} seconds.' 54 default_code = '還需要再等{wait}秒' 55 raise MyThrottle(wait)

1 # from django.shortcuts import render 2 # from rest_framework.views import APIView 3 # from rest_framework.response import Response 4 # from rest_framework import exceptions 5 # from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限制訪問頻率 6 # import time 7 # # Create your views here. 8 # RECORD = {} 9 # class MyThrottle(BaseThrottle): 10 # 11 # def allow_request(self,request,view): 12 # '''對匿名用戶進行限制,每個用戶一分鍾訪問10次 ''' 13 # ctime = time.time() 14 # ip = '1.1.1.1' 15 # if ip not in RECORD: 16 # RECORD[ip] = [ctime] 17 # else: 18 # #[152042123,15204212,3152042,123152042123] 19 # time_list = RECORD[ip] #獲取ip里面的值 20 # while True: 21 # val = time_list[-1]#取出最后一個時間,也就是訪問最早的時間 22 # if (ctime-60)>val: #吧時間大於60秒的給剔除了 23 # time_list.pop() 24 # #剔除了之后timelist里面就是有效的時間了,在進行判斷他的訪問次數是不是超過10次 25 # else: 26 # break 27 # if len(time_list) >10: 28 # return False # 返回False,限制 29 # time_list.insert(0, ctime) 30 # return True #返回True,不限制 31 # 32 # def wait(self): 33 # ctime = time.time() 34 # first_in_time = RECORD['1.1.1.1'][-1] 35 # wt = 60-(ctime-first_in_time) 36 # return wt 37 # 38 # 39 # class LimitView(APIView): 40 # authentication_classes = [] #不讓認證用戶 41 # permission_classes = [] #不讓驗證權限 42 # throttle_classes = [MyThrottle, ] 43 # def get(self,request): 44 # # self.dispatch 45 # return Response('控制訪問頻率示例') 46 # 47 # def throttled(self, request, wait): 48 # '''可定制方法設置中文錯誤''' 49 # # raise exceptions.Throttled(wait) 50 # class MyThrottle(exceptions.Throttled): 51 # default_detail = '請求被限制' 52 # extra_detail_singular = 'Expected available in {wait} second.' 53 # extra_detail_plural = 'Expected available in {wait} seconds.' 54 # default_code = '還需要再等{wait}秒' 55 # raise MyThrottle(wait) 56 57 58 59 from django.shortcuts import render 60 from rest_framework.views import APIView 61 from rest_framework.response import Response 62 from rest_framework import exceptions 63 from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限制訪問頻率 64 import time 65 # Create your views here. 66 RECORD = {} 67 class MyThrottle(BaseThrottle): 68 69 def allow_request(self,request,view): 70 '''對匿名用戶進行限制,每個用戶一分鍾訪問10次 ''' 71 ctime = time.time() 72 self.ip =self.get_ident(request) 73 if self.ip not in RECORD: 74 RECORD[self.ip] = [ctime] 75 else: 76 #[152042123,15204212,3152042,123152042123] 77 time_list = RECORD[self.ip] #獲取ip里面的值 78 while True: 79 val = time_list[-1]#取出最后一個時間,也就是訪問最早的時間 80 if (ctime-60)>val: #吧時間大於60秒的給剔除了 81 time_list.pop() 82 #剔除了之后timelist里面就是有效的時間了,在進行判斷他的訪問次數是不是超過10次 83 else: 84 break 85 if len(time_list) >10: 86 return False # 返回False,限制 87 time_list.insert(0, ctime) 88 return True #返回True,不限制 89 90 def wait(self): 91 ctime = time.time() 92 first_in_time = RECORD[self.ip][-1] 93 wt = 60-(ctime-first_in_time) 94 return wt 95 96 97 class LimitView(APIView): 98 authentication_classes = [] #不讓認證用戶 99 permission_classes = [] #不讓驗證權限 100 throttle_classes = [MyThrottle, ] 101 def get(self,request): 102 # self.dispatch 103 return Response('控制訪問頻率示例') 104 105 def throttled(self, request, wait): 106 '''可定制方法設置中文錯誤''' 107 # raise exceptions.Throttled(wait) 108 class MyThrottle(exceptions.Throttled): 109 default_detail = '請求被限制' 110 extra_detail_singular = 'Expected available in {wait} second.' 111 extra_detail_plural = 'Expected available in {wait} seconds.' 112 default_code = '還需要再等{wait}秒' 113 raise MyThrottle(wait)
b、用resetframework內部的限制訪問頻率(利於Django緩存)
源碼分析:
from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限制訪問頻率

1 class BaseThrottle(object): 2 """ 3 Rate throttling of requests. 4 """ 5 6 def allow_request(self, request, view): 7 """ 8 Return `True` if the request should be allowed, `False` otherwise. 9 """ 10 raise NotImplementedError('.allow_request() must be overridden') 11 12 def get_ident(self, request): #唯一標識 13 """ 14 Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR 15 if present and number of proxies is > 0. If not use all of 16 HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR. 17 """ 18 xff = request.META.get('HTTP_X_FORWARDED_FOR') 19 remote_addr = request.META.get('REMOTE_ADDR') #獲取IP等 20 num_proxies = api_settings.NUM_PROXIES 21 22 if num_proxies is not None: 23 if num_proxies == 0 or xff is None: 24 return remote_addr 25 addrs = xff.split(',') 26 client_addr = addrs[-min(num_proxies, len(addrs))] 27 return client_addr.strip() 28 29 return ''.join(xff.split()) if xff else remote_addr 30 31 def wait(self): 32 """ 33 Optionally, return a recommended number of seconds to wait before 34 the next request. 35 """ 36 return None

1 class SimpleRateThrottle(BaseThrottle): 2 """ 3 一個簡單的緩存實現,只需要` get_cache_key() `。被覆蓋。 4 速率(請求/秒)是由視圖上的“速率”屬性設置的。類。該屬性是一個字符串的形式number_of_requests /期。 5 周期應該是:(的),“秒”,“M”,“min”,“h”,“小時”,“D”,“一天”。 6 以前用於節流的請求信息存儲在高速緩存中。 7 A simple cache implementation, that only requires `.get_cache_key()` 8 to be overridden. 9 10 The rate (requests / seconds) is set by a `rate` attribute on the View 11 class. The attribute is a string of the form 'number_of_requests/period'. 12 13 Period should be one of: ('s', 'sec', 'm', 'min', 'h', 'hour', 'd', 'day') 14 15 Previous request information used for throttling is stored in the cache. 16 """ 17 cache = default_cache 18 timer = time.time 19 cache_format = 'throttle_%(scope)s_%(ident)s' 20 scope = None 21 THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES 22 23 def __init__(self): 24 if not getattr(self, 'rate', None): 25 self.rate = self.get_rate() 26 self.num_requests, self.duration = self.parse_rate(self.rate) 27 28 def get_cache_key(self, request, view):#這個相當於是一個半成品,我們可以來補充它 29 """ 30 Should return a unique cache-key which can be used for throttling. 31 Must be overridden. 32 33 May return `None` if the request should not be throttled. 34 """ 35 raise NotImplementedError('.get_cache_key() must be overridden') 36 37 def get_rate(self): 38 """ 39 Determine the string representation of the allowed request rate. 40 """ 41 if not getattr(self, 'scope', None): 42 msg = ("You must set either `.scope` or `.rate` for '%s' throttle" % 43 self.__class__.__name__) 44 raise ImproperlyConfigured(msg) 45 46 try: 47 return self.THROTTLE_RATES[self.scope] 48 except KeyError: 49 msg = "No default throttle rate set for '%s' scope" % self.scope 50 raise ImproperlyConfigured(msg) 51 52 def parse_rate(self, rate): 53 """ 54 Given the request rate string, return a two tuple of: 55 <allowed number of requests>, <period of time in seconds> 56 """ 57 if rate is None: 58 return (None, None) 59 num, period = rate.split('/') 60 num_requests = int(num) 61 duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]] 62 return (num_requests, duration) 63 64 #1、一進來會先執行他, 65 def allow_request(self, request, view): 66 """ 67 Implement the check to see if the request should be throttled. 68 69 On success calls `throttle_success`. 70 On failure calls `throttle_failure`. 71 """ 72 if self.rate is None: 73 return True 74 75 self.key = self.get_cache_key(request, view) #2、執行get_cache_key,這里的self.key就相當於我們舉例ip 76 if self.key is None: 77 return True 78 79 self.history = self.cache.get(self.key, []) #3、得到的key,默認是一個列表,賦值給了self.history, 80 # 這時候self.history就是每一個ip對應的訪問記錄 81 self.now = self.timer() 82 83 # Drop any requests from the history which have now passed the 84 # throttle duration 85 while self.history and self.history[-1] <= self.now - self.duration: 86 self.history.pop() 87 if len(self.history) >= self.num_requests: 88 return self.throttle_failure() 89 return self.throttle_success() 90 91 def throttle_success(self): 92 """ 93 Inserts the current request's timestamp along with the key 94 into the cache. 95 """ 96 self.history.insert(0, self.now) 97 self.cache.set(self.key, self.history, self.duration) 98 return True 99 100 def throttle_failure(self): 101 """ 102 Called when a request to the API has failed due to throttling. 103 """ 104 return False 105 106 def wait(self): 107 """ 108 Returns the recommended next request time in seconds. 109 """ 110 if self.history: 111 remaining_duration = self.duration - (self.now - self.history[-1]) 112 else: 113 remaining_duration = self.duration 114 115 available_requests = self.num_requests - len(self.history) + 1 116 if available_requests <= 0: 117 return None 118 119 return remaining_duration / float(available_requests)
請求一進來會先執行SimpleRateThrottle這個類的構造方法

1 def __init__(self): 2 if not getattr(self, 'rate', None): 3 self.rate = self.get_rate() #點進去看到需要些一個scope ,2/m 4 self.num_requests, self.duration = self.parse_rate(self.rate)

1 def get_rate(self): 2 """ 3 Determine the string representation of the allowed request rate. 4 """ 5 if not getattr(self, 'scope', None): #檢測必須有scope,沒有就報錯了 6 msg = ("You must set either `.scope` or `.rate` for '%s' throttle" % 7 self.__class__.__name__) 8 raise ImproperlyConfigured(msg) 9 10 try: 11 return self.THROTTLE_RATES[self.scope] 12 except KeyError: 13 msg = "No default throttle rate set for '%s' scope" % self.scope 14 raise ImproperlyConfigured(msg)

1 def parse_rate(self, rate): 2 """ 3 Given the request rate string, return a two tuple of: 4 <allowed number of requests>, <period of time in seconds> 5 """ 6 if rate is None: 7 return (None, None) 8 num, period = rate.split('/') 9 num_requests = int(num) 10 duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]] 11 return (num_requests, duration)

1 #2、接下來會先執行他, 2 def allow_request(self, request, view): 3 """ 4 Implement the check to see if the request should be throttled. 5 6 On success calls `throttle_success`. 7 On failure calls `throttle_failure`. 8 """ 9 if self.rate is None: 10 return True 11 12 self.key = self.get_cache_key(request, view) #2、執行get_cache_key,這里的self.key就相當於我們舉例ip 13 if self.key is None: 14 return True #不限制 15 # [114521212,11452121211,45212121145,21212114,521212] 16 self.history = self.cache.get(self.key, []) #3、得到的key,默認是一個列表,賦值給了self.history, 17 # 這時候self.history就是每一個ip對應的訪問記錄 18 self.now = self.timer() 19 20 # Drop any requests from the history which have now passed the 21 # throttle duration 22 while self.history and self.history[-1] <= self.now - self.duration: 23 self.history.pop() 24 if len(self.history) >= self.num_requests: 25 return self.throttle_failure() 26 return self.throttle_success()

1 def wait(self): 2 """ 3 Returns the recommended next request time in seconds. 4 """ 5 if self.history: 6 remaining_duration = self.duration - (self.now - self.history[-1]) 7 else: 8 remaining_duration = self.duration 9 10 available_requests = self.num_requests - len(self.history) + 1 11 if available_requests <= 0: 12 return None 13 14 return remaining_duration / float(available_requests)
代碼實現:

1 ###########用resetframework內部的限制訪問頻率############## 2 class MySimpleRateThrottle(SimpleRateThrottle): 3 scope = 'xxx' 4 def get_cache_key(self, request, view): 5 return self.get_ident(request) #返回唯一標識IP 6 7 class LimitView(APIView): 8 authentication_classes = [] #不讓認證用戶 9 permission_classes = [] #不讓驗證權限 10 throttle_classes = [MySimpleRateThrottle, ] 11 def get(self,request): 12 # self.dispatch 13 return Response('控制訪問頻率示例') 14 15 def throttled(self, request, wait): 16 '''可定制方法設置中文錯誤''' 17 # raise exceptions.Throttled(wait) 18 class MyThrottle(exceptions.Throttled): 19 default_detail = '請求被限制' 20 extra_detail_singular = 'Expected available in {wait} second.' 21 extra_detail_plural = 'Expected available in {wait} seconds.' 22 default_code = '還需要再等{wait}秒' 23 raise MyThrottle(wait)
記得在settings里面配置

1 REST_FRAMEWORK = { 2 'UNAUTHENTICATED_USER': None, 3 'UNAUTHENTICATED_TOKEN': None, #將匿名用戶設置為None 4 "DEFAULT_AUTHENTICATION_CLASSES": [ 5 "app01.utils.MyAuthentication", 6 ], 7 'DEFAULT_PERMISSION_CLASSES':[ 8 # "app03.utils.MyPermission",#設置路徑, 9 ], 10 'DEFAULT_THROTTLE_RATES':{ 11 'xxx':'2/minute' #2分鍾 12 } 13 } 14 15 #緩存:放在文件 16 CACHES = { 17 'default': { 18 'BACKEND': 'django.core.cache.backends.filebased.FileBasedCache', 19 'LOCATION': 'cache', #文件路徑 20 } 21 }
對匿名用戶進行限制,每個用戶1分鍾允許訪問5次,對於登錄的普通用戶1分鍾訪問10次,VIP用戶一分鍾訪問20次
- 比如首頁可以匿名訪問
- #先認證,只有認證了才知道是不是匿名的,
- #權限登錄成功之后才能訪問, ,index頁面就不需要權限了
- If request.user #判斷登錄了沒有

1 from django.contrib import admin 2 3 from django.conf.urls import url, include 4 from app05 import views 5 6 urlpatterns = [ 7 url('index/',views.IndexView.as_view()), 8 url('manage/',views.ManageView.as_view()), 9 ]

1 from django.shortcuts import render 2 from rest_framework.views import APIView 3 from rest_framework.response import Response 4 from rest_framework.authentication import BaseAuthentication #認證需要 5 from rest_framework.throttling import BaseThrottle,SimpleRateThrottle #限流處理 6 from rest_framework.permissions import BasePermission 7 from rest_framework import exceptions 8 from app01 import models 9 # Create your views here. 10 ###############3##認證##################### 11 class MyAuthentcate(BaseAuthentication): 12 '''檢查用戶是否存在,如果存在就返回user和auth,如果沒有就返回''' 13 def authenticate(self, request): 14 token = request.query_params.get('token') 15 obj = models.UserInfo.objects.filter(token=token).first() 16 if obj: 17 return (obj.username,obj.token) 18 return None #表示我不處理 19 20 ##################權限##################### 21 class MyPermission(BasePermission): 22 message='無權訪問' 23 def has_permission(self, request, view): 24 if request.user: 25 return True #true表示有權限 26 return False #false表示無權限 27 28 class AdminPermission(BasePermission): 29 message = '無權訪問' 30 31 def has_permission(self, request, view): 32 if request.user=='haiyan': 33 return True # true表示有權限 34 return False # false表示無權限 35 36 ############3#####限流##################3## 37 class AnonThrottle(SimpleRateThrottle): 38 scope = 'wdp_anon' #相當於設置了最大的訪問次數和時間 39 def get_cache_key(self, request, view): 40 if request.user: 41 return None #返回None表示我不限制,登錄用戶我不管 42 #匿名用戶 43 return self.get_ident(request) #返回一個唯一標識IP 44 45 class UserThrottle(SimpleRateThrottle): 46 scope = 'wdp_user' 47 def get_cache_key(self, request, view): 48 #登錄用戶 49 if request.user: 50 return request.user 51 return None #返回NOne表示匿名用戶我不管 52 53 54 ##################視圖##################### 55 #首頁支持匿名訪問, 56 #無需要登錄就可以訪問 57 class IndexView(APIView): 58 authentication_classes = [MyAuthentcate,] #認證判斷他是不是匿名用戶 59 permission_classes = [] #一般主頁就不需要權限驗證了 60 throttle_classes = [AnonThrottle,UserThrottle,] #對匿名用戶和普通用戶的訪問限制 61 62 def get(self,request): 63 # self.dispatch 64 return Response('訪問首頁') 65 66 def throttled(self, request, wait): 67 '''可定制方法設置中文錯誤''' 68 69 # raise exceptions.Throttled(wait) 70 class MyThrottle(exceptions.Throttled): 71 default_detail = '請求被限制' 72 extra_detail_singular = 'Expected available in {wait} second.' 73 extra_detail_plural = 'Expected available in {wait} seconds.' 74 default_code = '還需要再等{wait}秒' 75 76 raise MyThrottle(wait) 77 78 #需登錄就可以訪問 79 class ManageView(APIView): 80 authentication_classes = [MyAuthentcate, ] # 認證判斷他是不是匿名用戶 81 permission_classes = [MyPermission,] # 一般主頁就不需要權限驗證了 82 throttle_classes = [AnonThrottle, UserThrottle, ] # 對匿名用戶和普通用戶的訪問限制 83 84 def get(self, request): 85 # self.dispatch 86 return Response('管理人員訪問頁面') 87 88 def throttled(self, request, wait): 89 '''可定制方法設置中文錯誤''' 90 91 # raise exceptions.Throttled(wait) 92 class MyThrottle(exceptions.Throttled): 93 default_detail = '請求被限制' 94 extra_detail_singular = 'Expected available in {wait} second.' 95 extra_detail_plural = 'Expected available in {wait} seconds.' 96 default_code = '還需要再等{wait}秒' 97 98 raise MyThrottle(wait)
四、總結
1、認證:就是檢查用戶是否存在;如果存在返回(request.user,request.auth);不存在request.user/request.auth=None
2、權限:進行職責的划分
3、限制訪問頻率
認證 - 類:authenticate/authenticate_header ##驗證不成功的時候執行的 - 返回值: - return None, - return (user,auth), - raise 異常 - 配置: - 視圖: class IndexView(APIView): authentication_classes = [MyAuthentication,] - 全局: REST_FRAMEWORK = { 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None, "DEFAULT_AUTHENTICATION_CLASSES": [ # "app02.utils.MyAuthentication", ], } 權限 - 類:has_permission/has_object_permission - 返回值: - True、#有權限 - False、#無權限 - exceptions.PermissionDenied(detail="錯誤信息") #異常自己隨意,想拋就拋,錯誤信息自己指定 - 配置: - 視圖: class IndexView(APIView): permission_classes = [MyPermission,] - 全局: REST_FRAMEWORK = { "DEFAULT_PERMISSION_CLASSES": [ # "app02.utils.MyAuthentication", ], } 限流 - 類:allow_request/wait PS: scope = "wdp_user" - 返回值:
return True、#不限制
return False #限制 - 配置: - 視圖: class IndexView(APIView): throttle_classes=[AnonThrottle,UserThrottle,] def get(self,request,*args,**kwargs): self.dispatch return Response('訪問首頁') - 全局 REST_FRAMEWORK = { "DEFAULT_THROTTLE_CLASSES":[ ], 'DEFAULT_THROTTLE_RATES':{ 'wdp_anon':'5/minute', 'wdp_user':'10/minute', } }