多線程同步的四種方式
對於多線程程序來說,同步是指在一定的時間內只允許某一個線程來訪問某個資源。而在此時間內,不允許其他的線程訪問該資源。可以通過互斥鎖(Mutex)、條件變量(condition variable)、讀寫鎖(reader-writer lock)、信號量(semaphore)來同步資源。
-
互斥鎖(Mutex)
互斥量是最簡單的同步機制,即互斥鎖。多個進程(線程)均可以訪問到一個互斥量,通過對互斥量加鎖,從而來保護一個臨界區,防止其它進程(線程)同時進入臨界區,保護臨界資源互斥訪問。
互斥鎖需要滿足三個條件:
- 互斥 不同線程的臨界區沒有重疊
- 無死鎖 如果一個線程正在嘗試獲得一個鎖,那么總會成功地獲得這個鎖。若線程A調用lock()但是無法獲得鎖,則一定存在其他線程正在無窮次地執行臨界區。
- 無飢餓 每一個試圖獲得鎖的線程最終都能成功。
#include <stdio.h> #include <stdlib.h> #include <pthread.h> void *function(void *arg); pthread_mutex_t mutex; int counter = 0; int main(int argc, char *argv[]) { int rc1,rc2; char *str1="hello"; char *str2="world"; pthread_t thread1,thread2; pthread_mutex_init(&mutex,NULL); if((rc1 = pthread_create(&thread1,NULL,function,str1))) { fprintf(stdout,"thread 1 create failed: %d\n",rc1); } if(rc2=pthread_create(&thread2,NULL,function,str2)) { fprintf(stdout,"thread 2 create failed: %d\n",rc2); } pthread_join(thread1,NULL); pthread_join(thread2,NULL); return 0; } void *function(void *arg) { char *m; m = (char *)arg; pthread_mutex_lock(&mutex); while(*m != '\0') { printf("%c",*m); fflush(stdout); m++; sleep(1); } printf("\n"); pthread_mutex_unlock(&mutex); }
-
條件變量(condition variable)
生產者消費者問題:每次生產一個商品,發一個信號,告訴消費者“我生產商品了,快來消費”,消費者拿到生產者的條件變量后每次消費兩個商品,然后發出信號“我消費了商品,你可以生產了”--_--(發的這個信號是一個條件變量,通過發送這個信號可以喚醒阻塞的線程,收到信號后,不滿足需求也會繼續阻塞)
為了防止競爭,條件變量的使用總是和一個互斥鎖結合在一起;條件變量是線程的另一種同步機制,它和互斥量是一起使用的。互斥量的目的就是為了加鎖,而條件變量的結合,使得線程能夠以等待的狀態來迎接特定的條件發生,而不需要頻繁查詢鎖。
#include <pthread.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <signal.h> // 定義條件變量cond_product pthread_cond_t cond_product; // 定義條件變量cond_consume pthread_cond_t cond_consume; // 定義線程互斥鎖lock pthread_mutex_t lock; //初始化函數 void init_work(void) { // 條件變量的初始化 pthread_cond_init(&cond_product, NULL); pthread_cond_init(&cond_consume, NULL); // 線程鎖的初始化 pthread_mutex_init(&lock, NULL); } //生產線程,每次生產一個產品 void* handle_product(void *arg){ int i; int* product = NULL; product = (int *) arg; for(i=1; i<50; i++) { pthread_mutex_lock(&lock); //生產進程上鎖,是消費進程無法改變商品個數 if (*product >= 4) { // 倉庫已滿,應該阻塞式等待 printf("\033[43m倉庫已滿, 暫停生產...\033[0m\n"); pthread_cond_wait(&cond_product, &lock); } printf("生產中....\n"); sleep(2); printf("生產成功....\n"); *product+=1; pthread_cond_signal(&cond_consume);//發出信號,條件已經滿足 printf("\033[32m生產一個產品,生產%d次,倉庫中剩余%d個\033[0m\n",i,*product); printf ("發信號--->生產成功\n"); pthread_mutex_unlock(&lock);//生產進程解鎖 usleep(50000); } return NULL; } //消費線程,每次消費兩個產品,消費6次間歇 void* handle_consume(void *arg){ int i; int* product = NULL; product = (int *)arg; for (i=1; i<50; i++) { pthread_mutex_lock(&lock); if (*product <= 1) //消費線程每次消費2個,故總產品數量小於1即阻塞 { /* 阻塞式等待 */ printf("\033[43m缺貨中,請等待...\033[0m\n"); pthread_cond_wait(&cond_consume, &lock); } /* 消費產品,每次從倉庫取出兩個產品 */ printf("消費中...\n"); sleep(2); *product-=2; printf("消費完成...\n"); printf("\033[31m消費兩個產品,共消費%d次,倉庫剩余%d個\033[0m\n",i,*product); pthread_cond_signal(&cond_product); printf ("發信號---> 已消費\n"); pthread_mutex_unlock(&lock); usleep(30000); if (i%6 == 0){ //消費間歇 sleep(9); } } return NULL; } int main() { pthread_t th_product, th_consume; //定義線程號 int ret; int intrinsic = 3; //初始化所有變量 init_work(); //創建進程並傳遞相關參數 ret = pthread_create(&th_product, 0, handle_product, &intrinsic); if (ret != 0) { perror("創建生產線程失敗\n"); exit(1); } ret = pthread_create(&th_consume, 0, handle_consume, &intrinsic); if (ret != 0) { perror("創建消費線程失敗\n"); exit(1); } pthread_join(th_product, 0);//回收生產線程 pthread_join(th_consume, 0);//回收消費線程 return 0; }
-
讀寫鎖(reader-writer lock)
前面介紹的互斥量加鎖要么是鎖狀態,要么就是不加鎖狀態。而且只有一次只有一個線程可以對其加鎖。這樣的目的是為了防止變量被不同的線程修改。但是如果有線程只是想讀而不會去寫的話,這有不會導致變量被修改。但是如果是互斥量加鎖,則讀寫都沒有辦法。這種場景不能使用互斥量,必須使用讀寫鎖。
讀寫鎖可以有3種狀態:
-
讀模式下加鎖狀態
-
寫模式下加鎖狀態
-
不加鎖狀態
一次只有一個線程可以占有寫模式的讀寫鎖,但是多個線程可以同時占有讀模式的讀寫鎖。當讀寫鎖是寫加鎖狀態時,在這個鎖被解鎖之前,所有試圖對這個鎖加鎖的線程都會被阻塞。當讀寫鎖在讀加鎖狀態時,所有試圖以讀模式對它進行加鎖的線程都可以得到訪問權。但是任何希望以寫模式對此鎖進行加鎖的線程都會阻塞。直到所有的線程釋放它們的讀鎖為止。
讀寫鎖非常適合於對數據結構讀的次數大於寫的情況。當讀寫鎖在寫模式下時,它所保護的數據結構就可以被安全地修改,因為一次只有一個線程可以在寫模式下擁有這個鎖。
讀寫鎖也叫做共享互斥鎖。當讀寫鎖是讀模式鎖住的,就可以說是以共享模式鎖住的。當它是寫模式鎖住的時候,就可以說成是以互斥模式鎖住的。
#include <pthread.h>
Int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
Int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
讀寫鎖通過調用pthread_rwlock_init進行初始化。在釋放讀寫鎖占有的內存之前,需要調用pthread_rwlock_destroy做清理工作。如果pthread_rwlock_init為讀寫鎖分配了資源,pthread_rwlock_destroy將釋放這些資源。如果在調用pthread_rwlock_destroy之前就釋放了讀寫鎖占用的內存空間。那么分配給這個鎖的資源就會丟失。
要在讀模式下鎖定讀寫鎖,需要調用pthread_rwlock_rdlock,要在寫模式下鎖定讀寫鎖,需要調用pthread_rwlock_wrlock。不管以何種方式鎖住讀寫鎖。都可以調用pthread_rwlock_unlock進行解鎖。
Int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
Int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
Int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
#include <pthread.h> #include <stdio.h> pthread_rwlock_t rwlock; int data=1; void readerA(){ while(1){ pthread_rwlock_rdlock(&rwlock); printf("A讀者讀出:%d\n",data); pthread_rwlock_unlock(&rwlock); Sleep(1000); } } void writerB(){ while(1){ pthread_rwlock_wrlock(&rwlock); data++; printf("B作者寫入:%d\n",data); pthread_rwlock_unlock(&rwlock); Sleep(1000); } } int main() { pthread_t t1; pthread_t t2; pthread_rwlock_init(&rwlock,NULL); pthread_create(&t1,NULL,readerA,NULL); pthread_create(&t2,NULL,writerB,NULL); pthread_join(t1,NULL); pthread_join(t2,NULL); pthread_rwlock_destroy(&rwlock); return 0; }
-
-
信號量(semaphore)
在生產者消費者模型中,對任務數量的記錄就可以使用信號量來做。可以理解為帶計數的條件變量。當信號量的值小於0時,工作進程或者線程就會阻塞,等待物品到來。當生產者生產一個物品,會將信號量值加1操作。 這是會喚醒在信號量上阻塞的進程或者線程,它們去爭搶物品。
這里用一個讀寫文件作為例子:
首先需要用sem_init(); 初始化sem_t型變量,並設置初始信號量。比如設置為1.
每次調用sem_wait(sem_t *); 信號量減一,當調用sem_post(sem_t *); 信號量加一。
當信號量為0時,sem_wait(); 函數阻塞,等待信號量 >0 時,才進行。
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
typedef struct{
sem_t *lock;
int num;
}STRUCT;
void test(void * obj)
{
STRUCT *point = (STRUCT *)obj;
sem_t *semlock = point->lock;
sem_wait(semlock);
FILE *f = fopen("test.txt","a");
if(f==NULL)
printf("fopen is wrong\n");
printf("sem_wait %d\n",point->num);
int j=0;
for(j=0;j<30;j++)
fprintf(f,"%c111111111111\n",'a'+point->num);
fclose(f);
sem_post(semlock);
return;
}
int main()
{
pthread_t pid[20]; // pthread_t pid;
int ret,i=0;
STRUCT obj[13];
sem_t semlock;
if(sem_init(&semlock,0,1)!=0) // 此處初始信號量設為1. 第二個參數為0表示不應用於其他進程。
printf("sem_init is wrong\n");
for(i=0;i<10;i++)
{
obj[i].num = i;
obj[i].lock = &semlock;
ret = pthread_create(&pid[i],NULL,(void *)test,&obj[i]);
if(ret!=0)
{
printf("create thread wrong %d!!\n",i);
return 0;
}
}
for(i=0;i<10;i++)
pthread_join(pid[i],NULL); // 等待其他線程結束,如果沒有這里,主線程先結束,會釋放pid[]及obj[],則出現BUG。
return 0;
}