一、守護進程:
借助 daemon=True,必須在被守護的進程開啟之前寫上 會讓被守護的子進程隨着主進程的結束而結束
start 開啟進程
join 阻塞進
舉例守護進程,異步阻塞
import time from multiprocessing import Process def func(): #設置要守護的函數 print('~' * 10) time.sleep(15) #讓子進程執行時間是十五秒 ,對比主進程10秒之后,是否會打印 --》 '@'*20 print('@'*20) #打印標識; def compire(): ##設置對比的函數 while True: #守護之后,會在主進程10秒之后也跟着結束 time.sleep(1) print('過去1秒') if __name__=='__main__': p=Process(target=func) p.daemon=True #守護第一個子進程 p.start() #開啟第一個子進程 c=Process(target=compire) c.daemon=True #守護第二個子進程 c.start() #開啟第二個子進程 for i in range(100): time.sleep(0.1) print('*'*i) # 讓主進程十秒后結束
總結:
""" 守護進程,就是能夠在主進程結束之后,子進程無論是循環還是有其他沒有執行的內容,都不會執行了 如果有兩個子進程,只保護其中一個,則兩另一個子進程不會守護影響,會繼續執行 進程守護要寫在,start 之前 被守護進程中不能再開啟子進程 """
二、進程中的其他方法:
pid查看進程ip
name查看進程名字
terminate 終結一個進程
is_alive() 查看一個進程是否活着,返回True False

import os import time from multiprocessing import Process def func(p): #由於主進程已經關閉了子進程,所以子進程不會再執行了 print('%s子進程ip'%os.getpid()) #子進程ip if __name__ == '__main__': p = Process(target=func) p.start() print(p.name,p.pid) #打印進程名字和id Process-1 8220 p.name = '進程名改啦' print(p.name) #進程名會更改 p.terminate() #異步,主進程發出關閉子進程命令,交給操作系統執行,至於什么時候執行,主進程並不關心,如果操作系統不是立馬執行了,下面打印子進程可能還活着。 print(p.is_alive()) #True time.sleep(2) #睡兩秒,操作系統一定是已經執行完了終結命令,下面再判斷進程是否活着,就是False print(p.is_alive()) #False
三、給進程加鎖 lock
# 在異步中,子進程的執行不受主進程是控制,當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題。
比如售票系統,在同一時間,多個人(子進程)買一張票,就會造成混亂,這時候就出現了鎖的概念
#lock #由並發變成了串行,犧牲了運行效率,但避免了競爭,保證了數據的安全。 from multiprocessing import Lock #導入 l=Lock() #制作一把鎖 ,只有一把鑰匙 l.acquire() #需要一把鑰匙 l.acquire() #又需要一把鑰匙 但是已經被拿走了,所以會阻塞在這 l.release() #把鑰匙還回去了,別人就可以來取鑰匙了

from multiprocessing import Lock from multiprocessing import process import random import json import time #首先建立一個‘piao’ 的text,手動存入字符串形式的字典{"count":4} 代表有四張火車票 def check_ticket(i): with open ('piao','r',encoding='utf-8') as f : ret=json.load(f)['count'] print('%s進來了,還有%d張票'%(i,ret)) def get_ticket(i): with open ('piao','r',encoding='utf-8') as f : ret = json.load(f)['count'] #讀文件夾里,還有幾張票 time.sleep(random.random()) if ret>0: #如果還有票,就可以買票 with open ('piao','w',encoding='utf-8') as f: json.dump({'count':ret-1},f) #每買走一張票,就把文件夾里票的數量減去1 print('%s買走了一張票,還剩%s張票'%(i,ret-1)) else: print('沒有票了') def task(i,l): #建立任務子進程 check_ticket(i) #檢查是誰進來了,還有幾張票的 l.acquire() #如果還有票,就讓他那一把鑰匙 get_ticket(i) #買走一張票,這時候其他人都在等候 l.release() #買完票了還鑰匙 # # if __name__=='__main__': l=Lock() #在建立子進程之前就把鎖做好 for i in range(10): #循環建立子進程,模擬有10個人一起買票 p= Process(target=task,args=(i,l)) p.start()
四、信號量 multiprocess.Semaphoren 是同時允許一定數量的線程更改數據 .也就是可以指定有幾把鑰匙
from multiprocessing import Semaphore #導入 ˈseməˌfôr s=Semaphore(2) #設置有2把鑰匙 s.acquire() #需要一把鑰匙 print(1) #打印1,說明成功拿走一把鑰匙 s.acquire() #需要一把鑰匙 print(2) #打印2,說明成功拿走一把鑰匙 s.release() #只有還了一把鑰匙,下面才能打印出3 s.acquire() #需要一把鑰匙 print(3) #這里並沒有打印出來3 因為只有兩把鑰匙,當都被拿走了 ,再來拿就會被阻塞 知直到有人還了鑰匙
模擬迷你唱吧, 只能容納四個人,出一個人,才能進一個人

from multiprocessing import Semaphore from multiprocessing import Process import random import time def sing_bar(i,s): s.acquire() print('%s進來了'%i) time.sleep(random.random()) #隨機某個時間,讓一個人走 print('%s走了'%i) s.release() if __name__=='__main__': s=Semaphore(4) #模擬唱吧只能容納4個人 for i in range(10): #模擬十個人一起去唱吧 p=Process(target=sing_bar,args=(i,s)) p.start()
五、進程之間的通信:基於IPC(Inter-Process Communication)協議
(1)事件
#事件:是主進程控制 多個 子進程的執行 from multiprocessing import Event #導入事件 s=Event() #建立一個信號燈,默認是紅燈 s.wait() #紅燈會阻塞 s.set() #紅燈變綠燈 s.clear() #清除綠燈 -->即綠燈變紅燈 s.is_set() # 是否阻塞 True就是綠燈 False就是紅燈 #機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時#就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。 #clear:將“Flag”設置為False #set:將“Flag”設置為True
模擬卡車過紅綠燈,卡車是隨機來,紅綠燈每3秒替換一次

from multiprocessing import Event import random import time from multiprocessing import Process def light(e): while True: #讓紅 綠 燈循環替換 if e.is_set(): # 是否阻塞 True就是綠燈 False就是紅燈 time.sleep(2) e.clear() # 阻塞 綠變紅 print('紅燈亮了') else: time.sleep(2) e.set() #阻塞變非阻塞 紅變綠 print('綠燈亮了') def car(i,e): e.wait() #綠燈會通過,紅燈就等待 print('%s車過了' % i) if __name__=='__main__': e=Event() p=Process(target=light,args=(e,)) #啟動 新線程控制紅綠燈 p.start() for i in range(20): #20輛卡車 if i%6==0: #如果整除6 那么就啟動一個卡車子進程 time.sleep(random.randint(1,3)) #啟動前 隨機等1-3秒 car_p = Process(target=car, args=(i,e)) car_p.start()
(2)對列
創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞
from multiprocessing import Queue #導入對列,提供get 和 put 方法 put 幾個,get幾個,get完了就沒有了,再get 就會阻塞,且按照put的順序依次get q=Queue() q.put(1) #往對列里面放 q.put(2) print(q.get()) # 從對列里往外拿 print(q.get()) q.put(3) print(q.get()) q=Queue(4) #可以設置參數,意味着對列里只能放四個值 #q.qsize() #得出對列長度 #通過隊列實現了 主進程與子進程的通信 子進程與子進程之間的通信

from multiprocessing import Queue from multiprocessing import Process import time def producer(q): for i in range(20): #准備生產20個包子 q.put('%s個包子'%i) print(q) def consumer(q): for i in range(10): #一個消費者一次get10個包子,吃的速度慢與生產 time.sleep(1) print(q.get()) if __name__=='__main__': q=Queue(10) #設置對托盤(對列)只允許放10個包子 p=Process(target=producer,args=(q,)) p.start() time.sleep(1) c1 = Process(target=consumer, args=(q,)) #一個消費者get十個包子 c1.start() # c2 = Process(target=consumer, args=(q,)) #增加一個消費者,就可以把20個包子吃完,生產和消費就平衡了 # c2.start()
生產者消費者模型進階:由於正常情況下,消費者是不知道生產者生產多少,這時候 怎么實現雙向通信呢?這里有幾種方法:
(1)基於隊列實現生產者消費者模型,傳遞一個成產結束的信號

from multiprocessing import Queue from multiprocessing import Process import time import random def producer(q,food): for i in range(20): #兩個生產進程准備生產40個包子 q.put('%s個%s'%(i,food)) time.sleep(random.random()) q.put('生產完了') #生產結束的信號(由於有三個消費者,兩個生產者,那么只會產生兩個結束的信號,也就是還有一個消費者拿不到結束信號,程序還是沒法結束,所以下面還要再設置一個結束信號) q.put('生產完了') # 生產結束的信號 def consumer(q,name): while True:#兩個消費者 循環消費,直到拿到生產結束的信號 food=q.get() if food =='生產完了':break #判斷拿出來的數據是不是’生產完了‘的信號 ,如果是則說明生產結束,直接退出 else: print(name, food) if __name__=='__main__': q=Queue() p1=Process(target=producer,args=(q,'包子')) #第一個生產者生產20個包子 p2 = Process(target=producer, args=(q,'饅頭'))#第二個生產者生產20個饅頭 p1.start() p2.start() c1 = Process(target=consumer, args=(q,'zxe')) #第一個消費者 c2= Process(target=consumer, args=(q,'zzxxcc')) # 第二個消費者 c3 = Process(target=consumer, args=(q, 'zzxxcc')) # 第三個消費者 c1.start() c2.start() c3.start()
""" 對列是安全的 生產者消費者 模型: #由於消費者不知道生產者生產多少數據,所以要用while循環 #循環無法結束,就要生產者給一個結束信號 #信號數量=消費者的數量-生產者的數量+1,這種方法就略微麻煩了 """
(2)JoinableQueue 創建可連接的共享進程隊列,隊列允許項目的使用者通知生產者項目已經被成功處理
# q=JoinableQueue() # q.get q.put方法 # q.join 生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。 # q.task_done 使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。

from multiprocessing import JoinableQueue from multiprocessing import Process import time import random def producer(q,): for i in range(5): q.put('生產的第%s個包子'%(i)) time.sleep(random.random()) q.join() #直到收到5個task_done信號,才會結束 def consumer(q,name): while True: #由於不知道生產者生產多少,所以用循環來處理不確定的數據 food=q.get() #處理數據(消費包子) print('%s吃了%s'%(name,food)) q.task_done() #每次消費一個包子都會向生產者發送一個task_done if __name__=='__main__': q=JoinableQueue() #創建可連接的共享進程隊列對象 p=Process(target=producer,args=(q,)) p.start() c1=Process(target=consumer,args=(q,'zxc')) #要把q對列對象傳進去 c1.daemon=True #守護進程,因為consumer子進程是一個無線循環的函數,為了讓它在處理完數據之后正常結束 c2=Process(target=consumer,args=(q,'zzxc')) c2.daemon = True #守護進程要寫在進程start前面 c1.start() c2.start() p.join() #等待p進程執行完,只要p結束了就代表consumer處理完數據了(如果不加阻塞,主進程會很快執行完了,那么守護進程也會隨之結束,而p進程的join 會一直等待task_done 整個程序將無法結束)
總結:生產者生產的數據全部被消費 ——> 生產者進程結束 ——> 主進程代碼執行結束 —— >消費者守護進程結束