Java並發包常用類用法及原理


com.java.util.concurrent包是java5時添加的,專門處理多線程提供的工具類

一、Atomic

二、Lock

三、BlockingQueue

四、BlockDeque

五、ConcurrnetMap

六、CountDownLatch

七、CyclicBarrier

八、ExecutorService

九、CopyOnWriteList

十、ThreadLocal

1.atomic包

AtomicBoolean、AtomicInteger、AtomicLong、AtomicReference
類提供多種方法,可以原子性地為參數取值、賦值、交換值(getAndSet)、比較並且設置值(CAS:compareAndSet)(失敗就重試,直到沒有沖突為止)等。

為什么需要使用atomic?
先說幾個概念:

重排序
Java優化程序性能,在編譯、處理和內存中對代碼進行重排序,重排序是對代碼的執行順序做了修改。

happens-before
規定在編譯、處理、內存中對代碼執行重排序的規則。

as-if-serial語義
規定單線程中重排序和順序執行的結果一致。

至於為什么需要在多線程使用atomic,有以下幾點原因:

1.多個線程不能保證哪個線程先執行。因為不可見主內存值的問題,可能出現臟讀的情況。

2.多線程賦值過程非原子性。因為變量在多線程中,修改一個主內存中的值,需要執行多個步驟(讀取主內存,放入寄存器,修改值、賦值到主內存中),這么一來可能你在執行這個步驟的過程中,該變量被其他線程修改了,不能保證原子性。

3.使用volatile修飾變量。volatile是變量具有可見性(當寄存器中修改了值,會立即通知主內存和其他寄存器修改值)和有序性,查看AtomicInteger的源碼,發現變量被volatile修飾。而原子性是Atomic中使用CAS(修改前判斷主內存的值是否和當前的值一致)實現的,可見性就解決了上面問題1的臟讀,有序性和原子性就解決了問題2中的問題。


private volatile int value;
public AtomicInteger(int initialValue) {
value = initialValue;
}
2.locks

ReentrantLock

可重入的互斥鎖,即同一線程可以多次獲得該鎖,線程間是互斥的。使用CAS+CLH(雙向鏈表)實現
ReentrantReadWriteLock

可重入的讀寫鎖,是在ReentrantLock的基礎上的增強,更細粒度地控制。在特殊場景中會使用到,分為readLock和writeLock,讀讀共享,讀寫和寫寫排他。
StampLock

讀寫並發鎖,適用於讀遠遠大於寫的場景。
Synchronized加鎖實現原理:

Synchronized經過編譯,會在同步塊的前后分別形成monitorenter和monitorexit這個兩個字節碼指令。在執行monitorenter指令時,首先要嘗試獲取對象鎖。如果這個對象沒被鎖定,或者當前線程已經擁有了那個對象鎖,把鎖的計算器加1,相應的,在執行monitorexit指令時會將鎖計算器就減1,當計算器為0時,鎖就被釋放了。如果獲取對象鎖失敗,那當前線程就要阻塞,直到對象鎖被另一個線程釋放為止。

Synchronized和ReentrantLock的區別聯系:

相同點:
都是加鎖實現阻塞式的同步,一個線程獲取了鎖,其他線程就必須等待。

不同點:

使用上。Synchronized直接使用關鍵字Synchronized,ReentrantLock需要實例化,並且顯示地調用lock()加鎖和在finally方法塊中unlock()解鎖。

等待可中斷。ReentrantLock可以使用lockInterruptibly()方法中斷鎖或者設置超時中斷。

公平鎖。Synchronized是非公平鎖,ReentrantLock默認也是非公平鎖,可以指定為公平鎖。

使用Condition條件,實現線程之間的協作。

在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll(),傳統線程的通信方式,Condition都可以實現,這里注意,Condition是被綁定到Lock上的,要創建一個Lock的Condition必須用newCondition()方法。

3.BlockingQueue

提供了不同的插入移除檢查方法,可以支持不同的返回值。

拋異常 特定值 阻塞 超時
插入 add(o) offer(o) put(o) offer(o, timeout, timeunit)
移除 remove(o) poll(o) take(o) poll(timeout, timeunit)
檢查 get(o) peek(o) / /
阻塞隊列值提供一個隊列可以提供遵守FIFO放入和取出的操作,如果隊列滿了放入就會阻塞,相反隊列如果為空,取出就會阻塞。

BlockingQueue是一個接口類,具體有多種實現,主要介紹5種常用的:

ArrayBlockingQueue

數組阻塞隊列,故名思議是用數組實現的阻塞隊列,是有界的,只能在初始化確定隊列容量大小。內部只有一個reentrantLock,讀和寫使用同一個鎖,因此效率不高。

LinkedBlockingQueue

鏈表阻塞隊列,顧名思義是用鏈表實現的阻塞隊列,但它可以是有界的也可以無界的,內部有兩個reentrantLock,讀寫鎖是分離的。性能要比ArrayBlockingQueue要高。但創建和銷毀Node,高並發對GC有一定壓力。


//默認的構造器
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//指定容量的構造器
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
PriorityBlockingQueue

優先級阻塞隊列。基於最小二叉堆實現,線程安全的無界隊列。構造器中可以傳入初始值和比較器的規則。根據比較器規則對內部元素排序。

SynchronousQueue

同步隊列。內部只能存放一個元素。如果滿了就插入就阻塞,相反如果為空取出就阻塞。

DelayQueue

延遲隊列無界隊列。內部使用優先級阻塞隊列實現,只有元素過期才能取出來。並且按過期長短排序,隊頭的是過期最長的元素。使用ReentrantLock實現線程安全。

4.BlockDeque

提供了不同的插入移除檢查方法,可以支持不同的返回值。

拋異常 特定值 阻塞 超時
插入 addFirst(o) offerFirst(o) putFirst(o) offerFirst(o, timeout, timeunit)
移除 removeFirst(o) pollFirst(o) takeFirst(o) pollFirst(timeout, timeunit)
檢查 getFirst(o) peekFirst(o) / /
LinkedBlockingDeque
雙端鏈式阻塞隊列。默認是無界的,也可以指定容量。該阻塞隊列同時支持FIFO和FILO兩種操作方式,隊頭和隊尾都可以執行插入取出的操作。使用一把鎖+兩個條件維持隊列的同步,和ArrayBlockingQueue的原理一樣。

5.ConcurrentMap

支持並發操作的Map。

ConcurrentHashMap 是ConcurrentMap的具體實現。
1.發展。JDK1.7及之前都是使用Segment分段鎖來實現的,因為Segment數量會限制並發量,而且在尋址也會執行兩次hash,JDK1.8后取消Segment改為數組+鏈表+紅黑樹和CAS原子操作+synchronized實現。

2.初始化參數。

initialCapacity初始化Map的容量
loadFactor負載因子
concurrencyLevel是最好情況下可以達到的並發數(如果都訪問的不同的Segment上)。Segment的個數是大於等於的第一個2的n次方的數,即設置15。即Segment = concurrencyLevel = 24 = 16。默認情況下,initialCapacity等於16,loadFactor等於0.75,concurrencyLevel等於16.
3.關於鎖

1.7
Get沒有加鎖,因為Map中的key,value,nextHashEntry都是使用volatile修飾符修飾,多線程具有可見行。但是會進行兩次Hash()方法尋址,第一次確定Segment位置,第二次確定table數組中位置。
Put使用的分段鎖繼承來ReentrantLock實現可重入鎖。
1.8
Get方法同1.7相似都是沒有加鎖,一次hash尋址。
Put方法。使用CAS無鎖機制,僅在Hash沖突時候加了synchronized同步鎖。
4.擴容
數組容量增加一倍,並遷移鏈表中的數據

ConcurrentSkipListMap
使用跳表skipList實現,可以支持排序,對應非線程安全的TreeMap是使用紅黑樹實現的。ConcurrentSkipListMap適用於高並發的寫操作(千萬級),因為它鎖住的節點少,相對於紅黑樹平衡造成的鎖競爭,ConcurrentSkipListMap效率更高。
6.CountDownLatch

倒計時控制器(自己起的名字)。因為他類似於一個倒計時啟動的功能。
初始化指定倒計時的值CountDownLatch latch = new CountDownLatch(3)並使用latch.await()等待執行,當其他其他線程調用3次latch.countDown()就觸發主線程繼續。

7.CyclicBarrier

柵欄。允許定義N個線程到達柵欄才執行某個方法。

1
2
//創建一個柵欄,這里設置2個線程都執行barrier1.await()方法后可以執行barrier1Action方法
CyclicBarrier barrier1 = new CyclicBarrier(2, barrier1Action);
8.ExecutorService
線程池服務接口,有兩種具體的實現方式

ThreadPoolExecutor
序號 名稱 類型 含義
1 corePoolSize int 核心線程池大小
2 maximumPoolSize int 最大線程池大小
3 keepAliveTime long 線程最大空閑時間
4 unit TimeUnit 時間單位
5 workQueue BlockingQueue 線程等待隊列
6 threadFactory ThreadFactory 線程創建工廠
7 handler RejectedExecutionHandler 拒絕策略
實際上Executors類使用上述參數為他提供了多種預定義的實現。


簡單介紹幾種預定義實現:

1.FixedThreadPool:可以指定固定數量的核心線程,但是隊列使用LinkedBlockingQueue是無界的,可能導致內存溢出。

2.CachedThreadPool:不限制線程的個數,要設置線程生存的周期,超過這個時間沒有使用將自動回收線程。但是隊列使用的是SynchronousQueue入隊時必須出隊。因為這些特性,該線程池應該用於類似於Netty中的短連接,快速處理大量耗時短的任務。

3.newSingleThreadExecutor:只創建一個線程,但是隊列使用LinkedBlockingQueue無界隊列。

ScheduledThreadPoolExecutor

繼承了ThreadPoolExecutor,可以設置核心和最大線程的大小,使用DelayedWorkQueue延遲隊列。

ForkJoinPool

實現了Executor接口,支持將一個大任務分為若干個子任務交給子線程處理,然后合並為一個結果集。采用了分治和遞歸的思想。內部維護了多個隊列。
(挖坑以后用到了再詳細寫)

9.CopyOnWriteList

CopyOnWriteList是並發場景下的List容器,適用於讀遠大於寫的場景。相對於Vector的線程安全List,Vector所有方法上都有Synchronized同步鎖,會造成大量的鎖競爭。CopyOnWriteList使用讀寫分離的機制,它實現了無鎖並發讀,寫操作加鎖,發生在新的副本上,寫完成后將原容器指向副本。

 

10.ThreadLocal

ThreadLocal用於處理同一線程數據共享的操作類。目的減少參數傳遞,和不同線程之間的數據隔離。
原理:內部使用靜態的ThreadLocalMap對象存放元素,同一線程使用同一個ThreadLocalMap,key是ThreadLocal對象,value是存放的值。
ThreadLocalMap使用Entry數組實現,是一個弱引用對象,當線程被銷毀時候ThreadLocalMap也會被回收。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM