django restframework 基於token實現用戶認證詳解


本文主要介紹django restframework 用戶認證部分的內容

  • 環境配置
  • 基於 token 認證
  • JWT 認證

1、環境配置

pip install django==2.0

pip install djangorestframework==3.10.0

pip install pymysql==1.0.2

2、基於token認證(非全局配置)

2.1 數據庫模型設計:models.py

from django.db import models
from werkzeug.security import generate_password_hash, check_password_hash

class User(models.Model):
    db_table = 'adminuser_user'

    id = models.AutoField(primary_key=True)
    username = models.CharField(max_length=30)
    password_hash = models.TextField()
    root = models.IntegerField()
    email = models.EmailField(max_length=30, default='')
    mobile = models.CharField(max_length=11, default='')
    status = models.IntegerField(default=0)


    @property
    def password(self):
        raise AttributeError('Can not read password!')

    @password.setter
    def password(self, password):
        self.password_hash = generate_password_hash(password)

    def verify_password(self, password):
        return check_password_hash(self.password_hash, password)

2.2 同步數據庫:數據同步好了之后添加幾條用戶信息

image

2.3 路由:urls.py

from django.urls import path,re_path
from . import views, loginView

urlpatterns = [
    # 登錄接口
    path('admin/login', loginView.LoginView.as_view()),
    path('admin/users', views.UserView.as_view()),
]
app_name = 'Admin'

2.4 視圖:views.py

-------------------------------------------loginView.py-------------------------------------------
from .models import User
import datetime
from rest_framework.response import Response
from rest_framework.views import APIView
import jwt
from PetHome import settings


class LoginView(APIView):
    def post(self, request):
        ret = {
            "data": {},
            "meta": {
                "status": 200,
                "message": ""
            }
        }
        try:
            username = request.data["username"]
            password = request.data["password"]
            print(username, password)
            if username and password:
                user = User.objects.filter(username=username)
                print(2)
                if user.count == 0:
                    print(3)
                    ret["meta"]["status"] = 500
                    ret["meta"]["message"] = "用戶不存在或密碼錯誤"
                    return Response(ret)
                elif user and user.first().verify_password(password):
                    print(4)
                    dict = {
                        "exp": datetime.datetime.now() + datetime.timedelta(days=1),  # 過期時間
                        "iat": datetime.datetime.now(),  # 開始時間
                        "id": user.first().id,
                        "username": user.first().username,
                        "mobile": user.first().mobile
                    }
                    token = jwt.encode(dict, settings.SECRET_KEY, algorithm="HS256")
                    ret["data"]["token"] = token
                    ret["data"]["username"] = user.first().username
                    ret["data"]["user_id"] = user.first().id
                    ret["meta"]["status"] = 200
                    ret["meta"]["message"] = "登錄成功"
                    return Response(ret)
                else:
                    ret["meta"]["status"] = 500
                    ret["meta"]["message"] = "用戶不存在或密碼錯誤"
                    return Response(ret)
        except Exception as error:
            print(error)
            ret["meta"]["status"] = 500
            ret["meta"]["message"] = "用戶不存在或密碼錯誤"
            return Response(ret)

-------------------------------------------views.py-------------------------------------------

from werkzeug.security import generate_password_hash
from .TokenAuthtication import TokenAuthtication
from rest_framework.response import Response
from rest_framework.views import APIView
from adminuser.serializer.userSerializers import *
from rest_framework.pagination import PageNumberPagination
from collections import OrderedDict
from .models import User

class UserView(APIView):
    authentication_classes = [TokenAuthtication, ]
    def get(self, request, *args, **kwargs):
        """
        獲取用戶列表
        :param request:
        :param args:
        :param kwargs:
        :return:
        """
        pk = kwargs.get('pk')  # 獲取單挑數據 id 值
        query = request.GET.get('query')
        print(query)
        ret = {
            "data": {
                "users": []
            },
            "meta": {
                "status": 200,
                "message": ""
            }
        }
        page = LargeResultsSetPagination()
        if not pk:
            if not query:
                queryset = User.objects.all().order_by("-id")
                page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
                ser = UserSerializer(instance=page_user, many=True)
                ret["data"]["users"] = ser.data
            else:
                queryset = User.objects.filter(Q(mobile__contains=query) | Q(username__contains=query) | Q(email__contains=query)).order_by("-id")
                page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
                ser = UserSerializer(instance=page_user, many=True)
                ret["data"]["users"] = ser.data
            return page.get_paginated_response(ret)
        else:
            if User.objects.filter(pk=pk):
                obj_dict = User.objects.filter(pk=pk).first()
                ser = UserSerializer(instance=obj_dict, many=False)
                ret["data"]["users"] = ser.data
            else:
                ret["meta"]["status"] = 500
                ret["meta"]["message"] = "系統報錯"
            return Response(ret)


-------------------------------------------TokenAuthtication.py-------------------------------------------

from rest_framework.authentication import BaseAuthentication
from rest_framework import exceptions
import time
from PetHome import settings
import jwt
from .models import User


class TokenAuthtication(BaseAuthentication):
    def authenticate(self, request):
        """
        先獲取 header 中的token進行解析,在進行校驗
            - 獲取token,檢查過期時間是否大於當前時間
        :param request:
        :return:
        """
        try:
            # header 中的token 在 request.MEAT.get("HTTP_TOKEN")獲取
            token = request.META.get("HTTP_TOKEN")
            print("檢查token:" + token)
            data = jwt.decode(token, settings.SECRET_KEY, 'HS256')
            print(data)
            if data["exp"] < int(time.time()):
                raise exceptions.AuthenticationFailed("登錄超時,請重新登錄")
            elif User.objects.filter(username=data["username"]).count == 0:
                raise exceptions.AuthenticationFailed("認證用戶不存在")

        except Exception as error:
            print(error)
            raise exceptions.AuthenticationFailed("用戶未登錄,請登錄")

2.5 測試

http://127.0.0.1:8089/admin/login

image

http://127.0.0.1:8089/admin/users

1、用戶未登錄時 (請求headers未攜帶token)
image
2、用戶已經登 (請求headers攜帶token)
image

3、全局配置

  • setting.py添加配置
    -------------------------------------------setting.py.py-------------------------------------------
# django restframework 配置
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': ['adminuser.TokenAuthtication.TokenAuthtication', ]
}

1、全局配置用戶認證后,所有的接口類都不需要添加 authentication_classes = [TokenAuthtication, ] 屬性,默認所有接口都會進行登錄校驗
------------------------------------------例子1:views.py-------------------------------------------

from werkzeug.security import generate_password_hash
from .TokenAuthtication import TokenAuthtication
from rest_framework.response import Response
from rest_framework.views import APIView
from adminuser.serializer.userSerializers import *
from rest_framework.pagination import PageNumberPagination
from collections import OrderedDict
from .models import User

class UserView(APIView):
    def get(self, request, *args, **kwargs):
        """
        獲取用戶列表
        :param request:
        :param args:
        :param kwargs:
        :return:
        """
        pk = kwargs.get('pk')  # 獲取單挑數據 id 值
        query = request.GET.get('query')
        print(query)
        ret = {
            "data": {
                "users": []
            },
            "meta": {
                "status": 200,
                "message": ""
            }
        }
        page = LargeResultsSetPagination()
        if not pk:
            if not query:
                queryset = User.objects.all().order_by("-id")
                page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
                ser = UserSerializer(instance=page_user, many=True)
                ret["data"]["users"] = ser.data
            else:
                queryset = User.objects.filter(Q(mobile__contains=query) | Q(username__contains=query) | Q(email__contains=query)).order_by("-id")
                page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
                ser = UserSerializer(instance=page_user, many=True)
                ret["data"]["users"] = ser.data
            return page.get_paginated_response(ret)
        else:
            if User.objects.filter(pk=pk):
                obj_dict = User.objects.filter(pk=pk).first()
                ser = UserSerializer(instance=obj_dict, many=False)
                ret["data"]["users"] = ser.data
            else:
                ret["meta"]["status"] = 500
                ret["meta"]["message"] = "系統報錯"
            return Response(ret)

2、為解決部分接口不需要登錄驗證問題,只需要在該類中設置 authentication_classes = [] ,[] 表示不需要認證

------------------------------------------例子2:views.py-------------------------------------------

from werkzeug.security import generate_password_hash
from .TokenAuthtication import TokenAuthtication
from rest_framework.response import Response
from rest_framework.views import APIView
from adminuser.serializer.userSerializers import *
from rest_framework.pagination import PageNumberPagination
from collections import OrderedDict
from .models import User

class UserView(APIView):
    authentication_classes = []
    def get(self, request, *args, **kwargs):
        """
        獲取用戶列表
        :param request:
        :param args:
        :param kwargs:
        :return:
        """
        pk = kwargs.get('pk')  # 獲取單挑數據 id 值
        query = request.GET.get('query')
        print(query)
        ret = {
            "data": {
                "users": []
            },
            "meta": {
                "status": 200,
                "message": ""
            }
        }
        page = LargeResultsSetPagination()
        if not pk:
            if not query:
                queryset = User.objects.all().order_by("-id")
                page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
                ser = UserSerializer(instance=page_user, many=True)
                ret["data"]["users"] = ser.data
            else:
                queryset = User.objects.filter(Q(mobile__contains=query) | Q(username__contains=query) | Q(email__contains=query)).order_by("-id")
                page_user = page.paginate_queryset(queryset=queryset, request=request, view=self)
                ser = UserSerializer(instance=page_user, many=True)
                ret["data"]["users"] = ser.data
            return page.get_paginated_response(ret)
        else:
            if User.objects.filter(pk=pk):
                obj_dict = User.objects.filter(pk=pk).first()
                ser = UserSerializer(instance=obj_dict, many=False)
                ret["data"]["users"] = ser.data
            else:
                ret["meta"]["status"] = 500
                ret["meta"]["message"] = "系統報錯"
            return Response(ret)

不傳 token 也能返回數據啦

參考文檔:
1、https://www.cnblogs.com/zhangqunshi/p/8432209.html
2、https://www.cnblogs.com/xingxingnbsp/articles/12597565.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM