互斥鎖和條件變量


  為了允許在線程或進程之間共享數據,同步時必須的,互斥鎖和條件變量是同步的基本組成部分。

1、互斥鎖

  互斥鎖是用來保護臨界區資源,實際上保護的是臨界區中被操縱的數據,互斥鎖通常用於保護由多個線程或多進程分享的共享數據。一般是一些可供線程間使用的全局變量,來達到線程同步的目的,即保證任何時刻只有一個線程或進程在執行其中的代碼。一般加鎖的輪廓如下:

pthread_mutex_lock() 臨界區 pthread_mutex_unlock()

互斥鎖API

pthread_mutex_lock(pthread_mutex_t *mutex);

 用此函數加鎖時,如果mutex已經被鎖住,當前嘗試加鎖的線程就會阻塞,直到互斥鎖被其他線程釋放。當此函數返回時,說明互斥鎖已經被當前線程成功加鎖.

pthread_mutex_trylock(pthread_mutex_t *mutex);

 用此函數加鎖時,如果mutex已經卑瑣主,當前嘗試加鎖的線程不會阻塞,而是立即返回,返回的錯誤碼為EBUSY,而不是阻塞等待。

pthread_mutex_unlock(pthread_mutex_t *mutex);

注意使用鎖之前要記得初始化。互斥鎖的初始化有兩種初始化方式:

1.對於靜態分配的互斥鎖一半用宏賦值的方式初始化

eg: static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

2.對於動態分配的互斥鎖(如調用malloc)或分配在共享內存中,則必須調用pthread_mutex_init(pthread_mutex *mutex, pthread_mutexattr_t *mutexattr)函數來進行初始化。

例子1:寫個程序實現生產者—消費者問題,先只考慮多個生產者線程之間的同步,直到所有的生產者線程都完成工作以后,才啟動消費者線程。程序如下:

View Code
 1 #include <stdio.h>
 2 #include <stdlib.h>
 3 #include <unistd.h>
 4 #include <pthread.h>
 5 #include <errno.h>
 6 
 7 #define     MAXNITEMS        1000000
 8 #define     MAXNTHREADS     100
 9 
10 int nitems;
11 
12 struct
13 {
14     pthread_mutex_t     mutex;
15     int                 buff[MAXNITEMS];
16     int                 nput;
17     int                 nval;
18 } shared = {
19     PTHREAD_MUTEX_INITIALIZER
20 };
21 
22 void *produce(void*);
23 void *consume(void*);
24 
25 int main(int argc,char *argv[])
26 {
27     int     i,nthreads,count[MAXNTHREADS];
28     pthread_t tid_produce[MAXNTHREADS],tid_consume;
29     if(argc != 3)
30     {
31         printf("usage: producongs2 <#itmes> <#threads>.\n");
32         exit(0);
33     }
34     nitems = atoi(argv[1]);
35     nthreads = atoi(argv[2]);
36     pthread_setconcurrency(nthreads);  //設置線程並發級別
37     for(i=0;i<nthreads;++i)
38     {
39         count[i] = 0;
40         pthread_create(&tid_produce[i],NULL,produce,&count[i]);
41     }
42     for(i=0;i<nthreads;i++)
43     {
44         pthread_join(tid_produce[i],NULL); //等待線程退出
45         printf("count[%d] = %d\n",i,count[i]);
46     }
47     pthread_create(&tid_consume,NULL,consume,NULL);
48     pthread_join(tid_consume,NULL);  //等待線程退出
49     exit(0);
50 }
51 
52 void *produce(void *arg)
53 {
54     for(; ;)
55     {
56         pthread_mutex_lock(&shared.mutex); //加鎖
57         if(shared.nput >= nitems)
58         {
59             pthread_mutex_unlock(&shared.mutex); //釋放鎖
60             return ;
61         }
62         shared.buff[shared.nput] = shared.nval;
63         shared.nput++;
64         shared.nval++;
65         pthread_mutex_unlock(&shared.mutex); //加鎖
66         *((int*) arg) += 1;
67     }
68 }
69 void *consume(void *arg)
70 {
71     int     i;
72     for(i=0;i<nitems;i++)
73     {
74         if(shared.buff[i] != i)
75             printf("buff[%d] = %d\n",i,shared.buff[i]);
76     }
77     return;
78 }

程序執行結果如下:

例子2:改進例子1,所有生產者線程啟動后立即啟動消費者線程,這樣生產者線程產生數據的同時,消費者線程就能出來它,此時必須同步生產者和消費者,程序如下:

View Code
 1 #include <stdio.h>
 2 #include <stdlib.h>
 3 #include <unistd.h>
 4 #include <pthread.h>
 5 #include <errno.h>
 6 
 7 #define     MAXNITEMS        1000000
 8 #define     MAXNTHREADS     100
 9 
10 int nitems;
11 
12 struct
13 {
14     pthread_mutex_t     mutex;
15     int                 buff[MAXNITEMS];
16     int                 nput;
17     int                 nval;
18 } shared = {
19     PTHREAD_MUTEX_INITIALIZER
20 };
21 
22 void *produce(void*);
23 void *consume(void*);
24 void consume_wait(int);
25 int main(int argc,char *argv[])
26 {
27     int     i,nthreads,count[MAXNTHREADS];
28     pthread_t tid_produce[MAXNTHREADS],tid_consume;
29     if(argc != 3)
30     {
31         printf("usage: producongs2 <#itmes> <#threads>.\n");
32         exit(0);
33     }
34     nitems = atoi(argv[1]);
35     nthreads = atoi(argv[2]);
36     pthread_setconcurrency(nthreads+1);
37     //創建生產者線程
38     for(i=0;i<nthreads;++i)
39     {
40         count[i] = 0;
41         pthread_create(&tid_produce[i],NULL,produce,&count[i]);
42     }
43     //創建消費者線程
44     pthread_create(&tid_consume,NULL,consume,NULL);
45     for(i=0;i<nthreads;i++)
46     {
47         pthread_join(tid_produce[i],NULL);
48         printf("count[%d] = %d\n",i,count[i]);
49     }
50     //等待消費者線程退出
51     pthread_join(tid_consume,NULL);
52     exit(0);
53 }
54 
55 void *produce(void *arg)
56 {
57     for(; ;)
58     {
59         pthread_mutex_lock(&shared.mutex);
60         if(shared.nput >= nitems)
61         {
62             pthread_mutex_unlock(&shared.mutex);
63             return ;
64         }
65         shared.buff[shared.nput] = shared.nval;
66         shared.nput++;
67         shared.nval++;
68         pthread_mutex_unlock(&shared.mutex);
69         *((int*) arg) += 1;
70     }
71 }
72 void *consume(void *arg)
73 {
74     int     i;
75     for(i=0;i<nitems;i++)
76     {
77         consume_wait(i);
78         if(shared.buff[i] != i)
79             printf("buff[%d] = %d\n",i,shared.buff[i]);
80     }
81     return;
82 }
83 void consume_wait(int i)
84 {
85     for(; ;)  //進行輪詢,判斷i是否已經由生產者生產
86     {
87         pthread_mutex_lock(&shared.mutex);
88         if(i<shared.nput)   //i已經生產
89         {
90             pthread_mutex_unlock(&shared.mutex);
91             return; 
92         }
93         pthread_mutex_unlock(&shared.mutex);
94     }
95 }

存在的問題:當消費者獲取的條目尚沒有准備好時,消費者線程一次次的循環去判斷,每次給互斥鎖解鎖又上鎖,這種輪詢的辦法浪費CPU時間。

2、條件變量

  互斥鎖用於上鎖,條件變量用於等待,條件變量的使用是與互斥鎖共通使用的。

2.1等待與信號發送

  條件變量類型是pthread_cond_t,調用函數如下:

pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *pmutex);

pthread_cond_signal(pthread_cond_t *pcond);

每個條件變量總是有一個互斥鎖與之關聯。現在采用條件變量實現生產者與消費者問題,程序如下:

View Code
 1 #include <stdio.h>
 2 #include <stdlib.h>
 3 #include <unistd.h>
 4 #include <pthread.h>
 5 #include <errno.h>
 6 
 7 #define     MAXNITEMS        1000000
 8 #define     MAXNTHREADS     100
 9 
10 int nitems;
11 
12 struct
13 {
14     pthread_mutex_t     mutex;
15     int                 buff[MAXNITEMS];
16     int                 nput;
17     int                 nval;
18 } shared = {
19     PTHREAD_MUTEX_INITIALIZER
20 };
21 //條件變量
22 struct {
23     pthread_mutex_t mutex;  
24     pthread_cond_t  cond;
25     int nready;
26 }nready = {
27   PTHREAD_MUTEX_INITIALIZER,PTHREAD_COND_INITIALIZER
28 };
29 
30 void *produce(void*);
31 void *consume(void*);
32 
33 int main(int argc,char *argv[])
34 {
35     int     i,nthreads,count[MAXNTHREADS];
36     pthread_t tid_produce[MAXNTHREADS],tid_consume;
37     if(argc != 3)
38     {
39         printf("usage: producongs2 <#itmes> <#threads>.\n");
40         exit(0);
41     }
42     nitems = atoi(argv[1]);
43     nthreads = atoi(argv[2]);
44     pthread_setconcurrency(nthreads+1);
45     for(i=0;i<nthreads;++i)
46     {
47         count[i] = 0;
48         pthread_create(&tid_produce[i],NULL,produce,&count[i]);
49     }
50     pthread_create(&tid_consume,NULL,consume,NULL);
51     for(i=0;i<nthreads;i++)
52     {
53         pthread_join(tid_produce[i],NULL);
54         printf("count[%d] = %d\n",i,count[i]);
55     }
56     pthread_join(tid_consume,NULL);
57     exit(0);
58 }
59 
60 void *produce(void *arg)
61 {
62     printf("producer begins work\n");
63     for(; ;)
64     {
65         pthread_mutex_lock(&shared.mutex);
66         if(shared.nput >= nitems)
67         {
68             pthread_mutex_unlock(&shared.mutex);
69             return ;
70         }
71         shared.buff[shared.nput] = shared.nval;
72         shared.nput++;
73         shared.nval++;
74         pthread_mutex_unlock(&shared.mutex);
75         pthread_mutex_lock(&nready.mutex);
76         if(nready.nready == 0)
77             pthread_cond_signal(&nready.cond); //通知消費者
78         nready.nready++;
79         pthread_mutex_unlock(&nready.mutex);
80         *((int*) arg) += 1;
81     }
82 }
83 void *consume(void *arg)
84 {
85     int     i;
86     printf("consuemer begins work.\n");
87     for(i=0;i<nitems;i++)
88     {
89         pthread_mutex_lock(&nready.mutex);
90         while(nready.nready == 0)
91             pthread_cond_wait(&nready.cond,&nready.mutex); //等待生產者
92         nready.nready--;
93         pthread_mutex_unlock(&nready.mutex);
94         if(shared.buff[i] != i)
95             printf("buff[%d] = %d\n",i,shared.buff[i]);
96     }
97     return;
98 }

程序執行結果如下:

總的來說,給條件變量發送信號的過程代碼如下:

struct
{
    pthread_mutex_t    mutex;
    pthread_cond_t       cond;
    //維護本條件的各個變量
}var = {PTHREAD_MUTEX_INITIALIZER,PTHREAD_COND_INITIALIZER,...}

pthread_mutex_lock(&var.mutex);
設置條件為真
pthread_cond_signal(&var.cond);
pthread_mutex_unlock(&var.mutex);

測試條件並進入睡眠以等待條件變為真的代碼大體如下:

pthread_mutex_lock(&var.mutex);
while(條件為假)
   pthread_cond_wait(&var.cond,&var.mutex);
修改條件
pthread_mutex_unlock(&var.mutex);

 2.2定時等待和廣播

  通常pthread_cond_signal只是喚醒等待在相應條件變量上的一個線程,在某些情況下需要喚醒多個線程(例如讀寫者問題),可以調用pthread_cond_broadcast喚醒阻塞在相應條件變量上的所有線程。pthread_cond_timewait允許線程就阻塞時間設置一個限制值。API如下:

pthread_cond_broadcast(pthread_cond_t *cond);

pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex, const struct timespec *abstime);

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM