Django上自建一個token校驗機制


場景介紹

自己開發的接口有些需要實名認證,不能直接讓匿名用戶訪問。如果把django的login_required裝飾器設置在接口方法前面,未登錄的請求會被直接重定向到登錄頁面,而不是返回接口調用方能夠處理的結果。所以在這個時候,可以考慮自己開發一個驗證機制,防止匿名用戶調用。

驗證流程

  1. 先讓用戶把用戶名和密碼提交到獲取token的接口
  2. 拿到用戶名和密碼后去后台驗證是否有效,如果有效就判斷這個用戶是否有token在數據庫中,有的話就更新這個token,否則就在數據庫中新建一條記錄,然后把這個token返回給請求者。
  3. 用戶拿到剛才請求到的token后,在請求頭或者請求參數里面攜帶這個token,然后再去請求對應的接口。(注意:下文示例的裝飾器只從POST或GET參數里讀取名為utoken的參數,如需支持請求頭需要自行擴展。)
  4. 請求過來以后,首先自建的驗證機制會對token進行校驗:先去POST參數里面找,沒有再去GET參數里面找,參數里也沒有就返回校驗失敗;找到token以后就去判斷這個token是否存在、是否過期,token無效或過期的話,返回校驗失敗。只有校驗成功以后,才會放行請求到接口上,此時接口才會真正處理用戶的請求。

需要做的事

  1. 新建一個表
  2. 新建一個創建token的方法
  3. 新建一個校驗token的裝飾器
  4. 接口上使用裝飾器即可

以上內容在你項目中某個應用的models和views里編寫即可,編寫完以后,哪里需要使用,就在哪里導入這個裝飾器。

實操步驟

以下都運行在python3.5.6 和 django 1.11.13 上,無問題。

創建數據庫
from django.db import models

# Create your models here.
from pisces import settings
from datetime import timedelta,datetime
from django.contrib.auth.models import User
import pytz
import random
# Time zone (Asia/Shanghai) that user_token.checkUtokenIsValid converts the
# stored expiry timestamp into.
# Earlier experiments, kept for reference: pytz.FixedOffset(480), pytz.timezone("UTC")
tzutc_8=pytz.timezone('Asia/Shanghai')

class user_token(models.Model):
    '''
    Per-user API token together with its creation and expiry timestamps.

    Each row is tied one-to-one to a Django ``User``.  ``updateToken`` issues
    or refreshes the token and ``checkUtokenIsValid`` verifies a token that a
    caller presented.
    '''
    # on_delete is spelled out: CASCADE is what older Django releases applied
    # implicitly, and the argument is mandatory from Django 2.0 on.
    user = models.OneToOneField(User, related_name='user_profile', on_delete=models.CASCADE)
    # Role label; the original note lists guest / access / admin -- TODO confirm the full set.
    utype = models.CharField(max_length=15, default="guest")
    utoken = models.CharField(max_length=256, null=True, blank=True)
    utokenCreateTime = models.DateTimeField(null=True, blank=True)
    utokenExpire = models.DateTimeField(null=True, blank=True)

    def getRandomChar(self, start=None, end=None, length=30):
        '''
        Build a random token made of characters whose code points lie in
        ``[start, end]``, skipping 91-96 (``[ \\ ] ^ _ ` ``).

        :param start: lowest code point, as a positive int or digit string;
                      anything else (None, 0, "", junk) falls back to 65 ('A')
        :param end: highest code point, same rules; falls back to 122 ('z')
        :param length: number of characters in the token
        :return: a token of exactly ``length`` characters
        :raises ValueError: if the range contains no usable character
        '''
        def _bound(value, default):
            # Ints and digit strings are both accepted; the previous code
            # crashed on ints (.isdigit) and on strings (randint).
            try:
                number = int(value)
            except (TypeError, ValueError):
                return default
            return number if number > 0 else default

        low = _bound(start, 65)
        high = _bound(end, 122)
        # Excluding the punctuation up front (instead of skipping a loop
        # iteration) guarantees the token is never shorter than requested.
        alphabet = [chr(code) for code in range(low, high + 1) if not 91 <= code <= 96]
        if not alphabet:
            raise ValueError("no usable characters between %s and %s" % (low, high))
        # SystemRandom draws from the OS entropy source; the default Mersenne
        # Twister generator is predictable and unsuitable for auth tokens.
        rng = random.SystemRandom()
        return "".join(rng.choice(alphabet) for _ in range(length))

    def updateToken(self):
        '''
        Create the user's token if none exists, or replace it when it has
        expired or will expire within the next 60 seconds.

        :return: dict with ``code`` 200 and a create/update message when a new
                 token was stored, or ``code`` 301 when the current token is
                 still valid for more than 60 seconds and was left untouched
        '''
        from django.utils import timezone

        utokenDuration = settings.UTOKEN_DURATION
        # timezone.now() is aware or naive according to USE_TZ, so it can be
        # compared with values loaded from the DateTimeFields.
        now = timezone.now()
        already_created = bool(self.utokenCreateTime)

        if already_created and self.utoken and self.utokenExpire:
            # Decide on the REMAINING lifetime.  Comparing (expire - create)
            # with the duration, as before, could refuse forever to renew a
            # token that had already expired.
            remaining = (self.utokenExpire - now).total_seconds()
            if remaining > 60:
                return {"code":301,"msg":"the utoken is not expire!so won't update! "}

        self.utoken = self.getRandomChar()
        self.utokenCreateTime = now  # creation time of the token issued just now
        self.utokenExpire = now + timedelta(seconds=utokenDuration)
        self.save()
        if already_created:
            return {"code":200,"msg":"update utoken successfully!"}
        return {"code":200,"msg":"create utoken successfully!"}

    def checkUtokenIsValid(self, utoken):
        '''
        Check whether ``utoken`` is this user's current, unexpired token.

        :param utoken: token string presented by the caller
        :return: dict with ``code`` 200 when the token matches and has not
                 expired, 400 when no token was ever issued or it has expired,
                 401 when the token does not match
        '''
        from django.utils import timezone

        if not (self.utokenCreateTime and self.utokenExpire):
            return {"code":400,"msg":"the utoken is not create! please authenticate! "}

        # Use the real current instant.  The previous code labelled the local
        # wall-clock time as UTC, shifting the check by the server's offset.
        now = timezone.now()
        if self.utokenExpire <= now:
            if timezone.is_aware(self.utokenExpire):
                # Show both instants in the same zone so the message is readable.
                shown_expire = self.utokenExpire.astimezone(tzutc_8)
                shown_now = now.astimezone(tzutc_8)
            else:
                shown_expire, shown_now = self.utokenExpire, now
            stamp = "%Y-%m-%d %H:%M:%S"
            return {"code":400,"msg":"the utoken was expire! please update it! exipre time:%s , system time:%s"%(shown_expire.strftime(stamp), shown_now.strftime(stamp))}

        if utoken == self.utoken:
            return {"code":200,"msg":"correct"}
        return {"code":401,"msg":"not correct"}

    def __str__(self):
        return "%s-%s"%(self.user.username,self.utype)

編寫裝飾器與創建token的方法
創建token的方法
from django.shortcuts import render,redirect
from django.contrib.auth import authenticate
from django.contrib.auth.models import User
from center.models import user_token
from django.http import JsonResponse
from django.contrib.auth import login as auth_login, logout as auth_logout
from django.views.decorators.csrf import csrf_exempt
from pisces.mylog import writeDebug,writeInfo,writeErr

@csrf_exempt
def get_utoken(request):
    '''
    Authenticate the posted username/password and return the user's token.

    On success the user is logged in, a ``user_token`` row is created for the
    user if none exists, ``updateToken()`` is called on it, and the resulting
    token is returned.

    :param request: Django request; credentials are read from POST fields
                    ``username`` and ``password``
    :return: JsonResponse with ``utoken``/``code`` 200/``msg`` on success,
             ``code`` 401 for bad credentials, ``code`` 402 for non-POST methods
    '''
    if request.method != 'POST':
        return JsonResponse({"code":402,"msg":"only post method is support"})

    username = request.POST.get('username', '')
    password = request.POST.get('password', '')
    user = authenticate(username=username, password=password)
    if not user:
        writeInfo("frame/views","login","the username[ {username} ] or password is invalied,can't login!".format(username=username))
        return JsonResponse({"code":401,"msg":"the username or password is not correct!"})

    writeDebug("center/views","get_utoken","the user[%s] login successfully"%username)
    auth_login(request, user, )

    # Authentication can succeed without a local User row (presumably with an
    # external auth backend -- TODO confirm); create the row in that case.
    account = User.objects.filter(username=username).first()
    if account is None:
        writeDebug("center/views","get_utoken","no userObj[%s],begin to create the user"%username)
        account = User.objects.create_user(username,"%s@chanjet.com"%username,password)

    # Fetch-or-create one concrete row and keep working on that instance.  The
    # previous code passed a whole QuerySet as the lookup value and relied on
    # repeated .first() calls re-querying the database.
    token_row, _created = user_token.objects.get_or_create(user=account)
    updateTokenMsg = token_row.updateToken()

    writeInfo("frame/views","login","the username[ {username} ] login and create token successfully!".format(username=username))
    return JsonResponse({"utoken":token_row.utoken,"code":200,"msg":updateTokenMsg.get("msg")})
校驗token的裝飾器
from django.shortcuts import render,redirect
from django.contrib.auth import authenticate
from django.contrib.auth.models import User
from center.models import user_token
from django.http import JsonResponse
from django.contrib.auth import login as auth_login, logout as auth_logout
from django.views.decorators.csrf import csrf_exempt
from pisces.mylog import writeDebug,writeInfo,writeErr
def checkUserToken(func):
    '''
    View decorator that only lets a request through when it carries a valid token.

    The token is read from the ``utoken`` POST parameter, falling back to the
    ``utoken`` GET parameter.  When the token is valid the wrapped view is
    called with an extra ``username`` keyword argument holding the token
    owner's username; otherwise a JsonResponse describing the failure is
    returned and the view is not called.

    :param func: the view function to protect; its first positional argument
                 is expected to be the request
    :return: the wrapping view
    '''
    import functools

    # wraps() copies the view's name, docstring and attributes onto the
    # wrapper.  Without it a ``csrf_exempt`` flag set by an inner decorator is
    # invisible to Django's CSRF middleware.
    @functools.wraps(func)
    def wrapper(*args,**kwargs):
        utoken = None
        for source in ("POST", "GET"):
            try:
                utoken = getattr(args[0], source).get("utoken")
            except Exception:
                # Missing request or unreadable parameters: try the next
                # source.  Exception (not BaseException) so that
                # KeyboardInterrupt/SystemExit still propagate.
                utoken = None
                continue
            if utoken is not None:
                break

        if not utoken:
            return JsonResponse({"code":400,"msg":"can't get utoken! please check params"})

        # first() fetches at most one row instead of evaluating the whole queryset.
        ux = user_token.objects.filter(utoken=utoken).first()
        if ux is None:
            return JsonResponse({"code":404,"msg":"can't find the user by utoken--> %s"%(utoken)})

        result = ux.checkUtokenIsValid(utoken)
        if result.get("code") != 200:
            return JsonResponse(result)

        kwargs['username'] = ux.user.username
        return func(*args,**kwargs)
    return wrapper

使用裝飾器


from center.views import checkUserToken
from django.http.response import JsonResponse

@csrf_exempt
@checkUserToken
def create_zk_with_zkjob(request, username=None):
    '''
    Example token-protected view: build a node path from the posted fields and
    create it through ``core.zkopreation``.

    ``csrf_exempt`` is applied outermost so the exemption flag sits on the
    function Django actually dispatches to.

    :param request: Django request; reads POST fields ``env``, ``cxxx`` and
                    ``project_enname``
    :param username: username of the token owner, passed in by ``checkUserToken``
    :return: JsonResponse wrapping the result of ``createPath``
    '''
    # Request parameters are read exactly as in an undecorated view; the
    # decorator does not change how they are obtained.
    # Replace everything below with your own logic.
    post_env = request.POST.get("env")
    post_cxxx = request.POST.get("cxxx")
    post_project_name = request.POST.get("project_enname")
    zkop = core.zkopreation(post_env)
    zknodename = "%s%s_%s/foo"%(zkop.zkRootPath,post_cxxx,post_project_name)
    zkop_result = zkop.createPath(zknodename)
    return JsonResponse(zkop_result)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM