java.util.concurrent包的類都來自於JSR-166:Concurrent Utilities,官方的描述叫做“The JSR proposes a set of medium-level utilities that provide functionality commonly needed in concurrent programs. ”。作者是大名鼎鼎的Doug Lea,這個包的前身可以在這里找到,它最好的文檔就是系統的API手冊。
當然,這里參考的concurrent包來自JDK7,比最初JDK1.5的版本有了不少改進。我曾經在《Java多線程發展簡史》提到過,對於Java並發本身,在基礎的並發模型建立以后,JSR-133和JSR-166是貢獻最大的兩個,如覺必要,在閱讀這篇文章之前,你可以先移步閱讀這篇文章,能幫助在腦子里建立起最基礎的Java多線程知識模型;此外,還有一篇是《從DCL的對象安全發布談起》,這篇文章相當於是對JSR-133規范的閱讀理解。
這篇文章中,我只是簡要地記錄類的功能和使用,希望可以幫助大家全面掌握或回顧Java的並發包。當然,任何不清楚的接口和功能,JDK的API手冊是最好的參考材料,如果想更進一步,參透至少大部分類的實現代碼,這會非常非常辛苦。
並發容器
這些容器的關鍵方法大部分都實現了線程安全的功能,卻不使用同步關鍵字(synchronized)。值得注意的是Queue接口本身定義的幾個常用方法的區別,
- add方法和offer方法的區別在於超出容量限制時前者拋出異常,后者返回false;
- remove方法和poll方法都從隊列中拿掉元素並返回,但是他們的區別在於空隊列下操作前者拋出異常,而后者返回null;
- element方法和peek方法都返回隊列頂端的元素,但是不把元素從隊列中刪掉,區別在於前者在空隊列的時候拋出異常,后者返回null。
阻塞隊列:
- BlockingQueue.class,阻塞隊列接口
- BlockingDeque.class,雙端阻塞隊列接口
- ArrayBlockingQueue.class,阻塞隊列,數組實現
- LinkedBlockingDeque.class,阻塞雙端隊列,鏈表實現
- LinkedBlockingQueue.class,阻塞隊列,鏈表實現
- DelayQueue.class,阻塞隊列,並且元素是Delay的子類,保證元素在達到一定時間后才可以取得到
- PriorityBlockingQueue.class,優先級阻塞隊列
- SynchronousQueue.class,同步隊列,但是隊列長度為0,生產者放入隊列的操作會被阻塞,直到消費者過來取,所以這個隊列根本不需要空間存放元素;有點像一個獨木橋,一次只能一人通過,還不能在橋上停留
非阻塞隊列:
- ConcurrentLinkedDeque.class,非阻塞雙端隊列,鏈表實現
- ConcurrentLinkedQueue.class,非阻塞隊列,鏈表實現
轉移隊列:
- TransferQueue.class,轉移隊列接口,生產者要等消費者消費的隊列,生產者嘗試把元素直接轉移給消費者
- LinkedTransferQueue.class,轉移隊列的鏈表實現,它比SynchronousQueue更快
其它容器:
- ConcurrentMap.class,並發Map的接口,定義了putIfAbsent(k,v)、remove(k,v)、replace(k,oldV,newV)、replace(k,v)這四個並發場景下特定的方法
- ConcurrentHashMap.class,並發HashMap
- ConcurrentNavigableMap.class,NavigableMap的實現類,返回最接近的一個元素
- ConcurrentSkipListMap.class,它也是NavigableMap的實現類(要求元素之間可以比較),同時它比ConcurrentHashMap更加scalable——ConcurrentHashMap並不保證它的操作時間,並且你可以自己來調整它的load factor;但是ConcurrentSkipListMap可以保證O(log n)的性能,同時不能自己來調整它的並發參數,只有你確實需要快速的遍歷操作,並且可以承受額外的插入開銷的時候,才去使用它
- ConcurrentSkipListSet.class,和上面類似,只不過map變成了set
- CopyOnWriteArrayList.class,copy-on-write模式的array list,每當需要插入元素,不在原list上操作,而是會新建立一個list,適合讀遠遠大於寫並且寫時間並苛刻的場景
- CopyOnWriteArraySet.class,和上面類似,list變成set而已
同步設備
這些類大部分都是幫助做線程之間同步的,簡單描述,就像是提供了一個籬笆,線程執行到這個籬笆的時候都得等一等,等到條件滿足以后再往后走。
- CountDownLatch.class,一個線程調用await方法以后,會阻塞地等待計數器被調用countDown直到變成0,功能上和下面的CyclicBarrier有點像
- CyclicBarrier.class,也是計數等待,只不過它是利用await方法本身來實現計數器“+1”的操作,一旦計數器上顯示的數字達到Barrier可以打破的界限,就會拋出BrokenBarrierException,線程就可以繼續往下執行;請參見我寫過的這篇文章《同步、異步轉化和任務執行》中的Barrier模式
- Semaphore.class,功能上很簡單,acquire()和release()兩個方法,一個嘗試獲取許可,一個釋放許可,Semaphore構造方法提供了傳入一個表示該信號量所具備的許可數量。
- Exchanger.class,這個類的實例就像是兩列飛馳的火車(線程)之間開了一個神奇的小窗口,通過小窗口(exchange方法)可以讓兩列火車安全地交換數據。
- Phaser.class,功能上和第1、2個差不多,但是可以重用,且更加靈活,稍微有點復雜(CountDownLatch是不斷-1,CyclicBarrier是不斷+1,而Phaser定義了兩個概念,phase和party),我在下面畫了張圖,希望能夠幫助理解:
- 一個是phase,表示當前在哪一個階段,每碰到一次barrier就會觸發advance操作(觸發前調用onAdvance方法),一旦越過這道barrier就會觸發phase+1,這很容易理解;
- 另一個是party,很多文章說它就是線程數,但是其實這並不准確,它更像一個用於判斷advance是否被允許發生的計數器:
- 任何時候都有一個party的總數,即注冊(registered)的party數,它可以在Phaser構造器里指定,也可以任意時刻調用方法動態增減;
- 每一個party都有unarrived和arrived兩種狀態,可以通過調用arriveXXX方法使得它從unarrived變成arrived;
- 每一個線程到達barrier后會等待(調用arriveAndAwaitAdvance方法),一旦所有party都到達(即arrived的party數量等於registered的數量),就會觸發advance操作,同時barrier被打破,線程繼續向下執行,party重新變為unarrived狀態,重新等待所有party的到達;
- 在絕大多數情況下一個線程就只負責操控一個party的到達,因此很多文章說party指的就是線程,但是這是不准確的,因為一個線程完全可以操控多個party,只要它執行多次的arrive方法。
- 結合JDK的文檔如果還無法理解,請參看這篇博客(牆外),它說得非常清楚;之后關於它的幾種典型用法請參見這篇文章。
給出一個Phaser使用的最簡單的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public
class
T {
public
static
void
main(String args[]) {
final
int
count =
3
;
final
Phaser phaser =
new
Phaser(count);
// 總共有3個registered parties
for
(
int
i =
0
; i < count; i++) {
final
Thread thread =
new
Thread(
new
Task(phaser));
thread.start();
}
}
public
static
class
Task
implements
Runnable {
private
final
Phaser phaser;
public
Task(Phaser phaser) {
this
.phaser = phaser;
}
@Override
public
void
run() {
phaser.arriveAndAwaitAdvance();
// 每執行到這里,都會有一個party arrive,如果arrived parties等於registered parties,就往下繼續執行,否則等待
}
}
}
|
原子對象
這些對象都的行為在不使用同步的情況下保證了原子性。值得一提的有兩點:
- weakCompareAndSet方法:compareAndSet方法很明確,但是這個是啥?根據JSR規范,調用weakCompareAndSet時並不能保證happen-before的一致性,因此允許存在重排序指令等等虛擬機優化導致這個操作失敗(較弱的原子更新操作),但是從Java源代碼看,它的實現其實和compareAndSet是一模一樣的;
- lazySet方法:延時設置變量值,這個等價於set方法,但是由於字段是volatile類型的,因此次字段的修改會比普通字段(非volatile字段)有稍微的性能損耗,所以如果不需要立即讀取設置的新值,那么此方法就很有用。
- AtomicBoolean.class
- AtomicInteger.class
- AtomicIntegerArray.class
- AtomicIntegerFieldUpdater.class
- AtomicLong.class
- AtomicLongArray.class
- AtomicLongFieldUpdater.class
- AtomicMarkableReference.class,它是用來高效表述Object-boolean這樣的對象標志位數據結構的,一個對象引用+一個bit標志位
- AtomicReference.class
- AtomicReferenceArray.class
- AtomicReferenceFieldUpdater.class
- AtomicStampedReference.class,它和前面的AtomicMarkableReference類似,但是它是用來高效表述Object-int這樣的“對象+版本號”數據結構,特別用於解決ABA問題(ABA問題這篇文章里面也有介紹)
鎖
- AbstractOwnableSynchronizer.class,這三個AbstractXXXSynchronizer都是為了創建鎖和相關的同步器而提供的基礎,鎖,還有前面提到的同步設備都借用了它們的實現邏輯
- AbstractQueuedLongSynchronizer.class,AbstractOwnableSynchronizer的子類,所有的同步狀態都是用long變量來維護的,而不是int,在需要64位的屬性來表示狀態的時候會很有用
- AbstractQueuedSynchronizer.class,為實現依賴於先進先出隊列的阻塞鎖和相關同步器(信號量、事件等等)提供的一個框架,它依靠int值來表示狀態
- Lock.class,Lock比synchronized關鍵字更靈活,而且在吞吐量大的時候效率更高,根據JSR-133的定義,它happens-before的語義和synchronized關鍵字效果是一模一樣的,它唯一的缺點似乎是缺乏了從lock到finally塊中unlock這樣容易遺漏的固定使用搭配的約束,除了lock和unlock方法以外,還有這樣兩個值得注意的方法:
- lockInterruptibly:如果當前線程沒有被中斷,就獲取鎖;否則拋出InterruptedException,並且清除中斷
- tryLock,只在鎖空閑的時候才獲取這個鎖,否則返回false,所以它不會block代碼的執行
- ReadWriteLock.class,讀寫鎖,讀寫分開,讀鎖是共享鎖,寫鎖是獨占鎖;對於讀-寫都要保證嚴格的實時性和同步性的情況,並且讀頻率遠遠大過寫,使用讀寫鎖會比普通互斥鎖有更好的性能。
- ReentrantLock.class,可重入鎖(lock行為可以嵌套,但是需要和unlock行為一一對應),有幾點需要注意:
- 構造器支持傳入一個表示是否是公平鎖的boolean參數,公平鎖保證一個阻塞的線程最終能夠獲得鎖,因為是有序的,所以總是可以按照請求的順序獲得鎖;不公平鎖意味着后請求鎖的線程可能在其前面排列的休眠線程恢復前拿到鎖,這樣就有可能提高並發的性能
- 還提供了一些監視鎖狀態的方法,比如isFair、isLocked、hasWaiters、getQueueLength等等
- ReentrantReadWriteLock.class,可重入讀寫鎖
- Condition.class,使用鎖的newCondition方法可以返回一個該鎖的Condition對象,如果說鎖對象是取代和增強了synchronized關鍵字的功能的話,那么Condition則是對象wait/notify/notifyAll方法的替代。在下面這個例子中,lock生成了兩個condition,一個表示不滿,一個表示不空;在put方法調用的時候,需要檢查數組是不是已經滿了,滿了的話就得等待,直到“不滿”這個condition被喚醒(notFull.await());在take方法調用的時候,需要檢查數組是不是已經空了,如果空了就得等待,直到“不空”這個condition被喚醒(notEmpty.await()):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
class
BoundedBuffer {
final
Lock lock =
new
ReentrantLock();
final
Condition notFull = lock.newCondition();
final
Condition notEmpty = lock.newCondition();
final
Object[] items =
new
Object[
100
];
int
putptr, takeptr, count;
public
void
put(Object x)
throws
InterruptedException {
lock.lock();
try
{
while
(count == items.length)
notFull.await();
items[putptr] = x;
if
(++putptr == items.length) putptr =
0
;
++count;
notEmpty.signal();
// 既然已經放進了元素,肯定不空了,喚醒“notEmpty”
}
finally
{
lock.unlock();
}
}
public
Object take()
throws
InterruptedException {
lock.lock();
try
{
while
(count ==
0
)
notEmpty.await();
Object x = items[takeptr];
if
(++takeptr == items.length) takeptr =
0
;
--count;
notFull.signal();
// 既然已經拿走了元素,肯定不滿了,喚醒“notFull”
return
x;
}
finally
{
lock.unlock();
}
}
}
|
Fork-join框架
這是一個JDK7引入的並行框架,它把流程划分成fork(分解)+join(合並)兩個步驟(怎么那么像MapReduce?),傳統線程池來實現一個並行任務的時候,經常需要花費大量的時間去等待其他線程執行任務的完成,但是fork-join框架使用work stealing技術緩解了這個問題:
- 每個工作線程都有一個雙端隊列,當分給每個任務一個線程去執行的時候,這個任務會放到這個隊列的頭部;
- 當這個任務執行完畢,需要和另外一個任務的結果執行合並操作,可是那個任務卻沒有執行的時候,不會干等,而是把另一個任務放到隊列的頭部去,讓它盡快執行;
- 當工作線程的隊列為空,它會嘗試從其他線程的隊列尾部偷一個任務過來;
- 取得的任務可以被進一步分解。
- ForkJoinPool.class,ForkJoin框架的任務池,ExecutorService的實現類
- ForkJoinTask.class,Future的子類,框架任務的抽象
- ForkJoinWorkerThread.class,工作線程
- RecursiveTask.class,ForkJoinTask的實現類,compute方法有返回值,下文中有例子
- RecursiveAction.class,ForkJoinTask的實現類,compute方法無返回值,只需要覆寫compute方法,對於可繼續分解的子任務,調用coInvoke方法完成(參數是RecursiveAction子類對象的可變數組):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
class
SortTask
extends
RecursiveAction {
final
long
[] array;
final
int
lo;
final
int
hi;
private
int
THRESHOLD =
30
;
public
SortTask(
long
[] array) {
this
.array = array;
this
.lo =
0
;
this
.hi = array.length -
1
;
}
public
SortTask(
long
[] array,
int
lo,
int
hi) {
this
.array = array;
this
.lo = lo;
this
.hi = hi;
}
@Override
protected
void
compute() {
if
(hi - lo < THRESHOLD)
sequentiallySort(array, lo, hi);
else
{
int
pivot = partition(array, lo, hi);
coInvoke(
new
SortTask(array, lo, pivot -
1
),
new
SortTask(array,
pivot +
1
, hi));
}
}
private
int
partition(
long
[] array,
int
lo,
int
hi) {
long
x = array[hi];
int
i = lo -
1
;
for
(
int
j = lo; j < hi; j++) {
if
(array[j] <= x) {
i++;
swap(array, i, j);
}
}
swap(array, i +
1
, hi);
return
i +
1
;
}
private
void
swap(
long
[] array,
int
i,
int
j) {
if
(i != j) {
long
temp = array[i];
array[i] = array[j];
array[j] = temp;
}
}
private
void
sequentiallySort(
long
[] array,
int
lo,
int
hi) {
Arrays.sort(array, lo, hi +
1
);
}
}
|
測試的調用代碼:
1
2
3
4
5
6
7
8
9
10
11
|
@Test
public
void
testSort()
throws
Exception {
ForkJoinTask sort =
new
SortTask(array);
ForkJoinPool fjpool =
new
ForkJoinPool();
fjpool.submit(sort);
fjpool.shutdown();
fjpool.awaitTermination(
30
, TimeUnit.SECONDS);
assertTrue(checkSorted(array));
}
|
RecursiveTask和RecursiveAction的區別在於它的compute是可以有返回值的,子任務的計算使用fork()方法,結果的獲取使用join()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
class
Fibonacci
extends
RecursiveTask {
final
int
n;
Fibonacci(
int
n) {
this
.n = n;
}
private
int
compute(
int
small) {
final
int
[] results = {
1
,
1
,
2
,
3
,
5
,
8
,
13
,
21
,
34
,
55
,
89
};
return
results[small];
}
public
Integer compute() {
if
(n <=
10
) {
return
compute(n);
}
Fibonacci f1 =
new
Fibonacci(n -
1
);
Fibonacci f2 =
new
Fibonacci(n -
2
);
f1.fork();
f2.fork();
return
f1.join() + f2.join();
}
}
|
執行器和線程池
這個是我曾經舉過的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
public
class
FutureUsage {
public
static
void
main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<Object> task =
new
Callable<Object>() {
public
Object call()
throws
Exception {
Thread.sleep(
4000
);
Object result =
"finished"
;
return
result;
}
};
Future<Object> future = executor.submit(task);
System.out.println(
"task submitted"
);
try
{
System.out.println(future.get());
}
catch
(InterruptedException e) {
}
catch
(ExecutionException e) {
}
// Thread won't be destroyed.
}
}
|
線程池具備這樣的優先級處理策略:
- 請求到來首先交給coreSize內的常駐線程執行
- 如果coreSize的線程全忙,任務被放到隊列里面
- 如果隊列放滿了,會新增線程,直到達到maxSize
- 如果還是處理不過來,會把一個異常扔到RejectedExecutionHandler中去,用戶可以自己設定這種情況下的最終處理策略
對於大於coreSize而小於maxSize的那些線程,空閑了keepAliveTime后,會被銷毀。觀察上面說的優先級順序可以看到,假如說給ExecutorService一個無限長的隊列,比如LinkedBlockingQueue,那么maxSize>coreSize就是沒有意義的。
ExecutorService:
- Future.class,異步計算的結果對象,get方法會阻塞線程直至真正的結果返回
- Callable.class,用於異步執行的可執行對象,call方法有返回值,它和Runnable接口很像,都提供了在其他線程中執行的方法,二者的區別在於:
- Runnable沒有返回值,Callable有
- Callable的call方法聲明了異常拋出,而Runnable沒有
- RunnableFuture.class,實現自Runnable和Future的子接口,成功執行run方法可以完成它自身這個Future並允許訪問其結果,它把任務執行和結果對象放到一起了
- FutureTask.class,RunnableFuture的實現類,可取消的異步計算任務,僅在計算完成時才能獲取結果,一旦計算完成,就不能再重新開始或取消計算;它的取消任務方法cancel(boolean mayInterruptIfRunning)接收一個boolean參數表示在取消的過程中是否需要設置中斷
- Executor.class,執行提交任務的對象,只有一個execute方法
- Executors.class,輔助類和工廠類,幫助生成下面這些ExecutorService
- ExecutorService.class,Executor的子接口,管理執行異步任務的執行器,AbstractExecutorService提供了默認實現
- AbstractExecutorService.class,ExecutorService的實現類,提供執行方法的默認實現,包括:
- ① submit的幾個重載方法,返回Future對象,接收Runnable或者Callable參數
- ② invokeXXX方法,這類方法返回的時候,任務都已結束,即要么全部的入參task都執行完了,要么cancel了
- ThreadPoolExecutor.class,線程池,AbstractExecutorService的子類,除了從AbstractExecutorService繼承下來的①、②兩類提交任務執行的方法以外,還有:
- ③ 實現自Executor接口的execute方法,接收一個Runnable參數,沒有返回值
- RejectedExecutionHandler.class,當任務無法被執行的時候,定義處理邏輯的地方,前面已經提到過了
- ThreadFactory.class,線程工廠,用於創建線程
- Delayed.class,延遲執行的接口,只有long getDelay(TimeUnit unit)這樣一個接口方法
- ScheduledFuture.class,Delayed和Future的共同子接口
- RunnableScheduledFuture.class,ScheduledFuture和RunnableFuture的共同子接口,增加了一個方法boolean isPeriodic(),返回它是否是一個周期性任務,一個周期性任務的特點在於它可以反復執行
- ScheduledExecutorService.class,ExecutorService的子接口,它允許任務延遲執行,相應地,它返回ScheduledFuture
- ScheduledThreadPoolExecutor.class,可以延遲執行任務的線程池
CompletionService:
- CompletionService.class,它是對ExecutorService的改進,因為ExecutorService只是負責處理任務並把每個任務的結果對象(Future)給你,卻並沒有說要幫你“管理”這些結果對象,這就意味着你得自己建立一個對象容器存放這些結果對象,很麻煩;CompletionService像是集成了一個Queue的功能,你可以調用Queue一樣的方法——poll來獲取結果對象,還有一個方法是take,它和poll差不多,區別在於take方法在沒有結果對象的時候會返回空,而poll方法會block住線程直到有結果對象返回
- ExecutorCompletionService.class,是CompletionService的實現類
其它:
- ThreadLocalRandom.class,隨機數生成器,它和Random類差不多,但是它的性能要高得多,因為它的種子內部生成后,就不再修改,而且隨機對象不共享,就會減少很多消耗和爭用,由於種子內部生成,因此生成隨機數的方法略有不同:ThreadLocalRandom.current().nextX(…)