1. 什么是Monitor?
Monitor其實是一種同步工具,也可以說是一種同步機制,它通常被描述為一個對象,主要特點是:
- 對象的所有方法都被“互斥”的執行。好比一個Monitor只有一個運行“許可”,任一個線程進入任何一個方法都需要獲得這個“許可”,離開時把許可歸還。
- 通常提供singal機制:允許正持有“許可”的線程暫時放棄“許可”,等待某個謂詞成真(條件變量),而條件成立后,當前進程可以“通知”正在等待這個條件變量的線程,讓他可以重新去獲得運行許可。
Monitor對象可以被多線程安全地訪問。關於“互斥”與“為什么要互斥”,我就不傻X兮兮解釋了;而關於Monitor的singal機制,歷史上曾經出現過兩大門派,分別是Hoare派和Mesa派(上過海波老師OS課的SS同學應該對這個有印象),我還是用我的理解通俗地庸俗地解釋一下:
- Hoare派的singal機制江湖又稱“Blocking condition variable”,特點是,當“發通知”的線程發出通知后,立即失去許可,並“親手”交給等待者,等待者運行完畢后再將許可交還通知者。在這種機制里,可以等待者拿到許可后,謂詞肯定為真——也就是說等待者不必再次檢查條件成立與否,所以對條件的判斷可以使用“if”,不必“while”
- Mesa派的signal機制又稱“Non-Blocking condition variable”, 與Hoare不同,通知者發出通知后,並不立即失去許可,而是把聞風前來等待者安排在ready queue里,等到schedule時有機會去拿到“許可”。這種機制里,等待者拿到許可后不能確定在這個時間差里是否有別的等待者進入過Monitor,因此不能保證謂詞一定為真,所以對條件的判斷必須使用“while”
這兩種方案可以說各有利弊,但Mesa派在后來的盟主爭奪中漸漸占了上風,被大多數實現所采用,有人給這種signal另外起了個別名叫“notify”,想必你也知道,Java采取的就是這個機制。
2. Monitor與Java不得不說的故事
子曰:“Java對象是天生的Monitor。”每一個Java對象都有成為Monitor的“潛質”。這是為什么?因為在Java的設計中,每一個對象自打娘胎里出來,就帶了一把看不見的鎖,通常我們叫“內部鎖”,或者“Monitor鎖”,或者“Intrinsic lock”。為了裝逼起見,我們就叫它Intrinsic lock吧。有了這個鎖的幫助,只要把類的所有對象方法都用synchronized關鍵字修飾,並且所有域都為私有(也就是只能通過方法訪問對象狀態),就是一個貨真價實的Monitor了。比如,我們舉一個大俗例吧:
Example Source Code [http://www.cnblogs.com/tomsheep/]
public class Account {
private int balance;
public Account(int balance) {
this.balance = balance;
}
synchronized public boolean withdraw(int amount){
if(balance<amount)
return false;
balance -= amount;
return true;
}
synchronized public void deposit(int amount){
balance +=amount;
}
}
3. synchronized關鍵字
上面我們已經看到synchronized的一種用法,用來修飾方法,表示進入該方法需要對Intrinsic lock加鎖,離開時放鎖。synchronized可以用在程序塊中,顯示說明對“哪個對象的Intrinsic lock加鎖”,比如
Example Source Code [http://www.cnblogs.com/tomsheep/]
synchronized public void deposit(int amount){
balance +=amount;
}
// 等價於
public void deposit(int amount){
synchronized(this){
balance +=amount;
}
}
這時,你可能就要問了,你不是說任何對象都有intrinsic lock么?而synchronized關鍵字又可以顯示指定去鎖誰,那我們是不是可以這樣做:
Example Source Code [http://www.cnblogs.com/tomsheep/]
public class Account {
private int balance;
private Object lock = new Object();
public Account(int balance) {
this.balance = balance;
}
public boolean withdraw(int amount){
synchronized (lock) {
if(balance<amount)
return false;
balance -= amount;
return true;
}
}
public void deposit(int amount){
synchronized (lock) {
balance +=amount;
}
}
}
不用this的內部鎖,而是用另外任意一個對象的內部鎖來完成完全相同的任務?沒錯,完全可以。不過,需要注意的是,這時候,你實際上禁止了“客戶代碼加鎖”的行為。前幾天BBS上簡哥有一貼提到的bug其實就是這個,這個時候使用這份代碼的客戶程序如果想當然地認為Account的同步是基於其內部鎖的,並且傻X兮兮地寫了類似下面的代碼:
Example Source Code [http://www.cnblogs.com/tomsheep/]
public static void main(String[] args) {
Account account =new Account(1000);
//some threads modifying account through Account’s methods...
synchronized (account) {
;//blabla
}
}
自認為后面的同步快對account加了鎖,期間的操作不會被其余通過Account方法操作account對象的線程所干擾,那就太悲劇了。因為他們並不相干,鎖住了不同的鎖。
4. Java中的條件變量
正如我們前面所說,Java采取了wait/notify機制來作為intrinsic lock 相關的條件變量,表示為等待某一條件成立的條件隊列——說到這里順帶插一段,條件隊列必然與某個鎖相關,並且語義上關聯某個謂詞(條件隊列、鎖、條件謂詞就是吉祥的一家)。所以,在使用wait/notify方法時,必然是已經獲得相關鎖了的,在進一步說,一個推論就是“wait/notify 方法只能出現在相應的同步塊中”。如果不呢?就像下面一段(notify表示的謂詞是“帳戶里有錢啦~”):
Example Source Code [http://www.cnblogs.com/tomsheep/]
public void deposit(int amount){
balance +=amount;
notify();
}
//或者這樣:
public void deposit(int amount){
synchronized (lock) {
balance +=amount;
notify();
}
}
這兩段都是錯的,第一段沒有在同步塊里,而第二段拿到的是lock的內部鎖,調用的卻是this.notify(),讓人遺憾。運行時他們都會拋IllegalMonitorStateException異常——唉,想前一陣我參加一次筆試的時候,有一道題就是這個,讓你選所給代碼會拋什么異常,我當時就傻了,想這考得也太偏了吧,現在看看,確實是很基本的概念,當初被虐是壓根沒有理解wait/notify機制的緣故。那怎么寫是對的呢?
Example Source Code [http://www.cnblogs.com/tomsheep/]
public void deposit(int amount){
synchronized (lock) {
balance +=amount;
lock.notify();
}
}
//或者(取決於你采用的鎖):
synchronized public void deposit(int amount){
balance +=amount;
notify();
}
5.這就夠了嗎?
看上去,Java的內部鎖和wait/notify機制已經可以滿足任何同步需求了,不是嗎?em…可以這么說,但也可以說,不那么完美。有兩個問題:
- 鎖不夠用
有時候,我們的類里不止有一個狀態,這些狀態是相互獨立的,如果只用同一個內部鎖來維護他們全部,未免顯得過於笨拙,會嚴重影響吞吐量。你馬上會說,你剛才不是演示了用任意一個Object來做鎖嗎?我們多整幾個Object分別加鎖不就行了嗎?沒錯,是可行的。但這樣可能顯得有些丑陋,而且Object來做鎖本身就有語義不明確的缺點。
- 條件變量不夠用
Java用wait/notify機制實際上默認給一個內部鎖綁定了一個條件隊列,但是,有時候,針對一個狀態(鎖),我們的程序需要兩個或以上的條件隊列,比如,剛才的Account例子,如果某個2B銀行有這樣的規定“一個賬戶存款不得多於10000元”,這個時候,我們的存錢需要滿足“余額+要存的數目不大於10000,否則等待,直到滿足這個限制”,取錢需要滿足“余額足夠,否則等待,直到有錢為止”,這里需要兩個條件隊列,一個等待“存款不溢出”,一個等待“存款足夠”,這時,一個默認的條件隊列夠用么?你可能又說,夠用,我們可以模仿network里的“多路復用”,一個隊列就能當多個來使,像這樣:
Example Source Code [http://www.cnblogs.com/tomsheep/]
public class Account {
public static final int BOUND = 10000;
private int balance;
public Account(int balance) {
this.balance = balance;
}
synchronized public boolean withdraw(int amount) throws InterruptedException{
while(balance<amount)
wait();// no money, wait
balance -= amount;
notifyAll();// not full, notify
return true;
}
synchronized public void deposit(int amount) throws InterruptedException{
while(balance+amount >BOUND)
wait();//full, wait
balance +=amount;
notifyAll();// has money, notify
}
}
不是挺好嗎?恩,沒錯,是可以。但是,仍然存在性能上的缺陷:每次都有多個線程被喚醒,而實際只有一個會運行,頻繁的上下文切換和鎖請求是件很廢的事情。我們能不能不要notifyAll,而每次只用notify(只喚醒一個)呢?不好意思,想要“多路復用”,就必須notifyAll,否則會有丟失信號之虞(不解釋了)。只有滿足下面兩個條件,才能使用notify:
一,只有一個條件謂詞與條件隊列相關,每個線程從wait返回執行相同的邏輯。
二,一進一出:一個對條件變量的通知,語義上至多只激活一個線程。
我又想插播一段:剛才寫上面那段代碼,IDE提示拋InterruptedException,我想提一下,這是因為wait是一個阻塞方法,幾乎所有阻塞方法都會聲明可能拋InterruptedException,這是和Java的interrupt機制有關的,以后我們有機會再說。
既然這么做不優雅不高效不亞克西,那如之奈何?Java提供了其他工具嗎?是的。這就是傳說中的java.util.concurrency包里的故事。
5 java的並發編程工具包
java.util.concurrency是java的並發編程工具包,包下的類主要分為:
- locks部分:顯式鎖(互斥鎖和速寫鎖)相關;
- atomic部分:原子變量類相關,是構建非阻塞算法的基礎;
- executor部分:線程池相關;
- collections部分:並發容器相關;
- tools部分:同步工具相關,如信號量、閉鎖、柵欄等功能;
類圖結構:

BlockingQueue
此接口是一個線程安全的 存取實例的隊列。
使用場景
BlockingQueue通常用於一個線程生產對象,而另外一個線程消費這些對象的場景。

注意事項:
- 此隊列是
有限的,如果隊列到達臨界點,Thread1就會阻塞,直到Thread2從隊列中拿走一個對象。 - 若果隊列是空的,
Thread2會阻塞,直到Thread1把一個對象丟進隊列。
相關方法
BlockingQueue中包含了如下操作方法:
| Throws Exception | Special Value | Blocks | Times Out | |
|---|---|---|---|---|
| Insert | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
| Remove | remove(o) | poll() | take() | poll(timeout, timeunit) |
| Examine | element() | peek() |
名詞解釋:
- Throws Exception: 如果試圖的操作無法立即執行,拋一個異常。
- Special Value: 如果試圖的操作無法立即執行,返回一個特定的值(常常是 true / false)。
- Blocks: 如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行。
- Times Out: 如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。返回一個特定值以告知該操作是否成功(典型的是 true / false)。
注意事項:
- 無法插入 null,否則會拋出一個 NullPointerException。
- 隊列這種數據結構,導致除了獲取開始和結尾位置的其他對象的效率都不高,雖然可通過
remove(o)來移除任一對象。
實現類
因為是一個接口,所以我們必須使用一個實現類來使用它,有如下實現類:
- ArrayBlockingQueue: 數組阻塞隊列
- DelayQueue: 延遲隊列
- LinkedBlockingQueue: 鏈阻塞隊列
- PriorityBlockingQueue: 具有優先級的阻塞隊列
- SynchronousQueue: 同步隊列
使用示例:
ArrayBlockingQueue
ArrayBlockingQueue 是一個有界的阻塞隊列
- 內部實現是將對象放到一個數組里。數組有個特性:一旦初始化,大小就無法修改。因此無法修改
ArrayBlockingQueue初始化時的上限。 ArrayBlockingQueue內部以FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。
DelayQueue
DelayQueue 對元素進行持有直到一個特定的延遲到期。注入其中的元素必須實現 java.util.concurrent.Delayed 接口:
public interface Delayed extends Comparable<Delayed< { public long getDelay(TimeUnit timeUnit); // 返回將要延遲的時間段 }
- 1
- 2
- 3
- 在每個元素的 getDelay() 方法返回的值的時間段之后才釋放掉該元素。如果返回的是 0 或者負值,延遲將被認為過期,該元素將會在 DelayQueue 的下一次 take 被調用的時候被釋放掉。
Delayed接口也繼承了java.lang.Comparable接口,Delayed對象之間可以進行對比。這對DelayQueue隊列中的元素進行排序時有用,因此它們可以根據過期時間進行有序釋放。
LinkedBlockingQueue
內部以一個鏈式結構(鏈接節點)對其元素進行存儲 。
- 可以選擇一個上限。如果沒有定義上限,將使用 Integer.MAX_VALUE 作為上限。
- 內部以 FIFO(先進先出)的順序對元素進行存儲。
PriorityBlockingQueue
一個無界的並發隊列,它使用了和類 java.util.PriorityQueue 一樣的排序規則。
- 無法向這個隊列中插入 null 值。
- 插入到 其中的元素必須實現 java.lang.Comparable 接口。
- 對於具有相等優先級(compare() == 0)的元素並不強制任何特定行為。
- 從一個 PriorityBlockingQueue 獲得一個 Iterator 的話,該 Iterator 並不能保證它對元素的遍歷是以優先級為序的。
SynchronousQueue
一個特殊的隊列,它的內部同時只能夠容納單個元素。
- 如果該隊列已有一元素的話,試圖向隊列中插入一個新元素的線程將會阻塞,直到另一個線程將該元素從隊列中抽走。
- 如果該隊列為空,試圖向隊列中抽取一個元素的線程將會阻塞,直到另一個線程向隊列中插入了一條新的元素。
BlockingDeque
此接口表示一個線程安全放入和提取實例的雙端隊列。
使用場景
通常用在一個線程既是生產者又是消費者的時候。
注意事項
- 如果雙端隊列已滿,插入線程將被阻塞,直到一個移除線程從該隊列中移出了一個元素。
- 如果雙端隊列為空,移除線程將被阻塞,直到一個插入線程向該隊列插入了一個新元素。
相關方法
| Throws Exception | Special Value | Blocks | Times Out | |
|---|---|---|---|---|
| Insert | addFirst(o) | offerFirst(o) | putFirst(o) | offerFirst(o, timeout, timeunit) |
| Remove | removeFirst(o) | pollFirst(o) | takeFirst(o) | pollFirst(timeout, timeunit) |
| Examine | getFirst(o) | peekFirst(o) |
| Throws Exception | Special Value | Blocks | Times Out | |
|---|---|---|---|---|
| Insert | addLast(o) | offerLast(o) | putLast(o) | offerLast(o, timeout, timeunit) |
| Remove | removeLast(o) | pollLast(o) | takeLast(o) | pollLast(timeout, timeunit) |
| Examine | getLast(o) | peekLast(o) |
注意事項
- 關於方法的處理方式和上節一樣。
- BlockingDeque 接口繼承自 BlockingQueue 接口,可以用其中定義的方法。
實現類
- LinkedBlockingDeque : 鏈阻塞雙端隊列
LinkedBlockingDeque
LinkedBlockingDeque 是一個雙端隊列,可以從任意一端插入或者抽取元素的隊列。
- 在它為空的時候,一個試圖從中抽取數據的線程將會阻塞,無論該線程是試圖從哪一端抽取數據。
ConcurrentMap
一個能夠對別人的訪問(插入和提取)進行並發處理的 java.util.Map接口。
ConcurrentMap 除了從其父接口 java.util.Map 繼承來的方法之外還有一些額外的原子性方法。
實現類
因為是接口,必須用實現類來使用它,其實現類為
- ConcurrentHashMap
ConcurrentHashMap與HashTable比較
- 更好的並發性能,在你從中讀取對象的時候 ConcurrentHashMap 並不會把整個 Map 鎖住,只是把 Map 中正在被寫入的部分進行鎖定。
- 在被遍歷的時候,即使是 ConcurrentHashMap 被改動,它也不會拋 ConcurrentModificationException。
ConcurrentNavigableMap
一個支持並發訪問的 java.util.NavigableMap,它還能讓它的子 map 具備並發訪問的能力。
headMap
headMap(T toKey) 方法返回一個包含了小於給定 toKey 的 key 的子 map。
tailMap
tailMap(T fromKey) 方法返回一個包含了不小於給定 fromKey 的 key 的子 map。
subMap
subMap() 方法返回原始 map 中,鍵介於 from(包含) 和 to (不包含) 之間的子 map。
更多方法
- descendingKeySet()
- descendingMap()
- navigableKeySet()
CountDownLatch
CountDownLatch 是一個並發構造,它允許一個或多個線程等待一系列指定操作的完成。
- CountDownLatch 以一個給定的數量初始化。countDown() 每被調用一次,這一數量就減一。
- 通過調用 await() 方法之一,線程可以阻塞等待這一數量到達零。
CyclicBarrier
CyclicBarrier 類是一種同步機制,它能夠對處理一些算法的線程實現同步。
更多實例參考: CyclicBarrier
Exchanger
Exchanger 類表示一種兩個線程可以進行互相交換對象的會和點。
更多實例參考: Exchanger
Semaphore
Semaphore 類是一個計數信號量。具備兩個主要方法:
- acquire()
- release()
- 每調用一次 acquire(),一個許可會被調用線程取走。
- 每調用一次 release(),一個許可會被返還給信號量。
Semaphore 用法
- 保護一個重要(代碼)部分防止一次超過 N 個線程進入。
- 在兩個線程之間發送信號。
保護重要部分
如果你將信號量用於保護一個重要部分,試圖進入這一部分的代碼通常會首先嘗試獲得一個許可,然后才能進入重要部分(代碼塊),執行完之后,再把許可釋放掉。
Semaphore semaphore = new Semaphore(1); //critical section semaphore.acquire(); ... semaphore.release();
- 1
- 2
- 3
- 4
- 5
在線程之間發送信號
如果你將一個信號量用於在兩個線程之間傳送信號,通常你應該用一個線程調用 acquire() 方法,而另一個線程調用 release() 方法。
- 如果沒有可用的許可,acquire() 調用將會阻塞,直到一個許可被另一個線程釋放出來。
- 如果無法往信號量釋放更多許可時,一個 release() 調用也會阻塞。
公平性
無法擔保掉第一個調用 acquire() 的線程會是第一個獲得一個許可的線程。
可以通過如下來強制公平:
Semaphore semaphore = new Semaphore(1, true);
- 1
- 需要注意,強制公平會影響到並發性能,建議不使用。
ExecutorService
這里之前有過簡單的總結: Java 中幾種常用的線程池
存在於 java.util.concurrent 包里的 ExecutorService 實現就是一個線程池實現。
實現類
此接口實現類包括:
- ScheduledThreadPoolExecutor : 通過
Executors.newScheduledThreadPool(10)創建的 - ThreadPoolExecutor: 除了第一種的
其他三種方式創建的
相關方法
- execute(Runnable):
無法得知被執行的 Runnable 的執行結果 - submit(Runnable):
返回一個 Future 對象,可以知道Runnable 是否執行完畢。 - submit(Callable):
Callable 實例除了它的 call() 方法能夠返回一個結果,通過Future可以獲取。 - invokeAny(…):
傳入一系列的 Callable 或者其子接口的實例對象,無法保證返回的是哪個 Callable 的結果 ,只能表明其中一個已執行結束。
如果其中一個任務執行結束(或者拋了一個異常),其他 Callable 將被取消。 - invokeAll(…):
返回一系列的 Future 對象,通過它們你可以獲取每個 Callable 的執行結果。
關閉ExecutorService
- shutdown() : 不會立即關閉,但它將不再接受新的任務
- shutdownNow(): 立即關閉
ThreadPoolExecutor
- ThreadPoolExecutor 使用其內部池中的線程執行給定任務(Callable 或者 Runnable)。
ScheduledExecutorService(接口,其實現類為ScheduledThreadPoolExecutor)
- ScheduledExecutorService能夠將任務延后執行,或者間隔固定時間多次執行。
- ScheduledExecutorService中的 任務由一個工作者線程異步執行,而不是由提交任務給 ScheduledExecutorService 的那個線程執行。
相關方法
- schedule (Callable task, long delay, TimeUnit timeunit):
Callable 在給定的延遲之后執行,並返回結果。 - schedule (Runnable task, long delay, TimeUnit timeunit)
除了 Runnable 無法返回一個結果之外,和第一個方法類似。 - scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit)
這一方法規划一個任務將被定期執行。該任務將會在首個 initialDelay 之后得到執行,然后每個 period 時間之后重復執行。
period 被解釋為前一個執行的開始和下一個執行的開始之間的間隔時間。 - scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit)
和上一個方法類似,只是period 則被解釋為前一個執行的結束和下一個執行的結束之間的間隔。
ForkJoinPool
ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,除了一點不同。ForkJoinPool 讓我們可以很方便地把任務分裂成幾個更小的任務,這些分裂出來的任務也將會提交給 ForkJoinPool。
用法參考:Java Fork and Join using ForkJoinPool
Lock
Lock 是一個類似於 synchronized 塊的線程同步機制。但是 Lock 比 synchronized 塊更加靈活、精細。
實現類
Lock是一個接口,其實現類包括:
- ReentrantLock
示例
Lock lock = new ReentrantLock(); lock.lock(); //critical section lock.unlock();
- 1
- 2
- 3
- 4
- 調用
lock() 方法之后,這個 lock 實例就被鎖住啦。 - 當lock示例被鎖后,任何其他再過來調用 lock() 方法的線程將會被阻塞住,直到調用了unlock() 方法。
- unlock() 被調用了,lock 對象解鎖了,其他線程可以對它進行鎖定了。
Lock 和 synchronized區別
- synchronized 代碼塊不能夠保證進入訪問等待的線程的先后順序。
- 你不能夠傳遞任何參數給一個 synchronized 代碼塊的入口。因此,對於 synchronized 代碼塊的訪問等待設置超時時間是不可能的事情。
- synchronized 塊必須被完整地包含在單個方法里。而一個 Lock 對象可以把它的 lock() 和 unlock() 方法的調用放在不同的方法里。
ReadWriteLock
讀寫鎖一種先進的線程鎖機制。
- 允許多個線程在同一時間對某特定資源進行讀取,
- 但同一時間內只能有一個線程對其進行寫入。
實現類
- ReentrantReadWriteLock
規則
- 如果沒有任何寫操作鎖定,那么可以有多個讀操作鎖定該鎖
- 如果沒有任何讀操作或者寫操作,只能有一個寫線程對該鎖進行鎖定。
示例:
ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); readWriteLock.readLock().lock(); // multiple readers can enter this section // if not locked for writing, and not writers waiting // to lock for writing. readWriteLock.readLock().unlock(); readWriteLock.writeLock().lock(); // only one writer can enter this section, // and only if no threads are currently reading. readWriteLock.writeLock().unlock();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
更多原子性包裝類
位於 atomic包下,包含一系列原子性變量。
- AtomicBoolean
- AtomicInteger
- AtomicLong
- AtomicReference
…
