背景
生產者和消費者問題是線程模型中的經典問題:生產者和消費者在同一時間段內共用同一個存儲空間,生產者往存儲空間中添加產品,消費者從存儲空間中取走產品,當存儲空間為空時,消費者阻塞,當存儲空間滿時,生產者阻塞。
為什么要使用生產者和消費者模式:
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題,於是引入了生產者和消費者模式。
生產者——消費者模型 會遇到的問題
生產者——消費者模型中,生產者和消費者線程之間需要傳遞一定量的數據,兩個線程會使用一個特定大小的共享環形緩沖器。
生產者向緩沖器中寫入數據,直到它到達緩沖器的終點;然后它會再次從起點重新開始,覆蓋已經存在的數據。消費者線程則會讀取生成的數據。
在生產者——消費者實例中,對於同步的需求有兩個部分:
1.如果生產者線程生成數據的速度太快,那么將會把消費者線程還沒有讀取的數據覆蓋;
2.如果消費者線程讀取數據的速度過快,那么它就會越過生產者線程而讀取一些垃圾數據。
生產者——消費者模型 解決方法:
容易導致死鎖的實現:
讓生產者線程填滿緩沖器,然后等待消費者線程讀取完緩沖器中全部數據。
使用2個信號量 解決 單消費者單生產者的問題
使用兩個信號量:freeSpace 與 usedSpace
freeSpace 信號量控制生產者線程寫入數據的那部分緩沖器, usedSpace 信號量則控制消費者線程讀取數據的那部分緩沖器區域。
這兩個區域是相互補充的。
常用緩沖區容量值初始化 freeSpace 信號量,意味着它最多可以獲取的緩沖器資源量。
在啟動這個應用程序時,消費者線程就會獲得自由的字節並把它們轉換為用過的字節。
用0初始化usedSpace信號量,以確保消費者線程不會在一開始就讀取到垃圾數據。
在生產者線程中,每次反復寫入都是從獲取一個 freeSpace 開始。
如果該緩沖器中充滿了消費者線程還沒有讀取的數據,那么對acquire()的調用就會被阻塞,直到消費者線程開始消費這些數據。
一旦生產者線程獲取這一字節,就寫入數據,並將這個字節釋放為 usedSpace ,以讓消費者線程讀取到。
在消費者線程中,我們從獲取一個 usedSpace 開始。
如果緩沖器中還沒有任何可用的數據,那么將會阻塞對acquire()調用,直到生產者線程生產數據。
一旦獲取到這個字節,就使用數據,並把字節釋放為 freeSpace ,這樣,生產者線程就可以再次寫入。
上述方法在只有一個生產者和一個消費者時能解決問題。對於多個生產者或者多個消費者共享緩沖區的情況,該算法也會導致競爭條件,出現兩個或以上的進程同時讀或寫同一個緩沖區槽的情況。
解決多生產者、多消費者 的問題
為了保證同一時刻只有一個生產者能夠執行 putItemIntoBuffer()。也就是說,需要尋找一種方法來互斥地執行臨界區的代碼。為了達到這個目的,可引入一個二值信號燈 mutex,其值只能為 1 或者 0。如果把線程放入 down(mutex) 和 up(mutex) 之間,就可以限制只有一個線程能被執行。
#include <QtCore/QCoreApplication>
#include <QThread>
#include <QSemaphore>
#include <QDebug>
const int SIZE = 5;
static unsigned char g_buff[SIZE] = {0};
static QSemaphore g_sem_free(SIZE);
static QSemaphore g_sem_used(0);
static QSemaphore mutex(1);
class Producer : public QThread
{
protected:
void run()
{
while( true )
{
int value = qrand() % 256;
g_sem_free.acquire();
mutex.acquire();
for(int i=0; i<SIZE; i++)
{
if( !g_buff[i] )
{
g_buff[i] = (unsigned char)value;
qDebug() << objectName() << " generate: {" << i << ", " << value << "}";
break;
}
}
mutex.release();
g_sem_used.release();
sleep(2);
}
}
};
class Customer : public QThread
{
protected:
void run()
{
while( true )
{
g_sem_used.acquire();
mutex.acquire();
for(int i=0; i<SIZE; i++)
{
if( g_buff[i] )
{
int value = g_buff[i];
g_buff[i] = 0;
qDebug() << objectName() << " consume: {" << i << ", " << value << "}";
break;
}
}
mutex.release();
g_sem_free.release();
sleep(1);
}
}
};
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
Producer p1;
Producer p2;
Producer p3;
p1.setObjectName("p1");
p2.setObjectName("p2");
p3.setObjectName("p3");
Customer c1;
Customer c2;
c1.setObjectName("c1");
c2.setObjectName("c2");
p1.start();
p2.start();
p3.start();
c1.start();
c2.start();
return a.exec();
}
使用 互斥量 + 條件變量 解決 多生產者多消費者的問題
#include <QtCore/QCoreApplication>
#include <QWaitCondition>
#include <QThread>
#include <QMutex>
#include <iostream>
const int DataSize = 100;
const int BufferSize = 1;
// static char buffer[BufferSize];
static QWaitCondition bufferIsNotFull;
static QWaitCondition bufferIsNotEmpty;
static QMutex mutex;
static int usedSpace;
class Producer : public QThread
{
protected:
void run()
{
for (int i = 0; i < DataSize; ++i)
{
mutex.lock();
while (usedSpace == BufferSize)
{
bufferIsNotFull.wait(&mutex);
}
std::cerr<<"P";
++usedSpace;
bufferIsNotEmpty.wakeAll();
mutex.unlock();
}
}
};
class Consumer : public QThread
{
protected:
void run()
{
for (int i = 0; i < DataSize; ++i)
{
mutex.lock();
while (usedSpace == 0)
{
bufferIsNotEmpty.wait(&mutex);
}
std::cerr<<"C";
--usedSpace;
bufferIsNotFull.wakeAll();
mutex.unlock();
}
std::cerr<<std::endl;
}
};
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
Producer producer;
Consumer consumer;
producer.start();
consumer.start();
producer.wait();
consumer.wait();
return a.exec();
}