Reviewing Java Concurrency with Zhu Ye (Part 5): Concurrent Containers and Synchronizers
In this installment we first review some of the concurrent containers under java.util.concurrent, and then take a quick look at the various synchronizers.
Performance of ConcurrentHashMap and ConcurrentSkipListMap
First, let's benchmark ConcurrentHashMap against ConcurrentSkipListMap.
The non-concurrent counterpart of the former is HashMap; the latter is a skip-list implementation whose entries are sorted by key (you can also supply a Comparator to control the ordering).
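As a side note, here is a minimal sketch of supplying a custom Comparator (reverse key order in this case); the class name and sample entries are made up purely for illustration:
import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListMap;

public class SkipListComparatorDemo {
    public static void main(String[] args) {
        // Sort keys in reverse lexicographic order instead of the natural order
        ConcurrentSkipListMap<String, Long> map =
                new ConcurrentSkipListMap<>(Comparator.reverseOrder());
        map.put("item1", 1L);
        map.put("item2", 2L);
        map.put("item3", 3L);
        System.out.println(map.firstKey()); // item3, because of the reversed comparator
    }
}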
In this example, rather than simply benchmarking raw reads and writes against the Map, we implement the most common multi-threaded use of a Map: counting key frequencies. The key space is 10,000 keys, we loop 100 million times (so each value ends up around 10,000 on average), and 10 threads operate on the Map concurrently:
@Slf4j
public class ConcurrentMapTest {
    int loopCount = 100000000;
    int threadCount = 10;
    int itemCount = 10000;

    @Test
    public void test() throws InterruptedException {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start("hashmap");
        normal();
        stopWatch.stop();
        stopWatch.start("concurrentHashMap");
        concurrent();
        stopWatch.stop();
        stopWatch.start("concurrentSkipListMap");
        concurrentSkipListMap();
        stopWatch.stop();
        log.info(stopWatch.prettyPrint());
    }

    private void normal() throws InterruptedException {
        HashMap<String, Long> freqs = new HashMap<>();
        ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
        forkJoinPool.execute(() -> IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> {
            String key = "item" + ThreadLocalRandom.current().nextInt(itemCount);
            synchronized (freqs) {
                if (freqs.containsKey(key)) {
                    freqs.put(key, freqs.get(key) + 1);
                } else {
                    freqs.put(key, 1L);
                }
            }
        }));
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
        //log.debug("normal:{}", freqs);
    }

    private void concurrent() throws InterruptedException {
        ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(itemCount);
        ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
        forkJoinPool.execute(() -> IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> {
            String key = "item" + ThreadLocalRandom.current().nextInt(itemCount);
            freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
        }));
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
        //log.debug("concurrentHashMap:{}", freqs);
    }

    private void concurrentSkipListMap() throws InterruptedException {
        ConcurrentSkipListMap<String, LongAdder> freqs = new ConcurrentSkipListMap<>();
        ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
        forkJoinPool.execute(() -> IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> {
            String key = "item" + ThreadLocalRandom.current().nextInt(itemCount);
            freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
        }));
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
        //log.debug("concurrentSkipListMap:{}", freqs);
    }
}
A few points about the three implementations:
- In the normal implementation we lock the entire HashMap for every read and write
- For ConcurrentHashMap we make clever use of computeIfAbsent(), which collapses three steps into one: checking whether the key exists, computing/fetching the value, and putting the key-value pair. It hands back a LongAdder as the value, and since LongAdder is thread-safe we simply call its increment() method; one line of code does the work of five
- ConcurrentSkipListMap is used the same way
The results are as follows:

As you can see, the concurrent word-frequency counter built on ConcurrentHashMap vastly outperforms the locked version.
Note that ConcurrentSkipListMap's containsKey, get, put, remove and similar operations have O(log n) time complexity, and it also maintains ordering, so there is a performance gap between it and ConcurrentHashMap.
If we print the final contents of the ConcurrentSkipListMap, they look roughly like this:

You can see the entries are sorted by key.
ConcurrentHashMap's atomic operation methods
In this section we compare computeIfAbsent() and putIfAbsent(); these two methods are easy to misuse in ways that cause subtle bugs.
- The first difference is performance: if the key already exists, computeIfAbsent never invokes the function you pass in, whereas putIfAbsent requires the value to be evaluated up front. So if obtaining the value is expensive, computeIfAbsent performs better
- The second difference is the return value: computeIfAbsent returns the value after the operation, i.e. the newly computed value if the key was absent, or the existing value if it was already present; putIfAbsent returns the previous value, so for an absent key you get null
Let's write a program to verify this:
@Slf4j
public class PutIfAbsentTest {
    @Test
    public void test() {
        ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>();
        log.info("Start");
        log.info("putIfAbsent:{}", concurrentHashMap.putIfAbsent("test1", getValue()));
        log.info("computeIfAbsent:{}", concurrentHashMap.computeIfAbsent("test1", k -> getValue()));
        log.info("putIfAbsent again:{}", concurrentHashMap.putIfAbsent("test2", getValue()));
        log.info("computeIfAbsent again:{}", concurrentHashMap.computeIfAbsent("test2", k -> getValue()));
    }

    private String getValue() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return UUID.randomUUID().toString();
    }
}
Fetching the value takes 1 second here. From the log timestamps you can see that every putIfAbsent call pays that 1 second, because its value argument is evaluated eagerly, while computeIfAbsent returns immediately when the key is already present. You can also see that for a key that did not exist yet, putIfAbsent returned null whereas computeIfAbsent returned the computed value:

Be sure to pick the method that matches what you actually need.
Misuse of ThreadLocalRandom
We used ThreadLocalRandom in the earlier examples, so let's briefly look at how it can be misused:
@Slf4j
public class ThreadLocalRandomMisuse {
    @Test
    public void test() throws InterruptedException {
        ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();
        IntStream.rangeClosed(1, 5)
                .mapToObj(i -> new Thread(() -> log.info("wrong:{}", threadLocalRandom.nextInt())))
                .forEach(Thread::start);
        IntStream.rangeClosed(1, 5)
                .mapToObj(i -> new Thread(() -> log.info("ok:{}", ThreadLocalRandom.current().nextInt())))
                .forEach(Thread::start);
        TimeUnit.SECONDS.sleep(1);
    }
}
In short, always call ThreadLocalRandom.current().nextInt() at the point of use instead of caching the instance returned by ThreadLocalRandom.current() and calling nextInt() on it from other threads. The per-thread random seed is only initialized when a thread calls current() itself, so threads that reuse an instance obtained elsewhere end up drawing from an uninitialized seed and produce identical values. Comparing the two batches of output, the five "wrong" threads all get the same random number:

Testing ConcurrentHashMap's parallel reduce operations
ConcurrentHashMap provides some higher-level methods for parallel reduction. Let's write a program that computes the average of all the values in a ConcurrentHashMap in two ways, by plain iteration and by reduceEntriesToLong(), and compare both the performance and how the code reads:
@Slf4j
public class ConcurrentHashMapReduceTest {
    int loopCount = 100;
    int itemCount = 10000000;

    @Test
    public void test() {
        ConcurrentHashMap<String, Long> concurrentHashMap = LongStream.rangeClosed(1, itemCount)
                .boxed()
                .collect(Collectors.toMap(i -> "item" + i, Function.identity(), (o1, o2) -> o1, ConcurrentHashMap::new));
        StopWatch stopWatch = new StopWatch();
        stopWatch.start("normal");
        normal(concurrentHashMap);
        stopWatch.stop();
        stopWatch.start("concurrent with parallelismThreshold=1");
        concurrent(concurrentHashMap, 1);
        stopWatch.stop();
        stopWatch.start("concurrent with parallelismThreshold=max long");
        concurrent(concurrentHashMap, Long.MAX_VALUE);
        stopWatch.stop();
        log.info(stopWatch.prettyPrint());
    }

    private void normal(ConcurrentHashMap<String, Long> map) {
        IntStream.rangeClosed(1, loopCount).forEach(__ -> {
            long sum = 0L;
            for (Map.Entry<String, Long> item : map.entrySet()) {
                sum += item.getValue();
            }
            double average = sum / map.size();
            Assert.assertEquals(itemCount / 2, average, 0);
        });
    }

    private void concurrent(ConcurrentHashMap<String, Long> map, long parallelismThreshold) {
        IntStream.rangeClosed(1, loopCount).forEach(__ -> {
            double average = map.reduceEntriesToLong(parallelismThreshold, Map.Entry::getValue, 0, Long::sum) / map.size();
            Assert.assertEquals(itemCount / 2, average, 0);
        });
    }
}
The results are as follows:

As you can see, parallel reduction is noticeably faster for a large map. One thing to note: the parallelismThreshold argument is not a degree of parallelism (it is not the parallelism of ForkJoinPool(int parallelism)); it is the element-count threshold above which the operation runs in parallel. Passing Long.MAX_VALUE disables parallelism, while passing 1 makes full use of the ForkJoinPool.
Of course, we only demonstrated reduceEntriesToLong() here; ConcurrentHashMap offers a dozen or so other reduceXXX() methods for parallel reduction over keys, values, and entries.
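For reference, here is a minimal sketch of a few of the other bulk methods; reduceValuesToLong(), reduceKeys() and searchValues() are the real ConcurrentHashMap API, while the class name and sample data are made up for the example:
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentHashMapBulkDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
        map.put("item1", 1L);
        map.put("item2", 2L);
        map.put("item3", 3L);

        // Reduce over values only: sum them (parallelismThreshold=1 allows full parallelism)
        long sum = map.reduceValuesToLong(1, Long::longValue, 0, Long::sum);

        // Reduce over keys only: join them into one string (order is not guaranteed)
        String keys = map.reduceKeys(1, (k1, k2) -> k1 + "," + k2);

        // searchValues() returns the first non-null result it finds and stops early
        Long firstAtLeastTwo = map.searchValues(1, v -> v >= 2 ? v : null);

        System.out.println(sum + " / " + keys + " / " + firstAtLeastTwo);
    }
}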
Misuse of ConcurrentHashMap
As mentioned in earlier articles, ConcurrentHashMap does not make a sequence of operations against the map atomic (apart from single methods such as the computeIfAbsent() and putIfAbsent() we saw above). In the example below we start with a ConcurrentHashMap holding 9,990 entries, and several threads each compute how far the map is from the 10,000-entry limit and then try to fill that gap:
@Test
public void test() throws InterruptedException {
    int limit = 10000;
    ConcurrentHashMap<String, Long> concurrentHashMap = LongStream.rangeClosed(1, limit - 10)
            .boxed()
            .collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(), Function.identity(),
                    (o1, o2) -> o1, ConcurrentHashMap::new));
    log.info("init size:{}", concurrentHashMap.size());
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    for (int __ = 0; __ < 10; __++) {
        executorService.execute(() -> {
            int gap = limit - concurrentHashMap.size();
            log.debug("gap:{}", gap);
            concurrentHashMap.putAll(LongStream.rangeClosed(1, gap)
                    .boxed()
                    .collect(Collectors.toMap(i -> UUID.randomUUID().toString(), Function.identity())));
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.HOURS);
    log.info("finish size:{}", concurrentHashMap.size());
}
This code is clearly broken:
- First, aggregate methods such as size() and containsValue() are only accurate in the absence of concurrent updates; otherwise they should be used only for statistics and monitoring, never to drive program logic
- Second, even if size() were accurate, other threads may add entries between computing the gap and filling it. Although putAll() itself is thread-safe, the compute-the-gap-then-fill-it sequence is not atomic; using ConcurrentHashMap does not mean you never need a lock
The output looks like this:

Some threads even computed a negative gap, and the final size is 10040, 40 more than the expected limit.
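One possible fix is sketched below, assuming it lives in the same test class as above (same imports and fields); the gapLock field and the fillGap name are invented for the sketch. The idea is simply to make the read-size-then-fill sequence one critical section; depending on requirements you could instead funnel all inserts through a single writer or track the count with an AtomicInteger:
// Sketch only: gapLock/fillGap are hypothetical names, not part of the original test
private final Object gapLock = new Object();

private void fillGap(ConcurrentHashMap<String, Long> concurrentHashMap, int limit) {
    synchronized (gapLock) {
        // size() is read and acted on while no other thread can insert, so the sequence is atomic
        int gap = limit - concurrentHashMap.size();
        if (gap <= 0) {
            return;
        }
        concurrentHashMap.putAll(LongStream.rangeClosed(1, gap)
                .boxed()
                .collect(Collectors.toMap(i -> UUID.randomUUID().toString(), Function.identity())));
    }
}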
One more point, which isn't really a misuse but is worth mentioning: ConcurrentHashMap does not allow null keys or values, whereas HashMap does. Why is that?
The screenshot below is the reply from the author of ConcurrentHashMap:

The gist is that if get(key) returns null, you cannot tell whether the key is absent or the key is present with a null value. In a non-concurrent map you can follow up with containsKey(key) to find out, but in a concurrent map you cannot: the map may have been modified between the two calls.
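A minimal sketch of that ambiguity with a plain HashMap (the keys and values are made up):
import java.util.HashMap;

public class NullAmbiguityDemo {
    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<>();
        map.put("a", null);

        // Both calls return null, for different reasons
        System.out.println(map.get("a")); // null: key present, value is null
        System.out.println(map.get("b")); // null: key absent

        // Single-threaded you can disambiguate with containsKey...
        System.out.println(map.containsKey("a")); // true
        System.out.println(map.containsKey("b")); // false
        // ...but under concurrent modification the map may change between get() and containsKey()
    }
}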
CopyOnWriteArrayList test
CopyOnWrite is meant for scenarios with almost no modifications but extremely high read concurrency. When a modification does happen, the whole underlying array is copied; that is expensive, but it lets the 99.9% of operations that are reads run without locks. Let's measure it, starting with writes, comparing CopyOnWriteArrayList, an ArrayList guarded by a manual lock, and an ArrayList wrapped with Collections.synchronizedList:
@Test
public void testWrite() {
    List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
    List<Integer> arrayList = new ArrayList<>();
    List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
    StopWatch stopWatch = new StopWatch();
    int loopCount = 100000;
    stopWatch.start("copyOnWriteArrayList");
    IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));
    stopWatch.stop();
    stopWatch.start("arrayList");
    IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> {
        synchronized (arrayList) {
            arrayList.add(ThreadLocalRandom.current().nextInt(loopCount));
        }
    });
    stopWatch.stop();
    stopWatch.start("synchronizedList");
    IntStream.range(0, loopCount).parallel().forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));
    stopWatch.stop();
    log.info(stopWatch.prettyPrint());
}
100,000 operations is not a lot; the results are as follows:

As you can see, every modification to a CopyOnWriteArrayList copies the entire underlying array, so writes are very expensive.
Now for reads: we first fill each list with 10 million elements using a helper method, then iterate 100 million times:
private void addAll(List<Integer> list) {
    list.addAll(IntStream.rangeClosed(1, 10000000).boxed().collect(Collectors.toList()));
}

@Test
public void testRead() {
    List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
    List<Integer> arrayList = new ArrayList<>();
    List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
    addAll(copyOnWriteArrayList);
    addAll(arrayList);
    addAll(synchronizedList);
    StopWatch stopWatch = new StopWatch();
    int loopCount = 100000000;
    int count = arrayList.size();
    stopWatch.start("copyOnWriteArrayList");
    IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count)));
    stopWatch.stop();
    stopWatch.start("arrayList");
    IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> {
        synchronized (arrayList) {
            arrayList.get(ThreadLocalRandom.current().nextInt(count));
        }
    });
    stopWatch.stop();
    stopWatch.start("synchronizedList");
    IntStream.range(0, loopCount).parallel().forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count)));
    stopWatch.stop();
    log.info(stopWatch.prettyPrint());
}
The results are as follows:

Sure enough, CopyOnWriteArrayList read performance is excellent: reads are lock-free and scale to whatever concurrency you throw at them.
Having covered most of the concurrent containers, let's now look at the five synchronizers.
CountDownLatch test
CountDownLatch has already appeared many times in previous articles and is by far the most frequently used of the five synchronizers. Typical use cases include:
- Waiting for N threads to finish
- As in many of the earlier performance-test examples, using two CountDownLatches: one so that all worker threads wait for the main thread's signal and start together, and one so that the main thread can wait for all workers to finish (see the sketch at the end of this section)
- Turning asynchronous calls into synchronous ones: many RPC frameworks built on asynchronous networking (such as Netty) use a CountDownLatch for this, for example the following snippet taken from the Remoting module of RocketMQ:

Here is the relevant part of ResponseFuture:
public class ResponseFuture {
    private final int opaque;
    private final Channel processChannel;
    private final long timeoutMillis;
    private final InvokeCallback invokeCallback;
    private final long beginTimestamp = System.currentTimeMillis();
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private final SemaphoreReleaseOnlyOnce once;
    private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
    private volatile RemotingCommand responseCommand;
    private volatile boolean sendRequestOK = true;
    private volatile Throwable cause;
    ...

    public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.responseCommand;
    }

    public void putResponse(final RemotingCommand responseCommand) {
        this.responseCommand = responseCommand;
        this.countDownLatch.countDown();
    }
    ...
}
After sending a network request we wait for the response; when the response arrives we store it and count down the latch, and the caller blocked waiting for the response can then pick up the data.
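For the two-latch pattern mentioned in the bullet list above, here is a minimal sketch; the worker count and all names are made up for the example:
import java.util.concurrent.CountDownLatch;
import java.util.stream.IntStream;

public class TwoLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int workers = 10;
        CountDownLatch start = new CountDownLatch(1);       // the main thread's "go" signal
        CountDownLatch done = new CountDownLatch(workers);  // counts finished workers

        IntStream.rangeClosed(1, workers).forEach(i -> new Thread(() -> {
            try {
                start.await();            // wait so all workers begin together
                // ... do the timed work here ...
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();         // report completion
            }
        }).start());

        long begin = System.currentTimeMillis();
        start.countDown();                // release all workers at once
        done.await();                     // wait for all of them to finish
        System.out.println("took " + (System.currentTimeMillis() - begin) + "ms");
    }
}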
Semaphore test
Semaphore can be used to cap concurrency. Suppose a game needs to limit how many players are online at the same time. We first define a Player class and use the Semaphore passed in to limit how many players can enter.
In the code we use the AtomicInteger, AtomicLong, and LongAdder we studied earlier to track the total number of players, the longest wait, and the total wait time.
@Slf4j
public class Player implements Runnable {
    private static AtomicInteger totalPlayer = new AtomicInteger();
    private static AtomicLong longestWait = new AtomicLong();
    private static LongAdder totalWait = new LongAdder();
    private String playerName;
    private Semaphore semaphore;
    private LocalDateTime enterTime;

    public Player(String playerName, Semaphore semaphore) {
        this.playerName = playerName;
        this.semaphore = semaphore;
    }

    public static void result() {
        log.info("totalPlayer:{},longestWait:{}ms,averageWait:{}ms", totalPlayer.get(), longestWait.get(), totalWait.doubleValue() / totalPlayer.get());
    }

    @Override
    public void run() {
        try {
            enterTime = LocalDateTime.now();
            semaphore.acquire();
            totalPlayer.incrementAndGet();
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
            long ms = Duration.between(enterTime, LocalDateTime.now()).toMillis();
            longestWait.accumulateAndGet(ms, Math::max);
            totalWait.add(ms);
            //log.debug("Player:{} finished, took:{}ms", playerName, ms);
        }
    }
}
The main test code:
@Test
public void test() throws InterruptedException {
    Semaphore semaphore = new Semaphore(10, false);
    ExecutorService threadPool = Executors.newFixedThreadPool(100);
    IntStream.rangeClosed(1, 10000).forEach(i -> threadPool.execute(new Player("Player" + i, semaphore)));
    threadPool.shutdown();
    threadPool.awaitTermination(1, TimeUnit.HOURS);
    Player.result();
}
We cap concurrent players at 10 with a non-fair semaphore, use a fixed pool of 100 threads, and put 10,000 players through the game. The output at the end is:

Now let's try fair mode:

You can clearly see that with fair mode enabled, the unluckiest player no longer waits nearly as long, while the average wait is slightly longer than before, which is what we would expect.
CyclicBarrier test
CyclicBarrier lets a group of threads wait for one another: once all the threads (parties) reach the rendezvous point they are released together and then start waiting for the next round, over and over. When everyone has arrived, the last thread to arrive can run a "post-processing" step; that action can be supplied when the CyclicBarrier is constructed, or implemented by checking the return value of await().
In this example we model a simple scenario: a show can only start once all 5 actors have arrived, and the show is performed 3 times. We use a CyclicBarrier to wait until all the actors are in place; each performance then takes 2 seconds.
@Slf4j
public class CyclicBarrierTest {
    @Test
    public void test() throws InterruptedException {
        int playerCount = 5;
        int playCount = 3;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(playerCount);
        List<Thread> threads = IntStream.rangeClosed(1, playerCount)
                .mapToObj(player -> new Thread(() -> IntStream.rangeClosed(1, playCount).forEach(play -> {
                    try {
                        TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100));
                        log.debug("Player {} arrived for play {}", player, play);
                        if (cyclicBarrier.await() == 0) {
                            log.info("Total players {} arrived, let's play {}", cyclicBarrier.getParties(), play);
                            TimeUnit.SECONDS.sleep(2);
                            log.info("Play {} finished", play);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                })))
                .collect(Collectors.toList());
        threads.forEach(Thread::start);
        for (Thread thread : threads) {
            thread.join();
        }
    }
}
The check if (cyclicBarrier.await() == 0) lets the last actor to arrive run the post-processing step once the barrier is tripped. Let's check that the show really was performed 3 times and that each run only started after all the actors had arrived:
10:35:43.333 [Thread-4] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 5 arrived for play 1
10:35:43.333 [Thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 2 arrived for play 1
10:35:43.333 [Thread-3] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 4 arrived for play 1
10:35:43.367 [Thread-2] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 3 arrived for play 1
10:35:43.376 [Thread-0] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 1 arrived for play 1
10:35:43.377 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Total players 5 arrived, let's play 1
10:35:43.378 [Thread-2] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 3 arrived for play 2
10:35:43.432 [Thread-3] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 4 arrived for play 2
10:35:43.434 [Thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 2 arrived for play 2
10:35:43.473 [Thread-4] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 5 arrived for play 2
10:35:45.382 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Play 1 finished
10:35:45.390 [Thread-0] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 1 arrived for play 2
10:35:45.390 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Total players 5 arrived, let's play 2
10:35:45.437 [Thread-3] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 4 arrived for play 3
10:35:45.443 [Thread-4] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 5 arrived for play 3
10:35:45.445 [Thread-2] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 3 arrived for play 3
10:35:45.467 [Thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 2 arrived for play 3
10:35:47.395 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Play 2 finished
10:35:47.472 [Thread-0] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 1 arrived for play 3
10:35:47.473 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Total players 5 arrived, let's play 3
10:35:49.477 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Play 3 finished
The output shows that each performance ran on the thread of Player 1, the last actor to arrive. Note that while he is performing, the other actors have already gone back to waiting (do not assume that CyclicBarrier blocks every thread until the post-processing finishes before the next round); only when he finishes performing and reaches await() again can a new show begin.
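As mentioned above, the post-processing action can instead be passed to the CyclicBarrier constructor as a Runnable; in that variant the action runs on the last thread to arrive, and the other parties are released only after it returns. A minimal sketch, with all names invented for the example:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.stream.IntStream;

public class BarrierActionDemo {
    public static void main(String[] args) {
        int players = 5;
        // The Runnable runs once per barrier trip, on the last thread to arrive
        CyclicBarrier barrier = new CyclicBarrier(players,
                () -> System.out.println("All " + players + " players arrived, starting the play"));

        IntStream.rangeClosed(1, players).forEach(i -> new Thread(() -> {
            try {
                System.out.println("Player " + i + " arrived");
                barrier.await(); // released only after the barrier action has finished
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }).start());
    }
}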
Phaser test
Phaser is similar to CyclicBarrier, except that it is more flexible: the number of parties can be adjusted dynamically rather than being fixed up front. A party can register itself with register() and then call arriveAndAwaitAdvance() to signal that it has arrived; once all the other parties have arrived as well, the barrier is tripped.
In the code below, we run all the supplied tasks for iterations rounds.
The Phaser terminates when the iteration count is exceeded or there are no parties left; returning true from onAdvance() signals termination.
We first register the main thread as a party, then register each task as a party and run it on a new thread; after each run the task arrives at the barrier, and it keeps looping as long as the phaser has not terminated.
The main thread loops the same way: in each phase it waits for the other threads to finish their tasks (to arrive at the barrier) before arriving itself, which starts the next round.
@Slf4j
public class PhaserTest {
    AtomicInteger atomicInteger = new AtomicInteger();

    @Test
    public void test() throws InterruptedException {
        int iterations = 10;
        int tasks = 100;
        runTasks(IntStream.rangeClosed(1, tasks)
                .mapToObj(index -> new Thread(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    atomicInteger.incrementAndGet();
                }))
                .collect(Collectors.toList()), iterations);
        Assert.assertEquals(tasks * iterations, atomicInteger.get());
    }

    private void runTasks(List<Runnable> tasks, int iterations) {
        Phaser phaser = new Phaser() {
            protected boolean onAdvance(int phase, int registeredParties) {
                return phase >= iterations - 1 || registeredParties == 0;
            }
        };
        phaser.register();
        for (Runnable task : tasks) {
            phaser.register();
            new Thread(() -> {
                do {
                    task.run();
                    phaser.arriveAndAwaitAdvance();
                } while (!phaser.isTerminated());
            }).start();
        }
        while (!phaser.isTerminated()) {
            doPostOperation(phaser);
            phaser.arriveAndAwaitAdvance();
        }
        doPostOperation(phaser);
    }

    private void doPostOperation(Phaser phaser) {
        // wait until all 100 task parties have arrived, leaving the main thread as the only unarrived party
        while (phaser.getArrivedParties() < 100) {
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        log.info("phase:{},registered:{},unarrived:{},arrived:{},result:{}",
                phaser.getPhase(),
                phaser.getRegisteredParties(),
                phaser.getUnarrivedParties(),
                phaser.getArrivedParties(), atomicInteger.get());
    }
}
10 iterations, 100 tasks per iteration; let's run it:

You can see that once the while loop in the main thread's post-processing exits, the main thread is the only party that has not yet arrived at the barrier. At that point it can do its post-processing work and then arrive, tripping the barrier.
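To illustrate the dynamic-parties point from the start of this section: a party can also leave with arriveAndDeregister(), so later phases wait for fewer threads. A minimal sketch, with all names invented for the example:
import java.util.concurrent.Phaser;
import java.util.stream.IntStream;

public class PhaserDeregisterDemo {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(1); // the main thread is the initial party

        IntStream.rangeClosed(1, 3).forEach(i -> {
            phaser.register(); // dynamically add one party per worker
            new Thread(() -> {
                System.out.println("worker " + i + " arrived for phase " + phaser.getPhase());
                phaser.arriveAndDeregister(); // arrive once, then leave the phaser for good
            }).start();
        });

        phaser.arriveAndAwaitAdvance(); // waits for the 3 workers, then the phase advances
        System.out.println("registered parties left: " + phaser.getRegisteredParties()); // 1 (main only)
    }
}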
Exchanger test
An Exchanger lets two threads swap data at a rendezvous point. Let's test it. In the code below, a producer thread keeps sending data, sleeping for a random amount of time after each send. Thanks to the Exchanger, the consumer thread receives each piece of data as soon as the producer hands it over, and we did not need a blocking queue to achieve this:
@Slf4j
public class ExchangerTest {
    @Test
    public void test() throws InterruptedException {
        Random random = new Random();
        Exchanger<Integer> exchanger = new Exchanger<>();
        int count = 10;
        Executors.newFixedThreadPool(1, new ThreadFactoryImpl("producer"))
                .execute(() -> {
                    try {
                        for (int i = 0; i < count; i++) {
                            log.info("sent:{}", i);
                            exchanger.exchange(i);
                            TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
        ExecutorService executorService = Executors.newFixedThreadPool(1, new ThreadFactoryImpl("consumer"));
        executorService.execute(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    int data = exchanger.exchange(null);
                    log.info("got:{}", data);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.HOURS);
    }
}
The output looks like this:

Summary
I won't add much of a summary for the concurrent containers. ConcurrentHashMap is extremely handy and extremely common, but its thread safety does not mean it is safe no matter how you use it; misuse is very common in business code.
To wrap up the synchronizers, here is a theater analogy:
- Semaphore limits how many audience members can watch the show at the same time; new people can only come in after someone leaves
- CountDownLatch means the show cannot start until every performer has arrived, and it ends when the performance is over
- CyclicBarrier means the show can only start once all the performers have arrived; the last to arrive is the director, who leads the performance, and when it is over everyone rests and then waits for the whole cast to assemble again
- Phaser means the cast for each show may change at any time, but every show still waits for all of the currently registered performers to arrive before it starts
As always, the code is on my GitHub; feel free to clone it, play with it, and give it a like.
You are welcome to follow my WeChat official account: 隨緣主人的園子

