概述
由於python中全局解釋器鎖(GIL)的存在,所以python多線程並不能有效利用CPU多核的性能(相當於單核並發)實現多線程多核並行,所以在對CPU密集型的程序時處理效率較低,反而對IO密集型的才有效率的大幅度提高。
如果想要充分地使用多核CPU的資源,需要使用多進程,python中提供multiprocessing實現。
CPU密集型:主要特點是需要進行大量的計算,消耗CPU資源,比如計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。這種計算密集型任務雖然也可以用多任務完成,但是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,所以,要最高效地利用CPU,計算密集型任務同時進行的數量應當等於CPU的核心數。
IO密集型:主要涉及到網絡、磁盤IO的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低於CPU和內存的速度)。對於IO密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是IO密集型任務,比如Web應用。
所以python在多線程處理CPU密集型程序時可以選擇多進程實現,有效的利用多核提升效率;而IO密集型的由於99%的時間都花在IO上,花在CPU上的時間很少,所以多線程也能提高很大效率
Process對象
multiprocessing.Process類類似於threading.Thread,涉及參數以及屬性方法如下
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
參數
- group 應該始終是 None,它僅用於兼容 threading.Thread
- target 是由 run() 方法調用的可調用對象,它默認為 None ,意味着什么都沒有被調用
- name 是進程名稱
- args 是目標調用的參數元組
- kwargs 是目標調用的關鍵字參數字典
- daemon 表是否為守護進程,為 True 或 False
方法
- run() 表示進程活動的方法。
- start() 啟動進程活動。每個進程對象最多只能調用一次
- join([timeout]) 如果可選參數 timeout 是 None (默認值),則該方法將阻塞,直到調用 join() 方法的進程終止。簡單說哪個子進程調用了join方法,主進程就要等該子進程執行完后才能繼續向下執行
- is_alive() 返回進程是否還活着
屬性
- pid 返回進程ID
- name 進程的名稱
- daemon 進程的守護標志,一個布爾值
- exitcode 子進程退出代碼。如果進程尚未終止,這將是 None 。
創建多進程
類似與多線程,創建方式都是差不多的
1.通過函數方式創建
import multiprocessing import time def run(sec): print('這是進程名字', multiprocessing.current_process().name) print('這是進程PID', multiprocessing.current_process().pid) time.sleep(sec) if __name__ == '__main__': print('這是主進程名字:', multiprocessing.current_process().name) print('這是主進程PID:', multiprocessing.current_process().pid) s_time = time.time() p1 = multiprocessing.Process(target=run, args=(1,)) p2 = multiprocessing.Process(target=run, args=(2,)) p3 = multiprocessing.Process(target=run, args=(3,)) p1.start() p2.start() p3.start() p1.join() p2.join() p3.join() print('主進程結束', multiprocessing.current_process().name) print('一共用時', time.time()-s_time)
2.通過類來創建
import multiprocessing import time class MyProcess(multiprocessing.Process): def __init__(self, sec): super(MyProcess, self).__init__() self.sec = sec def run(self): print('這是進程名字', multiprocessing.current_process().name) print('這是進程PID', multiprocessing.current_process().pid) time.sleep(self.sec) if __name__ == '__main__': print('這是主進程名字:', multiprocessing.current_process().name) print('這是主進程PID:', multiprocessing.current_process().pid) s_time = time.time() p1 = MyProcess(1) p2 = MyProcess(2) p3 = MyProcess(3) p1.start() p2.start() p3.start() p1.join() p2.join() p3.join() print('主進程結束', multiprocessing.current_process().name) print('一共用時', time.time()-s_time)
進程間通信
進程是資源(CPU、內存等)分配的最小單位,每個進程有獨立的地址空間與系統資源,每啟動一個新的進程相當創建全局變量的一份副本,子進程里的數據修改無法影響到主進程以及其他子進程中的數據,不同子進程之間的數據也不能共享,這是多進程與多線程最明顯的區別
示例如下
import multiprocessing import time tmp = 0 class MyProcess(multiprocessing.Process): def __init__(self, q): super(MyProcess, self).__init__() self.q = q def run(self): global tmp tmp = tmp+self.q print('進程%s,tmp值為%d'%(multiprocessing.current_process().name,tmp)) if __name__ == '__main__': p1 = MyProcess(1) p2 = MyProcess(2) p1.start() p2.start() p1.join() p2.join() print('主進程%s,tmp值為%d'%(multiprocessing.current_process().name,tmp))
輸出為:
進程MyProcess-1,tmp值為1
進程MyProcess-2,tmp值為2
主進程MainProcess,tmp值為0
所以多進程之間數據獨立,通過id(tmp)也可以看出。但是對文件file這種存儲在硬盤中的資源讀寫操作,或者一些通過multiprocessing 下 Value, Array創建的共享變量,對於這些資源進程之間會存在競爭,如果要避免多進程訪問這種共享資源出現沖突,會使用進程鎖的方式
共享內存的創建如下,具體方法這里省略
from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:])
進程鎖
進程鎖可以避免因為多個進程訪問共享資源而發生沖突,這里的共享資源不是像多線程中那樣的全局變量,上面已經說了普通的全局變量不會在進程間共享,而是系統中的文件或者console輸出這類系統的資源,還有特別的能在進程間通信的共享內存資源,這些能被進程競爭。
這里以文件為例,因為同一時間,只能有一個進程,對文件進行寫入操作,這是操作系統的設定。同時由操作系統隨機決定哪個進程來寫入操作
from multiprocessing import Process, Lock # 導入進程鎖 def f1(l,num): l.acquire() # 加鎖 f = open("file.txt", "a+") i = 10000 while i > 0: f.write("hello word %s\n" % i) i -= 1 print("process", num) f.close() l.release() # 釋放鎖 def f2(l,num): l.acquire() # 加鎖 f = open("file.txt", "a+") i = 10000 while i > 0: f.write("hello best word %s\n" % i) i -= 1 print("process", num) f.close() l.release() # 釋放鎖 if __name__ == "__main__": lock = Lock() # 定義鎖 p1 = Process(target=f1, args=(lock, 1,)) p2 = Process(target=f2, args=(lock, 2,)) p1.start() p2.start() p1.join() p2.join()
沒有加鎖的時候,會發現文檔中一個進程寫到一半,另一個進程的數據也寫入,是亂序的(可以將鎖去掉試試,這里不給出代碼了)
加了鎖后就會是正常一個進程寫完,再另一個進程繼續寫入
進程隊列Queue
Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
與線程中使用的隊列Queue區別:
1.from queue import Queue:是一種隊列模型數據結構,類似於普通列表,有先進先出模式,堆棧模式,優先級模式等
2.from multiprocessing import Queue:是多進程並發的Queue隊列,用於解決多進程間的通信問題。可以將對象序列化傳遞再進程間,普通Queue實現不了。
主要方法還是get()與put()
put(obj[, block[, timeout]]):將對象放入隊列。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。
get([block[, timeout]]):從隊列取出一個對象,如果blocked為True(默認值),並且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常。
close(): 指示當前進程將不會再往隊列中放入對象。一旦所有緩沖區中的數據被寫入管道之后,后台的線程會退出。這個方法在隊列被gc回收時會自動調用。
from multiprocessing import Process,Queue class book(): Storage = False def __init__(self,name): self.name = name def storagebook(self): Storage = True print("%s has storaged"%self.name) def worker1(q): n = 1 while n < 10: q.put(book("book-%d"%n)) n+=1 def worker2(q): while True: #qsize()返回隊列長度 if q.qsize() != 0: q.get().storagebook() else: break if __name__ == "__main__": q = Queue() p1 = Process(target=worker1, args=(q,)) p2 = Process(target=worker2, args=(q,)) p1.start() #這里先啟動生產書的進程 p2.start() p1.join() p2.join()
管道Pipe
通過multiprocessing.Pipe([duplex])會返回(conn1, conn2)一對Connection對象,代表一個管道的兩個端。
Pipe()有duplex參數,如果duplex參數為True(默認值),那么這個管道是全雙工模式,也就是說conn1和conn2均可收發。duplex為False,conn1只負責接受消息,conn2只負責發送消息。
send和recv方法分別是Connection對象的發送和接受消息的方法。例如,在全雙工模式下,可以調用conn1.send發送消息,conn1.recv接收消息。如果沒有消息可接收,recv方法會一直阻塞。如果管道已經被關閉,那么recv方法會拋出EOFError。
Connection對象方法主要有:
- send(obj) 將一個對象發送到連接的另一端,可以用 recv() 讀取。發送的對象必須是可以序列化的,過大的對象 ( 接近 32MiB+ ,這個值取決於操作系統 ) 有可能引發 ValueError 異常。
- recv() 返回一個由另一端使用 send()發送的對象。該方法會一直阻塞直到接收到對象。 如果對端關閉了連接或者沒有東西可接收,將拋出 EOFError 異常。
- close() 關閉連接對象。當連接對象被垃圾回收時會自動調用。
- poll([timeout]) 返回連接對象中是否有可以讀取的數據。如果未指定 timeout ,此方法會馬上返回。如果 timeout 是一個數字,則指定了最大阻塞的秒數。如果 timeout 是 None ,那么將一直等待,不會超時。
from multiprocessing import Process,Pipe class book(): Storage = False def __init__(self,name): self.name = name def storagebook(self): Storage = True print("%s has storaged"%self.name) def worker1(p): n = 1 while n < 10: p.send(book("book-%d"%n)) n+=1 p.close() def worker2(p): while True: if p.poll(): #判斷還有沒有數據 p.recv().storagebook() else: break if __name__ == "__main__": conn1, conn2 = Pipe(duplex=False) p1 = Process(target=worker1, args=(conn2,)) p2 = Process(target=worker2, args=(conn1,)) p1.start() #這里先啟動生產書的進程 p2.start() p1.join() p2.join()
進程池Pool
可以使用multiprocessing.Pool實現簡單的多進程任務,進程池事先划分系統資源,並將資源分配給池中的進程,這些進程是創建Pool對象時已經創建及初始化好了的。當我們想創建新的進程任務時,新建的任務就可以直接取得Pool中的進程資源,而不用動態的從系統獲取新的資源。如果進程池中沒有可用的進程資源時,程序就會等待。
Pool類主要方法有:
- apply(): 直到得到結果之前一直阻塞。同步操作
- apply_async(): 這是 apply() 方法的一個變體,返回的是一個result對象。這是一個異步的操作,在所有的子類執行之前不會鎖住主進程。
- map(): 這是內置的 map() 函數的並行版本。在得到結果之前一直阻塞,此方法將可迭代的數據的每一個元素作為進程池的一個任務來執行。
- map_async(): 這是 map() 方法的一個變體,返回一個result對象。如果指定了回調函數,回調函數應該是callable的,並且只接受一個參數。當result准備好時會自動調用回調函數(除非調用失敗)。回調函數應該立即完成,否則,持有result的進程將被阻塞。
簡單使用進程池
import multiprocessing from multiprocessing import Process, Pool import time def func(sec): time.sleep(sec) print('當前進程:%s pid:%d' % (multiprocessing.current_process().name, multiprocessing.current_process().pid)) if __name__ == '__main__': print('主進程開始:%s' % multiprocessing.current_process().name) s_time = time.time() p = Pool(5) # 創建pool對象,5表示池中創建5個進程 for i in range(10): p.apply_async(func, args=(2,)) p.close() # 關閉進程池,防止將任何其他任務提交到池中。需要在join之前調用,否則會報ValueError: Pool is still running錯誤 p.join() # 等待進程池中的所有進程執行完畢 print('主進程結束:%s' % multiprocessing.current_process().name) print('一共用時: ', time.time()-s_time)
打印結果如下
可以看到我們創建的任務實際是用進程池里面的進程資源
使用callback
進程池中回調函數callback作用是:進程池中任何一個任務一旦處理完了,就立即告知主進程,主進程則調用一個函數去處理該結果,該函數即回調函數
使用callback的好處是可以將耗時的任務放在子進程中,等子進程有結果時再去通知主進程處理,實際就是異步操作的實現
把上面的例子改一下
import multiprocessing from multiprocessing import Process, Pool import time def func(sec): time.sleep(1) # sleep一秒是為了模擬阻塞的情況 print('當前進程:%s pid:%d' % (multiprocessing.current_process().name, multiprocessing.current_process().pid)) return {multiprocessing.current_process().name: sec} def func2(res): print('當前進程:%s pid:%d' % (multiprocessing.current_process().name, multiprocessing.current_process().pid)) print(res) if __name__ == '__main__': print('主進程開始:%s' % multiprocessing.current_process().name) s_time = time.time() p = Pool(5) # 創建pool對象,5表示池中創建5個進程 for i in range(10): p.apply_async(func, args=(i,), callback=func2) # 使用callback p.close() # 關閉進程池,防止將任何其他任務提交到池中。需要在join之前調用,否則會報ValueError: Pool is still running錯誤 p.join() # 等待進程池中的所有進程執行完畢 print('主進程結束:%s' % multiprocessing.current_process().name) print('一共用時: ', time.time()-s_time)
打印結果如下
AsyncResult對象
由Pool.apply_async()和Pool.map_async()返回的result實例對象的類,主要方法有
- get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發multiprocessing.TimeoutError。
- ready():如果調用完成,返回True
- successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發AssertionError異常
- wait([timeout]):等待結果變為可用。
示例如下
import multiprocessing from multiprocessing import Process, Pool import time def func(sec): time.sleep(sec) print('當前進程:%s pid:%d' % (multiprocessing.current_process().name, multiprocessing.current_process().pid)) return {multiprocessing.current_process().name: sec} if __name__ == '__main__': print('主進程開始:%s' % multiprocessing.current_process().name) s_time = time.time() p = Pool(5) # 創建pool對象,5表示池中創建5個進程 result = [] for i in range(10): result.append(p.apply_async(func, args=(i,))) p.close() # 關閉進程池,防止將任何其他任務提交到池中。需要在join之前調用,否則會報ValueError: Pool is still running錯誤 p.join() # 等待進程池中的所有進程執行完畢 for res in result: print(res.get()) #get()獲取返回值 print('主進程結束:%s' % multiprocessing.current_process().name) print('一共用時: ', time.time()-s_time)
與使用callback不同的是,這個需要子進程都結束后,才能在主進程中處理
map用法
進程池中map()方法等價於內置函數map(func, *iterables),也是一種創建進程任務的簡化方法
內置函數map()是對iterables依次執行func(item),將執行結果組成一個 List 返回(python2是list,python3是map對象)
進程池中map()則是返回list,而map_async()返回的是AsyncResult的變體,要通過get()得到list
import multiprocessing from multiprocessing import Process, Pool import time def func(sec): time.sleep(sec) print('當前進程:%s pid:%d' % (multiprocessing.current_process().name, multiprocessing.current_process().pid)) return {multiprocessing.current_process().name: sec} if __name__ == '__main__': print('主進程開始:%s' % multiprocessing.current_process().name) s_time = time.time() p = Pool(5) result = p.map_async(func, range(10)) # map創建任務 p.close() p.join() for res in result.get(): # map對象使用get方法返回list print(res) print(result.get()) print('主進程結束:%s' % multiprocessing.current_process().name) print('一共用時: ', time.time()-s_time)
參考:https://docs.python.org/zh-cn/3/library/multiprocessing.html 以及部分網上資料