本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》,由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買,京東自營鏈接:http://item.jd.com/12299018.html

我們在67節和68節實現了線程的一些基本協作機制,那是利用基本的wait/notify實現的,我們提到,Java並發包中有一些專門的同步工具類,本節,我們就來探討它們。
我們要探討的工具類包括:
- 讀寫鎖ReentrantReadWriteLock
- 信號量Semaphore
- 倒計時門栓CountDownLatch
- 循環柵欄CyclicBarrier
與71節介紹的顯示鎖和72節介紹的顯示條件類似,它們也都是基於AQS實現的,AQS可參看71節。在一些特定的同步協作場景中,相比使用最基本的wait/notify,顯示鎖/條件,它們更為方便,效率更高。下面,我們就來探討它們的基本概念、用法、用途和基本原理。
讀寫鎖ReentrantReadWriteLock
之前章節我們介紹了兩種鎖,66節介紹了synchronized,71節介紹了顯示鎖ReentrantLock。對於同一受保護對象的訪問,無論是讀還是寫,它們都要求獲得相同的鎖。在一些場景中,這是沒有必要的,多個線程的讀操作完全可以並行,在讀多寫少的場景中,讓讀操作並行可以明顯提高性能。
怎么讓讀操作能夠並行,又不影響一致性呢?答案是使用讀寫鎖。在Java並發包中,接口ReadWriteLock表示讀寫鎖,主要實現類是可重入讀寫鎖ReentrantReadWriteLock。
ReadWriteLock的定義為:
public interface ReadWriteLock { Lock readLock(); Lock writeLock(); }
通過一個ReadWriteLock產生兩個鎖,一個讀鎖,一個寫鎖。讀操作使用讀鎖,寫操作使用寫鎖。
需要注意的是,只有"讀-讀"操作是可以並行的,"讀-寫"和"寫-寫"都不可以。只有一個線程可以進行寫操作,在獲取寫鎖時,只有沒有任何線程持有任何鎖才可以獲取到,在持有寫鎖時,其他任何線程都獲取不到任何鎖。在沒有其他線程持有寫鎖的情況下,多個線程可以獲取和持有讀鎖。
ReentrantReadWriteLock是可重入的讀寫鎖,它有兩個構造方法,如下所示:
public ReentrantLock() public ReentrantLock(boolean fair)
fire表示是否公平,不傳遞的話是false,含義與顯式鎖一節介紹的類似,就不贅述了。
我們看個簡單的例子,使用ReentrantReadWriteLock實現一個緩存類MyCache,代碼如下:
public class MyCache { private Map<String, Object> map = new HashMap<>(); private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private Lock readLock = readWriteLock.readLock(); private Lock writeLock = readWriteLock.writeLock(); public Object get(String key) { readLock.lock(); try { return map.get(key); } finally { readLock.unlock(); } } public Object put(String key, Object value) { writeLock.lock(); try { return map.put(key, value); } finally { writeLock.unlock(); } } public void clear() { writeLock.lock(); try { map.clear(); } finally { writeLock.unlock(); } } }
代碼比較簡單,就不贅述了。
讀寫鎖是怎么實現的呢?讀鎖和寫鎖看上去是兩個鎖,它們是怎么協調的?具體實現比較復雜,我們簡述下其思路。
內部,它們使用同一個整數變量表示鎖的狀態,16位給讀鎖用,16位給寫鎖用,使用一個變量便於進行CAS操作,鎖的等待隊列其實也只有一個。
寫鎖的獲取,就是確保當前沒有其他線程持有任何鎖,否則就等待。寫鎖釋放后,也就是將等待隊列中的第一個線程喚醒,喚醒的可能是等待讀鎖的,也可能是等待寫鎖的。
讀鎖的獲取不太一樣,首先,只要寫鎖沒有被持有,就可以獲取到讀鎖,此外,在獲取到讀鎖后,它會檢查等待隊列,逐個喚醒最前面的等待讀鎖的線程,直到第一個等待寫鎖的線程。如果有其他線程持有寫鎖,獲取讀鎖會等待。讀鎖釋放后,檢查讀鎖和寫鎖數是否都變為了0,如果是,喚醒等待隊列中的下一個線程。
信號量Semaphore
之前介紹的鎖都是限制只有一個線程可以同時訪問一個資源。現實中,資源往往有多個,但每個同時只能被一個線程訪問,比如,飯店的飯桌、火車上的衛生間。有的單個資源即使可以被並發訪問,但並發訪問數多了可能影響性能,所以希望限制並發訪問的線程數。還有的情況,與軟件的授權和計費有關,對不同等級的賬戶,限制不同的最大並發訪問數。
信號量類Semaphore就是用來解決這類問題的,它可以限制對資源的並發訪問數,它有兩個構造方法:
public Semaphore(int permits) public Semaphore(int permits, boolean fair)
fire表示公平,含義與之前介紹的是類似的,permits表示許可數量。
Semaphore的方法與鎖是類似的,主要的方法有兩類,獲取許可和釋放許可,主要方法有:
//阻塞獲取許可 public void acquire() throws InterruptedException //阻塞獲取許可,不響應中斷 public void acquireUninterruptibly() //批量獲取多個許可 public void acquire(int permits) throws InterruptedException public void acquireUninterruptibly(int permits) //嘗試獲取 public boolean tryAcquire() //限定等待時間獲取 public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException //釋放許可 public void release()
我們看個簡單的示例,限制並發訪問的用戶數不超過100,代碼如下:
public class AccessControlService { public static class ConcurrentLimitException extends RuntimeException { private static final long serialVersionUID = 1L; } private static final int MAX_PERMITS = 100; private Semaphore permits = new Semaphore(MAX_PERMITS, true); public boolean login(String name, String password) { if (!permits.tryAcquire()) { // 同時登錄用戶數超過限制 throw new ConcurrentLimitException(); } // ..其他驗證 return true; } public void logout(String name) { permits.release(); } }
代碼比較簡單,就不贅述了。
需要說明的是,如果我們將permits的值設為1,你可能會認為它就變成了一般的鎖,不過,它與一般的鎖是不同的。一般鎖只能由持有鎖的線程釋放,而Semaphore表示的只是一個許可數,任意線程都可以調用其release方法。主要的鎖實現類ReentrantLock是可重入的,而Semaphore不是,每一次的acquire調用都會消耗一個許可,比如,看下面代碼段:
Semaphore permits = new Semaphore(1); permits.acquire(); permits.acquire(); System.out.println("acquired");
程序會阻塞在第二個acquire調用,永遠都不會輸出"acquired"。
信號量的基本原理比較簡單,也是基於AQS實現的,permits表示共享的鎖個數,acquire方法就是檢查鎖個數是否大於0,大於則減一,獲取成功,否則就等待,release就是將鎖個數加一,喚醒第一個等待的線程。
倒計時門栓CountDownLatch
我們在68節使用wait/notify實現了一個簡單的門栓MyLatch,我們提到,Java並發包中已經提供了類似工具,就是CountDownLatch。它的大概含義是指,它相當於是一個門栓,一開始是關閉的,所有希望通過該門的線程都需要等待,然后開始倒計時,倒計時變為0后,門栓打開,等待的所有線程都可以通過,它是一次性的,打開后就不能再關上了。
CountDownLatch里有一個計數,這個計數通過構造方法進行傳遞:
public CountDownLatch(int count)
多個線程可以基於這個計數進行協作,它的主要方法有:
public void await() throws InterruptedException public boolean await(long timeout, TimeUnit unit) throws InterruptedException public void countDown()
await()檢查計數是否為0,如果大於0,就等待,await()可以被中斷,也可以設置最長等待時間。countDown檢查計數,如果已經為0,直接返回,否則減少計數,如果新的計數變為0,則喚醒所有等待的線程。
在68節,我們介紹了門栓的兩種應用場景,一種是同時開始,另一種是主從協作。它們都有兩類線程,互相需要同步,我們使用CountDownLatch重新演示下。
在同時開始場景中,運行員線程等待主裁判線程發出開始指令的信號,一旦發出后,所有運動員線程同時開始,計數初始為1,運動員線程調用await,主線程調用countDown,示例代碼如下:
public class RacerWithCountDownLatch { static class Racer extends Thread { CountDownLatch latch; public Racer(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { this.latch.await(); System.out.println(getName() + " start run "+System.currentTimeMillis()); } catch (InterruptedException e) { } } } public static void main(String[] args) throws InterruptedException { int num = 10; CountDownLatch latch = new CountDownLatch(1); Thread[] racers = new Thread[num]; for (int i = 0; i < num; i++) { racers[i] = new Racer(latch); racers[i].start(); } Thread.sleep(1000); latch.countDown(); } }
代碼比較簡單,就不贅述了。在主從協作模式中,主線程依賴工作線程的結果,需要等待工作線程結束,這時,計數初始值為工作線程的個數,工作線程結束后調用countDown,主線程調用await進行等待,示例代碼如下:
public class MasterWorkerDemo { static class Worker extends Thread { CountDownLatch latch; public Worker(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { // simulate working on task Thread.sleep((int) (Math.random() * 1000)); // simulate exception if (Math.random() < 0.02) { throw new RuntimeException("bad luck"); } } catch (InterruptedException e) { } finally { this.latch.countDown(); } } } public static void main(String[] args) throws InterruptedException { int workerNum = 100; CountDownLatch latch = new CountDownLatch(workerNum); Worker[] workers = new Worker[workerNum]; for (int i = 0; i < workerNum; i++) { workers[i] = new Worker(latch); workers[i].start(); } latch.await(); System.out.println("collect worker results"); } }
需要強調的是,在這里,countDown的調用應該放到finally語句中,確保在工作線程發生異常的情況下也會被調用,使主線程能夠從await調用中返回。
循環柵欄CyclicBarrier
我們在68節使用wait/notify實現了一個簡單的集合點AssemblePoint,我們提到,Java並發包中已經提供了類似工具,就是CyclicBarrier。它的大概含義是指,它相當於是一個柵欄,所有線程在到達該柵欄后都需要等待其他線程,等所有線程都到達后再一起通過,它是循環的,可以用作重復的同步。
CyclicBarrier特別適用於並行迭代計算,每個線程負責一部分計算,然后在柵欄處等待其他線程完成,所有線程到齊后,交換數據和計算結果,再進行下一次迭代。
與CountDownLatch類似,它也有一個數字,但表示的是參與的線程個數,這個數字通過構造方法進行傳遞:
public CyclicBarrier(int parties)
它還有一個構造方法,接受一個Runnable參數,如下所示:
public CyclicBarrier(int parties, Runnable barrierAction)
這個參數表示柵欄動作,當所有線程到達柵欄后,在所有線程執行下一步動作前,運行參數中的動作,這個動作由最后一個到達柵欄的線程執行。
CyclicBarrier的主要方法就是await:
public int await() throws InterruptedException, BrokenBarrierException public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
await在等待其他線程到達柵欄,調用await后,表示自己已經到達,如果自己是最后一個到達的,就執行可選的命令,執行后,喚醒所有等待的線程,然后重置內部的同步計數,以循環使用。
await可以被中斷,可以限定最長等待時間,中斷或超時后會拋出異常。需要說明的是異常BrokenBarrierException,它表示柵欄被破壞了,什么意思呢?在CyclicBarrier中,參與的線程是互相影響的,只要其中一個線程在調用await時被中斷了,或者超時了,柵欄就會被破壞,此外,如果柵欄動作拋出了異常,柵欄也會被破壞,被破壞后,所有在調用await的線程就會退出,拋出BrokenBarrierException。
我們看一個簡單的例子,多個游客線程分別在集合點A和B同步:
public class CyclicBarrierDemo { static class Tourist extends Thread { CyclicBarrier barrier; public Tourist(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { try { // 模擬先各自獨立運行 Thread.sleep((int) (Math.random() * 1000)); // 集合點A barrier.await(); System.out.println(this.getName() + " arrived A " + System.currentTimeMillis()); // 集合后模擬再各自獨立運行 Thread.sleep((int) (Math.random() * 1000)); // 集合點B barrier.await(); System.out.println(this.getName() + " arrived B " + System.currentTimeMillis()); } catch (InterruptedException e) { } catch (BrokenBarrierException e) { } } } public static void main(String[] args) { int num = 3; Tourist[] threads = new Tourist[num]; CyclicBarrier barrier = new CyclicBarrier(num, new Runnable() { @Override public void run() { System.out.println("all arrived " + System.currentTimeMillis() + " executed by " + Thread.currentThread().getName()); } }); for (int i = 0; i < num; i++) { threads[i] = new Tourist(barrier); threads[i].start(); } } }
在我的電腦上的一次輸出為:
all arrived 1490053578552 executed by Thread-1 Thread-1 arrived A 1490053578555 Thread-2 arrived A 1490053578555 Thread-0 arrived A 1490053578555 all arrived 1490053578889 executed by Thread-0 Thread-0 arrived B 1490053578890 Thread-2 arrived B 1490053578890 Thread-1 arrived B 1490053578890
多個線程到達A和B的時間是一樣的,使用CyclicBarrier,達到了重復同步的目的。
CyclicBarrier與CountDownLatch可能容易混淆,我們強調下其區別:
- CountDownLatch的參與線程是有不同角色的,有的負責倒計時,有的在等待倒計時變為0,負責倒計時和等待倒計時的線程都可以有多個,它用於不同角色線程間的同步。
- CyclicBarrier的參與線程角色是一樣的,用於同一角色線程間的協調一致。
- CountDownLatch是一次性的,而CyclicBarrier是可以重復利用的。
小結
本節介紹了Java並發包中的一些同步協作工具:
- 在讀多寫少的場景中使用ReentrantReadWriteLock替代ReentrantLock,以提高性能
- 使用Semaphore限制對資源的並發訪問數
- 使用CountDownLatch實現不同角色線程間的同步
- 使用CyclicBarrier實現同一角色線程間的協調一致
實際中,應該優先使用這些工具,而不是手工用wait/notify或者顯示鎖/條件同步。
下一節,我們來探討一個特殊的概念,線程局部變量ThreadLocal,它是什么呢?
(與其他章節一樣,本節所有代碼位於 https://github.com/swiftma/program-logic)
----------------
未完待續,查看最新文章,敬請關注微信公眾號“老馬說編程”(掃描下方二維碼),從入門到高級,深入淺出,老馬和你一起探索Java編程及計算機技術的本質。用心原創,保留所有版權。

