多進程操作-進程隊列multiprocess.Queue的使用


一、ipc機制 進程通訊

管道:pipe 基於共享的內存空間
隊列:pipe+鎖 queue

下面拿代碼來實現Queue如何使用:

案例一:

from multiprocessing import Queue
q = Queue()     # 實例產生一個q隊列
q.put('蔡徐坤')  # 將括號內的數據加入隊列中,先進先出
q.put([1,2,3])
q.put(3)
print(q.get())  # 將隊列里的數據取出來,先進先出
print(q.get())
print(q.get())
# q.put(5)
print(q.get())  # 如果隊列里面沒有值,就會一直等待隊0列有值。

案例二:

from multiprocessing import Queue
q = Queue(4)  # 4 代表隊列最大項數為4,不寫則為無限制大小
q.put('蔡徐坤')  # 將括號內的數據加入隊列中,先進先出
q.put([1,2,3])
q.put(3)
q.put(3)
q.put(3)    # 隊列滿了的話,會阻塞,等待q.get()放值后,才能加入隊列


案例三:(從這往下都是了解)

from multiprocessing import Queue
q = Queue(3)
q.put('zhao')
q.put('zhao')
q.put('zhao')

q.put('zhao',block=True,timeout=5) # put里的  block=True(默認) 如果滿了會等待,timeout最多等待n s,如果ns還是隊列還是滿的就報錯了,如果block=False,隊列滿了直接報錯。

案例四:

from multiprocessing import Queue
q = Queue()
q.put('yyyy')
q.get()
q.get(block=True,timeout=5) # block=True 阻塞等待,timeout最多等5s, 剩下同上

案例五:

from multiprocessing import Queue
q = Queue(3)
q.put('qwe')
q.put('qwe')
q.put('qwe')
q.put('qwe',block=False) # 對於put來說block=False 如果隊列滿了就直接報錯

q = Queue(3)
q.put('qwe')
q.get()
q.get(block=False)  # 對於get來說:block = Flase 拿不到不阻塞,直接報錯

案例六:

from multiprocessing import Queue
q = Queue(1)
q.put('123')
q.get()
q.put_nowait('666') # 相當於block = False
q.get_nowait() #     block = False



二、生產者消費者模型:

​ 在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度

2.1 為什么要使用生產者和消費者模式?

​ 在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

2.2什么是生產者消費者模式?

​ 生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

簡述:

​ 生產者: 生產數據的任務
​ 消費者: 處理數據的任務

​ 生產者--隊列(盆)-->消費者

​ 生產者可以不停的生產,達到了自己最大的生產效率,消費者可以不停的消費,也達到了自己最大的消 費效率.
​ 生產者消費者模型大大提高了生產者生產的效率和消費者消費的效率.

​ 補充: queue不適合傳大文件,通產傳一些消息.

生產者消費者模型一:

from multiprocessing import Process,Queue

def producer(q,name,food):
    '''生產者'''
    for i in range(10):
        print(f'{name}生產了{food}{i}')
        res = f'{food}{i}'
        q.put(res)
    q.put(None)  # 發送結束信號

def consumer(q,name):
    '''消費者'''
    while True:
        res = q.get(timeout=5)
        if res is None:
            break
        print(f'{name}吃了{res}')

if __name__ == '__main__':
    q=Queue()
    p1 = Process(target=producer,args=(q,'蔡徐坤','包子'))
    c1 = Process(target=consumer,args=(q,'周琦'))
    p1.start()
    c1.start()
    # p1.join()
    # q.put(None)  # 不一定由生產者發送結束信號,也可以由主進程來發送

多個消費者例子:有幾個消費者就需要發送幾次結束信號:

from multiprocessing import Process,Queue
import time,random

def producer(q,name,food):
    '''生產者'''
    for i in range(3):
        print(f'{name}生產了{food}{i}')
        time.sleep(random.randint(1,3))
        res = f'{food}{i} '
        q.put(res)

def consumer(q,name):
    '''消費者'''
    while True:
        res = q.get(timeout=5)
        if res is None:
            break
        time.sleep(random.randint(1,3))
        print(f'{name}吃了{res}')

if __name__ == '__main__':
    q =Queue()
    p1 = Process(target=producer,args=(q,'蔡徐坤','包子'))
    p2 = Process(target=producer,args=(q,'周琦','手抓餅'))
    p3 = Process(target=producer,args=(q,'吳亦凡','羊肉串'))
    c1 = Process(target=consumer,args=(q,'葉問'))
    c2 = Process(target=consumer,args=(q,'黃飛鴻'))
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()  # 生產者生產完畢
    q.put(None) # 幾個消費者就put幾次None
    q.put(None)

JoinableQueue隊列實現消費者生產者模型:

from multiprocessing import Process,JoinableQueue
import time,random

def producer(q,name,food):
    '''生產者'''
    for i in range(3):
        print(f'{name}生產了{food}{i}')
        time.sleep(random.randint(1,3))
        res = f'{food}{i} '
        q.put(res)

def consumer(q,name):
    '''消費者'''
    while True:
        res = q.get(timeout=5)
        time.sleep(random.randint(1,3))
        print(f'{name}吃了{res}')
        q.task_done()   #向q.join()發送一次信號,證明一個數據已經被取走了

if __name__ == '__main__':
    q =JoinableQueue()
    p1 = Process(target=producer,args=(q,'蔡徐坤','包子'))
    p2 = Process(target=producer,args=(q,'周琦','手抓餅'))
    p3 = Process(target=producer,args=(q,'吳亦凡','羊肉串'))
    c1 = Process(target=consumer,args=(q,'葉問'))
    c2 = Process(target=consumer,args=(q,'黃飛鴻'))
    p1.start()
    p2.start()
    p3.start()

    c1.daemon = True  # 定義消費者為守護進程
    c2.daemon = True
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()  # 生產者生產完畢
    # q.put(None) # 幾個消費者就put幾次None
    # q.put(None)
    q.join()  # 生產完畢,使用此方法進行阻塞,直到隊列中所有項目均被處理。
    # 分析:
    #  生產者生產完畢---》消費者已經取干凈了---》q.join()消費者已經取干凈了,沒有存在的意義了
    # 這是主進程最后一行代碼結束,消費者已經取干凈了,沒有存在的意義了.守護進程的概念.

    # 分析2:
    # #主進程等--->p1,p2,p3等---->c1,c2
    #     #p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的數據
    #     #因而c1,c2也沒有存在的價值了,不需要繼續阻塞在進程中影響主進程了。應該隨着主進程的結束而結束,所以設置成守護進程就可以了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM