Python3學習之路~9.4 隊列、生產者消費者模型


一 隊列queue

當必須在多個線程之間安全地交換信息時,隊列在線程編程中特別有用。

隊列的作用:1.解耦,使程序直接實現松耦合 2.提高處理效率

列表與隊列都是有順序的,但是他們之間有一個很大的區別:從列表中取出一個數據,數據還在列表中,從隊列中取出一個數據,隊列中就減少一個數據。class queue.Queue(maxsize=0) #先入先出

class queue.LifoQueue(maxsize=0) #last in fisrt out 
class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列。
#maxsize是一個整數,用於設置可以放入隊列的數據的上限。達到此大小后,插入將阻止,直到消耗隊列項。
#如果maxsize小於或等於零,則隊列大小為無限大。
#PriorityQueue首先檢索priority_number最低值的數據(最低值是由sorted(list(entries))[0]返回)。
#放入PriorityQueue中的數據的典型模式是以下形式的元組:(priority_number,data)。

exception queue.Empty #在對空隊列調用非阻塞的get()或get_nowait()時引發的異常。
exception queue.Full  #在對已滿的隊列調用非阻塞put()或put_nowait()時引發的異常。
Queue.qsize() #返回隊列長度
Queue.empty() #如果隊列為空,返回True 
Queue.full() #如果隊列已滿,返回True 

Queue.put(item, block=True, timeout=None) 
#將數據放入隊列。如果block=True且timeout=None(默認值),則在必要時(隊列滿時進行put操作)阻塞,直到隊列有空閑可用。
#若timeout=正數,它會阻塞最多超時秒,如果在該時間內隊列沒有空閑可用,則會引發Full異常。
#若block=False,則在必要時直接拋出Full異常(在這種情況下忽略超時)。

Queue.put_nowait(item) #相當於put(item, False)

Queue.get(block=True, timeout=None) 
#從隊列中取出一個數據。如果block=True且timeout=None(默認值),則在必要時(隊列空時進行get操作)阻塞,直到隊列有數據可取。
#若timeout=正數,它會阻塞最多超時秒,如果在該時間內隊列沒有數據可取,則會引發Empty異常。
#若block=False,則在必要時直接拋出Empty異常(在這種情況下忽略超時)。

Queue.get_nowait() #相當於get(False)

#Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()
#Indicate that a formerly enqueued task is complete. Used by queue consumer threads. 
#For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete. #If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call
#was received for every item that had been put() into the queue). #Raises a ValueError if called more times than there were items placed in the queue. Queue.join() #block直到queue被消費完畢

 queue簡單操作

C:\Users\Administrator>python3
Python 3.6.5 (v3.6.5:f59c0932b4, Mar 28 2018, 17:00:18) [MSC v.1900 64 bit (AMD6
4)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import queue
>>> q = queue.Queue() #實例化一個隊列
>>> q.put("d1") #放入d1
>>> q.put("d2") #放入d2
>>> q.put("d3") #放入d3
>>> q.get() #不能指定取出哪個數據,必須按照先入先出規則
'd1'
>>> q.get()
'd2'
>>> q.get()
'd3'
>>> q.get() #當隊列中沒數據可取時,會卡住,無限等待

#如果不想卡住,可在get之前通過q.qsize()來判斷隊列長度,或者通過q.get_nowait()來抓住異常
>>> q.qsize()
0
>>> q.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "D:\software\Python3.6.5\lib\queue.py", line 192, in get_nowait
    return self.get(block=False)
  File "D:\software\Python3.6.5\lib\queue.py", line 161, in get
    raise Empty
queue.Empty

#Queue.get(block=True, timeout=None)方法默認有2個參數,可以手動改掉,使當隊列為空時,不卡住。
>>> q.get(block=False) #當隊列為空時,不卡住,直接報異常
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "D:\software\Python3.6.5\lib\queue.py", line 161, in get
    raise Empty
queue.Empty
>>> q.get(timeout=1)  #當隊列為空時,卡住1秒,然后報異常
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "D:\software\Python3.6.5\lib\queue.py", line 172, in get
    raise Empty
queue.Empty

>>> import queue
>>> q = queue.Queue(maxsize=3) #可以設置隊列長度最大為3
>>> q.put(1)
>>> q.put(2)
>>> q.put(3)
>>> q.put(4) #卡住,等待另一個線程把1取出來,4才能放進去

#Queue.put(item, block=True, timeout=None),法默認有2個參數,可以手動改掉,使當隊列滿時,不卡住。與get類似
>>> q.put(4,block=False)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "D:\software\Python3.6.5\lib\queue.py", line 130, in put
    raise Full
queue.Full
>>> q.put(4,timeout=1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "D:\software\Python3.6.5\lib\queue.py", line 141, in put
    raise Full
queue.Full

>>> import queue
>>> q = queue.LifoQueue() #后進先出
>>> q.put(1)
>>> q.put(2)
>>> q.put(3)
>>> q.get()
3
>>> q.get()
2
>>> q.get()
1

>>> import queue
>>> q = queue.PriorityQueue() #優先級隊列
>>> q.put((3,"zhao"))
>>> q.put((10,"qian"))
>>> q.put((6,"sun"))
>>> q.put((-1,"li"))
>>> q.get() #按數字由小到大
(-1, 'li')
>>> q.get()
(3, 'zhao')
>>> q.get()
(6, 'sun')
>>> q.get()
(10, 'qian')
queue實際操作

 

二 生產者消費者模型

在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

為什么要使用生產者和消費者模式

在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什么是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

下面來學習一個最基本的生產者消費者模型的例子

import threading,queue,time

q = queue.Queue(maxsize=10)

def Producer(name):
    count = 0
    while True:
        q.put("小魚干%s" % count)
        print("[%s]生產了骨頭"%name,count)
        count += 1
        time.sleep(0.5)
        
def Consumer(name):
    while True:
        print("[%s] 取到[%s]並且吃了它..." %(name,q.get()))
        time.sleep(1)

for i in range(2):
    p = threading.Thread(target=Producer, args=("主人%s"%i,))
    p.start()

for i in range(3):
    c = threading.Thread(target=Consumer,args=("大貓%s"%i,))
    c.start()

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM