我們都見過交通阻塞,一大堆汽車因為爭奪行路權,互不相讓而造成阻塞,又或者因為車輛發生故障拋錨或兩輛車相撞而造成道路阻塞。在這種情況下,所有的車都停下來,誰也無法前行,這就是死鎖。本篇就來了解一下什么是死鎖,如何應對死鎖。
一、死鎖初窺
1.1 為何會發生死鎖?
死鎖的發生歸根結底是因為對資源的競爭。因為大家都想要某種資源,但又不能隨心所欲地得到所有資源,在爭奪的僵局中,導致任何人無法繼續推進。
在一個系統里存在多個線程,而這些線程共享該計算機系統里的資源。因為資源競爭而造成系統無法繼續推進就難以避免了。這里的資源可以使硬件(CPU、內存、磁盤等),也可以是軟件(例如鎖、信號量等)。
1.2 死鎖的定義與必要條件
(1)死鎖的定義
如果有一組線程,每個線程都在等待一個事件的發生,而這個事件只能有該線程里面的另一線程發出,則稱這組線程發生了死鎖。這里的事件主要是資源的釋放,在死鎖狀態中,沒有線程可以執行、釋放資源或被叫醒。
例如,有線程A和線程B如下:
如果線程A和線程B交替執行,那么線程A和線程B均會因為無法獲得對應的資源而無法繼續執行也不能釋放鎖,從而造成死鎖,如下圖所示:
(2)死鎖的4個必要條件
① 資源有限:即一個系統里面的資源數量是有限的,以致於無法同時滿足所有線程的資源需求。
② 持有等待:即一個線程在請求新的資源時,其已經獲得的資源並不釋放,而是繼續持有。
③ 不可搶占:即如果可以搶占一個資源,則也不會發生死鎖。(凡是可以搶占的資源,均不會稱為死鎖的原因)
④ 循環等待:即如果你等我、我等你,大家都這樣等着對方,就產生了死鎖。
二、應對死鎖
2.1 引子:哲學家就餐問題
哲學家每天只做兩件事:思考和吃飯。他們每天不停地思考人生的這里,比如人從什么地方來,人為什么是現在這個樣子,人類往哪里去等深刻的問題。當然,思考久了就會感到飢餓,而飢餓了就要吃飯。但是,吃飯是有規矩的:
哲學家圍坐在一個圓桌邊,每個人的左右兩邊均放着一根筷子。如果要吃飯,需要獲得左右兩邊的筷子(不能用一根筷子吃飯),如下圖所示:
我們很自然地得到一個算法,對於每一個哲學家,執行以下的算法:
① 等待左邊的筷子可用,然后拿起左邊的筷子。
② 等待右邊的筷子可用,然后拿起右邊的筷子。
③ 吃飯。
④ 放下兩根筷子。
顯然,如果每個哲學家穿插着執行,將會出現每個哲學家都拿起左邊筷子,而等待右邊筷子的情況,即死鎖將會發生。那么,有木有辦法防止哲學家出現死鎖呢?
2.2 死鎖的應對方法
操作系統應對死鎖的策略可以分為兩大種、四小種。兩大種是:允許死鎖發生 和 不讓死鎖發生。四小種是:允許死鎖發生有兩個子對策,一是假裝看不見不予理睬,二是死鎖發生后想辦法解決;不讓死鎖發生也有兩個子對策,一是通過平時的仔細檢點避免難題出現,二是通過將發生死鎖的必要條件消除杜絕死鎖的發生。
(1)順其自然不予理睬
此種策略就是操作系統不做任何措施,任由死鎖發生。老子曾說,無為而治,說的就是這個策略。但是,如果牽涉到高可靠性系統、實時控制系統,那就另當別論了,那絕對不允許死鎖。
(2)死鎖的檢測與恢復
在死鎖檢測上,一般會利用到兩個矩陣:一個是資源分配矩陣,另一個是資源等待矩陣,如下圖所示:
資源分配矩陣
資源等待矩陣
此外,還維持兩個矢量:一個是系統資源總量矢量(表示系統中所有資源的總數是多少),另一個是系統當前可用資源矢量(代表系統現在還有多少可用的資源),如下圖所示:
有了上面的矩陣和矢量,我們就可以通過簡單地矩陣運算來判斷系統是否發生了死鎖。例如,將上圖中的資源可用數量矩陣與資源等待矩陣的每一行相減,都會出現負值,那么該系統將要發生死鎖。
在死鎖恢復上,首先可以搶占(即將某個線程所占有的資源強行拿走,分配給別的線程),其次可以將整個線程Kill殺掉(因為搶占一個線程的資源有可能造成該線程無法再正確運行了),最后則是Rollback回滾(即將整個系統回滾到過去的某個狀態,大家從那個狀態重新來過)
(3)死鎖的動態避免
死鎖的檢測與恢復屬於后發制人,這時死鎖的消極后果已經產生,即使修復也已經浪費了時間,降低了效率,甚至造成了其他損失。因此,需要更加積極主動一點,不要等到死鎖發生了再亡羊補牢,而是在運行中就小心翼翼,不讓思索發生。
動態避免的原則在於:在每次進行資源分配時,必須經過仔細計算,確保該資源請求批准后系統不會進入死鎖或潛在的死鎖狀態。例如,有一種資源的數量為10個,當前有3個線程正在運行。每個線程需要資源的最大數和當前已經占用的資源數如下表所示:
可以通過以下分配過程得知,存在一個資源分配順序使得所有線程都能獲得其需要的資源,從而得知當前狀態是安全狀態,不會產生死鎖。相反,如果不存在這樣一個順序,那么就有可能產生死鎖。
動態避免的優點就是無需等待死鎖的發生,而是在死鎖有可能發生的時候采取先發制人的措施,斷然拒絕有可能進入死鎖的資源請求。但是,計算一個狀態是否安全並不是一件容易的事情。
(4)死鎖的靜態防止
該策略的中心思想是:清除死鎖發生的土壤(即死鎖的4個必要條件),只要清除4個必要條件中的一個,那么死鎖將無法發生。
① 清除資源獨占條件:一是增加資源到所有線程滿足的資源需要,但這並不實際,因為資源是有限的;二是將資源變為共享,但並不適合與所有的資源(例如鍵盤輸入就無法共享)。
② 清除保持和請求條件:一個線程必須一次請求其所需的所有資源,而不是一般情況下的請求一點資源做一點事情。由於一個線程一次就獲得了其所需的所有資源,該線程就可以順利執行,不會發生死鎖。
③ 清除非搶占條件:允許搶占資源,也就是說可以從一個線程手上將資源搶奪過來。
④ 清除循環等待條件:出現循環等待是因為線程請求資源的順序是隨機的,所以只要約定線程對資源的使用順序,那么死鎖就不能發生。
2.3 銀行家算法
顧名思義,銀行家算法就是仿照銀行發放貸款時采用的控制方式而設計的一種死鎖避免算法,該算法的策略是實現動態避免死鎖。
銀行家算法的基本思想是:分配資源之前,判斷系統是否是安全的;若是,才分配。每分配一次資源就測試一次是否安全,不是資源全部就位后才測試。我們可以把操作系統看作是銀行家,操作系統管理的資源相當於銀行家管理的資金,進程向操作系統請求分配資源相當於用戶向銀行家貸款。概括起來基本思想就是:
① 分配檢測:Request < Need
Request < Available
② 安全序列檢測算法
下面看一個在操作系統教科書中出現的例子:
某系統有R1,R2,R3共3中資源,在T0時刻P0,P1,P2,P3和P4這5個進程對資源的占用和需求情況如下表1,此時系統的可用資源向量為(3,3,2)。試問:
1、T0時刻系統是否存在安全序列?
2、P1請求資源:P1發出請求向量Request(1,0,2),系統是否接受該請求?請使用銀行家算法檢查
表1 T0時刻的資源分配表
MAX Allocation Need Available P0 7 5 3 0 1 0 7 4 3 3 3 2 P1 3 2 2 2 0 0 1 2 2 P2 9 0 2 3 0 2 6 0 0 P3 2 2 2 2 1 1 0 1 1 P4 4 3 3 0 0 2 4 3 1 1、T0時刻系統是否存在安全序列?
Available > Need1 ----> 可用資源分配給P1,直到P1進程執行完成,然后Available = Available + Allocation1 = (5,3,2)
Available > Need3 ----> 可用資源分配給P3,直到P3進程執行完成,然后Available = Available + Allocation3 = (7,4,3)
Available > Need4 依次類推
得到安全序列為:P1,P3,P4,P2,P0
2、P1請求資源:P1發出請求向量Request(1,0,2),系統是否接受該請求?請使用銀行家算法檢查
第一步(假分配檢查):把Request分配給P1,必須滿足Request要小於Available,Request要小於Need。
Request(1,0,2)< Available(3,3,2)
Request(1,0,2)< Need(1,2,2)
因為滿足第一步檢查,進入第二層檢查(安全序列檢查)。
第二步(安全序列檢查):建立安全性檢查表
Work Need Allocation Work+Allocation Finish P1 2 3 0 0 2 0 3 0 2 如果 Work > Need,那么執行Work+Allocation,得到:
Work Need Allocation Work+Allocation Finish P1 2 3 0 0 2 0 3 0 2 5 3 2 true 5 3 2 找到Need < Work的進程,如果沒有找到這樣的進程而進程集合沒有執行,則算法返回,得到不存在安全序列結果,否則繼續執行該算法。
這里我們找到了P3進程。修改安全序列檢查表:
Work Need Allocation Work+Allocation Finish P1 2 3 0 0 2 0 3 0 2 5 3 2 true P3 5 3 2 0 1 1 2 1 1 7 4 3 true 7 4 3 這樣一直執行到所有的進程到完成,以完成該安全序列檢查表:
Work Need Allocation Work+Allocation Finish P1 2 3 0 0 2 0 3 0 2 5 3 2 true P3 5 3 2 0 1 1 2 1 1 7 4 3 true P4 7 4 3 4 3 1 0 0 2 7 4 5 true P0 7 4 5 7 4 3 0 1 0 7 5 5 true P2 7 5 5 6 0 0 3 0 2 10 5 7 true 這樣就找到了整個安全序列為:P1,P3,P4,P0,P2
總的來說,銀行家算法是一個動態避免死鎖算法,通過對資源的仔細分配以避免死鎖。其特點是可以超額批准客戶的信用額度,即所有客戶的信用額度之和可以超過銀行的全部資本,這就是杠桿(Leverage)!
2.4 解決:哲學家就餐問題
這里使用C#語言,模擬信號量,以消除死鎖的必要條件(消除保持並等待的必要條件)的方式來實現解決哲學家就餐問題。
(1)首先定義哲學家的三種狀態:
/// <summary> /// 哲學家狀態 /// </summary> public enum StateEnum { // 思考狀態 THINKING = 0, // 飢餓狀態 HUNGRY = 1, // 吃飯狀態 EATING = 2 }
(2)然后定義一個臨界區互斥用的信號量,給每個哲學家單獨定義一個信號量。如果一個哲學家需要阻塞,則阻塞發生在該信號量上。
private const int NumOfPhilosopher = 5; // 哲學家人數 private static StateEnum[] states = new StateEnum[NumOfPhilosopher]; // 記錄每個哲學家當前狀態的數組 private static semaphore mutex = 1; // 模擬互斥信號量 private static semaphore[] s = new semaphore[NumOfPhilosopher]; // 每個哲學家等待一個單獨的信號量
這里的semaphore其實就是int類型:
using semaphore = System.Int32;
要模擬互斥信號量,需要有兩種基本原語操作Up 和 Down:
/// <summary> /// 互斥信號量Down /// </summary> private static void Down(semaphore mutex) { if (mutex == 1) { mutex--; } } /// <summary> /// 互斥信號量Down /// </summary> private static void Down(ref semaphore mutex) { // 阻塞操作 while (mutex < 1) { } } /// <summary> /// 互斥信號量Up /// </summary> private static void Up(semaphore mutex) { if (mutex == 0) { mutex++; } } /// <summary> /// 互斥信號量Up /// </summary> private static void Up(ref semaphore mutex) { if (mutex == 0) { mutex++; } }
(3)哲學家的兩種生活狀態:Think 和 Eat
/// <summary> /// 思考 /// </summary> /// <param name="philosoper">哲學家編號</param> private static void Think(int philosopher) { Console.WriteLine("Philosopher:{0} IS THINKING.", philosopher + 1); System.Diagnostics.Debug.WriteLine("Philosopher:{0} IS THINKING.", philosopher + 1); } /// <summary> /// 吃飯 /// </summary> /// <param name="philosoper">哲學家編號</param> private static void Eat(int philosopher) { Console.WriteLine("Philosopher:{0} IS EATING.", philosopher + 1); System.Diagnostics.Debug.WriteLine("Philosopher:{0} IS EATING.", philosopher + 1); }
(4)哲學家的日常生活:思考,拿筷子,吃飯,放下筷子,繼續思考......
/// <summary> /// 哲學家程序 /// </summary> /// <param name="philosopher">哲學家編號</param> private static void PhilosopherRoutine(object number) { int philosopher = (semaphore)number; while (true) { Think(philosopher); TakeChopsticks(philosopher); // 同時獲得兩根筷子,否則阻塞等待 Eat(philosopher); PutChopsticks(philosopher); // 同時放下兩根筷子 } } /// <summary> /// 獲取筷子 /// </summary> /// <param name="philosoper">哲學家編號</param> private static void TakeChopsticks(int philosopher) { Down(mutex); states[philosopher] = StateEnum.HUNGRY; Test(philosopher); // 試圖拿起兩根筷子 Up(mutex); Down(ref s[philosopher]); // 如果沒有拿到筷子,則繼續阻塞等待 } /// <summary> /// 放下筷子 /// </summary> /// <param name="philosoper">哲學家編號</param> private static void PutChopsticks(int philosopher) { Down(mutex); states[philosopher] = StateEnum.THINKING; int left = (philosopher + NumOfPhilosopher - 1) % NumOfPhilosopher; int right = (philosopher + 1) % NumOfPhilosopher; // 測試左面的哲學家是否可以吃飯 Test(left); // 測試右面的哲學家是否可以吃飯 Test(right); Up(mutex); } /// <summary> /// 測試是否可以同時拿起兩根筷子 /// </summary> /// <param name="philosopher">哲學家編號</param> private static void Test(int philosopher) { int left = (philosopher + NumOfPhilosopher - 1) % NumOfPhilosopher; int right = (philosopher + 1) % NumOfPhilosopher; if (states[philosopher] == StateEnum.HUNGRY && states[left] != StateEnum.EATING && states[right] != StateEnum.EATING) { // 可以拿起兩根筷子,改變哲學家狀態到吃飯狀態 states[philosopher] = StateEnum.EATING; // 發出叫醒信號 Up(ref s[philosopher]); } }
Run之后的結果如下圖所示:
這樣看不清楚,截一段結果出來看看:
Philosopher:2 IS THINKING. Philosopher:3 IS THINKING. Philosopher:5 IS THINKING. Philosopher:1 IS THINKING. Philosopher:5 IS EATING. Philosopher:4 IS THINKING. Philosopher:3 IS EATING. Philosopher:5 IS THINKING. Philosopher:2 IS EATING. Philosopher:1 IS EATING. Philosopher:4 IS EATING. Philosopher:5 IS EATING. Philosopher:3 IS THINKING. Philosopher:1 IS THINKING. Philosopher:4 IS THINKING. Philosopher:5 IS THINKING. Philosopher:3 IS EATING. Philosopher:1 IS EATING. Philosopher:2 IS THINKING. Philosopher:4 IS EATING. Philosopher:5 IS EATING. Philosopher:3 IS THINKING. Philosopher:5 IS THINKING. Philosopher:4 IS THINKING. Philosopher:2 IS EATING. Philosopher:1 IS THINKING. Philosopher:3 IS EATING. Philosopher:4 IS EATING. Philosopher:2 IS THINKING. Philosopher:4 IS THINKING. Philosopher:2 IS EATING. Philosopher:4 IS EATING. Philosopher:5 IS EATING. Philosopher:3 IS THINKING. Philosopher:5 IS THINKING. Philosopher:4 IS THINKING. Philosopher:2 IS THINKING. Philosopher:3 IS EATING. Philosopher:5 IS EATING. Philosopher:4 IS EATING. Philosopher:2 IS EATING. Philosopher:1 IS EATING. Philosopher:5 IS THINKING. Philosopher:3 IS THINKING. Philosopher:5 IS EATING. Philosopher:3 IS EATING. Philosopher:4 IS THINKING. Philosopher:2 IS THINKING. Philosopher:4 IS EATING. Philosopher:5 IS THINKING. Philosopher:3 IS THINKING. Philosopher:4 IS THINKING. Philosopher:2 IS EATING. Philosopher:5 IS EATING. Philosopher:4 IS EATING. Philosopher:2 IS THINKING. Philosopher:5 IS THINKING. Philosopher:3 IS EATING. Philosopher:5 IS EATING. Philosopher:1 IS THINKING. Philosopher:4 IS THINKING. Philosopher:2 IS EATING. Philosopher:5 IS THINKING. Philosopher:4 IS EATING. Philosopher:2 IS THINKING. Philosopher:5 IS EATING. Philosopher:3 IS THINKING. Philosopher:5 IS THINKING. Philosopher:1 IS EATING. Philosopher:5 IS EATING. Philosopher:4 IS THINKING. Philosopher:2 IS EATING. Philosopher:4 IS EATING. Philosopher:5 IS THINKING. Philosopher:3 IS EATING. Philosopher:4 IS THINKING. Philosopher:2 IS THINKING. Philosopher:5 IS EATING. Philosopher:3 IS THINKING. Philosopher:5 IS THINKING. Philosopher:3 IS EATING. Philosopher:4 IS EATING. Philosopher:2 IS EATING. Philosopher:5 IS EATING. Philosopher:3 IS THINKING. Philosopher:5 IS THINKING. Philosopher:1 IS THINKING. Philosopher:4 IS THINKING. Philosopher:2 IS THINKING. Philosopher:5 IS EATING. Philosopher:3 IS EATING. Philosopher:5 IS THINKING. Philosopher:4 IS EATING. Philosopher:3 IS THINKING. Philosopher:1 IS EATING. Philosopher:4 IS THINKING. Philosopher:2 IS EATING. Philosopher:5 IS EATING. Philosopher:3 IS EATING. Philosopher:1 IS THINKING. Philosopher:4 IS EATING. Philosopher:2 IS THINKING. Philosopher:5 IS THINKING. Philosopher:5 IS EATING.
可以看到,哲學家們交替着思考吃飯,井然有序,沒有發生死鎖。這里沒有使用.NET中現有的Mutex、Semaphore等類型,而采用了一個int類型來模擬最簡單的互斥信號量。
完整的代碼如下:

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using semaphore = System.Int32; namespace PhilosopherDemo { public class Program { private const int NumOfPhilosopher = 5; // 哲學家人數 private static StateEnum[] states = new StateEnum[NumOfPhilosopher]; // 記錄每個哲學家當前狀態的數組 private static semaphore mutex = 1; // 模擬互斥信號量 private static semaphore[] s = new semaphore[NumOfPhilosopher]; // 每個哲學家等待一個單獨的信號量 /// <summary> /// 初始化哲學家狀態 /// </summary> private static void InitializePhilosopher() { for (int i = 0; i < NumOfPhilosopher; i++) { states[i] = StateEnum.THINKING; s[i] = 1; } } /// <summary> /// 哲學家程序 /// </summary> /// <param name="philosopher">哲學家編號</param> private static void PhilosopherRoutine(object number) { int philosopher = (semaphore)number; while (true) { Think(philosopher); TakeChopsticks(philosopher); // 同時獲得兩根筷子,否則阻塞等待 Eat(philosopher); PutChopsticks(philosopher); // 同時放下兩根筷子 } } /// <summary> /// 獲取筷子 /// </summary> /// <param name="philosoper">哲學家編號</param> private static void TakeChopsticks(int philosopher) { Down(mutex); states[philosopher] = StateEnum.HUNGRY; Test(philosopher); // 試圖拿起兩根筷子 Up(mutex); Down(ref s[philosopher]); // 如果沒有拿到筷子,則繼續阻塞等待 } /// <summary> /// 放下筷子 /// </summary> /// <param name="philosoper">哲學家編號</param> private static void PutChopsticks(int philosopher) { Down(mutex); states[philosopher] = StateEnum.THINKING; int left = (philosopher + NumOfPhilosopher - 1) % NumOfPhilosopher; int right = (philosopher + 1) % NumOfPhilosopher; // 測試左面的哲學家是否可以吃飯 Test(left); // 測試右面的哲學家是否可以吃飯 Test(right); Up(mutex); } /// <summary> /// 測試是否可以同時拿起兩根筷子 /// </summary> /// <param name="philosopher">哲學家編號</param> private static void Test(int philosopher) { int left = (philosopher + NumOfPhilosopher - 1) % NumOfPhilosopher; int right = (philosopher + 1) % NumOfPhilosopher; if (states[philosopher] == StateEnum.HUNGRY && states[left] != StateEnum.EATING && states[right] != StateEnum.EATING) { // 可以拿起兩根筷子,改變哲學家狀態到吃飯狀態 states[philosopher] = StateEnum.EATING; // 發出叫醒信號 Up(ref s[philosopher]); } } /// <summary> /// 思考 /// </summary> /// <param name="philosoper">哲學家編號</param> private static void Think(int philosopher) { Console.WriteLine("Philosopher:{0} IS THINKING.", philosopher + 1); System.Diagnostics.Debug.WriteLine("Philosopher:{0} IS THINKING.", philosopher + 1); } /// <summary> /// 吃飯 /// </summary> /// <param name="philosoper">哲學家編號</param> private static void Eat(int philosopher) { Console.WriteLine("Philosopher:{0} IS EATING.", philosopher + 1); System.Diagnostics.Debug.WriteLine("Philosopher:{0} IS EATING.", philosopher + 1); } /// <summary> /// 互斥信號量Down /// </summary> private static void Down(semaphore mutex) { if (mutex == 1) { mutex--; } } /// <summary> /// 互斥信號量Down /// </summary> private static void Down(ref semaphore mutex) { // 阻塞操作 while (mutex < 1) { } } /// <summary> /// 互斥信號量Up /// </summary> private static void Up(semaphore mutex) { if (mutex == 0) { mutex++; } } /// <summary> /// 互斥信號量Up /// </summary> private static void Up(ref semaphore mutex) { if (mutex == 0) { mutex++; } } public static void Main(string[] args) { InitializePhilosopher(); for (int i = 0; i < NumOfPhilosopher; i++) { Thread thread = new Thread(PhilosopherRoutine); thread.Start(i); } Console.ReadKey(); } } }
參考資料
鄒恆明,《操作系統之哲學原理》,機械工業出版社