在這篇文章中,我們主要討論一下死鎖及其解決辦法。
概述
在上一篇文章中,我們討論了如何使用一個互斥鎖去保護多個資源,以銀行賬戶轉賬為例,當時給出的解決方法是基於Class對象創建互斥鎖。
這樣雖然解決了同步的問題,但是能在現實中使用嗎?答案是不可以,尤其是在高並發的情況下,原因是我們使用的互斥鎖的范圍太大,以轉賬為例,我們的做法會鎖定整個賬戶Class對象,這樣會導致轉賬操作只能串行進行,但是在實際場景中,大量的轉賬操作業務中的雙方是不相同的,直接在Class對象級別上加鎖是不能接受的。
那如果在對象實例級別上加鎖,使用細粒度鎖,會有什么問題?可能會發生死鎖。
我們接下來看一下造成死鎖的原因和可能的解決方案。
死鎖案例
什么是死鎖?
死鎖是指一組互相競爭資源的線程因互相等待,導致“永久”阻塞的現象。
一般來說,當我們使用細粒度鎖時,它在提升性能的同時,也可能會導致死鎖。
我們還是以銀行轉賬為例,來看一下死鎖是如何發生的。
首先,我們先定義個BankAccount對象,來存儲基本信息,代碼如下。
public class BankAccount {
private int id;
private double balance;
private String password;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public double getBalance() {
return balance;
}
public void setBalance(double balance) {
this.balance = balance;
}
}
接下來,我們使用細粒度鎖來嘗試完成轉賬操作,代碼如下。
public class BankTransferDemo {
public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
synchronized(sourceAccount) {
synchronized(targetAccount) {
if (sourceAccount.getBalance() > amount) {
System.out.println("Start transfer.");
System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
sourceAccount.setBalance(sourceAccount.getBalance() - amount);
targetAccount.setBalance(targetAccount.getBalance() + amount);
System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
}
}
}
}
}
我們用下面的代碼來做簡單測試。
public static void main(String[] args) throws InterruptedException {
BankAccount sourceAccount = new BankAccount();
sourceAccount.setId(1);
sourceAccount.setBalance(50000);
BankAccount targetAccount = new BankAccount();
targetAccount.setId(2);
targetAccount.setBalance(20000);
BankTransferDemo obj = new BankTransferDemo();
Thread t1 = new Thread(() ->{
for (int i = 0; i < 10000; i++) {
obj.transfer(sourceAccount, targetAccount, 1);
}
});
Thread t2 = new Thread(() ->{
for (int i = 0; i < 10000; i++) {
obj.transfer(targetAccount, sourceAccount, 1);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Finished.");
}
測試代碼中包含了2個線程,其中t1線程循環從sourceAccount向targetAccount轉賬,而t2線程會循環從targetAccount向sourceAccount轉賬。
從運行結果來看,t1線程中的循環在運行600次左右時,t2線程也創建好,開始循環轉賬了,這時就會發生死鎖,導致t1線程和t2線程都無法繼續執行。
我們可以用下面的資源分配圖來更直觀的描述死鎖。
死鎖的原因和預防
並發程序一旦死鎖,一般沒有特別好的辦法,很多時候我們只能重啟應用,因此,解決死鎖問題的最好辦法是規避死鎖。
我們先來看一下死鎖發生的條件,一個叫Coffman的牛人,於1971年在ACM Computing Surveys發表了一篇名為System Deadlocks的文章,他總結了只有以下四個條件全部滿足的情況下,才會發生死鎖:
- 互斥,共享資源X和Y只能被一個線程占用。
- 占有且等待,線程t1已經取得共享資源X,在等待共享資源Y的時候,不釋放共享資源X。
- 不可搶占,其他線程不能強行搶占線程t1占有的資源。
- 循環等待,線程t1等待線程t2占有的資源,線程t2等待線程t1占有的資源,就是循環等待。
通過上述描述,我們能夠推導出,只要破壞上面其中一個條件,就可以避免死鎖的發生。
但是第一個條件互斥,是不可以被破壞的,否則我們就沒有用鎖的必要了,那么我們來看如何破壞其他三個條件。
破壞占用且等待條件
如果要破壞占用且等待條件,我們可以嘗試一次性申請全部資源,這樣就不需要等待了。
在實現過程中,我們需要創建一個新的角色,負責同時申請和同時釋放全部資源,我們可以將其稱為Allocator。
我們來看一下具體的代碼實現。
public class Allocator {
private volatile static Allocator instance;
private Allocator() {}
public static Allocator getInstance() {
if (instance == null) {
synchronized(Allocator.class) {
if (instance == null) {
instance = new Allocator();
}
}
}
return instance;
}
private Set<Object> lockObjs = new HashSet<Object>();
public synchronized boolean apply(Object... objs) {
for (Object obj : objs) {
if (lockObjs.contains(obj)) {
return false;
}
}
for (Object obj : objs) {
lockObjs.add(obj);
}
return true;
}
public synchronized void free(Object... objs) {
for (Object obj : objs) {
if (lockObjs.contains(obj)) {
lockObjs.remove(obj);
}
}
}
}
Allocator是一個單例模式,它會使用一個Set對象來保存所有需要處理的資源,然后使用apply()和free()來同時鎖定或者釋放所有資源,它們會接收不固定參數。
我們來看一下新的transfer()方法應該怎么寫。
public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
Allocator allocator = Allocator.getInstance();
while(!allocator.apply(sourceAccount, targetAccount));
try {
synchronized(sourceAccount) {
synchronized(targetAccount) {
if (sourceAccount.getBalance() > amount) {
System.out.println("Start transfer.");
System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
sourceAccount.setBalance(sourceAccount.getBalance() - amount);
targetAccount.setBalance(targetAccount.getBalance() + amount);
System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
}
}
}
}
finally {
allocator.free(sourceAccount, targetAccount);
}
}
我們可以看到,transfer()方法中,首先獲取Allocator實例,然后調用apply(),傳入sourceAccount和targetAccount實例,請注意這里使用了while循環,即直到apply()返回true,才會退出循環,此時,Allocator已經鎖定了sourceAccount和targetAccount,接下來,我們使用synchronized關鍵字來鎖定sourceAccount和targetAccount,然后執行轉賬的業務邏輯。這里並不是必須要用synchronized,但是這樣做可以避免其他操作來影響轉賬操作,例如如果轉賬的過程中對sourceAccount實例進行取錢操作,如果不用synchronized,就有可能引發並發問題。
下面是測試代碼。
public static void main(String[] args) throws InterruptedException {
BankAccount sourceAccount = new BankAccount();
sourceAccount.setId(1);
sourceAccount.setBalance(50000);
BankAccount targetAccount = new BankAccount();
targetAccount.setId(2);
targetAccount.setBalance(20000);
BankTransferDemo obj = new BankTransferDemo();
Thread t1 = new Thread(() ->{
for (int i = 0; i < 10000; i++) {
obj.transfer(sourceAccount, targetAccount, 1);
}
});
Thread t2 = new Thread(() ->{
for (int i = 0; i < 10000; i++) {
obj.transfer(targetAccount, sourceAccount, 1);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Finished.");
}
程序是可以正常執行的,結果和我們預期一致。
在這里,我們需要保證鎖對象的不可變性,對於BankAccount對象來說,id屬性可以看做是其主鍵,id相同的BankAccount實例,從業務角度來說,指向的都是同一個賬戶,但是對於鎖對象來說,id相同的不同實例,會產生不同的鎖,從而引發並發問題。
我們來看下面修改后的測試代碼。
public static void main(String[] args) throws InterruptedException {
BankTransferDemo obj = new BankTransferDemo();
Thread t1 = new Thread(() ->{
for (int i = 0; i < 10000; i++) {
// 這里應該從后端獲取賬戶實例,此處只做演示。
BankAccount sourceAccount = new BankAccount();
sourceAccount.setId(1);
sourceAccount.setBalance(50000);
BankAccount targetAccount = new BankAccount();
targetAccount.setId(2);
targetAccount.setBalance(20000);
obj.transfer(sourceAccount, targetAccount, 1);
}
});
Thread t2 = new Thread(() ->{
for (int i = 0; i < 10000; i++) {
// 這里應該從后端獲取賬戶實例,此處只做演示。
BankAccount sourceAccount = new BankAccount();
sourceAccount.setId(1);
sourceAccount.setBalance(50000);
BankAccount targetAccount = new BankAccount();
targetAccount.setId(2);
targetAccount.setBalance(20000);
obj.transfer(targetAccount, sourceAccount, 1);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Finished.");
}
上述代碼中,每次轉賬都創建新的BankAccount實例,然后將其傳入Allocator,這樣做,是不能夠正常處理的,因為每次使用的互斥鎖都作用在不同的實例上,這一點,需要特別注意。
破壞不可搶占條件
破壞不可搶占條件很簡單,解決的關鍵在於能夠主動釋放它占有的資源,但是synchronized是不能做到這一點的。
synchronized申請資源的時候,如果申請失敗,線程會直接進入阻塞狀態,什么都不能做,已經鎖定的資源也無法釋放。
我們可以使用java.util.concurrent包中的Lock對象來實現這一點,相關代碼如下。
private Lock lock = new ReentrantLock();
public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
try {
lock.lock();
if (sourceAccount.getBalance() > amount) {
System.out.println("Start transfer.");
System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
sourceAccount.setBalance(sourceAccount.getBalance() - amount);
targetAccount.setBalance(targetAccount.getBalance() + amount);
System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
}
}
finally {
lock.unlock();
}
}
破壞循環條件
破壞循環條件,需要對資源進行排序,然后按序申請資源。
我們來看下面的代碼。
public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
BankAccount left = sourceAccount;
BankAccount right = targetAccount;
if (sourceAccount.getId() > targetAccount.getId()) {
left = targetAccount;
right = sourceAccount;
}
synchronized(left) {
synchronized(right) {
if (sourceAccount.getBalance() > amount) {
System.out.println("Start transfer.");
System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
sourceAccount.setBalance(sourceAccount.getBalance() - amount);
targetAccount.setBalance(targetAccount.getBalance() + amount);
System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
}
}
}
}
在這里,我們假設BankAccount中的id
是主鍵,我們按照id
對sourceAccount和targetAccount進行排序,之后按照id
從小到大申請資源,這樣就不會有死鎖發生了。
我們在解決並發問題的時候,可能會有多種方式,我們需要評估一下各個解決方案,從中選擇一個成本最低的方案。
對於我們一直談論的轉賬示例,破壞循環條件可能是一個比較好的解決方法。
使用等待-通知機制
我們上面在破壞占用且等待條件時,使用了如下的死循環:
while(!allocator.apply(sourceAccount, targetAccount));
在並發量不高的情況下,這樣寫沒有問題,但是在高並發的情況下,這樣寫可能需要循環太多次才能拿到鎖,太消耗CPU了,屬於蠻干型。
在這種情況下,一種合理的方案是:如果線程要求的條件不滿足,那么線程阻塞自己,進入等待狀態,當線程要求的條件滿足后,通知等待的線程重新執行,這里線程阻塞就避免了循環消耗CPU的問題。
這就是我們要討論的等待-通知機制。
Java中的等待-通知機制
Java中的等待-通知機制流程是怎樣的?
線程首先獲取互斥鎖,當線程要求的條件不滿足時,釋放互斥鎖,進入等待狀態;當要求的條件滿足時,通知等待的線程,重新獲取互斥鎖。
Java使用synchronized關鍵字配合wait()、notify()、notifyAll()三個方法實現等待-通知機制。
在並發程序中,當一個線程進入臨界區后,由於某些條件沒有滿足,需要進入等待狀態,Java對象的wait()方法能夠實現這一點。當線程要求的條件滿足時,Java對象的notify()和notifyAll()方法就可以通知等待的線程,它會告訴線程,你需要的條件曾經滿足過,之所以說曾經,是因為notify()只能保證在通知的那一時刻,條件是滿足的,而被通知線程的執行時刻和通知時刻一般不會重合,所以在線程開始執行的時候,可能條件又不滿足了。
另外需要注意,被通知的線程重新執行時,還需要獲取互斥鎖,因為之前在調用wait()方法時,互斥鎖已經被釋放了。
wait()、notify()和notifyAll()三個方法能夠被調用的前提是已經獲取了響應的互斥鎖,所以這三個方法都是在synchronized{}內部被調用的。
下面我們來看一下修改后的Allocator,其中apply()和free()方法的代碼如下。
public synchronized void apply(Object... objs) {
for (Object obj : objs) {
while (lockObjs.contains(obj)) {
try {
this.wait();
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
for (Object obj : objs) {
lockObjs.add(obj);
}
}
public synchronized void free(Object... objs) {
for (Object obj : objs) {
if (lockObjs.contains(obj)) {
lockObjs.remove(obj);
}
}
this.notifyAll();
}
對應的transfer()方法的代碼如下。
public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
Allocator allocator = Allocator.getInstance();
allocator.apply(sourceAccount, targetAccount);
try {
synchronized(sourceAccount) {
synchronized(targetAccount) {
if (sourceAccount.getBalance() > amount) {
System.out.println("Start transfer.");
System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
sourceAccount.setBalance(sourceAccount.getBalance() - amount);
targetAccount.setBalance(targetAccount.getBalance() + amount);
System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
}
}
}
}
finally {
allocator.free(sourceAccount, targetAccount);
}
}
運行結果和我們期望是一致的。
條件曾經滿足
在上述代碼中,我們可以發現,apply()方法中的判斷條件之前是if,現在改成了while, while (lockObjs.contains(obj))
,這樣做可以解決條件曾經滿足的問題。
因為當wait()返回時,有可能條件已經發生了變化,曾經條件滿足,但是現在已經不滿足了,所以要重新檢驗條件是否滿足。
這是一種范式,是一種經典的做法。
notify() vs notifyAll()
notify()和notifyAll()有什么區別?
notify()會隨機的通知等待隊列中的一個線程, 而notifyAll()會通知等待隊列中的所有線程。
我們盡量使用notifyAll()方法,因為notify()可能會導致某些線程永遠不會被通知到。
假設我們有一個實例,它有資源 A、B、C、D,我們使用實例對象來創建互斥鎖。
- 線程t1申請到了A、B
- 線程t2申請到了C、D
- 線程t3試圖申請A、B,失敗,進入等待隊列
- 線程t4試圖申請C、D,失敗,進入等待隊列
- 此時,線程t1執行結束,釋放鎖
- 線程t1調用實例的notify()來通知等待隊列中的線程,有可能被通知的是線程t4,但線程t4申請的是C、D還被線程t2占用,所以線程t4只能繼續等待
- 此時,線程t2執行結束,釋放鎖
- 線程t2調用實例的notify()來通知等待隊列中的線程,t3或者t4只能有1個被喚醒並正常執行,另外1個則再也沒有機會被喚醒
wait()和sleep()的區別
wait()方法與sleep()方法的不同之處在於,wait()方法會釋放對象的“鎖標志”。當調用某一對象的wait()方法后,會使當前線程暫停執行,並將當前線程放入對象等待池中,直到調用了notify()方法后,將從對象等待池中移出任意一個線程並放入鎖標志等待池中,只有鎖標志等待池中的線程可以獲取鎖標志,它們隨時准備爭奪鎖的擁有權。當調用了某個對象的notifyAll()方法,會將對象等待池中的所有線程都移動到該對象的鎖標志等待池。
sleep()方法需要指定等待的時間,它可以讓當前正在執行的線程在指定的時間內暫停執行,進入阻塞狀態,該方法既可以讓其他同優先級或者高優先級的線程得到執行的機會,也可以讓低優先級的線程得到執行機會。但是sleep()方法不會釋放“鎖標志”,也就是說如果有synchronized同步塊,其他線程仍然不能訪問共享數據。
總結一下,wait()和sleep()區別如下。
- wait()釋放資源,sleep()不釋放資源
- wait()需要被喚醒,sleep()不需要
- wait()是object頂級父類的方法,sleep()則是Thread的方法
wait()和sleep()都會讓渡CPU執行時間,等待再次調度!