用信號量和讀寫鎖解決讀者寫者問題


讀者寫者問題是非常經典的同步問題,本文首先用信號量來解決這個問題,並結合代碼分析什么是讀者優先、什么是寫者優先,然后給出讀寫鎖的解決方案,並指出在Linux下讀寫鎖的注意事項。

 

讀者寫者問題

  讀者寫者問題描述的是這么一種情況:對象在多個線程(或者進程)之間共享,其中一些線程只會讀數據,另外一些線程只會寫數據。為了保證寫入和讀取的正確性,我們需要保證,只要有線程在寫,那么其他線程不能讀,否則可能讀到寫了一半的數據;另外,也不能有兩個線程同時寫,否則導致數據錯亂。當然,多個線程是可以同時讀數據。

  讀者寫者問題在計算機領域非常普遍,大家最容易想到的就是數據庫,數據庫的讀寫分離也是為了減少因為讀者寫者問題加鎖帶來的對並發的影響。

  讀者寫者問題的解決方案一般都有兩種不同的側重:讀者優先或者寫者優先。簡單來說,讀者優化就是盡量滿足並發的讀操作,當已經有線程在讀數據的時候,其他讀線程無需等待,而寫線程需要等待所有正在進行的讀操作之后才能執行。寫者優先就是盡量滿足寫操作,盡管寫操作不能並發,但是可以排隊,優先於等待的讀線程獲得執行權。

信號量解決讀者寫者問題

  關於信號量的基礎知識,可以參見上一篇文章《並發與同步、信號量與管程、生產者消費者問題》的講解。這個章節通過信號量來解決讀者寫者問題,為了更好的分析,可以用下面的情況來概括所有的並發的可能。首先r代表讀線程 w代表寫線程,我們給出三元組(X, Y Z), X代表當前已經獲取資源訪問權的線程, Y和Z代表隨后並發到達的線程(注意,YZ是並發的),那么綜合有以下情況:

  (r rr)
  (r rw)
  (r wr)
  (r ww)
  (w rr)
  (w rw)
  (w wr)
  (w ww)

  另外,當V操作釋放信號量時,如果有多個線程阻塞在當前信號量,那么該喚醒哪一個線程呢?有的教程說有隊列,先來的先服務(FCFS);也有文章說,it is undefined,所以不要做任何假設。為了方便后文分析,假設喚醒的線程是隨機的。因此上面的八種情況中YZ的值為 rw 和 wr是一樣的。

  另外YZ為rr 和 ww的情況也很簡單,因此實際上需要分析的是下面兩種情況:

  (r1 r2w2): 當已經有寫線程執行的時候,同時來了一個讀線程(w2)和一個寫線程(r2),那么是讀線程w2先執行還是寫線程r2先執行

  (w1 r2w2):當已經有讀線程執行的時候,同時來了一個讀線程和一個寫線程,那么是讀線程w2先執行還是寫線程r2先執行

 

  后面的分析都會結合這兩種情況。

  

Linux下信號量API

  Linux下提供了信號量相關的API,定義在/usr/include/semaphore.h, 我們常用的是以下四個函數

int sem_init (sem_t *__sem, int __pshared, unsigned int __value);  
int sem_destroy (sem_t *__sem);
int sem_wait (sem_t *__sem);
int sem_post (sem_t *__sem);

  這些API都是需要配對使用,如果配合C++,可以使用RAII來保證。sem_init初始化信號量,注意第二個參數,代表了該信號量是否與其他進程共享,第三個參數是共享資源的數量。sem_wait即P操作,嘗試獲取資源;sem_wait即V操作,釋放資源。個人感覺sem_wait這個名字不是很友好,讓人以為會wait,但事實上不一定會wait,所以感覺叫acquire好一些。

  

讀者優先:

  由於讀者寫者問題運行並發的讀操作,寫操作必須是互斥的,所以寫出來的程序最直觀的就是讀者優先。源代碼(rw_semaphore_rp.cpp)

  1 #include <stdio.h>
  2 #include <pthread.h>
  3 #include <time.h>
  4 #include <stdlib.h>
  5 #include <unistd.h>
  6 #include <semaphore.h>
  7 #include <time.h>
  8 struct data{
  9     int read_counter;
 10     int write_counter;
 11     int value;
 12 };
 13 
 14 data share_data;
 15 
 16 int READ_THREAD_NUM = 5;
 17 int WRITE_THREAD_NUM = 3;
 18 int TOTAL_TRY = 10000; // 嘗試這么多次程序就結束吧
 19 
 20 bool stop = false;
 21 
 22 int reader_num = 0; 
 23 sem_t lock;
 24 sem_t lock_writer;
 25 sem_t w_or_r;
 26 
 27 
 28 void* func_read(void *ptr){
 29     while(!stop){
 30         sem_wait(&lock);
 31         reader_num += 1;
 32         share_data.read_counter += 1;
 33         if(reader_num == 1)
 34             sem_wait(&w_or_r);
 35         sem_post(&lock);
 36 
 37         // do sth with share_data
 38         int tmp = share_data.value;        
 39 
 40         sem_wait(&lock);
 41         reader_num -= 1;
 42         if(reader_num == 0)
 43             sem_post(&w_or_r);
 44         sem_post(&lock);
 45         sleep(0.1);
 46     }
 47     return NULL;
 48 }
 49 
 50 void* func_write(void *ptr){
 51     int idx = *(int*)ptr;
 52     while(!stop){
 53 #ifdef LOCK_WRITER
 54         sem_wait(&lock_writer);
 55 #endif
 56         sem_wait(&w_or_r);
 57 
 58         share_data.write_counter += 1;
 59         share_data.value = idx;
 60 
 61         if(share_data.write_counter >= TOTAL_TRY)
 62             stop = true;
 63 
 64         sem_post(&w_or_r);
 65  #ifdef LOCK_WRITER
 66         sem_post(&lock_writer);
 67 #endif
 68 
 69 
 70         sleep(0.1);
 71     }
 72     return NULL;
 73 }
 74 
 75 int main(int argc, char * argv[]){
 76     share_data.read_counter = 0;
 77     share_data.write_counter = 0;
 78 
 79     sem_init(&lock, 0, 1);
 80 #ifdef LOCK_WRITER
 81     sem_init(&lock_writer, 0, 1);
 82 #endif
 83     sem_init(&w_or_r, 0, 1);
 84 
 85     pthread_t readers[READ_THREAD_NUM];
 86     pthread_t writers[WRITE_THREAD_NUM];
 87 
 88     for(int i = 0; i < READ_THREAD_NUM; i++){
 89         pthread_create(&readers[i], NULL, func_read, NULL);
 90     }
 91 
 92     int thread_args[WRITE_THREAD_NUM];
 93     for(int i = 0; i < WRITE_THREAD_NUM; i++){
 94         thread_args[i] = i + 1;
 95         pthread_create(&writers[i], NULL, func_write, (thread_args + i));
 96     }
 97 
 98     for(int i = 0; i < READ_THREAD_NUM; i++)
 99         pthread_join(readers[i],0);
100 
101     //for(int i = 0; i < WRITE_THREAD_NUM; i++)
102     //   pthread_join(writers[i],0);
103     
104     sem_destroy(&lock);
105 #ifdef LOCK_WRITER
106     sem_destroy(&lock_writer);
107 #endif
108     sem_destroy(&w_or_r);
109     printf("Finally read count %d, write count %d\n", share_data.read_counter, share_data.write_counter);
110     return 0;
111 }

  首先忽略掉宏定義LOCK_WRITER, 那么使用的就是lockw_or_r這兩個信號量。w_or_r使用來保護對共享資源(讀寫的資源)的訪問,在func_write中對share_data的寫操作之前,sem_wait(w_or_r)即信號量的P操作,操作完成之后sem_post。在讀線程func_read中,用reader_num記錄並發的讀線程數量,對這個變量的操作用lock這個信號量來互斥(在並發的讀線程之間互斥)。從代碼可以看到,只有當第一個讀線程發起讀操作時(reader_num==1)才會去sem_wait(w_or_r),而且只有當所有的讀線程都結束時(reader_num == 0),才會釋放w_or_r。

  我們來考慮本章開始提出的問題:

  (r1 r2w2),r1(讀線程)獲取了管程的執行權,那么w2(寫線程)一定會阻塞,而r2不用對w_or_r執行P操作,因此可以立即執行,那么后面r2w2的執行順序一定是r2 然后是 w2

  (w1 r2w2),w1獲取了管程的執行權,那么r2和w2都一定會阻塞在w_or_r,前面提到多個線程被阻塞時,V操作喚醒的線程是隨機的,因此r2 w2各有50%的概率先執行

  用以下命令可編譯並執行代碼 EXE=rw_semaphore && g++  -g -lpthread $EXE.cpp -o $EXE && ./$EXE,下面是執行了5次的結果:

Finally read count 14378, write count 10001
Finally read count 3870, write count 10002
Finally read count 8040, write count 10001
Finally read count 9077, write count 10001
Finally read count 14386, write count 10002

  

  接下來考慮宏定義LOCK_WRITER, 在這種情況下有lock_writer對func_write加鎖,所以當寫線程正在寫數據時,后面來的讀線程阻塞在lock_writer,而不是w_or_r。

  我們來考慮本章開始提出的問題:

  (r1 r2w2),r1(讀線程)獲取了管程的執行權,那么w2(寫線程)一定會阻塞,而r2不用對w_or_r執行P操作,因此可以立即執行,那么后面r2w2的執行順序一定是r2 然后是 w2

  (w1 r2w2),w1獲取了管程的執行權,那么r2會阻塞在w_or_r,此時w2阻塞在lock_writer,當w1釋放w_or_r時,一定是r2獲得w_or_r, 那么后面r2w2的執行順序一定是r2 然后是 w2

  用以下命令可編譯並執行代碼 EXE=rw_semaphore && g++ -DLOCK_WRITER -g -lpthread $EXE.cpp -o $EXE && ./$EXE,下面是執行了5次的結果:

Finally read count 25983, write count 10001
Finally read count 14378, write count 10001
Finally read count 54344, write count 10001
Finally read count 35147, write count 10001
Finally read count 27105, write count 10002

  可以看到,read操作的數量明顯高於之前的版本。

 

寫者優先:

  按照第一章節對讀者寫者問題的描述,所謂的寫者優先即如果當前是寫線程獲得了執行權,那么后續並發有讀請求和寫請求到來的時候,我們優先考慮后續的讀請求。為了達到這個目的,我們可以記錄當前正在嘗試寫操作的請求數量,當這個數量大於1時,讀操作需要被阻塞,當所有寫操作都完成,再通知等到的讀操作請求,這個解決思路比較簡單,和讀者優先的解決方法也很類似,下面是具體的代碼(rw_semaphore_wnp.cpp):

  1 #include <stdio.h>
  2 #include <pthread.h>
  3 #include <time.h>
  4 #include <stdlib.h>
  5 #include <unistd.h>
  6 #include <semaphore.h>
  7 #include <time.h>
  8 struct data{
  9     int read_counter;
 10     int write_counter;
 11     int value;
 12     int value1;
 13 };
 14 
 15 data share_data;
 16 
 17 int READ_THREAD_NUM = 5;
 18 int WRITE_THREAD_NUM = 3;
 19 int TOTAL_TRY = 10000; // 嘗試這么多次程序就結束吧
 20 
 21 bool stop = false;
 22 int reader_num = 0;
 23 int writer_num = 0;
 24 sem_t lock, wlock;
 25 sem_t w_or_r;
 26 sem_t try_read;
 27 
 28 void* func_read(void *ptr){
 29     while(!stop){
 30         sem_wait(&try_read);
 31 
 32         sem_wait(&lock);
 33 
 34         reader_num += 1;
 35         share_data.read_counter += 1;
 36         if(reader_num == 1){
 37             sem_wait(&w_or_r);
 38         }
 39 
 40         sem_post(&lock);
 41 
 42         sem_post(&try_read);
 43 
 44         // do sth with share_data
 45         int tmp = share_data.value;        
 46         sleep(0);
 47         if(tmp != share_data.value1){
 48             printf("Error happens %d %d \n", tmp, share_data.value1);
 49             stop = true;
 50         }
 51 
 52         sem_wait(&lock);
 53         reader_num -= 1;
 54         if(reader_num == 0){
 55             sem_post(&w_or_r);
 56         }
 57         sem_post(&lock);
 58 
 59         sleep(0.1);
 60     }
 61     return NULL;
 62 }
 63 
 64 void* func_write(void *ptr){
 65     int idx = *(int*)ptr;
 66     while(!stop){
 67         sem_wait(&wlock);
 68         writer_num += 1;
 69         if(writer_num == 1)
 70             sem_wait(&try_read);
 71         sem_post(&wlock);        
 72 
 73         sem_wait(&w_or_r);
 74 
 75         share_data.write_counter += 1;
 76         share_data.value = idx;
 77         share_data.value1 = idx;
 78 
 79         if(share_data.write_counter >= TOTAL_TRY)
 80             stop = true;
 81         sem_post(&w_or_r);
 82 
 83         sem_wait(&wlock);
 84         writer_num -= 1;
 85         if(writer_num == 0)
 86             sem_post(&try_read);
 87         sem_post(&wlock);
 88         sleep(0.1);
 89     }
 90     return NULL;
 91 }
 92 
 93 int main(int argc, char * argv[]){
 94     share_data.read_counter = 0;
 95     share_data.write_counter = 0;
 96 
 97     sem_init(&lock, 0, 1);
 98     sem_init(&wlock, 0, 1);
 99     sem_init(&w_or_r, 0, 1);
100     sem_init(&try_read, 0, 1);
101 
102     pthread_t readers[READ_THREAD_NUM];
103     pthread_t writers[WRITE_THREAD_NUM];
104 
105     for(int i = 0; i < READ_THREAD_NUM; i++){
106         pthread_create(&readers[i], NULL, func_read, NULL);
107     }
108 
109     int thread_args[WRITE_THREAD_NUM];
110     for(int i = 0; i < WRITE_THREAD_NUM; i++){
111         thread_args[i] = i + 1;
112         pthread_create(&writers[i], NULL, func_write, (thread_args + i));
113     }
114 
115     for(int i = 0; i < READ_THREAD_NUM; i++)
116         pthread_join(readers[i],0);
117 
118     //for(int i = 0; i < WRITE_THREAD_NUM; i++)
119     //   pthread_join(writers[i],0);
120     
121     sem_destroy(&lock);
122     sem_destroy(&wlock);
123     sem_destroy(&w_or_r);
124     sem_destroy(&try_read);
125     printf("Finally read count %d, write count %d\n", share_data.read_counter, share_data.write_counter);
126     return 0;
127 }

 

   對比讀者優先的代碼,主要是增加了一個計數器(writer_num)和兩個信號量(wlock, try_read)。writer_num用來記錄排隊的寫線程數目,wlock用來保證writer_num的互斥修改。try_read的作用是保證讀線程每次操作之前都得獲得這個信號量,而在func_write中,只有當所有的讀線程都結束(writer_num == 0)時才會釋放try_write這個信號量,這樣就達到了讀操作優先的目的。

  同樣我們做一下理論上的分析:

  (r1 r2w2),r1(讀線程)獲取了管程的執行權,此時W2和r2都會try_lock(分別是第70 和 第30行),當r1釋放try_read時,喚醒r2還是w2是隨機的,因此r2 w2各有50%的概率先執行

  (w1 r2w2),w1獲取了管程的執行權,那么r2會阻塞在try_read,同事w2阻塞在w_or_r。w1是先釋放w_or_r,后釋放try_read, 所以w2會先獲得執行權, 那么后面r2w2的執行順序一定是w2 然后是r2

  用以下命令可編譯並執行代碼 EXE=rw_semaphore_wnp && g++ -g -lpthread $EXE.cpp -o $EXE && ./$EXE,下面是執行了5次的結果:

Finally read count 3342, write count 10001
Finally read count 2252, write count 10002
Finally read count 7795, write count 10002
Finally read count 8394, write count 10001
Finally read count 676, write count 10002

 

讀寫鎖解決讀者寫者問題

   讀者寫者問題是一個如此普遍的並發問題,因此很多語言為了便於開發者的使用都提供的個高級的抽象,即讀寫鎖。

Linux下讀寫鎖API

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
int pthread_rwlockattr_getkind_np(const pthread_rwlockattr_t *attr, int *pref);

  前面兩個函數用來初始和銷毀讀寫鎖,注意init函數可以指定屬性參數(attr)。中間有三個函數用於讀寫鎖的lock與unlock,pthread_rwlock_rdlock用來獲取read lock(用於讀數據),pthread_rwlock_wrlock用來獲取write lock(用來寫數據)。最后兩個函數用來設置、判斷是讀者優先,還是寫着優先,在pthread.h源碼枚舉值如下:

enum
{
  PTHREAD_RWLOCK_PREFER_READER_NP,
  PTHREAD_RWLOCK_PREFER_WRITER_NP,
  PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP,
  PTHREAD_RWLOCK_DEFAULT_NP = PTHREAD_RWLOCK_PREFER_READER_NP
};

  從上面可以看到有三種模式:讀者優先,寫者優先,寫者非遞歸優先。但坑爹的是,如果設置讀者優先(即PTHREAD_RWLOCK_PREFER_WRITER_NP)根本不起作用。如果希望寫者優先,那么要使用PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP,這個問題描述可以參見這篇文章man page

實現:

  基於讀寫鎖rw_lock,解決讀者寫者問題就比較簡單了。先設置好優先選項,然后再寫線程中調用pthread_rwlock_wrlock,在讀線程中調用pthread_rwlock_rdlock。具體的代碼如下(rw_rwlock.cpp):

  1 #include <stdio.h>
  2 #include <pthread.h>
  3 #include <time.h>
  4 #include <stdlib.h>
  5 #include <unistd.h>
  6 #include <time.h>
  7 
  8 struct data{
  9     int read_counter;
 10     int write_counter;
 11     int value;
 12 };
 13 
 14 data share_data;
 15 
 16 int READ_THREAD_NUM = 5;
 17 int WRITE_THREAD_NUM = 3;
 18 int TOTAL_TRY = 10000; // 嘗試這么多次程序就結束吧
 19 
 20 bool stop = false;
 21 pthread_rwlock_t rwlock;
 22 pthread_mutex_t lock;
 23 
 24 void* func_read(void *ptr){
 25     while(!stop){
 26         pthread_rwlock_rdlock(&rwlock);
 27 
 28         pthread_mutex_lock(&lock);
 29         share_data.read_counter += 1;
 30         pthread_mutex_unlock(&lock);
 31         // do sth about value
 32         int tmp = share_data.value;
 33 
 34         pthread_rwlock_unlock(&rwlock);
 35         sleep(0.1);
 36     }
 37     return NULL;
 38 }
 39 
 40 void* func_write(void *ptr){
 41     int idx = *(int*)ptr;
 42     while(!stop){
 43         pthread_rwlock_wrlock(&rwlock);
 44 
 45         share_data.write_counter += 1;
 46         share_data.value = idx;
 47         if(share_data.write_counter >= TOTAL_TRY)
 48             stop = true;
 49         pthread_rwlock_unlock(&rwlock);
 50         sleep(0.1);
 51 
 52     }
 53     return NULL;
 54 }
 55 
 56 int main(int argc, char * argv[]){
 57     share_data.read_counter = 0;
 58     share_data.write_counter = 0;
 59 
 60     pthread_rwlockattr_t attr;
 61     pthread_rwlockattr_init(&attr);
 62     
 63 
 64 #ifdef WRITER_NONRECURSIVE_NP
 65     pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
 66 #endif
 67 
 68     int np;
 69     pthread_rwlockattr_getkind_np(&attr, &np);
 70     printf("readnp: %d, writenp: %d, write_nonrecursivenp: %d, and we are using %d\n", \
 71             PTHREAD_RWLOCK_PREFER_READER_NP, PTHREAD_RWLOCK_PREFER_WRITER_NP, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP , np);
 72 
 73     pthread_rwlock_init(&rwlock, &attr);
 74 
 75     pthread_mutex_init(&lock, 0);
 76 
 77     pthread_t readers[READ_THREAD_NUM];
 78     pthread_t writers[WRITE_THREAD_NUM];
 79 
 80     for(int i = 0; i < READ_THREAD_NUM; i++){
 81         pthread_create(&readers[i], NULL, func_read, NULL);
 82     }
 83 
 84     int thread_args[WRITE_THREAD_NUM];
 85     for(int i = 0; i < WRITE_THREAD_NUM; i++){
 86         thread_args[i] = i + 1;
 87         pthread_create(&writers[i], NULL, func_write, (thread_args + i));
 88     }
 89 
 90     for(int i = 0; i < READ_THREAD_NUM; i++)
 91         pthread_join(readers[i],0);
 92 
 93     //for(int i = 0; i < WRITE_THREAD_NUM; i++)
 94     //   pthread_join(writers[i],0);
 95     
 96     pthread_rwlockattr_destroy(&attr);
 97     pthread_rwlock_destroy(&rwlock);
 98     pthread_mutex_destroy(&lock);
 99      printf("Finally read count %d, write count %d\n", share_data.read_counter, share_data.write_counter);
100     return 0;
101 }

 

  上面的代碼中,為了統計讀請求執行的次數,使用了互斥鎖(lock),在編譯的時候通過是否預定於WRITER_NONRECURSIVE_NP這個宏來指定是讀者優先還是寫着優先。

  首先是讀者優先時得指令 EXE=rw_rwlock && g++ -g -lpthread $EXE.cpp -o $EXE && ./$EXE,輸出結果如下(注意,為了統計 所以在讀線程中加了互斥鎖lock,去掉這個互斥鎖 讀操作的次數會更多):

Finally read count 9820, write count 10002
Finally read count 58371, write count 10002
Finally read count 2297, write count 10002
Finally read count 23586, write count 10001
Finally read count 4217, write count 10002

  然后是寫者優先時得指令 EXE=rw_rwlock && g++ -DWRITER_NONRECURSIVE_NP -g -lpthread $EXE.cpp -o $EXE && ./$EXE,輸出結果如下

Finally read count 376, write count 10001
Finally read count 711, write count 10001
Finally read count 928, write count 10001
Finally read count 1468, write count 10001
Finally read count 449, write count 10001

references:

readers writers problem

pthread_rwlockattr_setkind_np.3.html

並發與同步、信號量與管程、生產者消費者問題


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM