CMDB資產管理系統開發【day27】:cmdb API安全認證


1、API驗證分析

API三關驗證

客戶端和服務端中都存放一份相同的隨機字符串,客戶端發請求的時候把隨機字符串和當前時間進行MD5加密,同時帶着當前時間通過請求頭發送到API,進入三關驗證。

第一關是時間驗證 (驗證服務器當前時間和客戶端發送過來的時間,超過10s后,驗證不通過)
第二關是MD5規則驗證(服務端把自己的密鑰同客戶端發送過來的時間進行MD5加密,進行密文的比較)
第三關是訪問列表驗證(從訪問列表中查詢是否存在,如果存在,驗證不通過,否則把當前值存到列表中,並設置超時時間),這里的時間可以設置成2S

邏輯圖1

邏輯圖2

原理:

1、客戶端與服務器都存放着用於驗證的Token字段,值字段無論通過什么方法外部的黑客都是無法獲取的。
2、客戶端把本地的用戶名+時間戳+Token的組合進行MD5加密生成一段新的md5-token
3、客戶端訪問的時候攜帶:用戶名、時間戳、生成的一段新的md5-token
4、服務端收到請求后,先判斷用戶名、時間戳是否合法(時間戳和當前時間不能大於2分鍾)
5、用戶名和時間戳都合法了,在去Redis中判斷是否有當前用戶的key判斷是否在這2分鍾有訪問過,我這里設置Redis2分鍾過期,即合法用戶第一次訪問后,把他的用戶名加入到Redis中作為key:MD5作為vlaue存儲。如果存在說明是在2分鍾內訪問的。拒絕
6、以上都通過之后說明是合法用戶和合法請求,然后在判斷MD5值是否相等如果相同認證通過,執行相關Veiws然后並返回相關的狀態或數據

2、HouseStark代碼

#_*_coding:utf-8_*_

from core import info_collection
from conf import settings
import urllib.request,sys,os,json,datetime
import urllib.parse
from core import api_token


class ArgvHandler(object):
    def __init__(self,argv_list):
        self.argvs = argv_list
        self.parse_argv()


    def parse_argv(self):
        if len(self.argvs) >1:
            if hasattr(self,self.argvs[1]):
                func = getattr(self,self.argvs[1])
                func()
            else:
                self.help_msg()
        else:
            self.help_msg()
    def help_msg(self):
        msg = '''
        collect_data   收集資產數據
        run_forever    未實現
        get_asset_id   獲取資產id
        report_asset   匯報資產數據到服務器
        '''
        print(msg)

    def collect_data(self):
        obj = info_collection.InfoCollection()
        asset_data = obj.collect() #收集
        print(asset_data)
    def run_forever(self):
        pass
    def __attach_token(self,url_str):
        '''generate md5 by token_id and username,and attach it on the url request'''
        user = settings.Params['auth']['user']
        token_id = settings.Params['auth']['token']

        md5_token,timestamp = api_token.get_token(user,token_id)
        url_arg_str = "user=%s&timestamp=%s&token=%s" %(user,timestamp,md5_token)
        if "?" in url_str:#already has arg
            new_url = url_str + "&" + url_arg_str
        else:
            new_url = url_str + "?" + url_arg_str
        return  new_url
        #print(url_arg_str)

    def __submit_data(self,action_type,data,method):
        '''
        send data to server
        :param action_type: url
        :param data: 具體要發送的數據
        :param method: get/post
        :return:
        '''
        if action_type in settings.Params['urls']:
            if type(settings.Params['port']) is int:
                url = "http://%s:%s%s" %(settings.Params['server'],settings.Params['port'],settings.Params['urls'][action_type])
            else:
                url = "http://%s%s" %(settings.Params['server'],settings.Params['urls'][action_type])

            url =  self.__attach_token(url)
            print('Connecting [%s], it may take a minute' % url)
            if method == "get":
                args = ""
                for k,v in data.items():
                    args += "&%s=%s" %(k,v)
                args = args[1:]
                url_with_args = "%s?%s" %(url,args)
                print(url_with_args)
                try:
                    req = urllib.request.urlopen(url_with_args,timeout=settings.Params['request_timeout'])
                    #req_data =urlopen(req,timeout=settings.Params['request_timeout'])
                    #callback = req_data.read()
                    callback = req.read()
                    print("-->server response:",callback)
                    return callback
                except urllib.URLError as e:
                    sys.exit("\033[31;1m%s\033[0m"%e)
            elif method == "post":
                try:
                    data_encode = urllib.parse.urlencode(data).encode()
                    req = urllib.request.urlopen(url=url,data=data_encode,timeout=settings.Params['request_timeout'])
                    #res_data = urllib.urlopen(req,timeout=settings.Params['request_timeout'])
                    callback = req.read()
                    callback = json.loads(callback.decode())
                    print("\033[31;1m[%s]:[%s]\033[0m response:\n%s" %(method,url,callback) )
                    return callback
                except Exception as e:
                    sys.exit("\033[31;1m%s\033[0m"%e)
        else:
            raise KeyError



    #def __get_asset_id_by_sn(self,sn):
    #    return  self.__submit_data("get_asset_id_by_sn",{"sn":sn},"get")
    def load_asset_id(self,sn=None):
        asset_id_file = settings.Params['asset_id']
        has_asset_id = False
        if os.path.isfile(asset_id_file):
            asset_id = open(asset_id_file).read().strip()
            if asset_id.isdigit():
                return  asset_id
            else:
                has_asset_id =  False
        else:
            has_asset_id =  False

    def __update_asset_id(self,new_asset_id):
        asset_id_file = settings.Params['asset_id']
        f = open(asset_id_file,"w",encoding="utf-8")
        f.write(str(new_asset_id))
        f.close()


    def report_asset(self):
        obj = info_collection.InfoCollection()
        asset_data = obj.collect()
        asset_id = self.load_asset_id(asset_data["sn"])
        if asset_id: #reported to server before
            asset_data["asset_id"] = asset_id
            post_url = "asset_report"
        else:#first time report to server
            '''report to another url,this will put the asset into approval waiting zone, when the asset is approved ,this request returns
            asset's ID'''

            asset_data["asset_id"] = None
            post_url = "asset_report_with_no_id"



        data = {"asset_data": json.dumps(asset_data)}
        response = self.__submit_data(post_url,data,method="post")

        if "asset_id" in response:
            self.__update_asset_id(response["asset_id"])

        self.log_record(response)

    def log_record(self,log,action_type=None):
        f = open(settings.Params["log_file"],"ab")
        if log is str:
            pass
        if type(log) is dict:

            if "info" in log:
                for msg in log["info"]:
                    log_format = "%s\tINFO\t%s\n" %(datetime.datetime.now().strftime("%Y-%m-%d-%H:%M:%S"),msg)
                    #print msg
                    f.write(log_format.encode())
            if "error" in log:
                for msg in log["error"]:
                    log_format = "%s\tERROR\t%s\n" %(datetime.datetime.now().strftime("%Y-%m-%d-%H:%M:%S"),msg)
                    f.write(log_format.encode())
            if "warning" in log:
                for msg in log["warning"]:
                    log_format = "%s\tWARNING\t%s\n" %(datetime.datetime.now().strftime("%Y-%m-%d-%H:%M:%S"),msg)
                    f.write(log_format.encode())

        f.close()

一次請求包含POST和GET

部分代碼

def __attach_token(self,url_str):
        '''generate md5 by token_id and username,and attach it on the url request'''
        user = settings.Params['auth']['user']
        token_id = settings.Params['auth']['token']

        md5_token,timestamp = api_token.get_token(user,token_id)
        url_arg_str = "user=%s&timestamp=%s&token=%s" %(user,timestamp,md5_token)
        if "?" in url_str:#already has arg
            new_url = url_str + "&" + url_arg_str
        else:
            new_url = url_str + "?" + url_arg_str
        return  new_url 

客戶端截圖

3、settings代碼

#_*_coding:utf8_*_
import os
BaseDir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

Params = {
    #"server": "192.168.12.22",
    "server": "127.0.0.1",
    "port":8000,
    'request_timeout':30,
    "urls":{
          "asset_report_with_no_id":"/asset/report/asset_with_no_asset_id/", #新資產待批准區
          "asset_report":"/asset/report/", #正式資產表接口
        },
    'asset_id': '%s/var/.asset_id' % BaseDir,
    'log_file': '%s/logs/run_log' % BaseDir,

    'auth':{
        'user':'jack',
        'token': 'agbc!232'
        },
}

4、api_token代碼

#_*_coding:utf-8_*_

import hashlib,time

def get_token(username,token_id):
    timestamp = int(time.time())
    md5_format_str = "%s\n%s\n%s" %(username,timestamp,token_id)
    obj = hashlib.md5()
    obj.update(md5_format_str.encode())
    print("token format:[%s]" % md5_format_str)
    print("token :[%s]" % obj.hexdigest())
    return obj.hexdigest()[10:17], timestamp


if __name__ =='__main__':
    print(get_token('alex','test') )

為什么要切片

主要是因為太長了

5、API認證測試

合法認證測試

用戶名密碼截圖

  

客戶端截圖

  

服務器控台截圖

更改token后驗證

后台修改token

客戶端截圖

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM