1 #include <thread>
2 #include <iostream>
3 #include <chrono>
4 #include <string>
5 #include <mutex>
6 #include <deque>
7 #include <condition_variable>
8 #include <vector>
9 using namespace std; 10
11 int remain = 5; //控制5個生產者是否結束,當為0時,消費者退出等待。
12
13 /** 14 * 生產者類 15 * 16 * */
17 class producer{ 18 string name; 19 public: 20 thread* prothread; 21 producer(const string& Name ) : name(Name), prothread(nullptr){}; 22 ~producer(){ delete prothread; cout<< name << "銷毀" << endl; } 23 string getName(){ return name; } 24 void produce(deque<string>* task, mutex* m_mutex, condition_variable* m_cond); 25 void run(deque<string>* task, mutex* m_mutex, condition_variable* m_cond); 26 }; 27
28 void producer::produce(deque<string>* task, mutex* m_mutex, condition_variable* m_cond){ 29 using namespace std::chrono; 30 string s; 31 int i = 20; 32 while(i--){ 33 s+="0"; 34 this_thread::sleep_for(chrono::milliseconds(10)); 35 m_mutex->lock(); 36 task->push_back(s); 37 std::cout << name << " : " << s << std::endl; 38 m_mutex->unlock(); 39 m_cond->notify_one(); 40 } 41 m_mutex->lock(); 42 remain--; 43 if(remain == 0) m_cond->notify_all(); 44 m_mutex->unlock(); 45 } 46
47 void producer::run(deque<string>* task, mutex* m_mutex, condition_variable* m_cond){ 48 prothread = new thread(produce, this, task, m_mutex, m_cond); 49 } 50
51 /** 52 * 消費者類 53 * 54 * */
55 class consumer{ 56 string name; 57 public: 58 thread* conthread; 59 consumer(const string& Name ) : name(Name), conthread(nullptr){}; 60 ~consumer(){ delete conthread; cout<< name << "銷毀" << endl; } 61 string getName(){ return name; } 62 void consume(deque<string>* task, mutex* m_mutex, condition_variable* m_cond); 63 void run(deque<string>* task, mutex* m_mutex, condition_variable* m_cond); 64 }; 65
66 void consumer::consume(deque<string>* task, mutex* m_mutex, condition_variable* m_cond){ 67 using namespace std::chrono; 68 while(1){ 69 std::unique_lock<std::mutex> locker(*m_mutex); 70 while(task->size() == 0 && remain) 71 m_cond->wait(locker); 72 if(task->size() == 0) break; 73 this_thread::sleep_for(chrono::milliseconds(50)); 74 std::cout << name << " : " << task->front() << std::endl; 75 task->pop_front(); 76 locker.unlock(); 77 } 78 } 79
80 void consumer::run(deque<string>* task, mutex* m_mutex, condition_variable* m_cond){ 81 conthread = new thread(consume, this, task, m_mutex, m_cond); 82 } 83
84
85 int main() { 86 deque<string> task; //任務隊列
87 mutex m_mutex; //互斥量
88 condition_variable m_cond; //條件變量
89 vector<producer*> producers; 90 vector<consumer*> consumers; 91 for(int i = 0; i < 5; i++) 92 { 93 string name = "生產者"; 94 name += '0' + i; 95 producers.push_back(new producer(name)); 96 producers[i]->run(&task, &m_mutex, &m_cond); 97 } 98 for(int i = 0; i < 10; i++){ 99 string name = "消費者"; 100 name += '0' + i; 101 consumers.push_back(new consumer(name)); 102 consumers[i]->run(&task, &m_mutex, &m_cond); 103 } 104 for(int i = 0; i < 5; i++){ 105 producers[i]->prothread->join(); 106 } 107 for(int i = 0; i < 10; i++){ 108 consumers[i]->conthread->join(); 109 } 110 for(int i = 0; i < 5; i++){ 111 delete producers[i]; 112 } 113 for(int i = 0; i < 10; i++){ 114 delete consumers[i]; 115 } 116 getchar(); 117 return 0; 118 }