python多進程並發進程池Pool


簡介:  

       python中的多進程主要使用到 multiprocessing 這個庫。低版本python這個庫在使用 multiprocessing.Manager().Queue時會出問題,建議大家升級到高版本python。

一、多進程使用

1、linux下可使用 fork 函數

#!/bin/env python
import os

print 'Process (%s) start...' % os.getpid()
pid = os.fork()
if pid==0:
    print 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())
    os._exit(1)
else:
    print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)

輸出 

Process (22246) start...
I (22246) just created a child process (22247).
I am child process (22247) and my parent is 22246.

2、使用 multiprocessing

 

#!/bin/env python
from multiprocessing import Process
import os
import time

def run_proc(name):
    time.sleep(3)
    print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    processes = list()
    for i in range(5):
        p = Process(target=run_proc, args=('test',))
        print 'Process will start.'
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()
    print 'Process end.'

輸出

Parent process 38140.
Process will start.
Process will start.
Process will start.
Process will start.
Process will start.
Run child process test (38141)...
Run child process test (38142)...
Run child process test (38143)...
Run child process test (38145)...
Run child process test (38144)...
Process end.

real    0m3.028s
user    0m0.021s
sys     0m0.004s

 

二、進程池Pool

 

Pool類相和關方法介紹:

 Pool類可以提供指定數量的進程供用戶調用,當有新的請求提交到Pool中時,如果池還沒有滿,就會創建一個新的進程來執行請求。如果池滿,請求就會告知先等待,直到池中有進程結束,才會創建新的進程來執行這些請求。 
下面介紹一下multiprocessing 模塊下的Pool類下的幾個方法:

apply()

函數原型:apply(func[, args=()[, kwds={}]])  

該函數用於傳遞不定參數,同python中的apply函數一致,主進程會被阻塞直到函數執行結束(不建議使用,並且3.x以后不在出現)。

apply_async  

函數原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

與apply用法一致,但它是非阻塞的且支持結果返回后進行回調。 

map()

函數原型:map(func, iterable[, chunksize=None])

Pool類中的map方法,與內置的map函數用法行為基本一致,它會使進程阻塞直到結果返回。 
注意:雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒后,程序才會運行子進程。

map_async()

函數原型:map_async(func, iterable[, chunksize[, callback]])
與map用法一致,但是它是非阻塞的。其有關事項見apply_async。  

close() 

關閉進程池(pool),使其不在接受新的任務。

terminal()

結束工作進程,不在處理未處理的任務。 

join()

主進程阻塞等待子進程的退出, join方法要在close或terminate之后使用。

 

1、使用 multiprocessing.Pool 非阻塞

#!/bin/env python

import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(3):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()    # behind close() or terminate()
    print "Sub-process(es) done.

運行結果

Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
Sub-process(es) done.

real    0m3.493s
user    0m0.056s
sys     0m0.022s

 

2、使用 multiprocessing.Pool 阻塞版本

下面我們看一個簡單的multiprocessing.Pool類的實例:

 

#!/bin/env python

import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(3):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))      

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()    # behind close() or terminate()
    print "Sub-process(es) done."

運行結果

msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

real    0m9.061s
user    0m0.036s
sys     0m0.019s

區別主要是 apply_async和 apply函數,前者是非阻塞的,后者是阻塞。可以看出運行時間相差的倍數正是進程池數量

3、使用 multiprocessing.Pool 並關注結果

import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"
    return "done" + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in xrange(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print ":::", res.get()
    print "Sub-process(es) done."

運行結果

msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.

real    0m3.526s
user    0m0.054s
sys     0m0.024s

4、在類中使用 multiprocessing.Pool

類中使用進程池會一般會出現錯誤

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

這個提示是因為 multiprocessing.Pool中使用了Queue通信,所有進入隊列的數據必須可序列化(picklable),包括自定義類實例等。如下:

#!/bin/env python

import multiprocessing

class SomeClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)
        #result = pool.apply_async(self.f, [10])     
        #print result.get(timeout=1)           
        print pool.map(self.f, range(10))

SomeClass().go()

運行提示

Traceback (most recent call last):
  File "4.py", line 18, in <module>
    SomeClass().go()
  File "4.py", line 16, in go
    print pool.map(self.f, range(10))
  File "/usr/local/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/local/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
cPickle.PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

解決如下:(1)

#!/bin/env python
import multiprocessing

def func(x):
    return x*x

class SomeClass(object):
    def __init__(self,func):
        self.f = func

    def go(self):
        pool = multiprocessing.Pool(processes=4)
        #result = pool.apply_async(self.f, [10])
        #print result.get(timeout=1)
        print pool.map(self.f, range(10))

SomeClass(func).go()

輸出結果:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

(2)一般情況下我們如果在類中寫好了處理邏輯,想要盡可能減少代碼變動則可以使用下面方法

#!/bin/env python

import multiprocessing

class SomeClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        result = list()
        pool = multiprocessing.Pool(processes=4)
        for i in range(10):
            result.append(pool.apply_async(func, [self, i]))
        pool.close()
        pool.join()
        for res in result:
            print res.get(timeout=1)   

def func(client, x):
    return client.f(x)

SomeClass().go()

輸出結果:

0
1
4
9
16
25
36
49
64
81

使用(2)的解決方法需要注意,如果SomeClass實例中有包含任何不可序列化的數據則會一直報錯,一般是到res.get()報錯,這時候你就要重新查看代碼是否有不可序列化的變量了。如果有的話可以更改成全局變量解決。

 

三、多進程中使用線程池

有一種情景下需要使用到多進程和多線程:在CPU密集型的情況下一個ip的處理速度是0.04秒前后,單線程運行的時間大概是3m32s,單個CPU使用率100%;使用進程池(size=10)時間大概是6m50s,其中只有1個進程的CPU使用率達到90%,其他均是在30%左右;使用線程池(size=10)時間大概是4m39s,單個CPU使用率100%

可以看出使用多進程在這時候並不占優勢,反而更慢。因為進程間的切換消耗了大部分資源和時間,而一個ip只需要0.04秒。而使用線程池由於只能利用單核CPU,則再怎么加大線程數量都沒法提升速度,所以這時候應該使用多進程加多線程結合。

def run(self):
    self.getData()
    ipNums = len(self.ipInfo)
    step = ipNums / multiprocessing.cpu_count()
    ipList = list()
    i = 0
    j = 1
    processList = list()
    for ip in self.ipInfo:
        ipList.append(ip)
        i += 1
        if i == step * j or i == ipNums:
            j += 1
            def innerRun():
                wm = Pool.ThreadPool(CONF.POOL_SIZE)
                for myIp in ipList:
                    wm.addJob(self.handleOne, myIp)
                wm.waitForComplete()
            process = multiprocessing.Process(target=innerRun)
            process.start()
            processList.append(process)
            ipList = list()
    for process in processList:
        process.join()

機器有8個CPU,則使用8個進程加線程池,速度提升到35s,8個CPU的利用率均在50%左右,機器平均CPU75%左右。

 

四、多進程間通信

個人使用的比較多的是隊列和共享內存。需要注意的是隊列中Queue.Queue是線程安全的,但並不是進程安全,所以多進程一般使用線程、進程安全的multiprocessing.Queue(),而使用這個Queue如果數據量太大會導致進程莫名卡住(絕壁大坑來的),需要不斷地消費。

The Queue class is a near clone of Queue.Queue. For example:
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    # prints "[42, None, 'hello']"
    p.join()
Queues are thread and process safe.

測試卡住的程序如下:

#!/bin/env python
from multiprocessing import Process, Queue


class A(object):
    def __init__(self):
        pass
    def r(self):
        def f(q):
            import time
            time.sleep(1)
            s = 2000 * 'ss'        # 不卡不卡不卡
            # s = 20000 * 'ss'          # 卡住卡住卡住              
            q.put(['hello', s])
            print "q.put(['hello', s])"
        q = Queue(maxsize=0)
        pL = list()
        for i in range(10):
            p = Process(target=f, args=(q,))
            p.start()
            pL.append(p)
        for p in pL:
            p.join()
        print len(q.get())


if __name__ == '__main__':
    A().r()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM