生產者消費者模型應用場景及優勢?


在 工作中,大家可能會碰到這樣一種情況:某個模塊負責產生數據,這些數據由另一個模塊來負責處理(此處的模塊是廣義的,可以是類、函數、線程、進程等)。

產 生數據的模塊,就形象地稱為生產者;而處理數據的模塊,就稱為消費者。

在生產者與消費者之間在加個緩沖區,我們形象的稱之為倉庫,生產者負責往倉庫了進商 品,而消費者負責從倉庫里拿商品,這就構成了生產者消費者模型。

結構圖如下:

 

 

生產者消費者模型的優點:

1、解耦

假設生產者和消費者分別是兩個類。

如果讓生產者直接調用消費者的某個方法,那么生產者對於消費者就會產生依賴(也就是耦合)。

將來如果消費者的代碼發生變化, 可能會影響到生產者。而如果兩者都依賴於某個緩沖區,兩者之間不直接依賴,耦合也就相應降低了。

舉個例子,我們去郵局投遞信件,如果不使用郵筒(也就是緩沖區),你必須得把信直接交給郵遞員。

有同學會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須 得認識誰是郵遞員,才能把信給他(光憑身上穿的制服,萬一有人假冒,就慘了)。

這就產生和你和郵遞員之間的依賴(相當於生產者和消費者的強耦合)。

萬一哪天郵遞員換人了,你還要重新認識一下(相當於消費者變化導致修改生產者代碼)。

而郵筒相對來說比較固定,你依賴它的成本就比較低(相當於和緩沖區之間的弱耦合)。

 

2、支持並發

由於生產者與消費者是兩個獨立的並發體,他們之間是用緩沖區作為橋梁連接,生產者只需要往緩沖區里丟數據,

就可以繼續生產下一個數據,而消費者只需要從緩沖區了拿數據即可,這樣就不會因為彼此的處理速度而發生阻塞。

接上面的例子,如果我們不使用郵筒,我們就得在郵局等郵遞員,直到他回來,

我們把信件交給他,這期間我們啥事兒都不能干(也就是生產者阻塞),或者郵遞員得挨家挨戶問,誰要寄信(相當於消費者輪詢)。

 

3、支持忙閑不均

緩沖區還有另一個好處。如果制造數據的速度時快時慢,緩沖區的好處就體現出來了。

當數據制造快的時候,消費者來不及處理,未處理的數據可以暫時存在緩沖區中。 等生產者的制造速度慢下來,消費者再慢慢處理掉。

為了充分復用,我們再拿寄信的例子來說事。假設郵遞員一次只能帶走1000封信。萬一某次碰上情人節(也可能是聖誕節)送賀卡,

需要寄出去的信超過1000封,這時 候郵筒這個緩沖區就派上用場了。郵遞員把來不及帶走的信暫存在郵筒中,等下次過來 時再拿走。

 

應用場景:

使用多線程,在做爬蟲的時候,生產者用着產生url鏈接,消費者用於獲取url數據,在隊列的幫助下可以使用多線程加快爬蟲速度。

import time
import threading
import Queue
import urllib2
 
class Consumer(threading.Thread):
  def __init__(self, queue):
    threading.Thread.__init__(self)
    self._queue = queue
 
  def run(self):
    while True:
      content = self._queue.get()
      print content
      if isinstance(content, str) and content == 'quit':
        break
      response = urllib2.urlopen(content)
    print 'Bye byes!'
 
 
def Producer():
  urls = [
    'http://211.103.242.133:8080/Disease/Details.aspx?id=2258',
    'http://211.103.242.133:8080/Disease/Details.aspx?id=2258',
    'http://211.103.242.133:8080/Disease/Details.aspx?id=2258',
    'http://211.103.242.133:8080/Disease/Details.aspx?id=2258'
  ]
  queue = Queue.Queue()
  worker_threads = build_worker_pool(queue, 4)
  start_time = time.time()
  for url in urls:
    queue.put(url)
 
  for worker in worker_threads:
    queue.put('quit')
  for worker in worker_threads:
    worker.join()
 
  print 'Done! Time taken: {}'.format(time.time() - start_time)
 
 
def build_worker_pool(queue, size):
  workers = []
  for _ in range(size):
    worker = Consumer(queue)
    worker.start()
    workers.append(worker)
  return workers
 
if __name__ == '__main__':
  Producer()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM