並發編程之多進程


一、什么是進程

一個正在運行的程序稱之為進程
是一種抽象概念 表示一個執行某件事情的過程進程的概念 起源於操作系統

第一代計算機 程序是固定 無法修改 某種計算機只能干某種活

第二代批處理系統 需要人工參與 將程序攢成一批 統一執行串行執行 提高計算機的的利用率 但是調試麻煩

第三代計算機 為了更好利用計算機資源,產生了

多道技術: (重點)
1.空間復用
內存分割為多個區域 每個區域存儲不同的應用程序

2.時間的復用
1.當一個程序遇到了I/O操作時 會切換到其他程序 (切換前需要保存當前運行狀態 以便恢復執行)提高效率

2.當你的應用程序執行時間過長 操作系統會強行切走 以保證其他程序也能正常運行 當然因為cpu速度賊快 用戶感覺不到,降低效率

3.有一個優先級更高的任務需要處理 此時也會切走降低了效率

我們編寫程序時 只能盡量減少I/O操作

總的來說 有了多道技術之后 操作系統可以同時運行多個程序吧 這種情形稱之為並發
但是本質好 這些程序還是一個一個排隊執行。

#一 操作系統的作用:
    1:隱藏丑陋復雜的硬件接口,提供良好的抽象接口
    2:管理、調度進程,並且將多個進程對硬件的競爭變得有序

#二 多道技術:
    1.產生背景:針對單核,實現並發
    ps:
    現在的主機一般是多核,那么每個核都會利用多道技術
    有4個cpu,運行於cpu1的某個程序遇到io阻塞,會等到io結束再重新調度,會被調度到4個
    cpu中的任意一個,具體由操作系統調度算法決定。
    
    2.空間上的復用:如內存中同時有多道程序
    3.時間上的復用:復用一個cpu的時間片
       強調:遇到io切,占用cpu時間過長也切,核心在於切之前將進程的狀態保存下來,這樣
            才能保證下次切換回來時,能基於上次切走的位置繼續運行

進程的並行與並發

並發 看起來像是同時運行中 本質是不停切換執行 多個進程隨機執行
並行 同一時刻 多個進程 同時進行 只有多核處理器才有真正的並行

區別:

並行是從微觀上,也就是在一個精確的時間片刻,有不同的程序在執行,這就要求必須有多個處理器。
並發是從宏觀上,在一個時間段上可以看出是同時執行的,比如一個服務器同時處理多個session。

串行 一個一個 依次執行排隊
阻塞 遇到了I/O操作 看起來就是代碼卡住了非阻塞 不會卡住代碼的執行
阻塞 和 非阻塞 說的是同一個進程的情況下


同步 一個調用必須獲得返回結果才能繼續執行
異步 一個調用發起后 發起方不需要等待它的返回結果

同步和異步 必須存在多個進程(線程)
無論是進程還是線程都是兩條獨立的執行路徑

 

多進程的執行順序
主進程必然先執行子進程應該在主進程執行后執行一旦子進程啟動了 后續的順序就無法控制了

python如何使用多進程
1.直接創建Process對象 同時傳入要做的事情就是一個函數

p = Process(taget=一個函數,args=(函數的參數))
p.start() 讓操作系統啟動這個進程
2.創建一個類 繼承自Process 把要做的任務放在run方法中

常用屬性
start 開啟進程
join 父進程等待子進程
name 進程名稱
is_alive是否存活
terminate 終止進程
pid 獲取進程id

啟動進程的方式
1.系統初始化 會產生一個根進程
2.用戶的交互請求 鼠標雙擊某個程序
3.在一個進程中 發起了系統調用啟動了另一個進程
4.批處理作業開始 某些專用計算機可能還在使用

不同操作系統創建進程的方式不同
unix < centos mac linux
完全拷貝父進程的所有數據 子進程可以訪問父進程的數據嗎?不可以 但是可以訪問拷貝過來數據副本
windows
創建子進程 加載父進程中所有可執行的文件

二、在python程序中的進程操作

一 multiprocessing模塊介紹

python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu\_count\(\)查看),在python中大部分情況需要使用多進程。

Python提供了multiprocessing。 multiprocessing模塊用來開啟子進程,並在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,>提供了Process、Queue、Pipe、Lock等組件。

需要再次強調的一點是:與線程不同,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內

二 Process類的介紹

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,可用來開啟一個子進程

強調:
1. 需要使用關鍵字的方式來指定參數
2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號

參數介紹:

group參數未使用,值始終為None

target表示調用對象,即子進程要執行的任務

args表示調用對象的位置參數元組,args=(1,2,'egon',)

kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}

name為子進程的名稱

方法介紹:

p.start():啟動進程,並調用該子進程中的p.run() 
p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法  
p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖
p.is_alive():如果p仍然運行,返回True
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程

屬性介紹:

p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置

p.name:進程的名稱

p.pid:進程的pid

三 Process類的使用

注意:在windows中Process()必須放到# if __name__ == '__main__':下

創建並開啟子進程的方式

import time
import random
from multiprocessing import Process

def piao(name):
    print('%s piaoing' %name)
    time.sleep(random.randrange(1,5))
    print('%s piao end' %name)

if __name__ == '__main__':
    #實例化得到四個對象
    p1=Process(target=piao,args=('egon',)) #必須加,號
    p2=Process(target=piao,args=('alex',))
    p3=Process(target=piao,args=('wupeqi',))
    p4=Process(target=piao,args=('yuanhao',))

    #調用對象下的方法,開啟四個進程
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print('')
方式一
import time
import random
from multiprocessing import Process

class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print('%s piaoing' %self.name)

        time.sleep(random.randrange(1,5))
        print('%s piao end' %self.name)

if __name__ == '__main__':
    #實例化得到四個對象
    p1=Piao('egon')
    p2=Piao('alex')
    p3=Piao('wupeiqi')
    p4=Piao('yuanhao')

    #調用對象下的方法,開啟四個進程
    p1.start() #start會自動調用run
    p2.start()
    p3.start()
    p4.start()
    print('')
方式二

四、Process對象的join方法

在主進程運行過程中如果想並發地執行其他的任務,我們可以開啟子進程,此時主進程的任務與子進程的任務分兩種情況

情況一:在主進程的任務與子進程的任務彼此獨立的情況下,主進程的任務先執行完畢后,主進程還需要等待子進程執行完畢,然后統一回收資源。

情況二:如果主進程的任務在執行到某一個階段時,需要等待子進程執行完畢后才能繼續執行,就需要有一種機制能夠讓主進程檢測子進程是否運行完畢,在子進程執行完畢后才繼續執行,否則一直在原地阻塞,這就是join方法的作用

 

from multiprocessing import Process
import time
import random

def task(name):
    print('%s is piaoing' %name)
    time.sleep(random.randint(1,3))
    print('%s is piao end' %name)

if __name__ == '__main__':
    p1=Process(target=task,args=('egon',))
    p2=Process(target=task,args=('alex',))
    p3=Process(target=task,args=('yuanhao',))
    p4=Process(target=task,args=('wupeiqi',))

    p1.start()
    p2.start()
    p3.start()
    p4.start()

    # 有的同學會有疑問: 既然join是等待進程結束, 那么我像下面這樣寫, 進程不就又變成串行的了嗎?
    # 當然不是了, 必須明確:p.join()是讓誰等?
    # 很明顯p.join()是讓主線程等待p的結束,卡住的是主進程而絕非子進程p,
    p1.join()
    p2.join()
    p3.join()
    p4.join()

    print('')

 

詳細解析如下:

進程只要start就會在開始運行了,所以p1-p4.start()時,系統中已經有四個並發的進程了

而我們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵

join是讓主線程等,而p1-p4仍然是並發執行的,p1.join的時候,其余p2,p3,p4仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接通過檢測,無需等待

所以4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間

 

上述啟動進程與join進程可以簡寫為

 

p_l=[p1,p2,p3,p4]

for p in p_l:
    p.start()

for p in p_l:
    p.join()
import time
from multiprocessing import Process


def f(name):
    print('hello', name)
    time.sleep(1)


if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)
        p.join()
    # [p.join() for p in p_lst]
    print('父進程在執行')
多個進程同時運行,再談join方法(1)
import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    time.sleep(1)

if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)
    # [p.join() for p in p_lst]
    print('父進程在執行')
多個進程同時運行,再談join方法(2)

除了上面這些開啟進程的方法,還有一種以繼承Process類的形式開啟進程的方式

import os
from multiprocessing import Process


class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print(os.getpid())
        print('%s 正在和女主播聊天' %self.name)

p1=MyProcess('wupeiqi')
p2=MyProcess('yuanhao')
p3=MyProcess('nezha')

p1.start() #start會自動調用run
p2.start()
# p2.run()
p3.start()


p1.join()
p2.join()
p3.join()

print('主線程')
通過繼承Process類開啟進程

進程之間的數據隔離問題

from multiprocessing import Process

def work():
    global n
    n=0
    print('子進程內: ',n)


if __name__ == '__main__':
    n = 100
    p=Process(target=work)
    p.start()
    print('主進程內: ',n)
進程之間的數據隔離問題

三、守護進程

主進程創建子進程,然后將該進程設置成守護自己的進程,守護進程就好比崇禎皇帝身邊的老太監,崇禎皇帝已死老太監就跟着殉葬了。

關於守護進程需要強調兩點:

其一:守護進程會在主進程代碼執行結束后就終止

其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children

如果我們有兩個任務需要並發執行,那么開一個主進程和一個子進程分別去執行就ok了,如果子進程的任務在主進程任務結束后就沒有存在的必要了,那么該子進程應該在開啟前就被設置成守護進程。主進程代碼運行結束,守護進程隨即終止

from multiprocessing import Process
import time
import random

def task(name):
    print('%s is piaoing' %name)
    time.sleep(random.randrange(1,3))
    print('%s is piao end' %name)


if __name__ == '__main__':
    p=Process(target=task,args=('egon',))
    p.daemon=True #一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,並且父進程代碼執行結束,p即終止運行
    p.start()
    print('') #只要終端打印出這一行內容,那么守護進程p也就跟着結束掉了
守護進程

守護 就是看着 陪着  在代碼中 進程只能由進程類守護
一個進程守護者另一個進程 指的是兩個進程之間的關聯關系
特點:守護進程(妃子) 在被守護進程(皇帝)死亡時 會跟隨被守護進程死亡

什么時候需要使用守護進程?
例如: qq中有個下載視頻 應該用子進程去做 但是 下載的過程中 qq退出 那么下載也沒必要繼續了

四、互斥鎖

from multiprocessing import Process,Lock

# 進程間 內存空間是相互獨立的
def task1(lock):
    lock.acquire()
    for i in range(10000):
        print("===")
    lock.release()

def task2(lock):
    lock.acquire()
    for i in range(10000):
        print("===============")
    lock.release()

def task3(lock):
    lock.acquire()
    for i in range(10000):
        print("======================================")
    lock.release()

if __name__ == '__main__':
    # 買了一把鎖
    mutex = Lock()

    # for i in range(10):
    #     p = Process(target=)
    p1 = Process(target=task1,args=(mutex,))
    p2 = Process(target=task2,args=(mutex,))
    p3 = Process(target=task3,args=(mutex,))

    # p1.start()
    # p1.join()
    # p2.start()
    # p2.join()
    # p3.start()
    # p3.join()

    p1.start()
    p2.start()
    p3.start()

    print("over!")
互斥鎖案例
     # 什么時候用鎖?
    # 當多個進程 同時讀寫同一份數據 數據很可能就被搞壞了
    # 第一個進程寫了一個中文字符的一個字節 cpu被切到另一個進程
    # 另一個進程也寫了一個中文字符的一個字節
    # 最后文件解碼失敗
    # 問題之所以出現 是因為並發 無法控住順序
    # 目前可以使用join來將所有進程並發改為串行

    # 與join的區別?
    # 多個進程並發的訪問了同一個資源  將導致資源競爭(同時讀取不會產生問題 同時修改才會出問題)
    # 第一個方案 加上join  但是這樣就導致了 不公平  相當於 上廁所得按照顏值來
    # 第二個方案 加鎖  誰先搶到資源誰先處理[
    # 相同點: 都變成了串行
    # 不同點:
    # 1.join順序固定 鎖順序不固定!
    # 2.join使整個進程的任務全部串行  而鎖可以指定哪些代碼要串行

    # 鎖使是什么?
    # 鎖本質上就是一個bool類型的標識符  大家(多個進程) 在執行任務之前先判斷標識符
    # 互斥鎖 兩個進程相互排斥

    # 注意 要想鎖住資源必須保證 大家拿到鎖是同一把

    # 怎么使用?
    # 在需要加鎖的地方 lock.acquire() 表示鎖定
    # 在代碼執行完后 一定要lock.release() 表示釋放鎖
    # lock.acquire()
    # 放需要競爭資源的代碼 (同時寫入數據)
    # lock.release()

進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或者打印終端是沒有問題的,但是帶來的是競爭,競爭帶來的結果是錯亂,如下:多個進程模擬多個人執行搶票任務

#文件db.txt的內容為:{"count":1}
#注意一定要用雙引號,不然json無法識別
from multiprocessing import Process
import time,json

def search(name):
    dic=json.load(open('db.txt'))
    time.sleep(1)
    print('\033[43m%s 查到剩余票數%s\033[0m' %(name,dic['count']))

def get(name):
    dic=json.load(open('db.txt'))
    time.sleep(1) #模擬讀數據的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(1) #模擬寫數據的網絡延遲
        json.dump(dic,open('db.txt','w'))
        print('\033[46m%s 購票成功\033[0m' %name)

def task(name):
    search(name)
    get(name)

if __name__ == '__main__':
    for i in range(10): #模擬並發10個客戶端搶票
        name='<路人%s>' %i
        p=Process(target=task,args=(name,))
        p.start()

並發運行,效率高,但競爭寫同一文件,數據寫入錯亂,只有一張票,賣成功給了10個人

<路人0> 查到剩余票數1
<路人1> 查到剩余票數1
<路人2> 查到剩余票數1
<路人3> 查到剩余票數1
<路人4> 查到剩余票數1
<路人5> 查到剩余票數1
<路人6> 查到剩余票數1
<路人7> 查到剩余票數1
<路人8> 查到剩余票數1
<路人9> 查到剩余票數1
<路人0> 購票成功
<路人4> 購票成功
<路人1> 購票成功
<路人5> 購票成功
<路人3> 購票成功
<路人7> 購票成功
<路人2> 購票成功
<路人6> 購票成功
<路人8> 購票成功
<路人9> 購票成功
運行結果

加鎖處理:購票行為由並發變成了串行,犧牲了運行效率,但保證了數據安全

#把文件db.txt的內容重置為:{"count":1}
from multiprocessing import Process,Lock
import time,json

def search(name):
    dic=json.load(open('db.txt'))
    time.sleep(1)
    print('\033[43m%s 查到剩余票數%s\033[0m' %(name,dic['count']))

def get(name):
    dic=json.load(open('db.txt'))
    time.sleep(1) #模擬讀數據的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(1) #模擬寫數據的網絡延遲
        json.dump(dic,open('db.txt','w'))
        print('\033[46m%s 購票成功\033[0m' %name)

def task(name,lock):
    search(name)
    with lock: #相當於lock.acquire(),執行完自代碼塊自動執行lock.release()
        get(name)

if __name__ == '__main__':
    lock=Lock()
    for i in range(10): #模擬並發10個客戶端搶票
        name='<路人%s>' %i
        p=Process(target=task,args=(name,lock))
        p.start()
<路人0> 查到剩余票數1
<路人1> 查到剩余票數1
<路人2> 查到剩余票數1
<路人3> 查到剩余票數1
<路人4> 查到剩余票數1
<路人5> 查到剩余票數1
<路人6> 查到剩余票數1
<路人7> 查到剩余票數1
<路人8> 查到剩余票數1
<路人9> 查到剩余票數1
<路人0> 購票成功
運行結果

加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行地修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。

雖然可以用文件共享數據實現進程間通信,但問題是:

1、效率低(共享數據基於文件,而文件是硬盤上的數據)

2、需要自己加鎖處理

因此我們最好找尋一種解決方案能夠兼顧:

1、效率高(多個進程共享一塊內存的數據)

2、幫我們處理好鎖問題。

這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道。

隊列和管道都是將數據存放於內存中,而隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,因而隊列才是進程間通信的最佳選擇。

我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。

五、進程間的通訊

IPC 指的是進程間通訊
之所以開啟子進程 肯定需要它幫我們完成任務 很多情況下 需要將數據返回給父進程
然而 進程內存是物理隔離的
解決方案:
1.將共享數據放到文件中 就是慢
2.管道 subprocess中的那個 管道只能單向通訊 必須存在父子關系
3.共享一塊內存區域 得操作系統幫你分配 速度快

from multiprocessing import  Process,Manager
import time

def task(dic):
    print("子進程xxxxx")
    # li[0] = 1
    # print(li[0])
    dic["name"] = "xx"

if __name__ == '__main__':
    m = Manager()
    # li = m.list([100])
    dic = m.dict({})
    # 開啟子進程
    p = Process(target=task,args=(dic,))
    p.start()
    time.sleep(3)
    print(dic)

六、僵屍進程和孤兒進程

一個進程任務執行完就死亡了 但是操作系統不會立即將其清理 為的是 開啟這個子進程的父進程可以訪問到這個子進程的信息
這樣的 任務完成的 但是沒有被操作系統清理的進程稱為僵屍進程 越少越好

孤兒進程 無害!  沒有爹的稱為孤兒
一個父進程已經死亡 然而他的子孫進程 還在執行着 這時候 操作系統會接管這些孤兒進程

七、隊列介紹

進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

創建隊列的類(底層就是以管道和鎖定的方式實現)

Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。

參數介紹:

maxsize是隊列中允許最大項數,省略則無大小限制。
但需要明確:
    1、隊列內存放的是消息而非大數據
    2、隊列占用的是內存空間,因而maxsize即便是無大小限制也受限於內存大小

主要方法介紹:

q.put方法用以插入數據到隊列中。
q.get方法可以從隊列讀取並且刪除一個元素。

隊列使用:

from multiprocessing import Process,Queue

q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(1)
q.put(2)
q.put(3)
print(q.full()) #滿了
# q.put(4) #再放就阻塞住了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了
# print(q.get()) #再取就阻塞住了
"""
    進程間通訊的另一種方式 使用queue
    queue  隊列
    隊列的特點:
        先進的先出
        后進后出
        就像扶梯
"""
from multiprocessing import Process,Queue


# 基礎操作 必須要掌握的
# 創建一個隊列
# q = Queue()
# # 存入數據
# q.put("hello")
# q.put(["1","2","3"])
# q.put(1)
# # 取出數據
# print(q.get())
# print(q.get())
# print(q.get())
# print(q.get())

# 阻塞操作 必須掌握
# q = Queue(3)
# # # 存入數據
# q.put("hello",block=False)
# q.put(["1","2","3"],block=False)
# q.put(1,block=False)
# 當容量滿的時候 再執行put 默認會阻塞直到執行力了get為止
# 如果修改block=False 直接報錯 因為沒地方放了
# q.put({},block=False)
#
#  取出數據
# print(q.get(block=False))
# print(q.get(block=False))
# print(q.get(block=False))
# # 對於get   當隊列中中沒有數據時默認是阻塞的  直達執行了put
# # 如果修改block=False 直接報錯 因為沒數據可取了
# print(q.get(block=False))

八、生產消費者模型:

1.生產者消費者模型
模型 設計模式 三層結構 等等表示的都是一種編程套路
生產者指的是能夠產生數據的一類任務
消費者指的是處理數據的一類任務

需求: 文件夾里有十個文本文檔 要求你找出文件中包含習大大關鍵字的文件打開並讀取文件數據就是生產者查找關鍵字的過程就是消費者

生產者消費者模型為什么出現?
生產者的處理能力與消費者的處理能力 不匹配不平衡 導致了一方等待另一方 浪費時間
目前我們通過多進程將生產 和 消費 分開處理
然后將生產者生產的數據通過隊列交給消費者

總結一下在生產者消費者模型中 不僅需要生產者消費者 還需要一個共享數據區域
1.將生產方和消費方耦合度降低
2.平衡雙方的能力 提高整體效率

上代碼 :
搞兩個進程 一個負責生產 一個負責消費

from multiprocessing import Process,Queue
# 制作熱狗
def make_hotdog(queue,name):
    for i in range(3):
        time.sleep(random.randint(1,2))
        print("%s 制作了一個熱狗 %s" % (name,i))
        # 生產得到的數據
        data = "%s生產的熱狗%s" % (name,i)
        # 存到隊列中
        queue.put(data)
    # 裝入一個特別的數據 告訴消費方 沒有了
    #queue.put(None)


# 吃熱狗
def eat_hotdog(queue,name):
    while True:
        data = queue.get()
        if not data:break
        time.sleep(random.randint(1, 2))
        print("%s 吃了%s" % (name,data))

if __name__ == '__main__':
    #創建隊列
    q = Queue()
    p1 = Process(target=make_hotdog,args=(q,"邵鑽鑽的熱狗店"))
    p2 = Process(target=make_hotdog, args=(q, "egon的熱狗店"))
    p3 = Process(target=make_hotdog, args=(q, "老王的熱狗店"))


    c1 = Process(target=eat_hotdog, args=(q,"思聰"))
    c2 = Process(target=eat_hotdog, args=(q, "李哲"))

    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

    # 讓主進程等三家店全都做完后....
    p1.join()
    p2.join()
    p3.join()

    # 添加結束標志   注意這種方法有幾個消費者就加幾個None 不太合適 不清楚將來有多少消費者
    q.put(None)
    q.put(None)


    # 現在 需要知道什么時候做完熱狗了 生產者不知道  消費者也不知道
    # 只有隊列知道

print("主進程over")
# 生產方不生產了 然而消費方不知道 所以已知等待 get函數阻塞 # 三家店都放了一個空表示沒熱狗了 但是消費者只有兩個 他們只要看見None 就認為沒有了 # 於是進程也就結束了 造成一些數據沒有被處理 # 等待做有店都做完熱狗在放None
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
    q.put(None) #發送結束信號
if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('')
模型

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM