C++11 實現生產者消費者雙緩沖


基礎的生產者消費者模型,生產者向公共緩存區寫入數據,消費者從公共緩存區讀取數據進行處理,兩個線程訪問公共資源,加鎖實現數據的一致性。

通過加鎖來實現

 1 class Produce_1 {  2 public:  3     Produce_1(std::queue<int> * que_, std::mutex * mt_) {  4         m_mt = mt_;  5         m_que = que_;  6         m_stop = false;  7  }  8     void runProduce() {  9         while (!m_stop) { 10             std::this_thread::sleep_for(std::chrono::seconds(1)); 11             std::lock_guard<std::mutex> lgd(*m_mt); 12             m_que->push(1); 13             std::cout << "Produce_1 produce 1" << std::endl; 14  } 15  } 16     void join() { 17         m_trd->join(); 18  m_trd.reset(); 19  } 20     void start() { 21         m_trd.reset(new std::thread(std::bind(std::mem_fun(&Produce_1::runProduce), this))); 22  } 23     void stop() { 24         m_stop = true; 25  } 26 private: 27     std::mutex * m_mt; 28     std::queue<int> * m_que; 29     volatile bool m_stop; 30     std::shared_ptr<std::thread> m_trd; 31 }; 32 
33 
34 /*
35 *單緩沖一個同步隊列 效率較低 36 */
37 class Consume_1 { 38 public: 39     Consume_1(std::queue<int> * que_, std::mutex * mt_) { 40         m_mt = mt_; 41         m_que = que_; 42         m_stop = false; 43  } 44 
45     void runConsume() { 46         while (!m_stop) { 47             std::this_thread::sleep_for(std::chrono::seconds(1)); 48             std::lock_guard<std::mutex> lgd(*m_mt); 49             if (!m_que->empty()) { 50                 m_que->pop(); 51  } 52             std::cout << "Consume_1 consume" << std::endl; 53  } 54  } 55     void join() { 56         m_trd->join(); 57  m_trd.reset(); 58  } 59     void start() { 60         m_trd.reset(new std::thread(std::bind(std::mem_fun(&Consume_1::runConsume), this))); 61  } 62     void stop() { 63         m_stop = true; 64  } 65 private: 66     std::mutex * m_mt; 67     std::queue<int> * m_que; 68     volatile bool m_stop; 69     std::shared_ptr<std::thread> m_trd; 70 };

 

通過條件變量來實現

 1 typedef struct Mutex_Condition{  2  std::mutex mt;  3  std::condition_variable cv;  4 }Mutex_Condition;  5 
 6 class Produce {  7 public:  8     Produce(std::queue<int> * que_, Mutex_Condition * mc_) {  9         m_que = que_; 10         m_mc = mc_; 11         m_stop = false; 12  } 13     void join() { 14         m_trd->join(); 15  m_trd.reset(); 16  } 17     void produce(int enter) { 18         std::lock_guard<std::mutex> lgd(m_mc->mt); 19         m_que->push(enter); 20         m_mc->cv.notify_one(); 21  } 22 
23     void runProduce() { 24         while (!m_stop) { 25             std::this_thread::sleep_for(std::chrono::seconds(1)); 26             produce(1); 27             std::cout << "Produce Thread produce 1 " << std::endl; 28  } 29  } 30 
31     void start() { 32         m_trd.reset(new std::thread(std::bind(std::mem_fun(&Produce::runProduce), this))); 33  } 34     void stop() { 35         m_stop = true; 36  } 37 
38 private: 39     std::queue<int> * m_que; 40     Mutex_Condition * m_mc; 41     std::shared_ptr<std::thread> m_trd; 42     volatile bool m_stop; 43 }; 44 
45 
46 class Consume { 47 public: 48     Consume(std::queue<int> * que_, Mutex_Condition * mc_) { 49         m_que = que_; 50         m_mc = mc_; 51         m_stop = false; 52  } 53     void join() { 54         m_trd->join(); 55  m_trd.reset(); 56  } 57     void consume() { 58         std::unique_lock<std::mutex> lgd(m_mc->mt); 59         while (m_que->empty()) { 60             int i = 0; 61             m_mc->cv.wait(lgd); 62  } 63         m_que->pop(); 64         std::cout << "Consume Thread consume " << std::endl; 65  } 66     void runConsume() { 67         while (!m_stop) { 68             std::this_thread::sleep_for(std::chrono::seconds(1)); 69  consume(); 70  } 71  } 72     void start() { 73         m_trd.reset(new std::thread(std::bind(std::mem_fun(&Consume::runConsume), this))); 74  } 75     void stop() { 76         m_stop = true; 77  } 78 
79 private: 80     std::queue<int> * m_que; 81     Mutex_Condition * m_mc; 82     std::shared_ptr<std::thread> m_trd; 83     volatile bool m_stop; 84 
85 };

 

二、生產者消費者-雙緩沖

一個公共緩存區,由於多線程訪問的鎖沖突較大,可以采取雙緩沖手段來解決鎖的沖突

雙緩沖的關鍵:雙緩沖隊列的數據交換

1)生產者線程不斷的向生產者隊列A寫入數據,當隊列中有數據時,進行數據的交換,交換開始啟動時通過條件變量通知交換線程來處理最先的數據交換。

2)數據交換完成后,通過條件變量通知消費者處理數據,此時交換線程阻塞到消費者數據處理完成時通知的條件變量上。

3)消費者收到數據交換后的通知后,進行數據的處理,數據處理完成后,通知交換線程進行下一輪的雙緩沖區的數據交換。

要點:

生產者除了在數據交換時,其余時刻都在不停的生產數據。

數據交換隊列需要等待消費者處理數據完成的通知,以進行下一輪交換。

消費者處理數據時,不進行數據交換,生產者同時會不斷的生產數據,消費者需要等待數據交換完成的通知,並且發送消費完成的通知給交換線程

 

 使用條件變量的版本實現

 

  1 typedef struct Mutex_Condition{
  2     std::mutex mt;
  3     std::condition_variable cv;
  4 }Mutex_Condition;
  5 
  6 class Produce_1 {
  7 public:
  8     Produce_1(std::queue<int> * que_1, std::queue<int> * que_2, Mutex_Condition * mc_1 , Mutex_Condition * mc_2) {
  9         m_read_que   = que_1;
 10         m_writer_que = que_2;
 11         m_read_mc    = mc_1;
 12         m_writer_mc  = mc_2;
 13         m_stop       = false;
 14 
 15     }
 16     void runProduce() {
 17         while (!m_stop) {
 18             std::this_thread::sleep_for(std::chrono::microseconds(20 * 1000));
 19             std::lock_guard<std::mutex> lgd(m_writer_mc->mt);
 20             m_writer_que->push(1);
 21             m_writer_mc->cv.notify_one();
 22             std::cout << "m_writer push" << std::endl;
 23         }
 24         
 25     }
 26     void join() {
 27         m_trd->join();
 28         m_trd.reset();
 29     }
 30     void start() {
 31         m_trd.reset(new std::thread(std::bind(std::mem_fun(&Produce_1::runProduce), this)));
 32     }
 33     void stop() {
 34         m_stop = true;
 35     }
 36 private:
 37     Mutex_Condition * m_read_mc;
 38     Mutex_Condition * m_writer_mc;
 39     std::queue<int> * m_read_que;
 40     std::queue<int> * m_writer_que;
 41     volatile bool m_stop;
 42     std::shared_ptr<std::thread> m_trd;
 43 };
 44 
 45 
 46 class Consume_1 {
 47 public:
 48     Consume_1(std::queue<int> * que_1, std::queue<int> * que_2, Mutex_Condition * mc_1,Mutex_Condition * mc_2,Mutex_Condition * switch_mc) {
 49         m_read_que    = que_1;
 50         m_writer_que  = que_2;
 51         m_read_mc     = mc_1;
 52         m_writer_mc   = mc_2;
 53         m_stop        = false;
 54         m_switch_mc = switch_mc;
 55     }
 56 
 57     void runConsume() {
 58         while (!m_stop) {
 59             while (true) {
 60                 std::unique_lock<std::mutex> ulg(m_read_mc->mt);
 61                 while (m_read_que->empty()) {
 62                     m_read_mc->cv.wait(ulg);
 63                 }
 64                 //deal data
 65                 //std::lock_guard<std::mutex> ulg(m_read_mc->mt);
 66                 while (!m_read_que->empty()) {
 67                     m_read_que->pop();
 68                     std::cout << "m_read_queue pop" << std::endl;
 69                 }
 70                 m_switch_mc->cv.notify_one();
 71             }
 72         }
 73     }
 74     void join() {
 75         m_trd->join();
 76         m_trd.reset();
 77     }
 78     void start() {
 79         m_trd.reset(new std::thread(std::bind(std::mem_fun(&Consume_1::runConsume), this)));
 80     }
 81     void stop() {
 82         m_stop = true;
 83     }
 84 private:
 85     Mutex_Condition * m_read_mc;
 86     Mutex_Condition * m_writer_mc;
 87     Mutex_Condition * m_switch_mc;
 88     std::queue<int> * m_read_que;
 89     std::queue<int> * m_writer_que;
 90     volatile bool m_stop;
 91     std::shared_ptr<std::thread> m_trd;
 92 };
 93 void que_switch_trd(std::queue<int> * read_que, std::queue<int> * writer_que, Mutex_Condition * read_mc, Mutex_Condition * writer_mc,Mutex_Condition * switch_mc) {
 94     while (true) {
 95         {
 96             std::unique_lock<std::mutex> ulg(writer_mc->mt);
 97             while (writer_que->empty()) {
 98                 writer_mc->cv.wait(ulg);
 99             }
100             std::lock_guard<std::mutex> ulg_2(read_mc->mt);
101             std::swap(*read_que, *writer_que);
102             std::cout << "switch queue" << std::endl;
103             if (!read_que->empty()) {
104                 read_mc->cv.notify_one();
105             }
106         }
107         std::unique_lock<std::mutex> ulg_2(switch_mc->mt);
108         while (!read_que->empty()) {
109             switch_mc->cv.wait(ulg_2);
110         }
111     }
112 }
113 int main(){
114 
115     Mutex_Condition mc_1;
116     Mutex_Condition mc_2;
117     Mutex_Condition mc_3;
118     std::queue<int> que_1;
119     std::queue<int> que_2;
120 
121     Produce_1 produce_1(&que_1, &que_2, &mc_1, &mc_2);
122     Consume_1 consume_1(&que_1, &que_2, &mc_1, &mc_2,&mc_3);
123 
124     std::thread trd(std::bind(&que_switch_trd, &que_1, &que_2, &mc_1, &mc_2,&mc_3));
125     produce_1.start();
126     consume_1.start();
127     
128     produce_1.join();
129     consume_1.join();
130     trd.join();
131 
132     return 0;
133 }

 

 

 

使用互斥鎖的實現

 1 #include<mutex>
 2 #include<thread>
 3 #include<queue>
 4 #include<iostream>
 5 #include<chrono>
 6 
 7 class DBQueue{
 8 public:
 9     void push(int i_) {
10         std::lock_guard<std::mutex> lock(m_mt);
11         std::cout << "write_que push " << i_ << std::endl;
12         m_write_que.push(i_);
13     }
14     void swap(std::queue<int> & read_que) {
15         std::lock_guard<std::mutex> lock(m_mt);
16         std::swap(m_write_que,read_que);
17         std::cout << "switch swap" << std::endl;
18     }
19 private:
20     std::queue<int> m_write_que;
21     std::mutex m_mt;
22 };
23 void produce(DBQueue * que) {
24     while (true) {
25         std::this_thread::sleep_for(std::chrono::microseconds(20*1000));
26         que->push(1);
27     }
28 }
29 void consume(DBQueue * que) {
30     std::queue<int> read_que;
31     while (true) {
32         std::this_thread::sleep_for(std::chrono::microseconds(20*1000));
33         if (read_que.empty()) {
34             que->swap(read_que);
35             //xxoo
36             while (!read_que.empty()) {
37                 std::cout << "read_que pop" << std::endl;
38                 read_que.pop();
39             }
40         }
41     }
42 }
43 int main()
44 {
45     DBQueue que;
46     std::thread trd_1(std::bind(&produce, &que));
47     std::thread trd_2(std::bind(&consume, &que));
48     trd_1.join();
49     trd_2.join();
50     return 0;
51 }

 兩個版本的區別 sleep的區別,sleep處理的時效性較差,不加sleep,cpu占用率又比較高,所以條件變量是比較好的選擇。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM