前言
我們使用加鎖機制來保證線程安全,但是如果過度地使用加鎖,則可能會導致死鎖。下面將介紹關於死鎖的相關知識以及我們在編寫程序時如何預防死鎖。
什么是死鎖
學習操作系統時,給出死鎖的定義為兩個或兩個以上的線程在執行過程中,由於競爭資源而造成的一種阻塞的現象,若無外力作用,它們都將無法推進下去。簡化一點說就是:一組相互競爭資源的線程因為互相等待,導致“永久”阻塞的現象。
下面我們通過一個轉賬例子來深入理解死鎖。
class Account {
private int balance;
// 轉賬
void transfer(Account target, int amt){
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
為了使以上轉賬方法transfer()不存在並發問題,很快地我們可以想使用Java的synchronized修飾transfer方法,於是代碼如下:
class Account {
private int balance;
// 轉賬
synchronized void transfer(Account target, int amt){
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
需要注意,這里我們使用的內置鎖是this
,這把鎖雖然可以保護我們自己的balance,卻不可以保護target的balance。使用我們上一篇介紹的鎖模型來描繪這個代碼就是下面這樣:(圖來自參考[1])
更具體來說,假設有 A、B、C 三個賬戶,余額都是 200 元,我們用兩個線程分別執行兩個轉賬操作:賬戶 A 轉給賬戶 B 100 元,賬戶 B 轉給賬戶 C 100 元,最后我們期望的結果應該是賬戶 A 的余額是 100 元,賬戶 B 的余額是 200 元, 賬戶 C 的余額是 300 元。
如果有兩個線程1和線程2,線程1 執行賬戶 A 轉賬戶 B 的操作,線程2執行賬戶 B 轉賬戶 C 的操作。這兩個線程分別運行在兩顆的CPU上,由於this
這個鎖只能保護自己的balance而不能保護別人的,線程 1 鎖定的是賬戶 A 的實例(A.this),而線程 2 鎖定的是賬戶 B 的實例(B.this),所以這兩個線程可以同時進入臨界區 transfer(),因此兩個線程沒有實現互斥。
出現可能的結果就為,兩個線程同時讀到賬戶B的余額為200元,導致最終賬戶 B 的余額可能是 300(線程 1 后於線程 2 寫 B.balance,線程 2 寫的 B.balance 值被線程 1 覆蓋),可能是 100(線程 1 先於線程 2 寫 B.balance,線程 1 寫的 B.balance 值被線程 2 覆蓋)。在這種情況下,賬戶B的余額就不會是 200。
並發轉賬示意圖(圖來自參考[1])
於是我們應該使用一個能夠覆蓋所有保護資源的鎖,如果還記得我們上一篇講synchronized修飾靜態方法時默認的鎖對象的話,那這里就很容易解決了。這個默認的鎖就是類的class對象。於是,我們就可以使用Account.class作為一個可以保護這個轉賬過程的鎖。
class Account {
private int balance;
// 轉賬
void transfer(Account target, int amt){
synchronized(Account.class) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}
這個方案雖然不存在並發問題,但是所有賬戶的轉賬操作都是串行的。現實世界中,賬戶 A 轉賬戶 B、賬戶 C 轉賬戶 D 這兩個轉賬操作現實世界里是可以並行的。較於實際情況來說,這個方案就顯得性能太差。
於是,我們盡量模仿現實世界的轉賬操作:
每個賬戶都有一個賬本,這些賬本都統一存放在文件架上。當轉賬A給賬戶B轉賬時,櫃員會去拿A賬本和B賬本做登記,此時櫃員在拿賬本時會遇到三種情況:
- 文件架上恰好有A賬本和B賬本,那就同時拿走;
- 如果文件架上只有A賬本和B賬本之一,那這個櫃員就先把文件架上有的賬本拿到手,同時等着其他櫃員把另外一個賬本送回來;
- A賬本和B賬本都沒有,那這個櫃員就等着兩個賬本都被送回來
在編程實現中,我們可以使用兩把鎖來實現這個過程。在 transfer() 方法內部,我們首先嘗試鎖定轉出賬戶 this(先把A賬本拿到手),然后嘗試鎖定轉入賬戶 target(再把B賬本拿到手),只有當兩者都成功時,才執行轉賬操作。
這個邏輯可以圖形化為下圖這個樣子,(圖來自參考[1]):
代碼如下:
class Account {
private int balance;
// 轉賬
void transfer(Account target, int amt){
// 鎖定轉出賬戶A
synchronized(this) {
// 鎖定轉入賬戶B
synchronized(target) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}
}
經過這樣的優化后,賬戶 A 轉賬戶 B 和賬戶 C 轉賬戶 D 這兩個轉賬操作就可以並行了。
但是這樣卻會導致死鎖。例如情況:櫃員張三做賬戶A轉賬戶B的轉賬操作,櫃員李四做賬戶B轉賬戶C的轉賬操作。他們兩個同時操作,於是就會出現下面這種情形:(圖來自參考[1])
他倆會一直等待對方將賬本放到文件架上,造成一個一直僵持的局勢。
關於這種現象,我們還可以借助資源分配圖來可視化鎖的占用情況(資源分配圖是個有向圖,它可以描述資源和線程的狀態)。其中,資源用方形節點表示,線程用圓形節點表示;資源中的點指向線程的邊表示線程已經獲得該資源,線程指向資源的邊則表示線程請求資源,但尚未得到。(圖來自參考[1])
Java並發程序一旦死鎖,一般沒有特別好的方法,恢復應用程序的唯一方式就是中止並重啟。因此,我們要盡量避免死鎖的發生,最好不要產生死鎖。要知道如何才能做到不要產生死鎖,我們首先要知道什么條件會發生死鎖。
死鎖發生的四個必要條件
雖然進程在運行過程中,可能發生死鎖,但死鎖的發生也必須具備一定的條件,死鎖的發生必須具備以下四個必要條件:
- 互斥,共享資源 X 和 Y 只能被一個線程占用;
- 占有且等待,線程 T1 已經取得共享資源 X,在等待共享資源 Y 的時候,不釋放共享資源 X;
- 不可搶占,其他線程不能強行搶占線程 T1 占有的資源;
- 循環等待,線程 T1 等待線程 T2 占有的資源,線程 T2 等待線程 T1 占有的資源,就是循環等待。
破壞死鎖發生的條件預防死鎖
只有這四個條件都發生時才會出現死鎖,那么反過來,也就是說只要我們破壞其中一個,就可以成功預防死鎖的發生。
四個條件中我們不能破壞互斥,因為我們使用鎖目的就是保證資源被互斥訪問,於是我們就對其他三個條件進行破壞:
- 占用且等待:一次性申請所有的資源,這樣就不存在等待了。
- 不可搶占,占用部分資源的線程進一步申請其他資源時,如果申請不到,可以主動釋放它占有的資源。
- 循環等待,靠按序申請資源來預防。所謂按序申請,是指資源是有線性順序的,申請的時候可以先申請資源序號小的,再申請資源序號大的,這樣線性化申請后就不存在循環了。
下面我們使用這些方法去解決如上的死鎖問題。
破壞占用且等待條件
一次性申請完所有資源。我們設置一個管理員來管理賬本,櫃員同時申請需要的賬本,而管理員同時出他們需要的賬本。如果不能同時出借,則櫃員就需要等待。
“同時申請”:這個操作是一個臨界區,含有兩個操作,同時申請資源apply()和同時釋放資源free()。
class Allocator {
private List<Object> als = new ArrayList<>();
// 一次性申請所有資源
synchronized boolean apply( Object from, Object to){
if(als.contains(from) || als.contains(to)){ //from 或者 to賬戶被其他線程擁有
return false;
} else {
als.add(from);
als.add(to);
}
return true;
}
// 歸還資源
synchronized void free(Object from, Object to){
als.remove(from);
als.remove(to);
}
}
class Account {
// actr 應該為單例,只能由一個人來分配資源
private Allocator actr;
private int balance;
// 轉賬
void transfer(Account target, int amt){
// 一次性申請轉出賬戶和轉入賬戶,直到成功
while(!actr.apply(this, target)) //最好可以加個timeout避免一直循環
;
try{
// 鎖定轉出賬戶
synchronized(this){ //存在客戶對自己賬戶的操作
// 鎖定轉入賬戶
synchronized(target){
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}
}
} finally {
actr.free(this, target) //釋放資源
}
}
}
破壞不可搶占條件
破壞不搶占要能夠主動釋放它占有的資源,但synchronized是做不到的。原因為synchronized申請不到資源時,線程直接進入了阻塞狀態,而線程進入了阻塞狀態也就沒有辦法釋放它占有的資源了。不過SDK中的java.util.concurrent提供了Lock
解決這個問題。
支持定時的鎖
顯式使用Lock
類中的定時tryLock
功能來代替內置鎖機制,可以檢測死鎖和從死鎖中恢復過來。使用內置鎖的線程獲取不到鎖會被阻塞,而顯式鎖可以指定一個超時時限(Timeout),在等待超過該時間后tryLock就會返回一個失敗信息,也會釋放其擁有的資源。
破壞循環等待條件
破壞這個條件,需要對資源進行排序,然后按序申請資源。我們假設每個賬戶都有不同的屬性 id,這個 id 可以作為排序字段,申請的時候,我們可以按照從小到大的順序來申請。
比如下面代碼中,①~⑤處的代碼對轉出賬戶(this)和轉入賬戶(target)排序,然后按照序號從小到大的順序鎖定賬戶。這樣就不存在“循環”等待了。
class Account {
private int id;
private int balance;
// 轉賬
void transfer(Account target, int amt){
Account left = this // ①
Account right = target; // ②
if (this.id > target.id) { // ③
left = target; // ④
right = this; // ⑤
}
// 鎖定序號小的賬戶
synchronized(left){
// 鎖定序號大的賬戶
synchronized(right){
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}
}
}
}
小結
記得學習操作系統時還有避免死鎖,其和預防死鎖的區別在於:預防死鎖是設法至少破壞產生死鎖的四個必要條件之一,嚴格地防止死鎖的出現,但是這也會使系統性能降低;而避免死鎖則不那么嚴格的限制產生死鎖的必要條件的存在,因為即使死鎖的必要條件存在,也不一定發生死鎖,死鎖避免是在系統運行過程中注意避免死鎖的最終發生。避免死鎖的經典算法就是銀行家算法,這里就不擴開介紹了。
還有一個避免出現死鎖的結論:如果所有線程以固定順序來獲得鎖,那么在程序中就不會出現鎖順序死鎖問題。查看參考[4]理解。
我們使用細粒度鎖鎖住多個資源時,要注意死鎖的產生。只有先嗅到死鎖的味道,才有我們的施展之地。
參考:
[1]極客時間專欄王寶令《Java並發編程實戰》
[2]Brian Goetz.Tim Peierls. et al.Java並發編程實戰[M].北京:機械工業出版社,2016
[3]iywwuyifan.避免死鎖和預防思索的區別.https://blog.csdn.net/masterchiefcc/article/details/83303813
[4]AddoilDan.死鎖面試題(什么是死鎖,產生死鎖的原因及必要條件).https://blog.csdn.net/hd12370/article/details/82814348