為了允許在線程或進程之間共享數據,同步時必須的,互斥鎖和條件變量是同步的基本組成部分。
1、互斥鎖
互斥鎖是用來保護臨界區資源,實際上保護的是臨界區中被操縱的數據,互斥鎖通常用於保護由多個線程或多進程分享的共享數據。一般是一些可供線程間使用的全局變量,來達到線程同步的目的,即保證任何時刻只有一個線程或進程在執行其中的代碼。一般加鎖的輪廓如下:
pthread_mutex_lock() 臨界區 pthread_mutex_unlock()
互斥鎖API
pthread_mutex_lock(pthread_mutex_t *mutex);
用此函數加鎖時,如果mutex已經被鎖住,當前嘗試加鎖的線程就會阻塞,直到互斥鎖被其他線程釋放。當此函數返回時,說明互斥鎖已經被當前線程成功加鎖.
pthread_mutex_trylock(pthread_mutex_t *mutex);
用此函數加鎖時,如果mutex已經卑瑣主,當前嘗試加鎖的線程不會阻塞,而是立即返回,返回的錯誤碼為EBUSY,而不是阻塞等待。
pthread_mutex_unlock(pthread_mutex_t *mutex);
注意使用鎖之前要記得初始化。互斥鎖的初始化有兩種初始化方式:
1.對於靜態分配的互斥鎖一半用宏賦值的方式初始化
eg: static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
2.對於動態分配的互斥鎖(如調用malloc)或分配在共享內存中,則必須調用pthread_mutex_init(pthread_mutex *mutex, pthread_mutexattr_t *mutexattr)函數來進行初始化。
例子1:寫個程序實現生產者—消費者問題,先只考慮多個生產者線程之間的同步,直到所有的生產者線程都完成工作以后,才啟動消費者線程。程序如下:

1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <unistd.h> 4 #include <pthread.h> 5 #include <errno.h> 6 7 #define MAXNITEMS 1000000 8 #define MAXNTHREADS 100 9 10 int nitems; 11 12 struct 13 { 14 pthread_mutex_t mutex; 15 int buff[MAXNITEMS]; 16 int nput; 17 int nval; 18 } shared = { 19 PTHREAD_MUTEX_INITIALIZER 20 }; 21 22 void *produce(void*); 23 void *consume(void*); 24 25 int main(int argc,char *argv[]) 26 { 27 int i,nthreads,count[MAXNTHREADS]; 28 pthread_t tid_produce[MAXNTHREADS],tid_consume; 29 if(argc != 3) 30 { 31 printf("usage: producongs2 <#itmes> <#threads>.\n"); 32 exit(0); 33 } 34 nitems = atoi(argv[1]); 35 nthreads = atoi(argv[2]); 36 pthread_setconcurrency(nthreads); //設置線程並發級別 37 for(i=0;i<nthreads;++i) 38 { 39 count[i] = 0; 40 pthread_create(&tid_produce[i],NULL,produce,&count[i]); 41 } 42 for(i=0;i<nthreads;i++) 43 { 44 pthread_join(tid_produce[i],NULL); //等待線程退出 45 printf("count[%d] = %d\n",i,count[i]); 46 } 47 pthread_create(&tid_consume,NULL,consume,NULL); 48 pthread_join(tid_consume,NULL); //等待線程退出 49 exit(0); 50 } 51 52 void *produce(void *arg) 53 { 54 for(; ;) 55 { 56 pthread_mutex_lock(&shared.mutex); //加鎖 57 if(shared.nput >= nitems) 58 { 59 pthread_mutex_unlock(&shared.mutex); //釋放鎖 60 return ; 61 } 62 shared.buff[shared.nput] = shared.nval; 63 shared.nput++; 64 shared.nval++; 65 pthread_mutex_unlock(&shared.mutex); //加鎖 66 *((int*) arg) += 1; 67 } 68 } 69 void *consume(void *arg) 70 { 71 int i; 72 for(i=0;i<nitems;i++) 73 { 74 if(shared.buff[i] != i) 75 printf("buff[%d] = %d\n",i,shared.buff[i]); 76 } 77 return; 78 }
程序執行結果如下:
例子2:改進例子1,所有生產者線程啟動后立即啟動消費者線程,這樣生產者線程產生數據的同時,消費者線程就能出來它,此時必須同步生產者和消費者,程序如下:

1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <unistd.h> 4 #include <pthread.h> 5 #include <errno.h> 6 7 #define MAXNITEMS 1000000 8 #define MAXNTHREADS 100 9 10 int nitems; 11 12 struct 13 { 14 pthread_mutex_t mutex; 15 int buff[MAXNITEMS]; 16 int nput; 17 int nval; 18 } shared = { 19 PTHREAD_MUTEX_INITIALIZER 20 }; 21 22 void *produce(void*); 23 void *consume(void*); 24 void consume_wait(int); 25 int main(int argc,char *argv[]) 26 { 27 int i,nthreads,count[MAXNTHREADS]; 28 pthread_t tid_produce[MAXNTHREADS],tid_consume; 29 if(argc != 3) 30 { 31 printf("usage: producongs2 <#itmes> <#threads>.\n"); 32 exit(0); 33 } 34 nitems = atoi(argv[1]); 35 nthreads = atoi(argv[2]); 36 pthread_setconcurrency(nthreads+1); 37 //創建生產者線程 38 for(i=0;i<nthreads;++i) 39 { 40 count[i] = 0; 41 pthread_create(&tid_produce[i],NULL,produce,&count[i]); 42 } 43 //創建消費者線程 44 pthread_create(&tid_consume,NULL,consume,NULL); 45 for(i=0;i<nthreads;i++) 46 { 47 pthread_join(tid_produce[i],NULL); 48 printf("count[%d] = %d\n",i,count[i]); 49 } 50 //等待消費者線程退出 51 pthread_join(tid_consume,NULL); 52 exit(0); 53 } 54 55 void *produce(void *arg) 56 { 57 for(; ;) 58 { 59 pthread_mutex_lock(&shared.mutex); 60 if(shared.nput >= nitems) 61 { 62 pthread_mutex_unlock(&shared.mutex); 63 return ; 64 } 65 shared.buff[shared.nput] = shared.nval; 66 shared.nput++; 67 shared.nval++; 68 pthread_mutex_unlock(&shared.mutex); 69 *((int*) arg) += 1; 70 } 71 } 72 void *consume(void *arg) 73 { 74 int i; 75 for(i=0;i<nitems;i++) 76 { 77 consume_wait(i); 78 if(shared.buff[i] != i) 79 printf("buff[%d] = %d\n",i,shared.buff[i]); 80 } 81 return; 82 } 83 void consume_wait(int i) 84 { 85 for(; ;) //進行輪詢,判斷i是否已經由生產者生產 86 { 87 pthread_mutex_lock(&shared.mutex); 88 if(i<shared.nput) //i已經生產 89 { 90 pthread_mutex_unlock(&shared.mutex); 91 return; 92 } 93 pthread_mutex_unlock(&shared.mutex); 94 } 95 }
存在的問題:當消費者獲取的條目尚沒有准備好時,消費者線程一次次的循環去判斷,每次給互斥鎖解鎖又上鎖,這種輪詢的辦法浪費CPU時間。
2、條件變量
互斥鎖用於上鎖,條件變量用於等待,條件變量的使用是與互斥鎖共通使用的。
2.1等待與信號發送
條件變量類型是pthread_cond_t,調用函數如下:
pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *pmutex);
pthread_cond_signal(pthread_cond_t *pcond);
每個條件變量總是有一個互斥鎖與之關聯。現在采用條件變量實現生產者與消費者問題,程序如下:

1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <unistd.h> 4 #include <pthread.h> 5 #include <errno.h> 6 7 #define MAXNITEMS 1000000 8 #define MAXNTHREADS 100 9 10 int nitems; 11 12 struct 13 { 14 pthread_mutex_t mutex; 15 int buff[MAXNITEMS]; 16 int nput; 17 int nval; 18 } shared = { 19 PTHREAD_MUTEX_INITIALIZER 20 }; 21 //條件變量 22 struct { 23 pthread_mutex_t mutex; 24 pthread_cond_t cond; 25 int nready; 26 }nready = { 27 PTHREAD_MUTEX_INITIALIZER,PTHREAD_COND_INITIALIZER 28 }; 29 30 void *produce(void*); 31 void *consume(void*); 32 33 int main(int argc,char *argv[]) 34 { 35 int i,nthreads,count[MAXNTHREADS]; 36 pthread_t tid_produce[MAXNTHREADS],tid_consume; 37 if(argc != 3) 38 { 39 printf("usage: producongs2 <#itmes> <#threads>.\n"); 40 exit(0); 41 } 42 nitems = atoi(argv[1]); 43 nthreads = atoi(argv[2]); 44 pthread_setconcurrency(nthreads+1); 45 for(i=0;i<nthreads;++i) 46 { 47 count[i] = 0; 48 pthread_create(&tid_produce[i],NULL,produce,&count[i]); 49 } 50 pthread_create(&tid_consume,NULL,consume,NULL); 51 for(i=0;i<nthreads;i++) 52 { 53 pthread_join(tid_produce[i],NULL); 54 printf("count[%d] = %d\n",i,count[i]); 55 } 56 pthread_join(tid_consume,NULL); 57 exit(0); 58 } 59 60 void *produce(void *arg) 61 { 62 printf("producer begins work\n"); 63 for(; ;) 64 { 65 pthread_mutex_lock(&shared.mutex); 66 if(shared.nput >= nitems) 67 { 68 pthread_mutex_unlock(&shared.mutex); 69 return ; 70 } 71 shared.buff[shared.nput] = shared.nval; 72 shared.nput++; 73 shared.nval++; 74 pthread_mutex_unlock(&shared.mutex); 75 pthread_mutex_lock(&nready.mutex); 76 if(nready.nready == 0) 77 pthread_cond_signal(&nready.cond); //通知消費者 78 nready.nready++; 79 pthread_mutex_unlock(&nready.mutex); 80 *((int*) arg) += 1; 81 } 82 } 83 void *consume(void *arg) 84 { 85 int i; 86 printf("consuemer begins work.\n"); 87 for(i=0;i<nitems;i++) 88 { 89 pthread_mutex_lock(&nready.mutex); 90 while(nready.nready == 0) 91 pthread_cond_wait(&nready.cond,&nready.mutex); //等待生產者 92 nready.nready--; 93 pthread_mutex_unlock(&nready.mutex); 94 if(shared.buff[i] != i) 95 printf("buff[%d] = %d\n",i,shared.buff[i]); 96 } 97 return; 98 }
程序執行結果如下:
總的來說,給條件變量發送信號的過程代碼如下:
struct { pthread_mutex_t mutex; pthread_cond_t cond; //維護本條件的各個變量 }var = {PTHREAD_MUTEX_INITIALIZER,PTHREAD_COND_INITIALIZER,...} pthread_mutex_lock(&var.mutex); 設置條件為真 pthread_cond_signal(&var.cond); pthread_mutex_unlock(&var.mutex);
測試條件並進入睡眠以等待條件變為真的代碼大體如下:
pthread_mutex_lock(&var.mutex); while(條件為假) pthread_cond_wait(&var.cond,&var.mutex); 修改條件 pthread_mutex_unlock(&var.mutex);
2.2定時等待和廣播
通常pthread_cond_signal只是喚醒等待在相應條件變量上的一個線程,在某些情況下需要喚醒多個線程(例如讀寫者問題),可以調用pthread_cond_broadcast喚醒阻塞在相應條件變量上的所有線程。pthread_cond_timewait允許線程就阻塞時間設置一個限制值。API如下:
pthread_cond_broadcast(pthread_cond_t *cond);
pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex, const struct timespec *abstime);