深入理解HashMap+ConcurrentHashMap的擴容策略
前言
理解HashMap和ConcurrentHashMap的重點在於:
(1)理解HashMap的數據結構的設計和實現思路
(2)在(1)的基礎上,理解ConcurrentHashMap的並發安全的設計和實現思路
前面的文章已經介紹過Map結構的底層實現,這里我們重點放在其擴容方法,
這里分別對JDK7和JDK8版本的HashMap+ConcurrentHashMap來分析:
JDK7的HashMap擴容
這個版本的HashMap數據結構還是數組+鏈表的方式,擴容方法如下:
- ```
- void transfer(Entry[] newTable) {
- Entry[] src = table; //src引用了舊的Entry數組
- int newCapacity = newTable.length;
- for (int j = 0; j < src.length; j++) { //遍歷舊的Entry數組
- Entry<K, V> e = src[j]; //取得舊Entry數組的每個元素
- if (e != null) {
- src[j] = null;//釋放舊Entry數組的對象引用(for循環后,舊的Entry數組不再引用任何對象)
- do {
- Entry<K, V> next = e.next;
- int i = indexFor(e.hash, newCapacity); //!!重新計算每個元素在數組中的位置
- e.next = newTable[i]; //標記[1]
- newTable[i] = e; //將元素放在數組上
- e = next; //訪問下一個Entry鏈上的元素
- } while (e != null);
- }
- }
- }
- ```
上面的這段代碼不並不難理解,對於擴容操作,底層實現都需要新生成一個數組,然后拷貝舊數組里面的每一個Node鏈表到新數組里面,這個方法在單線程下執行是沒有任何問題的,但是在多線程下面卻有很大問題,主要的問題在於基於頭插法的數據遷移,會有幾率造成鏈表倒置,從而引發鏈表閉鏈,導致程序死循環,並吃滿CPU。據說已經有人給原來的SUN公司提過bug,但sun公司認為,這是開發者使用不當造成的,因為這個類本就不是線程安全的,你還偏在多線程下使用,這下好了吧,出了問題這能怪我咯?仔細想想,還有點道理。
JDK7的ConcurrentHashMap擴容
HashMap是線程不安全的,我們來看下線程安全的ConcurrentHashMap,在JDK7的時候,這種安全策略采用的是分段鎖的機制,ConcurrentHashMap維護了一個Segment數組,Segment這個類繼承了重入鎖ReentrantLock,並且該類里面維護了一個 HashEntry<K,V>[] table數組,在寫操作put,remove,擴容的時候,會對Segment加鎖,所以僅僅影響這個Segment,不同的Segment還是可以並發的,所以解決了線程的安全問題,同時又采用了分段鎖也提升了並發的效率。  下面看下其擴容的源碼:
- ```
- // 方法參數上的 node 是這次擴容后,需要添加到新的數組中的數據。
- private void rehash(HashEntry<K,V> node) {
- HashEntry<K,V>[] oldTable = table;
- int oldCapacity = oldTable.length;
- // 2 倍
- int newCapacity = oldCapacity << 1;
- threshold = (int)(newCapacity * loadFactor);
- // 創建新數組
- HashEntry<K,V>[] newTable =
- (HashEntry<K,V>[]) new HashEntry[newCapacity];
- // 新的掩碼,如從 16 擴容到 32,那么 sizeMask 為 31,對應二進制 ‘000...00011111’
- int sizeMask = newCapacity - 1;
- // 遍歷原數組,老套路,將原數組位置 i 處的鏈表拆分到 新數組位置 i 和 i+oldCap 兩個位置
- for (int i = 0; i < oldCapacity ; i++) {
- // e 是鏈表的第一個元素
- HashEntry<K,V> e = oldTable[i];
- if (e != null) {
- HashEntry<K,V> next = e.next;
- // 計算應該放置在新數組中的位置,
- // 假設原數組長度為 16,e 在 oldTable[3] 處,那么 idx 只可能是 3 或者是 3 + 16 = 19
- int idx = e.hash & sizeMask;
- if (next == null) // 該位置處只有一個元素,那比較好辦
- newTable[idx] = e;
- else { // Reuse consecutive sequence at same slot
- // e 是鏈表表頭
- HashEntry<K,V> lastRun = e;
- // idx 是當前鏈表的頭結點 e 的新位置
- int lastIdx = idx;
- // 下面這個 for 循環會找到一個 lastRun 節點,這個節點之后的所有元素是將要放到一起的
- for (HashEntry<K,V> last = next;
- last != null;
- last = last.next) {
- int k = last.hash & sizeMask;
- if (k != lastIdx) {
- lastIdx = k;
- lastRun = last;
- }
- }
- // 將 lastRun 及其之后的所有節點組成的這個鏈表放到 lastIdx 這個位置
- newTable[lastIdx] = lastRun;
- // 下面的操作是處理 lastRun 之前的節點,
- // 這些節點可能分配在另一個鏈表中,也可能分配到上面的那個鏈表中
- for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
- V v = p.value;
- int h = p.hash;
- int k = h & sizeMask;
- HashEntry<K,V> n = newTable[k];
- newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
- }
- }
- }
- }
- // 將新來的 node 放到新數組中剛剛的 兩個鏈表之一 的 頭部
- int nodeIndex = node.hash & sizeMask; // add the new node
- node.setNext(newTable[nodeIndex]);
- newTable[nodeIndex] = node;
- table = newTable;
- }
- ```
注意這里面的代碼,外部已經加鎖,所以這里面是安全的,我們看下具體的實現方式:先對數組的長度增加一倍,然后遍歷原來的舊的table數組,把每一個數組元素也就是Node鏈表遷移到新的數組里面,最后遷移完畢之后,把新數組的引用直接替換舊的。此外這里這有一個小的細節優化,在遷移鏈表時用了兩個for循環,第一個for的目的是為了,判斷是否有遷移位置一樣的元素並且位置還是相鄰,根據HashMap的設計策略,首先table的大小必須是2的n次方,我們知道擴容后的每個鏈表的元素的位置,要么不變,要么是原table索引位置+原table的容量大小,舉個例子假如現在有三個元素(3,5,7)要放入map里面,table的的容量是2,簡單的假設元素位置=元素的值 % 2,得到如下結構:
- ```
- [0]=null
- [1]=3->5->7
- ```
現在將table的大小擴容成4,分布如下:
- ```
- [0]=null
- [1]=5->7
- [2]=null
- [3]=3
- ```
因為擴容必須是2的n次方,所以HashMap在put和get元素的時候直接取key的hashCode然后經過再次均衡后直接采用&位運算就能達到取模效果,這個不再細說,上面這個例子的目的是為了說明擴容后的數據分布策略,要么保留在原位置,要么會被均衡在舊的table位置,這里是1加上舊的table容量這是是2,所以是3。基於這個特點,第一個for循環,作的優化如下,假設我們現在用0表示原位置,1表示遷移到index+oldCap的位置,來代表元素:
- ```
- [0]=null
- [1]=0->1->1->0->0->0->0
- ```
第一個for循環的會記錄lastRun,比如要遷移[1]的數據,經過這個循環之后,lastRun的位置會記錄第三個0的位置,因為后面的數據都是0,代表他們要遷移到新的數組中同一個位置中,所以就可以把這個中間節點,直接插入到新的數組位置而后面附帶的一串元素其實都不需要動。
接着第二個循環里面在此從第一個0的位置開始遍歷到lastRun也就是第三個元素的位置就可以了,只循環處理前面的數據即可,這個循環里面根據位置0和1做不同的鏈表追加,后面的數據已經被優化的遷移走了,但最壞情況下可能后面一個也沒優化,比如下面的結構:
- ```
- [0]=null
- [1]=1->1->0->0->0->0->1->0
- ```
這種情況,第一個for循環沒多大作用,需要通過第二個for循環從頭開始遍歷到尾部,按0和1分發遷移,這里面使用的是還是頭插法的方式遷移,新遷移的數據是追加在鏈表的頭部,但這里是線程安全的所以不會出現循環鏈表,導致死循環問題。遷移完成之后直接將最新的元素加入,最后將新的table替換舊的table即可。
JDK8的HashMap擴容
在JDK8里面,HashMap的底層數據結構已經變為數組+鏈表+紅黑樹的結構了,因為在hash沖突嚴重的情況下,鏈表的查詢效率是O(n),所以JDK8做了優化對於單個鏈表的個數大於8的鏈表,會直接轉為紅黑樹結構算是以空間換時間,這樣以來查詢的效率就變為O(logN),圖示如下:
我們看下其擴容代碼:
- ```
- final Node<K,V>[] resize() {
- Node<K,V>[] oldTab = table;
- int oldCap = (oldTab == null) ? 0 : oldTab.length;
- int oldThr = threshold;
- int newCap, newThr = 0;
- if (oldCap > 0) {
- if (oldCap >= MAXIMUM_CAPACITY) {
- threshold = Integer.MAX_VALUE;
- return oldTab;
- }
- else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
- oldCap >= DEFAULT_INITIAL_CAPACITY)
- newThr = oldThr << 1; // double threshold
- }
- else if (oldThr > 0) // initial capacity was placed in threshold
- newCap = oldThr;
- else { // zero initial threshold signifies using defaults
- newCap = DEFAULT_INITIAL_CAPACITY;
- newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
- }
- if (newThr == 0) {
- float ft = (float)newCap * loadFactor;
- newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
- (int)ft : Integer.MAX_VALUE);
- }
- threshold = newThr;
- @SuppressWarnings({"rawtypes","unchecked"})
- Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
- table = newTab;
- if (oldTab != null) {
- for (int j = 0; j < oldCap; ++j) {
- Node<K,V> e;
- if ((e = oldTab[j]) != null) {
- oldTab[j] = null;
- if (e.next == null)
- newTab[e.hash & (newCap - 1)] = e;
- else if (e instanceof TreeNode)
- ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
- else {
- //重點關注區域
- // preserve order
- Node<K,V> loHead = null, loTail = null;
- Node<K,V> hiHead = null, hiTail = null;
- Node<K,V> next;
- do {
- next = e.next;
- if ((e.hash & oldCap) == 0) {
- if (loTail == null)
- loHead = e;
- else
- loTail.next = e;
- loTail = e;
- }
- else {
- if (hiTail == null)
- hiHead = e;
- else
- hiTail.next = e;
- hiTail = e;
- }
- } while ((e = next) != null);
- if (loTail != null) {
- loTail.next = null;
- newTab[j] = loHead;
- }
- if (hiTail != null) {
- hiTail.next = null;
- newTab[j + oldCap] = hiHead;
- }
- }
- }
- }
- }
- return newTab;
- }
- ```
在JDK8中,單純的HashMap數據結構增加了紅黑樹是一個大的優化,此外根據上面的遷移擴容策略,我們發現JDK8里面HashMap沒有采用頭插法轉移鏈表數據,而是保留了元素的順序位置,新的代碼里面采用:
- ```
- //按原始鏈表順序,過濾出來擴容后位置不變的元素(低位=0),放在一起
- Node<K,V> loHead = null, loTail = null;
- //按原始鏈表順序,過濾出來擴容后位置改變到(index+oldCap)的元素(高位=0),放在一起
- Node<K,V> hiHead = null, hiTail = null;
- ```
把要遷移的元素分類之后,最后在分別放到新數組對應的位置上:
- ```
- //位置不變
- if (loTail != null) {
- loTail.next = null;
- newTab[j] = loHead;
- }
- //位置遷移(index+oldCap)
- if (hiTail != null) {
- hiTail.next = null;
- newTab[j + oldCap] = hiHead;
- }
- ```
JDK7里面是先判斷table的存儲元素的數量是否超過當前的threshold=table.length*loadFactor(默認0.75),如果超過就先擴容,在JDK8里面是先插入數據,插入之后在判斷下一次++size的大小是否會超過當前的閾值,如果超過就擴容。
JDK8的ConcurrentHashMap擴容
在JDK8中徹底拋棄了JDK7的分段鎖的機制,新的版本主要使用了Unsafe類的CAS自旋賦值+synchronized同步+LockSupport阻塞等手段實現的高效並發,代碼可讀性稍差。
ConcurrentHashMap的JDK8與JDK7版本的並發實現相比,最大的區別在於JDK8的鎖粒度更細,理想情況下talbe數組元素的大小就是其支持並發的最大個數,在JDK7里面最大並發個數就是Segment的個數,默認值是16,可以通過構造函數改變一經創建不可更改,這個值就是並發的粒度,每一個segment下面管理一個table數組,加鎖的時候其實鎖住的是整個segment,這樣設計的好處在於數組的擴容是不會影響其他的segment的,簡化了並發設計,不足之處在於並發的粒度稍粗,所以在JDK8里面,去掉了分段鎖,將鎖的級別控制在了更細粒度的table元素級別,也就是說只需要鎖住這個鏈表的head節點,並不會影響其他的table元素的讀寫,好處在於並發的粒度更細,影響更小,從而並發效率更好,但不足之處在於並發擴容的時候,由於操作的table都是同一個,不像JDK7中分段控制,所以這里需要等擴容完之后,所有的讀寫操作才能進行,所以擴容的效率就成為了整個並發的一個瓶頸點,好在Doug lea大神對擴容做了優化,本來在一個線程擴容的時候,如果影響了其他線程的數據,那么其他的線程的讀寫操作都應該阻塞,但Doug lea說你們閑着也是閑着,不如來一起參與擴容任務,這樣人多力量大,辦完事你們該干啥干啥,別浪費時間,於是在JDK8的源碼里面就引入了一個ForwardingNode類,在一個線程發起擴容的時候,就會改變sizeCtl這個值,其含義如下:
- ```
- sizeCtl :默認為0,用來控制table的初始化和擴容操作,具體應用在后續會體現出來。
- -1 代表table正在初始化
- -N 表示有N-1個線程正在進行擴容操作
- 其余情況:
- 1、如果table未初始化,表示table需要初始化的大小。
- 2、如果table初始化完成,表示table的容量,默認是table大小的0.75倍
- ```
擴容時候會判斷這個值,如果超過閾值就要擴容,首先根據運算得到需要遍歷的次數i,然后利用tabAt方法獲得i位置的元素f,初始化一個forwardNode實例fwd,如果f == null,則在table中的i位置放入fwd,否則采用頭插法的方式把當前舊table數組的指定任務范圍的數據給遷移到新的數組中,然后
給舊table原位置賦值fwd。直到遍歷過所有的節點以后就完成了復制工作,把table指向nextTable,並更新sizeCtl為新數組大小的0.75倍 ,擴容完成。在此期間如果其他線程的有讀寫操作都會判斷head節點是否為forwardNode節點,如果是就幫助擴容。
擴容源碼如下:
- ```
- private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
- int n = tab.length, stride;
- if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
- stride = MIN_TRANSFER_STRIDE; // subdivide range
- if (nextTab == null) { // initiating
- try {
- @SuppressWarnings("unchecked")
- Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
- nextTab = nt;
- } catch (Throwable ex) { // try to cope with OOME
- sizeCtl = Integer.MAX_VALUE;
- return;
- }
- nextTable = nextTab;
- transferIndex = n;
- }
- int nextn = nextTab.length;
- ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
- boolean advance = true;
- boolean finishing = false; // to ensure sweep before committing nextTab
- for (int i = 0, bound = 0;;) {
- Node<K,V> f; int fh;
- while (advance) {
- int nextIndex, nextBound;
- if (--i >= bound || finishing)
- advance = false;
- else if ((nextIndex = transferIndex) <= 0) {
- i = -1;
- advance = false;
- }
- else if (U.compareAndSwapInt
- (this, TRANSFERINDEX, nextIndex,
- nextBound = (nextIndex > stride ?
- nextIndex - stride : 0))) {
- bound = nextBound;
- i = nextIndex - 1;
- advance = false;
- }
- }
- if (i < 0 || i >= n || i + n >= nextn) {
- int sc;
- if (finishing) {
- nextTable = null;
- table = nextTab;
- sizeCtl = (n << 1) - (n >>> 1);
- return;
- }
- if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
- if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
- return;
- finishing = advance = true;
- i = n; // recheck before commit
- }
- }
- else if ((f = tabAt(tab, i)) == null)
- advance = casTabAt(tab, i, null, fwd);
- else if ((fh = f.hash) == MOVED)
- advance = true; // already processed
- else {
- synchronized (f) {
- if (tabAt(tab, i) == f) {
- Node<K,V> ln, hn;
- if (fh >= 0) {
- int runBit = fh & n;
- Node<K,V> lastRun = f;
- for (Node<K,V> p = f.next; p != null; p = p.next) {
- int b = p.hash & n;
- if (b != runBit) {
- runBit = b;
- lastRun = p;
- }
- }
- if (runBit == 0) {
- ln = lastRun;
- hn = null;
- }
- else {
- hn = lastRun;
- ln = null;
- }
- for (Node<K,V> p = f; p != lastRun; p = p.next) {
- int ph = p.hash; K pk = p.key; V pv = p.val;
- if ((ph & n) == 0)
- ln = new Node<K,V>(ph, pk, pv, ln);
- else
- hn = new Node<K,V>(ph, pk, pv, hn);
- }
- setTabAt(nextTab, i, ln);
- setTabAt(nextTab, i + n, hn);
- setTabAt(tab, i, fwd);
- advance = true;
- }
- else if (f instanceof TreeBin) {
- TreeBin<K,V> t = (TreeBin<K,V>)f;
- TreeNode<K,V> lo = null, loTail = null;
- TreeNode<K,V> hi = null, hiTail = null;
- int lc = 0, hc = 0;
- for (Node<K,V> e = t.first; e != null; e = e.next) {
- int h = e.hash;
- TreeNode<K,V> p = new TreeNode<K,V>
- (h, e.key, e.val, null, null);
- if ((h & n) == 0) {
- if ((p.prev = loTail) == null)
- lo = p;
- else
- loTail.next = p;
- loTail = p;
- ++lc;
- }
- else {
- if ((p.prev = hiTail) == null)
- hi = p;
- else
- hiTail.next = p;
- hiTail = p;
- ++hc;
- }
- }
- ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
- (hc != 0) ? new TreeBin<K,V>(lo) : t;
- hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
- (lc != 0) ? new TreeBin<K,V>(hi) : t;
- setTabAt(nextTab, i, ln);
- setTabAt(nextTab, i + n, hn);
- setTabAt(tab, i, fwd);
- advance = true;
- }
- }
- }
- }
- }
- }
- ```
在擴容時讀寫操作如何進行
(1)對於get讀操作,如果當前節點有數據,還沒遷移完成,此時不影響讀,能夠正常進行。
如果當前鏈表已經遷移完成,那么頭節點會被設置成fwd節點,此時get線程會幫助擴容。
(2)對於put/remove寫操作,如果當前鏈表已經遷移完成,那么頭節點會被設置成fwd節點,此時寫線程會幫助擴容,如果擴容沒有完成,當前鏈表的頭節點會被鎖住,所以寫線程會被阻塞,直到擴容完成。
對於size和迭代器是弱一致性
volatile修飾的數組引用是強可見的,但是其元素卻不一定,所以,這導致size的根據sumCount的方法並不准確。
同理Iteritor的迭代器也一樣,並不能准確反映最新的實際情況
總結
本文主要了介紹了HashMap+ConcurrentHashMap的擴容策略,擴容的原理是新生成大於原來1倍大小的數組,然后拷貝舊數組數據到新的數組里面,在多線程情況下,這里面如果注意線程安全問題,在解決安全問題的同時,我們也要關注其效率,這才是並發容器類的最出色的地方。