Java同步塊
Java 同步塊(synchronized block)用來標記方法或者代碼塊是同步的。Java同步塊用來避免競爭。本文介紹以下內容:
- Java同步關鍵字(synchronzied)
- 同步的實例方法
- 同步的靜態方法
- 實例方法中的同步塊
- 靜態方法中的同步塊
- Java同步示例
- Java並發工具集
Java 同步關鍵字(synchronized)
Java中的同步塊用synchronized標記。同步塊在Java中是同步在某個對象上。所有在一個對象上的同步塊在同時只能被一個線程進入並執行操作。所有其他等待進入該同步塊的線程將被阻塞,直到執行該同步塊中的線程退出。
synchronized關鍵可以用在四種不同的代碼塊中:
- 實例方法
- 靜態方法
- 實例方法中的代碼塊
- 靜態方法中的代碼塊
上述代碼塊都同步在不同對象上。實際需要哪種同步塊視具體情況而定。
同步實例方法
下面是一個同步的實例方法:
- public synchronized void add(int value){
- this.count += value;
- }
注意在方法聲明中同步(synchronized )關鍵字。這告訴Java該方法是同步的。
Java同步實例方法是同步在擁有該方法的對象上。這樣,對每個實例其同步方法都同步在不同的對象上,即該方法所屬的實例。只有一個線程能夠在同步實例方法中運行。如果有多個實例存在,那么每個實例上一次只能有一個線程在同步實例方法中執行操作。一個實例一個線程。
同步靜態方法
同步靜態方法和同步實例方法一樣,也使用synchronized 關鍵字。Java同步靜態方法如下示例:
- public static synchronized void add(int value){
- count += value;
- }
同樣,這里synchronized關鍵字告訴Java這個方法是同步的。
同步靜態方法是指同步在該方法所在的類對象上。因為在Java虛擬機中一個類只能對應一個類對象,所以同時只允許一個線程執行同一個類中的靜態同步方法。
對於不同類中的靜態同步方法,每個類中一次只能有一個線程執行類中的靜態同步方法而無需等待。不管類中的哪個靜態同步方法被調用,一個類只能由一個線程同時執行。
實例方法中的同步塊
有時你不需要同步整個方法,而是同步方法中的一部分。Java可以對方法的一部分進行同步。
在非同步的Java方法中的同步塊的例子如下所示:
- public void add(int value){
- synchronized(this){
- this.count += value;
- }
- }
示例使用Java同步塊構造器來標記一塊代碼是同步的。該代碼在執行時和同步方法一樣。
注意Java同步塊構造器用括號將對象括起來。在上例中,使用了“this”,即為調用add方法的實例本身。在同步構造器中用括號括起來的對象叫做監視器對象。上述代碼使用監視器對象同步,同步實例方法使用調用方法本身的實例作為監視器對象。
在一個監視器對象上一次只能有一個線程在同步塊內執行。
下面兩個例子都同步在他們所調用的實例對象上,因此他們在同步的執行效果上是等效的。
- public class MyClass {
- public synchronized void log1(String msg1, String msg2){
- log.writeln(msg1);
- log.writeln(msg2);
- }
- public void log2(String msg1, String msg2){
- synchronized(this){
- log.writeln(msg1);
- log.writeln(msg2);
- }
- }
- }
在上例中,每次只能有一個線程能夠在兩個同步塊中的任意一個內執行。
如果第二個同步塊不是同步在this實例對象上,那么兩個方法可以被一個線程同時執行。
靜態方法中的同步塊
和上面類似,下面是兩個靜態方法同步的例子。這些方法同步在該方法所屬的類對象上。
- public class MyClass {
- public static synchronized void log1(String msg1, String msg2){
- log.writeln(msg1);
- log.writeln(msg2);
- }
- public static void log2(String msg1, String msg2){
- synchronized(MyClass.class){
- log.writeln(msg1);
- log.writeln(msg2);
- }
- }
- }
這兩個方法不允許同時被一個線程訪問。
如果第二個同步塊不是同步在MyClass.class這個對象上。那么這兩個方法可以同時被一個線程訪問。
Java同步實例
在下面例子中,啟動了兩個線程,都調用Counter類的同一個add方法。因為同步在該方法所屬的實例上,所以同時只能有一個線程訪問該方法。
- public class Counter{
- long count = 0;
- public synchronized void add(long value){
- this.count += value;
- }
- }
- public class CounterThread extends Thread{
- protected Counter counter = null;
- public CounterThread(Counter counter){
- this.counter = counter;
- }
- public void run() {
- for(int i=0; i<10; i++){
- counter.add(i);
- }
- }
- }
- public class Example {
- public static void main(String[] args){
- Counter counter = new Counter();
- Thread threadA = new CounterThread(counter);
- Thread threadB = new CounterThread(counter);
- threadA.start();
- threadB.start();
- }
- }
創建了兩個線程。他們的構造器引用同一個Counter實例。Counter.add方法是同步在實例上,是因為add方法是實例方法並且被標記上synchronized關鍵字。因此每次只允許一個線程調用該方法。另外一個線程必須要等到第一個線程退出add()方法時,才能繼續執行方法。
如果兩個線程引用了兩個不同的Counter實例,那么他們可以同時調用add()方法。這些方法調用了不同的對象,因此這些方法也就同步在不同的對象上。這些方法調用將不會被阻塞。如下面這個例子所示:
- public class Example {
- public static void main(String[] args){
- Counter counterA = new Counter();
- Counter counterB = new Counter();
- Thread threadA = new CounterThread(counterA);
- Thread threadB = new CounterThread(counterB);
- threadA.start();
- threadB.start();
- }
- }
注意這兩個線程,threadA和threadB,不再引用同一個counter實例。CounterA和counterB的add方法同步在他們所屬的對象上。調用counterA的add方法將不會阻塞調用counterB的add方法。
Java並發工具集
synchronized機制是Java中對多個線程共享的對象進行同步訪問的第一個機制。synchronized機制並不很高級。因此Java 5中引入了一組並發工具集API,以幫助開發實現更好的並發控制。
線程通信
線程通信的目標是使線程間能夠互相發送信號。另一方面,線程通信使線程能夠等待其他線程的信號。
例如,線程B可以等待線程A的一個信號,這個信號會通知線程B數據已經准備好了。本文將講解以下幾個JAVA線程間通信的主題:
1、通過共享對象通信
2、忙等待
3、wait(),notify()和notifyAll()
4、丟失的信號
5、假喚醒
6、多線程等待相同信號
7、不要對常量字符串或全局對象調用wait()
1、通過共享對象通信
線程間發送信號的一個簡單方式是在共享對象的變量里設置信號值。線程A在一個同步塊里設置boolean型成員變量hasDataToProcess為true,線程B也在同步塊里讀取hasDataToProcess這個成員變量。這個簡單的例子使用了一個持有信號的對象,並提供了set和check方法:
- public class MySignal{
- protected boolean hasDataToProcess = false;
- public synchronized boolean hasDataToProcess(){
- return this.hasDataToProcess;
- }
- public synchronized void setHasDataToProcess(boolean hasData){
- this.hasDataToProcess = hasData;
- }
- }
線程A和B必須獲得指向一個MySignal共享實例的引用,以便進行通信。如果它們持有的引用指向不同的MySingal實例,那么彼此將不能檢測到對方的信號。需要處理的數據可以存放在一個共享緩存區里,它和MySignal實例是分開存放的。
2、忙等待(Busy Wait)
准備處理數據的線程B正在等待數據變為可用。換句話說,它在等待線程A的一個信號,這個信號使hasDataToProcess()返回true。線程B運行在一個循環里,以等待這個信號:
- protected MySignal sharedSignal = ...
- ...
- while(!sharedSignal.hasDataToProcess()){
- //do nothing... busy waiting
- }
3、wait(),notify()和notifyAll()
忙等待沒有對運行等待線程的CPU進行有效的利用,除非平均等待時間非常短。否則,讓等待線程進入睡眠或者非運行狀態更為明智,直到它接收到它等待的信號。
Java有一個內建的等待機制來允許線程在等待信號的時候變為非運行狀態。java.lang.Object 類定義了三個方法,wait()、notify()和notifyAll()來實現這個等待機制。
一個線程一旦調用了任意對象的wait()方法,就會變為非運行狀態,直到另一個線程調用了同一個對象的notify()方法。為了調用wait()或者notify(),線程必須先獲得那個對象的鎖。也就是說,線程必須在同步塊里調用wait()或者notify()。以下是MySingal的修改版本——使用了wait()和notify()的MyWaitNotify:
- public class MonitorObject{
- }
- public class MyWaitNotify{
- MonitorObject myMonitorObject = new MonitorObject();
- public void doWait(){
- synchronized(myMonitorObject){
- try{
- myMonitorObject.wait();
- } catch(InterruptedException e){...}
- }
- }
- public void doNotify(){
- synchronized(myMonitorObject){
- myMonitorObject.notify();
- }
- }
- }
等待線程將調用doWait(),而喚醒線程將調用doNotify()。當一個線程調用一個對象的notify()方法,正在等待該對象的所有線程中將有一個線程被喚醒並允許執行(校注:這個將被喚醒的線程是隨機的,不可以指定喚醒哪個線程)。同時也提供了一個notifyAll()方法來喚醒正在等待一個給定對象的所有線程。
如你所見,不管是等待線程還是喚醒線程都在同步塊里調用wait()和notify()。這是強制性的!一個線程如果沒有持有對象鎖,將不能調用wait(),notify()或者notifyAll()。如果調用了,會拋出IllegalMonitorStateException異常。
(校注:JVM是這么實現的,當你調用wait時候它首先要檢查下當前線程是否是鎖的擁有者,不是則拋出IllegalMonitorStateExcept,參考JVM源碼的 1422行。)
但是,這怎么可能?等待線程在同步塊里面執行的時候,不是一直持有監視器對象(myMonitor對象)的鎖嗎?等待線程不能阻止喚醒線程進入doNotify()的同步塊嗎?答案是:的確不能。一旦線程調用了wait()方法,它就釋放了所持有的監視器對象上的鎖。這將允許其他線程也可以調用wait()或者notify()。
一旦一個線程被喚醒,不能立刻就退出wait()的方法調用,直到調用notify()的線程退出了它自己的同步塊。換句話說:被喚醒的線程必須重新獲得監視器對象的鎖,才可以退出wait()的方法調用,因為wait方法調用運行在同步塊里面。如果多個線程被notifyAll()喚醒,那么在同一時刻將只有一個線程可以退出wait()方法,因為每個線程在退出wait()前必須獲得監視器對象的鎖。
4、丟失的信號(Missed Signals)
notify()和notifyAll()方法不會保存調用它們的方法,因為當這兩個方法被調用時,有可能沒有線程處於等待狀態。通知信號過后便丟棄了。因此,如果一個線程先於被通知線程調用wait()前調用了notify(),等待的線程將錯過這個信號。這可能是也可能不是個問題。不過,在某些情況下,這可能使等待線程永遠在等待,不再醒來,因為線程錯過了喚醒信號。
為了避免丟失信號,必須把它們保存在信號類里。在MyWaitNotify的例子中,通知信號應被存儲在MyWaitNotify實例的一個成員變量里。以下是MyWaitNotify的修改版本:
- public class MyWaitNotify2{
- MonitorObject myMonitorObject = new MonitorObject();
- boolean wasSignalled = false;
- public void doWait(){
- synchronized(myMonitorObject){
- if(!wasSignalled){
- try{
- myMonitorObject.wait();
- } catch(InterruptedException e){...}
- }
- //clear signal and continue running.
- wasSignalled = false;
- }
- }
- public void doNotify(){
- synchronized(myMonitorObject){
- wasSignalled = true;
- myMonitorObject.notify();
- }
- }
- }
留意doNotify()方法在調用notify()前把wasSignalled變量設為true。同時,留意doWait()方法在調用wait()前會檢查wasSignalled變量。事實上,如果沒有信號在前一次doWait()調用和這次doWait()調用之間的時間段里被接收到,它將只調用wait()。
(校注:為了避免信號丟失, 用一個變量來保存是否被通知過。在notify前,設置自己已經被通知過。在wait后,設置自己沒有被通知過,需要等待通知。)
5、假喚醒
由於莫名其妙的原因,線程有可能在沒有調用過notify()和notifyAll()的情況下醒來。這就是所謂的假喚醒(spurious wakeups)。無端端地醒過來了。
如果在MyWaitNotify2的doWait()方法里發生了假喚醒,等待線程即使沒有收到正確的信號,也能夠執行后續的操作。這可能導致你的應用程序出現嚴重問題。
為了防止假喚醒,保存信號的成員變量將在一個while循環里接受檢查,而不是在if表達式里。這樣的一個while循環叫做自旋鎖(校注:這種做法要慎重,目前的JVM實現自旋會消耗CPU,如果長時間不調用doNotify方法,doWait方法會一直自旋,CPU會消耗太大)。被喚醒的線程會自旋直到自旋鎖(while循環)里的條件變為false。以下MyWaitNotify2的修改版本展示了這點:
- public class MyWaitNotify3{
- MonitorObject myMonitorObject = new MonitorObject();
- boolean wasSignalled = false;
- public void doWait(){
- synchronized(myMonitorObject){
- while(!wasSignalled){
- try{
- myMonitorObject.wait();
- } catch(InterruptedException e){...}
- }
- //clear signal and continue running.
- wasSignalled = false;
- }
- }
- public void doNotify(){
- synchronized(myMonitorObject){
- wasSignalled = true;
- myMonitorObject.notify();
- }
- }
- }
留意wait()方法是在while循環里,而不在if表達式里。如果等待線程沒有收到信號就喚醒,wasSignalled變量將變為false,while循環會再執行一次,促使醒來的線程回到等待狀態。
6、多個線程等待相同信號
如果你有多個線程在等待,被notifyAll()喚醒,但只有一個被允許繼續執行,使用while循環也是個好方法。每次只有一個線程可以獲得監視器對象鎖,意味着只有一個線程可以退出wait()調用並清除wasSignalled標志(設為false)。一旦這個線程退出doWait()中的同步塊,其他線程就可以退出wait()調用,並在while循環里檢查wasSignalled變量值。但是,這個標志已經被第一個喚醒的線程清除了,所以其余醒來的線程將回到等待狀態,直到下次信號到來。
7、不要在字符串常量或全局對象中調用wait()
(校注:本章說的字符串常量指的是值為常量的字符串變量)
本文早期的一個版本在MyWaitNotify例子里使用字符串常量(””)作為管程對象(即監視器對象)。以下是那個例子:
- public class MyWaitNotify{
- String myMonitorObject = "";
- boolean wasSignalled = false;
- public void doWait(){
- synchronized(myMonitorObject){
- while(!wasSignalled){
- try{
- myMonitorObject.wait();
- } catch(InterruptedException e){...}
- }
- //clear signal and continue running.
- wasSignalled = false;
- }
- }
- public void doNotify(){
- synchronized(myMonitorObject){
- wasSignalled = true;
- myMonitorObject.notify();
- }
- }
- }
在空字符串作為鎖的同步塊(或者其他常量字符串)里調用wait()和notify()產生的問題是,JVM/編譯器內部會把常量字符串轉換成同一個對象。這意味着,即使你有2個不同的MyWaitNotify實例,它們都引用了相同的空字符串實例。同時也意味着存在這樣的風險:在第一個MyWaitNotify實例上調用doWait()的線程會被在第二個MyWaitNotify實例上調用doNotify()的線程喚醒。這種情況可以畫成以下這張圖:

起初這可能不像個大問題。畢竟,如果doNotify()在第二個MyWaitNotify實例上被調用,真正發生的事不外乎線程A和B被錯誤的喚醒了 。這個被喚醒的線程(A或者B)將在while循環里檢查信號值,然后回到等待狀態,因為doNotify()並沒有在第一個MyWaitNotify實例上調用,而這個正是它要等待的實例。這種情況相當於引發了一次假喚醒。線程A或者B在信號值沒有更新的情況下喚醒。但是代碼處理了這種情況,所以線程回到了等待狀態。記住,即使4個線程在相同的共享字符串實例上調用wait()和notify(),doWait()和doNotify()里的信號還會被2個MyWaitNotify實例分別保存。在MyWaitNotify1上的一次doNotify()調用可能喚醒MyWaitNotify2的線程,但是信號值只會保存在MyWaitNotify1里。
問題在於,由於doNotify()僅調用了notify()而不是notifyAll(),即使有4個線程在相同的字符串(空字符串)實例上等待,只能有一個線程被喚醒。所以,如果線程A或B被喚醒但信號是發給C或D的,A或B會檢查自己的信號值,看看有沒有信號被接收到,然后回到等待狀態。而C和D都沒被喚醒來檢查它們實際上接收到的信號值,這樣信號便丟失了。這種情況相當於前面所說的丟失信號的問題。C和D被發送過信號,只是都不能對信號作出回應。
如果doNotify()方法調用notifyAll(),而非notify(),所有等待線程都會被喚醒並依次檢查信號值。線程A和B將回到等待狀態,但是C或D只有一個線程注意到信號,並退出doWait()方法調用。C或D中的另一個將回到等待狀態,因為獲得信號的線程在退出doWait()的過程中清除了信號值(置為false)。
看過上面這段后,你可能會設法使用notifyAll()來代替notify(),但是這在性能上是個壞主意。在只有一個線程能對信號進行響應的情況下,沒有理由每次都去喚醒所有線程。
所以:在wait()/notify()機制中,不要使用全局對象,字符串常量等。應該使用對應唯一的對象。例如,每一個MyWaitNotify3的實例(前一節的例子)擁有一個屬於自己的監視器對象,而不是在空字符串上調用wait()/notify()。
校注:
管程 (英語:Monitors,也稱為監視器) 是對多個工作線程實現互斥訪問共享資源的對象或模塊。這些共享資源一般是硬件設備或一群變量。管程實現了在一個時間點,最多只有一個線程在執行它的某個子程序。與那些通過修改數據結構實現互斥訪問的並發程序設計相比,管程很大程度上簡化了程序設計。
死鎖
死鎖是兩個或更多線程阻塞着等待其它處於死鎖狀態的線程所持有的鎖。死鎖通常發生在多個線程同時但以不同的順序請求同一組鎖的時候。
例如,如果線程1鎖住了A,然后嘗試對B進行加鎖,同時線程2已經鎖住了B,接着嘗試對A進行加鎖,這時死鎖就發生了。線程1永遠得不到B,線程2也永遠得不到A,並且它們永遠也不會知道發生了這樣的事情。為了得到彼此的對象(A和B),它們將永遠阻塞下去。這種情況就是一個死鎖。
該情況如下:
- Thread 1 locks A, waits for B
- Thread 2 locks B, waits for A
這里有一個TreeNode類的例子,它調用了不同實例的synchronized方法:
- public class TreeNode {
- TreeNode parent = null;
- List children = new ArrayList();
- public synchronized void addChild(TreeNode child){
- if(!this.children.contains(child)) {
- this.children.add(child);
- child.setParentOnly(this);
- }
- }
- public synchronized void addChildOnly(TreeNode child){
- if(!this.children.contains(child){
- this.children.add(child);
- }
- }
- public synchronized void setParent(TreeNode parent){
- this.parent = parent;
- parent.addChildOnly(this);
- }
- public synchronized void setParentOnly(TreeNode parent){
- this.parent = parent;
- }
- }
如果在同一個parent和child對象上,線程1調用parent.addChild(child)方法的同時有另外一個線程2調用child.setParent(parent)方法,此時就會發生死鎖。下面的偽代碼說明了這個過程:
- Thread 1: parent.addChild(child); //locks parent
- --> child.setParentOnly(parent);
- Thread 2: child.setParent(parent); //locks child
- --> parent.addChildOnly()
首先線程1調用parent.addChild(child)。因為addChild()是同步的,所以線程1會對parent對象加鎖以不讓其它線程訪問該對象。
然后線程2調用child.setParent(parent)。因為setParent()是同步的,所以線程2會對child對象加鎖以不讓其它線程訪問該對象。
現在child和parent對象被兩個不同的線程鎖住了。接下來線程1嘗試調用child.setParentOnly()方法,但是由於child對象現在被線程2鎖住的,所以該調用會被阻塞。線程2也嘗試調用parent.addChildOnly(),但是由於parent對象現在被線程1鎖住,導致線程2也阻塞在該方法處。現在兩個線程都被阻塞並等待着獲取另外一個線程所持有的鎖。
注意:像上文描述的,這兩個線程需要同時調用parent.addChild(child)和child.setParent(parent)方法,並且是同一個parent對象和同一個child對象,才有可能發生死鎖。上面的代碼可能運行一段時間才會出現死鎖。
這些線程需要同時獲得鎖。舉個例子,如果線程1稍微領先線程2,然后成功地鎖住了A和B兩個對象,那么線程2就會在嘗試對B加鎖的時候被阻塞,這樣死鎖就不會發生。因為線程調度通常是不可預測的,因此沒有一個辦法可以准確預測什么時候死鎖會發生,僅僅是可能會發生。
更復雜的死鎖
死鎖可能不止包含2個線程,這讓檢測死鎖變得更加困難。下面是4個線程發生死鎖的例子:
- Thread 1 locks A, waits for B
- Thread 2 locks B, waits for C
- Thread 3 locks C, waits for D
- Thread 4 locks D, waits for A
線程1等待線程2,線程2等待線程3,線程3等待線程4,線程4等待線程1。
數據庫的死鎖
更加復雜的死鎖場景發生在數據庫事務中。一個數據庫事務可能由多條SQL更新請求組成。當在一個事務中更新一條記錄,這條記錄就會被鎖住避免其他事務的更新請求,直到第一個事務結束。同一個事務中每一個更新請求都可能會鎖住一些記錄。
當多個事務同時需要對一些相同的記錄做更新操作時,就很有可能發生死鎖,例如:
- Transaction 1, request 1, locks record 1 for update
- Transaction 2, request 1, locks record 2 for update
- Transaction 1, request 2, tries to lock record 2 for update.
- Transaction 2, request 2, tries to lock record 1 for update.
因為鎖發生在不同的請求中,並且對於一個事務來說不可能提前知道所有它需要的鎖,因此很難檢測和避免數據庫事務中的死鎖。
避免死鎖
在有些情況下死鎖是可以避免的。本文將展示三種用於避免死鎖的技術:
- 加鎖順序
- 加鎖時限
- 死鎖檢測
加鎖順序
當多個線程需要相同的一些鎖,但是按照不同的順序加鎖,死鎖就很容易發生。
如果能確保所有的線程都是按照相同的順序獲得鎖,那么死鎖就不會發生。看下面這個例子:
- Thread 1:
- lock A
- lock B
- Thread 2:
- wait for A
- lock C (when A locked)
- Thread 3:
- wait for A
- wait for B
- wait for C
如果一個線程(比如線程3)需要一些鎖,那么它必須按照確定的順序獲取鎖。它只有獲得了從順序上排在前面的鎖之后,才能獲取后面的鎖。
例如,線程2和線程3只有在獲取了鎖A之后才能嘗試獲取鎖C(譯者注:獲取鎖A是獲取鎖C的必要條件)。因為線程1已經擁有了鎖A,所以線程2和3需要一直等到鎖A被釋放。然后在它們嘗試對B或C加鎖之前,必須成功地對A加了鎖。
按照順序加鎖是一種有效的死鎖預防機制。但是,這種方式需要你事先知道所有可能會用到的鎖(譯者注:並對這些鎖做適當的排序),但總有些時候是無法預知的。
加鎖時限
另外一個可以避免死鎖的方法是在嘗試獲取鎖的時候加一個超時時間,這也就意味着在嘗試獲取鎖的過程中若超過了這個時限該線程則放棄對該鎖請求。若一個線程沒有在給定的時限內成功獲得所有需要的鎖,則會進行回退並釋放所有已經獲得的鎖,然后等待一段隨機的時間再重試。這段隨機的等待時間讓其它線程有機會嘗試獲取相同的這些鎖,並且讓該應用在沒有獲得鎖的時候可以繼續運行(譯者注:加鎖超時后可以先繼續運行干點其它事情,再回頭來重復之前加鎖的邏輯)。
以下是一個例子,展示了兩個線程以不同的順序嘗試獲取相同的兩個鎖,在發生超時后回退並重試的場景:
- Thread 1 locks A
- Thread 2 locks B
- Thread 1 attempts to lock B but is blocked
- Thread 2 attempts to lock A but is blocked
- Thread 1's lock attempt on B times out
- Thread 1 backs up and releases A as well
- Thread 1 waits randomly (e.g. 257 millis) before retrying.
- Thread 2's lock attempt on A times out
- Thread 2 backs up and releases B as well
- Thread 2 waits randomly (e.g. 43 millis) before retrying.
在上面的例子中,線程2比線程1早200毫秒進行重試加鎖,因此它可以先成功地獲取到兩個鎖。這時,線程1嘗試獲取鎖A並且處於等待狀態。當線程2結束時,線程1也可以順利的獲得這兩個鎖(除非線程2或者其它線程在線程1成功獲得兩個鎖之前又獲得其中的一些鎖)。
需要注意的是,由於存在鎖的超時,所以我們不能認為這種場景就一定是出現了死鎖。也可能是因為獲得了鎖的線程(導致其它線程超時)需要很長的時間去完成它的任務。
此外,如果有非常多的線程同一時間去競爭同一批資源,就算有超時和回退機制,還是可能會導致這些線程重復地嘗試但卻始終得不到鎖。如果只有兩個線程,並且重試的超時時間設定為0到500毫秒之間,這種現象可能不會發生,但是如果是10個或20個線程情況就不同了。因為這些線程等待相等的重試時間的概率就高的多(或者非常接近以至於會出現問題)。
(譯者注:超時和重試機制是為了避免在同一時間出現的競爭,但是當線程很多時,其中兩個或多個線程的超時時間一樣或者接近的可能性就會很大,因此就算出現競爭而導致超時后,由於超時時間一樣,它們又會同時開始重試,導致新一輪的競爭,帶來了新的問題。)
這種機制存在一個問題,在Java中不能對synchronized同步塊設置超時時間。你需要創建一個自定義鎖,或使用Java5中java.util.concurrent包下的工具。寫一個自定義鎖類不復雜,但超出了本文的內容。后續的Java並發系列會涵蓋自定義鎖的內容。
死鎖檢測
死鎖檢測是一個更好的死鎖預防機制,它主要是針對那些不可能實現按序加鎖並且鎖超時也不可行的場景。
每當一個線程獲得了鎖,會在線程和鎖相關的數據結構中(map、graph等等)將其記下。除此之外,每當有線程請求鎖,也需要記錄在這個數據結構中。
當一個線程請求鎖失敗時,這個線程可以遍歷鎖的關系圖看看是否有死鎖發生。例如,線程A請求鎖7,但是鎖7這個時候被線程B持有,這時線程A就可以檢查一下線程B是否已經請求了線程A當前所持有的鎖。如果線程B確實有這樣的請求,那么就是發生了死鎖(線程A擁有鎖1,請求鎖7;線程B擁有鎖7,請求鎖1)。
當然,死鎖一般要比兩個線程互相持有對方的鎖這種情況要復雜的多。線程A等待線程B,線程B等待線程C,線程C等待線程D,線程D又在等待線程A。線程A為了檢測死鎖,它需要遞進地檢測所有被B請求的鎖。從線程B所請求的鎖開始,線程A找到了線程C,然后又找到了線程D,發現線程D請求的鎖被線程A自己持有着。這是它就知道發生了死鎖。
下面是一幅關於四個線程(A,B,C和D)之間鎖占有和請求的關系圖。像這樣的數據結構就可以被用來檢測死鎖。

那么當檢測出死鎖時,這些線程該做些什么呢?
一個可行的做法是釋放所有鎖,回退,並且等待一段隨機的時間后重試。這個和簡單的加鎖超時類似,不一樣的是只有死鎖已經發生了才回退,而不會是因為加鎖的請求超時了。雖然有回退和等待,但是如果有大量的線程競爭同一批鎖,它們還是會重復地死鎖(編者注:原因同超時類似,不能從根本上減輕競爭)。
一個更好的方案是給這些線程設置優先級,讓一個(或幾個)線程回退,剩下的線程就像沒發生死鎖一樣繼續保持着它們需要的鎖。如果賦予這些線程的優先級是固定不變的,同一批線程總是會擁有更高的優先級。為避免這個問題,可以在死鎖發生的時候設置隨機的優先級。
飢餓和公平
如果一個線程因為CPU時間全部被其他線程搶走而得不到CPU運行時間,這種狀態被稱之為“飢餓”。而該線程被“飢餓致死”正是因為它得不到CPU運行時間的機會。解決飢餓的方案被稱之為“公平性” – 即所有線程均能公平地獲得運行機會。
下面是本文討論的主題:
1. Java中導致飢餓的原因:
- 高優先級線程吞噬所有的低優先級線程的CPU時間。
- 線程被永久堵塞在一個等待進入同步塊的狀態。
- 線程在等待一個本身也處於永久等待完成的對象(比如調用這個對象的wait方法)。
2. 在Java中實現公平性方案,需要:
- 使用鎖,而不是同步塊。
- 公平鎖。
- 注意性能方面。
Java中導致飢餓的原因
在Java中,下面三個常見的原因會導致線程飢餓:
- 高優先級線程吞噬所有的低優先級線程的CPU時間。
- 線程被永久堵塞在一個等待進入同步塊的狀態,因為其他線程總是能在它之前持續地對該同步塊進行訪問。
- 線程在等待一個本身(在其上調用wait())也處於永久等待完成的對象,因為其他線程總是被持續地獲得喚醒。
高優先級線程吞噬所有的低優先級線程的CPU時間
你能為每個線程設置獨自的線程優先級,優先級越高的線程獲得的CPU時間越多,線程優先級值設置在1到10之間,而這些優先級值所表示行為的准確解釋則依賴於你的應用運行平台。對大多數應用來說,你最好是不要改變其優先級值。
線程被永久堵塞在一個等待進入同步塊的狀態
Java的同步代碼區也是一個導致飢餓的因素。Java的同步代碼區對哪個線程允許進入的次序沒有任何保障。這就意味着理論上存在一個試圖進入該同步區的線程處於被永久堵塞的風險,因為其他線程總是能持續地先於它獲得訪問,這即是“飢餓”問題,而一個線程被“飢餓致死”正是因為它得不到CPU運行時間的機會。
線程在等待一個本身(在其上調用wait())也處於永久等待完成的對象
如果多個線程處在wait()方法執行上,而對其調用notify()不會保證哪一個線程會獲得喚醒,任何線程都有可能處於繼續等待的狀態。因此存在這樣一個風險:一個等待線程從來得不到喚醒,因為其他等待線程總是能被獲得喚醒。
在Java中實現公平性
雖Java不可能實現100%的公平性,我們依然可以通過同步結構在線程間實現公平性的提高。
首先來學習一段簡單的同步態代碼:
- public class Synchronizer{
- public synchronized void doSynchronized(){
- //do a lot of work which takes a long time
- }
- }
如果有一個以上的線程調用doSynchronized()方法,在第一個獲得訪問的線程未完成前,其他線程將一直處於阻塞狀態,而且在這種多線程被阻塞的場景下,接下來將是哪個線程獲得訪問是沒有保障的。
使用鎖方式替代同步塊
為了提高等待線程的公平性,我們使用鎖方式來替代同步塊。
- public class Synchronizer{
- Lock lock = new Lock();
- public void doSynchronized() throws InterruptedException{
- this.lock.lock(); //當前線程鎖住lock對象
- //critical section, do a lot of work which takes a long time
- this.lock.unlock(); //當前線程釋放lock對象上的鎖
- }
- }
注意到doSynchronized()不再聲明為synchronized,而是用lock.lock()和lock.unlock()來替代。
下面是用Lock類做的一個實現:
- public class Lock{
- private boolean isLocked = false; //是否加過鎖的信號
- private Thread lockingThread = null; //進行加鎖的線程
- public synchronized void lock() throws InterruptedException{
- while(isLocked){ //如果lock對象已被其他線程加鎖了(這個線程已經退出了本lock()方法)
- wait(); //當前線程阻塞,它釋放鎖對象上的鎖,其他線程可以再進入本lock()
- }
- isLocked = true; //如果沒加鎖,則當前線程對鎖對象加鎖
- lockingThread = Thread.currentThread();
- }
- public synchronized void unlock(){
- if(this.lockingThread != Thread.currentThread()){ //如果調用lock()加鎖的不是當前線程
- throw new IllegalMonitorStateException(
- "Calling thread has not locked this lock");
- }
- isLocked = false; //釋放鎖,標記為未加鎖
- lockingThread = null;
- notify(); //通知阻塞在鎖對象上的線程隊列,喚醒其中某一個線程
- }
- }
注意到上面對Lock的實現,如果存在多線程並發訪問lock(),這些線程將阻塞在對lock()方法的訪問上。另外,如果鎖已經鎖上(校對注:這里指的是isLocked等於true時),這些線程將阻塞在while(isLocked)循環的wait()調用里面。要記住的是,當線程正在等待進入lock() 時,可以調用wait()釋放其鎖實例對應的同步鎖,使得其他多個線程可以進入lock()方法,並調用wait()方法。
這回看下doSynchronized(),你會注意到在lock()和unlock()之間的注釋:在這兩個調用之間的代碼將運行很長一段時間。進一步設想,這段代碼將長時間運行,和進入lock()並調用wait()來比較的話。這意味着大部分時間用在等待進入鎖和進入臨界區的過程是用在wait()的等待中,而不是被阻塞在試圖進入lock()方法中。
在早些時候提到過,同步塊不會對等待進入的多個線程誰能獲得訪問做任何保障,同樣當調用notify()時,wait()也不會做保障一定能喚醒線程(至於為什么,請看線程通信)。因此這個版本的Lock類和doSynchronized()那個版本就保障公平性而言,沒有任何區別。
但我們能改變這種情況。當前的Lock類版本調用自己的wait()方法,如果每個線程在不同的對象上調用wait(),那么只有一個線程會在該對象上調用wait(),Lock類可以決定哪個對象能對其調用notify(),因此能做到有效的選擇喚醒哪個線程。
公平鎖
下面來講述將上面Lock類轉變為公平鎖FairLock。你會注意到新的實現和之前的Lock類中的同步和wait()/notify()稍有不同。
准確地說如何從之前的Lock類做到公平鎖的設計是一個漸進設計的過程,每一步都是在解決上一步的問題而前進的:Nested Monitor Lockout(嵌套管程鎖死), Slipped Conditions(滑漏條件)和Missed Signals(丟失信號)。這些本身的討論雖已超出本文的范圍,但其中每一步的內容都將會專題進行討論。重要的是,每一個調用lock()的線程都會進入一個隊列,當解鎖后,只有隊列里的第一個線程被允許鎖住FairLock實例,所有其它的線程都將處於等待狀態,直到他們處於隊列頭部。
- public class FairLock {
- private boolean isLocked = false; //是否加鎖的信號
- private Thread lockingThread = null; //加鎖的線程
- private List<QueueObject> waitingThreads =
- new ArrayList<QueueObject>(); //信號量隊列
- public void lock() throws InterruptedException{ //多個線程可同時進入
- QueueObject queueObject = new QueueObject(); //局部對象,線程安全
- boolean isLockedForThisThread = true; //是否為當前線程加鎖
- synchronized(this){ //將當前線程(用信號量)推入隊列
- waitingThreads.add(queueObject);
- }
- while(isLockedForThisThread){
- synchronized(this){ //加鎖操作需要同步
- //鎖狀態依然被檢查和設置,以避免出現滑漏條件
- isLockedForThisThread = isLocked || waitingThreads.get(0) != queueObject;
- if(!isLockedForThisThread){ //如果對象未加鎖且隊列頭部是當前線程
- isLocked = true; //加鎖
- waitingThreads.remove(queueObject); //從隊列中移除當前線程
- lockingThread = Thread.currentThread(); return;
- }
- }
- try{ //放在同步塊之外,避免monitor嵌套鎖死
- queueObject.doWait(); //監視器對象(持有信號量isNotified)等待
- }catch(InterruptedException e){
- synchronized(this) { waitingThreads.remove(queueObject); }
- throw e;
- }
- }
- }
- public synchronized void unlock(){
- if(this.lockingThread != Thread.currentThread()){ //加鎖的不是當前線程
- throw new IllegalMonitorStateException( "Calling thread has not locked this lock");
- }
- isLocked = false; //解鎖
- lockingThread = null;
- if(waitingThreads.size() > 0){ //喚醒第一個線程
- waitingThreads.get(0).doNotify();
- }
- }
- }
- public class QueueObject {
- private boolean isNotified = false;
- public synchronized void doWait() throws InterruptedException {
- while(!isNotified){
- this.wait();
- }
- this.isNotified = false;
- }
- public synchronized void doNotify() {
- this.isNotified = true;
- this.notify();
- }
- public boolean equals(Object o) {
- return this == o;
- }
- }
首先注意到lock()方法不再聲明為synchronized,取而代之的是對必需同步的代碼,在synchronized中進行嵌套。
FairLock新創建了一個QueueObject的實例,並對每個調用lock()的線程都將其QueueObject實例推入隊列。調用unlock()的線程將從隊列頭部獲取QueueObject,並對其調用doNotify(),以喚醒在該對象上等待的線程。通過這種方式,在同一時間僅有一個等待線程獲得喚醒,而不是所有的等待線程。這也是實現FairLock公平性的核心所在。
請注意,在同一個同步塊中,鎖狀態依然被檢查和設置,以避免出現滑漏條件。
還需注意到,QueueObject實際是一個semaphore。doWait()和doNotify()方法在QueueObject中保存着信號。這樣做以避免一個線程在調用queueObject.doWait()之前被另一個調用unlock()並隨之調用queueObject.doNotify()的線程重入,從而導致信號丟失。queueObject.doWait()調用放置在synchronized(this)塊之外,以避免被monitor嵌套鎖死,所以另外的線程可以進入unlock()來解鎖,只要當沒有線程在lock方法的synchronized(this)塊中執行即可。
最后,注意到queueObject.doWait()在try – catch塊中是怎樣調用的。在InterruptedException拋出的情況下,線程得以離開lock(),並需讓它從隊列中移除。
性能考慮
如果比較Lock和FairLock類,你會注意到在FairLock類中lock()和unlock()還有更多需要深入的地方。這些額外的代碼會導致FairLock的同步機制實現比Lock要稍微慢些。究竟存在多少影響,還依賴於應用在FairLock臨界區執行的時長。執行時長越大,FairLock帶來的負擔影響就越小,當然這也和代碼執行的頻繁度相關。
嵌套管程鎖死
嵌套管程鎖死類似於死鎖, 下面是一個嵌套管程鎖死的場景:
- 線程1 獲得A對象的鎖。
- 線程1 獲得對象B的鎖(同時持有對象A的鎖)。
- 線程1 決定等待另一個線程的信號再繼續。
- 線程1 調用B.wait(),從而釋放了B對象上的鎖,但仍然持有對象A的鎖。
- 線程2 需要同時持有對象A和對象B的鎖,才能向線程1發信號。
- 線程2 無法獲得對象A上的鎖,因為對象A上的鎖當前正被線程1持有。
- 線程2 一直被阻塞,等待線程1釋放對象A上的鎖。
- 線程1 一直阻塞,等待線程2的信號,因此,不會釋放對象A上的鎖,
- 而線程2需要對象A上的鎖才能給線程1發信號……
你可以能會說,這是個空想的場景,好吧,讓我們來看看下面這個比較挫的Lock實現:
- //lock implementation with nested monitor lockout problem
- public class Lock{
- protected MonitorObject monitorObject = new MonitorObject();
- protected boolean isLocked = false;
- public void lock() throws InterruptedException{
- synchronized(this){
- while(isLocked){
- synchronized(this.monitorObject){
- this.monitorObject.wait();
- }
- }
- isLocked = true;
- }
- }
- public void unlock(){
- synchronized(this){
- this.isLocked = false;
- synchronized(this.monitorObject){
- this.monitorObject.notify();
- }
- }
- }
- }
可以看到,lock()方法首先在”this”上同步,然后在monitorObject上同步。如果isLocked等於false,因為線程不會繼續調用monitorObject.wait(),那么一切都沒有問題 。但是如果isLocked等於true,調用lock()方法的線程會在monitorObject.wait()上阻塞。
這里的問題在於,調用monitorObject.wait()方法只釋放了monitorObject上的管程對象,而與”this“關聯的管程對象並沒有釋放。換句話說,這個剛被阻塞的線程仍然持有”this”上的鎖。
(校對注:如果一個線程持有這種Lock的時候另一個線程執行了lock操作)當一個已經持有這種Lock的線程想調用unlock(),就會在unlock()方法進入synchronized(this)塊時阻塞。這會一直阻塞到在lock()方法中等待的線程離開synchronized(this)塊。但是,在unlock中isLocked變為false,monitorObject.notify()被執行之后,lock()中等待的線程才會離開synchronized(this)塊。
簡而言之,在lock方法中等待的線程需要其它線程成功調用unlock方法來退出lock方法,但是,在lock()方法離開外層同步塊之前,沒有線程能成功執行unlock()。
結果就是,任何調用lock方法或unlock方法的線程都會一直阻塞。這就是嵌套管程鎖死。
一個更現實的例子
你可能會說,這么挫的實現方式我怎么可能會做呢?你或許不會在里層的管程對象上調用wait或notify方法,但完全有可能會在外層的this上調。
有很多類似上面例子的情況。例如,如果你准備實現一個公平鎖。你可能希望每個線程在它們各自的QueueObject上調用wait(),這樣就可以每次喚醒一個線程。
下面是一個比較挫的公平鎖實現方式:
- //Fair Lock implementation with nested monitor lockout problem
- public class FairLock {
- private boolean isLocked = false;
- private Thread lockingThread = null;
- private List<QueueObject> waitingThreads =
- new ArrayList<QueueObject>();
- public void lock() throws InterruptedException{
- QueueObject queueObject = new QueueObject();
- synchronized(this){
- waitingThreads.add(queueObject);
- while(isLocked || waitingThreads.get(0) != queueObject){
- synchronized(queueObject){
- try{
- queueObject.wait();
- }catch(InterruptedException e){
- waitingThreads.remove(queueObject);
- throw e;
- }
- }
- }
- waitingThreads.remove(queueObject);
- isLocked = true;
- lockingThread = Thread.currentThread();
- }
- }
- public synchronized void unlock(){
- if(this.lockingThread != Thread.currentThread()){
- throw new IllegalMonitorStateException(
- "Calling thread has not locked this lock");
- }
- isLocked = false;
- lockingThread = null;
- if(waitingThreads.size() > 0){
- QueueObject queueObject = waitingThread.get(0);
- synchronized(queueObject){
- queueObject.notify();
- }
- }
- }
- }
- public class QueueObject {}
乍看之下,嗯,很好,但是請注意lock方法是怎么調用queueObject.wait()的,在方法內部有兩個synchronized塊,一個鎖定this,一個嵌在上一個synchronized塊內部,它鎖定的是局部變量queueObject。
當一個線程調用queueObject.wait()方法的時候,它僅僅釋放的是在queueObject對象實例的鎖,並沒有釋放”this”上面的鎖。
現在我們還有一個地方需要特別注意, unlock方法被聲明成了synchronized,這就相當於一個synchronized(this)塊。這就意味着,如果一個線程在lock()中等待,該線程將持有與this關聯的管程對象。所有調用unlock()的線程將會一直保持阻塞,等待着前面那個已經獲得this鎖的線程釋放this鎖,但這永遠也發生不了,因為只有某個線程成功地給lock()中等待的線程發送了信號,this上的鎖才會釋放,但只有執行unlock()方法才會發送這個信號。
因此,上面的公平鎖的實現會導致嵌套管程鎖死。更好的公平鎖實現方式可以參考Starvation and Fairness。
總結:
嵌套管程鎖死:在加鎖函數lock()中,線程在嵌套的同步塊內調用管程對象上的wait()阻塞自己,這只會釋放管程對象的上鎖,而不會釋放鎖對象上的鎖。導致其他線程不能進入unlock()來喚醒前面阻塞的線程,而是自己也會被阻塞。一方持有鎖並阻塞,等待另一方的喚醒信號,而另一方需要這個鎖才能發信號,因此也只能阻塞,從而鎖死,這就是嵌套管程鎖死。解決方法是把調用管程對象的wait()操作的嵌套同步塊移到外層,這樣其他線程可進入unlock()來喚醒阻塞的線程。
嵌套管程鎖死 VS 死鎖
嵌套管程鎖死與死鎖很像:都是線程最后被一直阻塞着互相等待。
但是兩者又不完全相同。在死鎖中我們已經對死鎖有了個大概的解釋,死鎖通常是因為兩個線程獲取鎖的順序不一致造成的,線程1鎖住A,等待獲取B,線程2已經獲取了B,再等待獲取A。如死鎖避免中所說的,死鎖可以通過總是以相同的順序獲取鎖來避免。
但是發生嵌套管程鎖死時鎖獲取的順序是一致的。線程1獲得A和B,然后釋放B,等待線程2的信號。線程2需要同時獲得A和B,才能向線程1發送信號。所以,一個線程在等待喚醒,另一個線程在等待想要的鎖被釋放。
不同點歸納如下:
- 死鎖中,二個線程都在等待對方釋放鎖。
- 嵌套管程鎖死中,線程1持有鎖A,同時等待從線程2發來的信號,線程2需要鎖A來發信號給線程1。
滑漏條件
所謂滑漏條件Slipped conditions,就是說, 從一個線程檢查某一特定條件到該線程操作此條件期間,這個條件已經被其它線程改變,導致第一個線程在該條件上執行了錯誤的操作。這里有一個簡單的例子:
- public class Lock {
- private boolean isLocked = true;
- public void lock(){
- synchronized(this){
- while(isLocked){
- try{
- this.wait();
- } catch(InterruptedException e){
- //do nothing, keep waiting
- }
- }
- }
- synchronized(this){
- isLocked = true;
- }
- }
- public synchronized void unlock(){
- isLocked = false;
- this.notify();
- }
- }
我們可以看到,lock()方法包含了兩個同步塊。第一個同步塊執行wait操作直到isLocked變為false才退出,第二個同步塊將isLocked置為true,以此來鎖住這個Lock實例避免其它線程通過lock()方法。
我們可以設想一下,假如在某個時刻isLocked為false, 這個時候,有兩個線程同時訪問lock方法。如果第一個線程先進入第一個同步塊,這個時候它會發現isLocked為false,在退出第一個同步塊而進入第二個同步塊之前,若此時允許第二個線程執行,它也進入第一個同步塊,同樣發現isLocked是false。現在兩個線程都檢查了這個條件為false,然后它們都會繼續進入第二個同步塊中並設置isLocked為true。(譯注:導致兩個線程都會進入lock()后面的臨界區,不能保證只有一個線程進入臨界區而其他線程等待。)
這個場景就是slipped conditions的例子,兩個線程檢查同一個條件, 然后退出同步塊,因此在這兩個線程改變條件之前,就允許其它線程來檢查這個條件。換句話說,條件被某個線程檢查到該條件被此線程改變期間,這個條件已經被其它線程改變過了。
為避免slipped conditions,條件的檢查與設置必須是原子的,也就是說,在第一個線程檢查和設置條件期間,不會有其它線程檢查這個條件。
解決上面問題的方法很簡單,只是簡單的把isLocked = true這行代碼移到第一個同步塊中,放在while循環后面即可:
- public class Lock {
- private boolean isLocked = true;
- public void lock(){
- synchronized(this){
- while(isLocked){
- try{
- this.wait();
- } catch(InterruptedException e){
- //do nothing, keep waiting
- }
- }
- isLocked = true;
- }
- }
- public synchronized void unlock(){
- isLocked = false;
- this.notify();
- }
- }
現在檢查和設置isLocked條件是在同一個同步塊中原子地執行了。
一個更現實的例子
也許你會說,我才不可能寫這么挫的代碼,還覺得slipped conditions是個相當理論的問題。但是第一個簡單的例子只是用來更好的展示slipped conditions。
飢餓和公平中實現的公平鎖也許是個更現實的例子。再看下嵌套管程鎖死中那個幼稚的實現,如果我們試圖解決其中的嵌套管程鎖死問題,很容易產生slipped conditions問題。 首先讓我們看下嵌套管程鎖死中的例子:
- //Fair Lock implementation with nested monitor lockout problem
- public class FairLock {
- private boolean isLocked = false;
- private Thread lockingThread = null;
- private List<QueueObject> waitingThreads =
- new ArrayList<QueueObject>();
- public void lock() throws InterruptedException{
- QueueObject queueObject = new QueueObject();
- synchronized(this){
- waitingThreads.add(queueObject);
- while(isLocked || waitingThreads.get(0) != queueObject){
- synchronized(queueObject){
- try{
- queueObject.wait();
- }catch(InterruptedException e){
- waitingThreads.remove(queueObject);
- throw e;
- }
- }
- }
- waitingThreads.remove(queueObject);
- isLocked = true;
- lockingThread = Thread.currentThread();
- }
- }
- public synchronized void unlock(){
- if(this.lockingThread != Thread.currentThread()){
- throw new IllegalMonitorStateException(
- "Calling thread has not locked this lock");
- }
- isLocked = false;
- lockingThread = null;
- if(waitingThreads.size() > 0){
- QueueObject queueObject = waitingThread.get(0);
- synchronized(queueObject){
- queueObject.notify();
- }
- }
- }
- }
我們可以看到synchronized(queueObject)及其中的queueObject.wait()調用是嵌在synchronized(this)塊里面的,這會導致嵌套管程鎖死問題。為避免這個問題,我們必須將synchronized(queueObject)塊移出synchronized(this)塊。移出來之后的代碼可能是這樣的:
- //Fair Lock implementation with slipped conditions problem
- public class FairLock {
- private boolean isLocked = false;
- private Thread lockingThread = null;
- private List<QueueObject> waitingThreads =
- new ArrayList<QueueObject>();
- public void lock() throws InterruptedException{
- QueueObject queueObject = new QueueObject();
- synchronized(this){
- waitingThreads.add(queueObject);
- }
- boolean mustWait = true;
- while(mustWait){
- synchronized(this){
- mustWait = isLocked || waitingThreads.get(0) != queueObject;
- }
- synchronized(queueObject){
- if(mustWait){
- try{
- queueObject.wait();
- }catch(InterruptedException e){
- waitingThreads.remove(queueObject);
- throw e;
- }
- }
- }
- }
- synchronized(this){
- waitingThreads.remove(queueObject);
- isLocked = true;
- lockingThread = Thread.currentThread();
- }
- }
- }
注意:因為我只改動了lock()方法,這里只展現了lock方法。
現在lock()方法包含了3個同步塊。
第一個,synchronized(this)塊通過mustWait = isLocked || waitingThreads.get(0) != queueObject檢查內部變量的值。
第二個,synchronized(queueObject)塊檢查線程是否需要等待。也有可能其它線程在這個時候已經解鎖了,但我們暫時不考慮這個問題。我們就假設這個鎖處在解鎖狀態,所以線程會立馬退出synchronized(queueObject)塊。
第三個,synchronized(this)塊只會在mustWait為false的時候執行。它將isLocked重新設回true,然后離開lock()方法。
設想一下,在鎖處於解鎖狀態時,如果有兩個線程同時調用lock()方法會發生什么。首先,線程1會檢查到isLocked為false,然后線程2同樣檢查到isLocked為false。接着,它們都不會等待,都會去設置isLocked為true。這就是slipped conditions的一個最好的例子。
消除滑漏條件問題
要消除上面例子中的slipped conditions問題,最后一個synchronized(this)塊中的代碼必須向上移到第一個同步塊中。為適應這種變動,代碼需要做點小改動。下面是改動過的代碼:
- //Fair Lock implementation without nested monitor lockout problem,
- //but with missed signals problem.
- public class FairLock {
- private boolean isLocked = false;
- private Thread lockingThread = null;
- private List<QueueObject> waitingThreads =
- new ArrayList<QueueObject>();
- public void lock() throws InterruptedException{
- QueueObject queueObject = new QueueObject();
- synchronized(this){
- waitingThreads.add(queueObject);
- }
- boolean mustWait = true;
- while(mustWait){
- synchronized(this){
- mustWait = isLocked || waitingThreads.get(0) != queueObject;
- if(!mustWait){
- waitingThreads.remove(queueObject);
- isLocked = true;
- lockingThread = Thread.currentThread();
- return;
- }
- }
- synchronized(queueObject){
- if(mustWait){
- try{
- queueObject.wait();
- }catch(InterruptedException e){
- waitingThreads.remove(queueObject);
- throw e;
- }
- }
- }
- }
- }
- public synchronized void unlock(){
- if(this.lockingThread != Thread.currentThread()){
- throw new IllegalMonitorStateException(
- "Calling thread has not locked this lock");
- }
- isLocked = false;
- lockingThread = null;
- if(waitingThreads.size() > 0){
- QueueObject queueObject = waitingThread.get(0);
- synchronized(queueObject){
- queueObject.notify();
- }
- }
- }
- }
我們可以看到對局部變量mustWait的檢查與賦值是在同一個同步塊中完成的。還可以看到,即使在synchronized(this)塊外面檢查了mustWait,在while(mustWait)子句中,mustWait變量從來沒有在synchronized(this)同步塊外被賦值。當一個線程檢查到mustWait是false的時候,它將自動設置內部的條件(isLocked),所以其它線程再來檢查這個條件的時候,它們就會發現這個條件的值現在為true了。
synchronized(this)塊中的return;
語句不是必須的。這只是個小小的優化。如果一個線程肯定不會等待(即mustWait為false),那么就沒必要讓它進入到synchronized(queueObject)同步塊中和執行if(mustWait)子句了。
細心的讀者可能會注意到上面的公平鎖實現仍然有可能丟失信號。設想一下,當該FairLock實例處於鎖定狀態時,有個線程來調用lock()方法。執行完第一個synchronized(this)塊后,mustWait變量的值為true。再設想一下調用lock()的線程被其他線程搶占了,擁有鎖的那個線程此時調用了unlock()方法,但是看下之前的unlock()的實現你會發現,它調用了queueObject.notify()。但是,因為lock()中的線程還沒有來得及調用queueObject.wait(),所以queueObject.notify()調用也就沒有作用了,信號就丟失掉了。如果調用lock()的線程在另一個線程調用queueObject.notify()之后調用queueObject.wait(),這個線程會一直阻塞到其它線程調用unlock方法為止,但這永遠也不會發生。
公平鎖實現的信號丟失問題在飢餓和公平一文中我們已有過討論,把QueueObject轉變成一個信號量,並提供兩個方法:doWait()和doNotify()。這些方法會在QueueObject內部對信號進行存儲和響應。用這種方式,即使doNotify()在doWait()之前調用,信號也不會丟失。
總結:
滑漏條件:在加鎖函數lock()中,如果鎖狀態的檢查和設置在兩個獨立的同步塊中,這會導致一個線程在檢查鎖狀態為false並且還沒進入另一個同步塊設置加鎖時,其他線程也可以進入前一同步塊並且檢查出鎖狀態為false,從而多個線程都不會等待,都會進入后一同步塊設置鎖為true。然后多個線程都會進入lock()后面的臨界區,同步失效。這就是滑漏條件。解決方法是把鎖狀態的檢查與設置放在一個同步塊中,保持原子性。
丟失信號:一個線程在被通知線程調用wait()之前去調用notify()進行通知,導致通知信號丟失。被通知線程錯過了喚醒信號,將永遠等待。解決方法是把信號保存在一個類中,在這個信號類中用一個變量來保存是否被通知過。在notify前,設置自己已經被通知過。在線程准備wait時,檢查這個變量,如果已經被通知過,就不調用wait()了,從而避免永久等待。然后設置自己沒有被通知過,需要等待通知。
假喚醒:線程在沒有調用過notify()和notifyAll()的情況下無端地醒來,比如由於程序的bug或不正確的信號,這就是假喚醒(spurious wakeups)。假喚醒后,線程能夠執行后續的操作,這可能導致應用程序出現嚴重問題。解決方法是把保存信號的成員變量放在一個while循環里接受檢查(如上面的mustWait),而不是在if表達式里。這樣的一個while循環叫做自旋鎖。等待線程會一直在while循環中自旋,直到收到喚醒信號使循環條件為false。如果它沒有收到信號就喚醒,循環條件仍為true,while循環會再執行一次,促使醒來的線程回到等待狀態。
英文原文:http://tutorials.jenkov.com/java-concurrency/index.html
中文參考:http://ifeve.com/java-concurrency-thread-directory/
http://blog.csdn.net/linfanhehe/article/details/51130492