並發與同步、信號量與管程、生產者消費者問題


   計算機硬件發展到今天,不管是專業服務器還是PC,甚至於最普遍的移動設備基本上都是多核CPU,程序的並發執行可以更加充分利用這些計算資源。除此之后,為了協調CPU與外設(如磁盤)的速度差異,我們也需要並發。本文是筆者學習清華大學和UCSD(加州大學聖迭戈分校)的操作系統課程的筆記和總結,以及自己的思考和實踐。

並發與同步:

    並發不是多核時代的產物,在早期的多核CPU已經通過時分復用來實現程序之間的並發。並發帶來的好處很多,比如資源共享、提高程序執行效率、模塊化等等。但並發對編程帶來了很多挑戰,比如互斥、死鎖。
 
    所謂同步互斥,就是在並發的情況下,保證一些操作的原子性。 原子操作(Atomic Operation)即在一次執行過程中要么全部成功,要么全部失敗的操作,不存在部分執行的情況。原子性在各種應用場景都非常重要,比如數據庫的ACID,其中A就是原子性。生活中,也要很多原子性的例子,比如銀行卡之間轉賬,分成兩個操作:A扣錢,B加錢,但這兩個操作必須滿足原子性,不然就出大問題了。
 
    我們常見意義上的並發一般是指多進程之間或者多線程之間。當然,近些年越來越流行的協程也算一種並發。對於非操作系統內核開發人員,需要同步互斥的場景大多數都是線程之間的並發。更好的資源共享是線程的特性之一,下圖(來自 UCSD:Principles of Operating Systems )所示是同一個進程內三個線程的內存使用情況:
 
    
 
    對於每一個進程內的多個線程,static data segment(包括全局變量、static對象)、Heap(堆,malloc和new分配的空間)是共享的。每個線程有自己獨立的Stack(棧),存儲局部變量。
    后文主要以多線程為例,但同樣適用於多進程。

臨界區

    提到並發編程,首先就想到 臨界區(critical section)這個概念,臨界區是線程中訪問臨界資源的一段需要互斥執行的代碼。臨界資源是指線程之間共享的資源,但不同的執行序列結果不確定的,這也叫做 競態條件(race condition)。舉個例子,我們都知道linux的系統調用fork會創建一個子進程,該調用在父進程會返回子進程的PID,操作系統中每個進程的PID必須是獨一無二的。假設pid的生成是這樣的:
new_pid = next_pid++
    上面的代碼中next_pid就是臨界資源,多個進程可能同時訪問這個資源, 但上述代碼不是原子的(在匯編下是幾條指令),但是需要互斥執行的,因此需要臨界區。當然,臨界區只是一個概念,具體怎么實現依賴於系統與編程語言。在編碼中使用臨界區的偽代碼如下:
entry section        
     critical section 
exit section         
    分為三部分:
    entry section: 判斷能否進入,如果能則設置標志,不能則等待
    critical section:需要互斥訪問的代碼段
    exit section: 清除設置的標志,使得其他進程(線程)可以進入臨界區
 
    臨界區的實現有幾種方式:
  • 禁用硬件中斷:

  我們知道,系統調用以及執行流程的切換都是依靠軟中斷。禁用中斷之后,進程(線程)就不會被切換出去,從而保證代碼段能執行結束。但壞處也很明顯,由於中斷被禁用,如果臨界區代碼一直執行,其他進程就沒機會執行了。而且,只能禁止單個CPU的中斷。

  • 基於軟件同步:

  即基於代碼實現同步互斥,比較有名的是peterson算法,用來解決兩個進程對臨界區的互斥訪問問題。

  • 基於原子操作原語的方法:

  上述兩種方式都比較復雜,我們需要更加高級的武器。支持並發的語言都提供了鎖(lock)這個概念,在現實生活中也很好理解,如果只能一個人在屋子里,那么進去之后就鎖上,出來的時候再打開鎖;沒有鎖的人只能在外面等着。在編程語言中,大概是這樣樣子的:

acquire(lock)
  critical section 
release(lock)
  acquire,release實現的也就是entry section和 exit section的功能,上面的代碼是面向過程的寫法,面向對象一般寫成lock.acquire和lock.release,或者使用RALL(Resource Acquisition Is Initialization)來避免release函數未被調用到的問題。
 
    lock的實現需要基於硬件提供的“ 原子操作指令”,這些操作雖然理解起來是幾步操作,但硬件保證其原子性。比較常見的原子操作指令包括 test and setcompare and swap,接下來通過test and set來看看lock的實現。
 

Spinlock實現

  test-and-set是一個原子操作,其作用對某個變量賦值為1(set),並返回變量之前的值,下面用C語言描述這個過程
1     #define LOCKED 1
2     int TestAndSet(int* lockPtr) {
3         int oldValue;
4          
5         oldValue = *lockPtr;
6         *lockPtr = LOCKED;
7 
8         return oldValue;
9     }

 

    對應的acqurie和release的偽碼如下:
void acquire(int *lock){
    while(TestAndSet(*lock));
}

void release(int *lock){
    *lock = 0;
}
  在acquire函數中,如果TestAndSet返回1,那么while循環就一直執行(也就是在這里等待),直到另一個線程調用release。當然,這個實現看起來不太好,主要是等待的線程會不停的檢查,浪費CPU,這個問題稱之為 忙等待(busy-wait or spin-wait),所以這個lock的實現也叫自旋鎖spinlock。解決辦法是如果需要等待,那么該線程主動交出執行權,讓其他線程有機會執行,這種方式稱之為 讓權等待(yield-wait or sleep-wait),應用開發人員使用的互斥鎖一都是指這種情況。

 

信號量與管程

  在上一部分介紹了並發、同步互斥、臨界區等基本概念,也介紹了利用原子操作指令實現鎖(lock)的機制與實現。但這個的lock(spinlock)是最基礎的同步原語,實際操作系統需要封裝更高級的同步原語來實現更復雜、更實用的功能。信號量(semaphore)和管程(monitor)就是操作系統提供的兩種更高級的同步方式,在操作系統(linux)和編程語言都有對應的實現和封裝,本章節對二者進行介紹和對比。

信號量

  semaphore是大牛 Dijkstra(沒錯,就是出現在數據結構或者圖論中的Dijkstra)在20世紀60年代發明的概念,用來協調並發程序對共享資源的訪問。注意,這里是協調,而不是互斥,協調的概念更廣泛一些,也包括並發程序之間的協作。semaphore有一個整型變量(S)和兩個 原子操作組成。S了資源的數量,而兩個原子操作一般被成為P操作和V操作(有時也被成為wait、signal)。
  P操作表示申請一個資源,P操作的定義:S=S-1,若S>=0,則執行P操作的線程繼續執行;若S<0,則置該線程為阻塞狀態,並將其插入阻塞隊列。偽碼如下:
  

Procedure P(Var S:Semaphore);
  Begin
    S:=S-1;
    If S<0 then w(S) {執行P操作的線程插入等待隊列}
  End;

  V操作表示釋放一個資源,V操作定義:S=S+1,若S>0則執行V操作的線程繼續執行;若S<0,則從阻塞狀態喚醒一個線程,並將其插入就緒隊列。偽代碼如下:
  

Procedure V(Var S:Semaphore);
  Begin
    S:=S+1
    If S<=0 then R(s) {從阻塞隊列中喚醒一個線程}
  End;

  信號量在現實生活中很容易找到對比的例子,比如銀行的窗口數量就是S,在窗口辦理業務就是P操作,業務辦理結束就是V操作。

  根據S初始值的不同,semaphore就有不同的作用。如果S初始值為1,那么這個semaphore就是一個mutex semaphore,效果就是臨界區的互斥訪問。如果S初始值為0,那么就是用來做條件同步,效果就是必須等待某些條件發生。如果S初始值為N(N一般大於1),那么就是用來限制並發數目,也被稱之為counting semaphone。

  后文會利用具體的例子(生產者消費者問題)來闡述semaphore上面的三種用法。

管程

   管程是編程語言提供的一種抽象數據結構,用於多線程互斥訪問共享資源。首先, 是互斥訪問,即任一時刻只有一個線程在執行管程代碼;第二, 正在管程內的線程可以放棄對管程的控制權,等待某些條件發生再繼續執行。第二點就厲害了,不管是之前提到的互自旋鎖還是信號量,進入了臨界區域除非代碼執行完,否則是不會出現線程切換的,而管程可以主動放棄執行權,這反映到編碼上也會有一些差異。
  條件變量(condition variable)是管程內部的實現機制,每個條件變量都代表一種等待的原因,也對應一個等待隊列。條件變量有兩個操作:wait和signal,或者在加上一個signal_all操作。條件變量都是配合互斥鎖一起使用,互斥鎖保證了對臨界資源的互斥訪問。簡單來說,管程就是互斥鎖(稱之為monitor's lock)與條件變量的配合使用。下面是基本編程范式:
  首先是線程共享的變量,分別是互斥鎖、條件變量、是否需要等待的原因
  

mutex global_mutex
condition global_cv
bool p

  可能需要等待的線程(code snippet1):
  

acquire(global_mutex) 
while(!p): # 如果條件不滿足,則需要等待
  global_cv.wait(global_mutex) # wait操作將當前線程阻塞在等到隊列中,釋放對管程的互斥訪問

// do a lot of thing
release(global_mutex)

  另外一個線程
  

acquire(global_mutex)
// do sth make "p" is true
global_cv.signal() # 喚醒在管程中等待的線程

release(global_mutex)

  看了上面的代碼,可能會產生兩個疑問:第一個是code snippet1中,global-wait在互斥鎖內,那怎么實現將當前線程阻塞的呢,必須要釋放(release)global_mutex才行啊?可以看到,wait操作其中一個參數是global_mutex, 可以把wait操作理解成下面的偽碼:

contidion::wait(mutex& mut){
  release(mut)
  yield current thread # 當前線程釋放控制權
  acquire(mut)
}

  另外一個疑問,對條件p的判斷用了while循環,為什么if不行呢?這個疑問,大多數人(包括寫這篇文章之前的我:-D)都是知其然而不知其所以然。while形式稱之為hansen模式,if形式稱之為hoare模式。這兩種模式對應着不同的管程實現,具體來說是調用signal操作時,是繼續在當前線程執行,還是切換到被喚醒的線程執行?下圖來自清華大學的網絡課程《 操作系統》:

  可以看到,在T2線程調用x.signal之后,hansen模式會繼續執行,所以當重新回到wait線程的時候,可能情況已經發生了變化,所以需要重新判斷;而Hoare模式會立刻從T2線程切換到T1線程。Hansen看起來變得復雜,引入了不確定性,但是相比hoare模式,少了一次線程的切換,在真實的操作系統中就是這么實現的,所以我們編碼的時候都需要用while循環判斷條件是否成立。

信號量 VS 管程

  上面的介紹可以看到,信號量與管程都能夠解決我們再編程中遇到的同步互斥問題,所以,難免需要對二者進行對比。首先,二者本質上時互通的,hoare在 論文中也證明了可以用信號量實現管程、也可以用管程實現信號量。下面簡單歸納一下二者的區別:
  • 信號量本質是可共享的資源的數量; 而管程是一種抽象數據結構用來限制同一時刻只有一個線程進入臨界區
  • 信號量是可以並發的,並發量取決於S初始值;而管程內部同一時刻最多只能有一個線程執行
  • 信號量與管理的資源緊耦合(即信號量S的初始值等同於資源的數目,且通過P V操作修改剩余可用的資源數量);而在管程中需自行判斷是否還有可共享的資源。這一點可以參見下面生產者消費者的實現代碼
  • 信號量的P操作可能阻塞,也可能不阻塞;而管程的wait操作一定會阻塞
  • 信號量的V操作如果喚醒了其他線程,當前線程與被喚醒線程並發執行;對於管程的signal操作,要么當前線程繼續執行(Hansen),要么被喚醒線程繼續執行(Hoare),二者不能並發。
  簡而言之,管程相比信號量更好實用,也更容易出錯。大多數支持多線程的編程語言會同時提供管程與信號量,如Java、Python、Linux C;而在在C++11中,提供了mutex和contidion_variable,但是沒有提供semaphore。管程(互斥鎖 + 條件變量)是並發編程中最為常見的同步方式,在陳碩大牛的《 多線程服務器的常用編程模型》一文中提到,如果需要使用底層同步原語,也是推薦互斥鎖+條件變量。
 
  

生產者消費者問題

   生產者消費者是非常經典的同步模型,在編程中也有非常多的應用。生產者消費者問題描述如下:一個或者多個生產者往消息隊列里面插入數據;單個消費者從消息隊列里面取出數據處理;並且對消息隊列的操作必須是互斥的。分析問題可知需要解決三個同步問題:
  • 對消息隊列的操作必須是互斥的,需要加鎖(如果是lockfree的數據結構,就不用加鎖,如boost::lockfree::queue)
  • 消息隊列中沒有數據時,消費者需要等待生產者產生數據,這就是條件同步
  • 消息隊列滿時,生產者需要等到消費者消費數據,這也是條件同步

信號量實現

   從上面分析生產者消費者需要解決的同步問題(互斥與條件同步),都能用信號量來解決。對於互斥,信號量S為1就可以;對於消費者等待的情況,信號量初始值為0即可;對於生產者等待的情況,信號量初始值為消息隊列長度即可。linux下提供了信號量的實現,頭文件在/usr/include/semaphore.h,代碼實現如下(producer_consumer_semaphore.cpp):

 1 #include <stdio.h>
 2 #include <pthread.h>
 3 #include <time.h>
 4 #include <stdlib.h>
 5 #include <unistd.h>
 6 #include <semaphore.h>
 7 
 8 #define BUFF_SIZE  3
 9 #define PRODUCE_THREAD_SIZE 5
10 int g_buff[BUFF_SIZE];
11 int g_write_index = 0;
12 int g_read_index = 0;
13 
14 sem_t lock;
15 sem_t consume_sem, produce_sem;
16 
17 
18 void* produce(void *ptr){
19     int idx = *(int*)ptr;
20     printf("in produce %d %d %d\n",idx, g_write_index, g_read_index);
21     while(1){
22         sem_wait(&produce_sem); # 限制了生產者並發的數目 23 
24         sem_wait(&lock);     # 對臨界區的訪問要加鎖 25         g_buff[g_write_index] = idx;
26         g_write_index = (g_write_index + 1) % BUFF_SIZE;
27         sem_post(&lock);
28 
29         sem_post(&consume_sem);
30     }
31     return NULL;
32 }
33 
34 void* consume(void *ptr){
35     while(1){
36         sem_wait(&consume_sem);
37         sem_wait(&lock);
38         int data = g_buff[g_read_index];
39         g_buff[g_read_index] = -1;
40         g_read_index = (g_read_index + 1) % BUFF_SIZE;
41         printf("consume %d %d %d\n", data, g_read_index, g_write_index);
42         sem_post(&lock);
43         sem_post(&produce_sem);
44     }
45     return NULL;
46 }
47 
48 int main(int argc, char * argv[]){
49     pthread_t con;
50     pthread_t pros[PRODUCE_THREAD_SIZE];
51     sem_init(&lock, 0, 1);
52     sem_init(&consume_sem,0, 0);
53     sem_init(&produce_sem,0, BUFF_SIZE);
54 
55     pthread_create(&con, NULL, consume, NULL);
56     int thread_args[PRODUCE_THREAD_SIZE];
57     for(int i = 0; i < PRODUCE_THREAD_SIZE; i++){
58         thread_args[i] = i + 1;
59         pthread_create(&pros[i], NULL, produce, (thread_args + i));
60     }
61 
62     pthread_join(con,0);
63     for(int i = 0; i < PRODUCE_THREAD_SIZE; i++)
64         pthread_join(pros[i],0);
65 
66     sem_destroy(&lock);
67     sem_destroy(&consume_sem);
68     sem_destroy(&produce_sem);
69 
70     return 0;
71 }

   代碼中,消息隊列的大小為3,produce_sem的初始值一定與消息隊列的大小相同。總共有5個生產者線程,多余可並發的數量(produce_sem),因此很大概率會有生產者線程阻塞在produce_sem對應的等待隊列。 另外兩點需要注意:第一點在produce和consume線程中都是需要加鎖(互斥鎖lock),因為信號量是可以並發的,需要對臨界資源(g_buff,g_read_index,g_write_index)互斥訪問。另外,在produce線程,需要先判斷能否並發,然后再對臨界區加鎖,二者的順序不能反過來,否則會產生死鎖。

  上面的代碼用:g++ -lpthread producer_consumer_semaphore.cpp producer_consumer_semaphore, 然后運行 ./producer_consumer_semaphore 即可

  

管程實現

   生產者消費者問題用管程(互斥鎖+條件變量)來實現也很簡單,而且個人感覺更直觀,linux下提供了條件變量和互斥鎖的實現,頭文件在/usr/include/pthread.h,代碼實現如下(producer_consumer_monitor.cpp):
  
 1 #include <stdio.h>
 2 #include <pthread.h>
 3 #include <time.h>
 4 #include <stdlib.h>
 5 #include <unistd.h>
 6 
 7 
 8 #define BUFF_SIZE  3
 9 #define PRODUCE_THREAD_SIZE 5
10 int g_buff[BUFF_SIZE];
11 int g_write_index = 0;
12 int g_read_index = 0;
13 
14 pthread_mutex_t lock;
15 pthread_cond_t consume_cond, produce_cond;
16 
17 
18 void* produce(void *ptr){
19     int idx = *(int*)ptr;
20     printf("in produce %d %d %d\n",idx, g_write_index, g_read_index);
21     while(1){
22         pthread_mutex_lock(&lock);
23         while((g_write_index + 1) % BUFF_SIZE == g_read_index)
24             pthread_cond_wait(&produce_cond, &lock);
25 
26         g_buff[g_write_index] = idx;
27         g_write_index = (g_write_index + 1) % BUFF_SIZE;
28 
29         pthread_cond_signal(&consume_cond);
30         pthread_mutex_unlock(&lock);
31 
32     }
33     return NULL;
34 }
35 
36 void* consume(void *ptr){
37     while(1){
38         pthread_mutex_lock(&lock);
39         while(g_read_index == g_write_index)
40              pthread_cond_wait(&consume_cond, &lock);
41 
42         int data = g_buff[g_read_index];
43         g_buff[g_read_index] = -1;
44         g_read_index = (g_read_index + 1) % BUFF_SIZE;
45         printf("consume %d\n", data);
46 
47         pthread_cond_signal(&produce_cond);
48         pthread_mutex_unlock(&lock);
49     }
50     return NULL;
51 }
52 
53 int main(int argc, char * argv[]){
54     pthread_t con;
55     pthread_t pros[PRODUCE_THREAD_SIZE];
56 
57     srand((unsigned)time(NULL));
58     pthread_mutex_init(&lock, 0);
59     pthread_cond_init(&consume_cond,0);
60     pthread_cond_init(&produce_cond,0);
61 
62     pthread_create(&con, NULL, consume, NULL);
63     int thread_args[PRODUCE_THREAD_SIZE];
64     for(int i = 0; i < PRODUCE_THREAD_SIZE; i++){
65         thread_args[i] = i + 1;
66         pthread_create(&pros[i], NULL, produce, (thread_args + i));
67     }
68 
69     pthread_join(con,0);
70     for(int i = 0; i < PRODUCE_THREAD_SIZE; i++)
71         pthread_join(pros[i],0);
72 
73     pthread_mutex_destroy(&lock);
74     pthread_cond_destroy(&consume_cond);
75     pthread_cond_destroy(&produce_cond);
76 
77     return 0;
78 }

   上面的代碼用:g++ -lpthread producer_consumer_monitor.cpp producer_consumer_monitor, 然后運行 ./producer_consumer_monitor 即可

總結:

  並發可能帶來互斥、飢餓、死鎖等問題,操作系統為了解決這些問題提供了很多的支持,從最底層的禁止硬件終端和原子操作指令(test-and-set),到更高級的同步原語:鎖(互斥鎖、自旋鎖)、信號量、管程。管程是一種抽象數據結構,編程中使用互斥鎖配合信號量使用。管程有兩種不同的模式:hansen vs hoare,區別在於signal操作之后是否立即切換到被喚醒線程,實際的操作系統中,多使用hansen模式。

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM