ConcurrentHashMap擴容機制
ConcurrentHashMap,jdk1.8,采用多線程擴容。整個擴容過程,通過CAS設置sizeCtl、transferIndex等變量協調多個線程進行並發擴容。多線程無鎖擴容的關鍵就是通過CAS設置sizeCtl與transferIndex變量,協調多個線程對table數組中的node進行遷移。
如何實現線程安全
采用CAS和synchronized關鍵字來保證線程安全。
put方法邏輯
- 計算key的hash值
- 判斷Node[]數組是否初始化,沒有則進行初始化操作
- 通過hash定位數組的索引坐標,是否有Node節點,如果沒有則使用CAS進行添加(鏈表的頭節點),添加失敗則進入下次循環。
- 檢查到內部正在擴容,就幫助它一塊擴容。
- 如果f!=null,則使用synchronized鎖住f元素(鏈表/紅黑樹的頭元素)。如果是Node(鏈表結構)則執行鏈表的添加操作;如果是TreeNode(樹型結構)則執行樹添加操作。
- 判斷鏈表長度已經達到臨界值8(默認值),當節點超過這個值就需要把鏈表轉換為樹結構。
- 如果添加成功就調用addCount()方法統計size,並且檢查是否需要擴容
擴容過程分析
1、線程執行put操作,發現容量已經達到擴容閾值,需要進行擴容操作,此時transferindex=tab.length=32
2、擴容線程A 以CAS的方式修改transferindex=31-16=16 ,然后按照降序遷移table[31]至table[16]這個區間的hash桶
3、遷移hash桶時,會將桶內的鏈表或者紅黑樹,按照一定算法,拆分成2份,將其插入nextTable[i]和nextTable[i+n](n是table數組的長度)。 遷移完畢的hash桶,會被設置成ForwardingNode節點,以此告知訪問此桶的其他線程,此節點已經遷移完畢。
4、此時線程2訪問到了ForwardingNode節點,如果線程2執行的put或remove等寫操作,那么就會先幫其擴容。如果線程2執行的是get等讀方法,則會調用ForwardingNode的find方法,去nextTable里面查找相關元素。
sizeCtl屬性
private transient volatile int sizeCtl;
1
多線程之間,以volatile的方式讀取sizeCtl屬性,來判斷ConcurrentHashMap當前所處的狀態。通過CAS設置sizeCtl屬性,告知其他線程ConcurrentHashMap的狀態變更。
不同狀態,sizeCtl所代表的含義也有所不同。
未初始化:sizeCtl=0:表示沒有指定初始容量。sizeCtl>0:表示初始容量。
初始化中:sizeCtl=-1,標記作用,告知其他線程,正在初始化
正常狀態:sizeCtl=0.75n,擴容閾值
擴容中:sizeCtl < 0 : 表示有其他線程正在執行擴容
sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT)+2 表示此時只有一個線程在執行擴容
transferIndex 擴容索引
擴容索引,表示已經分配給擴容線程的table數組索引位置。主要用來協調多個線程,並發安全地獲取遷移任務(hash桶)。
private transient volatile int transferIndex;
private static final int MIN_TRANSFER_STRIDE = 16; //擴容線程每次最少要遷移16個hash桶
12
1、在擴容之前,transferIndex 在數組的最右邊 。此時有一個線程發現已經到達擴容閾值,准備開始擴容。
2、擴容線程,在遷移數據之前,首先要將transferIndex右移(以CAS的方式修改 transferIndex=transferIndex-stride(要遷移hash桶的個數)),獲取遷移任務。每個擴容線程都會通過for循環+CAS的方式設置transferIndex,因此可以確保多線程擴容的並發安全。
換個角度,我們可以將待遷移的table數組,看成一個任務隊列,transferIndex看成任務隊列的頭指針。而擴容線程,就是這個隊列的消費者。擴容線程通過CAS設置transferIndex索引的過程,就是消費者從任務隊列中獲取任務的過程。為了性能考慮,我們當然不會每次只獲取一個任務(hash桶),因此ConcurrentHashMap規定,每次至少要獲取16個遷移任務(遷移16個hash桶,MIN_TRANSFER_STRIDE = 16)
無鎖的執行者-CAS
CAS的全稱是Compare And Swap 即比較交換,其算法核心思想如下
執行函數:CAS(V,E,N)
其包含3個參數:V表示要更新的變量;E表示預期值;N表示新值
如果V值等於E值,則將V的值設為N。若V值和E值不同,則說明已經有其他線程做了更新,則當前線程什么都不做。
通俗的理解就是CAS操作需要我們提供一個期望值,當期望值與當前線程的變量值相同時,說明還沒線程修改該值,當前線程可以進行修改,也就是執行CAS操作,但如果期望值與當前線程不符,則說明該值已被其他線程修改,此時不執行更新操作,但可以選擇重新讀取該變量再嘗試再次修改該變量,也可以放棄操作。
由於CAS操作屬於樂觀派,它總認為自己可以成功完成操作,當多個線程同時使用CAS操作一個變量時,只有一個會勝出,並成功更新,其余均會失敗,但失敗的線程並不會被掛起,僅是被告知失敗,並且允許再次嘗試,當然也允許失敗的線程放棄操作,這點從圖中也可以看出來。基於這樣的原理,CAS操作即使沒有鎖,同樣知道其他線程對共享資源操作影響,並執行相應的處理措施。
同時從這點也可以看出,由於無鎖操作中沒有鎖的存在,因此不可能出現死鎖的情況,也就是說無鎖操作天生免疫死鎖。
為什么要使用CAS+Synchronized取代Segment+ReentrantLock
假設你對CAS,Synchronized,ReentrantLock這些知識很了解,並且知道AQS,自旋鎖,偏向鎖,輕量級鎖,重量級鎖這些知識,也知道Synchronized和ReentrantLock在喚醒被掛起線程競爭的時候有什么區別。
Synchronized上鎖的對象,請記住,Synchronized是靠對象的對象頭和此對象對應的monitor來保證上鎖的,也就是對象頭里的重量級鎖標志指向了monitor,而monitor內部則保存了一個當前線程,也就是搶到了鎖的線程.
那么這里的 f 是什么呢?它是Node鏈表里的每一個node,也就是說,Synchronized是將每一個node對象作為了一個鎖,這樣做的好處是將鎖細化了,也就是說,除非兩個線程同時操作一個node,注意,是一個node而不是一個Node鏈表,那么才會爭搶同一把鎖.
如果使用ReentrantLock其實也可以將鎖細化成這樣的,只要讓Node類繼承ReentrantLock就行了,這樣的話調用f.lock()就能做到和Synchronized(f)同樣的效果,但為什么不這樣做呢?
試想一下,鎖已經被細化到這種程度了,那么出現並發爭搶的可能性還高嗎?哪怕出現爭搶了,只要線程可以在30到50次自旋里拿到鎖,那么Synchronized就不會升級為重量級鎖,而等待的線程也就不用被掛起,我們也就少了掛起和喚醒這個上下文切換的過程開銷.
但如果是ReentrantLock,它只有在線程沒有搶到鎖,然后新建Node節點后再嘗試一次而已,不會自旋,而是直接被掛起,這樣一來就很容易多出線程上下文開銷的代價.當然,你也可以使用tryLock(),但是這樣又出現了一個問題,你怎么知道tryLock的時間呢?在時間范圍里還好,假如超過了呢?
所以,在鎖被細化到如此程度上,使用Synchronized是最好的選擇了.這里再補充一句,Synchronized和ReentrantLock他們的開銷差距是在釋放鎖時喚醒線程的數量,Synchronized是喚醒鎖池里所有的線程+剛好來訪問的線程,而ReentrantLock則是當前線程后進來的第一個線程+剛好來訪問的線程.
如果是線程並發量不大的情況下,那么Synchronized因為自旋鎖,偏向鎖,輕量級鎖的原因,不用將等待線程掛起,偏向鎖甚至不用自旋,所以在這種情況下要比ReentrantLock高效.