並發與同步:

臨界區
new_pid = next_pid++
entry sectioncritical sectionexit section
- 禁用硬件中斷:
我們知道,系統調用以及執行流程的切換都是依靠軟中斷。禁用中斷之后,進程(線程)就不會被切換出去,從而保證代碼段能執行結束。但壞處也很明顯,由於中斷被禁用,如果臨界區代碼一直執行,其他進程就沒機會執行了。而且,只能禁止單個CPU的中斷。
- 基於軟件同步:
即基於代碼實現同步互斥,比較有名的是peterson算法,用來解決兩個進程對臨界區的互斥訪問問題。
- 基於原子操作原語的方法:
上述兩種方式都比較復雜,我們需要更加高級的武器。支持並發的語言都提供了鎖(lock)這個概念,在現實生活中也很好理解,如果只能一個人在屋子里,那么進去之后就鎖上,出來的時候再打開鎖;沒有鎖的人只能在外面等着。在編程語言中,大概是這樣樣子的:
acquire(lock)critical sectionrelease(lock)
Spinlock實現
1 #define LOCKED 1 2 int TestAndSet(int* lockPtr) { 3 int oldValue; 4 5 oldValue = *lockPtr; 6 *lockPtr = LOCKED; 7 8 return oldValue; 9 }
void acquire(int *lock){ while(TestAndSet(*lock)); } void release(int *lock){ *lock = 0; }
信號量與管程
信號量
Procedure P(Var S:Semaphore);
Begin
S:=S-1;
If S<0 then w(S) {執行P操作的線程插入等待隊列}
End;
Procedure V(Var S:Semaphore);
Begin
S:=S+1
If S<=0 then R(s) {從阻塞隊列中喚醒一個線程}
End;
信號量在現實生活中很容易找到對比的例子,比如銀行的窗口數量就是S,在窗口辦理業務就是P操作,業務辦理結束就是V操作。
根據S初始值的不同,semaphore就有不同的作用。如果S初始值為1,那么這個semaphore就是一個mutex semaphore,效果就是臨界區的互斥訪問。如果S初始值為0,那么就是用來做條件同步,效果就是必須等待某些條件發生。如果S初始值為N(N一般大於1),那么就是用來限制並發數目,也被稱之為counting semaphone。
后文會利用具體的例子(生產者消費者問題)來闡述semaphore上面的三種用法。
管程
mutex global_mutex
condition global_cv
bool p
另外一個線程acquire(global_mutex)
while(!p): # 如果條件不滿足,則需要等待
global_cv.wait(global_mutex) # wait操作將當前線程阻塞在等到隊列中,釋放對管程的互斥訪問// do a lot of thing
release(global_mutex)
acquire(global_mutex)
// do sth make "p" is true
global_cv.signal() # 喚醒在管程中等待的線程release(global_mutex)
contidion::wait(mutex& mut){
release(mut)
yield current thread # 當前線程釋放控制權
acquire(mut)
}

可以看到,在T2線程調用x.signal之后,hansen模式會繼續執行,所以當重新回到wait線程的時候,可能情況已經發生了變化,所以需要重新判斷;而Hoare模式會立刻從T2線程切換到T1線程。Hansen看起來變得復雜,引入了不確定性,但是相比hoare模式,少了一次線程的切換,在真實的操作系統中就是這么實現的,所以我們編碼的時候都需要用while循環判斷條件是否成立。
信號量 VS 管程
- 信號量本質是可共享的資源的數量; 而管程是一種抽象數據結構用來限制同一時刻只有一個線程進入臨界區
- 信號量是可以並發的,並發量取決於S初始值;而管程內部同一時刻最多只能有一個線程執行
- 信號量與管理的資源緊耦合(即信號量S的初始值等同於資源的數目,且通過P V操作修改剩余可用的資源數量);而在管程中需自行判斷是否還有可共享的資源。這一點可以參見下面生產者消費者的實現代碼
- 信號量的P操作可能阻塞,也可能不阻塞;而管程的wait操作一定會阻塞
- 信號量的V操作如果喚醒了其他線程,當前線程與被喚醒線程並發執行;對於管程的signal操作,要么當前線程繼續執行(Hansen),要么被喚醒線程繼續執行(Hoare),二者不能並發。
生產者消費者問題
- 對消息隊列的操作必須是互斥的,需要加鎖(如果是lockfree的數據結構,就不用加鎖,如boost::lockfree::queue)
- 消息隊列中沒有數據時,消費者需要等待生產者產生數據,這就是條件同步
- 消息隊列滿時,生產者需要等到消費者消費數據,這也是條件同步
信號量實現
從上面分析生產者消費者需要解決的同步問題(互斥與條件同步),都能用信號量來解決。對於互斥,信號量S為1就可以;對於消費者等待的情況,信號量初始值為0即可;對於生產者等待的情況,信號量初始值為消息隊列長度即可。linux下提供了信號量的實現,頭文件在/usr/include/semaphore.h,代碼實現如下(producer_consumer_semaphore.cpp):
1 #include <stdio.h> 2 #include <pthread.h> 3 #include <time.h> 4 #include <stdlib.h> 5 #include <unistd.h> 6 #include <semaphore.h> 7 8 #define BUFF_SIZE 3 9 #define PRODUCE_THREAD_SIZE 5 10 int g_buff[BUFF_SIZE]; 11 int g_write_index = 0; 12 int g_read_index = 0; 13 14 sem_t lock; 15 sem_t consume_sem, produce_sem; 16 17 18 void* produce(void *ptr){ 19 int idx = *(int*)ptr; 20 printf("in produce %d %d %d\n",idx, g_write_index, g_read_index); 21 while(1){ 22 sem_wait(&produce_sem); # 限制了生產者並發的數目 23 24 sem_wait(&lock); # 對臨界區的訪問要加鎖 25 g_buff[g_write_index] = idx; 26 g_write_index = (g_write_index + 1) % BUFF_SIZE; 27 sem_post(&lock); 28 29 sem_post(&consume_sem); 30 } 31 return NULL; 32 } 33 34 void* consume(void *ptr){ 35 while(1){ 36 sem_wait(&consume_sem); 37 sem_wait(&lock); 38 int data = g_buff[g_read_index]; 39 g_buff[g_read_index] = -1; 40 g_read_index = (g_read_index + 1) % BUFF_SIZE; 41 printf("consume %d %d %d\n", data, g_read_index, g_write_index); 42 sem_post(&lock); 43 sem_post(&produce_sem); 44 } 45 return NULL; 46 } 47 48 int main(int argc, char * argv[]){ 49 pthread_t con; 50 pthread_t pros[PRODUCE_THREAD_SIZE]; 51 sem_init(&lock, 0, 1); 52 sem_init(&consume_sem,0, 0); 53 sem_init(&produce_sem,0, BUFF_SIZE); 54 55 pthread_create(&con, NULL, consume, NULL); 56 int thread_args[PRODUCE_THREAD_SIZE]; 57 for(int i = 0; i < PRODUCE_THREAD_SIZE; i++){ 58 thread_args[i] = i + 1; 59 pthread_create(&pros[i], NULL, produce, (thread_args + i)); 60 } 61 62 pthread_join(con,0); 63 for(int i = 0; i < PRODUCE_THREAD_SIZE; i++) 64 pthread_join(pros[i],0); 65 66 sem_destroy(&lock); 67 sem_destroy(&consume_sem); 68 sem_destroy(&produce_sem); 69 70 return 0; 71 }
代碼中,消息隊列的大小為3,produce_sem的初始值一定與消息隊列的大小相同。總共有5個生產者線程,多余可並發的數量(produce_sem),因此很大概率會有生產者線程阻塞在produce_sem對應的等待隊列。 另外兩點需要注意:第一點在produce和consume線程中都是需要加鎖(互斥鎖lock),因為信號量是可以並發的,需要對臨界資源(g_buff,g_read_index,g_write_index)互斥訪問。另外,在produce線程,需要先判斷能否並發,然后再對臨界區加鎖,二者的順序不能反過來,否則會產生死鎖。
上面的代碼用:g++ -lpthread producer_consumer_semaphore.cpp producer_consumer_semaphore, 然后運行 ./producer_consumer_semaphore 即可
管程實現
1 #include <stdio.h> 2 #include <pthread.h> 3 #include <time.h> 4 #include <stdlib.h> 5 #include <unistd.h> 6 7 8 #define BUFF_SIZE 3 9 #define PRODUCE_THREAD_SIZE 5 10 int g_buff[BUFF_SIZE]; 11 int g_write_index = 0; 12 int g_read_index = 0; 13 14 pthread_mutex_t lock; 15 pthread_cond_t consume_cond, produce_cond; 16 17 18 void* produce(void *ptr){ 19 int idx = *(int*)ptr; 20 printf("in produce %d %d %d\n",idx, g_write_index, g_read_index); 21 while(1){ 22 pthread_mutex_lock(&lock); 23 while((g_write_index + 1) % BUFF_SIZE == g_read_index) 24 pthread_cond_wait(&produce_cond, &lock); 25 26 g_buff[g_write_index] = idx; 27 g_write_index = (g_write_index + 1) % BUFF_SIZE; 28 29 pthread_cond_signal(&consume_cond); 30 pthread_mutex_unlock(&lock); 31 32 } 33 return NULL; 34 } 35 36 void* consume(void *ptr){ 37 while(1){ 38 pthread_mutex_lock(&lock); 39 while(g_read_index == g_write_index) 40 pthread_cond_wait(&consume_cond, &lock); 41 42 int data = g_buff[g_read_index]; 43 g_buff[g_read_index] = -1; 44 g_read_index = (g_read_index + 1) % BUFF_SIZE; 45 printf("consume %d\n", data); 46 47 pthread_cond_signal(&produce_cond); 48 pthread_mutex_unlock(&lock); 49 } 50 return NULL; 51 } 52 53 int main(int argc, char * argv[]){ 54 pthread_t con; 55 pthread_t pros[PRODUCE_THREAD_SIZE]; 56 57 srand((unsigned)time(NULL)); 58 pthread_mutex_init(&lock, 0); 59 pthread_cond_init(&consume_cond,0); 60 pthread_cond_init(&produce_cond,0); 61 62 pthread_create(&con, NULL, consume, NULL); 63 int thread_args[PRODUCE_THREAD_SIZE]; 64 for(int i = 0; i < PRODUCE_THREAD_SIZE; i++){ 65 thread_args[i] = i + 1; 66 pthread_create(&pros[i], NULL, produce, (thread_args + i)); 67 } 68 69 pthread_join(con,0); 70 for(int i = 0; i < PRODUCE_THREAD_SIZE; i++) 71 pthread_join(pros[i],0); 72 73 pthread_mutex_destroy(&lock); 74 pthread_cond_destroy(&consume_cond); 75 pthread_cond_destroy(&produce_cond); 76 77 return 0; 78 }
上面的代碼用:g++ -lpthread producer_consumer_monitor.cpp producer_consumer_monitor, 然后運行 ./producer_consumer_monitor 即可
總結:
並發可能帶來互斥、飢餓、死鎖等問題,操作系統為了解決這些問題提供了很多的支持,從最底層的禁止硬件終端和原子操作指令(test-and-set),到更高級的同步原語:鎖(互斥鎖、自旋鎖)、信號量、管程。管程是一種抽象數據結構,編程中使用互斥鎖配合信號量使用。管程有兩種不同的模式:hansen vs hoare,區別在於signal操作之后是否立即切換到被喚醒線程,實際的操作系統中,多使用hansen模式。