1.什么是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
這個阻塞隊列就是用來給生產者和消費者解耦的。縱觀大多數設計模式,都會找一個第三者出來進行解耦,如工廠模式的第三者是工廠類,模板模式的第三者是模板類。在學習一些設計模式的過程中,如果先找到這個模式的第三者,能幫助我們快速熟悉一個設計模式。
2.生產消費者模型
生產者消費者模型具體來講,就是在一個系統中,存在生產者和消費者兩種角色,他們通過內存緩沖區進行通信,生產者生產消費者需要的資料,消費者把資料做成產品。生產消費者模式如下圖。
在日益發展的服務類型中,譬如注冊用戶這種服務,它可能解耦成好幾種獨立的服務(賬號驗證,郵箱驗證碼,手機短信碼等)。它們作為消費者,等待用戶輸入數據,在前台數據提交之后會經過分解並發送到各個服務所在的url,分發的那個角色就相當於生產者。消費者在獲取數據時候有可能一次不能處理完,那么它們各自有一個請求隊列,那就是內存緩沖區了。做這項工作的框架叫做消息隊列。
3.生產者消費者模型的實現
生產者是一堆線程,消費者是另一堆線程,內存緩沖區可以使用List數組隊列,數據類型只需要定義一個簡單的類就好。關鍵是如何處理多線程之間的協作。這其實也是多線程通信的一個范例。
在這個模型中,最關鍵就是內存緩沖區為空的時候消費者必須等待,而內存緩沖區滿的時候,生產者必須等待。其他時候可以是個動態平衡。值得注意的是多線程對臨界區資源的操作時候必須保證在讀寫中只能存在一個線程,所以需要設計鎖的策略。
4.為什么要使用生產者和消費者模式
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這種生產消費能力不均衡的問題,所以便有了生產者和消費者模式。
為了不至於太抽象,我們舉一個寄信的例子(雖說這年頭寄信已經不時興,但這個例子還是比較貼切的)。假設你要寄一封平信,大致過程如下:
1、你把信寫好——相當於生產者制造數據
2、你把信放入郵筒——相當於生產者把數據放入緩沖區
3、郵遞員把信從郵筒取出——相當於消費者把數據取出緩沖區
4、郵遞員把信拿去郵局做相應的處理——相當於消費者處理數據
4.1優點
解耦
假設生產者和消費者分別是兩個類。如果讓生產者直接調用消費者的某個方法,那么生產者對於消費者就會產生依賴(也就是耦合)。將來如果消費者的代碼發生變化,可能會影響到生產者。而如果兩者都依賴於某個緩沖區,兩者之間不直接依賴,耦合也就相應降低了。
接着上述的例子,如果不使用郵筒(也就是緩沖區),你必須得把信直接交給郵遞員。有同學會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須得認識誰是郵遞員,才能把信給他(光憑身上穿的制服,萬一有人假冒,就慘了)。這就產生和你和郵遞員之間的依賴(相當於生產者和消費者的強耦合)。萬一哪天郵遞員換人了,你還要重新認識一下(相當於消費者變化導致修改生產者代碼)。而郵筒相對來說比較固定,你依賴它的成本就比較低(相當於和緩沖區之間的弱耦合)。
支持並發(concurrency)
生產者直接調用消費者的某個方法,還有另一個弊端。由於函數調用是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產者只好一直等在那邊。萬一消費者處理數據很慢,生產者就會白白糟蹋大好時光。
使用了生產者/消費者模式之后,生產者和消費者可以是兩個獨立的並發主體(常見並發類型有進程和線程兩種)。生產者把制造出來的數據往緩沖區一丟,就可以再去生產下一個數據。基本上不用依賴消費者的處理速度。
支持忙閑不均
緩沖區還有另一個好處。如果制造數據的速度時快時慢,緩沖區的好處就體現出來了。當數據制造快的時候,消費者來不及處理,未處理的數據可以暫時存在緩沖區中。等生產者的制造速度慢下來,消費者再慢慢處理掉。
為了充分復用,我們再拿寄信的例子來說事。假設郵遞員一次只能帶走1000封信。萬一某次碰上情人節(也可能是聖誕節)送賀卡,需要寄出去的信超過1000封,這時候郵筒這個緩沖區就派上用場了。郵遞員把來不及帶走的信暫存在郵筒中,等下次過來時再拿走。
5.多生產者和多消費者場景
在多核時代,多線程並發處理速度比單線程處理速度更快,所以我們可以使用多個線程來生產數據,同樣可以使用多個消費線程來消費數據。而更復雜的情況是,消費者消費的數據,有可能需要繼續處理,於是消費者處理完數據之后,它又要作為生產者把數據放在新的隊列里,交給其他消費者繼續處理。如下圖:
6.線程池與生產消費者模式
Java中的線程池類其實就是一種生產者和消費者模式的實現方式,但是我覺得其實現方式更加高明。生產者把任務丟給線程池,線程池創建線程並處理任務,如果將要運行的任務數大於線程池的基本線程數就把任務扔到阻塞隊列里,這種做法比只使用一個阻塞隊列來實現生產者和消費者模式顯然要高明很多,因為消費者能夠處理直接就處理掉了,這樣速度更快,而生產者先存,消費者再取這種方式顯然慢一些。
我們的系統也可以使用線程池來實現多生產者消費者模式。比如創建N個不同規模的Java線程池來處理不同性質的任務,比如線程池1將數據讀到內存之后,交給線程池2里的線程繼續處理壓縮數據。線程池1主要處理IO密集型任務,線程池2主要處理CPU密集型任務。
7.內存緩沖區
最傳統、最常見的方式:隊列(FIFO)作緩沖。
7.1 線程方式
並發線程中使用隊列的優缺點
內存分配的性能
在線程方式下,生產者和消費者各自是一個線程。生產者把數據寫入隊列頭(以下簡稱push),消費者從隊列尾部讀出數據(以下簡稱pop)。當隊列為空,消費者就稍息(稍事休息);當隊列滿(達到最大長度),生產者就稍息。整個流程並不復雜。
上述過程會有一個主要的問題是關於內存分配的性能開銷。對於常見的隊列實現:在每次push時,可能涉及到堆內存的分配;在每次pop時,可能涉及堆內存的釋放。假如生產者和消費者都很勤快,頻繁地push、pop,那內存分配的開銷就很可觀了。對於內存分配的開銷,可查找Java性能優化相關知識。
解決辦法:環形緩沖區。
同步和互斥的性能
另外,由於兩個線程共用一個隊列,自然就會涉及到線程間諸如同步、互斥、死鎖等等。這會兒要細談的是,同步和互斥的性能開銷。在很多場合中,諸如信號量、互斥量等的使用也是有不小的開銷的(某些情況下,也可能導致用戶態/核心態切換)。如果像剛才所說,生產者和消費者都很勤快,那這些開銷也不容小覷。
解決辦法:雙緩沖區。
適用於隊列的場合
由於隊列是很常見的數據結構,大部分編程語言都內置了隊列的支持,有些語言甚至提供了線程安全的隊列(比如JDK 1.5引入的ArrayBlockingQueue)。因此,開發人員可以撿現成,避免了重新發明輪子。
所以,假如你的數據流量不是很大,采用隊列緩沖區的好處還是很明顯的:邏輯清晰、代碼簡單、維護方便。比較符合KISS原則。
7.2 進程方式
跨進程的生產者/消費者模式,非常依賴於具體的進程間通訊(IPC)方式。而IPC的種類很多。下面介紹比較常用的跨平台、且編程語言支持較多的IPC方式。
匿名管道
感覺管道是最像隊列的IPC類型。生產者進程在管道的寫端放入數據;消費者進程在管道的讀端取出數據。整個的效果和線程中使用隊列非常類似,區別在於使用管道就無需操心線程安全、內存分配等瑣事(操作系統暗中都幫你搞定了)。
管道又分命名管道和匿名管道兩種,今天主要聊匿名管道。因為命名管道在不同的操作系統下差異較大(比如Win32和POSIX,在命名管道的API接口和功能實現上都有較大差異;有些平台不支持命名管道,比如Windows CE)。除了操作系統的問題,對於有些編程語言(比如Java)來說,命名管道是無法使用的。
其實匿名管道在不同平台上的API接口,也是有差異的(比如Win32的CreatePipe和POSIX的pipe,用法就很不一樣)。但是我們可以僅使用標准輸入和標准輸出(以下簡稱stdio)來進行數據的流入流出。然后利用shell的管道符把生產者進程和消費者進程關聯起來。實際上,很多操作系統(尤其是POSIX風格的)自帶的命令都充分利用了這個特性來實現數據的傳輸(比如more、grep等),如此優點:
1、基本上所有操作系統都支持在shell方式下使用管道符。因此很容易實現跨平台。
2、大部分編程語言都能夠操作stdio,因此跨編程語言也就容易實現。
3、管道方式省卻了線程安全方面的瑣事。有利於降低開發、調試成本。
當然,這種方式也有自身的缺點:
1、生產者進程和消費者進程必須得在同一台主機上,無法跨機器通訊。這個缺點比較明顯。
2、在一對一的情況下,這種方式挺合用。但如果要擴展到一對多或者多對一,那就有點棘手了。所以這種方式的擴展性要打個折扣。假如今后要考慮類似的擴展,這個缺點就比較明顯。
3、由於管道是shell創建的,對於兩邊的進程不可見(程序看到的只是stdio)。在某些情況下,導致程序不便於對管道進行操縱(比如調整管道緩沖區尺寸)。這個缺點不太明顯。
4、最后,這種方式只能單向傳數據。好在大多數情況下,消費者進程不需要傳數據給生產者進程。萬一你確實需要信息反饋(從消費者到生產者),那就費勁了。可能得考慮換種IPC方式。
注意事項:
1、對stdio進行讀寫操作是以阻塞方式進行。比如管道中沒有數據,消費者進程的讀操作就會一直停在哪兒,直到管道中重新有數據。
2、由於stdio內部帶有自己的緩沖區(這緩沖區和管道緩沖區是兩碼事),有時會導致一些不太爽的現象(比如生產者進程輸出了數據,但消費者進程沒有立即讀到)。
SOCKET(TCP方式)
基於TCP方式的SOCKET通訊是又一個類似於隊列的IPC方式。它同樣保證了數據的順序到達;同樣有緩沖的機制。而且跨平台和跨語言,和剛才介紹的shell管道符方式類似。
SOCKET相比shell管道符的方式,主要有如下幾個優點:
1、SOCKET方式可以跨機器(便於實現分布式)。這是主要優點。
2、SOCKET方式便於將來擴展成為多對一或者一對多。這也是主要優點。
3、SOCKET可以設置阻塞和非阻塞方法,用起來比較靈活。這是次要優點。
4、SOCKET支持雙向通訊,有利於消費者反饋信息。
當然有利就有弊。相對於上述shell管道的方式,使用SOCKET在編程上會更復雜一些。好在前人已經做了大量的工作,可借助於這些第三方的庫和框架,比如C++的ACE庫、Python的Twisted。
雖然TCP在很多方面比UDP可靠,但鑒於跨機器通訊先天的不可預料性,可以在生產者進程和消費者進程內部各自再引入基於線程的"生產者/消費者模式",如下圖:
這么做的關鍵點在於把代碼分為兩部分:生產線程和消費線程屬於和業務邏輯相關的代碼(和通訊邏輯無關);發送線程和接收線程屬於通訊相關的代碼(和業務邏輯無關)。
這樣的好處是很明顯的,具體如下:
1、能夠應對暫時性的網絡故障。並且在網絡故障解除后,能夠繼續工作。
2、網絡故障的應對處理方式(比如斷開后的嘗試重連),只影響發送和接收線程,不會影響生產線程和消費線程(業務邏輯部分)。
3、具體的SOCKET方式(阻塞和非阻塞)只影響發送和接收線程,不影響生產線程和消費線程(業務邏輯部分)。
4、不依賴TCP自身的發送緩沖區和接收緩沖區。(默認的TCP緩沖區的大小可能無法滿足實際要求)
5、業務邏輯的變化(比如業務需求變更)不影響發送線程和接收線程。
針對上述的最后一條,如果整個業務系統中有多個進程是采用上述的模式,那或許可以重構:在業務邏輯代碼和通訊邏輯代碼之間,把業務邏輯無關的部分封裝成一個通訊中間件。
7.3 環形緩沖區
使用場景:當存儲空間(不僅包括內存,還可能包括諸如硬盤之類的存儲介質)的分配/釋放非常頻繁並且確實產生了明顯的影響,才應該考慮環形緩沖區的使用。否則的話,還是選用最基本、最簡單的隊列緩沖區。
環形緩沖區 vs 隊列緩沖區
1.外部接口相似
普通的隊列有一個寫入端和一個讀出端。隊列為空的時候,讀出端無法讀取數據;當隊列滿(達到最大尺寸)時,寫入端無法寫入數據。
對於使用者來講,環形緩沖區和隊列緩沖區是一樣的。它也有一個寫入端(用於push)和一個讀出端(用於pop),也有緩沖區“滿”和“空”的狀態。所以,從隊列緩沖區切換到環形緩沖區,對於使用者來說能比較平滑地過渡。
2.內部結構迥異
雖然兩者的對外接口差不多,但是內部結構和運作機制有很大差別。重點介紹一下環形緩沖區的內部結構。
可以把環形緩沖區的讀出端(以下簡稱R)和寫入端(以下簡稱W)想象成是兩個人在體育場跑道上追逐(R追W)。當R追上W的時候,就是緩沖區為空;當W追上R的時候(W比R多跑一圈),就是緩沖區滿。
為了形象起見,如下:
從上圖可以看出,環形緩沖區所有的push和pop操作都是在一個固定的存儲空間內進行。而隊列緩沖區在push的時候,可能會分配存儲空間用於存儲新元素;在pop時,可能會釋放廢棄元素的存儲空間。所以環形方式相比隊列方式,少掉了對於緩沖區元素所用存儲空間的分配、釋放。這是環形緩沖區的一個主要優勢。
環形緩沖區的實現
1.數組方式 vs 鏈表方式
環形緩沖區的內部實現,即可基於數組(此處的數組,泛指連續存儲空間)實現,也可基於鏈表實現。
數組在物理存儲上是一維的連續線性結構,可以在初始化時,把存儲空間一次性分配好,這是數組方式的優點。但是要使用數組來模擬環,你必須在邏輯上把數組的頭和尾相連。在順序遍歷數組時,對尾部元素(最后一個元素)要作一下特殊處理。訪問尾部元素的下一個元素時,要重新回到頭部元素(第0個元素)。如下圖所示:
使用鏈表的方式,正好和數組相反:鏈表省去了頭尾相連的特殊處理。但是鏈表在初始化的時候比較繁瑣,而且在有些場合(比如跨進程的IPC)不太方便使用。
2.讀寫操作
環形緩沖區要維護兩個索引,分別對應寫入端(W)和讀取端(R)。寫入(push)的時候,先確保環沒滿,然后把數據復制到W所對應的元素,最后W指向下一個元素;讀取(pop)的時候,先確保環沒空,然后返回R對應的元素,最后R指向下一個元素。
3.判斷“空”和“滿”
上述的操作並不復雜,不過有一個小小的麻煩:空環和滿環的時候,R和W都指向同一個位置!這樣就無法判斷到底是“空”還是“滿”。大體上有兩種方法可以解決該問題。
辦法1:始終保持一個元素不用
當空環的時候,R和W重疊。當W比R跑得快,追到距離R還有一個元素間隔的時候,就認為環已經滿。當環內元素占用的存儲空間較大的時候,這種辦法顯得很土(浪費空間)。
辦法2:維護額外變量
如果不喜歡上述辦法,還可以采用額外的變量來解決。比如可以用一個整數記錄當前環中已經保存的元素個數(該整數>=0)。當R和W重疊的時候,通過該變量就可以知道是“空”還是“滿”。
4.元素的存儲
由於環形緩沖區本身就是要降低存儲空間分配的開銷,因此緩沖區中元素的類型要選好。盡量存儲值類型的數據,而不要存儲指針(引用)類型的數據。因為指針類型的數據又會引起存儲空間(比如堆內存)的分配和釋放,使得環形緩沖區的效果打折扣。
應用場合
如果所使用的編程語言和開發庫中帶有現成的、成熟的環形緩沖區,建議使用現成的庫,不要重新制造輪子;確實找不到現成的,才考慮自己實現。
1.用於並發線程
和線程中的隊列緩沖區類似,線程中的環形緩沖區也要考慮線程安全的問題。除非使用的環形緩沖區的庫已經實現了線程安全,否則還是得自己動手搞定。線程方式下的環形緩沖區用得比較多,相關的網上資料也多,下面就大致介紹幾個。
對於C++的程序員,強烈推薦使用boost提供的circular_buffer模板,該模板最開始是在boost 1.35版本中引入的。鑒於boost在C++社區中的地位,大伙兒應該可以放心使用該模板。
對於C程序員,可以去看看開源項目circbuf,不過該項目是GPL協議的,不太爽;而且活躍度不太高;而且只有一個開發人員。大伙兒慎用!建議只拿它當參考。
對於C#程序員,可以參考CodeProject上的一個示例。
2.用於並發進程
進程間的環形緩沖區,似乎少有現成的庫可用。
適用於進程間環形緩沖的IPC類型,常見的有共享內存和文件。在這兩種方式上進行環形緩沖,通常都采用數組的方式實現。程序事先分配好一個固定長度的存儲空間,然后具體的讀寫操作、判斷“空”和“滿”、元素存儲等細節就可參照前面所說的來進行。
共享內存方式的性能很好,適用於數據流量很大的場景。但是有些語言(比如Java)對於共享內存不支持。因此,該方式在多語言協同開發的系統中,會有一定的局限性。
而文件方式在編程語言方面支持很好,幾乎所有編程語言都支持操作文件。但它可能會受限於磁盤讀寫(Disk I/O)的性能。所以文件方式不太適合於快速數據傳輸;但是對於某些“數據單元”很大的場合,文件方式是值得考慮的。
對於進程間的環形緩沖區,同樣要考慮好進程間的同步、互斥等問題。
8.生產者消費者模式三種實現方式代碼示例
8.1 synchronized、wait和notify
public class SynchronizedProducerConsumerDemo { public static void main(String[] args) { MyResource myResource = new MyResource(); Thread thread = new Thread(new myProducer(myResource),"producerAAAAA"); Thread thread1 = new Thread(new myProducer(myResource),"producerBBBBB"); Thread thread2 = new Thread(new myProducer(myResource),"producerCCCCC"); Thread thread3 = new Thread(new myConsumer(myResource),"consumerDDDDD"); Thread thread4 = new Thread(new myConsumer(myResource),"consumerEEEEE"); thread.start(); thread1.start(); thread2.start(); thread3.start(); thread4.start(); } } /** * 資源類 */ class MyResource{ //當前的資源數 private int num = 0; //最大資源數 private int MAX_RESOURCE = 10; //生產方法 public synchronized void add(){ //資源沒有滿就生產 if (num < MAX_RESOURCE){ num ++ ; System.out.println(Thread.currentThread().getName() +"\t produce resource" +num); notifyAll(); } else { //資源滿就等待 try { wait(); System.out.println(Thread.currentThread().getName() +"\t wait to produce"); } catch (InterruptedException e) { e.printStackTrace(); } } } //消費方法 public synchronized void remove(){ //不空就消費 if (num > 0){ num -- ; System.out.println(Thread.currentThread().getName() +"\t consume resource "+(num+1)); notifyAll(); } else{ //資源空,消費就等待 try { wait(); System.out.println(Thread.currentThread().getName() +"\t wait to consume"); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** * 消費者 */ class myProducer implements Runnable{ private MyResource myResource; public myProducer(MyResource myResource) { this.myResource = myResource; } @Override public void run() { while (true){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } myResource.add(); } } } /** * 生產者 */ class myConsumer implements Runnable{ private MyResource myResource; public myConsumer(MyResource myResource) { this.myResource = myResource; } @Override public void run() { while (true){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } myResource.remove(); } } }
8.2 lock和condition的await、signalAll
public class LockProducerConsumerDemo { public static void main(String[] args) { MyResouece2 myResouece2 = new MyResouece2(); for (int i=1;i<=5;i++){ new Thread(new myProducer2(myResouece2),String.valueOf(i)).start(); } for (int i=1;i<=5;i++){ new Thread(new myConsumer2(myResouece2), String.valueOf(i)).start(); } } } /** * 資源類 */ class MyResouece2{ private int num = 0; private int MAX_RESOURCE = 10; Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void add(){ lock.lock(); try { if (num < MAX_RESOURCE){ num ++; System.out.println(Thread.currentThread().getName() + "\t produce resource" +num); condition.signalAll(); }else{ try { condition.await(); System.out.println(Thread.currentThread().getName() +"\t await to produce"); } catch (InterruptedException e) { e.printStackTrace(); } } } finally { lock.unlock(); } } public void remove(){ lock.lock(); try { if (num > 0){ num -- ; System.out.println(Thread.currentThread().getName()+"\t consume resource" +(num +1)); condition.signalAll(); } else{ try { condition.await(); System.out.println(Thread.currentThread().getName()+"\t await to consume"); } catch (InterruptedException e) { e.printStackTrace(); } } } finally { lock.unlock(); } } } /** * 消費者 */ class myConsumer2 implements Runnable{ private MyResouece2 myResouece2; public myConsumer2(MyResouece2 myResouece2) { this.myResouece2 = myResouece2; } @Override public void run() { while (true){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } myResouece2.remove(); } } } class myProducer2 implements Runnable{ private MyResouece2 myResouece2; public myProducer2(MyResouece2 myResouece2) { this.myResouece2 = myResouece2; } @Override public void run() { while (true){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } myResouece2.add(); } } }
8.3 使用阻塞隊列BlockingQueue解決生產者消費者
public class BlockQueueDemo { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(); Thread thread = new Thread(new Producer(blockingQueue)); Thread thread1 = new Thread(new Consumer(blockingQueue)); Thread thread2 = new Thread(new Consumer(blockingQueue)); thread.start(); thread1.start(); thread2.start(); } } class Producer implements Runnable{ private BlockingQueue blockingQueue; public Producer(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { while (true){ try { String charSequence = UUID.randomUUID().toString().subSequence(0, 3).toString(); blockingQueue.put(charSequence); System.out.println(Thread.currentThread().getName() + "produce ====================" +charSequence); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer implements Runnable{ private BlockingQueue blockingQueue; public Consumer(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { while (true) { try { Object take = blockingQueue.take(); System.out.println(Thread.currentThread().getName() + "consuner =================" + take); } catch (InterruptedException e) { e.printStackTrace(); } } } }