簡介
使用條件變量,信號量,兩種示例方式去實現生產者和消費者模型
1、條件變量 QWaitCondition
1 #ifndef MUTEXWAITCONDITION 2 #define MUTEXWAITCONDITION 3 4 #include <QCoreApplication> 5 #include <iostream> 6 #include <QThread> 7 #include <QMutex> 8 #include <QWaitCondition> 9 10 using namespace std; 11 12 class MutexWaitCondition { 13 public: 14 //預計生產(或消費)數量 15 int loopCount; 16 //當前產品數量 17 int product; 18 //倉庫能容納的最大產品數量 19 int capacity; 20 21 QMutex mutex; 22 /* 23 The QWaitCondition class provides a condition variable for synchronizing threads 24 QWaitCondition類為線程同步提供了一個條件變量 25 */ 26 QWaitCondition productIsNotFull; 27 QWaitCondition productIsNotEmpty; 28 29 //生產者 30 class Producer : public QThread { 31 public: 32 Producer(MutexWaitCondition *manager) : QThread() { 33 this->manager = manager; 34 } 35 protected: 36 void run() { 37 for(int i=0; i<manager->loopCount; i++) { 38 manager->mutex.lock(); 39 while(manager->product == manager->capacity) { 40 /* 41 bool QWaitCondition::wait(QReadWriteLock * lockedReadWriteLock, unsigned long time = ULONG_MAX) 42 Releases the lockedReadWriteLock and waits on the wait condition 43 釋放該鎖,並且阻塞當前線程,直到條件滿足(即調用wake方法被喚醒) 44 */ 45 manager->productIsNotFull.wait(&manager->mutex); 46 } 47 manager->product++; //當前產品數量++之前產品就要入隊 48 //cout<<"P"; 49 cout<<i<<".P="<<manager->product<<", "; 50 manager->productIsNotEmpty.wakeAll(); 51 manager->mutex.unlock(); 52 } 53 } 54 private: 55 MutexWaitCondition *manager; 56 }; 57 58 //消費者 59 class Consumer : public QThread { 60 public: 61 Consumer(MutexWaitCondition *manager) : QThread() { 62 this->manager = manager; 63 } 64 protected: 65 void run() { 66 for(int i=0; i<manager->loopCount; i++) { 67 manager->mutex.lock(); 68 while(manager->product == 0) { 69 manager->productIsNotEmpty.wait(&manager->mutex); 70 } 71 manager->product--; //當前產品數量--之前產品就要出隊 72 //cout<<"C"; 73 cout<<i<<".C="<<manager->product<<", "; 74 manager->productIsNotFull.wakeAll(); 75 manager->mutex.unlock(); 76 } 77 } 78 private: 79 MutexWaitCondition *manager; 80 }; 81 82 //無修飾的方法,默認是private的 83 public: 84 void test(int loopCount, int capacity) 85 { 86 this->loopCount = loopCount; 87 this->capacity = capacity; 88 this->product = 0; 89 90 Producer producer(this); 91 Consumer consumer(this); 92 //thread.start會調用thread內部的run方法 93 producer.start(); 94 consumer.start(); 95 /* 96 Blocks the thread until either of these conditions is met: 97 阻塞該線程直到所有條件都滿足 98 */ 99 producer.wait(); 100 consumer.wait(); 101 cout<<endl; 102 } 103 }; 104 #endif // MUTEXWAITCONDITION
2、信號量 QSemaphore
1 #ifndef SEMAPHOREMUTEX 2 #define SEMAPHOREMUTEX 3 4 #include <QCoreApplication> 5 #include <iostream> 6 #include <QThread> 7 #include <QSemaphore> 8 //注意:網上很多Semaphore不用mutex的做法都是錯誤的 9 #include <QMutex> 10 11 using namespace std; 12 13 class SemaphoreMutex { 14 public: 15 //預計生產(或消費)數量 16 int loopCount; 17 //當前產品數量: productSemphore.avaliable() 18 //int product; 19 //倉庫能容納的最大產品數量 20 int capacity; 21 22 QMutex mutex; 23 /* 24 The QSemaphore class provides a general counting semaphore 25 QSemaphore類提供了一個通用的計數信號量 26 */ 27 QSemaphore *productSemphore; 28 QSemaphore *leftSpaceSemaphore; 29 30 //生產者 31 class Producer : public QThread { 32 public: 33 Producer(SemaphoreMutex *manager) : QThread() { 34 this->manager = manager; 35 } 36 protected: 37 void run() { 38 for(int i=0; i<manager->loopCount; i++) { 39 /* 40 Tries to acquire n resources guarded by the semaphore. If n > available(), this call will block until enough resources are available. 41 嘗試去獲取(減去)n個被信號量控制的資源。如果n>可用資源數量,它就會阻塞直到有足夠的資源為止。 42 */ 43 manager->leftSpaceSemaphore->acquire(); 44 //之所以lock要在acquire后面是因為: 如果消費者拿到了鎖,那么又沒有商品,那么久會導致死鎖 45 manager->mutex.lock(); //加鎖之后產品就要入隊,然后再release 46 manager->productSemphore->release(); 47 //cout<<"P"; 48 cout<<i<<".P="<<manager->productSemphore->available()<<", "; 49 manager->mutex.unlock(); 50 } 51 } 52 private: 53 SemaphoreMutex *manager; 54 }; 55 56 //消費者 57 class Consumer : public QThread { 58 public: 59 Consumer(SemaphoreMutex *manager) : QThread() { 60 this->manager = manager; 61 } 62 protected: 63 void run() { 64 for(int i=0; i<manager->loopCount; i++) { 65 manager->productSemphore->acquire(); 66 manager->mutex.lock(); //加鎖之后產品就要出隊,然后再release 67 manager->leftSpaceSemaphore->release(); 68 //cout<<"C"; 69 cout<<i<<".C="<<manager->productSemphore->available()<<", "; 70 manager->mutex.unlock(); 71 } 72 } 73 private: 74 SemaphoreMutex *manager; 75 }; 76 77 //無修飾的方法,默認是private的 78 public: 79 void test(int loopCount, int capacity) 80 { 81 this->loopCount = loopCount; 82 this->capacity = capacity; 83 84 //參數為: 信號量的當前值 85 productSemphore = new QSemaphore(0); 86 leftSpaceSemaphore = new QSemaphore(capacity); 87 88 Producer producer(this); 89 Consumer consumer(this); 90 //thread.start會調用thread內部的run方法 91 producer.start(); 92 consumer.start(); 93 /* 94 Blocks the thread until either of these conditions is met: 95 阻塞該線程直到所有條件都滿足 96 */ 97 producer.wait(); 98 consumer.wait(); 99 cout<<endl; 100 } 101 }; 102 103 #endif // SEMAPHOREMUTEX
