前言
在上一篇【python進階】深入理解系統進程1中,我們講述了多任務的一些概念,多進程的創建,fork等一些問題,這一節我們繼續接着講述系統進程的一些方法及注意點
multiprocessing
如果你打算編寫多進程的服務程序,Unix/Linux⽆疑是正確的選擇。由於 Windows沒有fork調⽤,難道在Windows上⽆法⽤Python編寫多進程的程 序?
由於Python是跨平台的,⾃然也應該提供⼀個跨平台的多進程⽀持。 multiprocessing模塊就是跨平台版本的多進程模塊。
multiprocessing模塊提供了⼀個Process類來代表⼀個進程對象,下⾯的例⼦ 演示了啟動⼀個⼦進程並等待其結束:
from multiprocessing import Process import os # 子進程要執行的代碼 def run_proc(name): print('子進程運行中,name= %s ,pid=%d...' % (name, os.getpid())) if __name__=='__main__': print('父進程 %d.' % os.getpid()) p = Process(target=run_proc, args=('test',)) print('子進程將要執行') p.start() p.join() print('子進程已結束')
運行結果:
說明
- 創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個Process實例,用start()方法啟動,這樣創建進程比fork()還要簡單。
- join()方法可以等待子進程結束后再繼續往下運行,通常用於進程間的同步。
Process語法結構如下:
Process([group [, target [, name [, args [, kwargs]]]]])
-
target:表示這個進程實例所調用對象;
-
args:表示調用對象的位置參數元組;
-
kwargs:表示調用對象的關鍵字參數字典;
-
name:為當前進程實例的別名;
-
group:大多數情況下用不到;
Process類常用方法:
-
is_alive():判斷進程實例是否還在執行;
-
join([timeout]):是否等待進程實例執行結束,或等待多少秒;
-
start():啟動進程實例(創建子進程);
-
run():如果沒有給定target參數,對這個對象調用start()方法時,就將執行對象中的run()方法;
-
terminate():不管任務是否完成,立即終止;
Process類常用屬性:
-
name:當前進程實例別名,默認為Process-N,N為從1開始遞增的整數;
-
pid:當前進程實例的PID值;
實例1
from multiprocessing import Process import os from time import sleep # 子進程要執行的代碼 def run_proc(name, age, **kwargs): for i in range(10): print('子進程運行中,name= %s,age=%d ,pid=%d...' % (name, age,os.getpid())) print(kwargs) sleep(0.5) if __name__=='__main__': print('父進程 %d.' % os.getpid()) p = Process(target=run_proc, args=('test',18), kwargs={"m":20}) print('子進程將要執行') p.start() sleep(1) p.terminate() p.join() print('子進程已結束')
運行結果:
實例2
from multiprocessing import Process import time import os #兩個子進程將會調用的兩個方法 def worker_1(interval): print("worker_1,父進程(%s),當前進程(%s)"%(os.getppid(),os.getpid())) t_start = time.time() time.sleep(interval) #程序將會被掛起interval秒 t_end = time.time() print("worker_1,執行時間為'%0.2f'秒"%(t_end - t_start)) def worker_2(interval): print("worker_2,父進程(%s),當前進程(%s)"%(os.getppid(),os.getpid())) t_start = time.time() time.sleep(interval) t_end = time.time() print("worker_2,執行時間為'%0.2f'秒"%(t_end - t_start)) #輸出當前程序的ID print("進程ID:%s"%os.getpid()) #創建兩個進程對象,target指向這個進程對象要執行的對象名稱, #args后面的元組中,是要傳遞給worker_1方法的參數, #因為worker_1方法就一個interval參數,這里傳遞一個整數2給它, #如果不指定name參數,默認的進程對象名稱為Process-N,N為一個遞增的整數 p1=Process(target=worker_1,args=(2,)) p2=Process(target=worker_2,name="dongGe",args=(1,)) #使用"進程對象名稱.start()"來創建並執行一個子進程, #這兩個進程對象在start后,就會分別去執行worker_1和worker_2方法中的內容 p1.start() p2.start() #同時父進程仍然往下執行,如果p2進程還在執行,將會返回True print("p2.is_alive=%s"%p2.is_alive()) #輸出p1和p2進程的別名和pid print("p1.name=%s"%p1.name) print("p1.pid=%s"%p1.pid) print("p2.name=%s"%p2.name) print("p2.pid=%s"%p2.pid) #join括號中不攜帶參數,表示父進程在這個位置要等待p1進程執行完成后, #再繼續執行下面的語句,一般用於進程間的數據同步,如果不寫這一句, #下面的is_alive判斷將會是True,在shell(cmd)里面調用這個程序時 #可以完整的看到這個過程,大家可以嘗試着將下面的這條語句改成p1.join(1), #因為p2需要2秒以上才可能執行完成,父進程等待1秒很可能不能讓p1完全執行完成, #所以下面的print會輸出True,即p1仍然在執行 p1.join() print("p1.is_alive=%s"%p1.is_alive())
執行結果:
進程的創建-Process子類
創建新的進程還能夠使用類的方式,可以自定義一個類,繼承Process類,每次實例化這個類的時候,就等同於實例化一個進程對象,請看下面的實例:
from multiprocessing import Process import time import os #繼承Process類 class Process_Class(Process): #因為Process類本身也有__init__方法,這個子類相當於重寫了這個方法, #但這樣就會帶來一個問題,我們並沒有完全的初始化一個Process類,所以就不能使用從這個類繼承的一些方法和屬性, #最好的方法就是將繼承類本身傳遞給Process.__init__方法,完成這些初始化操作 def __init__(self,interval): Process.__init__(self) self.interval = interval #重寫了Process類的run()方法 def run(self): print("子進程(%s) 開始執行,父進程為(%s)"%(os.getpid(),os.getppid())) t_start = time.time() time.sleep(self.interval) t_stop = time.time() print("(%s)執行結束,耗時%0.2f秒"%(os.getpid(),t_stop-t_start)) if __name__=="__main__": t_start = time.time() print("當前程序進程(%s)"%os.getpid()) p1 = Process_Class(2) #對一個不包含target屬性的Process類執行start()方法,就會運行這個類中的run()方法,所以這里會執行p1.run() p1.start() p1.join() t_stop = time.time() print("(%s)執行結束,耗時%0.2f"%(os.getpid(),t_stop-t_start))
執行結果:
進程池Pool
當需要創建的子進程數量不多時,可以直接利用multiprocessing中的Process動態成生多個進程,但如果是上百甚至上千個目標,手動的去創建進程的工作量巨大,此時就可以用到multiprocessing模塊提供的Pool方法。
初始化Pool時,可以指定一個最大進程數,當有新的請求提交到Pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到指定的最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來執行,請看下面的實例:
from multiprocessing import Pool import os,time,random def worker(msg): t_start = time.time() print("%s開始執行,進程號為%d"%(msg,os.getpid())) #random.random()隨機生成0~1之間的浮點數 time.sleep(random.random()*2) t_stop = time.time() print(msg,"執行完畢,耗時%0.2f"%(t_stop-t_start)) po=Pool(3) #定義一個進程池,最大進程數3 for i in range(0,10): #Pool.apply_async(要調用的目標,(傳遞給目標的參數元祖,)) #每次循環將會用空閑出來的子進程去調用目標 po.apply_async(worker,(i,)) print("----start----") po.close() #關閉進程池,關閉后po不再接收新的請求 po.join() #等待po中所有子進程執行完成,必須放在close語句之后 print("-----end-----")
運行結果:
multiprocessing.Pool常用函數解析:
-
apply_async(func[, args[, kwds]]) :使用非阻塞方式調用func(並行執行,堵塞方式必須等待上一個進程退出才能執行下一個進程),args為傳遞給func的參數列表,kwds為傳遞給func的關鍵字參數列表;
-
apply(func[, args[, kwds]]):使用阻塞方式調用func
-
close():關閉Pool,使其不再接受新的任務;
-
terminate():不管任務是否完成,立即終止;
-
join():主進程阻塞,等待子進程的退出, 必須在close或terminate之后使用;
apply堵塞式
from multiprocessing import Pool import os,time,random def worker(msg): t_start = time.time() print("%s開始執行,進程號為%d"%(msg,os.getpid())) #random.random()隨機生成0~1之間的浮點數 time.sleep(random.random()*2) t_stop = time.time() print(msg,"執行完畢,耗時%0.2f"%(t_stop-t_start)) po=Pool(3) #定義一個進程池,最大進程數3 for i in range(0,10): po.apply(worker,(i,)) print("----start----") po.close() #關閉進程池,關閉后po不再接收新的請求 po.join() #等待po中所有子進程執行完成,必須放在close語句之后 print("-----end-----")
運行結果:
進程間通信-Queue
Process之間有時需要通信,操作系統提供了很多機制來實現進程間的通信。
1. Queue的使用
可以使用multiprocessing模塊的Queue實現多進程之間的數據傳遞,Queue本身是一個消息列隊程序,首先用一個小實例來演示一下Queue的工作原理:
from multiprocessing import Queue q=Queue(3) #初始化一個Queue對象,最多可接收三條put消息 q.put("消息1") q.put("消息2") print(q.full()) #False q.put("消息3") print(q.full()) #True #因為消息列隊已滿下面的try都會拋出異常,第一個try會等待2秒后再拋出異常,第二個Try會立刻拋出異常 try: q.put("消息4",True,2) except: print("消息列隊已滿,現有消息數量:%s"%q.qsize()) try: q.put_nowait("消息4") except: print("消息列隊已滿,現有消息數量:%s"%q.qsize()) #推薦的方式,先判斷消息列隊是否已滿,再寫入 if not q.full(): q.put_nowait("消息4") #讀取消息時,先判斷消息列隊是否為空,再讀取 if not q.empty(): for i in range(q.qsize()): print(q.get_nowait())
運行結果:
說明
初始化Queue()對象時(例如:q=Queue()),若括號中沒有指定最大可接收的消息數量,或數量為負值,那么就代表可接受的消息數量沒有上限(直到內存的盡頭);
-
Queue.qsize():返回當前隊列包含的消息數量;
-
Queue.empty():如果隊列為空,返回True,反之False ;
-
Queue.full():如果隊列滿了,返回True,反之False;
-
Queue.get([block[, timeout]]):獲取隊列中的一條消息,然后將其從列隊中移除,block默認值為True;
1)如果block使用默認值,且沒有設置timeout(單位秒),消息列隊如果為空,此時程序將被阻塞(停在讀取狀態),直到從消息列隊讀到消息為止,如果設置了timeout,則會等待timeout秒,若還沒讀取到任何消息,則拋出"Queue.Empty"異常;
2)如果block值為False,消息列隊如果為空,則會立刻拋出"Queue.Empty"異常;
-
Queue.get_nowait():相當Queue.get(False);
-
Queue.put(item,[block[, timeout]]):將item消息寫入隊列,block默認值為True;
1)如果block使用默認值,且沒有設置timeout(單位秒),消息列隊如果已經沒有空間可寫入,此時程序將被阻塞(停在寫入狀態),直到從消息列隊騰出空間為止,如果設置了timeout,則會等待timeout秒,若還沒空間,則拋出"Queue.Full"異常;
2)如果block值為False,消息列隊如果沒有空間可寫入,則會立刻拋出"Queue.Full"異常;
- Queue.put_nowait(item):相當Queue.put(item, False);
2. Queue實例
我們以Queue為例,在父進程中創建兩個子進程,一個往Queue里寫數據,一個從Queue里讀數據:
from multiprocessing import Process, Queue import os, time, random # 寫數據進程執行的代碼: def write(q): for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) # 讀數據進程執行的代碼: def read(q): while True: if not q.empty(): value = q.get(True) print('Get %s from queue.' % value) time.sleep(random.random()) else: break if __name__=='__main__': # 父進程創建Queue,並傳給各個子進程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 啟動子進程pw,寫入: pw.start() # 等待pw結束: pw.join() # 啟動子進程pr,讀取: pr.start() pr.join() # pr進程里是死循環,無法等待其結束,只能強行終止: print('') print('所有數據都寫入並且讀完')
運行結果:
3. 進程池中的Queue
如果要使用Pool創建進程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否則會得到一條如下的錯誤信息:
RuntimeError: Queue objects should only be shared between processes through inheritance.
下面的實例演示了進程池中的進程如何通信:
#修改import中的Queue為Manager from multiprocessing import Manager,Pool import os,time,random def reader(q): print("reader啟動(%s),父進程為(%s)"%(os.getpid(),os.getppid())) for i in range(q.qsize()): print("reader從Queue獲取到消息:%s"%q.get(True)) def writer(q): print("writer啟動(%s),父進程為(%s)"%(os.getpid(),os.getppid())) for i in "dongGe": q.put(i) if __name__=="__main__": print("(%s) start"%os.getpid()) q=Manager().Queue() #使用Manager中的Queue來初始化 po=Pool() #使用阻塞模式創建進程,這樣就不需要在reader中使用死循環了,可以讓writer完全執行完成后,再用reader去讀取 po.apply(writer,(q,)) po.apply(reader,(q,)) po.close() po.join() print("(%s) End"%os.getpid())
運行結果: