進程同步:生產者消費者模型 以及解決方法


背景

生產者和消費者問題是線程模型中的經典問題:生產者和消費者在同一時間段內共用同一個存儲空間,生產者往存儲空間中添加產品,消費者從存儲空間中取走產品,當存儲空間為空時,消費者阻塞,當存儲空間滿時,生產者阻塞。

為什么要使用生產者和消費者模式:

在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題,於是引入了生產者和消費者模式。

生產者——消費者模型 會遇到的問題

生產者——消費者模型中,生產者和消費者線程之間需要傳遞一定量的數據,兩個線程會使用一個特定大小的共享環形緩沖器。

生產者向緩沖器中寫入數據,直到它到達緩沖器的終點;然后它會再次從起點重新開始,覆蓋已經存在的數據。消費者線程則會讀取生成的數據。

在生產者——消費者實例中,對於同步的需求有兩個部分:
1.如果生產者線程生成數據的速度太快,那么將會把消費者線程還沒有讀取的數據覆蓋;
2.如果消費者線程讀取數據的速度過快,那么它就會越過生產者線程而讀取一些垃圾數據。

生產者——消費者模型 解決方法:

容易導致死鎖的實現:

讓生產者線程填滿緩沖器,然后等待消費者線程讀取完緩沖器中全部數據。

使用2個信號量 解決 單消費者單生產者的問題

使用兩個信號量:freeSpace 與 usedSpace

freeSpace 信號量控制生產者線程寫入數據的那部分緩沖器, usedSpace 信號量則控制消費者線程讀取數據的那部分緩沖器區域。
這兩個區域是相互補充的。

常用緩沖區容量值初始化 freeSpace 信號量,意味着它最多可以獲取的緩沖器資源量。
在啟動這個應用程序時,消費者線程就會獲得自由的字節並把它們轉換為用過的字節。
用0初始化usedSpace信號量,以確保消費者線程不會在一開始就讀取到垃圾數據。

在生產者線程中,每次反復寫入都是從獲取一個 freeSpace 開始。
如果該緩沖器中充滿了消費者線程還沒有讀取的數據,那么對acquire()的調用就會被阻塞,直到消費者線程開始消費這些數據。
一旦生產者線程獲取這一字節,就寫入數據,並將這個字節釋放為 usedSpace ,以讓消費者線程讀取到。

在消費者線程中,我們從獲取一個 usedSpace 開始。
如果緩沖器中還沒有任何可用的數據,那么將會阻塞對acquire()調用,直到生產者線程生產數據。
一旦獲取到這個字節,就使用數據,並把字節釋放為 freeSpace ,這樣,生產者線程就可以再次寫入。

上述方法在只有一個生產者和一個消費者時能解決問題。對於多個生產者或者多個消費者共享緩沖區的情況,該算法也會導致競爭條件,出現兩個或以上的進程同時讀或寫同一個緩沖區槽的情況。

解決多生產者、多消費者 的問題

為了保證同一時刻只有一個生產者能夠執行 putItemIntoBuffer()。也就是說,需要尋找一種方法來互斥地執行臨界區的代碼。為了達到這個目的,可引入一個二值信號燈 mutex,其值只能為 1 或者 0。如果把線程放入 down(mutex) 和 up(mutex) 之間,就可以限制只有一個線程能被執行。


#include <QtCore/QCoreApplication>
#include <QThread>
#include <QSemaphore>
#include <QDebug>

const int SIZE = 5;
static unsigned char g_buff[SIZE] = {0};
static QSemaphore g_sem_free(SIZE);
static QSemaphore g_sem_used(0);
static QSemaphore mutex(1);

class Producer : public QThread
{
protected:
    void run()
    {
        while( true )
        {
            int value = qrand() % 256;

            g_sem_free.acquire();
            mutex.acquire();

            for(int i=0; i<SIZE; i++)
            {
                if( !g_buff[i] )
                {
                    g_buff[i] = (unsigned char)value;

                    qDebug() << objectName() << " generate: {" << i << ", " << value << "}";

                    break;
                }
            }
            mutex.release();

            g_sem_used.release();

            sleep(2);
        }
    }
};

class Customer : public QThread
{
protected:
    void run()
    {
        while( true )
        {
            g_sem_used.acquire();
            mutex.acquire();

            for(int i=0; i<SIZE; i++)
            {
                if( g_buff[i] )
                {
                    int value = g_buff[i];

                    g_buff[i] = 0;

                    qDebug() << objectName() << " consume: {" << i << ", " << value << "}";

                    break;
                }
            }
            mutex.release();

            g_sem_free.release();

            sleep(1);
        }
    }
};

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

    Producer p1;
    Producer p2;
    Producer p3;

    p1.setObjectName("p1");
    p2.setObjectName("p2");
    p3.setObjectName("p3");

    Customer c1;
    Customer c2;

    c1.setObjectName("c1");
    c2.setObjectName("c2");

    p1.start();
    p2.start();
    p3.start();

    c1.start();
    c2.start();

    return a.exec();
}

使用 互斥量 + 條件變量 解決 多生產者多消費者的問題

#include <QtCore/QCoreApplication>
#include <QWaitCondition>
#include <QThread>
#include <QMutex>
#include <iostream>

const int DataSize = 100;
const int BufferSize = 1;
// static char buffer[BufferSize];

static QWaitCondition bufferIsNotFull;
static QWaitCondition bufferIsNotEmpty;
static QMutex mutex;
static int usedSpace;

class Producer : public QThread
{
protected:
    void run()
    {
        for (int i = 0; i < DataSize; ++i)
        {
            mutex.lock();
            while (usedSpace == BufferSize)
            {
                bufferIsNotFull.wait(&mutex);
            }
            std::cerr<<"P";
            ++usedSpace;
            bufferIsNotEmpty.wakeAll();
            mutex.unlock();
        }
    }
};

class Consumer : public QThread
{
protected:
    void run()
    {
        for (int i = 0; i < DataSize; ++i)
        {
            mutex.lock();
            while (usedSpace == 0)
            {
                bufferIsNotEmpty.wait(&mutex);
            }
            std::cerr<<"C";
            --usedSpace;
            bufferIsNotFull.wakeAll();
            mutex.unlock();
        }
        std::cerr<<std::endl;
    }
};

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);
    Producer producer;
    Consumer consumer;
    producer.start();
    consumer.start();
    producer.wait();
    consumer.wait();
    return a.exec();
}



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM