1. 介紹
生產者消費者問題屬於有界緩沖區問題。我們現在講述多個生產者向一個緩沖區中存入數據,多個生產者從緩沖區中取數據。
共享緩沖區作為一個環繞緩沖區,存數據到頭時再從頭開始。
2. 實現
我們使用一個互斥量保護生產者向緩沖區中存入數據。
由於有多個生產者,因此需要記住現在向緩沖區中存入的位置。
使用一個互斥量保護緩沖區中消息的數目,這個生產的數據數目作為生產者和消費者溝通的橋梁。
使用一個條件變量用於喚醒消費者。由於有多個消費者,同樣消費者也需要記住每次取的位置。
4.代碼
在選項中選擇生產條目的數目,生產者的線程數目,消費者的線程數目。
生產者將條目數目循環放入緩沖區中,消費者從緩沖區中循環取出並在屏幕上打印出來。
#include "unp.h"
/* * 多個生產者——多個消費者 * 使用條件變量和互斥鎖的演示 */
static const int NBUFF = 10000; static const int MAXNTHREADS = 100; //總共生產的條目數
static int nitems; //生產者向其中放數據,消費者從中取數據
static int buff[NBUFF]; //生產者使用的結構 //向其中互斥的放數據
static struct put { pthread_mutex_t mutex; int nput; //net position to put
int nval; //next value to store
} put = { PTHREAD_MUTEX_INITIALIZER }; //記錄緩沖區的狀態 //准備好的數目 //消費者唯一關注的結構 //當然生產者也會使用
static struct nready { pthread_mutex_t mutex; pthread_cond_t cond; int nget; int nready; //number ready for consumer
} nready = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER }; void *produce(void*); void *consume(void*); int main(int argc, char **argv) { if (argc != 4) { err_quit("Usage: a.out <#items> <#produce_nthreads> <#consume_nthreads>"); } nitems = atoi(argv[1]); int produce_nthreads = min(atoi(argv[2]), MAXNTHREADS); int consume_nthreads = min(atoi(argv[3]), MAXNTHREADS); //Solaris 2.6需要設置線程並發數 // Set_concurrency(nthreads + 1);
pthread_t tid_produce[MAXNTHREADS]; for (int i = 0; i < produce_nthreads; ++i) { Pthread_create(&tid_produce[i], NULL, produce, NULL); } pthread_t tid_consume[MAXNTHREADS]; for (int i = 0; i < consume_nthreads; ++i) { Pthread_create(&tid_consume[i], NULL, consume, NULL); } //等待線程終止
for (int i = 0; i < produce_nthreads; ++i) { Pthread_join(tid_produce[i], NULL); } for (int i = 0; i < consume_nthreads; ++i) { Pthread_join(tid_consume[i], NULL); } exit(0); } void *produce(void *arg) { printf("producd\n"); //多個生產者
for ( ; ; ) { Pthread_mutex_lock(&put.mutex); //已存了需要多的數
if (put.nval >= nitems) { Pthread_mutex_unlock(&put.mutex); return NULL; } buff[put.nput] = put.nval; if (++put.nput >= NBUFF) { put.nput = 0; } ++put.nval; Pthread_mutex_unlock(&put.mutex); //當生產了數據后通知條件變量 //應該使臨界區盡量短,寧願使用多個互斥量
Pthread_mutex_lock(&nready.mutex); if (nready.nready == 0) { Pthread_cond_signal(&nready.cond); } ++nready.nready; Pthread_mutex_unlock(&nready.mutex); } //end for(;;)
return NULL; } void *consume(void *argv) { printf("consume\n"); //多個消費者 //只生產nitems個選項
for ( ; ; ) { Pthread_mutex_lock(&nready.mutex); //while避免虛假喚醒
while (nready.nready == 0) { Pthread_cond_wait(&nready.cond, &nready.mutex); } //int ival = buff[nready.nget]; //if (++nready.nget == NBUFF) { // nready.nget = 0; //}
if (++nready.nget >= nitems) { //nget比較的取值為1..nitems //當為nitems時少操作了一次, //總共操作nitems次
if (nready.nget == nitems) { printf("buff[%d] = %d\n", nready.nget - 1, buff[(nready.nget - 1) % NBUFF]); } Pthread_cond_signal(&nready.cond); Pthread_mutex_unlock(&nready.mutex); return NULL; } --nready.nready; Pthread_mutex_unlock(&nready.mutex); //僅僅讀數據不許要互斥 // if (buff[nready.nget - 1] != nready.nget - 1) {
printf("buff[%d] = %d\n", nready.nget - 1, buff[(nready.nget - 1) % NBUFF]); //printf("buff[%d] = %d\n", nready.nget, buff[nready.nget]); // }
} //end for(i:0..nitems)
return NULL; }