使用互斥量和條件變量實現線程同步控制


管程(monitor)說明

在並發編程中,管程(monitor)是一個同步構件,管程實現了同一時間點,最多只有一個線程可以執行管程的某個子程序。與那些通過修改數據結構實現互斥訪問的並發程序設計相比,管程的實現很大程度上簡化了程序設計。

管程可以確保一次只有一個進程執行管程中的程序,因此程序員不需要顯式地編寫同步代碼,但是如果需要就某些特定條件上的同步,則需要定義一些條件結構(condition variable)來實現,並且對條件變量的操作僅有wait()和signal(),如下:

condition x, y;

x.wait();

...

x.signal();

調用x.wait()操作可能會使得一個進程掛起,直到另一個進程調用x.signal()操作。與信號量中的signal()操作相比,管程中如果在沒有任何進程掛起的情況下調用signal()沒有任何作用,而在信號量中,則必然會改變信號量的狀態。

一個管程(mointor)的示意圖如下所示:

 

一個mointor中的程序運行前必須首先獲取mutex,直至程序運行完成或者線程等待的某個條件發生時才釋放mutex。當一個線程執行mointor中的一個子程序時,稱為占用(occupy)該mointor,因此必須等到沒有其他線程執行管程程序時方可調用管程程序,這是互斥保證。在管程的簡單實現中,編譯器為每個管程對象自動加入一把私有的mutex(lock),初始狀態為unlock,管程中的每個對象入口處執行lock操作,出口處執行unlock操作。

因此設計monitor時至少必須包含mutex(lock) object(互斥量)和condition variables(條件變量)。一個條件變量可以看作是等待該條件發生的線程集合。

注:monitor也稱為<線程安全對象/類/模塊>。

 

條件變量

為何需要條件變量?

考慮如下一個busy waiting loop:

while not(P)

    do skip

如果僅有mutex,則線程必須等待P為真時才能繼續執行。如此,將會導致其他線程無法進入臨界區使得條件P為真,因此該管程可能發生死鎖。

可以用條件變量解決。一個條件變量C可以看作是一個線程隊列,其中存放的線程正在等待與之關聯的條件變為真。當一個線程等待一個條件變量C時,其將mutex釋放,然后其他線程就可以進入該管程中,通過改變C的值可以使得條件C滿足,因此對條件變量C可以有如下操作:

(1)wait(c, m):線程調用該操作,等待條件C滿足后繼續執行,在等待過程中,釋放mutex,因此此過程中,該線程不占用管程。

(2)signal(c):線程調用該操作表明此時條件C為真。

一個線程發生signal()后,至少有兩個線程想要占用包含條件變量的管程:發出signal()操作的線程P,等待條件變量的線程Q,此時有兩種選擇:

1.非阻塞式條件變量:Q繼續等待直到P完成。

2.阻塞式條件變量:P繼續等待直到Q完成。

兩種條件變量類型

阻塞式條件變量

也被稱為霍爾風格(Hoare-style)管程,如下圖所示:

 

每個管程包含兩個線程隊列e,s,其中:

e:入口隊列

s:發出signal的線程隊列

對於每個條件變量C,有一個線程隊列,用C.q表示,如上圖的a.q、b.q,這些隊列很多情況下可以實現為FIFO模式。

阻塞式條件變量實現如下:

 

非阻塞式條件變量

也稱為Mesa風格管程,如下圖所示:

 

該模型中,發出signal()操作的線程不會失去管程的占用權,被notified()的線程將會被移到隊列e中,相較於阻塞式條件變量,該模型不需要隊列s。例如Pthread中的條件變量就采用這種非阻塞模式,即發出signal()操作的線程優先級高於被notified()的線程,要使用這種條件變量:首先利用pthread_mutex_lock獲取互斥鎖,然后調用pthread_cond_wait在線程睡眠等待之前先釋放互斥鎖,在其被喚醒后再重新獲取互斥鎖。關於pthread條件變量如下會有詳細介紹。

非阻塞條件變量實現如下:

 

 

POSIX同步之互斥鎖和條件變量的使用

如下為經典的有界緩沖區問題,可以用生產者/消費者模型描述,示意圖如下:

 

采用互斥量的生產者/消費者代碼如下:

  1 [root@bogon unp]# cat producer_consumer_mutex.c
  2 #include <unistd.h>
  3 #include <sys/types.h>
  4 #include <pthread.h>
  5 #include <stdlib.h>
  6 #include <string.h>
  7 #include <errno.h>
  8 #include <stdio.h>
  9 
 10 #define CONSUMER_COUNT 1        /* 1個消費者線程 */
 11 #define PRODUCER_COUNT 3        /* 3個生產者線程 */
 12 #define BUFFERSIZE 10
 13 
 14 int g_buffer[BUFFERSIZE];
 15 
 16 unsigned short in = 0;
 17 unsigned short out = 0;
 18 
 19 pthread_mutex_t g_mutex;
 20 
 21 pthread_t g_thread[CONSUMER_COUNT + PRODUCER_COUNT];    /* 存放生產者和消費者的線程號 */
 22 
 23 void* consumer(void* arg)
 24 {
 25         int num = (int)arg;
 26         /* 不斷消費 */
 27         while (1)
 28         {
 29                 pthread_mutex_lock(&g_mutex);
 30 
 31                 /* 打印倉庫當前狀態 */
 32                 int i;
 33                 for (i = 0; i < BUFFERSIZE; i++) 
 34                 {
 35                         if (g_buffer[i] == -1)
 36                                 printf("g_buffer[%d] = %s\n", i, "null");
 37                         else
 38                                 printf("g_buffer[%d] = %d\n", i, g_buffer[i]);
 39 
 40                         if (i == out)
 41                                 printf("g_buffer[%d]可以消費\n", i);
 42                 }
 43 
 44                 /* 消費產品 */
 45                 printf("thread %d 開始消費產品 %d\n", num, g_buffer[out]);
 46 sleep(4);       /* 消費一個產品需要4秒 */
 47                 g_buffer[out] = -1;
 48                 printf("消費完畢\n");
 49                 out = (out + 1) % BUFFERSIZE;
 50 
 51                 pthread_mutex_unlock(&g_mutex);
 52         }
 53 
 54         return NULL;
 55 }
 56 
 57 void* producer(void* arg)
 58 {
 59         int num = (int)arg;
 60         /* 不斷生產 */
 61         while (1)
 62         {
 63                 pthread_mutex_lock(&g_mutex);
 64 
 65                 /* 打印倉庫當前狀態 */
 66                 int i;
 67                 for (i = 0; i < BUFFERSIZE; i++)
 68         {
 69                 if (g_buffer[i] == -1)
 70                 printf("g_buffer[%d] = %s\n", i, "null");
 71             else
 72                 printf("g_buffer[%d] = %d\n", i, g_buffer[i]);
 73   
 74             if (i == in)
 75                 printf("g_buffer[%d]可以生產\n", i);
 76         }
 77 
 78                 /* 生產產品 */
 79                 g_buffer[in]++;
 80                 printf("thread %d 開始生產產品 %d\n", num, g_buffer[in]);
 81                 sleep(2);       /* 生產一個產品需要2秒 */
 82                 printf("生產完畢\n");
 83                 in = (in + 1) % BUFFERSIZE;
 84 
 85                 pthread_mutex_unlock(&g_mutex);
 86         }
 87 
 88         return NULL;
 89 }
 90 
 91 int main(void)
 92 {
 93         /* 初始化倉庫 */
 94         int i;
 95         for (i = 0; i < BUFFERSIZE; i++)
 96                 g_buffer[i] = -1;
 97 
 98         /* 創建消費者線程,線程號為:[0, CONSUMER_COUNT) */
 99         for (i = 0; i < CONSUMER_COUNT; i++)
100         {
101                 pthread_create(&g_thread[i], NULL, consumer, (void*)i);
102         }
103 
104         /* 創建生產者線程,線程號為:[CONSUMER_COUNT, CONSUMER_COUNT + PRODUCER_COUNT) */
105         for (i = 0; i < PRODUCER_COUNT; i++)
106         {
107                 pthread_create(&g_thread[i + CONSUMER_COUNT], NULL, producer, (void*)(i + CONSUMER_COUNT));
108         }
109 
110         /* 等待創建的所有線程退出 */
111         for (i = 0; i < CONSUMER_COUNT + PRODUCER_COUNT; i++)
112         {
113                 pthread_join(g_thread[i], NULL);
114         }
115 
116         return 0;
117 }
118 
119 // output
120 ...
121 thread 2 開始生產產品 4
122 生產完畢
123 g_buffer[0] = 4
124 g_buffer[1] = 4
125 g_buffer[2] = 4
126 g_buffer[3] = 2
127 g_buffer[3]可以生產
128 g_buffer[4] = 2
129 g_buffer[5] = 1
130 g_buffer[6] = 1
131 g_buffer[7] = 0
132 g_buffer[8] = 0
133 g_buffer[9] = 4
134 thread 1 開始生產產品 3
135 生產完畢
136 g_buffer[0] = 4
137 g_buffer[1] = 4
138 g_buffer[2] = 4
139 g_buffer[3] = 3
140 g_buffer[4] = 2
141 g_buffer[5] = 1
142 g_buffer[6] = 1
143 g_buffer[7] = 0
144 g_buffer[8] = 0
145 g_buffer[9] = 4
146 g_buffer[9]可以消費
147 thread 0 開始消費產品 4
148 消費完畢
149 ...
View Code

但是上述程序中存在一個問題,就是當生產者線程未准備好產品時,消費者線程卻在不斷執行循環,這種被稱為輪轉(spinning)或者輪詢(polling)的現象是對CPU資源的一大浪費。如下引入條件變量與互斥鎖共同工作,互斥鎖用於加鎖互斥,而條件變量則專注於等待,每個條件變量總是和一個互斥鎖關聯。

采用條件變量的生產者/消費者代碼如下:

  1 [root@bogon unp]# cat producer_consumer_condition.c
  2 #include <unistd.h>
  3 #include <sys/types.h>
  4 #include <pthread.h>
  5 #include <stdlib.h>
  6 #include <string.h>
  7 #include <errno.h>
  8 #include <stdio.h>
  9 
 10 #define CONSUMER_COUNT 1        /* 1個消費者線程 */
 11 #define PRODUCER_COUNT 3        /* 3個生產者線程 */
 12 #define BUFFERSIZE 10
 13 
 14 int g_buffer[BUFFERSIZE];
 15 
 16 unsigned short in = 0;
 17 unsigned short out = 0;
 18 
 19 pthread_mutex_t g_mutex;
 20 
 21 typedef struct
 22 {
 23         pthread_mutex_t mutex;
 24         pthread_cond_t cond;
 25 } Condition;
 26 
 27 Condition not_empty = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};
 28 Condition not_full = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};
 29 
 30 int nready;             /* 可以消費的產品數量 */
 31 
 32 pthread_t g_thread[CONSUMER_COUNT + PRODUCER_COUNT];    /* 存放生產者和消費者的線程號 */
 33 
 34 void* consumer(void* arg)
 35 {
 36         int num = (int)arg;
 37         /* 不斷消費 */
 38         while (1)
 39         {
 40                 pthread_mutex_lock(&g_mutex);
 41 
 42                 /* 打印倉庫當前狀態,(為了便於比較,這段打印臨界區依然只使用互斥鎖保護) */
 43                 int i;
 44         for (i = 0; i < BUFFERSIZE; i++) 
 45         {
 46                 if (g_buffer[i] == -1)
 47                 printf("g_buffer[%d] = %s\n", i, "null");
 48             else
 49                 printf("g_buffer[%d] = %d\n", i, g_buffer[i]);
 50 
 51             if (i == out)
 52                 printf("g_buffer[%d]可以消費\n", i);
 53         }
 54 
 55                 pthread_mutex_unlock(&g_mutex);
 56 
 57                 /* 消費產品 */
 58                 pthread_mutex_lock(&not_empty.mutex);
 59 
 60                 while (nready == 0)
 61                         pthread_cond_wait(&not_empty.cond, &not_empty.mutex);
 62                 printf("thread %d 開始消費產品 %d\n", num, g_buffer[out]);
 63                 sleep(4);       /* 消費一個產品需要4秒 */
 64                 g_buffer[out] = -1;
 65                 printf("消費完畢\n");
 66                 --nready;
 67                 out = (out + 1) % BUFFERSIZE;
 68 
 69                 pthread_cond_signal(&not_full.cond);
 70                pthread_mutex_unlock(&not_empty.mutex);
 71         }
 72 
 73         return NULL;
 74 }
 75 
 76 void* producer(void* arg)
 77 {
 78         int num = (int)arg;
 79         /* 不斷生產 */
 80         while (1)
 81         {
 82                 pthread_mutex_lock(&g_mutex);
 83 
 84                 /* 打印倉庫當前狀態 */
 85                 int i;
 86                 for (i = 0; i < BUFFERSIZE; i++)
 87         {
 88                 if (g_buffer[i] == -1)
 89                 printf("g_buffer[%d] = %s\n", i, "null");
 90             else
 91                 printf("g_buffer[%d] = %d\n", i, g_buffer[i]);
 92   
 93             if (i == in)
 94                 printf("g_buffer[%d]可以生產\n", i);
 95         }
 96 
 97                 pthread_mutex_unlock(&g_mutex);
 98 
 99                 /* 生產產品 */
100                 pthread_mutex_lock(&not_full.mutex);
101 
102                 while (nready == BUFFERSIZE)
103                         pthread_cond_wait(&not_full.cond, &not_full.mutex);
104                 g_buffer[in]++;
105                 printf("thread %d 開始生產產品 %d\n", num, g_buffer[in]);
106                 sleep(2);       /* 生產一個產品需要2秒 */
107                 printf("生產完畢\n");
108                 ++nready;
109                 in = (in + 1) % BUFFERSIZE;
110 
111                 pthread_cond_signal(&not_empty.cond);
112                 pthread_mutex_unlock(&not_full.mutex);
113         }
114 
115         return NULL;
116 }
117 
118 int main(void)
119 {
120         /* 初始化倉庫 */
121         int i;
122         for (i = 0; i < BUFFERSIZE; i++)
123                 g_buffer[i] = -1;
124 
125         /* 創建消費者線程,線程號為:[0, CONSUMER_COUNT) */
126         for (i = 0; i < CONSUMER_COUNT; i++)
127         {
128                 pthread_create(&g_thread[i], NULL, consumer, (void*)i);
129         }
130 
131         /* 創建生產者線程,線程號為:[CONSUMER_COUNT, CONSUMER_COUNT + PRODUCER_COUNT) */
132         for (i = 0; i < PRODUCER_COUNT; i++)
133         {
134                 pthread_create(&g_thread[i + CONSUMER_COUNT], NULL, producer, (void*)(i + CONSUMER_COUNT));
135         }
136 
137         /* 等待創建的所有線程退出 */
138         for (i = 0; i < CONSUMER_COUNT + PRODUCER_COUNT; i++)
139         {
140                 pthread_join(g_thread[i], NULL);
141         }
142 
143         return 0;
144 }
145 
146 // output is the same as above
View Code

 

條件變量使用說明:

一個條件變量的改變是原子性的,因此需要一個互斥鎖來保證,因此,條件變量的使用代碼可以如下:

1 typedef struct
2 {
3     pthread_mutex_t mutex;
4     pthread_cond_t cond;
5     // 與條件變量相關的變量聲明
6 } Condition;
7 Condition cond_a = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, ...};
8 Condition cond_b = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, ...};
9 ...

1.執行signal操作的線程中流程如下:

pthread_mutex_lock(&cond_a.mutex);
// 設置條件為真
pthread_cond_signal(&cond_a.cond);
pthread_mutex_unlock(&cond_a.mutex);

說明:

pthread_cond_signal與pthread_mutex_unlock的順序:如果先signal后unlock,則可以確定signal操作是由lock住cond_a.mutex的線程調用的;如果先unlock后signal,則任一線程都可調用signal操作。如果需要可預見的調度行為,最好先signal后unlock,就像上面那樣。

2.執行wait操作的線程中流程如下:

pthread_mutex_lock(&cond_a.mutex);
while (條件為假)
    pthread_cond_wait(&cond_a.cond, &cond_a.mutex);
// 修改條件
pthread_mutex_unlock(&cond_a.mutex);

說明:

(1)pthread_cond_wait執行如下3個操作:

  • 解鎖cond_a.mutex,使得其他線程可以進入以便改變條件
  • 將調用線程阻塞在條件變量cond_a上(睡眠了),直到某個線程將條件設為真
  • 成功返回后(此時某個線程調用了pthread_cond_signal/broadcast)重新對cond_a.mutex加鎖。

(2)是否可以將:

while (條件為假)
    pthread_cond_wait(&cond_a.cond, &cond_a.mutex);

替換為:

if (條件為假)
    pthread_cond_wait(&cond_a.cond, &cond_a.mutex);

答案是如果將while替換為if,可以發生虛假(spurious)喚醒:即發出signal的線程並為將條件設為真就調用了pthread_cond_signal,此時pthread_cond_wait卻成功返回了,如此將導致后續的代碼執行失敗。因此必須在pthread_cond_wait返回后再次判斷條件是否確實為真,即必須使用循環而非條件判斷。

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM