python中multiprocessing、multiprocessing.dummy和threading用法筆記


一、multiprocessing

用法參考地址:multiprocessing用法
首先解釋一個誤區:
進程池的大小是每次同時執行的進程數,但是並不會影響主進程申請進程的數量。主進程申請多進程量不等於池子大小。

1、子進程無返回值

# -*- coding:utf-8 -*- from multiprocessing import Pool as Pool import time def func(msg): print 'msg:', msg time.sleep(2) print 'end:' pool = Pool(processes=3) for i in xrange(1, 5): msg = 'hello %d' % (i) pool.apply_async(func,(msg,)) # 非阻塞 # pool.apply(func,(msg,)) # 阻塞,apply()源自內建函數,用於間接的調用函數,並且按位置把元祖或字典作為參數傳入。 # pool.imap(func,[msg,]) # 非阻塞, 注意與apply傳的參數的區別 # pool.map(func, [msg, ]) # 阻塞 print 'Mark~~~~~~~~~~~~~~~' pool.close() pool.join() # 調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束 print 'sub-process done' 
  1. 非阻塞方法
    multiprocessing.Pool.apply_async() 和 multiprocessing.Pool.imap()
    進程並發執行
  2. 阻塞方法
    multiprocessing.Pool.apply()和 multiprocessing.Pool.map()
    進程順序執行

2、子進程有返回值

只有apply_async可以有返回值,apply,map,imap不可以設置返回值.

# -*- coding:utf-8 -*- from multiprocessing import Pool as Pool import time def func(msg): print 'msg:', msg time.sleep(2) print 'end:' return msg pool = Pool(processes=3) result = [] for i in xrange(1, 5): msg = 'hello %d' % (i) res = pool.apply_async(func,(msg,)) # 非阻塞 只有apply_async可以有返回值,apply,map,imap不可以設置返回值 result.append(res) print 'Mark~~~~~~~~~~~~~~~' pool.close() pool.join() # 調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束 for res in result: print "sub_process return: ", res.get() print 'sub-process done' 

一定要注意res.get()方法是堵塞的。只有子進程執行完畢並返回數據時 res.get()方法才會執行,否則主進程堵塞,並等待。
看下面這個程序: 如何高效處理子進程有返回值的多進程任務

from multiprocessing import Pool import Queue import time def test(p): time.sleep(0.5) if p == 100: return (p,True) else: return (p,False) if __name__ == "__main__": pool = Pool(processes=10) q = Queue.Queue() for i in xrange(500): # 將子進程對象存入隊列中。 q.put( pool.apply_async(test, args=(i,)) ) # 維持執行的進程總數為10,當一個進程執行完后添加新進程. print(i) ''' 因為這里使用的為pool.apply_async異步方法,因此子進程執行的過程中,父進程會執行while,獲取返回值並校驗。 ''' print("======", q.qsize()) while 1: a = q.get().get(); print(a) if a[1]: pool.terminate() # 結束進程池中的所有子進程。 break pool.join() 

該程序瞬間執行到 print("======", q.qsize()) 行,並且每次執行 a = q.get().get()代碼時,如果對應進程沒有執行完,即沒有返回輸出值時,該行代碼導致主進程堵塞等待。

如果需要申請龐大的進程數量時,就會很浪費資源比如下面:

for i in xrange(500000000): # 將子進程對象存入隊列中。 q.put( pool.apply_async(test, args=(i,)) ) # 維持執行的進程總數為10,當一個進程執行完后添加新進程. print(i) 

我們可以開啟2個線程,一個線程申請進程,另一個線程判斷結束所有子進程的進程是否已經到達。
如下:

from multiprocessing import Pool import Queue import threading import time def test(p): time.sleep(0.001) if p == 10000: return True else: return False if __name__ == "__main__": result = Queue.Queue() # 隊列 pool = Pool() def pool_th(): for i in xrange(50000000000): ##這里需要創建執行的子進程非常多 try: result.put(pool.apply_async(test, args=(i,))) except: break def result_th(): while 1: a = result.get().get() # 獲取子進程返回值 if a: pool.terminate() # 結束所有子進程 break ''' 利用多線程,同時運行Pool函數創建執行子進程,以及運行獲取子進程返回值函數。 ''' t1 = threading.Thread(target=pool_th) t2 = threading.Thread(target=result_th) t1.start() t2.start() t1.join() t2.join() pool.join() 

3、多進程共享資源

申請進程有兩種方式一種是multiprocessing.Process(),另一種是multiprocessing.Pool(process=3).apply_async().
multiprocessing提供三種多進程之間共享數據的數據結構: Queue, Array 和Manager.

from multiprocessing import Queue, Array, Manager 

Queue、和Array只適用Process類申請的多進程共享資源。
Manager可以適用Pool和Process類申請的多進程共享資源。

import time from multiprocessing import Manager, Pool lists = Manager().list() # 定義可被子進程共享的全局變量lists def func(i): # time.sleep(1) lists.append(i) print i pool = Pool(processes=3) for i in xrange(10000000): if len(lists) <= 0: pool.apply_async(func, args=(i,)) else: break pool.close() pool.join() print(lists) 

輸出結果為:且i最大值不定。主進程申請多進程量不等於池子大小。
在這里插入圖片描述

二、多線程 Multiprocessing.dummy

1、子進程無返回值

Multiprocessing.dummy.Pool() 與Multiprocessing.Pool() 的用法一樣

  1. 非阻塞方法
    multiprocessing.dummy.Pool.apply_async() 和 multiprocessing.dummy.Pool.imap()
    線程並發執行
  2. 阻塞方法
    multiprocessing.dummy.Pool.apply()和 multiprocessing.dummy.Pool.map()
    線程順序執行
from multiprocessing.dummy import Pool as Pool import time def func(msg): print('msg:', msg) time.sleep(2) print('end:') pool = Pool(processes=3) for i in range(1, 5): msg = 'hello %d' % (i) pool.apply_async(func, (msg,)) # 非阻塞 # pool.apply(func,(msg,)) # 阻塞,apply()源自內建函數,用於間接的調用函數,並且按位置把元祖或字典作為參數傳入。 # pool.imap(func,[msg,]) # 非阻塞, 注意與apply傳的參數的區別 # pool.map(func, [msg, ]) # 阻塞 print('Mark~~~~~~~~~~~~~~~') pool.close() pool.join() # 調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束 print('sub-process done') 

2、子進程有返回值

與多進程一樣,只有multiprocessing.dummy.Pool.apply_async()可以有返回值,apply,map,imap不可以設置返回值.
3、多進程共享資源

三、多線程 Threading

1、創建方法

  1. 直接使用Thread類
from threading import Thread import time def run(a = None, b = None) : print a, b time.sleep(1) t = Thread(target = run, args = ("this is a", "thread")) #此時線程是新建狀態 print t.getName()#獲得線程對象名稱 print t.isAlive()#判斷線程是否還活着。 t.start()#啟動線程 t.join()#等待其他線程運行結束 
  1. 繼承Thread類
from threading import Thread import time class MyThread(Thread) : def __init__(self, a) : super(MyThread, self).__init__() #調用父類的構造方法 self.a = a def run(self) : print "sleep :", self.a time.sleep(self.a) t1 = MyThread(2) t2 = MyThread(4) t1.start() t2.start() t1.join() t2.join()
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM