Linux——多線程下解決生產消費者模型


我們學習了操作系統,想必對生產消費者問題都不陌生。作為同步互斥問題的一個經典案例,生產消費者模型其實是解決實際問題的基礎模型,解決很多的實際問題都會依賴於它。而此模型要解決最大的問題便是同步與互斥。而通常呢,在多進程的環境下我們一般是是用信號量來解決(可以戳這里看看);在多線程的情況,則會用到兩個東西:  互斥量和條件變量通常用它們兩個來實現線程間通信,以此來解決多線程下的同步和互斥問題。不過在具體實現生產消費模型前,為了更好理解當中的處理原理,還是先來回顧一下一些線程間通信的相關知識。

 

 互斥問題


大部分情況,線程使用的數據都是局部變量,變量的地址空間在線程棧空間內,這種情況,變量歸屬單個線程,其他線程無法獲得這種變量。但有時候,很多變量都需要在線程間共享,這樣的變量稱為共享變量,可以通過數據的共享,完成線程之間的交互。多個線程並發的操作共享變量,會帶來一些問題。

 // 操作共享變量會有問題的售票系統代碼
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
 #include <pthread.h>
 int ticket = 100;
 void *route( void *arg) 
 {
     char id = *(char*)arg;
     while (  1 )  { 
         if (  ticket > 0 )  { 
             usleep( 1000) ;
             printf( " thread %c sells ticket:%d\n" , id, ticket) ;
             ticket--;
         }  else { 
             break;
         } 
     }
 }  
 int main(void) 
 {  
     pthread_t t1, t2, t3, t4;
     char a1=1,a2=2,a3=3,a4=4;
     pthread_create( &t1, NULL, route, &a1);
     pthread_create( &t2, NULL, route, &a2);
     pthread_create( &t3, NULL, route, &a3);
     pthread_create( &t4, NULL, route, &a4);
     pthread_join( t1, NULL) ;
     pthread_join( t2, NULL) ;
     pthread_join( t3, NULL) ;
     pthread_join( t4, NULL) ;
 }
 //一次執行結果:
 thread 4 sells ticket:100
 ...
 thread 4 sells ticket:1
 thread 2 sells ticket:0
 thread 1 sells ticket:-1
 thread 3 sells ticket:-2

為什么無法獲得正確結果?

·if 語句判斷條件為真以后,代碼可以並發的切換到其他線程;usleep這個模擬漫長業務的過程,在這個漫長的業務過程中,可能有很多個線程會進入該代碼段
·ticket--操作本身就不是一個原子操作

如果取出 “ticket--”部分的匯編代碼

objdump -d a.out > test.objdump 152 40064b: 8b 05 e3 04 20 00 mov 0x2004e3(%rip),%eax # 600b34 <ticket>
153 400651: 83 e8 01 sub $0x1,%eax 154 400654: 89 05 da 04 20 00 mov %eax,0x2004da(%rip) # 600b34

ticket--操作並不是原⼦子操作,而是對應三條匯編指令:
  load:將共享變量ticket從內存加載到寄存器中
  update: 更新寄存器里面的值,執行-1操作
  store:將新值,從寄存器寫回共享變量ticket的內存地址


要解決以上問題,需要做到三點:


1.代碼必須要有互斥行為:當代碼進入臨界區執行時,不允許其他線程進入該臨界區。

2.如果多個線程同時要求執行臨界區的代碼,並且臨界區沒有線程在執行,那么只能允許一個線程進入該臨界區。

3.如果線程不在臨界區中執行,那么該線程不能阻止其他線程進入臨界區。

Linux下用互斥量就做到了以上3點,它本質上其實就是一把鎖。

 

互斥量


互斥量使用一般是以下幾個步驟:

1.定義互斥量(mutex): pthread_mutex_t  mutex;

2.初始化:

  ①靜態分配:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER 

  ②動態分配:

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); 參數: mutex:要初始化的互斥量 attr:如果不設置線程屬性的話填NULL

 

3.上鎖:pthread_mutex_lock(&mutex)   如果是1,值0,返回;  如果是0,便阻塞

調⽤用pthread_ lock 時,可能會遇到以下情況:

互斥量處於未鎖狀態,該函數會將互斥量鎖定,同時返回成功
發起函數調用時,其他線程已經鎖定互斥量,或者存在其他線程同時申請互斥量,
但沒有競爭到互斥量,那么pthread_ lock調⽤用會陷入阻塞,等待互斥量解鎖。

4.解鎖: pthread_mutex_unlock(&mutex)   置為1,返回

5.銷毀:pthread_mutex_destroy(&mutex)    

銷毀互斥量需要注意:使⽤用PTHREAD_ MUTEX_ INITIALIZER初始化的互斥量不需要銷毀
不要銷毀⼀一個已經加鎖的互斥量已經銷毀的互斥量,要確保后⾯面不會有線程再嘗試加鎖

 

 

自旋鎖

互斥鎖是當阻塞在pthread_mutex_lock時,放棄CPU,好讓別人使用CPU。自旋鎖阻塞在spin_lock時不會阻塞CPU,不斷對CPU詢問。(實時系統中應用比較多,要求對鎖進行較快響應)它使用形式與互斥量類似,不再贅述。

1.定義自旋鎖: pthread_spinlock_t spin 2.初始化自旋鎖:pthread_spin_intt(pthread_spinlock_t *s, int s) 3.上鎖:int pthread_spin_lock(pthread_spinlock_t *lock) 4.解鎖:int pthread_spin_lock(pthread_spinlock_t *lock) 5.銷毀:int pthread_spin_lock(pthread_spinlock_t *lock)

 

讀寫鎖

在編寫多線程的時候,有⼀一種情況是十分常見的。那就是,有些公共數據修改的機會比較少。相比較改寫,它們讀的機會反而高的多。通常而言,在讀的過程中,往往伴隨着查找的操作,中間耗時很長。給這種代碼段加鎖,會極⼤大地降低我們程序的效率。那么有沒有一種方法,可以專門處理這種多讀少寫的情況呢? 有,那就是讀寫鎖。讀寫鎖本質上是一種自旋鎖[長時間等人和短時間等人的例子]

·注意:讀共享,寫排他,寫優先級高

它處理方式和前面互斥量類似,就不在贅述。

1.定義:pthread_rwlock_t lock

2.初始化  pthread_rwlock_init(&lock, NULL) 3.上鎖:pthread_rwlock_rdlock(&lock)     pthread_rwlock_wrlock(&lock) 4.解鎖:pthread_rwlock_unlock(&lock) 5.銷毀:pthread_rwlock_destroy(&lock)

 

 

 

條件變量


生產者消費問題要解決另一個問題就同步的問題。當一個線程互斥地訪問某個變量時,它可能發現在其它線程改變狀態之前,它什么也做不了。例如一個線程訪問隊列時,發現隊列為空,它只能等待,直到其它線程將一個節點添加到隊列中。這種情況下就需要用到條件變量了。

使用步驟如下:

1.定義條件變量:

pthread_cond_t cond;
pthread_mutex_t mutex

2.初始化條件變量:

pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *restrict attr)   參數: cond:要初始化的條件變量 attr:填NULL(用於設置線程屬性)      

3.等待條件:

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);  參數: cond:要在這個條件變量上等待 mutex:互斥量,后⾯面詳細解釋

注意這里wait函數需要互斥量(后面解釋)。如果在鎖環境下,此處互斥量形同虛設。在鎖環境下,會將mutex解鎖; wait返回時,將mutex鎖制成原來狀態

4.使條件滿足 :

int pthread_cond_broadcast(pthread_cond_t *cond); int pthread_cond_signal(pthread_cond_t *cond);

5.銷毀條件變量:

int pthread_cond_destroy(pthread_cond_t *cond)

 

為什么pthread_ cond_ wait 需要互斥量?

①條件等待是線程間同步的一種手段,如果只有一個線程,條件不滿足,一直等下去都不會滿足,所以必須要有一個線程通過某些操作,改變共享變量,使原先不滿足的條件變得滿足,並且友好的通知等待在條件變量上的線程。
②條件不會無緣無故的突然變得滿足了,必然會牽扯到共享數據的變化。所以一定要用互斥鎖來保護。沒有互斥鎖就無法安全的獲取和修改共享數據。

             

 

按照上面的說法,我們設計出如下的代碼:先上鎖,發現條件不滿足,解鎖,然后等待在條件變量上不就行了?但是這樣也會有問題

// 錯誤的設計
pthread_mutex_lock(&mutex); while (condition_is_false) { pthread_mutex_unlock(&mutex); //解鎖之后,等待之前,條件可能已經滿足,信號已經發出,但是該信號可能被錯過
    pthread_cond_wait(&cond); pthread_mutex_lock(&mutex); } pthread_mutex_unlock(&mutex);

·由於解鎖和等待不是原子操作。調用解鎖之后,pthread_cond_wait之前,如果已經有其他線程獲取到互斥量,摒棄條件滿足,發送了信號,那么pthread_ cond_wait將錯過這個信號,可能會導致線程永遠阻塞在這個pthread_cond_wait。所以解鎖和等待必須是一個原子操作。
·int pthread_cond_wait; 進入該函數后,會去看條件量是否為0?等於0,就把互斥量變成1,直到cond_wait返回時,把條件量改成1,同時將互斥量恢復成原樣。

 

所以正確是條件變量的使用規范是這樣的:(這里以生產消費問題為例 簡單的實現一下同步,使得消費者需要在有產品的情況下才可進行消費。)

                  

條件變量使用范例即:

·等待條件:

pthread_mutex_lock(&mutex); while (條件為假) pthread_cond_wait(&cond, &mutex); //pthread_cond_wait會先解除之前的pthread_mutex_lock鎖定的mutex, //然后阻塞在等待隊列里休眠,直到再次被喚醒 //(大多數情況下是等待的條件成立而被喚醒,喚醒后, //該進程會進行pthread_mutex_lock(&mutex)先鎖定,然后再讀取資源
修改條件 pthread_mutex_unlock(&mutex);   

·給條件發送信號代碼

pthread_mutex_lock(&mutex); //設置條件為真
pthread_cond_signal(cond); pthread_mutex_unlock(&mutex);

 

 

多線程下的生產消費者問題


好,這下終於把准備工作做好了,結合線程的基本操作,多線程下的生產消費者我們也就不難實現出來了。如下:

 /*************************************************************************
   > File Name: pc.c
   > Author: tp
   > Mail: 
   > Created Time: Sun 27 May 2018 06:28:33 PM CST
  ************************************************************************/
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <pthread.h>
 
 #define PRO_NUM 3   //生產線程數量 
 #define CON_NUM 0   //消費線程數量
 pthread_cond_t cond;
 pthread_cond_t n_empty;
 pthread_mutex_t mutex;
 
 int g_num = 0;      //產品數量
 int empty_num = 3;  //生產的空位數量
 
 //productor
 void* pro_route(void* arg)
 {
     int id =*(int*)arg;
     free(arg);
 
     while(1)
     {
         pthread_mutex_lock(&mutex);
         while(empty_num <= 0)
         {
             printf("生產線程%d等待。\n", id);
             pthread_cond_wait(&n_empty, &mutex);
             printf("有空位,數量為%d\n", empty_num);
         }
         printf("生產線程%d生產\n", id);
         ++g_num;
         --empty_num;
         printf("生產品%d完成\n", g_num);
         sleep(rand()%3);
         pthread_cond_signal(&cond);
         pthread_mutex_unlock(&mutex);
         sleep(rand()%3);
     }
 }
 //consumer
 void *con_route(void* arg)
 {
     int id =*(int*)arg;
     free( arg);
 
     while(1)
     {
         pthread_mutex_lock(&mutex);
         while(g_num <= 0)
         {
             printf("消費線程%d等待。。\n", id);
             pthread_cond_wait(&cond, &mutex);
             printf("第%d產品到了!!\n", g_num);
         }
         printf("消費線程%d消費 產品%d\n", id , g_num);
         --g_num;
         ++empty_num;
         sleep(rand()%2);
         printf("消費線程%d消費完成\n", id);
         pthread_cond_signal(&n_empty);
         pthread_mutex_unlock(&mutex);
         sleep(rand()%3);
     }
 }
 int main( )
 {
     srand(getpid());
 
     pthread_t tids[PRO_NUM + CON_NUM];
     //互斥量,條件變量初始化
     pthread_mutex_init(&mutex, NULL);
     pthread_cond_init(&cond, NULL);   //條件變量1
     pthread_cond_init(&n_empty, NULL);//條件變量2
 
     //創建生產者線程
     for(int i =0; i< PRO_NUM; ++i)
     {
         int* p = (int*)malloc(sizeof(int)); //傳入參數相當作線程編號
         *p = i;
         pthread_create(&tids[i], NULL, pro_route, p);
     }
     //創建消費者線程
     for(int i =0; i< CON_NUM; ++i)
     {
         int* p = (int*)malloc(sizeof(int)); //消費線程編號
         *p = i;
         pthread_create(&tids[i], NULL, con_route, p);
     }
     for(int i =0; i< PRO_NUM + CON_NUM; ++i) //回收線程
     {
         pthread_join(tids[i], NULL);
     }
     //互斥量,條件變量銷毀
     pthread_mutex_destroy(&mutex);
     pthread_cond_destroy(&cond);
     pthread_cond_destroy(&n_empty);
 
     return 0;
 }

 結果:

 當我們只有生產者生產時,此時生產的空位生產滿了之后便會阻塞,如下:

當去添加兩個消費線程時(將CON_NUM改為2),這樣生產、消費得以進行。如下:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM