python中多進程(multiprocessing)


一、 multiprocessing中使用子進程概念

from multiprocessing import Process

可以通過Process來構造一個子進程

p = Process(target=fun,args=(args))

再通過p.start()來啟動子進程

再通過p.join()方法來使得子進程運行結束后再執行父進程

 
             
from multiprocessing import Process
import os

# 子進程要執行的代碼
def run_proc(name):
    print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Process(target=run_proc, args=('test',))
    print 'Process will start.'
    p.start()
    p.join()
    print 'Process end.'
 
             
二、在multiprocessing中使用pool

如果需要多個子進程時可以考慮使用進程池(pool)來管理

from multiprocessing import Pool
 

from multiprocessing import Pool
import os, time

def long_time_task(name):
    print 'Run task %s (%s)...' % (name, os.getpid())
    start = time.time()
    time.sleep(3)
    end = time.time()
    print 'Task %s runs %0.2f seconds.' % (name, (end - start))

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print 'Waiting for all subprocesses done...'
    p.close()
    p.join()
    print 'All subprocesses done.'

 

pool創建子進程的方法與Process不同,是通過

p.apply_async(func,args=(args))實現,一個池子里能同時運行的任務是取決你電腦的cpu數量,如我的電腦現在是有4個cpu,那會子進程task0,task1,task2,task3可以同時啟動,task4則在之前的一個某個進程結束后才開始

http://my.oschina.net/yangyanxing/blog/296052  結果見連接

上面的程序運行后的結果其實是按照上圖中1,2,3分開進行的,先打印1,3秒后打印2,再3秒后打印3

代碼中的p.close()是關掉進程池子,是不再向里面添加進程了,對Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),調用close()之后就不能繼續添加新的Process了。

當時也可以是實例pool的時候給它定義一個進程的多少

如果上面的代碼中p=Pool(5)那么所有的子進程就可以同時進行

三、多個子進程間的通信

多個子進程間的通信就要采用第一步中說到的Queue,比如有以下的需求,一個子進程向隊列中寫數據,另外一個進程從隊列中取數據,

 

#coding:gbk

from multiprocessing import Process, Queue
import os, time, random

# 寫數據進程執行的代碼:
def write(q):
    for value in ['A', 'B', 'C']:
        print 'Put %s to queue...' % value
        q.put(value)
        time.sleep(random.random())

# 讀數據進程執行的代碼:
def read(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print 'Get %s from queue.' % value
            time.sleep(random.random())
        else:
            break

if __name__=='__main__':
    # 父進程創建Queue,並傳給各個子進程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啟動子進程pw,寫入:
    pw.start()    
    # 等待pw結束:
    pw.join()
    # 啟動子進程pr,讀取:
    pr.start()
    pr.join()
    # pr進程里是死循環,無法等待其結束,只能強行終止:
    print
    print '所有數據都寫入並且讀完' 

四、關於上面代碼的幾個有趣的問題

if __name__=='__main__':    
    # 父進程創建Queue,並傳給各個子進程:
    q = Queue()
    p = Pool()
    pw = p.apply_async(write,args=(q,))    
    pr = p.apply_async(read,args=(q,))
    p.close()
    p.join()
    
    print
    print '所有數據都寫入並且讀完'
 

 

如果main函數寫成上面的樣本,本來我想要的是將會得到一個隊列,將其作為參數傳入進程池子里的每個子進程,但是卻得到

RuntimeError: Queue objects should only be shared between processes through inheritance

的錯誤,查了下,大意是隊列對象不能在父進程與子進程間通信,這個如果想要使用進程池中使用隊列則要使用multiprocess的Manager類

if __name__=='__main__':
    manager = multiprocessing.Manager()
    # 父進程創建Queue,並傳給各個子進程:
    q = manager.Queue()
    p = Pool()
    pw = p.apply_async(write,args=(q,))
    time.sleep(0.5)
    pr = p.apply_async(read,args=(q,))
    p.close()
    p.join()
    
    print
    print '所有數據都寫入並且讀完'

這樣這個隊列對象就可以在父進程與子進程間通信,不用池則不需要Manager,以后再擴展multiprocess中的Manager類吧

關於鎖的應用,在不同程序間如果有同時對同一個隊列操作的時候,為了避免錯誤,可以在某個函數操作隊列的時候給它加把鎖,這樣在同一個時間內則只能有一個子進程對隊列進行操作,鎖也要在manager對象中的鎖

#coding:gbk

from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random

# 寫數據進程執行的代碼:
def write(q,lock):
    lock.acquire() #加上鎖
    for value in ['A', 'B', 'C']:
        print 'Put %s to queue...' % value        
        q.put(value)        
    lock.release() #釋放鎖  

# 讀數據進程執行的代碼:
def read(q):
    while True:
        if not q.empty():
            value = q.get(False)
            print 'Get %s from queue.' % value
            time.sleep(random.random())
        else:
            break

if __name__=='__main__':
    manager = multiprocessing.Manager()
    # 父進程創建Queue,並傳給各個子進程:
    q = manager.Queue()
    lock = manager.Lock() #初始化一把鎖
    p = Pool()
    pw = p.apply_async(write,args=(q,lock))    
    pr = p.apply_async(read,args=(q,))
    p.close()
    p.join()
    
    print
    print '所有數據都寫入並且讀完'

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM