1、API驗證分析
API三關驗證
客戶端和服務端中都存放一份相同的隨機字符串,客戶端發請求的時候把隨機字符串和當前時間進行MD5加密,同時帶着當前時間通過請求頭發送到API,進入三關驗證。
第一關是時間驗證 (驗證服務器當前時間和客戶端發送過來的時間,超過10s后,驗證不通過)
第二關是MD5規則驗證(服務端把自己的密鑰同客戶端發送過來的時間進行MD5加密,進行密文的比較)
第三關是訪問列表驗證(從訪問列表中查詢是否存在,如果存在,驗證不通過,否則把當前值存到列表中,並設置超時時間),這里的時間可以設置成2S
邏輯圖1
邏輯圖2
原理:
1、客戶端與服務器都存放着用於驗證的Token字段,值字段無論通過什么方法外部的黑客都是無法獲取的。
2、客戶端把本地的用戶名+時間戳+Token的組合進行MD5加密生成一段新的md5-token
3、客戶端訪問的時候攜帶:用戶名、時間戳、生成的一段新的md5-token
4、服務端收到請求后,先判斷用戶名、時間戳是否合法(時間戳和當前時間不能大於2分鍾)
5、用戶名和時間戳都合法了,在去Redis中判斷是否有當前用戶的key判斷是否在這2分鍾有訪問過,我這里設置Redis2分鍾過期,即合法用戶第一次訪問后,把他的用戶名加入到Redis中作為key:MD5作為vlaue存儲。如果存在說明是在2分鍾內訪問的。拒絕
6、以上都通過之后說明是合法用戶和合法請求,然后在判斷MD5值是否相等如果相同認證通過,執行相關Veiws然后並返回相關的狀態或數據
2、HouseStark代碼
#_*_coding:utf-8_*_ from core import info_collection from conf import settings import urllib.request,sys,os,json,datetime import urllib.parse from core import api_token class ArgvHandler(object): def __init__(self,argv_list): self.argvs = argv_list self.parse_argv() def parse_argv(self): if len(self.argvs) >1: if hasattr(self,self.argvs[1]): func = getattr(self,self.argvs[1]) func() else: self.help_msg() else: self.help_msg() def help_msg(self): msg = ''' collect_data 收集資產數據 run_forever 未實現 get_asset_id 獲取資產id report_asset 匯報資產數據到服務器 ''' print(msg) def collect_data(self): obj = info_collection.InfoCollection() asset_data = obj.collect() #收集 print(asset_data) def run_forever(self): pass def __attach_token(self,url_str): '''generate md5 by token_id and username,and attach it on the url request''' user = settings.Params['auth']['user'] token_id = settings.Params['auth']['token'] md5_token,timestamp = api_token.get_token(user,token_id) url_arg_str = "user=%s×tamp=%s&token=%s" %(user,timestamp,md5_token) if "?" in url_str:#already has arg new_url = url_str + "&" + url_arg_str else: new_url = url_str + "?" + url_arg_str return new_url #print(url_arg_str) def __submit_data(self,action_type,data,method): ''' send data to server :param action_type: url :param data: 具體要發送的數據 :param method: get/post :return: ''' if action_type in settings.Params['urls']: if type(settings.Params['port']) is int: url = "http://%s:%s%s" %(settings.Params['server'],settings.Params['port'],settings.Params['urls'][action_type]) else: url = "http://%s%s" %(settings.Params['server'],settings.Params['urls'][action_type]) url = self.__attach_token(url) print('Connecting [%s], it may take a minute' % url) if method == "get": args = "" for k,v in data.items(): args += "&%s=%s" %(k,v) args = args[1:] url_with_args = "%s?%s" %(url,args) print(url_with_args) try: req = urllib.request.urlopen(url_with_args,timeout=settings.Params['request_timeout']) #req_data =urlopen(req,timeout=settings.Params['request_timeout']) #callback = req_data.read() callback = req.read() print("-->server response:",callback) return callback except urllib.URLError as e: sys.exit("\033[31;1m%s\033[0m"%e) elif method == "post": try: data_encode = urllib.parse.urlencode(data).encode() req = urllib.request.urlopen(url=url,data=data_encode,timeout=settings.Params['request_timeout']) #res_data = urllib.urlopen(req,timeout=settings.Params['request_timeout']) callback = req.read() callback = json.loads(callback.decode()) print("\033[31;1m[%s]:[%s]\033[0m response:\n%s" %(method,url,callback) ) return callback except Exception as e: sys.exit("\033[31;1m%s\033[0m"%e) else: raise KeyError #def __get_asset_id_by_sn(self,sn): # return self.__submit_data("get_asset_id_by_sn",{"sn":sn},"get") def load_asset_id(self,sn=None): asset_id_file = settings.Params['asset_id'] has_asset_id = False if os.path.isfile(asset_id_file): asset_id = open(asset_id_file).read().strip() if asset_id.isdigit(): return asset_id else: has_asset_id = False else: has_asset_id = False def __update_asset_id(self,new_asset_id): asset_id_file = settings.Params['asset_id'] f = open(asset_id_file,"w",encoding="utf-8") f.write(str(new_asset_id)) f.close() def report_asset(self): obj = info_collection.InfoCollection() asset_data = obj.collect() asset_id = self.load_asset_id(asset_data["sn"]) if asset_id: #reported to server before asset_data["asset_id"] = asset_id post_url = "asset_report" else:#first time report to server '''report to another url,this will put the asset into approval waiting zone, when the asset is approved ,this request returns asset's ID''' asset_data["asset_id"] = None post_url = "asset_report_with_no_id" data = {"asset_data": json.dumps(asset_data)} response = self.__submit_data(post_url,data,method="post") if "asset_id" in response: self.__update_asset_id(response["asset_id"]) self.log_record(response) def log_record(self,log,action_type=None): f = open(settings.Params["log_file"],"ab") if log is str: pass if type(log) is dict: if "info" in log: for msg in log["info"]: log_format = "%s\tINFO\t%s\n" %(datetime.datetime.now().strftime("%Y-%m-%d-%H:%M:%S"),msg) #print msg f.write(log_format.encode()) if "error" in log: for msg in log["error"]: log_format = "%s\tERROR\t%s\n" %(datetime.datetime.now().strftime("%Y-%m-%d-%H:%M:%S"),msg) f.write(log_format.encode()) if "warning" in log: for msg in log["warning"]: log_format = "%s\tWARNING\t%s\n" %(datetime.datetime.now().strftime("%Y-%m-%d-%H:%M:%S"),msg) f.write(log_format.encode()) f.close()
一次請求包含POST和GET
部分代碼
def __attach_token(self,url_str): '''generate md5 by token_id and username,and attach it on the url request''' user = settings.Params['auth']['user'] token_id = settings.Params['auth']['token'] md5_token,timestamp = api_token.get_token(user,token_id) url_arg_str = "user=%s×tamp=%s&token=%s" %(user,timestamp,md5_token) if "?" in url_str:#already has arg new_url = url_str + "&" + url_arg_str else: new_url = url_str + "?" + url_arg_str return new_url
客戶端截圖
3、settings代碼
#_*_coding:utf8_*_ import os BaseDir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) Params = { #"server": "192.168.12.22", "server": "127.0.0.1", "port":8000, 'request_timeout':30, "urls":{ "asset_report_with_no_id":"/asset/report/asset_with_no_asset_id/", #新資產待批准區 "asset_report":"/asset/report/", #正式資產表接口 }, 'asset_id': '%s/var/.asset_id' % BaseDir, 'log_file': '%s/logs/run_log' % BaseDir, 'auth':{ 'user':'jack', 'token': 'agbc!232' }, }
4、api_token代碼
#_*_coding:utf-8_*_ import hashlib,time def get_token(username,token_id): timestamp = int(time.time()) md5_format_str = "%s\n%s\n%s" %(username,timestamp,token_id) obj = hashlib.md5() obj.update(md5_format_str.encode()) print("token format:[%s]" % md5_format_str) print("token :[%s]" % obj.hexdigest()) return obj.hexdigest()[10:17], timestamp if __name__ =='__main__': print(get_token('alex','test') )
為什么要切片
主要是因為太長了
5、API認證測試
合法認證測試
用戶名密碼截圖
客戶端截圖
服務器控台截圖
更改token后驗證
后台修改token
客戶端截圖