最近的多線程 想實現這樣的功能: 多線程Socket獲得的數據 對其進行某種處理FuncA,但是FuncA比較耗時,希望能夠單獨獨立出來,這樣 接受和處理數據分開,但是FuncA處理數據不能放入一個線程,否則很慢,要多線程處理,這個時候 就要使用 多線程 信號量 semaphore了。【我是在windows下 使用pthread win32 的】
在pthread win32源碼中,semaphore.h是這樣定義的:
typedef struct sem_t_ * sem_t;
下面是這個結構的定義
/* * ==================== * ==================== * Semaphores, Mutexes and Condition Variables * ==================== * ==================== */ struct sem_t_ { int value; pthread_mutex_t lock; HANDLE sem; #if defined(NEED_SEM) int leftToUnblock; #endif };
其方法有:
int sem_init (sem_t * sem,int pshared,unsigned int value); int sem_destroy (sem_t * sem); int sem_trywait (sem_t * sem); int sem_wait (sem_t * sem); int sem_timedwait (sem_t * sem,const struct timespec * abstime); int sem_post (sem_t * sem); int sem_post_multiple (sem_t * sem,int count); int sem_getvalue (sem_t * sem,int * sval); //沒有實現的有 int sem_open (const char * name,int oflag,mode_t mode,unsigned int value); int sem_close (sem_t * sem); int sem_unlink (const char * name);
沒有實現的 函數代碼都是類似下面這樣的,直接是返回錯誤:
int sem_open (const char *name, int oflag, mode_t mode, unsigned int value) { printf("本函數沒有實現\n");//本人添加的 以免誤用 errno = ENOSYS; return -1; } /* sem_open */
好,下面說重點了,sem_t 就是 semaphore信號量 怎么用。
1、定義sem_t結構體
sem_t 就是一個結構體指針,定義了,是沒有值的。比如
sem_t sem=NULL;
2、初始化sem_t變量
sem_init(&sem,0,0);
顯然內部會分配內存,返回值為0,如果失敗為非0【其實是-1,只有兩種返回值,納悶為什么不用boolean】,錯誤存儲在errno中。
第二個參數 為0,為1 是可以在多進程中使用,但是pthread win32沒有實現,所以保證永遠為0;
第三個參數是初始值,如果大於0,則會發送多少個sem_post操作的。一般設置0 即可。
3、在多線程中 等待
sem_wait(&sem); sem_timedwait(&sem, const struct timespec *abstime)); sem_trywait(&sem);
sem_wait是阻塞的,sem_timewait是阻塞一定時間后繼續,sem_trywait是非阻塞的,非阻塞怎么用,我暫時沒研究。
總之呢,wait之后,內部value就會-1;所以如果vaue小於0,那么就有線程正在等待,否則就不等待直接進行下面的工作了。
4、【重點】在主線程或其他地方 發送post,讓那些工作線程逐個開始工作起來
sem_post(&sem);//讓value+1
sem_post_multi(&sem,5); //讓vaue+5
5、當然就是結束了,當且僅當 sem有效 且 內部value>=0的時候,也就是沒有sem_wait的時候可以銷毀。
sem_destroy(&sem);
此時 sem_wait 就會失敗了。所以destroy前 先 sem_post value的絕對值 個信號 就可以了。
6、【補充】獲得sem內部的value值,當且僅當==0的時候,才可以destroy,小於0 隊列個數,大於0 排隊數量。
sem_getvalue(&sem,&theOutValue);
以上返回值為0 正常,否則不正常。
使用這個semaphore有一個好處,比如,只有2個線程,你一下子sem_post 20次,沒關系,sem_wait會執行20次的,而且是2個線程輪流。其及時對WINAPI semaphore的一個封裝。
semaphore如果在一個線程中使用,一般叫做 binary semaphore,單值。
ps:signal.h 頭文件 叫做信號 ,與這個semaphore信號量 不太一樣的。
附上 臨界區,互斥量,信號量,事件的區別 ,這些都屬於 線程安全的范疇,不知道還有別的沒有了。

臨界區,互斥量,信號量,事件的區別 =================================== 四種進程或線程同步互斥的控制方法 1、臨界區:通過對多線程的串行化來訪問公共資源或一段代碼,速度快,適合控制數據訪問。 2、互斥量:為協調共同對一個共享資源的單獨訪問而設計的。 3、信號量:為控制一個具有有限數量用戶資源而設計。 4、事 件:用來通知線程有一些事件已發生,從而啟動后繼任務的開始。 臨界區(Critical Section)(同一個進程內,實現互斥) =================================== 保證在某一時刻只有一個線程能訪問數據的簡便辦法。在任意時刻只允許一個線程對共享資源進行訪問。如果有多個線程試圖同時訪問臨界區,那么在有一個線程進入后其他所有試圖訪問此臨界區的線程將被掛起,並一直持續到進入臨界區的線程離開。臨界區在被釋放后,其他線程可以繼續搶占,並以此達到用原子方式操作共享資源的目的。 互斥量(Mutex)(可以跨進程,實現互斥) =================================== 互斥量跟臨界區很相似,只有擁有互斥對象的線程才具有訪問資源的權限,由於互斥對象只有一個,因此就決定了任何情況下此共享資源都不會同時被多個線程所訪問。當前占據資源的線程在任務處理完后應將擁有的互斥對象交出,以便其他線程在獲得后得以訪問資源。互斥量比臨界區復雜。因為使用互斥不僅僅能夠在同一應用程序不同線程中實現資源的安全共享,而且可以在不同應用程序的線程之間實現對資源的安全共享。 互斥量與臨界區的作用非常相似,但互斥量是可以命名的,也就是說它可以跨越進程使用。所以創建互斥量需要的資源更多,所以如果只為了在進程內部是用的話使用臨界區會帶來速度上的優勢並能夠減少資源占用量。 信號量(Semaphores)(主要是實現同步,可以跨進程) =================================== 信號量對象對線程的同步方式與前面幾種方法不同,信號允許多個線程同時使用共享資源,這與操作系統中的PV操作相同。它指出了同時訪問共享資源的線程最大數目。它允許多個線程在同一時刻訪問同一資源,但是需要限制在同一時刻訪問此資源的最大線程數目。一般是將當前可用資源計數設置為最大資源計數,每增加一個線程對共享資源的訪問,當前可用資源計數就會減1,只要當前可用資源計數是大於0的,就可以發出信號量信號。但是當前可用計數減小到0時則說明當前占用資源的線程數已經達到了所允許的最大數目,不能在允許其他線程的進入,此時的信號量信號將無法發出 事件(Event)(實現同步,可以跨進程) =================================== 事件對象也可以通過通知操作的方式來保持線程的同步。並且可以實現不同進程中的線程同步操作。
參考:
http://baike.baidu.com/view/1499210.htm
http://networking.ctocio.com.cn/tips/180/9333180.shtml
最后附上我自己的測試源代碼:

#include <stdio.h> #include <pthread.h> #include <sched.h> #include <semaphore.h> #include <conio.h>//for getch() #include <ctype.h> #include<Windows.h> #pragma comment(lib, "pthreadVC2.lib") //必須加上這句 sem_t sem; void*SemThreadWorker(void* Param) { printf("#1 [child %s]I am Waiting\n",Param); //sem_wait(&sem); //sem_timedwait(&sem,60); //sem_trywait(&sem); while(0==sem_wait(&sem)) { // Wait semaphore printf(" [child %s]I am Working in 1 second!!\n",Param); Sleep(1000); } printf("#2 [child %s]I am Exit\n",Param); return (void *)10; } int main() { int ret=0; ret=sem_init(&sem,0/*int pshared process_shared*/,0/*uint init_value*/);/*SEM_VALUE_MAX == int_max*/ if(ret!=0){ printf("Semaphore initialize failed"); exit(EXIT_FAILURE); } if(0==sem_getvalue(&sem,&ret)){printf("\n sem_getvalue value=%d \n",ret);}else{perror("@Error sem_getValue");} //CreateThreads pthread_t tids[5]; ret = pthread_create(&tids[0], NULL, SemThreadWorker, "1"); if (ret) {printf("Thread creation failed!!\n");exit(EXIT_FAILURE);} ret = pthread_create(&tids[1], NULL, SemThreadWorker, "2"); if (ret) {printf("Thread creation failed!!\n");exit(EXIT_FAILURE);} ret = pthread_create(&tids[2], NULL, SemThreadWorker, "3"); if (ret) {printf("Thread creation failed!!\n");exit(EXIT_FAILURE);} ret = pthread_create(&tids[3], NULL, SemThreadWorker, "4"); if (ret) {printf("Thread creation failed!!\n");exit(EXIT_FAILURE);} ret = pthread_create(&tids[4], NULL, SemThreadWorker, "5"); if (ret) {printf("Thread creation failed!!\n");exit(EXIT_FAILURE);} Sleep(100); printf("\n Post semaphore發送信號量過去: sem_post(&sem);\n"); //for(int i=0;i<10;i++){ // sem_post(&sem); // sem_post(&sem); // Sleep(100); //} if(sem_post(&sem)!=0){ printf("Error sem_post "); } if(0==sem_getvalue(&sem,&ret)){printf("\n sem_getvalue value=%d \n",ret);}else{perror("@Error sem_getValue");} if(sem_post(&sem)!=0){ printf("Error sem_post "); } if(0==sem_getvalue(&sem,&ret)){printf("\n sem_getvalue value=%d \n",ret);}else{perror("@Error sem_getValue");} if(sem_post_multiple(&sem,5)!=0){ printf("Error sem_post_multiple "); } if(0==sem_getvalue(&sem,&ret)){printf("\n sem_getvalue value=%d \n",ret);}else{perror("@Error sem_getValue");} Sleep(2000); printf("\n sem_getvalue(&sem,&ret);\n"); //sem_open //sem_close(&sem); //sem_unlink if(0==sem_getvalue(&sem,&ret)){printf("\n sem_getvalue value=%d \n",ret);}else{perror("@Error sem_getValue");} //為了讓sem_wait全部停止阻塞,才可以destroy sem_post_multiple(&sem,-ret); printf("\n sem_destroy(&sem);\n\n"); //第一次 sem_destroy return 0 if(sem_destroy(&sem)!=0){ perror("@Error sem_destroy "); } //第二次 sem_destroy 多次 return -1 if(sem_destroy(&sem)!=0){ perror("@Error sem_destroy "); } if(sem_post(&sem)!=0){ printf("@Error sem_post \n"); } printf(" Wait for thread pthread_join synchronization...\n"); void *threadResult=NULL; ret = pthread_join(tids[0], &threadResult); if (ret){printf("Thread join failed!!\n");exit(EXIT_FAILURE);} ret = pthread_join(tids[1], &threadResult); if (ret){printf("Thread join failed!!\n");exit(EXIT_FAILURE);} ret = pthread_join(tids[2], &threadResult); if (ret){printf("Thread join failed!!\n");exit(EXIT_FAILURE);} ret = pthread_join(tids[3], &threadResult); if (ret){printf("Thread join failed!!\n");exit(EXIT_FAILURE);} ret = pthread_join(tids[4], &threadResult); if (ret){printf("Thread join failed!!\n");exit(EXIT_FAILURE);} printf("press any key to continue...\n"); //getchar(); _getch(); return 1; }
五個線程工作,用semaphore來管理隊列