在工作中遇到要對開發的接口做壓力測試,以前沒有做過開清楚什么壓測工具好用,正好接口不是什么復雜的接口,curl -X post "接口地址" --data-binary @二進制認證文件 OK!(@表示驗證數據是文件類型)
既然這樣那我就寫個腳本好了,腳本內容如下:
#!/usr/bin/evn python #_*_coding:utf8_*_ from multiprocessing import Pool,Queue import time,subprocess,os class YaCe(object): def __init__(self,api,binfile,maxpool,qu,maxrequest=100000,status="success"): self.api = api self.binfile = binfile self.status = status self.maxpool = maxpool self.maxrequest = maxrequest self.qu = qu def prorequest(self): for i in range(self.maxrequest): self.qu.put(i) print(i) for i in range(int(self.maxpool)): self.qu.put(None) print("None") def conumers(self,i): while True: data = self.qu.get(True) if data == None: print("進程%s任務完成..."%i) break else: command = subprocess.getoutput("time curl -X POST --connect-timeout 10 '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile)) if self.status == "success": logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time if "CgoyMDAwMDAwMDAw" in command: print("進程%s__%s..."%(str(i),str(data))) with open(logfile,"a") as f: f.write(command+"\n") f.close() else: print("進程%s__%s..."%(str(i),str(data))) with open(logfile,"a") as f: f.write("Faild\n") f.write(command+"\n") f.close() else: logfile = os.getcwd()+"/"+"roomlist.log"+"_%s"%date_time #print("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile)) command = subprocess.getoutput("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile)) if "CAES+" in command: print("進程%s__%s..."%(str(i),str(data))) info = command.split('\n')[-3:] info1 = "\n".join(info) with open(logfile,"a") as f: f.write(info1+"\n") f.close() else: print("進程%s__%s..."%(str(i),str(data))) with open(logfile,"a") as f: f.write("Faild\n") f.write(command+"\n") f.close() def multirun(self): ps = int(int(self.maxpool) - 1) p = Pool(ps) for i in range(self.maxpool): print("開啟子進程%s"%i) p.apply_async(self.conumers,args=(self,i)) print('等待所有添加的進程運行完畢。。。') p.close() p.join() endtime = time.strftime("%Y%m%d_%X",time.localtime()) if self.status == "success": logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time else: logfile = os.getcwd() + "/" + "roomlist.log"+"_%s"%date_time with open(logfile,"a") as f: f.write("============[%s]============\n"%endtime) f.close() print('End!!,PID:%s'% os.getpid()) if __name__ == "__main__": q = Queue() Yc = YaCe('壓測接口','二進制證認文件',開多少個進程,queue(隊列),maxrequest=100(模擬測試多少次訪問),status="faild"(這里因為測試的兩個接口,返回不一樣用status參數區分測試的接口的返回值處理)) Yc.prorequest() print("++++++") global date_time date_time = time.strftime("%Y%m%d_%X",time.localtime()) Yc.multirun()
問題
到這里寫完了,測試的問題來了,從腳本來看如果運行成功,會有多進程在處理隊列的輸出,可是結果的輸出確是如下:
0 1 2 3 4 5 6 7 8 9 None None ++++++ 開啟子進程0 開啟子進程1 等待所有添加的進程運行完畢。。。 End!!,PID:4819
原因
子進程conumers方法完全沒有運行,也沒有報錯這就尷尬了;查了大量的文檔資料;發現這個pool方法都使用了queue.Queue將task傳遞給工作進程。multiprocessing必須將數據序列化以在進程間傳遞。方法只有在模塊的頂層時才能被序列化,跟類綁定的方法不能被序列化,就會出現上面的異常 ; 那腫么辦,我不是一個輕易放棄的人,終於被我找到了方法;
注意
解決方作者是在python3下測試了,python2下用腳本的subprocess要換成value,command = commands.getstatusoutput
解決方法1(親測)
1.首先要看報錯,需要對腳本修改如下:
YaCe類下的multirun方法下修改:
for i in range(self.maxpool): print("開啟子進程%s"%i) p.apply_async(self.conumers,args=(self,i))
為
for i in range(self.maxpool): print("開啟子進程%s"%i) res = p.apply_async(self.conumers,args=(self,i)) print(res.get)
這就可以看到報錯:
**cPickle.PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup builtin.instancemethod failed**
2.解決方法如下在腳本中加一個新的函數
(1)
.def conumers_wrapper(cls_instance,i): return cls_instance.conumers(i)
(2).修改YaCe下multirun方法
for i in range(self.maxpool): print("開啟子進程%s"%i) res = p.apply_async(self.conumers,args=(self,i)) print(res.get())
為
for i in range(self.maxpool): print("開啟子進程%s"%i) res = p.apply_async(conumers_wrapper,args=(self,i)) print(res.get)
問題解決了,運行一下腳本結果還有報錯:
**RuntimeError: Queue objects should only be shared between processes through inheritance**
原因
這里不可以用Queue,要改用Manager.Queue;因為進程之前的同共離用Queue會用問題;
完結
最終代碼如下:
#!/usr/bin/evn python #_*_coding:utf8_*_ from multiprocessing import Pool,Queue,Manager import time,subprocess,os class YaCe(object): def __init__(self,api,binfile,maxpool,qu,maxrequest=100000,status="success"): self.api = api self.binfile = binfile self.status = status self.maxpool = maxpool self.maxrequest = maxrequest self.qu = qu def prorequest(self): for i in range(self.maxrequest): self.qu.put(i) print(i) for i in range(int(self.maxpool)): self.qu.put(None) print("None") def conumers(self,i): while True: data = self.qu.get(True) if data == None: print("進程%s任務完成..."%i) break else: #print("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile)) command = subprocess.getoutput("time curl -X POST --connect-timeout 10 '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile)) #command = subprocess.getoutput("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile)) if self.status == "success": logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time if "CgoyMDAwMDAwMDAw" in command: print("進程%s__%s..."%(str(i),str(data))) with open(logfile,"a") as f: f.write(command+"\n") f.close() else: with open(logfile,"a") as f: f.write("Faild\n") f.write(command+"\n") f.close() else: logfile = os.getcwd()+"/"+"roomlist.log"+"_%s"%date_time #print("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile)) command = subprocess.getoutput("time curl -X POST --connect-timeout 10 '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile)) #command = subprocess.getoutput("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile)) if "CAES+" in command: print("進程%s__%s..."%(str(i),str(data))) info = command.split('\n')[-3:] info1 = "\n".join(info) with open(logfile,"a") as f: f.write(info1+"\n") f.close() else: print("進程%s__%s..."%(str(i),str(data))) with open(logfile,"a") as f: f.write("Faild\n") f.write(command+"\n") f.close() def multirun(self): ps = int(int(self.maxpool) - 1) p = Pool(ps) for i in range(self.maxpool): print("開啟子進程%s"%i) p.apply_async(conumers_wrapper,args=(self,i)) #print(res.get) print('等待所有添加的進程運行完畢。。。') p.close() p.join() endtime = time.strftime("%Y%m%d_%X",time.localtime()) if self.status == "success": logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time else: logfile = os.getcwd() + "/" + "roomlist.log"+"_%s"%date_time with open(logfile,"a") as f: f.write("============[%s]============\n"%endtime) f.close() print('End!!,PID:%s'% os.getpid()) def conumers_wrapper(cls_instance,i): return cls_instance.conumers(i) if __name__ == "__main__": q = Manager().Queue() Yc = YaCe('壓測接口','二進制證認文件',開多少個進程,queue(隊列),maxrequest=100(模擬測試多少次訪問),status="faild"(這里因為測試的兩個接口,返回不一樣用status參數區分測試的接口的返回值處理)) Yc.prorequest() print("++++++") global date_time date_time = time.strftime("%Y%m%d_%X",time.localtime()) Yc.multirun()