由來:
最近一直在想怎么高效率的在IO線程接收到數據時通知邏輯線程(基於線程池)工作的問題,像網絡編程的服務器模型的一些模型都需要用到這個實現,下面我這里簡單的羅列一個多線程的網絡服務器模型
半同步/半異步(half-sync/half-async):
許多餐廳使用 半同步/半異步 模式的變體。例如,餐廳常常雇佣一個領班負責迎接顧客,並在餐廳繁忙時留意給顧客安排桌位,為等待就餐的顧客按序排隊是必要的。領班由所有顧客“共享”,不能被任何特定顧客占用太多時間。當顧客在一張桌子入坐后,有一個侍應生專門為這張桌子服務。
對於上面羅列的這種模型,本文討論的問題是當領班接到客人時,如何高效率的通知侍應生去服務顧客.
在我們使用很廣泛的線程池實現中,也會有一樣的問題
方法實現:
1.使用鎖+輪詢
使用這種方法可以很簡單的實現,但是會有一定的性能消耗,其還有一個點要好好把握,就是一次輪詢沒有結果后相隔多久進行下一次的輪詢,間隔時間太短,消耗的CPU資源較多,間隔時間太長,不能很及時的響應請求。這就相當於上面的這個例子,侍應生時不時的取詢問領班有沒有顧客到來
2.使用條件變量的線程同步
線程條件變量pthread_cond_t
線程等待某個條件
int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
通知函數
通知所有的線程
int pthread_cond_broadcast(pthread_cond_t *cond);
只通知一個線程
int pthread_cond_signal(pthread_cond_t *cond);
正確的使用方法
pthread_cond_wait用法:
pthread_mutex_lock(&mutex);
while(condition_is_false)
{
pthread_cond_wait(&cond,&mutex);
}
condition_is_false=true; //此操作是帶鎖的,也就是說只有一個線程同時進入這塊
pthread_mutex_unlock(&mutex);
pthread_cond_signal用法:
pthread_mutex_lock(&mutex);
condition_is_false=false;
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)
我剛初用的時候,覺得非常的奇怪,為什么要這樣用,加了mutex后還需要一個condition_is_false變量來表示有沒有活干。其實這樣子的一個操作主要是為了解決“假激活”問題,因為我么您這里的使用場景,只需要激活一個線程,因為一個線程干一個活,而不是多個線程干一個活,所以為了避免線程被激活了,但實際又沒有事情干,所以使用了這么一套機制。
實際上,信號和pthread_cond_broadcast是兩個常見的導致假喚醒的情況。假如條件變量上有多個線程在等待,pthread_cond_broadcast會喚醒所有的等待線程,而pthread_cond_signal只會喚醒其中一個等待線程。這樣,pthread_cond_broadcast的情況也許要在pthread_cond_wait前使用while循環來檢查條件變量。
來個例子:
View Code
1 #include <pthread.h>
2 #include <stdio.h>
3 #include<stdlib.h>
4 #include<unistd.h>
5
6 /* For safe condition variable usage, must use a boolean predicate and */
7 /* a mutex with the condition. */
8 int workToDo = 0; 9 pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 10 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 11
12 #define NTHREADS 20
13
14 static void checkResults(char *string, int rc) { 15 if (rc) { 16 printf("Error on : %s, rc=%d", 17 string, rc); 18 exit(EXIT_FAILURE); 19 } 20 return; 21 } 22
23 void *threadfunc(void *parm) 24 { 25 int rc; 26
27 while (1) { 28 /* Usually worker threads will loop on these operations */
29 rc = pthread_mutex_lock(&mutex); 30 checkResults("pthread_mutex_lock()\n", rc); 31
32 while (!workToDo) { 33 printf("Thread blocked\n"); 34 rc = pthread_cond_wait(&cond, &mutex); 35 checkResults("pthread_cond_wait()\n", rc); 36 } 37 printf("Thread awake, finish work!\n"); 38 sleep(2); 39 /* Under protection of the lock, complete or remove the work */
40 /* from whatever worker queue we have. Here it is simply a flag */
41 workToDo = 0; 42 printf("In mutex lock\n"); 43 rc = pthread_mutex_unlock(&mutex); 44 sleep(2); 45 printf("Out mutex lock\n"); 46 checkResults("pthread_mutex_lock()\n", rc); 47 } 48 return NULL; 49 } 50
51 int main(int argc, char **argv) 52 { 53 int rc=0; 54 int i; 55 pthread_t threadid[NTHREADS]; 56
57 printf("Enter Testcase - %s\n", argv[0]); 58
59 printf("Create %d threads\n", NTHREADS); 60 for(i=0; i<NTHREADS; ++i) { 61 rc = pthread_create(&threadid[i], NULL, threadfunc, NULL); 62 checkResults("pthread_create()\n", rc); 63 } 64
65 sleep(5); /* Sleep is not a very robust way to serialize threads */
66
67 for(i=0; i<5; ++i) { 68 printf("Wake up a worker, work to do...\n"); 69
70 rc = pthread_mutex_lock(&mutex); 71 checkResults("pthread_mutex_lock()\n", rc); 72
73 /* In the real world, all the threads might be busy, and */
74 /* we would add work to a queue instead of simply using a flag */
75 /* In that case the boolean predicate might be some boolean */
76 /* statement like: if (the-queue-contains-work) */
77 if (workToDo) { 78 printf("Work already present, likely threads are busy\n"); 79 } 80 workToDo = 1; 81 rc = pthread_cond_broadcast(&cond); 82 // rc = pthread_cond_signal(&cond);
83 checkResults("pthread_cond_broadcast()\n", rc); 84
85 rc = pthread_mutex_unlock(&mutex); 86 checkResults("pthread_mutex_unlock()\n", rc); 87 sleep(5); /* Sleep is not a very robust way to serialize threads */
88 } 89
90 printf("Main completed\n"); 91 exit(0); 92 return 0; 93 }
事實上上面的例子無論是使用pthread_cond_signal還是pthread_cond_broadcast,都只會打印Thread awake, finish work5次,大家可能會覺得非常奇怪,但是實際情況就是這樣的。 為了明白其pthread_cont_wait內部干了什么工作,有必要深入一下其內部實現。
關於其內部實現偽代碼如下:
1 pthread_cond_wait(mutex, cond): 2 value = cond->value; /* 1 */ 3 pthread_mutex_unlock(mutex); /* 2 */ 4 pthread_mutex_lock(cond->mutex); /* 10 */ pthread_cond_t自帶一個mutex來互斥對waiter等待鏈表的操作 5 if (value == cond->value) { /* 11 */ 檢查一次是不是cond有被其他線程設置過,相當於單例模式的第二次檢測是否為NULL 6 me->next_cond = cond->waiter; 7 cond->waiter = me;//鏈表操作 8 pthread_mutex_unlock(cond->mutex); 9 unable_to_run(me); 10 } else 11 pthread_mutex_unlock(cond->mutex); /* 12 */ 12 pthread_mutex_lock(mutex); /* 13 */ 13 14 pthread_cond_signal(cond): 15 pthread_mutex_lock(cond->mutex); /* 3 */ 16 cond->value++; /* 4 */ 17 if (cond->waiter) { /* 5 */ 18 sleeper = cond->waiter; /* 6 */ 19 cond->waiter = sleeper->next_cond; /* 7 */ //鏈表操作 20 able_to_run(sleeper); /* 8 */ 運行sleep的線程,即上面的me 21 } 22 pthread_mutex_unlock(cond->mutex); /* 9 */
pthread_cond_broadcast雖然能夠激活所有的線程,但是激活之后會有mutex鎖,也就是說他的激活是順序進行的,只有第一個激活的線程調用pthread_mutex_unlock(&mutex)后,后一個等待的線程才會繼續運行.因為從pthread_cond_wait(&cond,&mutex)到pthread_mutex_unlock(&mutex)區間是加的獨占鎖,從wait激活后的第一個線程占用了這個鎖,所以其他的線程不能運行,只能等待。所以當第一個被激活的線程修改了condition_is_false后(上面測試代碼的workToDo),接着調用pthread_mutex_unlock(&mutex)后,此時其他等待在cond的一個線程會激活,但是此時condition_is_false已經被設置,所以他跑不出while循環,當調用pthread_cond_wait時,其內部pthread_mutex_unlock(mutex)調用會導致另一個在它后面的等待在cond的線程被激活。
所以,通過這種方式,即便是誤調用了pthread_cond_broadcast或者由於信號中斷的原因激活了所有在等待條件的線程,也能保證其結果是正確的。
另外說一句題外話,很多人寫的基於條件變量線程同步的框架,說自己是無鎖的,其實這是不對的,只是內部鎖的機制在pthread_cond_wait實現了而已,其還是基於互斥鎖的實現。真正想要達到無鎖的可以關注一下lockfree相關的CAS算法,其內部使用一個intel CPU的cmpxchg8指令完成的,其實這種實現個人認為和傳統鎖相比只是一個非阻塞鎖和阻塞鎖的區別。

