Java並發編程實戰(4)- 死鎖


在這篇文章中,我們主要討論一下死鎖及其解決辦法。

概述

在上一篇文章中,我們討論了如何使用一個互斥鎖去保護多個資源,以銀行賬戶轉賬為例,當時給出的解決方法是基於Class對象創建互斥鎖。

這樣雖然解決了同步的問題,但是能在現實中使用嗎?答案是不可以,尤其是在高並發的情況下,原因是我們使用的互斥鎖的范圍太大,以轉賬為例,我們的做法會鎖定整個賬戶Class對象,這樣會導致轉賬操作只能串行進行,但是在實際場景中,大量的轉賬操作業務中的雙方是不相同的,直接在Class對象級別上加鎖是不能接受的。

那如果在對象實例級別上加鎖,使用細粒度鎖,會有什么問題?可能會發生死鎖。

我們接下來看一下造成死鎖的原因和可能的解決方案。

死鎖案例

什么是死鎖?

死鎖是指一組互相競爭資源的線程因互相等待,導致“永久”阻塞的現象。

一般來說,當我們使用細粒度鎖時,它在提升性能的同時,也可能會導致死鎖。

我們還是以銀行轉賬為例,來看一下死鎖是如何發生的。

首先,我們先定義個BankAccount對象,來存儲基本信息,代碼如下。

public class BankAccount {
	private int id;
	private double balance;
	private String password;
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public double getBalance() {
		return balance;
	}
	public void setBalance(double balance) {
		this.balance = balance;
	}
}

接下來,我們使用細粒度鎖來嘗試完成轉賬操作,代碼如下。

public class BankTransferDemo {
	
	public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
		synchronized(sourceAccount) {
			synchronized(targetAccount) {
				if (sourceAccount.getBalance() > amount) {
					System.out.println("Start transfer.");
					System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
					sourceAccount.setBalance(sourceAccount.getBalance() - amount);
					targetAccount.setBalance(targetAccount.getBalance() + amount);
					System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
				}
			}
		}
	}
}

我們用下面的代碼來做簡單測試。

	public static void main(String[] args) throws InterruptedException {
		BankAccount sourceAccount = new BankAccount();
		sourceAccount.setId(1);
		sourceAccount.setBalance(50000);
		
		BankAccount targetAccount = new BankAccount();
		targetAccount.setId(2);
		targetAccount.setBalance(20000);
		
		BankTransferDemo obj = new BankTransferDemo();
		
		Thread t1 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				obj.transfer(sourceAccount, targetAccount, 1);
			}
		});
		
		Thread t2 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				obj.transfer(targetAccount, sourceAccount, 1);
			}
		});
		
		t1.start();
		t2.start();
		
		t1.join();
		t2.join();
		
		System.out.println("Finished.");
	}

測試代碼中包含了2個線程,其中t1線程循環從sourceAccount向targetAccount轉賬,而t2線程會循環從targetAccount向sourceAccount轉賬。

從運行結果來看,t1線程中的循環在運行600次左右時,t2線程也創建好,開始循環轉賬了,這時就會發生死鎖,導致t1線程和t2線程都無法繼續執行。

我們可以用下面的資源分配圖來更直觀的描述死鎖。

死鎖的原因和預防

並發程序一旦死鎖,一般沒有特別好的辦法,很多時候我們只能重啟應用,因此,解決死鎖問題的最好辦法是規避死鎖。

我們先來看一下死鎖發生的條件,一個叫Coffman的牛人,於1971年在ACM Computing Surveys發表了一篇名為System Deadlocks的文章,他總結了只有以下四個條件全部滿足的情況下,才會發生死鎖:

  • 互斥,共享資源X和Y只能被一個線程占用。
  • 占有且等待,線程t1已經取得共享資源X,在等待共享資源Y的時候,不釋放共享資源X。
  • 不可搶占,其他線程不能強行搶占線程t1占有的資源。
  • 循環等待,線程t1等待線程t2占有的資源,線程t2等待線程t1占有的資源,就是循環等待。

通過上述描述,我們能夠推導出,只要破壞上面其中一個條件,就可以避免死鎖的發生。

但是第一個條件互斥,是不可以被破壞的,否則我們就沒有用鎖的必要了,那么我們來看如何破壞其他三個條件。

破壞占用且等待條件

如果要破壞占用且等待條件,我們可以嘗試一次性申請全部資源,這樣就不需要等待了。

在實現過程中,我們需要創建一個新的角色,負責同時申請和同時釋放全部資源,我們可以將其稱為Allocator。

我們來看一下具體的代碼實現。

public class Allocator {
	
	private volatile static Allocator instance;
	
	private Allocator() {}
	
	public static Allocator getInstance() {
		if (instance == null) {
			synchronized(Allocator.class) {
				if (instance == null) {
					instance = new Allocator();
				}
			}
		}
		
		return instance;
	}
	
	private Set<Object> lockObjs = new HashSet<Object>();
	
	public synchronized boolean apply(Object... objs) {
		for (Object obj : objs) {
			if (lockObjs.contains(obj)) {
				return false;
			}
		}
		for (Object obj : objs) {
			lockObjs.add(obj);
		}
		
		return true;
	}
	
	public synchronized void free(Object... objs) {
		for (Object obj : objs) {
			if (lockObjs.contains(obj)) {
				lockObjs.remove(obj);
			}
		}
	}
}

Allocator是一個單例模式,它會使用一個Set對象來保存所有需要處理的資源,然后使用apply()和free()來同時鎖定或者釋放所有資源,它們會接收不固定參數。

我們來看一下新的transfer()方法應該怎么寫。

	public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
		Allocator allocator = Allocator.getInstance();
		while(!allocator.apply(sourceAccount, targetAccount));
		try {
			synchronized(sourceAccount) {
				synchronized(targetAccount) {
					if (sourceAccount.getBalance() > amount) {
						System.out.println("Start transfer.");
						System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
						sourceAccount.setBalance(sourceAccount.getBalance() - amount);
						targetAccount.setBalance(targetAccount.getBalance() + amount);
						System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
					}
				}
			}
		}
		finally {
			allocator.free(sourceAccount, targetAccount);
		}
	}

我們可以看到,transfer()方法中,首先獲取Allocator實例,然后調用apply(),傳入sourceAccount和targetAccount實例,請注意這里使用了while循環,即直到apply()返回true,才會退出循環,此時,Allocator已經鎖定了sourceAccount和targetAccount,接下來,我們使用synchronized關鍵字來鎖定sourceAccount和targetAccount,然后執行轉賬的業務邏輯。這里並不是必須要用synchronized,但是這樣做可以避免其他操作來影響轉賬操作,例如如果轉賬的過程中對sourceAccount實例進行取錢操作,如果不用synchronized,就有可能引發並發問題。

下面是測試代碼。

	public static void main(String[] args) throws InterruptedException {
		BankAccount sourceAccount = new BankAccount();
		sourceAccount.setId(1);
		sourceAccount.setBalance(50000);
		
		BankAccount targetAccount = new BankAccount();
		targetAccount.setId(2);
		targetAccount.setBalance(20000);
		
		BankTransferDemo obj = new BankTransferDemo();
		
		Thread t1 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				obj.transfer(sourceAccount, targetAccount, 1);
			}
		});
		
		Thread t2 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				obj.transfer(targetAccount, sourceAccount, 1);
			}
		});
		
		t1.start();
		t2.start();
		
		t1.join();
		t2.join();
		
		System.out.println("Finished.");
	}

程序是可以正常執行的,結果和我們預期一致。

在這里,我們需要保證鎖對象的不可變性,對於BankAccount對象來說,id屬性可以看做是其主鍵,id相同的BankAccount實例,從業務角度來說,指向的都是同一個賬戶,但是對於鎖對象來說,id相同的不同實例,會產生不同的鎖,從而引發並發問題。

我們來看下面修改后的測試代碼。

public static void main(String[] args) throws InterruptedException {
		
		
		BankTransferDemo obj = new BankTransferDemo();
		
		Thread t1 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				// 這里應該從后端獲取賬戶實例,此處只做演示。
				BankAccount sourceAccount = new BankAccount();
				sourceAccount.setId(1);
				sourceAccount.setBalance(50000);
				
				BankAccount targetAccount = new BankAccount();
				targetAccount.setId(2);
				targetAccount.setBalance(20000);
				obj.transfer(sourceAccount, targetAccount, 1);
			}
		});
		
		Thread t2 = new Thread(() ->{
			for (int i = 0; i < 10000; i++) {
				// 這里應該從后端獲取賬戶實例,此處只做演示。
				BankAccount sourceAccount = new BankAccount();
				sourceAccount.setId(1);
				sourceAccount.setBalance(50000);
				
				BankAccount targetAccount = new BankAccount();
				targetAccount.setId(2);
				targetAccount.setBalance(20000);
				obj.transfer(targetAccount, sourceAccount, 1);
			}
		});
		
		t1.start();
		t2.start();
		
		t1.join();
		t2.join();
		
		System.out.println("Finished.");
	}

上述代碼中,每次轉賬都創建新的BankAccount實例,然后將其傳入Allocator,這樣做,是不能夠正常處理的,因為每次使用的互斥鎖都作用在不同的實例上,這一點,需要特別注意。

破壞不可搶占條件

破壞不可搶占條件很簡單,解決的關鍵在於能夠主動釋放它占有的資源,但是synchronized是不能做到這一點的。

synchronized申請資源的時候,如果申請失敗,線程會直接進入阻塞狀態,什么都不能做,已經鎖定的資源也無法釋放。

我們可以使用java.util.concurrent包中的Lock對象來實現這一點,相關代碼如下。

    private Lock lock = new ReentrantLock();
	
    public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
        try {
            lock.lock();
            if (sourceAccount.getBalance() > amount) {
                System.out.println("Start transfer.");
                System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
                sourceAccount.setBalance(sourceAccount.getBalance() - amount);
                targetAccount.setBalance(targetAccount.getBalance() + amount);
                System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
            }
        }
        finally {
            lock.unlock();
        }
    }

破壞循環條件

破壞循環條件,需要對資源進行排序,然后按序申請資源。

我們來看下面的代碼。

	public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
		BankAccount left = sourceAccount;
		BankAccount right = targetAccount;
		if (sourceAccount.getId() > targetAccount.getId()) {
			left = targetAccount;
			right = sourceAccount;
		}
		synchronized(left) {
			synchronized(right) {
				if (sourceAccount.getBalance() > amount) {
					System.out.println("Start transfer.");
					System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
					sourceAccount.setBalance(sourceAccount.getBalance() - amount);
					targetAccount.setBalance(targetAccount.getBalance() + amount);
					System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
				}
			}
		}
	}

在這里,我們假設BankAccount中的id是主鍵,我們按照id對sourceAccount和targetAccount進行排序,之后按照id從小到大申請資源,這樣就不會有死鎖發生了。

我們在解決並發問題的時候,可能會有多種方式,我們需要評估一下各個解決方案,從中選擇一個成本最低的方案。

對於我們一直談論的轉賬示例,破壞循環條件可能是一個比較好的解決方法。

使用等待-通知機制

我們上面在破壞占用且等待條件時,使用了如下的死循環:

    while(!allocator.apply(sourceAccount, targetAccount));

在並發量不高的情況下,這樣寫沒有問題,但是在高並發的情況下,這樣寫可能需要循環太多次才能拿到鎖,太消耗CPU了,屬於蠻干型。

在這種情況下,一種合理的方案是:如果線程要求的條件不滿足,那么線程阻塞自己,進入等待狀態,當線程要求的條件滿足后,通知等待的線程重新執行,這里線程阻塞就避免了循環消耗CPU的問題。

這就是我們要討論的等待-通知機制。

Java中的等待-通知機制

Java中的等待-通知機制流程是怎樣的?

線程首先獲取互斥鎖,當線程要求的條件不滿足時,釋放互斥鎖,進入等待狀態;當要求的條件滿足時,通知等待的線程,重新獲取互斥鎖。

Java使用synchronized關鍵字配合wait()、notify()、notifyAll()三個方法實現等待-通知機制。

在並發程序中,當一個線程進入臨界區后,由於某些條件沒有滿足,需要進入等待狀態,Java對象的wait()方法能夠實現這一點。當線程要求的條件滿足時,Java對象的notify()和notifyAll()方法就可以通知等待的線程,它會告訴線程,你需要的條件曾經滿足過,之所以說曾經,是因為notify()只能保證在通知的那一時刻,條件是滿足的,而被通知線程的執行時刻和通知時刻一般不會重合,所以在線程開始執行的時候,可能條件又不滿足了。

另外需要注意,被通知的線程重新執行時,還需要獲取互斥鎖,因為之前在調用wait()方法時,互斥鎖已經被釋放了。

wait()、notify()和notifyAll()三個方法能夠被調用的前提是已經獲取了響應的互斥鎖,所以這三個方法都是在synchronized{}內部被調用的。

下面我們來看一下修改后的Allocator,其中apply()和free()方法的代碼如下。

	public synchronized void apply(Object... objs) {
		for (Object obj : objs) {
			while (lockObjs.contains(obj)) {
				try {
					this.wait();
				} catch (InterruptedException e) {
					System.out.println(e.getMessage());
				}
			}
		}
		for (Object obj : objs) {
			lockObjs.add(obj);
		}
	}
	
	public synchronized void free(Object... objs) {
		for (Object obj : objs) {
			if (lockObjs.contains(obj)) {
				lockObjs.remove(obj);
			}
		}
		this.notifyAll();
	}

對應的transfer()方法的代碼如下。

	public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {
	Allocator allocator = Allocator.getInstance();
	allocator.apply(sourceAccount, targetAccount);
	try {
		synchronized(sourceAccount) {
			synchronized(targetAccount) {
				if (sourceAccount.getBalance() > amount) {
					System.out.println("Start transfer.");
					System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
					sourceAccount.setBalance(sourceAccount.getBalance() - amount);
					targetAccount.setBalance(targetAccount.getBalance() + amount);
					System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));
				}
			}
		}
	}
	finally {
		allocator.free(sourceAccount, targetAccount);
	}
}

運行結果和我們期望是一致的。

條件曾經滿足

在上述代碼中,我們可以發現,apply()方法中的判斷條件之前是if,現在改成了while, while (lockObjs.contains(obj)),這樣做可以解決條件曾經滿足的問題。

因為當wait()返回時,有可能條件已經發生了變化,曾經條件滿足,但是現在已經不滿足了,所以要重新檢驗條件是否滿足。

這是一種范式,是一種經典的做法。

notify() vs notifyAll()

notify()和notifyAll()有什么區別?

notify()會隨機的通知等待隊列中的一個線程, 而notifyAll()會通知等待隊列中的所有線程。

我們盡量使用notifyAll()方法,因為notify()可能會導致某些線程永遠不會被通知到。

假設我們有一個實例,它有資源 A、B、C、D,我們使用實例對象來創建互斥鎖。

  • 線程t1申請到了A、B
  • 線程t2申請到了C、D
  • 線程t3試圖申請A、B,失敗,進入等待隊列
  • 線程t4試圖申請C、D,失敗,進入等待隊列
  • 此時,線程t1執行結束,釋放鎖
  • 線程t1調用實例的notify()來通知等待隊列中的線程,有可能被通知的是線程t4,但線程t4申請的是C、D還被線程t2占用,所以線程t4只能繼續等待
  • 此時,線程t2執行結束,釋放鎖
  • 線程t2調用實例的notify()來通知等待隊列中的線程,t3或者t4只能有1個被喚醒並正常執行,另外1個則再也沒有機會被喚醒

wait()和sleep()的區別

wait()方法與sleep()方法的不同之處在於,wait()方法會釋放對象的“鎖標志”。當調用某一對象的wait()方法后,會使當前線程暫停執行,並將當前線程放入對象等待池中,直到調用了notify()方法后,將從對象等待池中移出任意一個線程並放入鎖標志等待池中,只有鎖標志等待池中的線程可以獲取鎖標志,它們隨時准備爭奪鎖的擁有權。當調用了某個對象的notifyAll()方法,會將對象等待池中的所有線程都移動到該對象的鎖標志等待池。

sleep()方法需要指定等待的時間,它可以讓當前正在執行的線程在指定的時間內暫停執行,進入阻塞狀態,該方法既可以讓其他同優先級或者高優先級的線程得到執行的機會,也可以讓低優先級的線程得到執行機會。但是sleep()方法不會釋放“鎖標志”,也就是說如果有synchronized同步塊,其他線程仍然不能訪問共享數據。

總結一下,wait()和sleep()區別如下。

  • wait()釋放資源,sleep()不釋放資源
  • wait()需要被喚醒,sleep()不需要
  • wait()是object頂級父類的方法,sleep()則是Thread的方法

wait()和sleep()都會讓渡CPU執行時間,等待再次調度!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM