本文主要介紹django restframework 用戶認證部分的內容
- 環境配置
- 基於 token 認證
- JWT 認證
1、環境配置
pip install django==2.0
pip install djangorestframework==3.10.0
pip install pymysql==1.0.2
2、基於token認證(非全局配置)
2.1 數據庫模型設計:models.py
from django.db import models
from werkzeug.security import generate_password_hash, check_password_hash
class User(models.Model):
db_table = 'adminuser_user'
id = models.AutoField(primary_key=True)
username = models.CharField(max_length=30)
password_hash = models.TextField()
root = models.IntegerField()
email = models.EmailField(max_length=30, default='')
mobile = models.CharField(max_length=11, default='')
status = models.IntegerField(default=0)
@property
def password(self):
raise AttributeError('Can not read password!')
@password.setter
def password(self, password):
self.password_hash = generate_password_hash(password)
def verify_password(self, password):
return check_password_hash(self.password_hash, password)
2.2 同步數據庫:數據同步好了之后添加幾條用戶信息
2.3 路由:urls.py
from django.urls import path,re_path
from . import views, loginView
urlpatterns = [
# 登錄接口
path('admin/login', loginView.LoginView.as_view()),
path('admin/users', views.UserView.as_view()),
]
app_name = 'Admin'
2.4 視圖:views.py
-------------------------------------------loginView.py-------------------------------------------
from .models import User
import datetime
from rest_framework.response import Response
from rest_framework.views import APIView
import jwt
from PetHome import settings
class LoginView(APIView):
def post(self, request):
ret = {
"data": {},
"meta": {
"status": 200,
"message": ""
}
}
try:
username = request.data["username"]
password = request.data["password"]
print(username, password)
if username and password:
user = User.objects.filter(username=username)
print(2)
if user.count == 0:
print(3)
ret["meta"]["status"] = 500
ret["meta"]["message"] = "用戶不存在或密碼錯誤"
return Response(ret)
elif user and user.first().verify_password(password):
print(4)
dict = {
"exp": datetime.datetime.now() + datetime.timedelta(days=1), # 過期時間
"iat": datetime.datetime.now(), # 開始時間
"id": user.first().id,
"username": user.first().username,
"mobile": user.first().mobile
}
token = jwt.encode(dict, settings.SECRET_KEY, algorithm="HS256")
ret["data"]["token"] = token
ret["data"]["username"] = user.first().username
ret["data"]["user_id"] = user.first().id
ret["meta"]["status"] = 200
ret["meta"]["message"] = "登錄成功"
return Response(ret)
else:
ret["meta"]["status"] = 500
ret["meta"]["message"] = "用戶不存在或密碼錯誤"
return Response(ret)
except Exception as error:
print(error)
ret["meta"]["status"] = 500
ret["meta"]["message"] = "用戶不存在或密碼錯誤"
return Response(ret)
-------------------------------------------views.py-------------------------------------------
from werkzeug.security import generate_password_hash
from .TokenAuthtication import TokenAuthtication
from rest_framework.response import Response
from rest_framework.views import APIView
from adminuser.serializer.userSerializers import *
from rest_framework.pagination import PageNumberPagination
from collections import OrderedDict
from .models import User
class UserView(APIView):
authentication_classes = [TokenAuthtication, ]
def get(self, request, *args, **kwargs):
"""
獲取用戶列表
:param request:
:param args:
:param kwargs:
:return:
"""
pk = kwargs.get('pk') # 獲取單挑數據 id 值
query = request.GET.get('query')
print(query)
ret = {
"data": {
"users": []
},
"meta": {
"status": 200,
"message": ""
}
}
page = LargeResultsSetPagination()
if not pk:
if not query:
queryset = User.objects.all().order_by("-id")
page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
ser = UserSerializer(instance=page_user, many=True)
ret["data"]["users"] = ser.data
else:
queryset = User.objects.filter(Q(mobile__contains=query) | Q(username__contains=query) | Q(email__contains=query)).order_by("-id")
page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
ser = UserSerializer(instance=page_user, many=True)
ret["data"]["users"] = ser.data
return page.get_paginated_response(ret)
else:
if User.objects.filter(pk=pk):
obj_dict = User.objects.filter(pk=pk).first()
ser = UserSerializer(instance=obj_dict, many=False)
ret["data"]["users"] = ser.data
else:
ret["meta"]["status"] = 500
ret["meta"]["message"] = "系統報錯"
return Response(ret)
-------------------------------------------TokenAuthtication.py-------------------------------------------
from rest_framework.authentication import BaseAuthentication
from rest_framework import exceptions
import time
from PetHome import settings
import jwt
from .models import User
class TokenAuthtication(BaseAuthentication):
def authenticate(self, request):
"""
先獲取 header 中的token進行解析,在進行校驗
- 獲取token,檢查過期時間是否大於當前時間
:param request:
:return:
"""
try:
# header 中的token 在 request.MEAT.get("HTTP_TOKEN")獲取
token = request.META.get("HTTP_TOKEN")
print("檢查token:" + token)
data = jwt.decode(token, settings.SECRET_KEY, 'HS256')
print(data)
if data["exp"] < int(time.time()):
raise exceptions.AuthenticationFailed("登錄超時,請重新登錄")
elif User.objects.filter(username=data["username"]).count == 0:
raise exceptions.AuthenticationFailed("認證用戶不存在")
except Exception as error:
print(error)
raise exceptions.AuthenticationFailed("用戶未登錄,請登錄")
2.5 測試
http://127.0.0.1:8089/admin/users
1、用戶未登錄時 (請求headers未攜帶token)
2、用戶已經登 (請求headers攜帶token)
3、全局配置
- setting.py添加配置
-------------------------------------------setting.py.py-------------------------------------------
# django restframework 配置
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': ['adminuser.TokenAuthtication.TokenAuthtication', ]
}
1、全局配置用戶認證后,所有的接口類都不需要添加 authentication_classes = [TokenAuthtication, ] 屬性,默認所有接口都會進行登錄校驗
------------------------------------------例子1:views.py-------------------------------------------
from werkzeug.security import generate_password_hash
from .TokenAuthtication import TokenAuthtication
from rest_framework.response import Response
from rest_framework.views import APIView
from adminuser.serializer.userSerializers import *
from rest_framework.pagination import PageNumberPagination
from collections import OrderedDict
from .models import User
class UserView(APIView):
def get(self, request, *args, **kwargs):
"""
獲取用戶列表
:param request:
:param args:
:param kwargs:
:return:
"""
pk = kwargs.get('pk') # 獲取單挑數據 id 值
query = request.GET.get('query')
print(query)
ret = {
"data": {
"users": []
},
"meta": {
"status": 200,
"message": ""
}
}
page = LargeResultsSetPagination()
if not pk:
if not query:
queryset = User.objects.all().order_by("-id")
page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
ser = UserSerializer(instance=page_user, many=True)
ret["data"]["users"] = ser.data
else:
queryset = User.objects.filter(Q(mobile__contains=query) | Q(username__contains=query) | Q(email__contains=query)).order_by("-id")
page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
ser = UserSerializer(instance=page_user, many=True)
ret["data"]["users"] = ser.data
return page.get_paginated_response(ret)
else:
if User.objects.filter(pk=pk):
obj_dict = User.objects.filter(pk=pk).first()
ser = UserSerializer(instance=obj_dict, many=False)
ret["data"]["users"] = ser.data
else:
ret["meta"]["status"] = 500
ret["meta"]["message"] = "系統報錯"
return Response(ret)
2、為解決部分接口不需要登錄驗證問題,只需要在該類中設置 authentication_classes = [] ,[] 表示不需要認證
------------------------------------------例子2:views.py-------------------------------------------
from werkzeug.security import generate_password_hash
from .TokenAuthtication import TokenAuthtication
from rest_framework.response import Response
from rest_framework.views import APIView
from adminuser.serializer.userSerializers import *
from rest_framework.pagination import PageNumberPagination
from collections import OrderedDict
from .models import User
class UserView(APIView):
authentication_classes = []
def get(self, request, *args, **kwargs):
"""
獲取用戶列表
:param request:
:param args:
:param kwargs:
:return:
"""
pk = kwargs.get('pk') # 獲取單挑數據 id 值
query = request.GET.get('query')
print(query)
ret = {
"data": {
"users": []
},
"meta": {
"status": 200,
"message": ""
}
}
page = LargeResultsSetPagination()
if not pk:
if not query:
queryset = User.objects.all().order_by("-id")
page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
ser = UserSerializer(instance=page_user, many=True)
ret["data"]["users"] = ser.data
else:
queryset = User.objects.filter(Q(mobile__contains=query) | Q(username__contains=query) | Q(email__contains=query)).order_by("-id")
page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
ser = UserSerializer(instance=page_user, many=True)
ret["data"]["users"] = ser.data
return page.get_paginated_response(ret)
else:
if User.objects.filter(pk=pk):
obj_dict = User.objects.filter(pk=pk).first()
ser = UserSerializer(instance=obj_dict, many=False)
ret["data"]["users"] = ser.data
else:
ret["meta"]["status"] = 500
ret["meta"]["message"] = "系統報錯"
return Response(ret)
不傳 token 也能返回數據啦
參考文檔:
1、https://www.cnblogs.com/zhangqunshi/p/8432209.html
2、https://www.cnblogs.com/xingxingnbsp/articles/12597565.html