Posix信號量


1、概述

  信號量(semaphore)是一種用於提供不同進程間或一個給定進程的不同線程間同步手段的原語。信號量的使用主要是用來保護共享資源,使得資源在一個時刻只有一個進程(線程)所擁有。信號量的值為正的時候,說明它空閑。所測試的線程可以鎖定而使用它。若為0,說明它被占用,測試的線程要進入睡眠隊列中,等待被喚醒。Posix信號量分為有名信號量和無名信號量(也叫基於內存的信號量)。

2、Posix有名信號量

  有名信號量既可以用於線程間的同步也可以用於進程間的同步。

1)由sem_open來創建一個新的信號量或打開一個已存在的信號量。其格式為:

sem_t *sem_open(const char *name,int oflag,mode_t mode,unsigned int value);
返回:若成功則為指向信號量的指針,若出錯則為SEM_FAILED 其中,第三、四個參數可以沒有,主要看第二個參數如何選取。
oflag參數:可以是0、O_CREAT或O_CREAT|O_EXCL。如果指定O_CREAT標志而沒有指定O_EXCL,那么只有當所需的信號量尚未存在時才初始化它。但是如果所需的信號量已經存在也不會出錯。 但是如果在所需的信號量存在的情況下指定O_CREAT|O_EXCL卻會報錯。
mode參數:指定權限位。
value參數:指定信號量的初始值。該初始值不能超過SEM_VALUE_MAX(這個常值必須至少為32767).二值信號量的初始值通常為1,計數信號量的初始值則往往大於1。
用sem_close來關閉該信號量。

創建一個新的信號量程序如下:

 1 #include <stdio.h>
 2 #include <stdlib.h>
 3 #include <unistd.h>
 4 #include <semaphore.h>
 5 #include <errno.h>
 6 #include <fcntl.h>
 7 
 8 //創建模式權限
 9 #define FILE_MODE  (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
10 
11 int main(int argc,char *argv[])
12 {
13     int     c, flags;
14     sem_t   *sem;
15     unsigned int value;
16     flags = O_RDWR | O_CREAT;
17     value = 1; //初始化信號量的值為1,即二元信號量
18     while((c = getopt(argc,argv,"ei:"))!= -1)
19     {
20         switch(c)
21         {
22             case 'e':
23                 flags |= O_EXCL;
24                 break;
25             case 'i':
26                 value = atoi(optarg);  //獲取信號量的值
27                 break;
28         }
29     }
30     if(optind != argc -1)
31     {
32         printf("usage: semcreate [-e] [-i initalvalue] <name>");
33         exit(0);
34     }
35     //創建信號量,返回sem_t類型指針
36     if((sem = sem_open(argv[optind],flags,FILE_MODE,value)) == SEM_FAILED)
37     {
38         perror("sem_open() error");
39         exit(-1);
40     }
41     //關閉打開的信號量
42     sem_close(sem);
43     exit(0);
44 }

2)使用sem_unlink刪除信號量:
int sem_unlink(const char *name); 返回:成功返回0,出錯返回-1

刪除信號量程序如下:

 1 #include <stdio.h>
 2 #include <stdlib.h>
 3 #include <unistd.h>
 4 #include <semaphore.h>
 5 #include <errno.h>
 6 #include <fcntl.h>
 7 
 8 int main(int argc,char *argv[])
 9 {
10     if(argc != 2)
11     {
12         printf("usage: semunlink<name>");
13         exit(0);
14     }
15         //從系統中刪除信號量
16     if(sem_unlink(argv[1]) == -1)
17     {
18         perror("sem_unlink() error");
19         exit(-1);
20     }
21     exit(0);
22 }

3)獲取信號量的當前值:
int sem_getvalue(sem_t *sem,int *valp); 返回:成功返回0,出錯返回-1
sem_getvalue在由valp指向的整數中返回所指定信號量的當前值。如果信號量當前已上鎖,那么返回值或為0,或為某個負數,絕對值即為等待等待該信號量解鎖的線程數。

獲取信號量的值程序如下:

 1 #include <stdio.h>
 2 #include <stdlib.h>
 3 #include <unistd.h>
 4 #include <semaphore.h>
 5 #include <errno.h>
 6 #include <fcntl.h>
 7 
 8 int main(int argc,char *argv[])
 9 {
10     sem_t  *sem;
11     int  val;
12     if(argc != 2)
13     {
14         printf("usage: semgetvalue<name>");
15         exit(0);
16     }
17     //打開一個已經存在的有名信號量
18     sem = sem_open(argv[1],0);
19      //獲取信號量的值
20     sem_getvalue(sem,&val);
21     printf("value = %d\n",val);
22     exit(0);
23 }

4)信號量的等待:(P操作,也稱為遞減down 或 上鎖lock)
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
返回:成功返回0,出錯返回-1
sem_wait函數測試所指定信號量的值,如果該值大於0,就將它的值減1並立即返回;如果該值等於0,調用線程就被投入睡眠中,直到該值變為大於0,這時再將它減1,函數隨后返回。“測試並減1”操作必須是原子的。sem_wait和sem_trywait的差別是:當所指定信號量的值已經是0時,后者並不將調用的進程投入睡眠。相反,它返回一個EAGAIN錯誤。如果被某個信號中斷,sem_wait就可能過早的返回,返回的錯誤為EINTR。

等待信號量程序如下:

 1 #include <stdio.h>
 2 #include <stdlib.h>
 3 #include <unistd.h>
 4 #include <semaphore.h>
 5 #include <errno.h>
 6 #include <fcntl.h>
 7 
 8 int main(int argc,char *argv[])
 9 {
10     sem_t *sem;
11     int val;
12     if(argc != 2)
13     {
14         printf("usage: semwait<name>");
15         exit(0);
16     }
17         //打開已經存在的信號量
18     sem = sem_open(argv[1],0);
19         //等待
20     sem_wait(sem);
21         //獲取信號量的值
22     sem_getvalue(sem,&val);
23     printf("pid %ld has semaphore,value = %d\n",(long) getpid(),val);
24     pause();
25     exit(0);
26 }

5)信號量掛出(V操作,也稱為遞增up 或解鎖unlock)
int sem_post(sem_t *sem);返回:成功返回0,出錯返回-1 將所指定的信號量值加1

信號量掛出程序如下:

 1 #include <stdio.h>
 2 #include <stdlib.h>
 3 #include <unistd.h>
 4 #include <semaphore.h>
 5 #include <errno.h>
 6 #include <fcntl.h>
 7 
 8 int main(int argc,char *argv[])
 9 {
10     sem_t     *sem;
11     int     val;
12     if(argc != 2)
13     {
14         printf("usage: semopt <name>");
15         exit(0);
16     }
17     //打開已經存在的信號量
18     sem = sem_open(argv[1],0);
19     //信號量掛出
20     sem_post(sem); 
21     //獲取掛出后的信號量值
22     sem_getvalue(sem,&val);
23     printf("value = %ld\n",val);
24     exit(0);
25 }

在Centos上測試Posix信號量如下:

3、采用Posix信號量實現生產者-消費者問題

  對生產者-消費者問題進行擴展,把共享緩沖區用作一個環繞緩沖區,即生產者填寫最后一項后回頭來填寫第一項,消費者也這么操作。此時需要維持三個條件:

(1)當緩沖區為空時,消費者不能試圖從其中去除一個條目

(2)當緩沖區填滿時,生產者不能試圖往其中放置一個條目

(3)共享變量可能描述緩沖區的當前狀態(下標、計數和鏈表指針),因此生產者和消費者的所有緩沖區操作都必須保護起來,以避免競爭。

給出使用信號量的方案展示三種不同類型的信號量:

(1)定義mutex二元信號量保護兩個臨界區。

(2)定義nempty的計數信號量統計共享緩沖區中的空槽位數。

(3)定義nstored的計數信號量統計共享緩沖區中已填寫的槽位數。

實現單個生產者和單個消費者的情況,程序如下所示:

View Code
  1 include <stdlib.h>
  2 #include <unistd.h>
  3 #include <semaphore.h>
  4 #include <errno.h>
  5 #include <fcntl.h>
  6 //文件模式
  7 #define FILE_MODE  (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
  8 #define NBUFF             10      //槽位的個數
  9 #define SEM_MUTEX         "mutex1" 
 10 #define SEM_NEMPTY         "nemtpy1"
 11 #define SEM_NSTORED     "nstored1"
 12 
 13 int nitems;  //條目的個數
 14 //緩沖區結構
 15 struct  
 16 {
 17     int buff[NBUFF];   
 18     sem_t *mutex,*nempty,*nstored;   //信號量
 19 }shared;
 20 
 21 char *px_ipc_name(const char *name);
 22 void *produce(void *arg);
 23 void *consume(void *arg);
 24 
 25 int main(int argc,char *argv[])
 26 {
 27     pthread_t   tid_produce,tid_consume;
 28     if(argc != 2)
 29     {
 30         printf("usage: prodcons <#itmes>");
 31         exit(0);
 32     }
 33     nitems = atoi(argv[1]);  //獲取條目數目
 34         //創建二元信號量
 35     if((shared.mutex = sem_open(SEM_MUTEX,O_CREAT,FILE_MODE,1)) == SEM_FAILED)
 36     {
 37         perror("sem_open() error");
 38         exit(-1);
 39     }
 40         //創建nempty信號量
 41     if((shared.nempty =  sem_open(SEM_NEMPTY,O_CREAT,FILE_MODE,NBUFF)) == SEM_FAILED)
 42     {
 43         perror("sem_open() error");
 44         exit(-1);
 45     }
 46         //創建nstored信號量
 47     if((shared.nstored = sem_open(SEM_NSTORED,O_CREAT,FILE_MODE,0)) == SEM_FAILED)
 48     {
 49         perror("sem_open() error");
 50         exit(-1);
 51     }
 52      pthread_setconcurrency(2);
 53         //生產者線程
 54     pthread_create(&tid_produce,NULL,produce,NULL);
 55         //消費者線程
 56     pthread_create(&tid_consume,NULL,consume,NULL);
 57     pthread_join(tid_produce,NULL);
 58     pthread_join(tid_consume,NULL);
 59     sem_unlink(SEM_MUTEX);
 60     sem_unlink(SEM_NEMPTY);
 61     sem_unlink(SEM_NSTORED);
 62     exit(0);
 63 }
 64 
 65 void *produce(void *arg)
 66 {
 67     int i;
 68     printf("produce is called.\n");
 69     for(i=0;i<nitems;i++)
 70     {
 71      //判斷是否有空槽,有的將其減少1
 72          sem_wait(shared.nempty);
 73          //鎖住槽位,對於多個生產者的時候有必要,單個生產者沒有必要
 74       sem_wait(shared.mutex); 
 75       printf("produced a new item.\n");
 76       shared.buff[i%NBUFF] = i;
 77       sem_post(shared.mutex);  //釋放鎖
 78       sem_post(shared.nstored);  //緩沖區中條目數加1
 79     }
 80     return NULL;
 81 }
 82 
 83 void *consume(void *arg)
 84 {
 85     int   i;
 86     printf("consumer is called.\n");
 87     for(i=0;i<nitems;i++)
 88     {
 89         //判斷緩沖區中是否有條目,有的話將條目數減少1
 90                 sem_wait(shared.nstored); 
 91                  //鎖住緩沖區,對多個消費者有必要,對單個消費者沒必要
 92         sem_wait(shared.mutex);
 93         if(shared.buff[i % NBUFF] != i)
 94             printf("buff[%d] = %d\n",i,shared.buff[i % NBUFF]);
 95         printf("removed a item.\n");
 96         sem_post(shared.mutex);  //釋放鎖
 97         sem_post(shared.nempty); //將緩沖區中的空槽數目加1
 98     }
 99     return NULL;
100 }

程序執行結果如下:

result文件內容如下:生產者和消費者公用緩沖區。

4、Posix基於內存的信號量

  Posix有名信號量創建時候是用一個name參數標識,它通常指代文件系統中的某個文件。而基於內存的信號量是由應用程序分配信號量的內存空間,即分配一個sem_t數據類型的內存空間,然后由系統初始化它們的值。操作函數如下:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);   //初始化內存信號量
int sem_destroy(sem_t *sem);   //摧毀信號量

如果shared=0,那么待初始化的信號量是在同一進程的各個線程間共享的,否則該信號量是在進程間共享的,此時該信號量必須存放在某種類型的共享內存區中,使得用它的進程能夠訪問該共享內存區。value是該信號量的初始值。

現在采用基於內存的信號量實現生產者-消費者問題,單個生產者和單個消費者,程序如下所示:

View Code
 1 #include <stdio.h>
 2 #include <stdlib.h>
 3 #include <unistd.h>
 4 #include <semaphore.h>
 5 #include <errno.h>
 6 
 7 #define NBUFF             10
 8 
 9 int nitems;
10 //緩沖區結構
11 struct
12 {
13     int buff[NBUFF];
14     sem_t mutex,nempty,nstored;
15 }shared;
16 
17 void *produce(void *arg);
18 void *consume(void *arg);
19 
20 int main(int argc,char *argv[])
21 {
22     pthread_t   tid_produce,tid_consume;
23     if(argc != 2)
24     {
25         printf("usage: prodcons <#itmes>");
26         exit(0);
27     }
28     nitems = atoi(argv[1]);
29         //創建基於內存的信號量
30     if(sem_init(&shared.mutex,0,1) == -1)
31     {
32         perror("sem_open() error");
33         exit(-1);
34     }
35     if(sem_init(&shared.nempty,0,NBUFF) == -1)
36     {
37         perror("sem_open() error");
38         exit(-1);
39     }
40     if(sem_init(&shared.nstored,0,0) == -1)
41     {
42         perror("sem_open() error");
43         exit(-1);
44     }
45      pthread_setconcurrency(2);
46     pthread_create(&tid_produce,NULL,produce,NULL);
47     pthread_create(&tid_consume,NULL,consume,NULL);
48     pthread_join(tid_produce,NULL);
49     pthread_join(tid_consume,NULL);
50        //摧毀信號量
51     sem_destroy(&shared.mutex);
52     sem_destroy(&shared.nempty);
53     sem_destroy(&shared.nstored);
54     exit(0);
55 }
56 
57 void *produce(void *arg)
58 {
59     int i;
60     printf("produce is called.\n");
61     for(i=0;i<nitems;i++)
62     {
63       sem_wait(&shared.nempty);
64       sem_wait(&shared.mutex);
65       printf("produced a new item.\n");
66       shared.buff[i%NBUFF] = i;
67       sem_post(&shared.mutex);
68       sem_post(&shared.nstored);
69     }
70     return NULL;
71 }
72 
73 void *consume(void *arg)
74 {
75     int   i;
76     printf("consumer is called.\n");
77     for(i=0;i<nitems;i++)
78     {
79         sem_wait(&shared.nstored);
80         sem_wait(&shared.mutex);
81         if(shared.buff[i % NBUFF] != i)
82             printf("buff[%d] = %d\n",i,shared.buff[i % NBUFF]);
83         printf("removed a item.\n");
84         sem_post(&shared.mutex);
85         sem_post(&shared.nempty);
86     }
87     return NULL;
88 }

程序執行結果與上面一致。

5、多個生產者、單個消費者

  針對這種情況,不僅要考慮生產者與消費者之間的同步,而且還要考慮多個生產者之間的互斥。生產者中同時獲取nempty信號量可以有多個,但是每個時刻只能有一個生產者能獲取mutex信號量。修改緩沖區結構如下:

struct
{
    int         buff[NBUFF];  //緩沖區
    int         nput;      //待存入緩沖區下標
    int         nputval;    // 待存入的值
    sem_t     mutex,nempy,nstored;  //基於內存的信號量
}shared;

添加nput和nputval用於同步多個生產者線程。實現程序如下:

View Code
 1 #include <stdio.h>
 2 #include <stdlib.h>
 3 #include <unistd.h>
 4 #include <semaphore.h>
 5 #include <errno.h>
 6 
 7 #define NBUFF             10
 8 #define MAXNTHREADS            100
 9 int nitems,nproducers;             //條目數和生產者線程數目
10 struct
11 {
12     int buff[NBUFF];
13     int nput;             
14     int nputval;
15     sem_t mutex,nempty,nstored;
16 }shared;
17 
18 void *produce(void *arg);
19 void *consume(void *arg);
20 
21 int main(int argc,char *argv[])
22 {
23     int i,count[MAXNTHREADS];
24     pthread_t   tid_produce[MAXNTHREADS],tid_consume;
25     if(argc != 3)
26     {
27         printf("usage: prodcons <#itmes> <#producers>");
28         exit(0);
29     }
30     nitems = atoi(argv[1]);
31     nproducers = atoi(argv[2]);
32     if(sem_init(&shared.mutex,0,1) == -1)
33     {
34         perror("sem_open() error");
35         exit(-1);
36     }
37     if(sem_init(&shared.nempty,0,NBUFF) == -1)
38     {
39         perror("sem_open() error");
40         exit(-1);
41     }
42     if(sem_init(&shared.nstored,0,0) == -1)
43     {
44         perror("sem_open() error");
45         exit(-1);
46     }
47      pthread_setconcurrency(nproducers+1);
48         //創建多個生產者線程
49     for(i=0;i<nproducers;i++)
50     {
51         count[i] = 0;
52         pthread_create(&tid_produce[i],NULL,produce,&count[i]);
53     }
54     pthread_create(&tid_consume,NULL,consume,NULL);
55     for(i=0;i<nproducers;i++)
56     {
57         pthread_join(tid_produce[i],NULL);
58         printf("count[%d] = %d\n",i,count[i]);
59     }
60     pthread_join(tid_produce,NULL);
61     pthread_join(tid_consume,NULL);
62     sem_destroy(&shared.mutex);
63     sem_destroy(&shared.nempty);
64     sem_destroy(&shared.nstored);
65     exit(0);
66 }
67 
68 void *produce(void *arg)
69 {
70     int i;
71     printf("produce is called.\n");
72     for(;;)
73     {
74       sem_wait(&shared.nempty);
75       sem_wait(&shared.mutex);
76       if(shared.nput >= nitems)  //判斷下標是否超出
77       {
78          sem_post(&shared.nempty);  //恢復empty的值
79          sem_post(&shared.mutex);
80          return NULL;
81       }
82       shared.buff[shared.nput%NBUFF] = shared.nputval;
83       shared.nput++;
84       shared.nputval++;
85       sem_post(&shared.mutex);
86       sem_post(&shared.nstored);
87       *((int *)arg) += 1;
88     }
89     return NULL;
90 }
91 
92 void *consume(void *arg)
93 {

程序測試結果如下:

6、多個生產者、多個消費者

  這種情況需要考慮多個生產者之間的同步和多個消費者之間的同步,修改緩沖區結構如下所示:

struct
{
    int buff[NBUFF];
    int nput;     //生產者產生新條目的下標
    int nputval;
    int nget;      //消費者移除條目的下標
    int ngetval;
    sem_t mutex,nempty,nstored;
}shared;

實現程序如下:

View Code
  1 #include <stdio.h>
  2 #include <stdlib.h>
  3 #include <unistd.h>
  4 #include <semaphore.h>
  5 #include <errno.h>
  6 
  7 #define NBUFF             10
  8 #define MAXNTHREADS           100
  9 int nitems,nproducers,nconsumers;
 10 struct
 11 {
 12     int buff[NBUFF];
 13     int nput;
 14     int nputval;
 15     int nget;
 16     int ngetval;
 17     sem_t mutex,nempty,nstored;
 18 }shared;
 19 
 20 void *produce(void *arg);
 21 void *consume(void *arg);
 22 
 23 int main(int argc,char *argv[])
 24 {
 25     int i,prodcount[MAXNTHREADS],conscount[MAXNTHREADS];
 26     pthread_t   tid_produce[MAXNTHREADS],tid_consume[MAXNTHREADS];
 27     if(argc != 4)
 28     {
 29         printf("usage: prodcons <#itmes> <#producers> <#consumers>");
 30         exit(0);
 31     }
 32     nitems = atoi(argv[1]);
 33     nproducers = atoi(argv[2]);
 34     nconsumers = atoi(argv[3]);
 35     if(sem_init(&shared.mutex,0,1) == -1)
 36     {
 37         perror("sem_open() error");
 38         exit(-1);
 39     }
 40     if(sem_init(&shared.nempty,0,NBUFF) == -1)
 41     {
 42         perror("sem_open() error");
 43         exit(-1);
 44     }
 45     if(sem_init(&shared.nstored,0,0) == -1)
 46     {
 47         perror("sem_open() error");
 48         exit(-1);
 49     }
 50      pthread_setconcurrency(nproducers+nconsumers);
 51     for(i=0;i<nproducers;i++)
 52     {
 53         prodcount[i] = 0;
 54         pthread_create(&tid_produce[i],NULL,produce,&prodcount[i]);
 55     }
 56     for(i=0;i<nconsumers;i++)
 57     {
 58          conscount[i] = 0;
 59         pthread_create(&tid_consume[i],NULL,consume,&conscount[i]);
 60     }
 61     for(i=0;i<nproducers;i++)
 62     {
 63         pthread_join(tid_produce[i],NULL);
 64         printf("producer count[%d] = %d\n",i,prodcount[i]);
 65     }
 66     for(i=0;i<nconsumers;i++)
 67     {
 68         pthread_join(tid_consume,NULL);
 69         printf("consumer count[%d] = %d\n",i,conscount[i]);
 70     }
 71     sem_destroy(&shared.mutex);
 72     sem_destroy(&shared.nempty);
 73     sem_destroy(&shared.nstored);
 74     exit(0);
 75 }
 76 
 77 void *produce(void *arg)
 78 {
 79     int i;
 80     printf("produce is called.\n");
 81     for(;;)
 82     {
 83       sem_wait(&shared.nempty);
 84       sem_wait(&shared.mutex);
 85       if(shared.nput >= nitems)
 86       {
 87          sem_post(&shared.nempty);
 88          sem_post(&shared.mutex);
 89          return NULL;
 90       }
 91       shared.buff[shared.nput%NBUFF] = shared.nputval;
 92       shared.nput++;
 93       shared.nputval++;
 94       sem_post(&shared.mutex);
 95       sem_post(&shared.nstored);
 96       *((int *)arg) += 1;
 97     }
 98     return NULL;
 99 }
100 
101 void *consume(void *arg)
102 {
103     int   i;
104     printf("consumer is called.\n");
105     for(;;)
106     {
107         sem_wait(&shared.nstored);
108         sem_wait(&shared.mutex);
109         if(shared.nget >= nitems)
110         {
111             sem_post(&shared.nstored);
112             sem_post(&shared.mutex);
113             return NULL;
114         }
115         i = shared.nget % NBUFF;
116         if(shared.buff[i] != shared.ngetval)
117             printf("error: buff[%d] = %d\n",i,shared.buff[i % NBUFF]);
118         shared.nget++;
119         shared.ngetval++;
120         sem_post(&shared.mutex);
121         sem_post(&shared.nempty);
122         *((int*)arg) += 1;
123     }
124     return NULL;
125 }

測試結果如下:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM