譯序
本指南根據 Jakob Jenkov 最新博客翻譯,請隨時關注博客更新
本指南已做成中英文對照閱讀版的 pdf 文檔,有興趣的朋友可以去 Java並發工具包java.util.concurrent用戶指南中英文對照閱讀版 進行下載。
1. java.util.concurrent - Java並發工具包
Java 5 添加了一個新的包到 Java 平台,java.util.concurrent 包。這個包包含有一系列能夠讓 Java 的並發編程變得更加簡單輕松的類。在這個包被添加以前,你需要自己去動手實現自己的相關工具類。
本文我將帶你一一認識 java.util.concurrent 包里的這些類,然后你可以嘗試着如何在項目中使用它們。本文中我將使用 Java 6 版本,我不確定這和 Java 5 版本里的是否有一些差異。
我不會去解釋關於 Java 並發的核心問題 - 其背后的原理,也就是說,如果你對那些東西感興趣,參考《Java 並發指南》
半成品
本文很大程度上還是個 “半成品”,所以當你發現一些被漏掉的類或接口時,請耐心等待。在我空閑的時候會把它們加進來的。
2. 阻塞隊列BlockingQueue
java.util.concurrent 包里的 BlockingQueue 接口表示一個線程安放入和提取實例的隊列。本小節我將給你演示如何使用這個 BlockingQueue。
本節不會討論如何在 Java 中實現一個你自己的 BlockingQueue。如果你對那個感興趣,參考《Java 並發指南》
2.1 BlockingQueue用法
BlockingQueue 通常用於一個線程生產對象,而另外一個線程消費這些對象的場景。下圖是對這個原理的闡述:
一個線程往里邊放,另外一個線程從里邊取的一個 BlockingQueue。
一個線程將會持續生產新對象並將其插入到隊列之中,直到隊列達到它所能容納的臨界點。也就是說,它是有限的。如果該阻塞隊列到達了其臨界點,負責生產的線程將會在往里邊插入新對象時發生阻塞。它會一直處於阻塞之中,直到負責消費的線程從隊列中拿走一個對象。
負責消費的線程將會一直從該阻塞隊列中拿出對象。如果消費線程嘗試去從一個空的隊列中提取對象的話,這個消費線程將會處於阻塞之中,直到一個生產線程把一個對象丟進隊列。
2.2 BlockingQueue的方法
BlockingQueue 具有 4 組不同的方法用於插入、移除以及對隊列中的元素進行檢查。如果請求的操作不能得到立即執行的話,每個方法的表現也不同。這些方法如下:
操作 | 拋異常 | 特定值 | 阻塞 | 超時 |
---|---|---|---|---|
插入 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
移除 | remove(o) | poll(o) | take(o) | poll(timeout, timeunit) |
檢查 | element(o) | peek(o) | 不可用 | 不可用 |
四組不同的行為方式解釋:
- 拋異常:如果試圖的操作無法立即執行,拋一個異常。
- 特定值:如果試圖的操作無法立即執行,返回一個特定的值(常常是 true / false)。
- 阻塞:如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行。
- 超時:如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。返回一個特定值以告知該操作是否成功(典型的是 true / false)。
無法向一個 BlockingQueue 中插入 null。如果你試圖插入 null,BlockingQueue 將會拋出一個 NullPointerException。
可以訪問到 BlockingQueue 中的所有元素,而不僅僅是開始和結束的元素。比如說,你將一個對象放入隊列之中以等待處理,但你的應用想要將其取消掉。那么你可以調用諸如 remove(o) 方法來將隊列之中的特定對象進行移除。但是這么干效率並不高(譯者注:基於隊列的數據結構,獲取除開始或結束位置的其他對象的效率不會太高),因此你盡量不要用這一類的方法,除非你確實不得不那么做。
2.3 BlockingQueue的實現
BlockingQueue 是個接口,你需要使用它的實現之一來使用 BlockingQueue。java.util.concurrent 具有以下 BlockingQueue 接口的實現(Java 6):
- ArrayBlockingQueue
- DelayQueue
- LinkedBlockingQueue
- PriorityBlockingQueue
- SynchronousQueue
2.4 Java中使用BlockingQueue的例子
這里是一個 Java 中使用 BlockingQueue 的示例。本示例使用的是 BlockingQueue 接口的 ArrayBlockingQueue 實現。
首先,BlockingQueueExample 類分別在兩個獨立的線程中啟動了一個 Producer 和 一個 Consumer。Producer 向一個共享的 BlockingQueue 中注入字符串,而 Consumer 則會從中把它們拿出來。
public class BlockingQueueExample { public static void main(String[] args) throws Exception { BlockingQueue queue = new ArrayBlockingQueue(1024); Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); new Thread(producer).start(); new Thread(consumer).start(); Thread.sleep(4000); } }
以下是 Producer 類。注意它在每次 put() 調用時是如何休眠一秒鍾的。這將導致 Consumer 在等待隊列中對象的時候發生阻塞。
public class Producer implements Runnable{ protected BlockingQueue queue = null; public Producer(BlockingQueue queue) { this.queue = queue; } public void run() { try { queue.put("1"); Thread.sleep(1000); queue.put("2"); Thread.sleep(1000); queue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } } }
以下是 Consumer 類。它只是把對象從隊列中抽取出來,然后將它們打印到 System.out
public class Consumer implements Runnable{ protected BlockingQueue queue = null; public Consumer(BlockingQueue queue) { this.queue = queue; } public void run() { try { System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }
3. 數組阻塞隊列ArrayBlockingQueue
ArrayBlockingQueue 類實現了 BlockingQueue 接口。
ArrayBlockingQueue 是一個有界的阻塞隊列,其內部實現是將對象放到一個數組里。有界也就意味着,它不能夠存儲無限多數量的元素。它有一個同一時間能夠存儲元素數量的上限。你可以在對其初始化的時候設定這個上限,但之后就無法對這個上限進行修改了(譯者注:因為它是基於數組實現的,也就具有數組的特性:一旦初始化,大小就無法修改)。
ArrayBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。
以下是在使用 ArrayBlockingQueue 的時候對其初始化的一個示例:
BlockingQueue queue = new ArrayBlockingQueue(1024); queue.put("1"); Object object = queue.take();
以下是使用了 Java 泛型的一個 BlockingQueue 示例。注意其中是如何對 String 元素放入和提取的:
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024); queue.put("1"); String string = queue.take();
4. 延遲隊列DelayQueue
DelayQueue 實現了 BlockingQueue 接口。
DelayQueue 對元素進行持有直到一個特定的延遲到期。注入其中的元素必須實現 java.util.concurrent.Delayed 接口,該接口定義:
public interface Delayed extends Comparable<Delayed> { public long getDelay(TimeUnit timeUnit); }
DelayQueue 將會在每個元素的 getDelay() 方法返回的值的時間段之后才釋放掉該元素。如果返回的是 0 或者負值,延遲將被認為過期,該元素將會在 DelayQueue 的下一次 take 被調用的時候被釋放掉。
傳遞給 getDelay 方法的 getDelay 實例是一個枚舉類型,它表明了將要延遲的時間段。TimeUnit 枚舉將會取以下值:
- DAYS
- HOURS
- MINUTES
- SECONDS
- MILLISECONDS
- MICROSECONDS
- NANOSECONDS
正如你所看到的,Delayed 接口也繼承了 java.lang.Comparable 接口,這也就意味着 Delayed 對象之間可以進行對比。這個可能在對 DelayQueue 隊列中的元素進行排序時有用,因此它們可以根據過期時間進行有序釋放。
以下是使用 DelayQueue 的例子:
public class DelayQueueExample { public static void main(String[] args) { DelayQueue queue = new DelayQueue(); Delayed element1 = new DelayedElement(); queue.put(element1); Delayed element2 = queue.take(); } }
DelayedElement 是我所創建的一個 DelayedElement 接口的實現類,它不在 Java.util.concurrent 包里。你需要自行創建你自己的 Delayed 接口的實現以使用 DelayQueue 類。
5. 鏈阻塞隊列LinkedBlockingQueue
LinkedBlockingQueue 類實現了 BlockingQueue 接口。
LinkedBlockingQueue 內部以一個鏈式結構(鏈接節點)對其元素進行存儲。如果需要的話,這一鏈式結構可以選擇一個上限。如果沒有定義上限,將使用 Integer.MAX_VALUE 作為上限。
LinkedBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。
以下是 LinkedBlockingQueue 的初始化和使用示例代碼:
BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>(); BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(1024); bounded.put("Value"); String value = bounded.take();
6. 具有優先級的阻塞隊列PriorityBlockingQueue
PriorityBlockingQueue 類實現了 BlockingQueue 接口。
PriorityBlockingQueue 是一個無界的並發隊列。它使用了和類 java.util.PriorityQueue 一樣的排序規則。你無法向這個隊列中插入 null 值。
所有插入到 PriorityBlockingQueue 的元素必須實現 java.lang.Comparable 接口。因此該隊列中元素的排序就取決於你自己的 Comparable 實現。
注意 PriorityBlockingQueue 對於具有相等優先級(compare() == 0)的元素並不強制任何特定行為。
同時注意,如果你從一個 PriorityBlockingQueue 獲得一個 Iterator 的話,該 Iterator 並不能保證它對元素的遍歷是以優先級為序的。
以下是使用 PriorityBlockingQueue 的示例:
BlockingQueue queue = new PriorityBlockingQueue(); //String implements java.lang.Comparable queue.put("Value"); String value = queue.take();
7. 同步隊列SynchronousQueue
SynchronousQueue 類實現了 BlockingQueue 接口。
SynchronousQueue 是一個特殊的隊列,它的內部同時只能夠容納單個元素。如果該隊列已有一元素的話,試圖向隊列中插入一個新元素的線程將會阻塞,直到另一個線程將該元素從隊列中抽走。同樣,如果該隊列為空,試圖向隊列中抽取一個元素的線程將會阻塞,直到另一個線程向隊列中插入了一條新的元素。
據此,把這個類稱作一個隊列顯然是誇大其詞了。它更多像是一個匯合點。
8. 阻塞雙端隊列BlockingDeque
java.util.concurrent 包里的 BlockingDeque 接口表示一個線程安放入和提取實例的雙端隊列。本小節我將給你演示如何使用 BlockingDeque。
BlockingDeque 類是一個雙端隊列,在不能夠插入元素時,它將阻塞住試圖插入元素的線程;在不能夠抽取元素時,它將阻塞住試圖抽取的線程。
deque(雙端隊列) 是 “Double Ended Queue” 的縮寫。因此,雙端隊列是一個你可以從任意一端插入或者抽取元素的隊列。
BlockingDeque的使用
在線程既是一個隊列的生產者又是這個隊列的消費者的時候可以使用到 BlockingDeque。如果生產者線程需要在隊列的兩端都可以插入數據,消費者線程需要在隊列的兩端都可以移除數據,這個時候也可以使用 BlockingDeque。BlockingDeque 圖解:
一個 BlockingDeque - 線程在雙端隊列的兩端都可以插入和提取元素。
一個線程生產元素,並把它們插入到隊列的任意一端。如果雙端隊列已滿,插入線程將被阻塞,直到一個移除線程從該隊列中移出了一個元素。如果雙端隊列為空,移除線程將被阻塞,直到一個插入線程向該隊列插入了一個新元素。
BlockingDeque的方法
BlockingDeque 具有 4 組不同的方法用於插入、移除以及對雙端隊列中的元素進行檢查。如果請求的操作不能得到立即執行的話,每個方法的表現也不同。這些方法如下:
操作 | 拋異常 | 特定值 | 阻塞 | 超時 |
---|---|---|---|---|
插入 | addFirst(o) | offerFirst(o) | putFirst(o) | offerFirst(o, timeout, timeunit) |
移除 | removeFirst(o) | pollFirst(o) | takeFirst(o) | pollFirst(timeout, timeunit) |
檢查 | getFirst(o) | peekFirst(o) | 無 | 無 |
操作 | 拋異常 | 特定值 | 阻塞 | 超時 |
---|---|---|---|---|
插入 | addLast(o) | offerLast(o) | putLast(o) | offerLast(o, timeout, timeunit) |
移除 | removeLast(o) | pollLast(o) | takeLast(o) | pollLast(timeout, timeunit) |
檢查 | getLast(o) | peekLast(o) | 無 | 無 |
四組不同的行為方式解釋:
- 拋異常:如果試圖的操作無法立即執行,拋一個異常。
- 特定值:如果試圖的操作無法立即執行,返回一個特定的值(常常是 true / false)。
- 阻塞:如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行。
- 超時:如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。返回一個特定值以告知該操作是否成功(典型的是 true / false)。
BlockingDeque 繼承自BlockingQueue
BlockingDeque 接口繼承自 BlockingQueue 接口。這就意味着你可以像使用一個 BlockingQueue 那樣使用BlockingDeque。如果你這么干的話,各種插入方法將會把新元素添加到雙端隊列的尾端,而移除方法將會把雙端隊列的首端的元素移除。正如 BlockingQueue 接口的插入和移除方法一樣。
以下是 BlockingDeque 對 BlockingQueue 接口的方法的具體內部實現:
BlockingQueue | BlockingDeque |
---|---|
add() | addLast() |
offer() | offerLast() |
put() | putLast() |
offer(e, time, unit) | offerLast(e, time, unit) |
remove() | removeFirst() |
poll() | pollFirst() |
take() | takeFirst() |
poll(time, unit) | pollLast(time, unit) |
element() | getFirst() |
peek() | peekFirst() |
BlockingDeque 的實現
既然 BlockingDeque 是一個接口,那么你想要使用它的話就得使用它的眾多的實現類的其中一個。java.util.concurrent 包提供了以下 BlockingDeque 接口的實現類:LinkedBlockingDeque
BlockingDeque 代碼示例
以下是如何使用 BlockingDeque 方法的一個簡短代碼示例:
BlockingDeque<String> deque = new LinkedBlockingDeque<String>(); deque.addFirst("1"); deque.addLast("2"); String two = deque.takeLast(); String one = deque.takeFirst();
9. 鏈阻塞雙端隊列 LinkedBlockingDeque
LinkedBlockingDeque 類實現了 BlockingDeque 接口。
deque(雙端隊列) 是 “Double Ended Queue” 的縮寫。因此,雙端隊列是一個你可以從任意一端插入或者抽取元素的隊列。
LinkedBlockingDeque 是一個雙端隊列,在它為空的時候,一個試圖從中抽取數據的線程將會阻塞,無論該線程是試圖從哪一端抽取數據。
以下是 LinkedBlockingDeque 實例化以及使用的示例:
BlockingDeque<String> deque = new LinkedBlockingDeque<String>(); deque.addFirst("1"); deque.addLast("2"); String two = deque.takeLast(); String one = deque.takeFirst();
10. 並發 Map(映射) ConcurrentMap
java.util.concurrent.ConcurrentMap
java.util.concurrent.ConcurrentMap 接口表示了一個能夠對別人的訪問(插入和提取)進行並發處理的 java.util.Map。
ConcurrentMap 除了從其父接口 java.util.Map 繼承來的方法之外還有一些額外的原子性方法。
ConcurrentMap的實現
既然 ConcurrentMap 是個接口,你想要使用它的話就得使用它的實現類之一。java.util.concurrent 包具備 ConcurrentMap 接口的以下實現類:ConcurrentHashMap
ConcurrentHashMap
ConcurrentHashMap 和 java.util.HashTable 類很相似,但 ConcurrentHashMap 能夠提供比 HashTable 更好的並發性能。在你從中讀取對象的時候 ConcurrentHashMap 並不會把整個 Map 鎖住。此外,在你向其中寫入對象的時候,ConcurrentHashMap 也不會鎖住整個 Map。它的內部只是把 Map 中正在被寫入的部分進行鎖定。
另外一個不同點是,在被遍歷的時候,即使是 ConcurrentHashMap 被改動,它也不會拋ConcurrentModificationException。盡管 Iterator 的設計不是為多個線程的同時使用。
更多關於 ConcurrentMap 和 ConcurrentHashMap 的細節請參考官方文檔。
ConcurrentMap 例子
以下是如何使用 ConcurrentMap 接口的一個例子。本示例使用了 ConcurrentHashMap 實現類:
ConcurrentMap concurrentMap = new ConcurrentHashMap(); concurrentMap.put("key", "value"); Object value = concurrentMap.get("key");
11. 並發導航映射 ConcurrentNavigableMap
java.util.concurrent.ConcurrentNavigableMap 是一個支持並發訪問的 java.util.NavigableMap,它還能讓它的子 map 具備並發訪問的能力。所謂的 “子 map” 指的是諸如 headMap(),subMap(),tailMap() 之類的方法返回的 map。
NavigableMap 中的方法不再贅述,本小節我們來看一下 ConcurrentNavigableMap 添加的方法。
headMap()
headMap(T toKey) 方法返回一個包含了小於給定 toKey 的 key 的子 map。
如果你對原始 map 里的元素做了改動,這些改動將影響到子 map 中的元素(譯者注:map 集合持有的其實只是對象的引用)。
以下示例演示了對 headMap() 方法的使用:
ConcurrentNavigableMap map = new ConcurrentSkipListMap(); map.put("1", "one"); map.put("2", "two"); map.put("3", "three"); ConcurrentNavigableMap headMap = map.headMap("2");
headMap 將指向一個只含有鍵 “1” 的 ConcurrentNavigableMap,因為只有這一個鍵小於 “2”。關於這個方法及其重載版本具體是怎么工作的細節請參考 Java 文檔。
tailMap()
tailMap(T fromKey) 方法返回一個包含了不小於給定 fromKey 的 key 的子 map。
如果你對原始 map 里的元素做了改動,這些改動將影響到子 map 中的元素(譯者注:map 集合持有的其實只是對象的引用)。
以下示例演示了對 tailMap() 方法的使用:
ConcurrentNavigableMap map = new ConcurrentSkipListMap(); map.put("1", "one"); map.put("2", "two"); map.put("3", "three"); ConcurrentNavigableMap tailMap = map.tailMap("2");
tailMap 將擁有鍵 “2” 和 “3”,因為它們不小於給定鍵 “2”。關於這個方法及其重載版本具體是怎么工作的細節請參考 Java 文檔。
subMap()
subMap() 方法返回原始 map 中,鍵介於 from(包含) 和 to (不包含) 之間的子 map。示例如下:
ConcurrentNavigableMap map = new ConcurrentSkipListMap(); map.put("1", "one"); map.put("2", "two"); map.put("3", "three"); ConcurrentNavigableMap subMap = map.subMap("2", "3");
返回的 submap 只包含鍵 “2”,因為只有它滿足不小於 “2”,比 “3” 小。
更多方法
ConcurrentNavigableMap 接口還有其他一些方法可供使用,比如:
- descendingKeySet()
- descendingMap()
- navigableKeySet()
關於這些方法更多信息參考官方 Java 文檔。
12. 閉鎖 CountDownLatch
java.util.concurrent.CountDownLatch 是一個並發構造,它允許一個或多個線程等待一系列指定操作的完成。
CountDownLatch 以一個給定的數量初始化。countDown() 每被調用一次,這一數量就減一。通過調用 await() 方法之一,線程可以阻塞等待這一數量到達零。
以下是一個簡單示例。Decrementer 三次調用 countDown() 之后,等待中的 Waiter 才會從 await() 調用中釋放出來。
CountDownLatch latch = new CountDownLatch(3); Waiter waiter = new Waiter(latch); Decrementer decrementer = new Decrementer(latch); new Thread(waiter).start(); new Thread(decrementer).start(); Thread.sleep(4000); public class Waiter implements Runnable{ CountDownLatch latch = null; public Waiter(CountDownLatch latch) { this.latch = latch; } public void run() { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Waiter Released"); } } public class Decrementer implements Runnable { CountDownLatch latch = null; public Decrementer(CountDownLatch latch) { this.latch = latch; } public void run() { try { Thread.sleep(1000); this.latch.countDown(); Thread.sleep(1000); this.latch.countDown(); Thread.sleep(1000); this.latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }
13. 柵欄 CyclicBarrier
java.util.concurrent.CyclicBarrier 類是一種同步機制,它能夠對處理一些算法的線程實現同步。換句話講,它就是一個所有線程必須等待的一個柵欄,直到所有線程都到達這里,然后所有線程才可以繼續做其他事情。圖示如下:
兩個線程在柵欄旁等待對方。
通過調用 CyclicBarrier 對象的 await() 方法,兩個線程可以實現互相等待。一旦 N 個線程在等待 CyclicBarrier 達成,所有線程將被釋放掉去繼續運行。
創建一個 CyclicBarrier
在創建一個 CyclicBarrier 的時候你需要定義有多少線程在被釋放之前等待柵欄。創建 CyclicBarrier 示例:
CyclicBarrier barrier = new CyclicBarrier(2);
- 1
等待一個 CyclicBarrier
以下演示了如何讓一個線程等待一個 CyclicBarrier:
barrier.await();
- 1
當然,你也可以為等待線程設定一個超時時間。等待超過了超時時間之后,即便還沒有達成 N 個線程等待 CyclicBarrier 的條件,該線程也會被釋放出來。以下是定義超時時間示例:
barrier.await(10, TimeUnit.SECONDS);
- 1
滿足以下任何條件都可以讓等待 CyclicBarrier 的線程釋放:
- 最后一個線程也到達 CyclicBarrier(調用 await())
- 當前線程被其他線程打斷(其他線程調用了這個線程的 interrupt() 方法)
- 其他等待柵欄的線程被打斷
- 其他等待柵欄的線程因超時而被釋放
- 外部線程調用了柵欄的 CyclicBarrier.reset() 方法
CyclicBarrier 行動
CyclicBarrier 支持一個柵欄行動,柵欄行動是一個 Runnable 實例,一旦最后等待柵欄的線程抵達,該實例將被執行。你可以在 CyclicBarrier 的構造方法中將 Runnable 柵欄行動傳給它:
Runnable barrierAction = ... ;
CyclicBarrier barrier = new CyclicBarrier(2, barrierAction);
- 1
- 2
CyclicBarrier 示例
以下代碼演示了如何使用 CyclicBarrier:
Runnable barrier1Action = new Runnable() { public void run() { System.out.println("BarrierAction 1 executed "); } }; Runnable barrier2Action = new Runnable() { public void run() { System.out.println("BarrierAction 2 executed "); } }; CyclicBarrier barrier1 = new CyclicBarrier(2, barrier1Action); CyclicBarrier barrier2 = new CyclicBarrier(2, barrier2Action); CyclicBarrierRunnable barrierRunnable1 = new CyclicBarrierRunnable(barrier1, barrier2); CyclicBarrierRunnable barrierRunnable2 = new CyclicBarrierRunnable(barrier1, barrier2); new Thread(barrierRunnable1).start(); new Thread(barrierRunnable2).start();
CyclicBarrierRunnable 類:
public class CyclicBarrierRunnable implements Runnable{ CyclicBarrier barrier1 = null; CyclicBarrier barrier2 = null; public CyclicBarrierRunnable( CyclicBarrier barrier1, CyclicBarrier barrier2) { this.barrier1 = barrier1; this.barrier2 = barrier2; } public void run() { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " waiting at barrier 1"); this.barrier1.await(); Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " waiting at barrier 2"); this.barrier2.await(); System.out.println(Thread.currentThread().getName() + " done!"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
以上代碼控制台輸出如下。注意每個線程寫入控制台的時序可能會跟你實際執行不一樣。比如有時 Thread-0 先打印,有時 Thread-1 先打印。
Thread-0 waiting at barrier 1 Thread-1 waiting at barrier 1 BarrierAction 1 executed Thread-1 waiting at barrier 2 Thread-0 waiting at barrier 2 BarrierAction 2 executed Thread-0 done! Thread-1 done!
14. 交換機 Exchanger
java.util.concurrent.Exchanger 類表示一種兩個線程可以進行互相交換對象的會和點。這種機制圖示如下:
兩個線程通過一個 Exchanger 交換對象。
交換對象的動作由 Exchanger 的兩個 exchange() 方法的其中一個完成。以下是一個示例:
Exchanger exchanger = new Exchanger(); ExchangerRunnable exchangerRunnable1 = new ExchangerRunnable(exchanger, "A"); ExchangerRunnable exchangerRunnable2 = new ExchangerRunnable(exchanger, "B"); new Thread(exchangerRunnable1).start(); new Thread(exchangerRunnable2).start(); ExchangerRunnable 代碼: ```java public class ExchangerRunnable implements Runnable{ Exchanger exchanger = null; Object object = null; public ExchangerRunnable(Exchanger exchanger, Object object) { this.exchanger = exchanger; this.object = object; } public void run() { try { Object previous = this.object; this.object = this.exchanger.exchange(this.object); System.out.println( Thread.currentThread().getName() + " exchanged " + previous + " for " + this.object ); } catch (InterruptedException e) { e.printStackTrace(); } } }
以上程序輸出:
Thread-0 exchanged A for B Thread-1 exchanged B for A
- 1
- 2
15. 信號量 Semaphore
java.util.concurrent.Semaphore 類是一個計數信號量。這就意味着它具備兩個主要方法:
acquire()
release()
- 1
- 2
計數信號量由一個指定數量的 “許可” 初始化。每調用一次 acquire(),一個許可會被調用線程取走。每調用一次 release(),一個許可會被返還給信號量。因此,在沒有任何 release() 調用時,最多有 N 個線程能夠通過 acquire() 方法,N 是該信號量初始化時的許可的指定數量。這些許可只是一個簡單的計數器。這里沒啥奇特的地方。
Semaphore 用法
信號量主要有兩種用途:
- 保護一個重要(代碼)部分防止一次超過 N 個線程進入
- 在兩個線程之間發送信號
保護重要部分
如果你將信號量用於保護一個重要部分,試圖進入這一部分的代碼通常會首先嘗試獲得一個許可,然后才能進入重要部分(代碼塊),執行完之后,再把許可釋放掉。比如這樣:
Semaphore semaphore = new Semaphore(1); //critical section semaphore.acquire(); ... semaphore.release();
在線程之間發送信號
如果你將一個信號量用於在兩個線程之間傳送信號,通常你應該用一個線程調用 acquire() 方法,而另一個線程調用 release() 方法。
如果沒有可用的許可,acquire() 調用將會阻塞,直到一個許可被另一個線程釋放出來。同理,如果無法往信號量釋放更多許可時,一個 release() 調用也會阻塞。
通過這個可以對多個線程進行協調。比如,如果線程 1 將一個對象插入到了一個共享列表(list)之后之后調用了 acquire(),而線程 2 則在從該列表中獲取一個對象之前調用了 release(),這時你其實已經創建了一個阻塞隊列。信號量中可用的許可的數量也就等同於該阻塞隊列能夠持有的元素個數。
公平
沒有辦法保證線程能夠公平地可從信號量中獲得許可。也就是說,無法擔保掉第一個調用 acquire() 的線程會是第一個獲得一個許可的線程。如果第一個線程在等待一個許可時發生阻塞,而第二個線程前來索要一個許可的時候剛好有一個許可被釋放出來,那么它就可能會在第一個線程之前獲得許可。
如果你想要強制公平,Semaphore 類有一個具有一個布爾類型的參數的構造子,通過這個參數以告知 Semaphore 是否要強制公平。強制公平會影響到並發性能,所以除非你確實需要它否則不要啟用它。
以下是如何在公平模式創建一個 Semaphore 的示例:
Semaphore semaphore = new Semaphore(1, true);
- 1
更多方法
java.util.concurrent.Semaphore 類還有很多方法,比如:
- availablePermits()
- acquireUninterruptibly()
- drainPermits()
- hasQueuedThreads()
- getQueuedThreads()
- tryAcquire()
這些方法的細節請參考 Java 文檔。
16. 執行器服務 ExecutorService
java.util.concurrent.ExecutorService 接口表示一個異步執行機制,使我們能夠在后台執行任務。因此一個 ExecutorService 很類似於一個線程池。實際上,存在於 java.util.concurrent 包里的 ExecutorService 實現就是一個線程池實現。
ExecutorService 例子
以下是一個簡單的 ExecutorService 例子:
ExecutorService executorService = Executors.newFixedThreadPool(10); executorService.execute(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); executorService.shutdown();
首先使用 newFixedThreadPool() 工廠方法創建一個 ExecutorService。這里創建了一個十個線程執行任務的線程池。
然后,將一個 Runnable 接口的匿名實現類傳遞給 execute() 方法。這將導致 ExecutorService 中的某個線程執行該 Runnable。
任務委派
下圖說明了一個線程是如何將一個任務委托給一個 ExecutorService 去異步執行的:
一個線程將一個任務委派給一個 ExecutorService 去異步執行。
一旦該線程將任務委派給 ExecutorService,該線程將繼續它自己的執行,獨立於該任務的執行。
ExecutorService 實現
既然 ExecutorService 是個接口,如果你想用它的話就得去使用它的實現類之一。java.util.concurrent 包提供了 ExecutorService 接口的以下實現類:
- ThreadPoolExecutor
- ScheduledThreadPoolExecutor
創建一個 ExecutorService
ExecutorService 的創建依賴於你使用的具體實現。但是你也可以使用 Executors 工廠類來創建 ExecutorService 實例。以下是幾個創建 ExecutorService 實例的例子:
ExecutorService executorService1 = Executors.newSingleThreadExecutor();
ExecutorService executorService2 = Executors.newFixedThreadPool(10); ExecutorService executorService3 = Executors.newScheduledThreadPool(10);
- 1
- 2
- 3
ExecutorService 使用
有幾種不同的方式來將任務委托給 ExecutorService 去執行:
- execute(Runnable)
- submit(Runnable)
- submit(Callable)
- invokeAny(…)
- invokeAll(…)
接下來我們挨個看一下這些方法。
execute(Runnable)
execute(Runnable) 方法要求一個 java.lang.Runnable 對象,然后對它進行異步執行。以下是使用 ExecutorService 執行一個 Runnable 的示例:
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); executorService.shutdown();
沒有辦法得知被執行的 Runnable 的執行結果。如果有需要的話你得使用一個 Callable(以下將做介紹)。
submit(Runnable)
submit(Runnable) 方法也要求一個 Runnable 實現類,但它返回一個 Future 對象。這個 Future 對象可以用來檢查 Runnable 是否已經執行完畢。
以下是 ExecutorService submit() 示例:
Future future = executorService.submit(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); future.get(); //returns null if the task has finished correctly.
submit(Callable)
submit(Callable) 方法類似於 submit(Runnable) 方法,除了它所要求的參數類型之外。Callable 實例除了它的 call() 方法能夠返回一個結果之外和一個 Runnable 很相像。Runnable.run() 不能夠返回一個結果。
Callable 的結果可以通過 submit(Callable) 方法返回的 Future 對象進行獲取。以下是一個 ExecutorService Callable 示例:
Future future = executorService.submit(new Callable(){ public Object call() throws Exception { System.out.println("Asynchronous Callable"); return "Callable Result"; } }); System.out.println("future.get() = " + future.get());
以上代碼輸出:
Asynchronous Callable
future.get() = Callable Result
- 1
- 2
invokeAny()
invokeAny() 方法要求一系列的 Callable 或者其子接口的實例對象。調用這個方法並不會返回一個 Future,但它返回其中一個 Callable 對象的結果。無法保證返回的是哪個 Callable 的結果 - 只能表明其中一個已執行結束。
如果其中一個任務執行結束(或者拋了一個異常),其他 Callable 將被取消。
以下是示例代碼:
ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 1"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 2"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 3"; } }); String result = executorService.invokeAny(callables); System.out.println("result = " + result); executorService.shutdown();
上述代碼將會打印出給定 Callable 集合中的一個的執行結果。我自己試着執行了它幾次,結果始終在變。有時是 “Task 1”,有時是 “Task 2” 等等。
invokeAll()
invokeAll() 方法將調用你在集合中傳給 ExecutorService 的所有 Callable 對象。invokeAll() 返回一系列的 Future 對象,通過它們你可以獲取每個 Callable 的執行結果。
記住,一個任務可能會由於一個異常而結束,因此它可能沒有 “成功”。無法通過一個 Future 對象來告知我們是兩種結束中的哪一種。
以下是一個代碼示例:
ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 1"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 2"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 3"; } }); List<Future<String>> futures = executorService.invokeAll(callables); for(Future<String> future : futures){ System.out.println("future.get = " + future.get()); } executorService.shutdown();
ExecutorService 關閉
使用完 ExecutorService 之后你應該將其關閉,以使其中的線程不再運行。
比如,如果你的應用是通過一個 main() 方法啟動的,之后 main 方法退出了你的應用,如果你的應用有一個活動的 ExexutorService 它將還會保持運行。ExecutorService 里的活動線程阻止了 JVM 的關閉。
要終止 ExecutorService 里的線程你需要調用 ExecutorService 的 shutdown() 方法。ExecutorService 並不會立即關閉,但它將不再接受新的任務,而且一旦所有線程都完成了當前任務的時候,ExecutorService 將會關閉。在 shutdown() 被調用之前所有提交給 ExecutorService 的任務都被執行。
如果你想要立即關閉 ExecutorService,你可以調用 shutdownNow() 方法。這樣會立即嘗試停止所有執行中的任務,並忽略掉那些已提交但尚未開始處理的任務。無法擔保執行任務的正確執行。可能它們被停止了,也可能已經執行結束。
17. 線程池執行者 ThreadPoolExecutor
java.util.concurrent.ThreadPoolExecutor 是 ExecutorService 接口的一個實現。ThreadPoolExecutor 使用其內部池中的線程執行給定任務(Callable 或者 Runnable)。
ThreadPoolExecutor 包含的線程池能夠包含不同數量的線程。池中線程的數量由以下變量決定:
- corePoolSize
- maximumPoolSize
當一個任務委托給線程池時,如果池中線程數量低於 corePoolSize,一個新的線程將被創建,即使池中可能尚有空閑線程。
如果內部任務隊列已滿,而且有至少 corePoolSize 正在運行,但是運行線程的數量低於 maximumPoolSize,一個新的線程將被創建去執行該任務。
ThreadPoolExecutor 圖解:
一個 ThreadPoolExecutor
創建一個 ThreadPoolExecutor
ThreadPoolExecutor 有若干個可用構造子。比如:
int corePoolSize = 5; int maxPoolSize = 10; long keepAliveTime = 5000; ExecutorService threadPoolExecutor = new ThreadPoolExecutor( corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() );
但是,除非你確實需要顯式為 ThreadPoolExecutor 定義所有參數,使用 java.util.concurrent.Executors 類中的工廠方法之一會更加方便,正如 ExecutorService 小節所述。
18. 定時執行者服務 ScheduledExecutorService
java.util.concurrent.ScheduledExecutorService 是一個 ExecutorService, 它能夠將任務延后執行,或者間隔固定時間多次執行。 任務由一個工作者線程異步執行,而不是由提交任務給 ScheduledExecutorService 的那個線程執行。
ScheduledExecutorService 例子
以下是一個簡單的 ScheduledExecutorService 示例:
ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(5); ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(new Callable() { public Object call() throws Exception { System.out.println("Executed!"); return "Called!"; } }, 5, TimeUnit.SECONDS);
首先一個內置 5 個線程的 ScheduledExecutorService 被創建。之后一個 Callable 接口的匿名類示例被創建然后傳遞給 schedule() 方法。后邊的倆參數定義了 Callable 將在 5 秒鍾之后被執行。
ScheduledExecutorService 實現
既然 ScheduledExecutorService 是一個接口,你要用它的話就得使用 java.util.concurrent 包里對它的某個實現類。ScheduledExecutorService 具有以下實現類:ScheduledThreadPoolExecutor
創建一個 ScheduledExecutorService
如何創建一個 ScheduledExecutorService 取決於你采用的它的實現類。但是你也可以使用 Executors 工廠類來創建一個 ScheduledExecutorService 實例。比如:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
- 1
ScheduledExecutorService 使用
一旦你創建了一個 ScheduledExecutorService,你可以通過調用它的以下方法:
- schedule (Callable task, long delay, TimeUnit timeunit)
- schedule (Runnable task, long delay, TimeUnit timeunit)
- scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit)
- scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit)
下面我們就簡單看一下這些方法。
schedule (Callable task, long delay, TimeUnit timeunit)
- 1
這個方法計划指定的 Callable 在給定的延遲之后執行。
這個方法返回一個 ScheduledFuture,通過它你可以在它被執行之前對它進行取消,或者在它執行之后獲取結果。
以下是一個示例:
ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(5); ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(new Callable() { public Object call() throws Exception { System.out.println("Executed!"); return "Called!"; } }, 5, TimeUnit.SECONDS); System.out.println("result = " + scheduledFuture.get()); scheduledExecutorService.shutdown();
示例輸出結果:
Executed! result = Called!
schedule (Runnable task, long delay, TimeUnit timeunit)
- 1
除了 Runnable 無法返回一個結果之外,這一方法工作起來就像以一個 Callable 作為一個參數的那個版本的方法一樣,因此 ScheduledFuture.get() 在任務執行結束之后返回 null。
scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit)
- 1
這一方法規划一個任務將被定期執行。該任務將會在首個 initialDelay 之后得到執行,然后每個 period 時間之后重復執行。
如果給定任務的執行拋出了異常,該任務將不再執行。如果沒有任何異常的話,這個任務將會持續循環執行到 ScheduledExecutorService 被關閉。
如果一個任務占用了比計划的時間間隔更長的時候,下一次執行將在當前執行結束執行才開始。計划任務在同一時間不會有多個線程同時執行。
scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit)
- 1
除了 period 有不同的解釋之外這個方法和 scheduleAtFixedRate() 非常像。
scheduleAtFixedRate() 方法中,period 被解釋為前一個執行的開始和下一個執行的開始之間的間隔時間。
而在本方法中,period 則被解釋為前一個執行的結束和下一個執行的結束之間的間隔。因此這個延遲是執行結束之間的間隔,而不是執行開始之間的間隔。
ScheduledExecutorService 關閉
正如 ExecutorService,在你使用結束之后你需要把 ScheduledExecutorService 關閉掉。否則他將導致 JVM 繼續運行,即使所有其他線程已經全被關閉。
你可以使用從 ExecutorService 接口繼承來的 shutdown() 或 shutdownNow() 方法將 ScheduledExecutorService 關閉。參見 ExecutorService 關閉部分以獲取更多信息。
19. 使用 ForkJoinPool 進行分叉和合並
ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,除了一點不同。ForkJoinPool 讓我們可以很方便地把任務分裂成幾個更小的任務,這些分裂出來的任務也將會提交給 ForkJoinPool。任務可以繼續分割成更小的子任務,只要它還能分割。可能聽起來有些抽象,因此本節中我們將會解釋 ForkJoinPool 是如何工作的,還有任務分割是如何進行的。
分叉和合並解釋
在我們開始看 ForkJoinPool 之前我們先來簡要解釋一下分叉和合並的原理。
分叉和合並原理包含兩個遞歸進行的步驟。兩個步驟分別是分叉步驟和合並步驟。
分叉
一個使用了分叉和合並原理的任務可以將自己分叉(分割)為更小的子任務,這些子任務可以被並發執行。如下圖所示:
通過把自己分割成多個子任務,每個子任務可以由不同的 CPU 並行執行,或者被同一個 CPU 上的不同線程執行。
只有當給的任務過大,把它分割成幾個子任務才有意義。把任務分割成子任務有一定開銷,因此對於小型任務,這個分割的消耗可能比每個子任務並發執行的消耗還要大。
什么時候把一個任務分割成子任務是有意義的,這個界限也稱作一個閥值。這要看每個任務對有意義閥值的決定。很大程度上取決於它要做的工作的種類。
合並
當一個任務將自己分割成若干子任務之后,該任務將進入等待所有子任務的結束之中。
一旦子任務執行結束,該任務可以把所有結果合並到同一個結果。圖示如下:
當然,並非所有類型的任務都會返回一個結果。如果這個任務並不返回一個結果,它只需等待所有子任務執行完畢。也就不需要結果的合並啦。
ForkJoinPool
ForkJoinPool 是一個特殊的線程池,它的設計是為了更好的配合 分叉-和-合並 任務分割的工作。ForkJoinPool 也在 java.util.concurrent 包中,其完整類名為 java.util.concurrent.ForkJoinPool。
創建一個 ForkJoinPool
你可以通過其構造子創建一個 ForkJoinPool。作為傳遞給 ForkJoinPool 構造子的一個參數,你可以定義你期望的並行級別。並行級別表示你想要傳遞給 ForkJoinPool 的任務所需的線程或 CPU 數量。以下是一個 ForkJoinPool 示例:
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
- 1
這個示例創建了一個並行級別為 4 的 ForkJoinPool。
提交任務到 ForkJoinPool
就像提交任務到 ExecutorService 那樣,把任務提交到 ForkJoinPool。你可以提交兩種類型的任務。一種是沒有任何返回值的(一個 “行動”),另一種是有返回值的(一個”任務”)。這兩種類型分別由 RecursiveAction 和 RecursiveTask 表示。接下來介紹如何使用這兩種類型的任務,以及如何對它們進行提交。
RecursiveAction
RecursiveAction 是一種沒有任何返回值的任務。它只是做一些工作,比如寫數據到磁盤,然后就退出了。
一個 RecursiveAction 可以把自己的工作分割成更小的幾塊,這樣它們可以由獨立的線程或者 CPU 執行。
你可以通過繼承來實現一個 RecursiveAction。示例如下:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.RecursiveAction; public class MyRecursiveAction extends RecursiveAction { private long workLoad = 0; public MyRecursiveAction(long workLoad) { this.workLoad = workLoad; } @Override protected void compute() { //if work is above threshold, break tasks up into smaller tasks if(this.workLoad > 16) { System.out.println("Splitting workLoad : " + this.workLoad); List<MyRecursiveAction> subtasks = new ArrayList<MyRecursiveAction>(); subtasks.addAll(createSubtasks()); for(RecursiveAction subtask : subtasks){ subtask.fork(); } } else { System.out.println("Doing workLoad myself: " + this.workLoad); } } private List<MyRecursiveAction> createSubtasks() { List<MyRecursiveAction> subtasks = new ArrayList<MyRecursiveAction>(); MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2); MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2); subtasks.add(subtask1); subtasks.add(subtask2); return subtasks; } }
例子很簡單。MyRecursiveAction 將一個虛構的 workLoad 作為參數傳給自己的構造子。如果 workLoad 高於一個特定閥值,該工作將被分割為幾個子工作,子工作繼續分割。如果 workLoad 低於特定閥值,該工作將由 MyRecursiveAction 自己執行。
你可以這樣規划一個 MyRecursiveAction 的執行:
MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24); forkJoinPool.invoke(myRecursiveAction);
- 1
- 2
- 3
RecursiveTask
RecursiveTask 是一種會返回結果的任務。它可以將自己的工作分割為若干更小任務,並將這些子任務的執行結果合並到一個集體結果。可以有幾個水平的分割和合並。以下是一個 RecursiveTask 示例:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.RecursiveTask; public class MyRecursiveTask extends RecursiveTask<Long> { private long workLoad = 0; public MyRecursiveTask(long workLoad) { this.workLoad = workLoad; } protected Long compute() { //if work is above threshold, break tasks up into smaller tasks if(this.workLoad > 16) { System.out.println("Splitting workLoad : " + this.workLoad); List<MyRecursiveTask> subtasks = new ArrayList<MyRecursiveTask>(); subtasks.addAll(createSubtasks()); for(MyRecursiveTask subtask : subtasks){ subtask.fork(); } long result = 0; for(MyRecursiveTask subtask : subtasks) { result += subtask.join(); } return result; } else { System.out.println("Doing workLoad myself: " + this.workLoad); return workLoad * 3; } } private List<MyRecursiveTask> createSubtasks() { List<MyRecursiveTask> subtasks = new ArrayList<MyRecursiveTask>(); MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2); MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2); subtasks.add(subtask1); subtasks.add(subtask2); return subtasks; } }
除了有一個結果返回之外,這個示例和 RecursiveAction 的例子很像。MyRecursiveTask 類繼承自 RecursiveTask,這也就意味着它將返回一個 Long 類型的結果。
MyRecursiveTask 示例也會將工作分割為子任務,並通過 fork() 方法對這些子任務計划執行。
此外,本示例還通過調用每個子任務的 join() 方法收集它們返回的結果。子任務的結果隨后被合並到一個更大的結果,並最終將其返回。對於不同級別的遞歸,這種子任務的結果合並可能會發生遞歸。
你可以這樣規划一個 RecursiveTask:
MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128); long mergedResult = forkJoinPool.invoke(myRecursiveTask); System.out.println("mergedResult = " + mergedResult);
注意是如何通過 ForkJoinPool.invoke() 方法的調用來獲取最終執行結果的。
ForkJoinPool 評論
貌似並非每個人都對 Java 7 里的 ForkJoinPool 滿意:《一個 Java 分叉-合並 帶來的災禍》。
在你計划在自己的項目里使用 ForkJoinPool 之前最好讀一下該篇文章。
20. 鎖Lock
java.util.concurrent.locks.Lock 是一個類似於 synchronized 塊的線程同步機制。但是 Lock 比 synchronized 塊更加靈活、精細。
順便說一下,在我的《Java 並發指南》中我對如何實現你自己的鎖進行了描述。
Java Lock 例子
既然 Lock 是一個接口,在你的程序里需要使用它的實現類之一來使用它。以下是一個簡單示例:
Lock lock = new ReentrantLock(); lock.lock(); //critical section lock.unlock();
首先創建了一個 Lock 對象。之后調用了它的 lock() 方法。這時候這個 lock 實例就被鎖住啦。任何其他再過來調用 lock() 方法的線程將會被阻塞住,直到鎖定 lock 實例的線程調用了 unlock() 方法。最后 unlock() 被調用了,lock 對象解鎖了,其他線程可以對它進行鎖定了。
Java Lock 實現
java.util.concurrent.locks 包提供了以下對 Lock 接口的實現類:ReentrantLock
Lock 和 synchronized 代碼塊的主要不同點
一個 Lock 對象和一個 synchronized 代碼塊之間的主要不同點是:
- synchronized 代碼塊不能夠保證進入訪問等待的線程的先后順序。
- 你不能夠傳遞任何參數給一個 synchronized 代碼塊的入口。因此,對於 synchronized 代碼塊的訪問等待設置超時時間是不可能的事情。
- synchronized 塊必須被完整地包含在單個方法里。而一個 Lock 對象可以把它的 lock() 和 unlock() 方法的調用放在不同的方法里。
Lock 的方法
Lock 接口具有以下主要方法:
- lock()
lock() 將 Lock 實例鎖定。如果該 Lock 實例已被鎖定,調用 lock() 方法的線程將會阻塞,直到 Lock 實例解鎖。
- lockInterruptibly()
lockInterruptibly() 方法將會被調用線程鎖定,除非該線程被打斷。此外,如果一個線程在通過這個方法來鎖定 Lock 對象時進入阻塞等待,而它被打斷了的話,該線程將會退出這個方法調用。
- tryLock()
tryLock() 方法試圖立即鎖定 Lock 實例。如果鎖定成功,它將返回 true,如果 Lock 實例已被鎖定該方法返回 false。這一方法永不阻塞。
- tryLock(long timeout, TimeUnit timeUnit)
tryLock(long timeout, TimeUnit timeUnit) 的工作類似於 tryLock() 方法,除了它在放棄鎖定 Lock 之前等待一個給定的超時時間之外。
- unlock()
unlock() 方法對 Lock 實例解鎖。一個 Lock 實現將只允許鎖定了該對象的線程來調用此方法。其他(沒有鎖定該 Lock 對象的線程)線程對 unlock() 方法的調用將會拋一個未檢查異常(RuntimeException)。
21. 讀寫鎖 ReadWriteLock
java.util.concurrent.locks.ReadWriteLock 讀寫鎖是一種先進的線程鎖機制。它能夠允許多個線程在同一時間對某特定資源進行讀取,但同一時間內只能有一個線程對其進行寫入。
讀寫鎖的理念在於多個線程能夠對一個共享資源進行讀取,而不會導致並發問題。並發問題的發生場景在於對一個共享資源的讀和寫操作的同時進行,或者多個寫操作並發進行。
本節只討論 Java 內置 ReadWriteLock。如果你想了解 ReadWriteLock 背后的實現原理,請參考我的《Java 並發指南》主題中的《讀寫鎖》小節。
ReadWriteLock 鎖規則
一個線程在對受保護資源在讀或者寫之前對 ReadWriteLock 鎖定的規則如下:
讀鎖:如果沒有任何寫操作線程鎖定 ReadWriteLock,並且沒有任何寫操作線程要求一個寫鎖(但還沒有獲得該鎖)。因此,可以有多個讀操作線程對該鎖進行鎖定。
寫鎖:如果沒有任何讀操作或者寫操作。因此,在寫操作的時候,只能有一個線程對該鎖進行鎖定。
ReadWriteLock 實現
ReadWriteLock 是個接口,如果你想用它的話就得去使用它的實現類之一。java.util.concurrent.locks 包提供了 ReadWriteLock 接口的以下實現類:ReentrantReadWriteLock
ReadWriteLock 代碼示例
以下是 ReadWriteLock 的創建以及如何使用它進行讀、寫鎖定的簡單示例代碼:
ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); readWriteLock.readLock().lock(); // multiple readers can enter this section // if not locked for writing, and not writers waiting // to lock for writing. readWriteLock.readLock().unlock(); readWriteLock.writeLock().lock(); // only one writer can enter this section, // and only if no threads are currently reading. readWriteLock.writeLock().unlock();
注意如何使用 ReadWriteLock 對兩種鎖實例的持有。一個對讀訪問進行保護,一個隊寫訪問進行保護。
22. 原子性布爾 AtomicBoolean
AtomicBoolean 類為我們提供了一個可以用原子方式進行讀和寫的布爾值,它還擁有一些先進的原子性操作,比如 compareAndSet()。AtomicBoolean 類位於 java.util.concurrent.atomic 包,完整類名是為 java.util.concurrent.atomic.AtomicBoolean。本小節描述的 AtomicBoolean 是 Java 8 版本里的,而不是它第一次被引入的 Java 5 版本。
AtomicBoolean 背后的設計理念在我的《Java 並發指南》主題的《比較和交換》小節有解釋。
創建一個 AtomicBoolean
你可以這樣創建一個 AtomicBoolean:
AtomicBoolean atomicBoolean = new AtomicBoolean();
- 1
以上示例新建了一個默認值為 false 的 AtomicBoolean。
如果你想要為 AtomicBoolean 實例設置一個顯式的初始值,那么你可以將初始值傳給 AtomicBoolean 的構造子:
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
- 1
獲取 AtomicBoolean 的值
你可以通過使用 get() 方法來獲取一個 AtomicBoolean 的值。示例如下:
AtomicBoolean atomicBoolean = new AtomicBoolean(true); boolean value = atomicBoolean.get();
以上代碼執行后 value 變量的值將為 true。
設置 AtomicBoolean 的值
你可以通過使用 set() 方法來設置一個 AtomicBoolean 的值。示例如下:
AtomicBoolean atomicBoolean = new AtomicBoolean(true); atomicBoolean.set(false);
以上代碼執行后 AtomicBoolean 的值為 false。
交換 AtomicBoolean 的值
你可以通過 getAndSet() 方法來交換一個 AtomicBoolean 實例的值。getAndSet() 方法將返回 AtomicBoolean 當前的值,並將為 AtomicBoolean 設置一個新值。示例如下:
AtomicBoolean atomicBoolean = new AtomicBoolean(true); boolean oldValue = atomicBoolean.getAndSet(false);
- 1
- 2
以上代碼執行后 oldValue 變量的值為 true,atomicBoolean 實例將持有 false 值。代碼成功將 AtomicBoolean 當前值 ture 交換為 false。
比較並設置 AtomicBoolean 的值
compareAndSet() 方法允許你對 AtomicBoolean 的當前值與一個期望值進行比較,如果當前值等於期望值的話,將會對 AtomicBoolean 設定一個新值。compareAndSet() 方法是原子性的,因此在同一時間之內有單個線程執行它。因此 compareAndSet() 方法可被用於一些類似於鎖的同步的簡單實現。
以下是一個 compareAndSet() 示例:
AtomicBoolean atomicBoolean = new AtomicBoolean(true); boolean expectedValue = true; boolean newValue = false; boolean wasNewValueSet = atomicBoolean.compareAndSet(expectedValue, newValue);
本示例對 AtomicBoolean 的當前值與 true 值進行比較,如果相等,將 AtomicBoolean 的值更新為 false。
23. 原子性整型 AtomicInteger
AtomicInteger 類為我們提供了一個可以進行原子性讀和寫操作的 int 變量,它還包含一系列先進的原子性操作,比如 compareAndSet()。AtomicInteger 類位於 java.util.concurrent.atomic 包,因此其完整類名為 java.util.concurrent.atomic.AtomicInteger。本小節描述的 AtomicInteger 是 Java 8 版本里的,而不是它第一次被引入的 Java 5 版本。
AtomicInteger 背后的設計理念在我的《Java 並發指南》主題的《比較和交換》小節有解釋。
創建一個 AtomicInteger
創建一個 AtomicInteger 示例如下:
AtomicInteger atomicInteger = new AtomicInteger();
- 1
本示例將創建一個初始值為 0 的 AtomicInteger。
如果你想要創建一個給定初始值的 AtomicInteger,你可以這樣:
AtomicInteger atomicInteger = new AtomicInteger(123);
- 1
本示例將 123 作為參數傳給 AtomicInteger 的構造子,它將設置 AtomicInteger 實例的初始值為 123。
獲取 AtomicInteger 的值
你可以使用 get() 方法獲取 AtomicInteger 實例的值。示例如下:
AtomicInteger atomicInteger = new AtomicInteger(123); int theValue = atomicInteger.get();
- 1
- 2
設置 AtomicInteger 的值
你可以通過 set() 方法對 AtomicInteger 的值進行重新設置。以下是 AtomicInteger.set() 示例:
AtomicInteger atomicInteger = new AtomicInteger(123); atomicInteger.set(234);
- 1
- 2
以上示例創建了一個初始值為 123 的 AtomicInteger,而在第二行將其值更新為 234。
比較並設置 AtomicInteger 的值
AtomicInteger 類也通過了一個原子性的 compareAndSet() 方法。這一方法將 AtomicInteger 實例的當前值與期望值進行比較,如果二者相等,為 AtomicInteger 實例設置一個新值。AtomicInteger.compareAndSet() 代碼示例:
AtomicInteger atomicInteger = new AtomicInteger(123); int expectedValue = 123; int newValue = 234; atomicInteger.compareAndSet(expectedValue, newValue);
本示例首先新建一個初始值為 123 的 AtomicInteger 實例。然后將 AtomicInteger 與期望值 123 進行比較,如果相等,將 AtomicInteger 的值更新為 234。
增加 AtomicInteger 值
AtomicInteger 類包含有一些方法,通過它們你可以增加 AtomicInteger 的值,並獲取其值。這些方法如下:
- addAndGet()
- getAndAdd()
- getAndIncrement()
- incrementAndGet()
第一個 addAndGet() 方法給 AtomicInteger 增加了一個值,然后返回增加后的值。getAndAdd() 方法為 AtomicInteger 增加了一個值,但返回的是增加以前的 AtomicInteger 的值。具體使用哪一個取決於你的應用場景。以下是這兩種方法的示例:
AtomicInteger atomicInteger = new AtomicInteger(); System.out.println(atomicInteger.getAndAdd(10)); System.out.println(atomicInteger.addAndGet(10));
本示例將打印出 0 和 20。例子中,第二行拿到的是加 10 之前的 AtomicInteger 的值。加 10 之前的值是 0。第三行將 AtomicInteger 的值再加 10,並返回加操作之后的值。該值現在是為 20。
你當然也可以使用這倆方法為 AtomicInteger 添加負值。結果實際是一個減法操作。
getAndIncrement() 和 incrementAndGet() 方法類似於 getAndAdd() 和 addAndGet(),但每次只將 AtomicInteger 的值加 1。
減小 AtomicInteger 的值
AtomicInteger 類還提供了一些減小 AtomicInteger 的值的原子性方法。這些方法是:
- decrementAndGet()
- getAndDecrement()
decrementAndGet() 將 AtomicInteger 的值減一,並返回減一后的值。getAndDecrement() 也將 AtomicInteger 的值減一,但它返回的是減一之前的值。
24. 原子性長整型 AtomicLong
AtomicLong 類為我們提供了一個可以進行原子性讀和寫操作的 long 變量,它還包含一系列先進的原子性操作,比如 compareAndSet()AtomicLong 類位於 java.util.concurrent.atomic 包,因此其完整類名為 java.util.concurrent.atomic.AtomicLong。本小節描述的 AtomicLong 是 Java 8 版本里的,而不是它第一次被引入的 Java 5 版本。
AtomicLong 背后的設計理念在我的《Java 並發指南》主題的《比較和交換》小節有解釋。
創建一個 AtomicLong
創建一個 AtomicLong 如下:
AtomicLong atomicLong = new AtomicLong();
- 1
將創建一個初始值為 0 的 AtomicLong。
如果你想創建一個指定初始值的 AtomicLong,可以:
AtomicLong atomicLong = new AtomicLong(123);
- 1
本示例將 123 作為參數傳遞給 AtomicLong 的構造子,后者將 AtomicLong 實例的初始值設置為 123。
獲取 AtomicLong 的值
你可以通過 get() 方法獲取 AtomicLong 的值。AtomicLong.get() 示例:
AtomicLong atomicLong = new AtomicLong(123); long theValue = atomicLong.get();
設置 AtomicLong 的值
你可以通過 set() 方法設置 AtomicLong 實例的值。一個 AtomicLong.set() 的示例:
AtomicLong atomicLong = new AtomicLong(123); atomicLong.set(234);
本示例新建了一個初始值為 123 的 AtomicLong,第二行將其值設置為 234。
比較並設置 AtomicLong 的值
AtomicLong 類也有一個原子性的 compareAndSet() 方法。這一方法將 AtomicLong 實例的當前值與一個期望值進行比較,如果兩種相等,為 AtomicLong 實例設置一個新值。AtomicLong.compareAndSet() 使用示例:
AtomicLong atomicLong = new AtomicLong(123); long expectedValue = 123; long newValue = 234; atomicLong.compareAndSet(expectedValue, newValue);
本示例新建了一個初始值為 123 的 AtomicLong。然后將 AtomicLong 的當前值與期望值 123 進行比較,如果相等的話,AtomicLong 的新值將變為 234。
增加 AtomicLong 值
AtomicLong 具備一些能夠增加 AtomicLong 的值並返回自身值的方法。這些方法如下:
- addAndGet()
- getAndAdd()
- getAndIncrement()
- incrementAndGet()
第一個方法 addAndGet() 將 AtomicLong 的值加一個數字,並返回增加后的值。第二個方法 getAndAdd() 也將 AtomicLong 的值加一個數字,但返回的是增加前的 AtomicLong 的值。具體使用哪一個取決於你自己的場景。示例如下:
AtomicLong atomicLong = new AtomicLong(); System.out.println(atomicLong.getAndAdd(10)); System.out.println(atomicLong.addAndGet(10));
- 1
- 2
- 3
本示例將打印出 0 和 20。例子中,第二行拿到的是加 10 之前的 AtomicLong 的值。加 10 之前的值是 0。第三行將 AtomicLong 的值再加 10,並返回加操作之后的值。該值現在是為 20。
你當然也可以使用這倆方法為 AtomicLong 添加負值。結果實際是一個減法操作。
getAndIncrement() 和 incrementAndGet() 方法類似於 getAndAdd() 和 addAndGet(),但每次只將 AtomicLong 的值加 1。
減小 AtomicLong 的值
AtomicLong 類還提供了一些減小 AtomicLong 的值的原子性方法。這些方法是:
- decrementAndGet()
- getAndDecrement()
decrementAndGet() 將 AtomicLong 的值減一,並返回減一后的值。getAndDecrement() 也將 AtomicLong 的值減一,但它返回的是減一之前的值。
25. 原子性引用型 AtomicReference
AtomicReference 提供了一個可以被原子性讀和寫的對象引用變量。原子性的意思是多個想要改變同一個 AtomicReference 的線程不會導致 AtomicReference 處於不一致的狀態。AtomicReference 還有一個 compareAndSet() 方法,通過它你可以將當前引用於一個期望值(引用)進行比較,如果相等,在該 AtomicReference 對象內部設置一個新的引用。
創建一個 AtomicReference
創建 AtomicReference 如下:
AtomicReference atomicReference = new AtomicReference();
- 1
如果你需要使用一個指定引用創建 AtomicReference,可以:
String initialReference = "the initially referenced string"; AtomicReference atomicReference = new AtomicReference(initialReference);
- 1
- 2
創建泛型 AtomicReference
你可以使用 Java 泛型來創建一個泛型 AtomicReference。示例:
AtomicReference<String> atomicStringReference =
new AtomicReference<String>();
- 1
- 2
你也可以為泛型 AtomicReference 設置一個初始值。示例:
String initialReference = "the initially referenced string"; AtomicReference<String> atomicStringReference = new AtomicReference<String>(initialReference);
- 1
- 2
獲取 AtomicReference 引用
你可以通過 AtomicReference 的 get() 方法來獲取保存在 AtomicReference 里的引用。如果你的 AtomicReference 是非泛型的,get() 方法將返回一個 Object 類型的引用。如果是泛型化的,get() 將返回你創建 AtomicReference 時聲明的那個類型。
先來看一個非泛型的 AtomicReference get() 示例:
AtomicReference atomicReference = new AtomicReference("first value referenced"); String reference = (String) atomicReference.get();
- 1
- 2
注意如何對 get() 方法返回的引用強制轉換為 String。
泛型化的 AtomicReference 示例:
AtomicReference<String> atomicReference = new AtomicReference<String>("first value referenced"); String reference = atomicReference.get();
- 1
- 2
編譯器知道了引用的類型,所以我們無需再對 get() 返回的引用進行強制轉換了。
設置 AtomicReference 引用
你可以使用 get() 方法對 AtomicReference 里邊保存的引用進行設置。如果你定義的是一個非泛型 AtomicReference,set() 將會以一個 Object 引用作為參數。如果是泛型化的 AtomicReference,set() 方法將只接受你定義給的類型。
AtomicReference set() 示例:
AtomicReference atomicReference = new AtomicReference(); atomicReference.set("New object referenced");
這個看起來非泛型和泛型化的沒啥區別。真正的區別在於編譯器將對你能夠設置給一個泛型化的 AtomicReference 參數類型進行限制。
比較並設置 AtomicReference 引用
AtomicReference 類具備了一個很有用的方法:compareAndSet()。compareAndSet() 可以將保存在 AtomicReference 里的引用於一個期望引用進行比較,如果兩個引用是一樣的(並非 equals() 的相等,而是 == 的一樣),將會給AtomicReference 實例設置一個新的引用。
如果 compareAndSet() 為 AtomicReference 設置了一個新的引用,compareAndSet() 將返回 true。否則compareAndSet() 返回 false。
AtomicReference compareAndSet() 示例:
String initialReference = "initial value referenced"; AtomicReference<String> atomicStringReference = new AtomicReference<String>(initialReference); String newReference = "new value referenced"; boolean exchanged = atomicStringReference.compareAndSet(initialReference, newReference); System.out.println("exchanged: " + exchanged); exchanged = atomicStringReference.compareAndSet(initialReference, newReference); System.out.println("exchanged: " + exchanged);
本示例創建了一個帶有一個初始引用的泛型化的 AtomicReference。之后兩次調用 comparesAndSet()來對存儲值和期望值進行對比,如果二者一致,為 AtomicReference 設置一個新的引用。第一次比較,存儲的引用(initialReference)和期望的引用(initialReference)一致,所以一個新的引用(newReference)被設置給 AtomicReference,compareAndSet() 方法返回 true。第二次比較時,存儲的引用(newReference)和期望的引用(initialReference)不一致,因此新的引用沒有被設置給 AtomicReference,compareAndSet() 方法返回 false。
原文鏈接:http://tutorials.jenkov.com/java-util-concurrent/index.html
譯文鏈接:http://blog.csdn.net/defonds/article/details/44021605#t8