簡介:
本文主要介紹Java8中的並發容器ConcurrentHashMap的工作原理,和其它文章不同的是,本文重點分析了不同線程的各類並發操作如get,put,remove之間是如何同步的,以及這些操作和擴容操作之間同步可能出現的各種情況。由於源代碼的分析肯定會有所紕漏,希望大家積極指出錯誤。
歡迎探討,如有錯誤敬請指正
如需轉載,請注明出處 http://www.cnblogs.com/nullzx/
1.Java8中 ConcurrentHashMap的結構
圖片來源(http://www.importnew.com/28263.html)
我們將數組稱之為表,將數組中每個鏈表或紅黑樹稱之為桶,將數組中的每個結點稱之為槽,也就是說“槽”存儲了鏈表的頭結點或者紅黑樹的根結點。源代碼中用內部類Node表示鏈表中的每個結點。
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; //…… //省略其它代碼 //…… Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } }
每個結點都有一個hash屬性它表示了Node對象的哈希值,這個哈希值實際上是key.hashCode()經過spread函數進一步散列后的值(后面的內容有spread函數源代碼)。特別需要注意的是val和next屬性都是用volatile修飾的。
而有關紅黑樹的內容不在本文的討論范圍之內,有興趣的同學可以參考我的另外三篇有關紅黑樹的技術博客。
2. 表初始化長度與負載因子的含義
構造函數
public ConcurrentHashMap(int initialCapacity, float loadFactor)
2.1 表的長度
實際上表的長度必須為2的整數次冪。該類內部會用大於等於initialCapacity的最小2的整數次冪作為長度。假設你構造ConcurrentHashMap對象時傳遞的initialCapacity的值是21,那么實際上表的長度是32。一般教科書上設計哈希表時,會將表的長度設置為較大的質數,而這里將表的長度設置成2的整數次冪,我認為有以下兩點原因:
1)在教科書中我們是通過
(Node對象的hash屬性值)%表長度
來定位槽的位置。這樣做的前提是我們假設求余運算是很快就可以完成的,但實際上CPU可能需要很多條指令才能實現求余操作。如果槽的長度正好的2的整數次冪,那么我們就可以通過下面的方式計算槽的位置 ,這和上面的計算方式等價,但位與運算明顯要快於求余運算。
(Node對象的hash屬性值)&(表長度-1)
2)在多線程擴容的時,這樣的長度設置可以避免在擴容時對新表加鎖,從而加快ConcurrentHashMap的擴容速度。關於擴容的細節問題,后面會進行講述。
2.2 負載因子的含義
默認負載因子為0.75。我們假設表的長度為100(當然,實際上不可能是這個值,這里只是為了方便分析)。那么我們最多存儲75個結點就要擴容(注意並不是占用75個槽以后才會擴容)。所以負載因子是對查詢效率和存儲空間平衡關系的表示。
3.減少Key的沖突
static final int HASH_BITS = 0x7fffffff; static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
通過key確定槽的位置時,如果我們直接使用
key.hashCode() &(表長度-1)
那么我們實際上只使用了key.hashCode()的低若干位信息,高位不起作用。所以為了key更加的分散,減少沖突,在實際定位槽的位置時,我們會將key.hashCode()再進行spread一下,充分使用key.hashCode()的高16位信息。而spread后的哈希值會存儲在結點的hash屬性中,便於下一次直接使用。
如果通過上述方法,仍然還存在較多的key沖突,那么就會導致同一個槽中聚集了較多結點,Java8中就會將這個長的鏈表轉化為一顆以key表示大小的紅黑樹,以減少查詢時間。默認情況下鏈表長度大於8就會被轉化成紅黑樹。
4. 擴容操作
這里我們先不考慮並發問題,先說說基本的擴容操作,當put操作完成后,都要統計當前ConcurrentHashMap中結點的個數(顯然結點個數不是一個准確值,只能是一個估計值)。如果結點個數大於設定的閥值(表的長度*負載因子),就要進行擴容操作,以提高查詢效率。
前面我們說過表的長度是2的整數次冪,擴容時我們讓表的長度翻倍,所以擴容后的新表長度也必然是2的整數次冪。我們這里假設舊表的長度是8(實際上代碼中表的最小長度也是16,這樣假設是為了畫圖方便),圖中的數字表示結點的hash值。
從圖中我們可以看出,擴容后表的長度變成了16。我們現在要對比觀察擴容前后每個結點的位置,顯然可以得到一個有意思的結論:每個結點在擴容后要么留在了新表原來的位置上,要么去了新表 “原位置+8”的位置上,而8就是舊表的長度。比如擴容前3號槽有[3,11,19]結點,擴容后[3,19]結點依然留在了原3號位置,而節點[11]去了“原位置3 + 8 = 11”的位置。計算新表中槽的位置有很巧妙的方法,有興趣的同學可以參照transfer函數的源代碼。
擴容長度翻倍並,且擴容后長度仍然是2的整數次冪的特性在多線程擴容有很大的優勢。原表中不同桶上的結點,在新表上一定不會分配到相同位置的槽上。我們可以讓不同線程負責原表不同位置的桶中所有結點的遷移,這樣兩個線程的遷移操作是不會相互干擾的。
比如我們可以讓一個線程負責原表中3號桶中所有結點的遷移,另一個線程負責原表中4號桶所有結點的遷移。原表中3號位置上的結點只能遷移到新表3號位置或11號位置上,絕對不會映射到其它位置上。而4號位置上的結點只能遷移到新表4號位置或12號位置上,所以在遷移結點的過程中,兩個線程就不必在新表的對應槽上加鎖了。
5. 幾個重要方法的源代碼分析
5.1 get方法
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //說明這個桶遷移已完成 或者 槽中是紅黑樹的根 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
通過源代碼發現,整個get操作都沒有加鎖,也沒有用 CAS操作,那么get方法是怎么保證線程安全的呢?現在先不回答這個問題,不過我們應該注意get方法中頭結點hash值小於0的情況(即eh < 0)的情況,結合后面的擴容操作進行解釋。
5.2 put方法
public V put(K key, V value) { return putVal(key, value, false); }
put方法實際上調用了putVal方法
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); //每次循環都會重新計算槽的位置,因為可能剛好完成擴容操作 //擴容完成后會使用新表,槽的位置可能會發生改變 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //槽為空,先嘗試用CAS方式進行添加結點 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //當前線程先幫助遷移,遷移完成后在新表中進行put else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; //加鎖操作,防止其它線程對此桶同時進行put,remove,transfer操作 synchronized (f) { //頭結點發生改變,就說明當前鏈表(或紅黑樹)的頭節點已不是f了 //可能被前面的線程remove掉了或者遷移到新表上了 //如果被remove掉了,需要重新對鏈表新的頭節點加鎖 if (tabAt(tab, i) == f) { //ForwordingNode的hash值為-1 //鏈表結點的hash值 >= 0 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //如果遍歷到了最后一個結點, //那么就說明需要插入新的結點,就把新結點插入在鏈表尾部 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //紅黑樹的根結點的hash值為-2 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //內部判斷是否需要擴容 addCount(1L, binCount); return null; }
put方法做了以下幾點事情:
1)如果沒有初始化就先調用initTable()方法來進行初始化過程2)如果沒有hash沖突就嘗試CAS方式插入
3)如果還在進行擴容操作就先幫助其它線程進一起行擴容
4)如果存在hash沖突,就加鎖來保證put操作的線程安全。
有意思的是,ConcurrentHashMap中並沒有使用ReentrantLock,而是直接使用了synchronized關鍵字對槽加鎖。個人猜測,這樣做的原因是避免創建過多的鎖對象。如果桶的長度是1024(別問我為啥是這個值,我只是考慮到了它是2的整數次冪,如果你聯想到了其它不宜公開討論的內容,請告訴我地址),那么我們就需要在每個桶的位置上分配一把鎖,也就要1024把鎖,考慮到每次擴容后都還要重新創建所有的鎖對象,這顯然是不划算的。
添加結點操作完成后會調用addCount方法,在addCount方法中會去判斷是否需要擴容操作。如果容量超過閥值了,就由這個線程發起擴容操作。如果已經處於擴容狀態(sizeCtl < -1),根據剩余遷移的數據和已參加到擴容中的線程數來判斷是否需要當前線程來幫助擴容。
5.3 remove方法
public V remove(Object key) { return replaceNode(key, null, null); }
實際上調用了replaceNode方法
final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 || /*每次循環都會重新計算槽的位置,因為在擴容完成后會使用新表 槽的位置可能會發生改變*/ (f = tabAt(tab, i = (n - 1) & hash)) == null) break; //如果有線程正在擴容,先幫助它一起擴容,然后在新表中進行put操作 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; boolean validated = false; //加鎖操作,防止其它線程對此桶同時進行put,remove,transfer操作 synchronized (f) { //頭結點發生改變,就說明當前鏈表(或紅黑樹)的頭節點已不是f了 //可能被前面的線程remove掉了或者遷移到新表上了 //如果被remove掉了,需要重新對鏈表新的頭節點加鎖 if (tabAt(tab, i) == f) { if (fh >= 0) { validated = true; for (Node<K,V> e = f, pred = null;;) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; if (value != null) e.val = value; else if (pred != null) pred.next = e.next; else setTabAt(tab, i, e.next); } break; } pred = e; if ((e = e.next) == null) break; } } else if (f instanceof TreeBin) { validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } if (validated) { if (oldVal != null) { if (value == null) addCount(-1L, -1); return oldVal; } break; } } } return null; }
5.4 ForwardingNode類
static final class ForwardingNode<K,V> extends Node<K,V> { //新表的引用 final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } //進行get操作的線程若發現槽中的節點為ForwordingNode類型 //說明該桶中所有結點已遷移完成,會調用ForwordingNode的find方法在新表中進行查找 Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes //從新表中查詢 outer: for (Node<K,V>[] tab = nextTable;;) { //n表示新表的長度 Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || //重新在新表中定位 (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; //繼續遞歸查詢?這里沒看懂 if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else return e.find(h, k); } //下一個 if ((e = e.next) == null) return null; } } } }
ForwardingNode類繼承了Node類,所以ForwardingNode對象也是Node類型對象,所以它也可以放到表中。
ForwardingNode在擴容中使用。每一個ForwardingNode對象都包含擴容后的表的引用(新表保存在nextTable屬性中)。 ForwardingNode對象的key,value,next屬性值全部為null,它的hash值為-1(注意小於0哦,可以去看看get方法中對應的部分了)。
ForwardingNode對象中也定義了find的方法,它是從擴容后的新表中查詢結點,而不是以自身為頭結點進行查找。
5.5 擴容方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //頭結點加鎖,防止其它線程此時對該桶進行put和remove操作 synchronized (f) { //和put及remove操作判斷頭結點是否改變的原理類似 if (tabAt(tab, i) == f) { // fh >= 0 表示鏈表 Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } //按新表中槽的位置分為兩部分 //注意新表中的節點都是新建的,而不是修改原的結點的next指針 //這樣做是為了同其它線程的get方法並發時能get正確的結果 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //將頭結點設置為fwd setTabAt(tab, i, fwd); advance = true; } //表示紅黑樹 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //將頭結點設置為fwd setTabAt(tab, i, fwd); advance = true; } } } } } }
整個擴容操作分為兩個部分
1)構建一個nextTable,它的容量是原來的兩倍,這個操作是單線程完成的。
2)是將原來table中的結點遷移到nextTable中,這里允許多線程進行操作。
在每個位置擴容時,會對頭結點加鎖,避免其它線程在該位置進行put及remove操作,這個位置擴容結束時會將頭結點設置成ForwardingNode,然后釋放鎖。ForwardingNode結點中包含新表的引用,ForwardingNode結點的hash屬性的值為-1,next屬性的值為null。原表中引用為null的槽同樣被設置成ForwardingNode結點。
多線程遷移的過程不是一個線程處理一個槽,而是一個線程處理多個連續的多個槽。在ConcurrentHashMap類中還定義下面屬性值,開始擴容時這個值表示了舊表的長度,也就是說搬運工作是從舊表的末尾開始的。
private transient volatile int transferIndex;
transfer函數中定義一個局部變量stride,它表示了每個線程的一次遷移處理的桶的個數,當一個線程處理完成后 transferIndex就自減一個stride,那么下一個線程就應該從transerIndex – stride處開始,往前處理stride個桶,以此類推完成協作。為什么要設計成從舊表的后部開始往頭部的方向搬運呢?個人猜想是搬運結束的時候條件是統一的,只是寫代碼的技巧吧。當然怎么確定整個舊表上的內容全部都遷移了,還需要讀更多的源代碼,這里就不作分析了。
6.並發問題的分析
上圖表示了擴容操作過程中舊表和新表之間的一種可能的狀態,在圖中fw表示ForwordingNode類型結點,數字表示Node類型結點(上圖中的擴容過程和前面論述過的“4.擴容操作”章節中的的擴容過程不是同一個過程,對應的數據會有所差異)。現在我們就通過以下幾種情況解釋上圖所表達的意思。
首先,多個線程在同一個位置上的get操作時顯然不需要同步,所以這種情況不需要討論,我們來討論剩下幾種情況。
6.1初化的同步問題
表的創建並不是在構造函數中進行的,而是在put方法中進行的,也就是說這實際上是個懶漢模式。但是如果多個線程同時創建表,顯然是非線程安全的。所以只能有一個線程來進行創建表,其它線程會等待創建完成后完成其它操作。ConcurrentHashMap類中設定一個volatile變量sizeCtl
private transient volatile int sizeCtl;
然后通過CAS方法去修改它,如果有其它線程發現sieCtl為-1
U.compareAndSwapInt(this, SIZECTL, sc, -1)
就表示已經有線程正在創建表了,那么當前線程就會放棄CPU使用權(調用Thread.yield()方法),等待分初始化完成后繼續進行put操作。否則當前線程嘗試將siezeCtl修改為-1,若成功,就由當前線程來創建表。
6.2 put方法和remove方法之間的同步問題
在表的同一個桶上,一個線程調用put方法和另一個線程調用put方法是互斥的;在表的同一個桶上,一個線程調用remove方法和另一個線程調用remove方法也是互斥的;在表的同一個桶上,一個線程調用remove方法和另一個線程調用put方法也是互斥的。這些互斥操作在代碼中都是通過鎖來保證的,每個線程執行這些操作時都會先鎖住槽。
6.3 put(或remove)方法和get方法的同步問題
實際上這兩類操作是不需要同步,先到先得。這主要由於Node定義中value和next都定義成了volatile類型。一個線程能否get到另一個線程剛剛put(或remove)的值,這主要由兩個線程當前訪問的結點所處的位置決定的。
6.4 get方法和擴容操作的同步問題
可以分成兩種情況討論
1)該位置的頭結點是Node類型對象,直接get,即使這個桶正在進行遷移,在get方法未完成前,遷移操作已完成,即槽被設置成了ForwordingNode對象,也沒關系,並不影響get的結果。因為get線程仍然持有舊鏈表的引用,可以從當前結點位置訪問到所有的后續結點。這是因為新表中的節點是通過復制舊表中的結點得到的,所以新表的結點的next值不會影響舊表中對應結點的next值。當get方法結束后,舊鏈表就出於不可達的狀態,會被垃圾回收線程回收。
2)該位置的頭結點是ForwordingNode類型對象(頭結點的hash值 == -1),頭結點是ForwordingNode類型的對象,調用該對象的find方法,在新表中查找。
所以無論哪種情況,都能get到正確的值。
6.5 put(或remove)方法和擴容操作的同步問題
同樣可以分為兩種情況討論:
1)該位置的頭結點是Node類型對象,那就看誰先獲取鎖,如果put操作先獲取鎖,則先將Node對象放入到舊表中,然后調用addCount方法,判斷是否需要幫助擴容。
2)該位置的頭結點是ForwordingNode類型對象,那就會先幫助擴容,然后在新表中進行put操作。
7.參考內容
[1] Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析