Python | 面試的常客,經典的生產消費者模式


本文始發於個人公眾號:TechFlow,原創不易,求個關注


今天是Python專題的第23篇文章,我們來聊聊關於多線程的一個經典設計模式

在之前的文章當中我們曾經說道,在多線程並發的場景當中,如果我們需要感知線程之間的狀態,交換線程之間的信息是一件非常復雜和困難的事情。因為我們沒有更高級的系統權限,也沒有上帝視角,很難知道目前運行的狀態的全貌,所以想要設計出一個穩健運行沒有bug的功能,不僅非常困難,而且調試起來非常麻煩。

生產消費者模式

在日常開發當中,從一個線程向另外的線程傳輸數據又是一件家常便飯的事情。舉個最簡單的例子,我們在處理網頁請求的時候,需要打印下來這一次請求的相關日志。打印日志是一次IO行為,這是非常消耗時間的,所以我們不能放在請求當中同步進行,否則會影響系統的性能。最好的辦法就是啟動一系列線程專門負責打印,后端的線程只負責響應請求,相關的日志以消息的形式傳送給打印線程打印。

這個簡單的不能再簡單的功能當中涉及了諸多細節,我們來盤點幾個。首先IO線程的數據都是從后台線程來的,假如一段時間內沒有請求,那么這些線程都應該休眠,應該在有請求的時候才會啟動。其次,如果某一段時間內請求非常多,導致IO線程一時間來不及打印所有的數據,那么當下的請求應該先暫存起來,等IO線程”忙過來“之后再進行處理。

把這些細節都考慮到,自己來設計功能還是挺麻煩的。好在這個問題前人已經替我們想過了,並且得出了一個非常經典的設計模式,使用它可以很好的解決這個問題。這個模式就是生產消費者模式

這個設計模式的原理其實非常簡單,我們來看張圖就明白了。

Java並發-- 生產者-消費者模式| 點滴積累
Java並發-- 生產者-消費者模式| 點滴積累

線程根據和數據的關系分為生產者線程和消費者線程,其中生產者線程負責生產數據,產生了數據之后會存儲到任務隊列當中。消費者線程從這個隊列獲取需要消費的數據,它和生產者線程之間不會直接交互,避免了線程之間互相依賴的問題。

另外一個細節是這里的任務隊列並不是普通的隊列,一般情況下是一個阻塞隊列。也就是說當消費者線程嘗試從其中獲取數據的時候,如果隊列是空的,那么這些消費者線程會自動掛起等待,直到它獲得了數據為止。有阻塞隊列當然也有非阻塞隊列,如果是非阻塞隊列的話,當我們嘗試從其中獲取數據的時候,如果它當中沒有數據的話,並不會掛起等待,而是會返回一個空值。

當然阻塞隊列的掛起等待時間也是可以設置的,我們可以讓它一直等待下去,也可以設置一個最長等待時間。如果超過這個時間也會返回空,不同的隊列應用在不同的場景當中,我們需要根據場景性質做出調整。

代碼實現

看完了設計模式的原理,我們下面來試着用代碼來實現一下。

在一般的高級語言當中都有現成的隊列的庫,由於在生產消費者模式當中用到的是阻塞型queue,有阻塞性的隊列當然也就有非阻塞型的隊列。我們在用之前需要先了解清楚,如果用錯了隊列會導致整個程序出現問題。在Python當中,我們最常用的queue就是一個支持多線程場景的阻塞隊列,所以我們直接拿來用就好了。

由於這個設計模式非常簡單,這個代碼並不長只有幾行:

from queue import Queue
from threading import Thread

def producer(que):
    data = 0
    while True:
        data += 1
        que.put(data)
        
def consumer(que):
    while True:
        data = que.get()
        print(data)
        
        
que = Queue()
t1 = Thread(target=consumer, args=(que, ))
t2 = Thread(target=producer, args=(que, ))
t1.start()
t2.start()

我們運行一下就會發現它是可行的,並且由於隊列先進先出的限制,可以保證了consumer線程讀取到的內容的順序和producer生產的順序是一致的

如果我們運行一下這個代碼會發現它是不會結束的,因為consumer和producer當中都用到了while True構建的死循環,假設我們希望可以控制程序的結束,應該怎么辦?

其實也很簡單,我們也可以利用隊列。我們創建一個特殊的信號量,約定好當consumer接受到這個特殊值的時候就停止程序。這樣當我們要結束程序的時候,我們只需要把這個信號量加入隊列即可。

singal = object()

def producer(que):
    data = 0
    while data < 20:
        data += 1
        que.put(data)
    que.put(singal)
        
def consumer(que):
    while True:
        data = que.get()
        if data is singal:
            # 繼續插入singal
            que.put(singal)
            break
        print(data)

這里有一個細節是我們在consumer當中,當讀取到singal的時候,在跳出循環之前我們又把singal放回了隊列。原因也很簡單,因為有時候consumer線程不止一個,這個singal上游只放置了一個,只會被一個線程讀取進來,其他線程並不會知道已經獲得了singal的消息,所以還是會繼續執行。

而當consumer關閉之前放入singal就可以保證每一個consumer在關閉的之前都會再傳遞一個結束的信號給其他未關閉的consumer讀取。這樣一個一個的傳遞,就可以保證所有consumer都關閉。

這里還有一個小細節,雖然利用隊列可以解決生產者和消費者通信的問題,但是上游的生產者並不知道下游的消費者是否已經執行完成了。假如我們想要知道,應該怎么辦?

Python的設計者們也考慮到了這個問題,所以他們在Queue這個類當中加入了task_done和join方法。利用task_done,消費者可以通知queue這一個任務已經執行完成了。而通過調用join,可以等待所有的consumer完成。

from queue import Queue
from threading import Thread

def producer(que):
    data = 0
    while data < 20:
        data += 1
        que.put(data)
        
def consumer(que):
    while True:
        data = que.get()
        print(data)
        que.task_done()
        
        
que = Queue()
t1 = Thread(target=consumer, args=(que, ))
t2 = Thread(target=producer, args=(que, ))
t1.start()
t2.start()

que.join()

除了使用task_done之外,我們還可以在que傳遞的消息當中加入一個Event,這樣我們還可以繼續感知到每一個Event執行的情況。

優先隊列與其他設置

我們之前在介紹一些分布式調度系統的時候曾經說到過,在調度系統當中,調度者會用一個優先隊列來管理所有的任務。當有機器空閑的時候,會有限調度那些優先級高的任務。

其實這個調度系統也是基於我們剛才介紹的生產消費者模型開發的,只不過將調度隊列從普通隊列換成了優先隊列而已。所以如果我們也希望我們的consumer能夠根據任務的優先級來改變執行順序的話,也可以使用優先隊列來進行管理任務。

關於優先隊列的實現我們已經很熟悉了,但是有一個問題是我們需要實現掛起等待的阻塞功能。這個我們自己實現是比較麻煩的,但好在我們可以通過調用相關的庫來實現。比如threading中的Condition,Condition是一個條件變量可以通知其他線程,也可以實現掛起等待

from threading import Thread, Condition

class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._cv = Condition()
        
    def put(self, item, priority):
        with self._cv:
            heapq.heappush(self._queue, (-priority, self._count, item))
            # 通知下游,喚醒wait狀態的線程
            self._cv.notify()

    def get(self):
        with self._cv:
            # 如果對列為空則掛起
            while len(self._queue) == 0:
                self._cv.wait()
            # 否則返回優先級最大的
            return heapq.heappop(self._queue)[-1]

最后介紹一下Queue的其他設置,比如我們可以通過size參數設置隊列的大小,由於這是一個阻塞式隊列,所以如果我們設置了隊列的大小,那么當隊列被裝滿的時候,往其中插入數據的操作也會被阻塞。此時producer線程會被掛起,一直到隊列不再滿為止。

當然我們也可以通過block參數將隊列的操作設置成非阻塞。比如que.get(block=False),那么當隊列為空的時候,將會拋出一個隊列為空的異常。同樣,que.put(data, block=False)時也一樣會得到一個隊列已滿的異常。

總結

今天這篇文章當中我們主要介紹了多線程場景中經典的生產消費者模式,這個模式在許多場景當中都有使用。比如kafka等消息系統,以及yarn等調度系統等等,幾乎只要是涉及到多線程上下游通信的,往往都會用到。也正因此它的使用場景太廣了,所以它經常在各種面試當中出現,也可以認為是工程師必須知道的幾種基礎設計模式之一。

另外,隊列也是一個在設計模式以及使用場景當中經常出現的數據結構。從側面也說明了,為什么算法和數據結構非常重要,許多大公司喜歡問一些算法題,也是因為有實際的使用場景,並且的的確確能鍛煉工程師的思維能力。經常有同學問我算法和數據結構的使用案例,這就是一個很好的例子。

今天的文章到這里就結束了,如果喜歡本文的話,請來一波素質三連,給我一點支持吧(關注、轉發、點贊)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM