管程(monitor)說明
在並發編程中,管程(monitor)是一個同步構件,管程實現了同一時間點,最多只有一個線程可以執行管程的某個子程序。與那些通過修改數據結構實現互斥訪問的並發程序設計相比,管程的實現很大程度上簡化了程序設計。
管程可以確保一次只有一個進程執行管程中的程序,因此程序員不需要顯式地編寫同步代碼,但是如果需要就某些特定條件上的同步,則需要定義一些條件結構(condition variable)來實現,並且對條件變量的操作僅有wait()和signal(),如下:
condition x, y;
x.wait();
...
x.signal();
調用x.wait()操作可能會使得一個進程掛起,直到另一個進程調用x.signal()操作。與信號量中的signal()操作相比,管程中如果在沒有任何進程掛起的情況下調用signal()沒有任何作用,而在信號量中,則必然會改變信號量的狀態。
一個管程(mointor)的示意圖如下所示:
一個mointor中的程序運行前必須首先獲取mutex,直至程序運行完成或者線程等待的某個條件發生時才釋放mutex。當一個線程執行mointor中的一個子程序時,稱為占用(occupy)該mointor,因此必須等到沒有其他線程執行管程程序時方可調用管程程序,這是互斥保證。在管程的簡單實現中,編譯器為每個管程對象自動加入一把私有的mutex(lock),初始狀態為unlock,管程中的每個對象入口處執行lock操作,出口處執行unlock操作。
因此設計monitor時至少必須包含mutex(lock) object(互斥量)和condition variables(條件變量)。一個條件變量可以看作是等待該條件發生的線程集合。
注:monitor也稱為<線程安全對象/類/模塊>。
條件變量
為何需要條件變量?
考慮如下一個busy waiting loop:
while not(P)
do skip
如果僅有mutex,則線程必須等待P為真時才能繼續執行。如此,將會導致其他線程無法進入臨界區使得條件P為真,因此該管程可能發生死鎖。
可以用條件變量解決。一個條件變量C可以看作是一個線程隊列,其中存放的線程正在等待與之關聯的條件變為真。當一個線程等待一個條件變量C時,其將mutex釋放,然后其他線程就可以進入該管程中,通過改變C的值可以使得條件C滿足,因此對條件變量C可以有如下操作:
(1)wait(c, m):線程調用該操作,等待條件C滿足后繼續執行,在等待過程中,釋放mutex,因此此過程中,該線程不占用管程。
(2)signal(c):線程調用該操作表明此時條件C為真。
一個線程發生signal()后,至少有兩個線程想要占用包含條件變量的管程:發出signal()操作的線程P,等待條件變量的線程Q,此時有兩種選擇:
1.非阻塞式條件變量:Q繼續等待直到P完成。
2.阻塞式條件變量:P繼續等待直到Q完成。
兩種條件變量類型
阻塞式條件變量
也被稱為霍爾風格(Hoare-style)管程,如下圖所示:
每個管程包含兩個線程隊列e,s,其中:
e:入口隊列
s:發出signal的線程隊列
對於每個條件變量C,有一個線程隊列,用C.q表示,如上圖的a.q、b.q,這些隊列很多情況下可以實現為FIFO模式。
阻塞式條件變量實現如下:
非阻塞式條件變量
也稱為Mesa風格管程,如下圖所示:
該模型中,發出signal()操作的線程不會失去管程的占用權,被notified()的線程將會被移到隊列e中,相較於阻塞式條件變量,該模型不需要隊列s。例如Pthread中的條件變量就采用這種非阻塞模式,即發出signal()操作的線程優先級高於被notified()的線程,要使用這種條件變量:首先利用pthread_mutex_lock獲取互斥鎖,然后調用pthread_cond_wait在線程睡眠等待之前先釋放互斥鎖,在其被喚醒后再重新獲取互斥鎖。關於pthread條件變量如下會有詳細介紹。
非阻塞條件變量實現如下:
POSIX同步之互斥鎖和條件變量的使用
如下為經典的有界緩沖區問題,可以用生產者/消費者模型描述,示意圖如下:
采用互斥量的生產者/消費者代碼如下:

1 [root@bogon unp]# cat producer_consumer_mutex.c 2 #include <unistd.h> 3 #include <sys/types.h> 4 #include <pthread.h> 5 #include <stdlib.h> 6 #include <string.h> 7 #include <errno.h> 8 #include <stdio.h> 9 10 #define CONSUMER_COUNT 1 /* 1個消費者線程 */ 11 #define PRODUCER_COUNT 3 /* 3個生產者線程 */ 12 #define BUFFERSIZE 10 13 14 int g_buffer[BUFFERSIZE]; 15 16 unsigned short in = 0; 17 unsigned short out = 0; 18 19 pthread_mutex_t g_mutex; 20 21 pthread_t g_thread[CONSUMER_COUNT + PRODUCER_COUNT]; /* 存放生產者和消費者的線程號 */ 22 23 void* consumer(void* arg) 24 { 25 int num = (int)arg; 26 /* 不斷消費 */ 27 while (1) 28 { 29 pthread_mutex_lock(&g_mutex); 30 31 /* 打印倉庫當前狀態 */ 32 int i; 33 for (i = 0; i < BUFFERSIZE; i++) 34 { 35 if (g_buffer[i] == -1) 36 printf("g_buffer[%d] = %s\n", i, "null"); 37 else 38 printf("g_buffer[%d] = %d\n", i, g_buffer[i]); 39 40 if (i == out) 41 printf("g_buffer[%d]可以消費\n", i); 42 } 43 44 /* 消費產品 */ 45 printf("thread %d 開始消費產品 %d\n", num, g_buffer[out]); 46 sleep(4); /* 消費一個產品需要4秒 */ 47 g_buffer[out] = -1; 48 printf("消費完畢\n"); 49 out = (out + 1) % BUFFERSIZE; 50 51 pthread_mutex_unlock(&g_mutex); 52 } 53 54 return NULL; 55 } 56 57 void* producer(void* arg) 58 { 59 int num = (int)arg; 60 /* 不斷生產 */ 61 while (1) 62 { 63 pthread_mutex_lock(&g_mutex); 64 65 /* 打印倉庫當前狀態 */ 66 int i; 67 for (i = 0; i < BUFFERSIZE; i++) 68 { 69 if (g_buffer[i] == -1) 70 printf("g_buffer[%d] = %s\n", i, "null"); 71 else 72 printf("g_buffer[%d] = %d\n", i, g_buffer[i]); 73 74 if (i == in) 75 printf("g_buffer[%d]可以生產\n", i); 76 } 77 78 /* 生產產品 */ 79 g_buffer[in]++; 80 printf("thread %d 開始生產產品 %d\n", num, g_buffer[in]); 81 sleep(2); /* 生產一個產品需要2秒 */ 82 printf("生產完畢\n"); 83 in = (in + 1) % BUFFERSIZE; 84 85 pthread_mutex_unlock(&g_mutex); 86 } 87 88 return NULL; 89 } 90 91 int main(void) 92 { 93 /* 初始化倉庫 */ 94 int i; 95 for (i = 0; i < BUFFERSIZE; i++) 96 g_buffer[i] = -1; 97 98 /* 創建消費者線程,線程號為:[0, CONSUMER_COUNT) */ 99 for (i = 0; i < CONSUMER_COUNT; i++) 100 { 101 pthread_create(&g_thread[i], NULL, consumer, (void*)i); 102 } 103 104 /* 創建生產者線程,線程號為:[CONSUMER_COUNT, CONSUMER_COUNT + PRODUCER_COUNT) */ 105 for (i = 0; i < PRODUCER_COUNT; i++) 106 { 107 pthread_create(&g_thread[i + CONSUMER_COUNT], NULL, producer, (void*)(i + CONSUMER_COUNT)); 108 } 109 110 /* 等待創建的所有線程退出 */ 111 for (i = 0; i < CONSUMER_COUNT + PRODUCER_COUNT; i++) 112 { 113 pthread_join(g_thread[i], NULL); 114 } 115 116 return 0; 117 } 118 119 // output 120 ... 121 thread 2 開始生產產品 4 122 生產完畢 123 g_buffer[0] = 4 124 g_buffer[1] = 4 125 g_buffer[2] = 4 126 g_buffer[3] = 2 127 g_buffer[3]可以生產 128 g_buffer[4] = 2 129 g_buffer[5] = 1 130 g_buffer[6] = 1 131 g_buffer[7] = 0 132 g_buffer[8] = 0 133 g_buffer[9] = 4 134 thread 1 開始生產產品 3 135 生產完畢 136 g_buffer[0] = 4 137 g_buffer[1] = 4 138 g_buffer[2] = 4 139 g_buffer[3] = 3 140 g_buffer[4] = 2 141 g_buffer[5] = 1 142 g_buffer[6] = 1 143 g_buffer[7] = 0 144 g_buffer[8] = 0 145 g_buffer[9] = 4 146 g_buffer[9]可以消費 147 thread 0 開始消費產品 4 148 消費完畢 149 ...
但是上述程序中存在一個問題,就是當生產者線程未准備好產品時,消費者線程卻在不斷執行循環,這種被稱為輪轉(spinning)或者輪詢(polling)的現象是對CPU資源的一大浪費。如下引入條件變量與互斥鎖共同工作,互斥鎖用於加鎖互斥,而條件變量則專注於等待,每個條件變量總是和一個互斥鎖關聯。
采用條件變量的生產者/消費者代碼如下:

1 [root@bogon unp]# cat producer_consumer_condition.c 2 #include <unistd.h> 3 #include <sys/types.h> 4 #include <pthread.h> 5 #include <stdlib.h> 6 #include <string.h> 7 #include <errno.h> 8 #include <stdio.h> 9 10 #define CONSUMER_COUNT 1 /* 1個消費者線程 */ 11 #define PRODUCER_COUNT 3 /* 3個生產者線程 */ 12 #define BUFFERSIZE 10 13 14 int g_buffer[BUFFERSIZE]; 15 16 unsigned short in = 0; 17 unsigned short out = 0; 18 19 pthread_mutex_t g_mutex; 20 21 typedef struct 22 { 23 pthread_mutex_t mutex; 24 pthread_cond_t cond; 25 } Condition; 26 27 Condition not_empty = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER}; 28 Condition not_full = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER}; 29 30 int nready; /* 可以消費的產品數量 */ 31 32 pthread_t g_thread[CONSUMER_COUNT + PRODUCER_COUNT]; /* 存放生產者和消費者的線程號 */ 33 34 void* consumer(void* arg) 35 { 36 int num = (int)arg; 37 /* 不斷消費 */ 38 while (1) 39 { 40 pthread_mutex_lock(&g_mutex); 41 42 /* 打印倉庫當前狀態,(為了便於比較,這段打印臨界區依然只使用互斥鎖保護) */ 43 int i; 44 for (i = 0; i < BUFFERSIZE; i++) 45 { 46 if (g_buffer[i] == -1) 47 printf("g_buffer[%d] = %s\n", i, "null"); 48 else 49 printf("g_buffer[%d] = %d\n", i, g_buffer[i]); 50 51 if (i == out) 52 printf("g_buffer[%d]可以消費\n", i); 53 } 54 55 pthread_mutex_unlock(&g_mutex); 56 57 /* 消費產品 */ 58 pthread_mutex_lock(¬_empty.mutex); 59 60 while (nready == 0) 61 pthread_cond_wait(¬_empty.cond, ¬_empty.mutex); 62 printf("thread %d 開始消費產品 %d\n", num, g_buffer[out]); 63 sleep(4); /* 消費一個產品需要4秒 */ 64 g_buffer[out] = -1; 65 printf("消費完畢\n"); 66 --nready; 67 out = (out + 1) % BUFFERSIZE; 68 69 pthread_cond_signal(¬_full.cond); 70 pthread_mutex_unlock(¬_empty.mutex); 71 } 72 73 return NULL; 74 } 75 76 void* producer(void* arg) 77 { 78 int num = (int)arg; 79 /* 不斷生產 */ 80 while (1) 81 { 82 pthread_mutex_lock(&g_mutex); 83 84 /* 打印倉庫當前狀態 */ 85 int i; 86 for (i = 0; i < BUFFERSIZE; i++) 87 { 88 if (g_buffer[i] == -1) 89 printf("g_buffer[%d] = %s\n", i, "null"); 90 else 91 printf("g_buffer[%d] = %d\n", i, g_buffer[i]); 92 93 if (i == in) 94 printf("g_buffer[%d]可以生產\n", i); 95 } 96 97 pthread_mutex_unlock(&g_mutex); 98 99 /* 生產產品 */ 100 pthread_mutex_lock(¬_full.mutex); 101 102 while (nready == BUFFERSIZE) 103 pthread_cond_wait(¬_full.cond, ¬_full.mutex); 104 g_buffer[in]++; 105 printf("thread %d 開始生產產品 %d\n", num, g_buffer[in]); 106 sleep(2); /* 生產一個產品需要2秒 */ 107 printf("生產完畢\n"); 108 ++nready; 109 in = (in + 1) % BUFFERSIZE; 110 111 pthread_cond_signal(¬_empty.cond); 112 pthread_mutex_unlock(¬_full.mutex); 113 } 114 115 return NULL; 116 } 117 118 int main(void) 119 { 120 /* 初始化倉庫 */ 121 int i; 122 for (i = 0; i < BUFFERSIZE; i++) 123 g_buffer[i] = -1; 124 125 /* 創建消費者線程,線程號為:[0, CONSUMER_COUNT) */ 126 for (i = 0; i < CONSUMER_COUNT; i++) 127 { 128 pthread_create(&g_thread[i], NULL, consumer, (void*)i); 129 } 130 131 /* 創建生產者線程,線程號為:[CONSUMER_COUNT, CONSUMER_COUNT + PRODUCER_COUNT) */ 132 for (i = 0; i < PRODUCER_COUNT; i++) 133 { 134 pthread_create(&g_thread[i + CONSUMER_COUNT], NULL, producer, (void*)(i + CONSUMER_COUNT)); 135 } 136 137 /* 等待創建的所有線程退出 */ 138 for (i = 0; i < CONSUMER_COUNT + PRODUCER_COUNT; i++) 139 { 140 pthread_join(g_thread[i], NULL); 141 } 142 143 return 0; 144 } 145 146 // output is the same as above
條件變量使用說明:
一個條件變量的改變是原子性的,因此需要一個互斥鎖來保證,因此,條件變量的使用代碼可以如下:
1 typedef struct 2 { 3 pthread_mutex_t mutex; 4 pthread_cond_t cond; 5 // 與條件變量相關的變量聲明 6 } Condition; 7 Condition cond_a = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, ...}; 8 Condition cond_b = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, ...}; 9 ...
1.執行signal操作的線程中流程如下:
pthread_mutex_lock(&cond_a.mutex); // 設置條件為真 pthread_cond_signal(&cond_a.cond); pthread_mutex_unlock(&cond_a.mutex);
說明:
pthread_cond_signal與pthread_mutex_unlock的順序:如果先signal后unlock,則可以確定signal操作是由lock住cond_a.mutex的線程調用的;如果先unlock后signal,則任一線程都可調用signal操作。如果需要可預見的調度行為,最好先signal后unlock,就像上面那樣。
2.執行wait操作的線程中流程如下:
pthread_mutex_lock(&cond_a.mutex); while (條件為假) pthread_cond_wait(&cond_a.cond, &cond_a.mutex); // 修改條件 pthread_mutex_unlock(&cond_a.mutex);
說明:
(1)pthread_cond_wait執行如下3個操作:
- 解鎖cond_a.mutex,使得其他線程可以進入以便改變條件
- 將調用線程阻塞在條件變量cond_a上(睡眠了),直到某個線程將條件設為真
- 成功返回后(此時某個線程調用了pthread_cond_signal/broadcast)重新對cond_a.mutex加鎖。
(2)是否可以將:
while (條件為假) pthread_cond_wait(&cond_a.cond, &cond_a.mutex);
替換為:
if (條件為假) pthread_cond_wait(&cond_a.cond, &cond_a.mutex);
答案是如果將while替換為if,可以發生虛假(spurious)喚醒:即發出signal的線程並為將條件設為真就調用了pthread_cond_signal,此時pthread_cond_wait卻成功返回了,如此將導致后續的代碼執行失敗。因此必須在pthread_cond_wait返回后再次判斷條件是否確實為真,即必須使用循環而非條件判斷。