有關進程通信的知識主要分為五個部分:
①什么是進程通信;
②實現進程通信的誤區;
③如何正確實現進程通信;
④經典的進程通信問題與信號量機制;
⑤避免編程失誤的“管程”。
本文將按照這五個部分的提出順序進行講解,力求通俗易懂、融會貫通。
①什么是進程通信?
需要首先明確的是,進程通信並不是指進程間“傳遞數據”。
為了說明進程通信,需要先介紹一下進程通信的背景。現代操作系統中的進程間可能存在着共享的內存區,比如字處理進程A(可以想象為Word)、字處理進程B(可以想象為記事本)和打印機進程C共享一小塊內存:待打印文件地址隊列。該隊列中有一個指針out指向隊列中下一個被打印的文件地址,還有一個指針in指向隊列尾的后一位置,即新的待打印文件地址應存入的位置。顯然,指針out是供進程C訪問的,每當打印機空閑且out!=in,進程C就打印out所指的文件。而指針in則是供進程A與進程B訪問的,每當它們有希望打印的文件時就執行如下三步:“讀取in”、“向in所指位置寫入待打印文件地址”、“修改in使其指向下一位置”。
但是A和B都能讀寫指針in就會帶來沖突問題:假設現在A占用着CPU並准備打印文件,A讀取了in並將待打印文件名寫入了in所指位置,但是A還沒來得及修改in,CPU就切換到了進程B執行,B在執行過程中也准備打印文件,並且完成了對in的所有操作。一段時間后,CPU又切換到了進程A,但此時的進程A並不知道自己寫入到隊列的文件名已經被B給覆蓋了,A只會繼續執行“修改in使其指向下一位置”的操作,從而出現了進程A與進程B的“沖突”。
這種存在共享內存區的進程間的沖突問題,解決方法的思路是統一的:當某個進程正在操作共享內存區時,其他進程不得操作共享內存區。這個思路實現的關鍵點就是:令其他進程知道“有一個進程在操作共享內存區”,因此這類問題就被稱為進程通信問題,通信的“內容”就是:有沒有其他進程在操作共享內存區。(講解到信號量機制時進程通信將廣義化,但依然不是進程間的“實際通信”,而是某些信號的共享)
因為“操作共享內存區”太長,所以人們一般稱正在操作共享內存區的進程是在臨界區內,同時將進程中需要操作共享內存區的部分代碼稱之為臨界區代碼。思路也就可以稱作:當有進程在臨界區時,其他進程不得進入臨界區。
②實現進程通信的誤區
因為實現進程通信的關鍵,就是令其他進程知道現在已經有進程在臨界區了,所以一個很簡單的解決思路就出來了:
將臨界區想象成一個房子,同一時間內房子內只能有一個進程,那么確保這一點的方法就是給房子加鎖(mutex),如果鎖是鎖上的,則准備進入的進程不得進入,如果鎖是打開的,則准備進入的進程可以進入,並且進入后要將鎖鎖上,此外退出時也要負責將鎖打開。
將上述想法轉換為代碼表示,就是令每個進程在臨界區代碼的前后,分別添加如下代碼,其中mutex為共享內存區中的一個變量:
1 int mutex=1; //mutex為1表示鎖打開,為0表示鎖關閉 2 while(true) 3 { 4 //執行非臨界區代碼 5 6 //准備執行臨界區代碼,即准備進入臨界區 7 8 while(mutex==0);//如果mutex為0,說明有其他進程在臨界區內,當前進程應卡在此處 9 mutex=0;//若代碼能執行至此處,說明mutex為假,即沒有其他進程在臨界區,於是將mutex設為真,告知其他進程當前有(本)進程在臨界區 10 11 /*臨界區代碼*/ 12 13 //准備退出臨界區,解開鎖 14 mutex=1; 15 16 //執行非臨界區代碼 17 }
但是上述代碼是無法解決進程通信問題的!原因就是:如果沒有計算機底層(硬件或操作系統)的限制,那么進程間的切換可能發生在任意兩條機器指令之間,更遑論高級程序語言的兩條語句之間。
現在假設三個進程A、B、C共享某內存區,A已進入臨界區,於是欲進入臨界區的B在第8行代碼處卡住(想進入房子,一直循環判斷“鎖”的狀態)。突然,A退出了臨界區,並且CPU切換到了B,於是B結束了第8行的循環(進入了房子),准備執行第9行代碼(上鎖),但是B還沒來得及“上鎖”,CPU又因為某特殊原因如中斷,被切換到了進程C,並且進程C也想進入臨界區,由於此時“鎖”是打開的,於是C直接結束了第8行的循環(直接進入房子),准備執行第9行代碼。顯然,此時臨界區內有兩個進程了。
因此,實現進程通信時需要注意的最大誤區就是:如果代碼中的語句(指令)不是特殊的,那么任意兩條語句(指令)間均有被“打斷”的可能性。
③如何正確實現進程通信
我們先看看一種不需要借助計算機底層支持的解決進程通信的方法:嚴格輪換法。然后再說說現代計算機實現進程通信的基本技術。
所謂嚴格輪換法,依然可以抽象地將臨界區看成一個房子,但是這次我們不是靠鎖來實現房子內只有一個進程,而是靠“鑰匙”,鑰匙在哪個進程手上,哪個進程就可以進入臨界區,當該進程退出臨界區時,需要將鑰匙交給下一個進程(不管它要不要進入臨界區,反正“輪到你了”)。
以三個進程0,1,2共享內存區為例,則三個進程的臨界區代碼分別如下,假設key初始化為0:
int key=0;
//進程0 while(true) { //不斷判斷鑰匙是否在自己手上,如果不在則一直循環等下去 while(key!=0); //執行臨界區代碼 //退出臨界區,將鑰匙交給下一個進程 key=1; } //進程1 while(true) { //不斷判斷鑰匙是否在自己手上,如果不在則一直循環等下去 while(key!=1); //執行臨界區代碼 //退出臨界區,將鑰匙交給下一個進程 key=2; } //進程2 while(true) { //不斷判斷鑰匙是否在自己手上,如果不在則一直循環等下去 while(key!=2); //執行臨界區代碼 //退出臨界區,將鑰匙交給下一個進程 key=0; }
嚴格輪換法的確可以解決進程通信,但是其效率非常低,原因如下:
1.嚴格輪換:如果此時key為2,那么只有進程2可以進入臨界區,哪怕進程2從始至終都沒有進入臨界區,進程0和進程1也不得進入臨界區。
2.忙等待:如果此時進程0占用着CPU而key為1或2,那么進程0不能進入臨界區,只能一直循環判斷key的值,從整個系統的角度來說,CPU在“忙”,但進程卻在“等待”,也就是說CPU此時一直在空轉(就像汽車空擋猛踩油門)
為了解決忙等待和嚴格輪換的缺陷,一種新的思路被提了出來,我稱之為沉睡與喚醒策略:令進程的共享內存區中保存一個特殊的“沉睡進程隊列”,如果進程A在准備進入臨界區時發現已有其他進程在臨界區內,則進程A將自己的信息加入到沉睡進程隊列,即“沉睡”,然后自我阻塞,從而讓出CPU、避免忙等待,當臨界區內的進程退出臨界區時,需要負責檢查沉睡進程隊列,若隊列不為空,則需要將其中的某個進程移出隊列,並將其“喚醒”,使其從阻塞態進入就緒態。當然,根據實現方式的不同,沉睡、阻塞可能是一個操作,移出隊列和喚醒也是一個操作。
int mutex=1;//mutex為1表示鎖打開,為0表示鎖關閉
//沉睡 void sleep(int process) { //將process所表示的進程加入到沉睡進程隊列 //將process由運行態轉換為阻塞態 } //喚醒 void wakeup() { //檢查沉睡進程隊列,若不為空,則移出某進程,並將其由阻塞態轉換為就緒態 } //進程中的代碼 while(true) { //准備進入臨界區 if(mutex==0) sleep();
//上鎖
mutex=0; //臨界區代碼 //退出臨界區,開鎖,喚醒沉睡進程
mutex=1; wakeup();
}
顯然,上述代碼又踩了②中所提到的誤區。
假設進程A在臨界區內,現在進程B占用CPU並且想進入臨界區,B檢查mutex發現為0,於是准備執行sleep(),但是此時CPU被進程A搶去了,進程A在自己的時間片段內退出了臨界區,試着去“喚醒”沉睡的進程,但是卻沒有沉睡的進程,於是喚醒操作不了了之,接着進程A任務完成、結束。於是CPU又切換到了B,B繼續執行sleep(),進入沉睡並阻塞,但是再也沒有進程會來叫醒B了……
上述現象可以這么說:B本來應該由A來喚醒,但偏偏A的喚醒沒有被B收到,因為B后來才沉睡。該現象出現的根本原因就是:B決定沉睡和執行沉睡是兩個操作,這兩個操作之間可能被“打斷”。
再假設,進程B沉睡,進程A在臨界區內且占用CPU,進程A在時間片段內執行完了臨界區代碼,解開了鎖,准備執行wakeup(),但是CPU此時被進程C搶走,因為鎖已解開,所以C進入了臨界區,一段時間后,C尚未退出臨界區,但CPU再次切換至進程A,A繼續執行wakeup(),將B叫醒,於是B上鎖(其實鎖已經上了),進入臨界區,但是此刻C已在臨界區內!
這個現象可以這么說:A本來打算解開鎖、叫醒B,但A解開鎖后,C乘虛而入了。這個現象出現的根本原因就是:A解開鎖、叫醒沉睡進程是兩個操作,這兩個操作之間可能被“打斷”。此外,即使A先叫醒了B,B也不會再去上鎖,因為對於B來說被叫醒即sleep()結束,可以直接開始臨界區操作,因此其他進程還是會因為鎖打開而進入臨界區!
要想解決上述兩個問題,最直接的想法就是令“判斷是否沉睡”和“沉睡”合為“一個操作”,“解開鎖”和“叫醒沉睡進程”也合為“一個操作”,從而避免被打斷,假設下面的sleep()、wakeup()為“原子操作”,即執行時不會被中斷,則沉睡與喚醒策略可以實現:
//沉睡,假設為原子操作 void sleep(int mutex) { //檢查鎖mutex,若為鎖上,則沉睡並阻塞調用者,否則上鎖並返回 } //喚醒,假設為原子操作 void wakeup(int mutex) { //檢查沉睡進程隊列,若隊列不為空,則喚醒其中某進程(不開鎖,因為被喚醒的進程不會再去上鎖),若隊列空,則解開鎖 } //進程代碼 while(true) { //准備進入臨界區 sleep(mutex); /*臨界區代碼*/ //退出臨界區,並喚醒存在的沉睡進程 wakeup(mutex); }
在只有一個CPU的系統中,令sleep()、wakeup()成為原子操作並不復雜,只要將sleep()設為一個系統調用,並且操作系統在執行時(僅僅幾條指令的時間而已)暫時屏蔽中斷,從而避免執行sleep()、wakeup()時出現進程切換即可,多CPU系統中令sleep()成為原子操作需要一些更特殊的指令或技術。
④經典的進程通信問題與信號量機制
首先看看“生產者-消費者”問題,該問題將引出沉睡與喚醒策略的升級版——信號量機制。
生產者-消費者問題的背景如下:有兩個或多個進程分別為producer和consumer,它們共享的內存區最多可以存放N個產品,producer負責生產產品,consumer負責消費產品,如果共享內存區中已有N個產品,則producer需要沉睡,如果共享內存區中沒有產品,則consumer需要沉睡,並且producer和consumer不能同時訪問共享內存區。
一個簡單的想法是這樣的:
int count=0; //表示共享內存區中的產品個數
int mutex=1; //進入臨界區的鎖,臨界區內的進程可以操作共享內存區
//num即生產者編號,允許存在多個生產者進程
void producer(int num) {
produce(); if(count==N) //沉睡 //欲操作共享區,即欲進入臨界區 sleep(mutex); put_product(); count++; if(count==1) //count為1說明之前count為0,consumer可能已沉睡,所以喚醒consumer //離開臨界區 wakeup(mutex); }
//num即消費者編號,允許存在多個消費者 void consumer(int num) { if(count==0) //沉睡 //欲操作共享區,即欲進入臨界區 sleep(mutex); take_product(); count--; if(count==N-1) //說明之前count為N,producer可能已沉睡,所以喚醒producer //離開臨界區 wakeup(mutex);
consume(); }
很顯然,上述代碼又踩了誤區,對count的判斷和對應的操作之間可能被打斷,從而可能錯過另一方的喚醒:假設只有一個生產者和一個消費者,生產者發現count為N,然后CPU就被消費者占用,消費者取走一個產品並發現需要喚醒沉睡的生產者,但是此刻生產者沒有沉睡,所以喚醒操作不了了之,接着消費者退出臨界區,CPU被生產者占用,生產者執行因count已滿而導致的沉睡,但是消費者不再有機會叫醒它……
不幸的是,sleep()和wakeup()並不能解決對count的判斷,因為count並不是一把“鎖”(鎖只有兩個狀態,count不是),count是一種“信號量”,進程們通過count這個信號量的值來判斷自己是否需要沉睡,與進入臨界區而沉睡不同,這種沉睡是受條件所限而沉睡。但是解決這個問題只需要將sleep()和wakeup()稍加修改就可以:
//down即新的sleep,也是一個“原子操作”,semaphore表示信號量,對應sleep()中的鎖 void down(int semaphore) { //檢查semaphore,若大於0則使其減一並返回,若為0則沉睡調用者 } //up即新的wakeup,也是一個“原子操作”,semaphore表示信號量,對應wakeup()中的鎖 void up(int semaphore) { //檢查semaphore,若為0則喚醒沉睡進程隊列中的某沉睡進程,否則令semaphore+1 }
與sleep()和wakeup()的參數為鎖不同,信號量機制的參數為“信號量”,從而可以解決生產者-消費者問題,同時也可以替代sleep()和wakeup(),因為鎖也可以當做是一個信號量:
//對共享內存區稍作修改,新增變量empty表示空位置個數,count依然表示產品個數
int count=0;
int empty=N;
int mutex=1; //進入臨界區的鎖,1開0閉,臨界區內的進程允許操作共享內存區
//num表示生產者編號,允許存在多個生產者 void producer(int num) { produce(); down(empty);//減少一個空位置,若已為0則沉睡 //進入臨界區 down(mutex); put_product(); //離開臨界區 up(mutex); up(count);//若產品數原先為0,則喚醒因為沒有產品而沉睡的消費者進程,否則令產品數+1 }
//num表示消費者編號,允許存在多個消費者 void consumer(int num) { down(count);//減少一個產品數,若已為0則沉睡 //進入臨界區 down(mutex); take_product(); //離開臨界區 up(mutex); up(empty);//若空位置原先為0,則喚醒因為沒有空位置而沉睡的生產者進程,否則令空位置+1 consume(); }
信號量機制的理解並不困難,個人估計唯一的困惑點就是為什么當semaphore為0時,up()不需要令semaphore+1,這一點的解釋用一句話來說就是:因為當semaphore為0時,down()沒有令semaphore-1。也可以類比wakeup(),wakeup()在有沉睡進程時是不打開鎖的,因為被喚醒的進程不會再去上鎖。
生產者-消費者問題,以及信號量機制帶來了一個新的思考:進程間可能不僅僅存在共享內存區的讀寫沖突問題,還可能存在“資源”共享的問題。在生-消背景下,空位置就是生產者需要的資源,而產品就是消費者需要的資源,進程得在有了需要的資源后才能做自己要做的事。本文最初提到的字處理-打印問題就是生-消問題,只是我們忽略了打印機進程在發現out==in時的操作,如果打印機進程在out==in時選擇沉睡,那么字處理進程就得負責將其喚醒。
接下來看看哲學家就餐問題,該問題可以進一步地體現進程間資源搶占可能導致的問題。假設有5個哲學家圍着桌子坐,每個人面前都有足夠的食物,但筷子只有5根,見圖:
每個哲學家只會做兩件事:思考、吃飯。而每當需要吃飯時,哲學家必須取兩根筷子才能吃,並且只能取自己左手和右手的筷子,如上圖哲學家A只能用筷子0和筷子1吃飯。
顯然,各個哲學家就相當於各個進程,筷子就是它們需要共享的資源(並且共享方式是連鎖的)。先來看看錯誤的代碼:
int chopMutex[5]={1}; //5根筷子各自的信號量(鎖),1可取0不可取
//哲學家進程,num表示哲學家編號,從0到4對應從A到E void philosopher(int num) { while(true) { think(); //思考 down(chopMutex[num]); //拿左邊的筷子,若已被左邊的哲學家取走,則阻塞 down(chopMutex[(num+1)%5]); //拿右邊的筷子,若已被右邊的哲學家取走,則阻塞 eat(); //吃飯 //逐個放下筷子 up(chopMutex[num]); up(chopMutex[(num+1)%5]); } }
上述代碼初看仿佛沒有問題,每個哲學家都利用信號量機制(此處的信號量即單根筷子的鎖)來取筷子。但其實上述代碼是有問題的:若A拿起了左邊的筷子后就切換到了B,B也拿起了左邊的筷子,然后又切換到了C,C也拿起了左邊的筷子……最后,每個哲學家都拿到了自己左手邊的筷子,但是每個哲學家都會因為拿不到右邊的筷子而一直阻塞下去。這種現象我們稱之為“死鎖”:一個進程的集合中,每一個進程都在等待只能由同一集合中的其他進程才能觸發的事件(比如釋放某資源)。
解決哲學家問題的簡單解法是:令同一時間只允許一個哲學家吃飯。
int qualification=1; //代表吃飯的權利
//哲學家進程,num表示哲學家編號,從0到4對應從A到E void philosopher(int num) { while(true) { think(); //思考 down(qualification); //試圖獲取吃飯的權利 //拿筷子,吃飯 takeChopsticks(num); takeChopsticks((num+1)%5); eat(); //放下筷子,停止吃飯 putChopsticks(num); putChopsticks((num+1)%5); up(qualification); //交出吃飯權利,即吃完了 } }
上述解法沒有問題,只是有缺陷:5根筷子明明可以支持兩個哲學家吃飯,比如A和C或者A和D一起吃,上述解法卻只讓一個人吃。
可以解決該缺陷的一種解法是,設置一個mutex作為進入臨界區的鎖,再令每個哲學家對應兩個信號量,state和qualification,state表示該哲學家的“狀態”:思考、想吃飯、在吃飯;qualification表示該哲學家的“資格”:現在有沒有資格吃飯。每個哲學家都可以讀、寫任一哲學家的狀態和資格,因此臨界區即讀寫哲學家狀態、資格的代碼。
當哲學家X准備吃飯即想拿筷子時,先進入臨界區(從而可以讀寫任一哲學家的state和qualification),然后將自己的狀態改為想吃飯,接着檢查自己左右兩邊的哲學家是否在吃飯,如果均不在吃飯,則自己有資格吃飯,於是通過up()使自己的qualification+1,然后退出臨界區,再通過down()使用掉自己的資格;如果左右兩邊有哲學家在吃飯,則不使自己的qualification+1,退出臨界區,再通過down()使用自己的資格,但是因為沒有資格,X將阻塞於此,直到正在吃飯的旁邊哲學家吃完飯,然后給予自己資格。
當哲學家Y吃完飯即放下筷子時,先進入臨界區,將自己的狀態改為思考,接着檢查自己左右兩邊的哲學家是否想吃飯且有資格吃飯,若是則令其qualification+1從而使其得以吃飯,左右哲學家均處理完畢后Y退出臨界區。
#define THINKING 0 //在思考 #define HUNGRY 1 //想吃飯 #define EATING 2 //在吃飯 int state[5]={THINKING}; //表示各個哲學家的狀態 int mutex=1; //臨界區的鎖,為0表示鎖上 int qualification[5]={0}; //哲學家的資格,為1時表示可以吃飯,0表示不可以
//檢查哲學家i是否想吃飯且有資格吃飯 void check(int i) { //若哲學家i想吃飯,且其左右哲學家均不在吃飯,則i的吃飯資格+1,並且將i的狀態改為正在吃飯 if(state[i]==HUNGRY && state[(i+4)%5]!=EATING && state[(i+1)%5]!=EATING) {
state[i]=EATING;
up(qualification[i]); }
} void takeChopsticks(int i) { down(mutex); //進入臨界區(臨界區內可讀、寫哲學家的狀態和資格) state[i]=HUNGRY; //表明i想吃飯 check(i); //檢查自己是否有資格吃飯 up(mutex); //離開臨界區 down(qualification[i]); //若check(i)時確認自己有資格吃飯,則此處用去吃飯資格,否則阻塞直至被給予吃飯資格 } void putChopsticks(int i) { down(mutex); //進入臨界區(臨界區內可讀、寫哲學家的狀態和資格) state[i]=THINKING; //表明自己不在吃飯也不想吃飯 check((i+4)%5); //檢查左邊的哲學家是否想且有資格吃飯,若是則給予他資格 check((i+1)%5); //檢查右邊的哲學家是否想且有資格吃飯,若是則給予他資格 up(mutex); //離開臨界區 } void philosopher(int i) { while(true) { think(); takeChopsticks(i); putChopsticks(i); } }
哲學家進餐問題比生產者-消費者問題要更復雜,因為進程需要的資源不是一種而是兩種,而且這兩種資源的競爭對象不一樣。解決這類問題的關鍵點就是:如果進程X需要x個資源,則X要么一次性占用這x個資源,要么一個都不占用直到可以一次性占用着x個資源,不能出現占用一部分資源然后等待的情況。
最后提出的問題是最復雜的,叫讀者-寫者問題,在哲學家進餐問題中,資源是“獨享”式的:一根筷子如果被一個哲學家取走了,則這根筷子只能屬於該哲學家,除非他放下筷子。但是在讀者-寫者問題中,資源是既“獨享”又“共享”的,我們先看看其背景:
假設存在大量進程共享一個數據庫(或文件),為了簡化問題,我們再假設進程要么是只會讀取數據庫的“讀者”,要么是只會寫入數據庫的“寫者”,同一時間數據庫要么有一個寫者在寫、要么有不限量個讀者在讀、要么沒有進程訪問。
根據上述要求,該數據庫作為一種資源,在讀者與寫者之間、寫者與寫者之間是“獨享”的:有讀者在數據庫則寫者得等,有寫者在數據庫則讀者、其他寫者得等。但是在讀者與讀者之間又是“共享”的:有讀者在數據庫則其他后到的讀者可以進去讀。
如果不允許讀者-讀者共享,那么問題就變得很簡單,只要給數據庫上一把“鎖”即可,有進程在數據庫,其他想進去的進程就得沉睡。所以讀者-寫者問題的關鍵難點就是:如何令已有讀者在讀的情況下,后來的讀者可以進去?
根據關鍵難點的描述,一種被稱為“讀者優先”的解決思路被提出來:
設置共享變量rd_count,表示當前數據庫中讀者的數量,設置一把鎖rdc_mutex,進程要想操作rd_count,必須利用鎖rdc_mutex進出“rd_count的臨界區”。再設置一把鎖db_mutex,表示數據庫的鎖。
寫者想進入數據庫,必須在數據庫內無人的情況下才行,即db_mutex解開時才可以進入數據庫。同理,寫者退出數據庫時,必須解開db_mutex鎖。
若讀者想進入數據庫,則必須滿足兩個條件其中一個:
1.數據庫內無人且鎖打開 2.數據庫內有人但是是讀者。
同理,讀者退出數據庫時,若數據庫內還有人(讀者)則直接推出,否則退出並解開db_mutex。我們可以借助rd_count來實現對第二點的判斷,詳情見代碼:
//讀者優先解法 int db_mutex=1; //數據庫的鎖,db即database,1開0閉 int rd_count=0; //表示數據庫中讀者的數量,rd即reader int rdc_mutex=1; //rd_count的鎖,rdc即reader_count,1開0閉 //讀者進程,num即讀者編號 void reader(int num) { while(true) { /***准備進入數據庫***/ //先通過rdc_mutex進入rd_count臨界區 down(rdc_mutex); //判斷數據庫內是否已有讀者,若是,則負責搶數據庫的鎖 if(rd_count==0) down(db_mutex); //若自己是“第一個”讀者,則執行至此時已搶到數據庫並上了鎖,所以令rd_count++ //若自己不是“第一個”讀者,則直接令rd_count++ rd_count++; up(rdc_mutex); //離開rd_count臨界區 read_data(); //讀取數據庫數據 /***准備離開數據庫***/ //通過rdc_mutex進入rd_count臨界區 down(rdc_mutex); //令rd_count--后判斷數據庫內是否已無讀者,若是則解開數據庫的鎖,喚醒沉睡進程(若有,必為寫者) rd_count--; if(rd_count==0) up(db_mutex); up(rdc_mutex); //離開rd_count臨界區 use_data(); } } //寫者進程,num即編號 void writer(int num) { while(true) { produce_data(); //欲進入數據庫,檢查鎖,若鎖上沉睡,否則鎖上並進入 down(db_mutex); write_data(); up(db_mutex); //離開數據庫,喚醒沉睡進程(可能是讀者也可能是寫者) } }/
顯然,上述代碼可以滿足讀者-寫者問題的問題,只是存在一點“缺陷”:如果不斷地有讀者到來,以致於數據庫內總是至少有一個讀者,那么寫者將永遠沒有機會進入數據庫。這也是該解法被稱為“讀者優先”的原因。因為只要數據庫內有讀者,那么后面來的讀者就可以進入數據庫,而不需要在意是否有先到的寫者想進入數據庫。
但是在某些情況下,我們希望算法能滿足:即使數據庫內有讀者,如果有寫者先到達(在等待),那么后到達的讀者也不能進入數據庫,必須讓先到達的、等待中的寫者使用完數據庫后才可以進入數據庫。
要想滿足該要求,被稱為“讀寫平等”的解決思路被提了出來:在數據庫“門前”設立一個“候選人”位置,每個進程必須先成為候選人,再判斷能否進入數據庫。利用候選人機制,即使數據庫內有讀者(數量不定),只要寫者搶占了候選人位置,后到達的讀者就不能進入數據庫,同理后到達的寫者也需等待。如果令因沒搶到候選人而沉睡的進程按到達時間順序排成隊列,並且喚醒時按隊列順序進行,那么進程訪問數據庫的順序就是時間順序的,因此這個算法也被稱為“讀寫平等”算法。舉例來說,數據庫內有進程,然后寫者A到達、成為候選人、沉睡,多個讀者到達、等待候選人位置、沉睡,寫者B到達、等待候選人位置、沉睡,那么最后這些進程一定是按“寫者A”、“讀者群”、“寫者B”的順序進入數據庫,也即按時間順序進入的數據庫,從而實現“讀寫平等”。
//讀寫平等 int candidate=1; //候選人資格鎖,1無候選人0有候選人 int rd_count=0; //讀者數量 int db_mutex=1; //數據庫鎖,1開0閉 int rdc_mutex=1; //rd_count的鎖,需要鎖是因為候選人讀者和欲離開數據庫的讀者都需要讀寫rd_count void reader() { while(true) { //欲進入數據庫,先爭奪候選人 down(candidate); //成為候選人后,進入rd_count臨界區 down(rd_count); //若數據庫內無讀者,則自己是第一個讀者,負責給數據庫上鎖,以防止寫者進入 if(rd_count==0) down(db_mutex); //修改讀者數量,離開rd_count臨界區,解開候選人鎖(因為自己進入數據庫) rd_count++; up(rd_count); up(candidate); read_data();//數據庫內操作 //欲離開數據庫,進入rd_count臨界區,若自己是最后一個讀者,解開數據庫鎖 down(rd_count); rd_count--; if(rd_count==0) up(db_mutex); up(rd_count); use_data(); //數據庫外操作 } } void writer() { while(true) { produce_data(); //數據庫外操作 //欲進入數據庫,先奪得候選人資格 down(candidate); //成為候選人后,給數據庫上鎖,以保證只有自己在內 down(db_mutex); up(candidate); //進入數據庫,解開候選人資格鎖 write_data(); //數據庫內操作 //離開數據庫,解開數據庫鎖 up(db_mutex); } }
在某些特殊情況下,我們可能需要一個更加極端的讀者-寫者算法,那就是“寫者優先”:
1.只要有寫者在等待,想進入數據庫的讀者就必須等待
2.數據庫鎖由鎖上變為打開時,優先喚醒寫者進程,不論是否有先於其到達的讀者(從而打破了時間順序,令寫者有了優先權)
回顧讀寫平等算法,可以發現當數據庫鎖解開時,離開數據庫的要么是寫者,要么是最后的讀者。
如果離開的是讀者,而且有沉睡候選人,那么沉睡候選人一定是寫者(讀者不會因為數據庫內有讀者而沉睡),所以最后的讀者只需要喚醒候選人即可保證“寫者優先”。
問題出在離開的是寫者的情況,寫者離開時的沉睡候選人既可能是寫者,也可能是讀者,但寫者離開時並沒有考慮這一點。因此要想實現寫者優先,需要下手的是寫者進程,讓它們變得更為自己人考慮。
在讀者優先算法中,讀者能“給自己人優先權”的根本原因在於讀者掌控着數據庫鎖,只要讀者不解開這把鎖,寫者就無法進入,但其它讀者通過rd_count,得到了一定情況下無視數據庫鎖的“特權”。
因此,一種類似的解法被提了出來:設置變量wt_count表示數據庫內以及想進入數據庫的寫者總數,想進入數據庫的寫者先通過wt_count判斷數據庫內是否有寫者,若無則競爭候選人,再等待數據庫鎖,若有則直接等待數據庫鎖;想離開數據庫的寫者直接釋放數據庫鎖(若有等待中的寫者,此后即可進入),再通過wt_count判斷是否還有其他寫者,若無則解開候選人鎖,若有則直接走人,由“最后一個”寫者負責解開候選人鎖。這個想法就是利用候選人鎖,使得寫者得以“卡住”數據庫、保證若有其他寫者則將數據庫讓給其他寫者。從而實現了“寫者優先”
//寫者優先 int candidate=1; //候選人資格鎖,1無候選人0有候選人 int wt_count=0; //數據庫內及想進入數據庫的寫者數量 int rd_count=0; //數據庫內的讀者數量 int db_mutex=1; //數據庫鎖,1開0閉 int rdc_mutex=1; //rd_count的鎖,需要鎖是因為候選人讀者和欲離開數據庫的讀者都需要讀寫rd_count int wtc_mutex=1; //wt_count的鎖,需要鎖是因為欲進入數據庫的寫者和欲離開數據庫的寫者都需要讀寫wt_count //reader進程與讀寫平等時相同 void reader() { while(true) { //欲進入數據庫,先爭奪候選人 down(candidate); //成為候選人后,進入rd_count臨界區 down(rd_count); //若數據庫內無讀者,則自己是第一個讀者,負責給數據庫上鎖,以防止寫者進入 if(rd_count==0) down(db_mutex); //修改讀者數量,離開rd_count臨界區,解開候選人鎖(因為自己進入數據庫) rd_count++; up(rd_count); up(candidate); read_data();//數據庫內操作 //欲離開數據庫,進入rd_count臨界區,若自己是最后一個讀者,解開數據庫鎖 down(rd_count); rd_count--; if(rd_count==0) up(db_mutex); up(rd_count); use_data(); //數據庫外操作 } } void writer() { while(true) { produce_data(); //數據庫外操作 //欲進入數據庫,先進入wt_count臨界區,判斷自己是否是“第一個”寫者 down(wtc_mutex); wt_count++; if(wt_count==1) //若自己是“第一個”寫者,則需要搶奪候選人 down(candidate); down(db_mutex); //不論自己是否是“第一個”寫者,都需要等待數據庫鎖 write_data(); //數據庫內操作 //欲離開數據庫,直接解開數據庫鎖,再進入wt_count臨界區,判斷自己是否是“最后一個”寫者 up(db_mutex); down(wtc_mutex); wt_count--; if(wt_count==0) //若自己是“最后一個”寫者,則解開候選人鎖,從而令讀者有機會搶奪候選人、進入數據庫 up(candidate); up(wtc_mutex); } }
⑤避免編程失誤的“管程”
回顧三個經典進程通信問題,可以發現,信號量機制的確可以解決進程通信的問題,但是編程較為麻煩且容易出錯造成死鎖,以生產者-消費者問題為例,如果因為編程時的失誤,某個生產者進程對信號量的操作順序從
down(empty);//減少一個空位置,若已為0則沉睡 down(mutex);//進入臨界區
變成了
down(mutex);//進入臨界區 down(empty);//減少一個空位置,若已為0則沉睡
那么生產者就可能因為進入臨界區后發現已無空位置而沉睡,並且沒有解開mutex從而導致消費者沒法取走產品,造成進程間的死鎖。也就是說,通過直接對進程編程來使用信號量是“比較危險”的做法,一不小心就可能造成死鎖等異常情況。因此,一種新的利用信號量實現進程通信的思想被提了出來:管程。
通過對進程通信的分析,可以發現,同一類進程對信號量的操作是相同的,比如生-消問題中的生產者,都是執行如下代碼
down(empty);
down(mutex);
put_product();
up(mutex);
up(count);
而消費者都是執行如下代碼
down(count);
down(mutex);
take_product();
up(mutex);
up(empty);
那么,我們是否可以做出如下的一個獨立的“模塊”
int empty=N; int count=0; void put_product(productType x) { down(empty); put(x); up(count); } void take_product(productType &x) { down(count); x=take(); up(empty); }
然后做出如下限制(為簡便,put_product()簡記為p(),take_product()簡記為t()):
1.同一時間只能有一個進程在執行p()或t(),其它調用了p()或t()的進程排隊等待
2.一個進程若在執行p()或t()時因為down()某個信號量而阻塞,則掛起,讓等待執行p()或t()的另一個進程執行其調用的p()或t()
3.一個進程若在執行p()或t()時因為up()某個信號量而喚醒了某沉睡進程,則在當前進程退出p()或t()后,令被喚醒進程執行其之前調用的p()或t()
如果能實現這一限制,那么生產者和消費者就可以簡單的完成自己想做的事,避免編程失誤或者說方便進程通信出錯時debug
void producer() { productType x; while(true) { x=produce(); //生產產品 put_product(x); //放置產品 } } void consumer() { productType x; while(true) { take_product(&x); //取得產品 consume(x); //消費產品 } }
上述的所謂“模塊”就是所謂的管程,習慣面向對象編程的人也可以將其視為一個類。之所以將這種實現進程通信的技術稱之為管程,是因為在旁人看來,管程就是一個管理員,其負責保證同一時間只能有一個進程調用某些方式,並且負責這些進程的沉睡與喚醒。
需要注意的是,就像信號量機制需要計算機底層的支持一樣,管程也不是任意情況下均能實現。比如C語言就不可能實現管程,因為C語言無法滿足管程需要的條件。但是有一些語言是可以實現管程的,比如JAVA,利用關鍵詞synchronized,可以使同一個類中的某些方法不能被“同時”執行,借助此支持,再將生產者、消費者、管程寫在同一個類中,就可以實現(線程級別的)管程思想。
有關進程通信的基礎知識就是上面這些,但是進程通信問題引出了另一個問題——死鎖。雖然本文提到過死鎖,但一直是在避免死鎖的出現。那么死鎖萬一出現了,該如何令操作系統知曉呢?操作系統知道有死鎖發生后,能不能解開死鎖呢?這類問題與進程通信有關,但又自成一派,因此將其留作日后單獨討論。