我們學習了操作系統,想必對生產消費者問題都不陌生。作為同步互斥問題的一個經典案例,生產消費者模型其實是解決實際問題的基礎模型,解決很多的實際問題都會依賴於它。而此模型要解決最大的問題便是同步與互斥。而通常呢,在多進程的環境下我們一般是是用信號量來解決(可以戳這里看看);在多線程的情況,則會用到兩個東西: 互斥量和條件變量。通常用它們兩個來實現線程間通信,以此來解決多線程下的同步和互斥問題。不過在具體實現生產消費模型前,為了更好理解當中的處理原理,還是先來回顧一下一些線程間通信的相關知識。
互斥問題
大部分情況,線程使用的數據都是局部變量,變量的地址空間在線程棧空間內,這種情況,變量歸屬單個線程,其他線程無法獲得這種變量。但有時候,很多變量都需要在線程間共享,這樣的變量稱為共享變量,可以通過數據的共享,完成線程之間的交互。多個線程並發的操作共享變量,會帶來一些問題。
// 操作共享變量會有問題的售票系統代碼 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <pthread.h> int ticket = 100; void *route( void *arg) { char id = *(char*)arg; while ( 1 ) { if ( ticket > 0 ) { usleep( 1000) ; printf( " thread %c sells ticket:%d\n" , id, ticket) ; ticket--; } else { break; } } } int main(void) { pthread_t t1, t2, t3, t4; char a1=1,a2=2,a3=3,a4=4; pthread_create( &t1, NULL, route, &a1); pthread_create( &t2, NULL, route, &a2); pthread_create( &t3, NULL, route, &a3); pthread_create( &t4, NULL, route, &a4); pthread_join( t1, NULL) ; pthread_join( t2, NULL) ; pthread_join( t3, NULL) ; pthread_join( t4, NULL) ; }
//一次執行結果: thread 4 sells ticket:100 ... thread 4 sells ticket:1 thread 2 sells ticket:0 thread 1 sells ticket:-1 thread 3 sells ticket:-2
為什么無法獲得正確結果?
·if 語句判斷條件為真以后,代碼可以並發的切換到其他線程;usleep這個模擬漫長業務的過程,在這個漫長的業務過程中,可能有很多個線程會進入該代碼段
·ticket--操作本身就不是一個原子操作
如果取出 “ticket--”部分的匯編代碼
objdump -d a.out > test.objdump 152 40064b: 8b 05 e3 04 20 00 mov 0x2004e3(%rip),%eax # 600b34 <ticket>
153 400651: 83 e8 01 sub $0x1,%eax 154 400654: 89 05 da 04 20 00 mov %eax,0x2004da(%rip) # 600b34
ticket--操作並不是原⼦子操作,而是對應三條匯編指令:
load:將共享變量ticket從內存加載到寄存器中
update: 更新寄存器里面的值,執行-1操作
store:將新值,從寄存器寫回共享變量ticket的內存地址
要解決以上問題,需要做到三點:
1.代碼必須要有互斥行為:當代碼進入臨界區執行時,不允許其他線程進入該臨界區。
2.如果多個線程同時要求執行臨界區的代碼,並且臨界區沒有線程在執行,那么只能允許一個線程進入該臨界區。
3.如果線程不在臨界區中執行,那么該線程不能阻止其他線程進入臨界區。
Linux下用互斥量就做到了以上3點,它本質上其實就是一把鎖。
互斥量
互斥量使用一般是以下幾個步驟:
1.定義互斥量(mutex): pthread_mutex_t mutex;
2.初始化:
①靜態分配:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
②動態分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); 參數: mutex:要初始化的互斥量 attr:如果不設置線程屬性的話填NULL
3.上鎖:pthread_mutex_lock(&mutex) 如果是1,值0,返回; 如果是0,便阻塞
調⽤用pthread_ lock 時,可能會遇到以下情況:
互斥量處於未鎖狀態,該函數會將互斥量鎖定,同時返回成功
發起函數調用時,其他線程已經鎖定互斥量,或者存在其他線程同時申請互斥量,
但沒有競爭到互斥量,那么pthread_ lock調⽤用會陷入阻塞,等待互斥量解鎖。
4.解鎖: pthread_mutex_unlock(&mutex) 置為1,返回
5.銷毀:pthread_mutex_destroy(&mutex)
銷毀互斥量需要注意:使⽤用PTHREAD_ MUTEX_ INITIALIZER初始化的互斥量不需要銷毀
不要銷毀⼀一個已經加鎖的互斥量已經銷毀的互斥量,要確保后⾯面不會有線程再嘗試加鎖
自旋鎖
互斥鎖是當阻塞在pthread_mutex_lock時,放棄CPU,好讓別人使用CPU。自旋鎖阻塞在spin_lock時不會阻塞CPU,不斷對CPU詢問。(實時系統中應用比較多,要求對鎖進行較快響應)它使用形式與互斥量類似,不再贅述。
1.定義自旋鎖: pthread_spinlock_t spin 2.初始化自旋鎖:pthread_spin_intt(pthread_spinlock_t *s, int s) 3.上鎖:int pthread_spin_lock(pthread_spinlock_t *lock) 4.解鎖:int pthread_spin_lock(pthread_spinlock_t *lock) 5.銷毀:int pthread_spin_lock(pthread_spinlock_t *lock)
讀寫鎖
在編寫多線程的時候,有⼀一種情況是十分常見的。那就是,有些公共數據修改的機會比較少。相比較改寫,它們讀的機會反而高的多。通常而言,在讀的過程中,往往伴隨着查找的操作,中間耗時很長。給這種代碼段加鎖,會極⼤大地降低我們程序的效率。那么有沒有一種方法,可以專門處理這種多讀少寫的情況呢? 有,那就是讀寫鎖。讀寫鎖本質上是一種自旋鎖[長時間等人和短時間等人的例子]
·注意:讀共享,寫排他,寫優先級高
它處理方式和前面互斥量類似,就不在贅述。
1.定義:pthread_rwlock_t lock
2.初始化 pthread_rwlock_init(&lock, NULL) 3.上鎖:pthread_rwlock_rdlock(&lock) pthread_rwlock_wrlock(&lock) 4.解鎖:pthread_rwlock_unlock(&lock) 5.銷毀:pthread_rwlock_destroy(&lock)
條件變量
生產者消費問題要解決另一個問題就同步的問題。當一個線程互斥地訪問某個變量時,它可能發現在其它線程改變狀態之前,它什么也做不了。例如一個線程訪問隊列時,發現隊列為空,它只能等待,直到其它線程將一個節點添加到隊列中。這種情況下就需要用到條件變量了。
使用步驟如下:
1.定義條件變量:
pthread_cond_t cond;
pthread_mutex_t mutex
2.初始化條件變量:
pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *restrict attr) 參數: cond:要初始化的條件變量 attr:填NULL(用於設置線程屬性)
3.等待條件:
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex); 參數: cond:要在這個條件變量上等待 mutex:互斥量,后⾯面詳細解釋
注意這里wait函數需要互斥量(后面解釋)。如果在鎖環境下,此處互斥量形同虛設。在鎖環境下,會將mutex解鎖; wait返回時,將mutex鎖制成原來狀態
4.使條件滿足 :
int pthread_cond_broadcast(pthread_cond_t *cond); int pthread_cond_signal(pthread_cond_t *cond);
5.銷毀條件變量:
int pthread_cond_destroy(pthread_cond_t *cond)
為什么pthread_ cond_ wait 需要互斥量?
①條件等待是線程間同步的一種手段,如果只有一個線程,條件不滿足,一直等下去都不會滿足,所以必須要有一個線程通過某些操作,改變共享變量,使原先不滿足的條件變得滿足,並且友好的通知等待在條件變量上的線程。
②條件不會無緣無故的突然變得滿足了,必然會牽扯到共享數據的變化。所以一定要用互斥鎖來保護。沒有互斥鎖就無法安全的獲取和修改共享數據。
按照上面的說法,我們設計出如下的代碼:先上鎖,發現條件不滿足,解鎖,然后等待在條件變量上不就行了?但是這樣也會有問題
// 錯誤的設計
pthread_mutex_lock(&mutex); while (condition_is_false) { pthread_mutex_unlock(&mutex); //解鎖之后,等待之前,條件可能已經滿足,信號已經發出,但是該信號可能被錯過
pthread_cond_wait(&cond); pthread_mutex_lock(&mutex); } pthread_mutex_unlock(&mutex);
·由於解鎖和等待不是原子操作。調用解鎖之后,pthread_cond_wait之前,如果已經有其他線程獲取到互斥量,摒棄條件滿足,發送了信號,那么pthread_ cond_wait將錯過這個信號,可能會導致線程永遠阻塞在這個pthread_cond_wait。所以解鎖和等待必須是一個原子操作。
·int pthread_cond_wait; 進入該函數后,會去看條件量是否為0?等於0,就把互斥量變成1,直到cond_wait返回時,把條件量改成1,同時將互斥量恢復成原樣。
所以正確是條件變量的使用規范是這樣的:(這里以生產消費問題為例 簡單的實現一下同步,使得消費者需要在有產品的情況下才可進行消費。)
條件變量使用范例即:
·等待條件:
pthread_mutex_lock(&mutex); while (條件為假) pthread_cond_wait(&cond, &mutex); //pthread_cond_wait會先解除之前的pthread_mutex_lock鎖定的mutex, //然后阻塞在等待隊列里休眠,直到再次被喚醒 //(大多數情況下是等待的條件成立而被喚醒,喚醒后, //該進程會進行pthread_mutex_lock(&mutex)先鎖定,然后再讀取資源
修改條件 pthread_mutex_unlock(&mutex);
·給條件發送信號代碼
pthread_mutex_lock(&mutex); //設置條件為真
pthread_cond_signal(cond); pthread_mutex_unlock(&mutex);
多線程下的生產消費者問題
好,這下終於把准備工作做好了,結合線程的基本操作,多線程下的生產消費者我們也就不難實現出來了。如下:
/************************************************************************* > File Name: pc.c > Author: tp > Mail: > Created Time: Sun 27 May 2018 06:28:33 PM CST ************************************************************************/ #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #define PRO_NUM 3 //生產線程數量 #define CON_NUM 0 //消費線程數量 pthread_cond_t cond; pthread_cond_t n_empty; pthread_mutex_t mutex; int g_num = 0; //產品數量 int empty_num = 3; //生產的空位數量 //productor void* pro_route(void* arg) { int id =*(int*)arg; free(arg); while(1) { pthread_mutex_lock(&mutex); while(empty_num <= 0) { printf("生產線程%d等待。\n", id); pthread_cond_wait(&n_empty, &mutex); printf("有空位,數量為%d\n", empty_num); } printf("生產線程%d生產\n", id); ++g_num; --empty_num; printf("生產品%d完成\n", g_num); sleep(rand()%3); pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); sleep(rand()%3); } } //consumer void *con_route(void* arg) { int id =*(int*)arg; free( arg); while(1) { pthread_mutex_lock(&mutex); while(g_num <= 0) { printf("消費線程%d等待。。\n", id); pthread_cond_wait(&cond, &mutex); printf("第%d產品到了!!\n", g_num); } printf("消費線程%d消費 產品%d\n", id , g_num); --g_num; ++empty_num; sleep(rand()%2); printf("消費線程%d消費完成\n", id); pthread_cond_signal(&n_empty); pthread_mutex_unlock(&mutex); sleep(rand()%3); } } int main( ) { srand(getpid()); pthread_t tids[PRO_NUM + CON_NUM]; //互斥量,條件變量初始化 pthread_mutex_init(&mutex, NULL); pthread_cond_init(&cond, NULL); //條件變量1 pthread_cond_init(&n_empty, NULL);//條件變量2 //創建生產者線程 for(int i =0; i< PRO_NUM; ++i) { int* p = (int*)malloc(sizeof(int)); //傳入參數相當作線程編號 *p = i; pthread_create(&tids[i], NULL, pro_route, p); } //創建消費者線程 for(int i =0; i< CON_NUM; ++i) { int* p = (int*)malloc(sizeof(int)); //消費線程編號 *p = i; pthread_create(&tids[i], NULL, con_route, p); } for(int i =0; i< PRO_NUM + CON_NUM; ++i) //回收線程 { pthread_join(tids[i], NULL); } //互斥量,條件變量銷毀 pthread_mutex_destroy(&mutex); pthread_cond_destroy(&cond); pthread_cond_destroy(&n_empty); return 0; }
結果:
當我們只有生產者生產時,此時生產的空位生產滿了之后便會阻塞,如下:
當去添加兩個消費線程時(將CON_NUM改為2),這樣生產、消費得以進行。如下: