python下Pool与target方法写在同一个类里要注意的坑


在工作中遇到要对开发的接口做压力测试,以前没有做过,也不清楚什么压测工具好用,正好接口不是什么复杂的接口,curl -X post "接口地址" --data-binary @二进制认证文件    OK!(@表示验证数据是文件类型)

既然这样那我就写个脚本好了,脚本内容如下:

#!/usr/bin/evn python
#_*_coding:utf8_*_
from multiprocessing import Pool,Queue
import time,subprocess,os
class YaCe(object):
    """Simple load-test driver: feeds request ids through a shared queue to
    worker processes, each of which POSTs a binary file to an API via curl
    and appends the result to a log file.

    NOTE(review): ``conumers`` and ``multirun`` read the module-level global
    ``date_time``; it must be assigned before ``multirun`` is called.
    """

    def __init__(self, api, binfile, maxpool, qu, maxrequest=100000, status="success"):
        # api       : URL to POST to
        # binfile   : request-body file name, resolved relative to os.getcwd()
        # maxpool   : number of worker processes (also number of sentinels)
        # qu        : shared queue carrying task ids plus None sentinels
        # maxrequest: total number of requests to enqueue
        # status    : "success" -> heartbeat API/log; anything else -> roomlist
        self.api = api
        self.binfile = binfile
        self.status = status
        self.maxpool = maxpool
        self.maxrequest = maxrequest
        self.qu = qu

    def prorequest(self):
        """Producer: enqueue one id per request, then one None per worker
        so every consumer loop can terminate."""
        for i in range(self.maxrequest):
            self.qu.put(i)
            print(i)
        for i in range(int(self.maxpool)):
            self.qu.put(None)
            print("None")

    def conumers(self, i):
        """Consumer loop for worker *i*: pop task ids until a None sentinel,
        firing one curl POST per id and logging the response."""
        while True:
            data = self.qu.get(True)
            if data is None:
                print("进程%s任务完成..."%i)
                break
            command = subprocess.getoutput("time curl -X POST --connect-timeout 10 '%s'  --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))
            if self.status == "success":
                logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time
                # Marker string identifies a successful heartbeat response.
                print("进程%s__%s..."%(str(i),str(data)))
                with open(logfile,"a") as f:
                    if "CgoyMDAwMDAwMDAw" not in command:
                        f.write("Faild\n")
                    f.write(command+"\n")
            else:
                logfile = os.getcwd()+"/"+"roomlist.log"+"_%s"%date_time
                # NOTE(review): this second curl call overwrites the first
                # result above, so the first request is wasted work; kept
                # as in the original article.
                command = subprocess.getoutput("time curl -X POST '%s' --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))
                print("进程%s__%s..."%(str(i),str(data)))
                with open(logfile,"a") as f:
                    if "CAES+" in command:
                        # Keep only the trailing `time` output (last 3 lines).
                        f.write("\n".join(command.split('\n')[-3:])+"\n")
                    else:
                        f.write("Faild\n")
                        f.write(command+"\n")

    def multirun(self):
        """Spawn the worker pool, wait for completion, stamp the log."""
        # NOTE(review): Pool(maxpool - 1) while maxpool tasks are submitted;
        # with maxpool == 1 this is Pool(0), which raises. Preserved as-is.
        ps = int(int(self.maxpool) - 1)
        p = Pool(ps)
        for i in range(self.maxpool):
            print("开启子进程%s"%i)
            # BUG (deliberately kept — it is the point of the article):
            # bound methods cannot be pickled, so these tasks fail silently;
            # args=(self, i) also passes self twice to a bound method.
            p.apply_async(self.conumers,args=(self,i))
        print('等待所有添加的进程运行完毕。。。')
        p.close()
        p.join()
        endtime = time.strftime("%Y%m%d_%X",time.localtime())
        if self.status == "success":
            logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time
        else:
            logfile = os.getcwd() + "/" + "roomlist.log"+"_%s"%date_time
        with open(logfile,"a") as f:
            f.write("============[%s]============\n"%endtime)
        print('End!!,PID:%s'% os.getpid())


if __name__ == "__main__":
    # Entry point: build the shared queue, enqueue all tasks, then run the pool.
    q = Queue()
    # NOTE(review): the line below is placeholder pseudo-code from the article —
    # the Chinese annotations must be replaced with real argument values
    # before this script can run.
    Yc = YaCe('压测接口','二进制证认文件',开多少个进程,queue(队列),maxrequest=100(模拟测试多少次访问),status="faild"(这里因为测试的两个接口,返回不一样用status参数区分测试的接口的返回值处理))
    Yc.prorequest()
    print("++++++")
    # `global` at module top level is a no-op; date_time is simply a module
    # global that YaCe's methods read when building log-file names.
    global date_time
    date_time = time.strftime("%Y%m%d_%X",time.localtime())
    Yc.multirun()

问题
到这里写完了,测试的问题来了,从脚本来看如果运行成功,会有多进程在处理队列的输出,可是结果的输出却是如下

0
1
2
3
4
5
6
7
8
9
None
None
++++++
开启子进程0
开启子进程1
等待所有添加的进程运行完毕。。。
End!!,PID:4819

原因
子进程conumers方法完全没有运行,也没有报错这就尴尬了;查了大量的文档资料;发现这个pool方法都使用了queue.Queue将task传递给工作进程。multiprocessing必须将数据序列化以在进程间传递。方法只有在模块的顶层时才能被序列化,跟类绑定的方法不能被序列化,就会出现上面的异常 ; 那怎么办,我不是一个轻易放弃的人,终于被我找到了方法;

注意
解决方法作者是在python3下测试的,python2下要把脚本中的subprocess.getoutput换成commands模块,即 command = commands.getstatusoutput(...)

解决方法1(亲测)


1.首先要看报错,需要对脚本修改如下:
YaCe类下的multirun方法下修改

for i in range(self.maxpool):
            print("开启子进程%s"%i)
            p.apply_async(self.conumers,args=(self,i))


for i in range(self.maxpool):
            print("开启子进程%s"%i)
            res = p.apply_async(self.conumers,args=(self,i))
print(res.get())


这就可以看到报错:


**cPickle.PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup builtin.instancemethod failed**

2.解决方法如下在脚本中加一个新的函数
(1)

def conumers_wrapper(cls_instance,i):
    return cls_instance.conumers(i)

       
(2).修改YaCe下multirun方法

for i in range(self.maxpool):
            print("开启子进程%s"%i)
            res = p.apply_async(self.conumers,args=(self,i))
print(res.get())

为                   

for i in range(self.maxpool):
            print("开启子进程%s"%i)
            res = p.apply_async(conumers_wrapper,args=(self,i))
print(res.get())


问题解决了,运行一下脚本结果还有报错:
**RuntimeError: Queue objects should only be shared between processes through inheritance**

原因
这里不可以用Queue,要改用Manager().Queue;因为进程之间通过Pool共享普通的Queue会有问题;

完结
最终代码如下
:

#!/usr/bin/evn python
#_*_coding:utf8_*_
from multiprocessing import Pool,Queue,Manager
import time,subprocess,os
class YaCe(object):
    """Final (working) load-test driver: feeds request ids through a
    Manager queue to pool workers, each of which POSTs a binary file to
    an API via curl and appends the result to a log file.

    Workers are launched through the module-level ``conumers_wrapper``
    because bound methods cannot be pickled by multiprocessing.

    NOTE(review): ``conumers`` and ``multirun`` read the module-level
    global ``date_time``; it must be assigned before ``multirun`` runs.
    """

    def __init__(self, api, binfile, maxpool, qu, maxrequest=100000, status="success"):
        # api       : URL to POST to
        # binfile   : request-body file name, resolved relative to os.getcwd()
        # maxpool   : number of worker processes (also number of sentinels)
        # qu        : Manager().Queue() shared across pool workers
        # maxrequest: total number of requests to enqueue
        # status    : "success" -> heartbeat API/log; anything else -> roomlist
        self.api = api
        self.binfile = binfile
        self.status = status
        self.maxpool = maxpool
        self.maxrequest = maxrequest
        self.qu = qu

    def prorequest(self):
        """Producer: enqueue one id per request, then one None per worker
        so every consumer loop can terminate."""
        for i in range(self.maxrequest):
            self.qu.put(i)
            print(i)
        for i in range(int(self.maxpool)):
            self.qu.put(None)
            print("None")

    def conumers(self, i):
        """Consumer loop for worker *i*: pop task ids until a None sentinel,
        firing one curl POST per id and logging the response."""
        while True:
            data = self.qu.get(True)
            if data is None:
                print("进程%s任务完成..."%i)
                break
            command = subprocess.getoutput("time curl -X POST --connect-timeout 10 '%s'  --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))
            if self.status == "success":
                logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time
                # Marker string identifies a successful heartbeat response.
                # (The failure branch intentionally logs without printing,
                # matching the original.)
                with open(logfile,"a") as f:
                    if "CgoyMDAwMDAwMDAw" in command:
                        print("进程%s__%s..."%(str(i),str(data)))
                        f.write(command+"\n")
                    else:
                        f.write("Faild\n")
                        f.write(command+"\n")
            else:
                logfile = os.getcwd()+"/"+"roomlist.log"+"_%s"%date_time
                # NOTE(review): this second curl call overwrites the first
                # result above, so the first request is wasted work; kept
                # as in the original article.
                command = subprocess.getoutput("time curl -X POST --connect-timeout 10 '%s'  --data-binary @%s"%(self.api,os.getcwd()+"/"+self.binfile))
                print("进程%s__%s..."%(str(i),str(data)))
                with open(logfile,"a") as f:
                    if "CAES+" in command:
                        # Keep only the trailing `time` output (last 3 lines).
                        f.write("\n".join(command.split('\n')[-3:])+"\n")
                    else:
                        f.write("Faild\n")
                        f.write(command+"\n")

    def multirun(self):
        """Spawn the worker pool via the picklable module-level wrapper,
        wait for completion, then stamp the log."""
        # NOTE(review): Pool(maxpool - 1) while maxpool tasks are submitted;
        # with maxpool == 1 this is Pool(0), which raises. Preserved as-is.
        ps = int(int(self.maxpool) - 1)
        p = Pool(ps)
        for i in range(self.maxpool):
            print("开启子进程%s"%i)
            # conumers_wrapper is a top-level function, so it CAN be pickled;
            # it simply forwards to self.conumers(i) in the worker process.
            p.apply_async(conumers_wrapper,args=(self,i))
        print('等待所有添加的进程运行完毕。。。')
        p.close()
        p.join()
        endtime = time.strftime("%Y%m%d_%X",time.localtime())
        if self.status == "success":
            logfile = os.getcwd()+"/"+"headbeat.log"+"_%s"%date_time
        else:
            logfile = os.getcwd() + "/" + "roomlist.log"+"_%s"%date_time
        with open(logfile,"a") as f:
            f.write("============[%s]============\n"%endtime)
        print('End!!,PID:%s'% os.getpid())

def conumers_wrapper(cls_instance,i):
    """Module-level trampoline for Pool.apply_async.

    multiprocessing must pickle the task callable to send it to a worker;
    bound methods cannot be pickled, but a top-level function can, so this
    wrapper receives the instance as an argument and forwards to its
    ``conumers`` method inside the worker process.
    """
    return cls_instance.conumers(i)

if __name__ == "__main__":
    # Entry point: a Manager().Queue() (not a plain Queue) is required here
    # because the queue is passed to pool workers as a task argument.
    q = Manager().Queue()
    # NOTE(review): the line below is placeholder pseudo-code from the article —
    # the Chinese annotations must be replaced with real argument values
    # before this script can run.
    Yc = YaCe('压测接口','二进制证认文件',开多少个进程,queue(队列),maxrequest=100(模拟测试多少次访问),status="faild"(这里因为测试的两个接口,返回不一样用status参数区分测试的接口的返回值处理))
    Yc.prorequest()
    print("++++++")
    # `global` at module top level is a no-op; date_time is simply a module
    # global that YaCe's methods read when building log-file names.
    global date_time
    date_time = time.strftime("%Y%m%d_%X",time.localtime())
    Yc.multirun()


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM