一、什么是生產者-消費者模型
1、簡單理解生產者-消費者模型
假設有兩個進程(或線程)A、B和一個固定大小的緩沖區,A進程生產數據放入緩沖區,B進程從緩沖區中取出數據進行計算,這就是一個簡單的生產者-消費者模型。這里的A進程相當於生產者,B進程相當於消費者。
2、為什么要使用生產者-消費者模型
在多線程開發中,如果生產者生產數據的速度很快,而消費者消費數據的速度很慢,那么生產者就必須等待消費者消費完數據才能夠繼續生產數據,因為生產過多的數據可能會導致存儲不足;同理如果消費者的速度大於生產者那么消費者就會經常處理等待狀態,所以為了達到生產者和消費者生產數據和消費數據之間的平衡,那么就需要一個緩沖區用來存儲生產者生產的數據,所以就引入了生產者-消費者模式
簡單來說,這里緩沖區的作用就是為了平衡生產者和消費者的數據處理能力,一方面起到緩存作用,另一方面達到解耦合作用。
3、生產者-消費者模型特點
-
保證生產者不會在緩沖區滿的時候繼續向緩沖區放入數據,而消費者也不會在緩沖區空的時候,消耗數據
-
當緩沖區滿的時候,生產者會進入休眠狀態,當下次消費者開始消耗緩沖區的數據時,生產者才會被喚醒,開始往緩沖區中添加數據;當緩沖區空的時候,消費者也會進入休眠狀態,直到生產者往緩沖區中添加數據時才會被喚醒
4、生產者-消費者模型的應用場景
生產者-消費者模型一般用於將生產數據的一方和消費數據的一方分割開來,將生產數據與消費數據的過程解耦開來。
1)Excutor任務執行框架:
通過將任務的提交和任務的執行解耦開來,提交任務的操作相當於生產者,執行任務的操作相當於消費者;例如使用Excutor構建web服務器,用於處理線程的請求:生產者將任務提交給線程池,線程池創建線程處理任務,如果需要運行的任務數大於線程池的基本線程數,那么就把任務扔到阻塞隊列(通過線程池 + 阻塞隊列的方式比只使用一個阻塞隊列的效率高很多,因為消費者能夠處理就直接處理掉了,不用每個消費者都要先從阻塞隊列中取出任務再執行)
2)消息中間件active MQ:
雙十一的時候,會產生大量的訂單,那么不可能同時處理那么多的訂單,需要將訂單放入一個隊列里面,然后由專門的線程處理訂單。這里用戶下單就是生產者,處理訂單的線程就是消費者;再比如12306的搶票功能,先由一個容器存儲用戶提交的訂單,然后再由專門處理訂單的線程慢慢處理,這樣可以在短時間內支持高並發服務
3)任務的處理時間比較長的情況下:
比如上傳附近並處理,那么這個時候可以將用戶上傳和處理附件分成兩個過程,用一個隊列暫時存儲用戶上傳的附近,然后立刻返回用戶上傳成功,然后有專門的線程處理隊列中的附近
5、生產者-消費者模型的優點
1)解耦合:將生產者類和消費者類進行解耦,消除代碼之間的依賴性,簡化工作負載的管理。
2)復用:通過將生產者類和消費者類獨立開來,那么可以對生產者類和消費者類進行獨立的復用與擴展。
3)調整並發數:由於生產者和消費者的處理速度是不一樣的,可以調整並發數,給予慢的一方多的並發數,來提高任務的處理速度。
4)異步:對於生產者和消費者來說能夠各司其職,生產者只需要關心緩沖區是否還有數據,不需要等待消費者處理完;同樣的對於消費者來說,也只需要關注緩沖區的內容,不需要關注生產者,通過異步的方式支持高並發,將一個耗時的流程拆成生產和消費兩個階段,這樣生產者因為執行 put() 的時間比較短,而支持高並發。
5)支持分布式:生產者和消費者通過隊列進行通訊,所以不需要運行在同一台機器上,在分布式環境中可以通過 redis 的 list 作為隊列,而消費者只需要輪詢隊列中是否有數據。同時還能支持集群的伸縮性,當某台機器宕掉的時候,不會導致整個集群宕掉。
二、C++實現生產者-消費者模型
1、依賴
1)C++11 提供的 thread 庫
2)互斥鎖 mutex
3)條件變量 condition_variable
4)隊列 queue
5)原子操作 atomic
6)Windows 臨界區
2、實現細節
1)具體的實現邏輯是構建一個queue來存儲生產的數據,queue不滿時可以生產,不空時可以消費。對於這個隊列,采用阻塞隊列的實現思路。
2)先實現構造函數,初始化一個unique_lock供condition_variable使用。如何在類里面使用unique_lock等需要初始化,並且初始化會加鎖的對象。這要研究下。嘗試構造列表初始化,然后函數體里unlock。
3)對於條件變量,申請兩個,分別控制consumer和producer。
4)入隊和出隊列的細節。
5)首先加鎖。
6)循環判斷一下目前的隊列情況,對於各自的特殊情況(隊滿和隊空)進行處理。
6)喚醒一個線程來處理特殊情況。
7)等待處理完畢。
8)處理入和出隊列操作。
9)最后釋放鎖。
10)對於輸出 std::cout 可能因為多線程紊亂的問題,加入了臨界區。另外因為 std::cout 緩存問題,可能存在其他問題。
3、問題
1)出現的 Bug:在多個 consumer 線程情況下,會出現有線程無法退出情況。在析構函數里,加入 stop,並且喚醒因條件變量阻塞的線程。在 pop 函數中,加入對 stop 的判斷,當隊列為空並且 stop 時,退出 pop 函數。對 consumer 的條件變量 wait 調用加入 pred,隊列為空或者沒有停止時阻塞。
2)條件變量的 wait 函數理解:單參數版本,此時傳入一個 unique_lock 類型的變量,並且已經加鎖,調用 wait 之后釋放鎖,並阻塞等待 notify 喚醒,喚醒后加鎖,要注意的是被喚醒后有可能加鎖失敗,此時繼續阻塞;雙參數版本,此時需要再加入一個 Predicate 類型的變量,應該是一個返回 bool 的函數,可用 lamda 表達式代替,返回 false 阻塞,true 解除,要注意這里的意思是即使 notify 了,如果后面的條件不滿足,也不會解除阻塞。
3)對於多 consumer 的消息同步暫時擱置,是在外部程序完成調用的 stop。
4、代碼
1)阻塞隊列
// BlockQueue.h #pragma once #include <iostream> #include <queue> #include <thread> #include <mutex> #include <atomic> #include <condition_variable> #define TASK_NUM 8 using namespace std; class CBlockQueue { private: mutex _mt; condition_variable _cv_con; condition_variable _cv_prod; queue<int> _tasks; atomic<bool> _stopped; const int _capacity; bool stopped() { return _stopped.load(); } bool empty() { return _tasks.size() == 0 ? true : false; } bool full() { return _tasks.size() == _capacity ? true : false; } public: CBlockQueue(); ~CBlockQueue(); void stop() { _stopped.store(true); _cv_con.notify_all(); } bool available() { return !stopped() || !empty(); } void push(const int &data); void pop(int &data); };
// BlockQueue.cpp #include "BlockQueue.h" CBlockQueue::CBlockQueue() : _capacity(TASK_NUM), _stopped(false) {} CBlockQueue::~CBlockQueue() { stop(); _cv_con.notify_all(); _cv_prod.notify_all(); } void CBlockQueue::push(const int &data) { unique_lock<mutex> _lck(_mt); while (full()) { _cv_con.notify_one(); // cout << "Task Queue is full, notify one consumer...\n"; _cv_prod.wait(_lck); } _tasks.push(data); _cv_con.notify_one(); } void CBlockQueue::pop(int &data) { unique_lock<mutex> _lck(_mt); while (empty()) { if (this->stopped()) return; _cv_prod.notify_one(); // cout << "Task Queue is empty, notify one producer...\n"; _cv_con.wait(_lck, [this]() { return this->stopped() || !this->empty(); }); } data = _tasks.front(); _tasks.pop(); _cv_prod.notify_one(); }
2)主函數
#include <iostream> #include "BlockQueue.h" #ifdef WIN32 #include <windows.h> #define sleep(x) (Sleep((x) * 1000)) #else #include <unistd.h> #endif CRITICAL_SECTION cs; // mutex mt_prod; void consumer(CBlockQueue *bq) { // CBlockQueue *bq = static_cast<CBlockQueue *>(arg); while (bq->available()) { int data = -1; bq->pop(data); EnterCriticalSection(&cs); cout << "<" << this_thread::get_id() << ">: " << data << " comsumed.\n"; LeaveCriticalSection(&cs); // sleep(0.5); } cout << "<" << this_thread::get_id() << ">: " << "consumer is done.\n"; } void producer(CBlockQueue *bq, int start, int maxNum) { // CBlockQueue *bq = static_cast<CBlockQueue *>(arg); // unique_lock<mutex> lck(mt_prod); int i = 0; while (i++ < maxNum) { // int data = rand() % 1024; int data = i + start; bq->push(data); EnterCriticalSection(&cs); cout << "[" << this_thread::get_id() << "]: " << data << " produced.\n"; LeaveCriticalSection(&cs); // sleep(0.2); } // if(start + i >= maxNum) bq->stop(); cout << "[" << this_thread::get_id() << "]: " << "producer is done.\n"; } int main() { CBlockQueue bqueue; InitializeCriticalSection(&cs); vector<thread> th_prods; const int num_prod = 3; for (int i = 0; i < num_prod; ++i) { th_prods.emplace_back(producer, &bqueue, i * 100, num_prod * 100); } vector<thread> th_cons; const int num_con = 3; for (int i = 0; i < num_con; ++i) { th_cons.emplace_back(consumer, &bqueue); } for (auto &t : th_prods) { t.join(); } bqueue.stop(); for (auto &t : th_cons) { t.join(); } DeleteCriticalSection(&cs); return 0; }
三、Reference
1、深入理解生產者消費者模型:
2、C++版本的生產者消費者模型: