1.條件變量創建 靜態創建:pthread_cond_t cond=PTHREAD_COND_INITIALIZER; 動態創建:pthread_cond _t cond; pthread_cond_init(&cond,NULL); 其中的第二個參數NULL表示條件變量的屬性,雖然POSIX中定義了條件變量的屬性,但在LinuxThread中並沒有實現,因此常常忽略。 2.條件等待 pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER; pthread_mutex_lock(&mutex); while(條件1) pthread_cond_wait(&cond,&mutex); 函數操作1; pthread_mutex_unlock(&mutex); 當條件1成立的時候,執行pthread_cond_wait(&cond,&mutex)這一句,開放互斥鎖,然后線程被掛起。當條件1不成立的時候,跳過while循環體,執行函數操作1,然后開放互斥鎖。 3.條件激發 pthread_mutex_lock(&mutex); 函數操作2; if(條件1不成立) pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); 先執行函數操作2,改變條件狀態,使得條件1不成立的時候,執行pthread_cond_signal(&cond)這句話。這句話的意思是激發條件變量cond,使得被掛起的線程被喚醒。 pthread_cond_broadcast(&cond); 這句話也是激發條件變量cond,但是,這句話是激發所有由於cond條件被掛起的線程。而signal的函數則是激發一個由於條件變量cond被掛起的線程。 4.條件變量的銷毀 pthread_cond_destroy(&cond); 在linux中,由於條件變量不占用任何資源,所以,這句話除了檢查有沒有等待條件變量cond的線程外,不做任何操作。
#include <pthread.h> #include <stdio.h> #include <unistd.h> pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; int count = 5; //pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; //pthread_mutex_lock(&mutex); //while(條件1) // pthread_cond_wait(&cond, &mutex); //函數操作1 //pthread_mutex_unlock(&mutex); //解釋:當條件1成立的時候,執行pthread_cond_wait(&cond, &mutex)這一句,開放互斥鎖,然后線程被掛起。 // 當條件1不成立的時候,跳過while循環體,執行函數操作1,然后開放互斥鎖 // 即無論走哪個分支,都會放開互斥鎖 //此線程先啟動 void* decrement(void* arg) { while(1) { //條件等待 pthread_mutex_lock(&mutex); while(count <= 0) { printf("count<=0,thread is hanging!\n"); pthread_cond_wait(&cond, &mutex); sleep(1); printf("sleep!\n"); } count -= 1; pthread_mutex_unlock(&mutex); if (count == 9) { printf("count==9,thread1 is over!\n"); return NULL; } } } //條件激發 //pthread_mutex_lock(&mutex); //函數操作2; //if(條件1不成立) // pthread_cond_signal(&cond); // pthread_mutex_unlock(&mutex); //解釋:先執行函數操作2,改變條件變量,使得條件1不成立的時候,執行pthread_cond_signal(&cond)這句話的意思是激發條件變量cond,使得被掛起的線程被喚醒。 // pthread_cond_broadcast(&cond); 激發所有由於cond條件而被掛起的線程。而signal的函數則是激發一個由於條件變量cond被掛起的線程 void *increment(void *arg) { sleep(1); while(1) { pthread_mutex_lock(&mutex); count += 1; if(count > 0) { printf("count=%d, change cond state!\n", count); pthread_cond_signal(&cond); } pthread_mutex_unlock(&mutex); if (count == 10) { printf("count=10,thread is over!\n"); return NULL; } } } int main(void) { int i1=1, i2=1; pthread_t id1, id2; pthread_mutex_init(&mutex, NULL); pthread_cond_init(&cond, NULL); pthread_create(&id1, NULL, decrement, NULL); pthread_create(&id2, NULL, increment, NULL); i2 = pthread_join(id2, NULL); i1 = pthread_join(id1, NULL); if ( (i2==0) && (i1==0) ) { printf("count=%d, the main thread!\n", count); pthread_cond_destroy(&cond); pthread_mutex_destroy(&mutex); return 0; } return -1; }
#include <stdio.h> #include <pthread.h> #include <queue> #include <string> pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; std::queue<std::string> que; void* consumer(void* arg) { while (true) { pthread_mutex_lock(&mutex); while ( que.empty() ) { printf("que is empty , thread consumer hanging!\n"); pthread_cond_wait(&cond, &mutex); } while (!que.empty()) { printf("%s\n", que.front().c_str()); que.pop(); } pthread_mutex_unlock(&mutex); } return NULL; } void* producer(void* arg) { int count = 0; while (true) { pthread_mutex_lock(&mutex); for (int i=0; i<10; i++) { char arr[20]; snprintf(arr, sizeof(arr), "my num is: %d", i); que.push(arr); pthread_cond_signal(&cond); } pthread_mutex_unlock(&mutex); count++; if (count > 10) { printf("stop producer %d s\n", 5); sleep(15); count = 0; } } return NULL; } int main(void) { pthread_t arr[2]; pthread_create(&arr[0], NULL, consumer, NULL); sleep(3); printf("sleep 3 s\n"); pthread_create(&arr[1], NULL, producer, NULL); for(int i=0; i< 2; i++) { pthread_join(arr[i], NULL); } pthread_cond_destroy(&cond); pthread_mutex_destroy(&mutex); }
http://blog.csdn.net/hemmanhui/article/details/4417433
互斥鎖:用來上鎖。
條件變量:用來等待,當條件變量用來自動阻塞一個線程,直到某特殊情況發生為止。通常條件變量和互斥鎖同時使用。
函數介紹:
1.
| 名稱: |
pthread_cond_init |
| 目標: |
條件變量初始化 |
| 頭文件: |
#include < pthread.h> |
| 函數原形: |
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr); |
| 參數: |
cptr 條件變量 attr 條件變量屬性 |
| 返回值: |
成功返回0,出錯返回錯誤編號。 |
pthread_cond_init函數可以用來初始化一個條件變量。他使用變量attr所指定的屬性來初始化一個條件變量,如果參數attr為空,那么它將使用缺省的屬性來設置所指定的條件變量。
2.
| 名稱: |
pthread_cond_destroy |
| 目標: |
條件變量摧毀 |
| 頭文件: |
#include < pthread.h> |
| 函數原形: |
int pthread_cond_destroy(pthread_cond_t *cond); |
| 參數: |
cptr 條件變量 |
| 返回值: |
成功返回0,出錯返回錯誤編號。 |
pthread_cond_destroy函數可以用來摧毀所指定的條件變量,同時將會釋放所給它分配的資源。調用該函數的進程也並不要求等待在參數所指定的條件變量上。
3.
| 名稱: |
pthread_cond_wait/pthread_cond_timedwait |
| 目標: |
條件變量等待 |
| 頭文件: |
#include < pthread.h> |
| 函數原形: |
int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex); int pthread_cond_timedwait(pthread_cond_t *cond,pthread_mutex_t mytex,const struct timespec *abstime); |
| 參數: |
cond 條件變量 mutex 互斥鎖 |
| 返回值: |
成功返回0,出錯返回錯誤編號。 |
第一個參數*cond是指向一個條件變量的指針。第二個參數*mutex則是對相關的互斥鎖的指針。函數pthread_cond_timedwait函數類型與函數pthread_cond_wait,區別在於,如果達到或是超過所引用的參數*abstime,它將結束並返回錯誤ETIME.pthread_cond_timedwait函數的參數*abstime指向一個timespec結構。該結構如下:
typedef struct timespec{
time_t tv_sec;
long tv_nsex;
}timespec_t;
3.名稱:
| pthread_cond_signal/pthread_cond_broadcast |
|
| 目標: |
條件變量通知 |
| 頭文件: |
#include < pthread.h> |
| 函數原形: |
int pthread_cond_signal(pthread_cond_t *cond); int pthread_cond_broadcast(pthread_cond_t *cond); |
| 參數: |
cond 條件變量 |
| 返回值: |
成功返回0,出錯返回錯誤編號。 |
參數*cond是對類型為pthread_cond_t 的一個條件變量的指針。當調用pthread_cond_signal時一個在相同條件變量上阻塞的線程將被解鎖。如果同時有多個線程阻塞,則由調度策略確定接收通知的線程。如果調用pthread_cond_broadcast,則將通知阻塞在這個條件變量上的所有線程。一旦被喚醒,線程仍然會要求互斥鎖。如果當前沒有線程等待通知,則上面兩種調用實際上成為一個空操作。如果參數*cond指向非法地址,則返回值EINVAL。
下面是一個簡單的例子,我們可以從程序的運行來了解條件變量的作用。
| #include <pthread.h> sleep(1); } if(i%3!=0) sleep(1); } |
程序創建了2個新線程使他們同步運行,實現進程t_b打印20以內3的倍數,t_a打印其他的數,程序開始線程t_b不滿足條件等待,線程t_a運行使a循環加1並打印。直到i為3的倍數時,線程t_a發送信號通知進程t_b,這時t_b滿足條件,打印i值。
下面是運行結果:
#cc –lpthread –o cond cond.c
#./cond
thread1:1
thread1:2
thread2:3
thread1:4
thread1:5
thread2:6
thread1:7
thread1:8
thread2:9
備注:
pthread_cond_wait 執行的流程首先將這個mutex解鎖, 然后等待條件變量被喚醒, 如果沒有被喚醒, 該線程將一直休眠, 也就是說, 該線程將一直阻塞在這個pthread_cond_wait調用中, 而當此線程被喚醒時, 將自動將這個mutex加鎖,然后再進行條件變量判斷(原因是“驚群效應”,如果是多個線程都在等待這個條件,而同時只能有一個線程進行處理,此時就必須要再次條件判斷,以使只有一個線程進入臨界區處理。),如果滿足,則線程繼續執行,最后解鎖,
也就是說pthread_cond_wait實際上可以看作是以下幾個動作的合體:
解鎖線程鎖
等待線程喚醒,並且條件為true
加鎖線程鎖.
pthread_cond_signal僅僅負責喚醒正在阻塞在同一條件變量上的一個線程,如果存在多個線程,系統自動根據調度策略決定喚醒其中的一個線程,在多處理器上,該函數是可能同時喚醒多個線程,同時該函數與鎖操作無關,解鎖是由pthread_mutex_unlock(&mutex)完成
喚醒丟失問題
在線程並沒有阻塞在條件變量上時,調用pthread_cond_signal或pthread_cond_broadcast函數可能會引起喚醒丟失問題。
喚醒丟失往往會在下面的情況下發生:
一個線程調用pthread_cond_signal或pthread_cond_broadcast函數;
另一個線程正處在測試條件變量和調用pthread_cond_wait函數之間;
沒有線程正在處在阻塞等待的狀態下
由來: 最近一直在想怎么高效率的在IO線程接收到數據時通知邏輯線程(基於線程池)工作的問題,像網絡編程的服務器模型的一些模型都需要用到這個實現,下面我這里簡單的羅列一個多線程的網絡服務器模型 半同步/半異步(half-sync/half-async): 許多餐廳使用 半同步/半異步 模式的變體。例如,餐廳常常雇佣一個領班負責迎接顧客,並在餐廳繁忙時留意給顧客安排桌位,為等待就餐的顧客按序排隊是必要的。領班由所有顧客“共享”,不能被任何特定顧客占用太多時間。當顧客在一張桌子入坐后,有一個侍應生專門為這張桌子服務。 對於上面羅列的這種模型,本文討論的問題是當領班接到客人時,如何高效率的通知侍應生去服務顧客. 在我們使用很廣泛的線程池實現中,也會有一樣的問題 方法實現: 1.使用鎖+輪詢 使用這種方法可以很簡單的實現,但是會有一定的性能消耗,其還有一個點要好好把握,就是一次輪詢沒有結果后相隔多久進行下一次的輪詢,間隔時間太短,消耗的CPU資源較多,間隔時間太長,不能很及時的響應請求。這就相當於上面的這個例子,侍應生時不時的取詢問領班有沒有顧客到來 2.使用條件變量的線程同步 線程條件變量pthread_cond_t 線程等待某個條件 int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,const struct timespec *restrict abstime); int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex); 通知函數 通知所有的線程 int pthread_cond_broadcast(pthread_cond_t *cond); 只通知一個線程 int pthread_cond_signal(pthread_cond_t *cond); 正確的使用方法 pthread_cond_wait用法: pthread_mutex_lock(&mutex); while(condition_is_false) { pthread_cond_wait(&cond,&mutex); } condition_is_false=true; //此操作是帶鎖的,也就是說只有一個線程同時進入這塊 pthread_mutex_unlock(&mutex); pthread_cond_signal用法: pthread_mutex_lock(&mutex); condition_is_false=false; pthread_cond_signal(&cond) pthread_mutex_unlock(&mutex) 我剛初用的時候,覺得非常的奇怪,為什么要這樣用,加了mutex后還需要一個condition_is_false變量來表示有沒有活干。其實這樣子的一個操作主要是為了解決“假激活”問題,因為我么您這里的使用場景,只需要激活一個線程,因為一個線程干一個活,而不是多個線程干一個活,所以為了避免線程被激活了,但實際又沒有事情干,所以使用了這么一套機制。 實際上,信號和pthread_cond_broadcast是兩個常見的導致假喚醒的情況。假如條件變量上有多個線程在等待,pthread_cond_broadcast會喚醒所有的等待線程,而pthread_cond_signal只會喚醒其中一個等待線程。這樣,pthread_cond_broadcast的情況也許要在pthread_cond_wait前使用while循環來檢查條件變量。 來個例子: 復制代碼 1 #include <pthread.h> 2 #include <stdio.h> 3 #include<stdlib.h> 4 #include<unistd.h> 5 6 /* For safe condition variable usage, must use a boolean predicate and */ 7 /* a mutex with the condition. */ 8 int workToDo = 0; 9 pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 10 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 11 12 #define NTHREADS 20 13 14 static void checkResults(char *string, int rc) { 15 if (rc) { 16 printf("Error on : %s, rc=%d", 17 string, rc); 18 exit(EXIT_FAILURE); 19 } 20 return; 21 } 22 23 void *threadfunc(void *parm) 24 { 25 int rc; 26 27 while (1) { 28 /* Usually worker threads will loop on these operations */ 29 rc = pthread_mutex_lock(&mutex); 30 checkResults("pthread_mutex_lock()\n", rc); 31 32 while (!workToDo) { 33 printf("Thread blocked\n"); 34 rc = pthread_cond_wait(&cond, &mutex); 35 checkResults("pthread_cond_wait()\n", rc); 36 } 37 printf("Thread awake, finish work!\n"); 38 sleep(2); 39 /* Under protection of the lock, complete or remove the work */ 40 /* from whatever worker queue we have. Here it is simply a flag */ 41 workToDo = 0; 42 printf("In mutex lock\n"); 43 rc = pthread_mutex_unlock(&mutex); 44 sleep(2); 45 printf("Out mutex lock\n"); 46 checkResults("pthread_mutex_lock()\n", rc); 47 } 48 return NULL; 49 } 50 51 int main(int argc, char **argv) 52 { 53 int rc=0; 54 int i; 55 pthread_t threadid[NTHREADS]; 56 57 printf("Enter Testcase - %s\n", argv[0]); 58 59 printf("Create %d threads\n", NTHREADS); 60 for(i=0; i<NTHREADS; ++i) { 61 rc = pthread_create(&threadid[i], NULL, threadfunc, NULL); 62 checkResults("pthread_create()\n", rc); 63 } 64 65 sleep(5); /* Sleep is not a very robust way to serialize threads */ 66 67 for(i=0; i<5; ++i) { 68 printf("Wake up a worker, work to do...\n"); 69 70 rc = pthread_mutex_lock(&mutex); 71 checkResults("pthread_mutex_lock()\n", rc); 72 73 /* In the real world, all the threads might be busy, and */ 74 /* we would add work to a queue instead of simply using a flag */ 75 /* In that case the boolean predicate might be some boolean */ 76 /* statement like: if (the-queue-contains-work) */ 77 if (workToDo) { 78 printf("Work already present, likely threads are busy\n"); 79 } 80 workToDo = 1; 81 rc = pthread_cond_broadcast(&cond); 82 // rc = pthread_cond_signal(&cond); 83 checkResults("pthread_cond_broadcast()\n", rc); 84 85 rc = pthread_mutex_unlock(&mutex); 86 checkResults("pthread_mutex_unlock()\n", rc); 87 sleep(5); /* Sleep is not a very robust way to serialize threads */ 88 } 89 90 printf("Main completed\n"); 91 exit(0); 92 return 0; 93 } 復制代碼 事實上上面的例子無論是使用pthread_cond_signal還是pthread_cond_broadcast,都只會打印Thread awake, finish work5次,大家可能會覺得非常奇怪,但是實際情況就是這樣的。 為了明白其pthread_cont_wait內部干了什么工作,有必要深入一下其內部實現。 關於其內部實現偽代碼如下: 復制代碼 1 pthread_cond_wait(mutex, cond): 2 value = cond->value; /* 1 */ 3 pthread_mutex_unlock(mutex); /* 2 */ 4 pthread_mutex_lock(cond->mutex); /* 10 */ pthread_cond_t自帶一個mutex來互斥對waiter等待鏈表的操作 5 if (value == cond->value) { /* 11 */ 檢查一次是不是cond有被其他線程設置過,相當於單例模式的第二次檢測是否為NULL 6 me->next_cond = cond->waiter; 7 cond->waiter = me;//鏈表操作 8 pthread_mutex_unlock(cond->mutex); 9 unable_to_run(me); 10 } else 11 pthread_mutex_unlock(cond->mutex); /* 12 */ 12 pthread_mutex_lock(mutex); /* 13 */ 13 14 pthread_cond_signal(cond): 15 pthread_mutex_lock(cond->mutex); /* 3 */ 16 cond->value++; /* 4 */ 17 if (cond->waiter) { /* 5 */ 18 sleeper = cond->waiter; /* 6 */ 19 cond->waiter = sleeper->next_cond; /* 7 */ //鏈表操作 20 able_to_run(sleeper); /* 8 */ 運行sleep的線程,即上面的me 21 } 22 pthread_mutex_unlock(cond->mutex); /* 9 */ 復制代碼 pthread_cond_broadcast雖然能夠激活所有的線程,但是激活之后會有mutex鎖,也就是說他的激活是順序進行的,只有第一個激活的線程調用pthread_mutex_unlock(&mutex)后,后一個等待的線程才會繼續運行.因為從pthread_cond_wait(&cond,&mutex)到pthread_mutex_unlock(&mutex)區間是加的獨占鎖,從wait激活后的第一個線程占用了這個鎖,所以其他的線程不能運行,只能等待。所以當第一個被激活的線程修改了condition_is_false后(上面測試代碼的workToDo),接着調用pthread_mutex_unlock(&mutex)后,此時其他等待在cond的一個線程會激活,但是此時condition_is_false已經被設置,所以他跑不出while循環,當調用pthread_cond_wait時,其內部pthread_mutex_unlock(mutex)調用會導致另一個在它后面的等待在cond的線程被激活。 所以,通過這種方式,即便是誤調用了pthread_cond_broadcast或者由於信號中斷的原因激活了所有在等待條件的線程,也能保證其結果是正確的。 另外說一句題外話,很多人寫的基於條件變量線程同步的框架,說自己是無鎖的,其實這是不對的,只是內部鎖的機制在pthread_cond_wait實現了而已,其還是基於互斥鎖的實現。真正想要達到無鎖的可以關注一下lockfree相關的CAS算法,其內部使用一個intel CPU的cmpxchg8指令完成的,其實這種實現個人認為和傳統鎖相比只是一個非阻塞鎖和阻塞鎖的區別。
#include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> // for safe condition variable usage, must use a boolean predicate and // a mutex with the condition int work_to_do = 0; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; #define NUM_THREADS 20 static void check_result(char *str, int rc) { if (rc) { printf("Error on: %s, rc=%d", str, rc); exit(EXIT_FAILURE); } return; } void *threadfunc(void *param) { int rc; while (1) { //usually woker threads will loop on these operations rc = pthread_mutex_lock(&mutex); check_result("pthread_mutex_lock()\n", rc); while (!work_to_do) { printf("thread hanging\n"); rc = pthread_cond_wait(&cond, &mutex); check_result("pthread_cond_wait()\n", rc); } printf("thread awake, finish work!\n"); sleep(2); // under protection of the lock, complete or remove the work // from whatever worker queue we have, here it is simply a flag work_to_do = 0; printf("in mutex lock\n"); rc = pthread_mutex_unlock(&mutex); sleep(2); printf("out mutex lock\n"); check_result("pthread_mutex_unlock()\n", rc); } return NULL; } int main(void) { int rc = 0; int i; pthread_t threadid[NUM_THREADS]; printf("Enter %d threads\n", NUM_THREADS); for (i=0; i<NUM_THREADS; ++i) { rc = pthread_create(&threadid[i], NULL, threadfunc, NULL); check_result("pthread_create()\n", rc); } sleep(5); //sleep is not a very robust way to serialize threads for(i=0; i<5; ++i) { printf("wake up a worker, work to do...\n"); rc = pthread_mutex_lock(&mutex); check_result("pthread_mutex_lock()\n", rc); // in the real world, all the threads might be busy, and // we would add work to a queue instead of simply using a flag // in that case the boolean predicate might be some boolean // statement like: if (the-queue-contains-work) if(work_to_do) { printf("work already present, likely threads are busy\n"); } work_to_do = 1; //rc = pthread_cond_broadcast(&cond); rc = pthread_cond_signal(&cond); check_result("pthread_cond_broadcast()\n", rc); rc = pthread_mutex_unlock(&mutex); check_result("pthread_mutex_unlock()\n", rc); sleep(5); // sleep is not a very robust way to serialize threads } printf("mian complete\n"); exit(0); return 0; }
