multiprocessing包—Process模塊開啟多進程的兩種方式,Process的方法,守護進程
進程同步控制—multiprocessing.Lock multiprocessing.Semaphore multiprocessing.Event
進程間通信(IPC)— multiprocessing.Pipe multiprocessing.Queue
進程間的數據共享 — multiprocessing.Manager

1.是運行中的程序; 2.是系統最小的資源分配單位 3.為多個任務之間的數據安全和內存隔離做約束
multiprocessing:提到多進程就要想到它
multiprocess不是一個模塊而是python中一個操作、管理進程的包。 之所以叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的所有子模塊。
一、multiprocessing.Process開啟多進程:
#process方法: start,join,terminate,isalive 屬性 name,pid,daemon(True為守護進程)

#除了子進程代碼,其他都是主進程 #注意進程的三態轉換 就緒態-(時間片)-執行態,阻塞態 #process方法: start,join,terminate,isalive 屬性 name,pid,daemon(True為守護進程) import time from multiprocessing import Process def func(): #在子進程中執行的代碼,因為被注冊了 print('開啟一個子進程') time.sleep(1) print('證明,print1和print*是一起啟動的') #其中p是一個主進程,實例化一個進程對象, # p.start()這句話為主進程代碼。(但是它告訴系統啟動子進程,操作系統創建新進程,執行進程中的代碼) if __name__ == '__main__':#·只有在windows下開啟子進程,才需要這句話,其他系統不用 p = Process(target=func) #注冊函數。 可以理解為,把func這個函數,放到一個新的py文件里執行 #p是一個進程對象,告訴系統開啟一個子進程 p.start() #啟動子進程,子進程啟動起來后,進入就緒排隊狀態,延遲0.01還是0.001秒看排隊長度 print('*'*5) #其中 這里的print和p.start是一起啟動的,不管有多少個print,都同步啟動 #假設如果我們直接調用func()函數,那么print('*'*5)則與函數此時是同步的,要等函數運行完才能執行print('*'*5) #但是此時我們用子進程p.start告訴系統,要開啟一個子進程去執行func,然后系統就會給func一個進程id,開始運行func.同時主程序會繼續運行自己的代碼print('*'*5) # 此時程序變為了異步了,不影響p.start后面的代碼運行,也就是說,p.start告訴系統開啟子進程調用func后,馬上運行 #print('*'*5),然后進程開啟后,就緒狀態進入了執行狀態,再運行 func。 # 此時運行func和執行print('*'*5),變為了異步,不是同步了。這兩段代碼已經是在兩條馬路上跑的兩輛車了 def f(name): #這里是子進程代碼 print('hello', name) print('我是子進程') if __name__ == '__main__': #以下全部都是主進程代碼,包括import模塊那 p = Process(target=f, args=('bob',)) #主進程 p.start() #開啟了一個子進程,理解為把f放到新的py里執行,但是實際上他還有后續的主進程代碼【print('執行主進程的內容了')】要執行 time.sleep(1) #主進程的代碼 print('執行主進程的內容了') #主進程的代碼 #搭配上面的例子食用更佳。如果上面的sleep是0.1秒,就會先執行上面的print2。如果是1秒,則肯定是先執行,print('執行主進程的內容了') #此時主進程的代碼,和子進程要啟動的代碼。就是異步執行

#pid processid進程id ppid :parent process id import os,time from multiprocessing import Process # print('py中的進程',os.getpid())#執行這整段代碼,這句話會運行兩次第一次指向父進程,第二次指向子進程 def func(name): print('參數是',name) print('子進程',os.getpid()) print('子進程的父進程',os.getppid()) #就是下面這一段的pid time.sleep(1) # name = input('111111111') #EOFError: EOF when reading a line print('子進程,調皮一下') if __name__ == '__main__': p = Process(target=func,args=('gkx',)) #注意傳參數時候 args=('gkx',) 逗號一定要加,參數可以是*args,不能是**kwargs p.start() print('父進程',os.getpid()) print('父進程的父進程',os.getppid()) #這個就是pycharm的進程id

# 進程的生命周期 # 主進程 # 子進程 # 開啟了子進程的主進程 : # 主進程自己的代碼如果長,等待自己的代碼執行結束, # 子進程的執行時間長,主進程會在主進程代碼執行完畢之后等待子進程執行完畢之后 主進程才結束 #···子進程不一定要依賴運行着的父進程 #至於父進程如果關閉了 但是子進程沒運行完, #比如 控制台 python XXX.py 運行了py文件,此時如果關閉了控制台,py文件是否會跟着關閉? #在linux中 如果 python XXX.py & 后面跟上&則 py文件會一直在后台運行 #所以,關閉父進程,子進程是否關閉要看是怎么規定的

import os from multiprocessing import Process class MyProcess(Process): def __init__(self,arg1,arg2): #如果屬性都有默認值,那么在實例化的時候可以不傳屬性,類型函數的傳值方式 super().__init__()#父類的屬性都有默認值所以不用列出 self.arg1 = arg1 self.arg2 = arg2 def run(self): print(self.name) print(self.pid) print('arg1,arg2',self.arg1,self.arg2) # print('run參數',name) print(os.getpid()) if __name__ == '__main__': print('主進程',os.getpid()) p1 = MyProcess('1','2') # p1.run('gkx') p1.start() p2 = MyProcess('3','4') p2.start() #~~~第二種方法的條件: #1.自定義類,繼承Process類 #2.定義個名為 run的方法,run方法中是在子進程中執行的代碼 #3.子進程的參數,通過 __init__的方法初始化,並調用父類的__init__
#注意進程間是數據隔離的,所以才會有進程間的通信需求,在博客的后面會提到

from multiprocessing import Process def func(): num = input('>>>') #EOFError: EOF when reading a line print(num) if __name__ == '__main__': Process(target=func).start() #子進程中不能有input #因為在pycharm中開啟多個進程,軟件幫你優化后,顯示在同一個控制台 #但是實際上它還是兩個進程的,感受不到子進程的input,所以報錯 #因此,子進程是不能input 的 #不同於聊天,QQ用的是UDP,是客戶端和客戶端之間的通信
(1)Process模塊的幾個方法:

import time,os from multiprocessing import Process def func(arg1,arg2): print('*'*arg1) # time.sleep(2) #有種效果:讓時間片輪轉一下 print('='*arg2) if __name__ == '__main__': p = Process(target=func ,args=(10,20,)) p.start() print('哈哈哈哈哈') p.join() #感知一個子進程的結束,從異步變成了同步。有點像把子進程代碼拼進主進程 print('====== 運行完了')
# 多進程代碼
# from multiprocessing import Process ,一定要在 if __name__ == '__main__':下運行
# 方法
# 進程對象.start() 開啟一個子進程
# 進程對象.join() 感知一個子進程的結束
# 進程對象.terminate() 結束一個子進程
# 進程對象.is_alive() 查看某個子進程是否還在運行
# 屬性
# 進程對象.name 進程名
# 進程對象.pid 進程號
# 進程對象.daemon 值為True的時候,表示新的子進程是一個守護進程
# 守護進程 隨着主進程代碼的執行結束而結束
# 一定在start之前設置
(2)開啟多個子進程
使用 空列表 [p.join() for p in p_lst] 保證多個子進程運行完后,才運行主進程

import time,os from multiprocessing import Process def func(arg1,arg2): print('*'*arg1) print('!'*arg1) # time.sleep(2) #由於是異步執行,這里sleep2秒,有點像幾條馬路上的一個紅路燈,把所有子進程攔住,紅綠燈一消失,所有進程就開始執行 #所以開啟多個子進程,也只會等待2秒而已 print('='*arg2) if __name__ == '__main__': p_lst = [] for i in range(10): #按順序告訴系統要開啟多個子進程,但是系統並不會按順序執行!有些是執行態,有些會是就緒態 p = Process(target=func ,args=(10*i,20*i,)) p_lst.append(p) p.start() #p.join()如果在for循環里加了p.join則類似變為了同步 [p.join() for p in p_lst] #保證print('====== 運行完了')肯定在最后運行 # p.join() #由於系統執行順序不確定,print('====== 運行完了') 會出現在不確定的地方 print('+++++++ 運行完了')

from multiprocessing import Process import os def func(filename,content): with open(filename,'w') as f: f.write(content*10*'*') if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=func,args=('info%s'%i,i)) #這里進程是同時啟動的,但是有些進入執行態,有些進入就緒態 p_lst.append(p) p.start() # p.join() i=0等待文件寫入,i=1 等待文件寫入........ [p.join() for p in p_lst] #等待大家跑步結束,之前的所有進程必須在這里都執行完才能繼續往 下執行 #這里, i=1,i=2,i=3,i=4,i=5是同時寫入的,全部寫入完,才打印文件名 print([i for i in os.walk(os.getcwd())]) #在使用for生成進程的時候,進程是幾乎同時進行的,浪費的時間只是for i in range(5) 這極短的時間 #這個例子適合場景: 所有的子進程需要異步執行,但是需要所有子進程做完了的結果,返回給主進程
在用for循環的時候,肯定是按順序的。但是for循環只是極短的時間,生成的子進程,此時有些進程進入就緒態,有些進入執行態,所以順序不一
(3)守護進程:

#守護進程---> 主進程結束的時候,子進程也主動結束 import time from multiprocessing import Process def func(): # 守護進程 會 隨着 主進程的代碼執行完畢 而 結束 while True: time.sleep(0.2) print('我還活着') def fun2(): print('in the func22222222222') time.sleep(8) print('func2 finished') if __name__ == '__main__': p = Process(target=func) p.daemon=True #設置子進程為守護進程,一定要在 start方法前 設置好 p.start() p.terminate()#強制結束一個子進程 time.sleep(0.0000000000000000000001) print(p.is_alive()) #檢驗子進程是否或者。雖然上一個語句結束了,但是這里操作系統正在回收,但是此時響應時間非常短,所以會返回True。sleep一下就是false了 p2 = Process(target=fun2) p2.start() # p2.join() i = 0 while i<5: print('我是socket server.') time.sleep(1) i +=1 # 守護進程 會 隨着 主進程的代碼執行完畢 而 結束 # 在主進程內結束一個子進程 p.terminate() # 結束一個進程不是在執行方法之后立即生效,需要一個操作系統響應的過程 # 檢驗一個進程是否活着的狀態 p.is_alive() # p.name p.pid 這個進程的名字和進程號
(4)使用多進程實現socket服務端的並發

import socket from multiprocessing import Process def server(conn): conn.send('你好'.encode('utf-8')) msg = conn.recv(1024).decode('utf-8') print(msg) conn.close() if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1',8080)) sk.listen() while True: conn,addr = sk.accept() p = Process(target=server,args=(conn,)) p.start() sk.close()

import socket sk = socket.socket() sk.connect(('127.0.0.1',8080)) msg = sk.recv(1024).decode('utf-8') print(msg) ret = input('>>>> ') sk.send(ret.encode('utf8')) sk.close()
當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題。
(1)multiprocessing.Lock 同步鎖/互斥鎖。使用鎖來保證數據安全:

#加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。 雖然可以用文件共享數據實現進程間通信,但問題是: 1.效率低(共享數據基於文件,而文件是硬盤上的數據) 2.需要自己加鎖處理 #因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道。 隊列和管道都是將數據存放於內存中 隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來, 我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。

#會造成數據不安全的地方 要加鎖 lock.acquire lock.release from multiprocessing import Process,Lock import json,time def tick(): with open('ticket_num',) as f: ret = json.load(f) time.sleep(0.1) print('余票: %s'%ret['ticket']) def buy_tick(i,lock): #with lock lock.acquire() #相當於拿鑰匙才能進門。一旦有進程拿了鑰匙,其他進程在這里拿不到,就會阻塞 with open('ticket_num',) as f: #阻塞到有人還鑰匙為止 dic = json.load(f) time.sleep(0.1) if dic['ticket'] > 0: dic['ticket'] -= 1 print('\033[32m%s買到票啦\033[0m'%i) else: print('\033[31m沒有買到票%s\033[0m'%i) time.sleep(0.1) with open('ticket_num','w') as f: json.dump(dic,f) lock.release() #還鑰匙 if __name__ == '__main__': p_lst = [] for i in range(10): p = Process(target=tick) p_lst.append(p) p.start() [p.join() for p in p_lst] print('\033[34;42m開始買票了\033[0m') lock = Lock() for i in range(10): p2 = Process(target=buy_tick,args=(i,lock)) p2.start() # class MyTicket(Process): # def __init__(self,i,lock): # super().__init__() # self.i = i # self.lock = lock # # def run(self): # self.lock.acquire() # 相當於拿鑰匙才能進門。一旦有進程拿了鑰匙,其他進程在這里拿不到,就會阻塞 # with open('ticket_num', ) as f: # 阻塞到有人還鑰匙為止 # dic = json.load(f) # time.sleep(0.1) # if dic['ticket'] > 0: # dic['ticket'] -= 1 # print('\033[32m%s買到票啦\033[0m' % self.i) # else: # print('\033[31m沒有買到票%s\033[0m' % self.i) # time.sleep(0.1) # with open('ticket_num', 'w') as f: # json.dump(dic, f) # self.lock.release() # 還鑰匙 # # if __name__ == '__main__': # lock = Lock() # for i in 'abcdefghijk': # p = MyTicket(i,lock) # p.start()
當不加鎖的時候,會出現多個人同時搶到1張票,這明顯是不合理的,所以需要加鎖
所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程,如下就是死鎖

用遞歸鎖解決:from multiprocessing import RLock (具體參見線程中的RLock)
(2)multiprocessing.Semaphore 信號量,也就是加了計數器的鎖,即同一時間只能讓指定數目的程序執行任務。以下以移動KTV舉例:

互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據 。
假設商場里有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念

#信號量 semaphore sem = Semaphore(int) ,也是鎖的概念,只是內部加了個計數器 import time,random from multiprocessing import Semaphore from multiprocessing import Process # sem = Semaphore(2) # sem.acquire() # print('第一把鑰匙') # sem.acquire() # print('第二把鑰匙') # sem.acquire() # print('第三把鑰匙') def ktv(i,sem): sem.acquire() print('%s走進去KTV了'%i) time.sleep(random.randint(1,5)) print('%s走出來了'%i) sem.release() if __name__ == '__main__': sem = Semaphore(5) for i in range(20): p = Process(target=ktv,args=(i,sem)) p.start()
(3)multiprocessing.Event 事件:事件的阻塞與否是要看 event.wait()是True還是False
其中 wait(time) 可以傳參數,wait(2)表示等待2秒后,就不阻塞了

from multiprocessing import Event #代碼要執行,要得到一個信號通知wait(event),同一時間只能由一個進程執行這段代碼lock,同一時間只能有指定數量的進程執行這段代碼semaphore #一個信號可以使所有的進程都進入阻塞狀態 #也可以控制所有的進程解除阻塞 #一個事件被創建之后,默認是阻塞狀態 e = Event() #創建了一個事件對象 print('事件默認狀態',e.is_set()) #查看一個事件的狀態,默認設置為阻塞 e.set() #將Flag改為True,解除阻塞 print('set之后',e.is_set()) e.clear() #將Flag改為False,繼續阻塞 print('clear之后',e.is_set()) e.wait() #是依據 e.is_set()的值來決定是否阻塞 #這里的wait是指,依據事件的狀態來決定是否在wait處阻塞 print(123456) #is_set()為 False 那么阻塞,默認阻塞。 # set 將Flag改為True,解除阻塞 #clear 將Flag改為False,繼續阻塞

# 事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞, # 如果“Flag”值為True,那么event.wait 方法時便不再阻塞。 #e.wait set clear #wait要配套 set和clear使用。單單只有 set和clear 是不阻塞的 #set和clear是告訴 wait,要不要阻塞,真正的阻塞機制還是在 wait這

import time import random from multiprocessing import Event,Process def cars(e,i): if not e.is_set(): print('car%i在等待'%i) e.wait() # 阻塞 直到得到一個 事件狀態變成 True 的信號 print('\033[0;32;40mcar%i通過\033[0m' % i) def light(e): while True: if e.is_set(): #如果e.is_set()真才走下面這句,但是此時 e.is_set默認是False,所以先綠燈亮了 e.clear() print('\033[31m紅燈亮了\033[0m') else: #默認false,所以先走這里 e.set() #設置 e.is_set() 為True ,此時有車子正在生成 print('\033[32m綠燈亮了\033[0m') time.sleep(2) if __name__ == '__main__': e = Event() traffic = Process(target=light,args=(e,)) traffic.start() for i in range(20): car = Process(target=cars, args=(e,i)) car.start() time.sleep(random.random()) #思路整理: #當事件傳入的時候,首先判斷了e.is_set()是false,然后改變 e.is_set()的狀態。此時car正在生成,等待事件為 0-1的隨機小數 #大部分情況這個小數是足夠大,讓e.is_set()狀態改變的,但是有時候,car隨機的小數非常小,小到e.set還在就緒態,所以有很小的幾率,會默認 e.is_set()是 false #讓車輛等待和同行顯示正常的關鍵點,在 等待時間要明確
三.進程間的通信 — IPC(Inter-Process Communication)
多進程之間,如果不通過特殊的手段,不會共享數據。即數據隔離:

#多進程之間,如果不通過特殊的手段,不會共享數據。即數據隔離 import os from multiprocessing import Process # n = 50 #因為在這個py里,這兩行會被執行兩次 # print(n) def func(): global n n = 0 print('子pid %s'%os.getpid(),n) # n = 100 # func() # print(n) if __name__ == '__main__': n = 100 p = Process(target=func) p.start() p.join() print('父pid %s' % os.getpid(), n) #n 還是100 說明,子進程和主進程,全局不一樣,即內存是隔離的
#以后工作中,一般不會直接用Queue,會使用更強大的幾個模塊: kafka:大數據的 消息中間鍵 可保留數據 rebbitmq memcache:不可保留
(1) mulitiprocessing.Queue 隊列。
隊列可以理解為:基於管道和鎖實現的一個數據在線程/進程之間互相通信的容器

#最主要的四個方法 put get full(滿了返回True) empty (為空返回True) 還有q.get_nowait() 具體見最下方 from multiprocessing import Queue,Process def produce(q): q.put('hello') print('放進去了') def consume(q): ret = q.get() print('收到了',ret) if __name__ == '__main__': q = Queue() #不設置最大值 Queue(5)最大存儲5 p = Process(target=produce,args=(q,)) p.start() c = Process(target=consume,args=(q,)) c.start()

Queue([maxsize])
創建共享的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。
Queue的實例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發Queue.Full異常。
q.qsize()
返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍后程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。
q.empty()
如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full()
如果q已滿,返回為True. 由於線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。
方法介紹

''' multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列 都是基於消息傳遞實現的,但是隊列接口 ''' from multiprocessing import Queue q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) # q.put(3) # 如果隊列已經滿了,程序就會停在這里,等待數據被別人取走,再將數據放入隊列。 # 如果隊列中的數據一直不被取走,程序就會永遠停在這里。 try: q.put_nowait(3) # 可以使用put_nowait,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。 except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息。 print('隊列已經滿了') # 因此,我們再放入數據之前,可以先看一下隊列的狀態,如果已經滿了,就不繼續put了。 print(q.full()) #滿了 print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 同put方法一樣,如果隊列已經空了,那么繼續取就會出現阻塞。 try: q.get_nowait(3) # 可以使用get_nowait,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。 except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。 print('隊列已經空了') print(q.empty()) #空了 單看隊列用法
隊列中最出名的模型:生產者消費者模型

在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
為什么要使用生產者和消費者模式
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
什么是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
#首先判斷生產者是否生產完畢,不能用empty和get_nowait,因為有可能消費者消費得快,隊列被取光了,此時隊列為空,但是生產者隨時可能再put進來 #其次,多個進程取隊列里取值,比如此時put一個None讓進程break,但是其中一個進程拿到了None退出了,另一個卻繼續阻塞 #所以需要put多個None

# 隊列 # 生產者消費者模型 #首先判斷生產者是否生產完畢,不能用empty和get_nowait,因為有可能消費者消費得快,隊列被取光了,此時隊列為空,但是生產者隨時可能再put進來 #其次,多個進程取隊列里取值,比如此時put一個None讓進程break,但是其中一個進程拿到了None退出了,另一個卻繼續阻塞 #所以需要put多個None import time import random from multiprocessing import Process,Queue def producer(name,food,q): for i in range(10): time.sleep(random.randint(1, 3)) f = '%s生產了%s%s'%(name,food,i) q.put(f) print(f) def consumer(name,q): while True: food = q.get() if food == None: print('沒了~~~~~~') break print('\033[31m%s消費了%s\033[0m'%(name,food)) time.sleep(random.randint(1,3)) if __name__ == '__main__': q = Queue(20) p1 = Process(target=producer,args=('egg','包子',q)) p2 = Process(target=producer,args=('wusir','Cake',q)) p1.start() p2.start() c1 = Process(target=consumer,args=('alex',q)) c2 = Process(target=consumer,args=('jin',q)) c1.start() c2.start() p1.join() p2.join() q.put(None) q.put(None) # def consumer(q,name): # while True: # food = q.get() # if food is None: # print('%s獲取到了一個空'%name) # break # print('\033[31m%s消費了%s\033[0m' % (name,food)) # time.sleep(random.randint(1,3)) # # def producer(name,food,q): # for i in range(4): # time.sleep(random.randint(1,3)) # f = '%s生產了%s%s'%(name,food,i) # print(f) # q.put(f) # # if __name__ == '__main__': # q = Queue(20) # p1 = Process(target=producer,args=('Egon','包子',q)) # p2 = Process(target=producer, args=('wusir','泔水', q)) # c1 = Process(target=consumer, args=(q,'alex')) # c2 = Process(target=consumer, args=(q,'jinboss')) # p1.start() # p2.start() # c1.start() # c2.start() # p1.join() # p2.join() # q.put(None) # q.put(None)
使用multiprocessing.JoinableQueue的生產者消費者模型詳解:

JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:
q.task_done()
使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。如果調用此方法的次數大於從隊列中刪除的項目數量,將引發ValueError異常。
q.join()
生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。
下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。

import time import random from multiprocessing import Process,JoinableQueue def consumer(q,name): while True: food = q.get() print('\033[31m%s消費了%s\033[0m' % (name,food)) time.sleep(random.randint(1,3)) q.task_done() # count - 1 每處理一個數據,就獲得一個task_done記錄,最后獲取一個join信號,進行判斷,是否全部都已經task_done #可以理解為,join信號發送過來說我生產了20個東西,task_done對比以下記錄,如果也有20個記錄則表示處理完畢 def producer(name,food,q): for i in range(4): time.sleep(random.randint(1,3)) f = '%s生產了%s%s'%(name,food,i) print(f) q.put(f) q.join() # 阻塞 直到一個隊列中的所有數據 全部被處理完畢。原本只要生產完就行,現在要等到包子都吃完才行 #當全部生產完畢,會發送一個join信號 #隊列生產過程被拉長了,從生產完就結束,變為了要等 q.task_done() 完才能結束 #也就是說,執行q.task_done() 都會記錄下來,直到隊列中所有的數據,都有task_done記錄 if __name__ == '__main__': q = JoinableQueue(20) p1 = Process(target=producer,args=('Egon','包子',q)) p2 = Process(target=producer, args=('wusir','泔水', q)) c1 = Process(target=consumer, args=(q,'alex')) c2 = Process(target=consumer, args=(q,'jinboss')) p1.start() p2.start() c1.daemon = True # 設置為守護進程 主進程中的代碼執行完畢之后,子進程自動結束 c2.daemon = True c1.start() c2.start() p1.join() p2.join() # 感知一個子進程的結束 #首先通過 q.join感知consumer的結束,然后通過 p1.join感知 producer的結束。再然后通過結束主進程,來結束consumer的子進程(設置了守護進程) # 在消費者這一端: # 每次獲取一個數據 # 處理一個數據 # 發送一個記號 : 標志一個數據被處理成功 # 在生產者這一端: # 每一次生產一個數據, # 且每一次生產的數據都放在隊列中 # 在隊列中刻上一個記號 # 當生產者全部生產完畢之后, # join信號 : 已經停止生產數據了 # 且要等待之前被刻上的記號都被消費完 # 當數據都被處理完時,join阻塞結束 # consumer 中把所有的任務消耗完 # producer 端 的 join感知到,停止阻塞 # 所有的producer進程結束 # 主進程中的p.join結束 # 主進程中代碼結束 # 守護進程(消費者的進程)結束
(2) mulitiprocessing.Pipe 管道。
Pipe:進程數據的不安全性

from multiprocessing import Process,Pipe def func(conn1,conn2): conn2.close() while True: # msg = conn1.recv() # print(msg) try: msg = conn1.recv() print(msg) except EOFError: #只有當主進程2個口都關閉,子進程關閉了其中一個口,且管道內沒數據了,才會報錯EOFError conn1.close() #因為程序要判定,任何一個口都不會給我發消息了 break

#隊列 管道+鎖,所以以后會比較多使用隊列 #管道 更底層的 #IPC(Inter-Process Communication)進程間通信 #Pipe:進程數據的不安全性,在只有一個生產者和只有一個消費者時,不會有問題 #但是存在這種問題:同時有幾個進程往管道要數據,有一條數據被准備取走了,但是另一個進程,此時又來取,就會出現問題。 #多個消費者取同一個數據 #所以要通過枷鎖解決 from multiprocessing import Process,Pipe,Lock import time import random def producer(con,pro,name,food): con.close() for i in range(6): f = '%s生產了%s%s'%(name,food,i) print(f) pro.send(f) time.sleep(0.5) pro.send(None) pro.send(None) pro.close() def consumer(con,pro,name,lock): pro.close() while True: lock.acquire() msg = con.recv() lock.release() time.sleep(0.5) if msg: print('\033[32m%s消費了%s\033[0m' % (name, msg)) else: con.close() break # try: # lock.acquire() # msg = con.recv() # lock.release() # print('\033[32m%s消費了%s\033[0m'%(name,msg)) # time.sleep(0.5) # except EOFError: #子進程,主進程的通道關閉,且管道里沒值了,就會引發EOFError # con.close # print('meile') # lock.release() # break if __name__ == '__main__': con,pro = Pipe() # lock = Lock() p1 = Process(target=producer,args=(con,pro,'egg','baozi',)) c1 = Process(target=consumer,args=(con,pro,'aaaa',lock)) c2 = Process(target=consumer,args=(con,pro,'bbbb',lock)) p1.start() c1.start() c2.start() con.close() #注意要引發 EFOError ,主進程得con和pro也要關閉 pro.close()

#Manager # 進程間數據是獨立的,可以借助於隊列或管道實現通信,二者都是基於消息傳遞的 # 雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止於此 # A manager returned by Manager() will support types # list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. #但是manager的數據共享是不安全性的,同管道,可能兩個進程取了同一個數據 #加鎖解決 from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: #不加鎖而操作共享的數據,肯定會出現數據錯亂 d['count']-=1 if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)