一、multiprocessing.Process模塊簡介:
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
參數解釋:
group:默認是None,這個參數是為實現ThreadGroup類時的擴展保留,所以它應是None,
target:默認是None,target的值(值是進程要執行的任務,所以值應該是一個函數名,即一個要被執行的任務)是run()方法調用的對象。
name:默認是None,設置進程的名字。
args:是target=函數名,這個函數的位置參數,元組形式。
kwargs:是target=函數名,這個函數的默認參數,字典形式。
daemon:如果是True表示創建的是守護進程。False非守護進程。
常用方法:
run():開啟進程,可以在子類中重寫此方法。run()方法會調用構造函數中target指定的函數,函數的位置參數和關鍵字參數分別取自args和kwargs參數。
start():實例化對象調用此方法開啟線程,每個實例化對象只能調用一次start方法,該方法會調用run()開啟線程。
join([timeout]):默認是None,阻塞狀態,會等待進程結束后才會執行下面的代碼。如果指定timeout(單位是秒),程序會阻塞timeout秒后在執行下面的代碼。如果其進程終止或方法超時,該方法將返回None。
name:如果在實例化對象時沒有設置進程名,可以使用進程對象.name=進程名,設置進程名。
is_alive():進程是否存活。如果存活返回True,否則返回False
daemon:如果為True是守護進程,如果為False非守護進程,一定要在調用start()方法前調用才生效。注意,守護進程不允許創建子進程。否則,如果守護進程在其父進程退出時終止,則它的子進程將成為孤兒。
pid:返回進程的pid。
exitcode:返回進程退出碼,如果是None表示進程正在執行,如果是0表示進程正常執行完成,如果是負數進程時被信號終止了。
authkey:返回進程的身份驗證密鑰(字節字符串)。初始化多處理時,使用os.urandom()為主進程分配一個隨機字符串。創建進程對象時,它將繼承其父進程的身份驗證密鑰,但可以通過將authkey設置為另一個字節字符串來更改。
terminate():終止進程。在Unix上,這是使用SIGTERM信號完成的;在Windows上,使用TerminateProcess()。請注意,退出處理程序和finally子句等將不會被執行。進程的子進程不會終止,它們只會成為孤立進程。如果關聯的進程使用管道或隊列時使用此方法,則管道或隊列可能會損壞,其他進程可能無法使用。類似地,如果進程獲得了鎖或信號量等,那么終止它很可能導致其他進程死鎖。
kill():與terminate()相同,但是在Unix上使用SIGKILL信號。適用python3.7版本。
close():關閉進程對象,釋放與之關聯的所有資源。如果底層進程仍在運行,則會引發ValueError。一旦close()成功返回,進程對象將不能調用方法和屬性否則將引發ValueError。適用python3.7。
參考文檔:
https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing#module-multiprocessing
二、multiprocessing.Process使用示例
不帶參數:
from multiprocessing import Process def f(): print("子進程") if __name__ == "__main__": p = Process(target=f) # 不帶參數 p.start() print("執行主進程內容") # 打印內容如下 執行主進程內容 子進程
帶參數:
from multiprocessing import Process def f(name,a=10,b=20): print(f"{a}+{b}的{name}是{a+b}") if __name__ == "__main__": p = Process(target=f,args=('和',),kwargs={'a':1,'b':2}) # 帶參數 p.start() print("執行主進程內容") # 打印內容如下 執行主進程內容 1+2的和是3
從打印結果我們可以看出程序先執行了主進程的print,之后才執行了子進程的print。這里主要是因為操作系統在開辟進程時需要花費一定的時間,所以程序在這段時間里,先執行了主進程的print,然后才執行子進程print。可以使用join()方法來等待子進程結束后在執行主線程代碼。如下:
from multiprocessing import Process def f(name,a=10,b=20): print(f"{a}+{b}的{name}是{a+b}") if __name__ == "__main__": p = Process(target=f,args=('和',),kwargs={'a':1,'b':2}) p.start() p.join() # 等待子進程結束后在執行下面的代碼 print("執行主進程內容") # 打印內容如下 1+2的和是3 執行主進程內容
第二種方式創建進程。很少用
from multiprocessing import Process class MyProcess(Process): # 繼承Process類 # 這里必須要調用Process中的init初始化參數 # 否則會因為無法傳參導致錯誤 def __init__(self,buf,a=10,b=20): self.buf = buf self.a = a self.b = b super().__init__() # 必須有 def run(self): # 重寫run()方法,必須自己實現 self.f(self.buf,self.a,self.b) def f(self,buf, a=10, b=20): print(f"{a}+{b}的{buf}是{a + b}") if __name__ == "__main__": p = MyProcess('和',a=1,b=2) p.start() p.join() # 等待子進程結束 print("我是主進程") # 打印內容如下 1+2的和是3 我是主進程
獲取進程PID號:
from multiprocessing import Process import os def f(): # 使用OS模塊獲取進程和父進程的PID print(f"父進程PID:{os.getppid()},子進程PID:{os.getpid()}") if __name__ == "__main__": p = Process(target=f,args=()) p.start() print('子進程PID:',p.pid) # 使用進程對象獲取進程PID # 打印內容如下 子進程PID: 6108 父進程PID:5320,子進程PID:6108
創建多個進程:
from multiprocessing import Process def f(process_name): print(f"{process_name}") if __name__ == "__main__": # 因為每個Process對象只能調用一次start方法 # 所以想要開幾個進程,就要創建多少個Process對象 for i in range(3): p = Process(target=f, args=("子進程-"+str(i),)) p.start() print("主進程") # 打印內容如下 主進程 子進程-0 子進程-2 子進程-1
我們會發現主進程比所有子進程都優先執行了,而且子進程也都不是按照順序執行的,這主要是因為開啟進程的耗時和操作系統的調度來決定的。
示例一:使用串行的方式開啟多個進程
from multiprocessing import Process import time def f(process_name): time.sleep(1) print(f"{process_name}") if __name__ == "__main__": start_time = time.time() for i in range(3): p = Process(target=f, args=("子進程-"+str(i),)) p.start() p.join() # 等待進程結束 end_time = time.time() print(f"執行了{end_time - start_time}") print('結束進程') # 打印內容如下 子進程-0 子進程-1 子進程-2 執行了3.326190233230591 結束進程
從打印結果我們可以看出是所有子進程運行后才執行了主進程。但是發現會很慢,這是因為我們的join把原本應多進程並行的程序(異步),變成了串行(同步),必須等待一個子進程結束后才會執行下一個子進程。這樣有可能違背了我們多進程並行的初衷,所以我們調整下join的位置。
# 下面請看示例二:
from multiprocessing import Process import time def f(process_name): print(f"{process_name}") if __name__ == "__main__": start_time = time.time() pro_list = [] # 存放進程對象 for i in range(3): p = Process(target=f, args=("子進程-"+str(i),)) p.start() pro_list.append(p) # 將進程對象添加到一個列表中 for i in pro_list: # 循環等待所有進程結束 i.join() end_time = time.time() print(f"執行了{end_time - start_time}") print('進程結束') # 打印內容如下 子進程-1 子進程-0 子進程-2 執行了0.17200994491577148 進程結束
對比示例一和示例二我們可以明顯發現示例二真正實現了多個進程的並發效果。
守護進程有兩個特性:
1、守護進程會在主進程代碼執行結束后就終止。
2、守護進程內無法再開啟子進程,否則拋出異常。
創建守護進程比較簡單如下:
from multiprocessing import Process import time def f(): time.sleep(1) print("守護進程") if __name__ == "__main__": p = Process(target=f,args=()) p.daemon=True # 開啟守護進程,一定要在start前執行。 p.start() print("主進程") # 打印內容如下 主進程
我們發現守護進程並沒有被執行,或者說還沒來得及執行就結束了,我們知道操作系統在開啟進程時要花費一定時間,在這個時間內主進程代碼執行完了,所以守護進程還沒來得及執行就結束了。可以使用join來等待守護進程執行完畢后在結束主進程
from multiprocessing import Process import time def f(): time.sleep(1) print("守護進程") if __name__ == "__main__": p = Process(target=f,args=()) p.daemon=True # 開啟守護進程,一定要在start前執行。 p.start() p.join() # 等待守護進程結束 print("主進程") # 打印內容如下 守護進程 主進程
進程鎖:
為保證數據的安全性,在有些場合要使用進程鎖,進程鎖會使由原來的並行變成串行,程序效率會下降,但是卻保證了數據的安全性,在數據安全性和程序效率面前,數據的安全性是大於程序的效率的。
下面以搶票為例,現在票數還有一張:

from multiprocessing import Process import time,json def search(name): # 查票 di = json.load(open("db")) print(f"{name}查票,剩余票數{di['count']}") def get(name): # 購票 di = json.load(open("db")) time.sleep(0.1) if di["count"] > 0: di["count"] -= 1 time.sleep(0.2) json.dump(di,open("db","w")) print(f"{name}購票成功") def task(name): search(name) get(name) if __name__ == "__main__": for i in range(5): # 只模擬5個人搶一張票 p = Process(target=task,args=("游客-"+str(i),)) p.start() # 打印結果如下 游客-2查票,剩余票數1 游客-1查票,剩余票數1 游客-0查票,剩余票數1 游客-4查票,剩余票數1 游客-3查票,剩余票數1 游客-2購票成功 游客-1購票成功 游客-0購票成功 游客-4購票成功 游客-3購票成功
所有人全部購票成功,這就對數據的安全性提出了挑戰。本來只有一張票,但是5個人都顯示購票成功,這當然不是我們想要的結果,問題的原因在於,所有的游客在差不多同一時間都進行了購票,大家看到的票數都是1張,第一個用戶購票后,將票數減1等於0還沒來得及將結果寫入文件,其它用戶也進行了購票的操作,在余票0被寫入文件的過程中,其它用戶也購票成功,並將結果寫入文件,造成了數據的混亂。
這里我們使用進程鎖Lock也叫互斥鎖,來解決問題。
from multiprocessing import Process,Lock import time,json def search(name): # 查票 di = json.load(open("db")) print(f"{name}查票,剩余票數{di['count']}") def get(name): # 購票 di = json.load(open("db")) time.sleep(0.1) if di["count"] > 0: di["count"] -= 1 time.sleep(0.2) json.dump(di,open("db","w")) print(f"{name}購票成功") def task(name,lock): search(name) # 查票 lock.acquire() # 加鎖 get(name) # 購票 lock.release() # 解鎖 if __name__ == "__main__": lock = Lock() # 獲取鎖 for i in range(5): # 只模擬5個人搶一張票 p = Process(target=task,args=("游客-"+str(i),lock)) p.start() # 打印內容如下 游客-0查票,剩余票數1 游客-1查票,剩余票數1 游客-2查票,剩余票數1 游客-3查票,剩余票數1 游客-4查票,剩余票數1 游客-0購票成功
在購票時加一個互斥鎖,這樣一個進程在購票時,其它的進程只能查看就不能進行購票的操作了,保證了數據的安全性,最終結果是正確的,這就是為什么我們明明看到有票,但是點擊購買后卻說沒票的原因,雖然加鎖后使原本並行的程序,變成了串行。但我們要知道在不能保證數據安全的情況下一切效率都是空談。
進程間的通信IPC(Inter-Process Communication)隊列
隊列:Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
隊列的常用方法:
Queue([maxsize]):創建共享的進程隊列。maxsize是隊列中允許的最大數值。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。
Queue的實例q具有以下方法:
q.get([block[,timeout]]):返回q中的一個項目。如果隊列為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,默認為True.如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。
q.get_nowait( ) 同q.get(False)方法。
q.put(item[,block[,timeout]]):將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發Queue.Full異常。
q.qsize():返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。
q.empty():如果調用此方法時q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full() :如果q已滿,返回為True. 由於線程的存在,結果也可能是不可靠的(參考q.empty()方法)。
q.close():關閉隊列,防止隊列中加入更多數據。調用此方法時,后台線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。
q.cancel_join_thread():不會再進程退出時自動連接后台線程。這可以防止join_thread()方法阻塞。
q.join_thread():連接隊列的后台線程。此方法用於在調用q.close()方法后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。
下面我們已生產者消費者模型來進行演示:
from multiprocessing import Process,Queue import time,random def consumer(name,q): # 消費者 while True: task = q.get() # 從隊列中取出數據 if task == None:break print(f"{name}獲取數據{task}") time.sleep(random.random()) # 消費者效率比生產者效率高 def producer(name,q): # 生產者 for i in range(3): q.put(i) # 向對列中添加數據 print(f"{name}生產數據{i}") time.sleep(random.uniform(1,2)) # 模擬生產者的效率沒有消費者效率高 if __name__ == "__main__": q = Queue() # 獲取一個隊列 pro = [] for i in range(3): # 開啟生產者進程 p = Process(target=producer,args=("生產者"+str(i),q)) p.start() pro.append(p) # 開啟消費者進程 p1 = Process(target=consumer,args=("aaa",q)) p2 = Process(target=consumer,args=("bbb",q)) p1.start() p2.start() for i in pro: # 等待生產者結束 i.join() q.put(None) # 有幾個消費者進程,就put幾次None q.put(None)
JoinableQueue([maxsize]) 模塊
創建可連接的共享進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:
q.task_done():使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。如果調用此方法的次數大於從隊列中刪除的項目數量,將引發ValueError異常。
q.join():生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。
下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
我們在來實現上述的生產者消費者模型。
from multiprocessing import Process,JoinableQueue import time,random def consumer(name,q): # 消費者 while True: task = q.get() # 從隊列中取出數據 q.task_done() # 通知生產者,我已經取完所有數據了 print(f"{name}獲取數據{task}") time.sleep(random.random()) # 消費者效率比生產者效率高 def producer(name,q): # 生產者 for i in range(1): q.put(i) # 向對列中添加數據 print(f"{name}生產數據{i}") time.sleep(random.uniform(1,2)) # 模擬生產者的效率沒有消費者效率高 q.join() # 生產完畢,等待消費者通知數據已經獲取完了 if __name__ == "__main__": q = JoinableQueue() # 獲取一個隊列 pro = [] for i in range(1): # 開啟生產者進程 p = Process(target=producer,args=("生產者"+str(i),q)) p.start() pro.append(p) # 開啟消費者進程 p1 = Process(target=consumer,args=("aaa",q)) p2 = Process(target=consumer,args=("bbb",q)) p1.daemon=True # 如果不設置守護進程,這兩個進程就不會結束。 p2.daemon=True # 因為他們只是通知生產者我接收到所有數據了,並沒有終止循環。 p1.start() p2.start() for i in pro: # 等待生產者結束 i.join()
這里再次說明將消費者設置成守護進程的原因,q.task_done它只是通知生產者,我把數據已經都取完了,僅此而已,所以while循環並不會退出。如果不設置守護進程,程序會卡在while循環里。
進程池
進程池就是預先創建一個進程組,然后有任務時從池中分配一個進程去執行任務。當任務數量超過進程池的數量時,就必須等待進程池中有空閑的進程時,才能利用空閑的進程去執行任務。
進程池的優點:
1、充分利用CPU資源。
2、多個進程在同一時刻可以同時執行,達到了並行的效果。
進程池的缺點:進程的創建、銷毀需要耗費CPU的時間。多進程適用於需要復雜計算少I/O阻塞的情況。如果程序不涉及復雜運算,最好是使用線程池。
關於進程池multiprocessing.Pool的一些方法:
apply(func [, args [, kwargs]]):需要注意的是:apply屬於進程同步的操作,即必須等待一個進程結束后才能執行下一個進程。
apply_async(func[,args[,kwargs]]):apply_async屬於進程的異步操作,所有進程可以同時執行,此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將立即傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。
p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成。
P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之后調用。
同步進程池的示例:
import os,time from multiprocessing import Pool def work(n): print("PID:%s run" %os.getpid()) time.sleep(1) return n ** 2 if __name__ == "__main__": p = Pool(3) # 開啟進程池 res = [] for i in range(3): res.append(p.apply(work,args=(i,))) # 進程同步模式 print(res) # 打印返回結果 # 打印內容如下 PID:6180 run PID:9728 run [0, 1, 4]
因為是進程池的同步,所以進程時的執行順序是有序的,並且必須一個進程執行后才執行下一個進程。
進程池的異步示例:
import os,time from multiprocessing import Pool def work(n): print("PID:%s run" %os.getpid()) time.sleep(1) return n ** 2 if __name__ == "__main__": p = Pool(3) # 開啟進程池 res = [] for i in range(5): # 進程異步模式 res.append(p.apply_async(work,args=(i,))) # 因為是異步,所以開進程會很快 # 所以我們所有進程結束后在打印結果 p.close() # 關閉進程池 p.join() # 等待進程池結束, for i in res: print(i.get(),end=" ") # 打印內容如下 PID:7512 run PID:10176 run PID:7240 run PID:10176 run PID:7512 run 0 1 4 9 16
關於進程池有個問題需要注意,在子進程中不能使用input函數()
下一篇:線程理論部分:https://www.cnblogs.com/caesar-id/p/10758189.html
