生產者-消費者問題是一個經典的進程同步問題,該問題最早由Dijkstra提出,用以演示他提出的信號量機制。在同一個進程地址空間內執行的兩個線程。生產者線程生產物品,然后將物品放置在一個空緩沖區中供消費者線程消費。消費者線程從緩沖區中獲得物品,然后釋放緩沖區。當生產者線程生產物品時,如果沒有空緩沖區可用,那么生產者線程必須等待消費者線程釋放出一個空緩沖區。當消費者線程消費物品時,如果沒有滿的緩沖區,那么消費者線程將被阻塞,直到新的物品被生產出來。
#include <windows.h> #include <iostream> const unsigned short SIZE_OF_BUFFER = 10; //緩沖區長度 unsigned short ProductID = 0; //產品號 unsigned short ConsumeID = 0; //將被消耗的產品號 unsigned short in = 0; //產品進緩沖區時的緩沖區下標 unsigned short out = 0; //產品出緩沖區時的緩沖區下標 int g_buffer[SIZE_OF_BUFFER]; //緩沖區是個循環隊列 bool g_continue = true; //控制程序結束 HANDLE g_hMutex; //用於線程間的互斥 HANDLE g_hFullSemaphore; //當緩沖區滿時迫使生產者等待 HANDLE g_hEmptySemaphore; //當緩沖區空時迫使消費者等待 DWORD WINAPI Producer(LPVOID); //生產者線程 DWORD WINAPI Consumer(LPVOID); //消費者線程 int main() { //創建各個互斥信號 g_hMutex = CreateMutex(NULL,FALSE,NULL); g_hFullSemaphore = CreateSemaphore(NULL,SIZE_OF_BUFFER-1,SIZE_OF_BUFFER-1,NULL); g_hEmptySemaphore = CreateSemaphore(NULL,0,SIZE_OF_BUFFER-1,NULL); //調整下面的數值,可以發現,當生產者個數多於消費者個數時, //生產速度快,生產者經常等待消費者;反之,消費者經常等待 const unsigned short PRODUCERS_COUNT = 3; //生產者的個數 const unsigned short CONSUMERS_COUNT = 1; //消費者的個數 //總的線程數 const unsigned short THREADS_COUNT = PRODUCERS_COUNT+CONSUMERS_COUNT; HANDLE hThreads[PRODUCERS_COUNT]; //各線程的handle DWORD producerID[CONSUMERS_COUNT]; //生產者線程的標識符 DWORD consumerID[THREADS_COUNT]; //消費者線程的標識符 //創建生產者線程 for (int i=0;i <PRODUCERS_COUNT;++i){ hThreads[i]=CreateThread(NULL,0,Producer,NULL,0,&producerID[i]); if (hThreads[i]==NULL) return -1; } //創建消費者線程 for (int i=0;i <CONSUMERS_COUNT;++i){ hThreads[PRODUCERS_COUNT+i]=CreateThread(NULL,0,Consumer,NULL,0,&consumerID[i]); if (hThreads[i]==NULL) return -1; } while(g_continue){ if(getchar()){ //按回車后終止程序運行 g_continue = false; } } return 0; } //生產一個產品。簡單模擬了一下,僅輸出新產品的ID號 void Produce() { std::cerr < < "Producing " < < ++ProductID < < " ... "; std::cerr < < "Succeed " < < std::endl; } //把新生產的產品放入緩沖區 void Append() { std::cerr < < "Appending a product ... "; g_buffer[in] = ProductID; in = (in+1)%SIZE_OF_BUFFER; std::cerr < < "Succeed " < < std::endl; //輸出緩沖區當前的狀態 for (int i=0;i <SIZE_OF_BUFFER;++i){ std::cout < < i < < ": " < < g_buffer[i]; if (i==in) std::cout < < " <-- 生產 "; if (i==out) std::cout < < " <-- 消費 "; std::cout < < std::endl; } } //從緩沖區中取出一個產品 void Take() { std::cerr < < "Taking a product ... "; ConsumeID = g_buffer[out]; out = (out+1)%SIZE_OF_BUFFER; std::cerr < < "Succeed " < < std::endl; //輸出緩沖區當前的狀態 for (int i=0;i <SIZE_OF_BUFFER;++i){ std::cout < < i < < ": " < < g_buffer[i]; if (i==in) std::cout < < " <-- 生產 "; if (i==out) std::cout < < " <-- 消費 "; std::cout < < std::endl; } } //消耗一個產品 void Consume() { std::cerr < < "Consuming " < < ConsumeID < < " ... "; std::cerr < < "Succeed " < < std::endl; } //生產者 DWORD WINAPI Producer(LPVOID lpPara) { while(g_continue){ WaitForSingleObject(g_hFullSemaphore,INFINITE); WaitForSingleObject(g_hMutex,INFINITE); Produce(); Append(); Sleep(1500); ReleaseMutex(g_hMutex); ReleaseSemaphore(g_hEmptySemaphore,1,NULL); } return 0; } //消費者 DWORD WINAPI Consumer(LPVOID lpPara) { while(g_continue){ WaitForSingleObject(g_hEmptySemaphore,INFINITE); WaitForSingleObject(g_hMutex,INFINITE); Take(); Consume(); Sleep(1500); ReleaseMutex(g_hMutex); ReleaseSemaphore(g_hFullSemaphore,1,NULL); } return 0; }
