今天開始我們聊聊 Java 並發工具包中提供的一些工具類,本文主要從並發同步容器和並發集合工具角度入手,簡單介紹下相關 API 的用法與部分實現原理,旨在幫助大家更好的使用和理解 JUC 工具類。
在開始今天的內容之前,我們還需要簡單回顧下線程、 syncronized 的相關知識。
Java 線程的運行周期中的幾種狀態, 在 java.lang.Thread 包中有詳細定義和說明:
-
NEW 狀態是指線程剛創建, 尚未啟動
-
RUNNABLE 狀態是線程正在正常運行中
-
BLOCKED 阻塞狀態
-
WAITING 等待另一個線程來執行某一特定操作的線程處於這種狀態。這里要區分 BLOCKED 和 WATING 的區別, BLOCKED 是在臨界點外面等待進入, WATING 是在臨界點里面 wait 等待其他線程喚醒(notify)
-
TIMEDWAITING 這個狀態就是有限的(時間限制)的 WAITING
-
TERMINATED 這個狀態下表示 該線程的 run 方法已經執行完畢了, 基本上就等於死亡了(當時如果線程被持久持有, 可能不會被回收)
synchronized 實現同步的基礎:Java 中的每一個對象都可以作為鎖。
具體表現為以下 3 種形式:
-
對於普通同步方法,鎖是當前實例對象。
-
對於靜態同步方法,鎖是當前類的 Class 對象。
-
對於同步方法塊,鎖是 synchronized 括號里配置的對象。當一個線程試圖訪問同步代碼塊時,它首先必須得到鎖,退出或拋出異常時必須釋放鎖。
那么同步方法(syncronized ) 與 靜態同步方法(static syncronized ) 的有什么區別呢? 我們來看一個簡單的例子:
class Phone {
public /*static*/ synchronized void sendEmail() throws InterruptedException {
TimeUnit.SECONDS.sleep(4);
System.out.println("--------sendEmail");
}
public /*static*/ synchronized void getMessage() {
System.out.println("--------getMessage");
}
public void getHello() {
System.out.println("--------getHello");
}
main{
Phone p = new Phone();
p.sendEmail();
p.getMessage();
p.getHello();
}
}
通過以上代碼回答下面問題:
-
標准訪問的時候,請問先打印郵件還是短信?
-
sendEmail方法暫停4秒鍾,請問先打印郵件還是短信?
-
新增Hello普遍方法,請問先打印郵件還是Hello?
-
兩部手機,請問先打印郵件還是短信?
-
兩個靜態同步方法,同1部手機 ,請問先打印郵件還是短信?
-
兩個靜態同步方法,有2部手機 ,請問先打印郵件還是短信?
-
1個靜態同步方法,1個普通同步方法,有1部手機 ,請問先打印郵件還是短信?
-
1個靜態同步方法,1個普通同步方法,有2部手機 ,請問先打印郵件還是短信?
思考一下,我們再做分析~
-
一個對象里面如果有多個 synchronized 方法,某一個時刻內,只要一個線程去調用 其中的一個 synchronized 方法了,其它的線程都只能等待,換句話說,某一個時刻內,只能有唯一一個線程去訪問這些 synchronized 方法;
-
所有的非靜態同步方法用的都是同一把鎖——實例對象本身,synchronized 方法鎖的是當前對象 this,被鎖定后,其它的線程都不能進入到當前對象的其它 synchronized 方法,也就是說如果一個實例對象的非靜態同步方法獲取鎖后,該實例對象的其他非靜態同步方法必須等待獲取鎖的方法釋放鎖后才能獲取鎖;
-
加個普通方法后發現和同步鎖無關;
-
換成兩個對象后,不是同一把鎖了,毋須等待互不影響。
因為別的實例對象的非靜態同步方法跟該實例對象的非靜態同步方法用的是不同的鎖,所以毋須等待;
-
所有的靜態同步方法用的是同一把鎖——類對象本身(鎖的是類模板),一旦一個靜態同步方法獲取鎖后,其他的靜態同步方法都必須等待該方法釋放鎖后才能獲取鎖,而不管是同一個實例對象的靜態同步方法之間,還是不同的實例對象的靜態同步方法之間,只要它們是同一個類模板的實例對象就要爭取同一把鎖;
-
第1和第5中的這兩把鎖是兩個不同的對象,所以靜態同步方法與非靜態同步方法之間是不會有競態條件的。
經過分析,答案也就不然而喻了。
簡單回顧之后,回到正文,JUC 中提供了比 synchronized 更加高級的同步結構,包括 CountDownLatch,CyclicBarrier,Semaphone 等可以實現更加豐富的多線程操作。
另外還提供了各種線程安全的容器 ConcurrentHashMap、有序的 ConcurrentSkipListMap,CopyOnWriteArrayList 等。
-
CountDownLatch (計數器)
讓一些線程阻塞直到另一些線程完成一系列操作后才被喚醒。
CountDownLatch 主要有 countDown、await 兩個方法,當一個或多個線程調用 await 方法時,這些線程會阻塞。其它線程調用 countDown 方法會將計數器減 1 (調用 countDown 方法的線程不會阻塞),當計數器的值變為 0 時,因 await 方法阻塞的線程會被喚醒,繼續執行。
代碼案例:圖書館下班 ,等讀者全部離開后,圖書管理員才能關閉圖書館。
main 主線程必須要等前面線程完成全部工作后,自己才能執行。
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(5);//參數代表讀者的數量
for (int i = 1; i <= 5 ; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"\t 號讀者離開了圖書館");
countDownLatch.countDown();
} ,CountryEnum.getKey(i).getName()).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"\t ------圖書管理員閉館");
}
}
結果如下:
3 號讀者離開了圖書館
2 號讀者離開了圖書館
4 號讀者離開了圖書館
1 號讀者離開了圖書館
5 號讀者離開了圖書館
main ------圖書管理員閉館
-
CyclicBarrier (循環屏障)
CyclicBarrier 的字面意思是可循環(Cyclic)使用的屏障(Barrier)。
它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續干活。
線程進入屏障通過 CyclicBarrier 的 await() 方法。
代碼案例:集齊10張卡牌才可以召開獎
public class CyclicBarrierDemo {
private static final int NUMBER = 10;
public static void main(String[] args){
//構造方法 CyclicBarrier(int parties,Runnable action)
CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Thread(() -> {
System.out.println("集齊卡牌 開始開獎");
}));
for (int i = 1; i <= NUMBER ; i++) {
final int tempInt = i;
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+"\t 收集了"+tempInt+"號卡牌");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
} ,String.valueOf(i)).start();
}
}
}
結果如下:
1 收集了1號卡牌
8 收集了8號卡牌
4 收集了4號卡牌
3 收集了3號卡牌
5 收集了5號卡牌
7 收集了7號卡牌
9 收集了9號卡牌
6 收集了6號卡牌
2 收集了2號卡牌
10 收集了10號卡牌
集齊卡牌 開始開獎
-
Semaphone (信號量)
信號量典型應用場景是多個線程搶多個資源。
在信號量上我們定義兩種操作:
acquire(獲取): 當一個線程調用 acquire 操作時,它要么通過成功獲取信號量(信號量減 1 ),要么一直等下去,直到有線程釋放信號量,或超時。
release(釋放):實際上會將信號量的值加 1,然后喚醒等待的線程。
信號量主要用於兩個目的,一個是用於多個共享資源的互斥使用,另一個用於並發線程數的控制。
代碼案例:停車場停車 ,車搶車位
public class SemaphoreDemo {
public static void main(String[] args){
Semaphore semaphore = new Semaphore(3);// 模擬 3 個停車位
for (int i = 1; i <= 6 ; i++) {//6 輛車
new Thread(() -> {
try{
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"\t 搶到停車位");
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println(Thread.currentThread().getName()+"\t 離開停車位");
}catch(Exception e){
e.printStackTrace();
}finally{
semaphore.release();
}
} ,String.valueOf(i)).start();
}
}
}
結果如下:
2 搶到停車位
4 搶到停車位
1 搶到停車位
2 離開停車位
6 搶到停車位
6 離開停車位
5 搶到停車位
4 離開停車位
1 離開停車位
3 搶到停車位
3 離開停車位
5 離開停車位
接下來,我們來梳理下並發包里提供的線程安全的集合類,基本代碼如下:
public class NotSafeDemo {
public static void main(String[] args){
//高並發 list
List<Object> list = new CopyOnWriteArrayList<>();
/高並發 set
Set<Object> objects = new CopyOnWriteArraySet<>();
/高並發 map
Map<String,String> map = new ConcurrentHashMap<String,String>();
for (int i = 0; i < 50 ; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,6));
System.out.println(list);
} ,String.valueOf(i)).start();
}
}
}
CopyOnWrite 容器也被稱為寫時復制的容器。
往一個容器添加元素的時候,不直接往當前容器 Object[] 添加,而是先將當前容器 Object[] 進行 Copy,復制出一個新的容器 Object[] newElements,然后新的容器 Object[] newElements 里添加元素,添加完元素之后,再將原容器的引用指向新的容器 setArray(newElements)。
這樣做的好處是可以對 CopyOnWrite 容器進行並發的讀,而不需要加鎖,因為當前容器不會添加任何元素。所以 CopyOnWrite 容器也是一種讀寫分離的思想,讀和寫不同的容器,但是由於通過對底層數組復制來實現的,一般需要很大的開銷。當遍歷次數大大超過修改次數的時,這種方法比其他替代方法更有效。部分源碼如下:
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);//先復制,再添加一個空元素
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
而帶有 Concurrent 的一般才是真正的適用並發的工具,ConcurrentHashMap 被認為是弱一致性的,本質原因在於 ConcurrentHashMap 在讀數據是並沒有加鎖。
關於並發集合的應用還要在實際開發中多多體會,實踐才是最好的老師。
擴展知識:
今天的擴展知識簡單介紹下 Java 常用的 4 種線程池:
-
newCachedThreadPool
創建可緩存的線程,底層是依靠 SynchronousQueue 實現的,創建線程數量幾乎沒有限制(最大為 Integer.MAX_VALUE)。
如果長時間沒有往線程池提交任務,即如果工作線程空閑了指定時間(默認1分鍾),該工作線程自動終止。終止后如果又有了新的任務,則會創建新的線程。
在使用 CachedTreadPool 時,要注意控制任務數量,否則由於大量線程同時運行,很有可能造成系統癱瘓。
-
newFixedThreadPool
創建指定數量的工作線程,底層是依靠 LinkedBlockingQueue 實現的,沒提交一個任務就創建一個工作線程,當工作線程數量達到線程池初始的最大數,則將提交的任務存入到池隊列中。
在線程空閑時,不會釋放工作線程,還會占用一定的系統資源。
-
newSingleThreadExecutor
創建單線程,底層是 LinkedBlockingQueue 實現的,它只會用一個工作線程來執行任務,保證所有的任務按指定順序執行。如果這個線程異常結束,會有另一個取代它,保證順序執行。
最大的特點是可保證順序地執行各個任務,並在任意時間是不會有過個線程活動的。
-
newScheduleThreadPool
創建一個定長的線程池,支持定時以及周期性的任務調度。
參考資料:
https://github.com/fanpengyi/java-util-concurrent.git ---- 文中代碼git庫
關注一下,我寫的就更來勁兒啦