Java中的Lock和Condition


Lock和Condition(一)

Java SDK 並發包內容很豐富,包羅萬象,但是我覺得最核心的還是其對管程的實現。因為理論上利用管程,你幾乎可以實現並發包里所有的工具類。在《Java中的管程》中我們提到過在並發編程領域,有兩大核心問題:一個是互斥,即同一時刻只允許一個線程訪問共享資源;另一個是同步,即線程之間如何通信、協作。這兩大問題,管程都是能夠解決的。Java SDK 並發包通過 Lock 和 Condition 兩個接口來實現管程,其中 Lock 用於解決互斥問題,Condition 用於解決同步問題

今天我們重點介紹 Lock 的使用,在介紹 Lock 的使用之前,有個問題需要你首先思考一下:Java 語言本身提供的 synchronized 也是管程的一種實現,既然 Java 從語言層面已經實現了管程了,那為什么還要在 SDK 里提供另外一種實現呢?難道 Java 標准委員會還能同意“重復造輪子”的方案?很顯然它們之間是有巨大區別的。那區別在哪里呢?如果能深入理解這個問題,對你用好 Lock 幫助很大。下面我們就一起來剖析一下這個問題。

再造管程的理由

你也許曾經聽到過很多這方面的傳說,例如在 Java 的 1.5 版本中,synchronized 性能不如 SDK 里面的 Lock,但 1.6 版本之后,synchronized 做了很多優化,將性能追了上來,所以 1.6 之后的版本又有人推薦使用 synchronized 了。那性能是否可以成為“重復造輪子”的理由呢?顯然不能。因為性能問題優化一下就可以了,完全沒必要“重復造輪子”。

到這里,關於這個問題,你是否能夠想出一條理由來呢?如果你細心的話,也許能想到一點。那就是破壞不可搶占條件方案可以解決死鎖問題,但是這個方案 synchronized 沒有辦法解決。原因是 synchronized 申請資源的時候,如果申請不到,線程直接進入阻塞狀態了,而線程進入阻塞狀態,啥都干不了,也釋放不了線程已經占有的資源。但我們希望的是:對於“不可搶占”這個條件,占用部分資源的線程進一步申請其他資源時,如果申請不到,可以主動釋放它占有的資源,這樣不可搶占這個條件就破壞掉了。

如果我們重新設計一把互斥鎖去解決這個問題,那該怎么設計呢?我覺得有三種方案。

能夠響應中斷。synchronized 的問題是,持有鎖 A 后,如果嘗試獲取鎖 B 失敗,那么線程就進入阻塞狀態,一旦發生死鎖,就沒有任何機會來喚醒阻塞的線程。但如果阻塞狀態的線程能夠響應中斷信號,也就是說當我們給阻塞的線程發送中斷信號的時候,能夠喚醒它,那它就有機會釋放曾經持有的鎖 A。這樣就破壞了不可搶占條件了。

支持超時。如果線程在一段時間之內沒有獲取到鎖,不是進入阻塞狀態,而是返回一個錯誤,那這個線程也有機會釋放曾經持有的鎖。這樣也能破壞不可搶占條件。

非阻塞地獲取鎖。如果嘗試獲取鎖失敗,並不進入阻塞狀態,而是直接返回,那這個線程也有機會釋放曾經持有的鎖。這樣也能破壞不可搶占條件。

這三種方案可以全面彌補 synchronized 的問題。到這里相信你應該也能理解了,這三個方案就是“重復造輪子”的主要原因,體現在 API 上,就是 Lock 接口的三個方法。詳情如下:

// 支持中斷的API
void lockInterruptibly() throws InterruptedException;

// 支持超時的API
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

// 支持非阻塞獲取鎖的API
boolean tryLock();

如何保證可見性

Java SDK 里面 Lock 的使用,有一個經典的范例,就是try{}finally{},需要重點關注的是在 finally 里面釋放鎖。這個范例無需多解釋,你看一下下面的代碼就明白了。但是有一點需要解釋一下,那就是可見性是怎么保證的。你已經知道 Java 里多線程的可見性是通過 Happens-Before 規則保證的,而 synchronized 之所以能夠保證可見性,也是因為有一條 synchronized 相關的規則:synchronized 的解鎖 Happens-Before 於后續對這個鎖的加鎖。那 Java SDK 里面 Lock 靠什么保證可見性呢?例如在下面的代碼中,線程 T1 對 value 進行了 +=1 操作,那后續的線程 T2 能夠看到 value 的正確結果嗎?

class X {
	private final Lock rtl = new ReentrantLock();
  	int value;
    
  	public void addOne() {
	// 獲取鎖
	rtl.lock();  
	try {
		value+=1;
    } finally {
      	// 保證鎖能釋放
      	rtl.unlock();
    }
  }
}

答案必須是肯定的。Java SDK 里面鎖的實現非常復雜,這里我就不展開細說了,但是原理還是需要簡單介紹一下:它是利用了 volatile 相關的 Happens-Before 規則。Java SDK 里面的 ReentrantLock,內部持有一個 volatile 的成員變量 state,獲取鎖的時候,會讀寫 state 的值;解鎖的時候,也會讀寫 state 的值(簡化后的代碼如下面所示)。也就是說,在執行 value+=1 之前,程序先讀寫了一次 volatile 變量 state,在執行 value+=1 之后,又讀寫了一次 volatile 變量 state。根據相關的 Happens-Before 規則:

volatile 變量規則:對於線程A而言由於lock和unlock都需要操作state變量,因此lock操作 Happens-Before value操作,而value 操作Happens-Before unlock操作。對於線程B而言,只有等線程A unlock之后才能lock,因此線程B lock Happens-Before 線程A unlock。

傳遞性規則:線程 T1 的 value+=1 Happens-Before 線程 T2 的 lock() 操作

class SampleLock {

  volatile int state;
  // 加鎖
  lock() {
      // 省略代碼無數
      state = 1;
  }
  // 解鎖
  unlock() {
	// 省略代碼無數
	state = 0;
  }
}

所以說,后續線程 T2 能夠看到 value 的正確結果。如果你覺得理解起來還有點困難,建議你重溫一下前面我們講過的Java中的可見性里面的相關內容。

什么是可重入鎖

如果你細心觀察,會發現我們創建的鎖的具體類名是 ReentrantLock,這個翻譯過來叫可重入鎖,這個概念前面我們一直沒有介紹過。所謂可重入鎖,顧名思義,指的是線程可以重復獲取同一把鎖。例如下面代碼中,當線程 T1 執行到 ① 處時,已經獲取到了鎖 rtl ,當在 ① 處調用 get() 方法時,會在 ② 再次對鎖 rtl 執行加鎖操作。此時,如果鎖 rtl 是可重入的,那么線程 T1 可以再次加鎖成功;如果鎖 rtl 是不可重入的,那么線程 T1 此時會被阻塞。

除了可重入鎖,可能你還聽說過可重入函數,可重入函數怎么理解呢?指的是線程可以重復調用?顯然不是,所謂可重入函數,指的是多個線程可以同時調用該函數,每個線程都能得到正確結果;同時在一個線程內支持線程切換,無論被切換多少次,結果都是正確的。多線程可以同時執行,還支持線程切換,這意味着什么呢?線程安全啊。所以,可重入函數是線程安全的。

class X {
    private final Lock rtl = new ReentrantLock();
 	int value;

  	public int get() {
    	// 獲取鎖
    	rtl.lock();                (2)
    	try {
        	return value;
    	} finally {
      		// 保證鎖能釋放
      		rtl.unlock();
    	}
  	}

  	public void addOne() {
    	// 獲取鎖
        rtl.lock();               
	    try {
      		value = 1 + get();      (1)
		} finally {
	    	// 保證鎖能釋放
      		rtl.unlock();
    	}
  	}
}

公平鎖與非公平鎖

在使用 ReentrantLock 的時候,你會發現 ReentrantLock 這個類有兩個構造函數,一個是無參構造函數,一個是傳入 fair 參數的構造函數。fair 參數代表的是鎖的公平策略,如果傳入 true 就表示需要構造一個公平鎖,反之則表示要構造一個非公平鎖。

//無參構造函數:默認非公平鎖

public ReentrantLock() {
    sync = new NonfairSync();
}

//根據公平策略參數創建鎖

public ReentrantLock(boolean fair){
    sync = fair ? new FairSync() : new NonfairSync();
}

在前面《Java中的管程》中,我們介紹過入口等待隊列,鎖都對應着一個等待隊列,如果一個線程沒有獲得鎖,就會進入等待隊列,當有線程釋放鎖的時候,就需要從等待隊列中喚醒一個等待的線程。如果是公平鎖,喚醒的策略就是誰等待的時間長,就喚醒誰,很公平;如果是非公平鎖,則不提供這個公平保證,有可能等待時間短的線程反而先被喚醒。

用鎖的最佳實踐

你已經知道,用鎖雖然能解決很多並發問題,但是風險也是挺高的。可能會導致死鎖,也可能影響性能。這方面有是否有相關的最佳實踐呢?有,還很多。但是我覺得最值得推薦的是並發大師 Doug Lea《Java 並發編程:設計原則與模式》一書中,推薦的三個用鎖的最佳實踐,它們分別是:

永遠只在更新對象的成員變量時加鎖

永遠只在訪問可變的成員變量時加鎖

永遠不在調用其他對象的方法時加鎖

這三條規則,前兩條估計你一定會認同,最后一條你可能會覺得過於嚴苛。但是我還是傾向於你去遵守,因為調用其他對象的方法,實在是太不安全了,也許“其他”方法里面有線程 sleep() 的調用,也可能會有奇慢無比的 I/O 操作,這些都會嚴重影響性能。更可怕的是,“其他”類的方法可能也會加鎖,然后雙重加鎖就可能導致死鎖。

並發問題,本來就難以診斷,所以你一定要讓你的代碼盡量安全,盡量簡單,哪怕有一點可能會出問題,都要努力避免。

總結

Java SDK 並發包里的 Lock 接口里面的每個方法,你可以感受到,都是經過深思熟慮的。除了支持類似 synchronized 隱式加鎖的 lock() 方法外,還支持超時、非阻塞、可中斷的方式獲取鎖,這三種方式為我們編寫更加安全、健壯的並發程序提供了很大的便利。希望你以后在使用鎖的時候,一定要仔細斟酌。

除了並發大師 Doug Lea 推薦的三個最佳實踐外,你也可以參考一些諸如:減少鎖的持有時間、減小鎖的粒度等業界廣為人知的規則,其實本質上它們都是相通的,不過是在該加鎖的地方加鎖而已。你可以自己體會,自己總結,最終總結出自己的一套最佳實踐來。

Lock和Condition(二)

《Java中的管程》里我們提到過 Java 語言內置的管程里只有一個條件變量,而 Lock&Condition 實現的管程是支持多個條件變量的,這是二者的一個重要區別。
在很多並發場景下,支持多個條件變量能夠讓我們的並發程序可讀性更好,實現起來也更容易。例如,實現一個阻塞隊列,就需要兩個條件變量。
那如何利用兩個條件變量快速實現阻塞隊列呢?
一個阻塞隊列,需要兩個條件變量,一個是隊列不空(空隊列不允許出隊),另一個是隊列不滿(隊列已滿不允許入隊),這個例子我們前面在介紹管程的時候詳細說過,這里就不再贅述。相關的代碼,我這里重新列了出來,你可以溫故知新一下。需要注意await()執行時會隱式地釋放lock,不然會導致鎖未釋放。

public class BlockedQueue<T>{
  final Lock lock = new ReentrantLock();
  // 條件變量:隊列不滿  
  final Condition notFull = lock.newCondition();
  // 條件變量:隊列不空  
  final Condition notEmpty = lock.newCondition();
  
  // 入隊
  void enq(T x) {
    lock.lock();
    try {
      while (隊列已滿){
        // 等待隊列不滿
        notFull.await();
      }  
      // 省略入隊操作...
      //入隊后,通知可出隊
      notEmpty.signal();
    }finally {
      lock.unlock();
    }
  }
    
  // 出隊
  void deq(){
    lock.lock();
    try {
      while (隊列已空){
        // 等待隊列不空
        notEmpty.await();
      }  
      // 省略出隊操作...
      //出隊后,通知可入隊
      notFull.signal();
    }finally {
      lock.unlock();
    }  
  }
}

不過,這里你需要注意,Lock 和 Condition 實現的管程,線程等待和通知需要調用 await()、signal()、signalAll(),它們的語義和 wait()、notify()、notifyAll() 是相同的。但是不一樣的是,Lock&Condition 實現的管程里只能使用前面的 await()、signal()、signalAll(),而后面的 wait()、notify()、notifyAll() 只有在 synchronized 實現的管程里才能使用。如果一不小心在 Lock&Condition 實現的管程里調用了 wait()、notify()、notifyAll(),那程序可就徹底玩兒完了。

Java SDK 並發包里的 Lock 和 Condition 不過就是管程的一種實現而已,管程你已經很熟悉了,那 Lock 和 Condition 的使用自然是小菜一碟。下面我們就來看看在知名項目 Dubbo 中,Lock 和 Condition 是怎么用的。不過在開始介紹源碼之前,我還先要介紹兩個概念:同步和異步。

同步與異步

我們平時寫的代碼,基本都是同步的。但最近幾年,異步編程大火。那同步和異步的區別到底是什么呢?通俗點來講就是調用方是否需要等待結果,如果需要等待結果,就是同步;如果不需要等待結果,就是異步。
比如在下面的代碼里,有一個計算圓周率小數點后 100 萬位的方法pai1M(),這個方法可能需要執行倆禮拜,如果調用pai1M()之后,線程一直等着計算結果,等倆禮拜之后結果返回,就可以執行 printf("hello world")了,這個屬於同步;如果調用pai1M()之后,線程不用等待計算結果,立刻就可以執行 printf("hello world"),這個就屬於異步。

// 計算圓周率小說點后100萬位 
String pai1M() {
  //省略代碼無數
}
pai1M()
printf("hello world")

同步是 Java 代碼默認的處理方式。如果你想讓你的程序支持異步,可以通過下面兩種方式來實現:

  1. 調用方創建一個子線程,在子線程中執行方法調用,這種調用我們稱為異步調用;
  2. 方法實現的時候,創建一個新的線程執行主要邏輯,主線程直接 return,這種方法我們一般稱為異步方法。

Dubbo 源碼分析

其實在編程領域,異步的場景還是挺多的,比如 TCP 協議本身就是異步的,我們工作中經常用到的 RPC 調用,在 TCP 協議層面,發送完 RPC 請求后,線程是不會等待 RPC 的響應結果的。可能你會覺得奇怪,平時工作中的 RPC 調用大多數都是同步的啊?這是怎么回事呢?

其實很簡單,一定是有人幫你做了異步轉同步的事情。例如目前知名的 RPC 框架 Dubbo 就給我們做了異步轉同步的事情,那它是怎么做的呢?下面我們就來分析一下 Dubbo 的相關源碼。

對於下面一個簡單的 RPC 調用,默認情況下 sayHello() 方法,是個同步方法,也就是說,執行 service.sayHello(“dubbo”) 的時候,線程會停下來等結果。

DemoService service = 初始化部分省略;
String message = service.sayHello("dubbo");
System.out.println(message);

如果此時你將調用線程 dump 出來的話,會是下圖這個樣子,你會發現調用線程阻塞了,線程狀態是 TIMED_WAITING。本來發送請求是異步的,但是調用線程卻阻塞了,說明 Dubbo 幫我們做了異步轉同步的事情。通過調用棧,你能看到線程是阻塞在 DefaultFuture.get() 方法上,所以可以推斷:Dubbo 異步轉同步的功能應該是通過 DefaultFuture 這個類實現的。

11

不過為了理清前后關系,還是有必要分析一下調用 DefaultFuture.get() 之前發生了什么。DubboInvoker 的 108 行調用了 DefaultFuture.get(),這一行很關鍵,我稍微修改了一下列在了下面。這一行先調用了 request(inv, timeout) 方法,這個方法其實就是發送 RPC 請求,之后通過調用 get() 方法等待 RPC 返回結果。

public class DubboInvoker{
  Result doInvoke(Invocation inv){
    // 下面這行就是源碼中108行
    // 為了便於展示,做了修改
    return currentClient 
      .request(inv, timeout)
      .get();
  }
}

DefaultFuture 這個類是很關鍵,我把相關的代碼精簡之后,列到了下面。不過在看代碼之前,你還是有必要重復一下我們的需求:當 RPC 返回結果之前,阻塞調用線程,讓調用線程等待;當 RPC 返回結果后,喚醒調用線程,讓調用線程重新執行。不知道你有沒有似曾相識的感覺,這不就是經典的等待 - 通知機制嗎?這個時候想必你的腦海里應該能夠浮現出管程的解決方案了。有了自己的方案之后,我們再來看看 Dubbo 是怎么實現的。

// 創建鎖與條件變量
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();

// 調用方通過該方法等待結果
Object get(int timeout) {
    long start = System.nanoTime();
    lock.lock();
    try {
        while (!isDone()) {
            done.await(timeout);
            long cur = System.nanoTime();
            if (isDone() ||
                cur - start > timeout) {
                break;
            }
        }
    } finally {
        lock.unlock();
    }
    if (!isDone()) {
        throw new TimeoutException();
    }
    return returnFromResponse();
}

// RPC結果是否已經返回
boolean isDone() {
    return response != null;
}

// RPC結果返回時調用該方法   
private void doReceived(Response res) {
    lock.lock();
    try {
        response = res;
        if (done != null) {
            done.signal();
        }
    } finally {
        lock.unlock();
    }
}

調用線程通過調用 get() 方法等待 RPC 返回結果,這個方法里面,你看到的都是熟悉的“面孔”:調用 lock() 獲取鎖,在 finally 里面調用 unlock() 釋放鎖;獲取鎖后,通過經典的在循環中調用 await() 方法來實現等待。

當 RPC 結果返回時,會調用 doReceived() 方法,這個方法里面,調用 lock() 獲取鎖,在 finally 里面調用 unlock() 釋放鎖,獲取鎖后通過調用 signal() 來通知調用線程,結果已經返回,不用繼續等待了。

至此,Dubbo 里面的異步轉同步的源碼就分析完了,有沒有覺得還挺簡單的?最近這幾年,工作中需要異步處理的越來越多了,其中有一個主要原因就是有些 API 本身就是異步 API。例如 websocket 也是一個異步的通信協議,如果基於這個協議實現一個簡單的 RPC,你也會遇到異步轉同步的問題。現在很多公有雲的 API 本身也是異步的,例如創建雲主機,就是一個異步的 API,調用雖然成功了,但是雲主機並沒有創建成功,你需要調用另外一個 API 去輪詢雲主機的狀態。如果你需要在項目內部封裝創建雲主機的 API,你也會面臨異步轉同步的問題,因為同步的 API 更易用。

總結

Lock&Condition 是管程的一種實現,所以能否用好 Lock 和 Condition 要看你對管程模型理解得怎么樣。管程的技術前面我們已經專門用了一篇文章做了介紹,你可以結合着來學,理論聯系實踐,有助於加深理解。

Lock&Condition 實現的管程相對於 synchronized 實現的管程來說更加靈活、功能也更豐富。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM