1.1 java.util.ConcurrentHashMap繼承結構
ConcurrentHashMap和HashMap的實現有很大的相似性,建議先看HashMap源碼,再來理解ConcurrentHashMap。
1.2 java.util.ConcurrentHashMap屬性
這里僅展示幾個關鍵的屬性
1 // ConcurrentHashMap核心數組 2 transient volatile Node<K,V>[] table; 3 4 // 擴容時才會用的一個臨時數組 5 private transient volatile Node<K,V>[] nextTable; 6 7 /** 8 * table初始化和resize控制字段 9 * 負數表示table正在初始化或resize。-1表示正在初始化,-N表示有N-1個線程正在resize操作 10 * 當table為null的時候,保存初始化表的大小以用於創建時使用,或者直接采用默認值0 11 * table初始化之后,保存下一次擴容的的大小,跟HashMap的threshold = loadFactor*capacity作用相同 12 */ 13 private transient volatile int sizeCtl; 14 15 // resize的時候下一個需要處理的元素下標為index=transferIndex-1 16 private transient volatile int transferIndex; 17 18 // 通過CAS無鎖更新,ConcurrentHashMap元素總數,但不是准確值 19 // 因為多個線程同時更新會導致部分線程更新失敗,失敗時會將元素數目變化存儲在counterCells中 20 private transient volatile long baseCount; 21 22 // resize或者創建CounterCells時的一個標志位 23 private transient volatile int cellsBusy; 24 25 // 用於存儲元素變動 26 private transient volatile CounterCell[] counterCells;
1.3 java.util.ConcurrentHashMap方法
1.3.1 Unsafe.compareAndSwapXXX方法
Unsafe.compareAndSwapXXX方法是sun.misc.Unsafe類中的方法,因為在ConcurrentHashMap中大量使用了這些方法。其聲明如下:
public final native boolean compareAndSwapXXX(type1 object, type2 offset, type4 expect, type5 update);
object為待修改的對象,offset為偏移量(數組可以理解為下標),expect為期望值,update為更新值。這個方法執行的邏輯偽代碼如下:
1 if (object[offset].value equal expect) { 2 object[offset].value = update; 3 return true; 4 } else { 5 return false 6 }
object[offset].value 等於expect更新value值並返回true,否則不更新並且返回false。之所以不更新是因為多線程執行時有其它線程已經修改該值,expect已經不是最新的值,如果強行修改必然會覆蓋之前的修改,造成臟數據。
CAS方法都是native方法,可以保證原子性,並且效率比synchronized高。
1.3.2 hash方法
1 static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash 2 int hash = spread(key.hashCode()); 3 4 static final int spread(int h) { 5 return (h ^ (h >>> 16)) & HASH_BITS; 6 }
上面源碼為計算hash算法,h ^ (h >>> 16)在計算hash的時候key.hashCode()的高位也參與運算,這部分跟HashMap計算方法一致,不同的是h ^ (h >>> 16)計算結果“與”上 0x7fffffff,從而保證結果一定為正整數。獲得hash之后,通過hash & (n -1)計算下標。
ConcurrentHashMap中的元素節點總結一下有這么幾種可能:
(1) null 暫無元素
(2) Node<K, V> 普通節點,可以組成單向鏈表,hash > 0
(3) TreeBin<K,V> 紅黑樹節點,TreeBin是對TreeNode的封裝,其hash為TREEBIN = -2。
HashMap和ConcurrentHashMap的TreeNode實現並不相同。
在HashMap中TreeNode封裝了紅黑樹所有的操作方法,而ConcurrentHashMap中紅黑樹操作的方法都封裝在TreeBin中,TreeBin相當於一個紅黑樹容器,容器中的紅黑樹節點為TreeNode。
HashMap可以直接在tab[i]存入TreeNode,而ConCurrentHashMap只能在tab[i]存入TreeBin。
(4) ForwardingNode<K,V> key和value都為null的一個特殊節點,用於resize操作填充已經完成遷移操作的節點。FrowardingNode的hash在初始化的時候被置成MOVED = -1
在resize過程中當發現tab[i]上是ForwardingNode的時候(通過hash判斷)就可知tab[i]已經遷移完了,直接跳過該節點去處理其它節點。
ConcurrentHashMap禁止node的key或value為null或許跟該節點的存在也是有一定關系的。
(5)ReservationNode<K,V>只在compute和computeIfAbsent中使用,其hash為RESERVED = -3
從上面的總結可以看出普通節點hash為正整數是有意義的,hash > 0是判斷該節點是否為鏈表節點(普通節點)的一個重要依據。
1.3.3 get/set/update tab[i] 方法
1 // 獲取tab[i]節點 2 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { 3 return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); 4 } 5 6 // compare and swap tab[i],期望值是c,tab[i].value == c ? tab[i] = v : return false 7 static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { 8 return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); 9 } 10 11 // 設置tab[i] = v 12 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { 13 U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); 14 }
1.3.4 size() 方法
1 public int size() { 2 long n = sumCount(); 3 return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); 4 } 5 6 final long sumCount() { 7 CounterCell[] as = counterCells; CounterCell a; 8 long sum = baseCount; 9 // 除了baseCount以外,部分元素變化存儲在counterCells數組中 10 if (as != null) { 11 // 遍歷數組累加獲得結果 12 for (int i = 0; i < as.length; ++i) { 13 if ((a = as[i]) != null) 14 sum += a.value; 15 } 16 } 17 return sum; 18 }
ConcurrentHashMap中baseCount用於保存tab中元素總數,但是並不准確,因為多線程同時增刪改,會導致baseCount修改失敗,此時會將元素變動存儲於counterCells數組內。
當需要統計當前的size的時候,除了要統計baseCount之外,還需要統計counterCells中的元素變化。
值得一提的是即使如此,統計出來的依舊不是當前tab中元素的准確值,在多線程環境下統計前后並不能stop the world暫停線程操作,因此無法保證准確性。
1.3.5 put/putIfAbsent方法
1 public V put(K key, V value) { 2 // 核心是調用putVal方法 3 return putVal(key, value, false); 4 } 5 6 public V putIfAbsent(K key, V value) { 7 // 如果key存在就不更新value 8 return putVal(key, value, true); 9 } 10 11 /** Implementation for put and putIfAbsent */ 12 final V putVal(K key, V value, boolean onlyIfAbsent) { 13 // key或value 為null都是不允許的,因為Forwarding Node就是key和value都為null,是用作標志位的。 14 if (key == null || value == null) throw new NullPointerException(); 15 // 根據key計算hash值,有了hash就可以計算下標了 16 int hash = spread(key.hashCode()); 17 int binCount = 0; 18 // 可能需要初始化或擴容,因此一次未必能完成插入操作,所以添加上for循環 19 for (Node<K,V>[] tab = table;;) { 20 Node<K,V> f; int n, i, fh; 21 // 表還沒有初始化,先初始化,lazily initialized 22 if (tab == null || (n = tab.length) == 0) 23 tab = initTable(); 24 // 根據hash計算應該插入的index,該位置上還沒有元素,則直接插入 25 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 26 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) 27 break; // no lock when adding to empty bin 28 } 29 // static final int MOVED = -1; // hash for forwarding nodes 30 // 說明f為ForwardingNode,只有擴容的時候才會有ForwardingNode出現在tab中,因此可以斷定該tab正在進行擴容 31 else if ((fh = f.hash) == MOVED) 32 // 協助擴容 33 tab = helpTransfer(tab, f); 34 else { 35 V oldVal = null; 36 // 節點上鎖,hash值相同的節點組成的鏈表頭結點 37 synchronized (f) { 38 if (tabAt(tab, i) == f) { 39 if (fh >= 0) { // 是鏈表節點 40 binCount = 1; 41 for (Node<K,V> e = f;; ++binCount) { 42 K ek; 43 // 遍歷鏈表查找是否包含該元素 44 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { 45 oldVal = e.val; // 保存舊的值用於當做返回值 46 if (!onlyIfAbsent) 47 e.val = value; // 替換舊的值為新值 48 break; 49 } 50 Node<K,V> pred = e; 51 if ((e = e.next) == null) { 52 // 遍歷鏈表,如果一直沒找到,則新建一個Node放到鏈表結尾 53 pred.next = new Node<K,V>(hash, key, value, null); 54 break; 55 } 56 } 57 } 58 else if (f instanceof TreeBin) { // 是紅黑樹節點 59 Node<K,V> p; 60 binCount = 2; 61 // 去紅黑樹查找該元素,如果沒找到就添加,找到了就返回該節點 62 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { 63 // 保存舊的value用於返回 64 oldVal = p.val; 65 if (!onlyIfAbsent) 66 p.val = value; // 替換舊的值 67 } 68 } 69 } 70 } 71 if (binCount != 0) { 72 if (binCount >= TREEIFY_THRESHOLD) 73 // 鏈表長度超過閾值(默認為8),則需要將鏈表轉為一棵紅黑樹 74 treeifyBin(tab, i); 75 if (oldVal != null) 76 // 如果只是替換,並未帶來節點的增加則直接返回舊的value即可 77 return oldVal; 78 break; 79 } 80 } 81 } 82 // 元素總數加1,並且判斷是否需要擴容 83 addCount(1L, binCount); 84 return null; 85 }
1.3.6 addCount方法
在putVal方法中調用了若干其它方法,下面來看下addCount方法。
1 // check<0不檢查resize, check<=1只在沒有線程競爭的情況下檢查resize 2 private final void addCount(long x, int check) { 3 CounterCell[] as; long b, s; 4 // counterCells數組不為null 5 if ((as = counterCells) != null || 6 // CAS更新BASECOUNT失敗(有其它線程更新了BASECOUNT,baseCount已經不是最新值) 7 !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { 8 CounterCell a; long v; int m; 9 boolean uncontended = true; 10 // counterCells為null 11 if (as == null || (m = as.length - 1) < 0 || 12 // counterCells對應位置為null,這里不是很懂,有沒有大神解答下? 13 // ThreadLocalRandom.getProbe() 獲得線程探測值,什么用途? 14 (a = as[ThreadLocalRandom.getProbe() & m]) == null || 15 // 更新CELLVALUE失敗 16 !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { 17 // 初始化counterCells 18 fullAddCount(x, uncontended); 19 return; 20 } 21 // counterCells != null 或者 BASECOUNT CAS更新失敗都是因為有線程競爭,因此不檢查resize 22 if (check <= 1) 23 return; 24 // 統計下ConcurrentHashMap元素總數 25 s = sumCount(); 26 } 27 if (check >= 0) { 28 Node<K,V>[] tab, nt; int n, sc; 29 // 元素總數大於sizeCtl 30 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { 31 // 獲取一個resize標志位 32 int rs = resizeStamp(n); 33 // sizeCtl < 0 表示table正在初始化或者resize 34 if (sc < 0) { 35 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || 36 sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) 37 break; 38 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) 39 transfer(tab, nt); 40 } 41 // 當前線程是第一個發起擴容操作 42 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) 43 transfer(tab, null); 44 s = sumCount(); 45 } 46 } 47 }
1.3.7 resize相關方法:resizeStamp、helpTransfer、transfer
1 // 返回一個標志位,該標志位經過RESIZE_STAMP_SHIFT左移必定為負數 2 static final int resizeStamp(int n) { 3 // Integer.numberOfLeadingZeros返回n對應32位二進制數左側0的個數,如9(1001)返回28 4 // 1 << (RESIZE_STAMP_BITS - 1) = 2^15,其中RESIZE_STAMP_BITS固定為16 5 return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); 6 } 7
helpTransfer方法:輔助擴容方法,直接進入transfer方法的遷移元素階段
1 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { 2 Node<K,V>[] nextTab; int sc; 3 // 在tab中發現了ForwardingNode,在ForwardingNode初始化的時候保存了nextTable引用 4 // 因此可以通過f找到nextTable,並且可以斷定nextTable!=null 5 if (tab != null && (f instanceof ForwardingNode) && 6 (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { 7 int rs = resizeStamp(tab.length); 8 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { 9 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || 10 sc == rs + MAX_RESIZERS || transferIndex <= 0) 11 break; 12 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { 13 transfer(tab, nextTab); 14 break; 15 } 16 } 17 return nextTab; 18 } 19 return table; 20 }
transfer方法:resize的核心操作。基本思路是先new一個double capacity的nextTable數組,然后將tab中的元素一個一個遷移到nextTable中。遷移完成后將tab = nextTable操作替換掉tab。
1 // 擴容操作方法 2 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { 3 int n = tab.length, stride; 4 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) 5 stride = MIN_TRANSFER_STRIDE; // subdivide range 6 // 剛開始resize,需要初始化nextTab 7 if (nextTab == null) { 8 try { 9 @SuppressWarnings("unchecked") 10 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 擴容為兩倍 11 nextTab = nt; 12 } catch (Throwable ex) { // try to cope with OOME 13 sizeCtl = Integer.MAX_VALUE; 14 return; 15 } 16 nextTable = nextTab; 17 transferIndex = n; // 倒序transfer tab 18 } 19 int nextn = nextTab.length; // 擴容后表的length 20 // 預先定義一個頭節點ForwardingNode,其hash被置成MOVED=-1 21 // 當線程發現某個元素hash==MOVED則表明該節點已經被處理過 22 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); 23 boolean advance = true; 24 // 是否完成元素遷移的標志 25 boolean finishing = false; // to ensure sweep before committing nextTab 26 for (int i = 0, bound = 0;;) { 27 Node<K,V> f; int fh; 28 // 這個while循環是為了找到下一個准備處理的下標 29 while (advance) { 30 int nextIndex, nextBound; 31 // --i還未越界,准備處理tab[i] 32 // finishing==true,resize完成,可能處於提交前的檢查階段,檢查tab[--i] 33 if (--i >= bound || finishing) 34 advance = false; 35 // 下一個准備處理的元素下標為transferIndex-1<0, 可以斷定tab已經完成了transfer操作 36 else if ((nextIndex = transferIndex) <= 0) { 37 i = -1; 38 advance = false; 39 } 40 else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, 41 nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { 42 bound = nextBound; 43 i = nextIndex - 1; // 下一個准備處理的index 44 advance = false; 45 } 46 } 47 // i越界,可能已經完成元素遷移操作 48 if (i < 0 || i >= n || i + n >= nextn) { 49 int sc; 50 if (finishing) { // 擴容完成,替換table 51 // 擴容完成nextTable置空 52 nextTable = null; 53 // 替換table為擴容后的nextTab 54 table = nextTab; 55 // sizeCtl設置為0.75 * capacity,即為下一次需要擴容的閾值 56 sizeCtl = (n << 1) - (n >>> 1); 57 return; 58 } 59 // CAS更新sizeCtl,sc-1表示新加入一個線程參與擴容操作 60 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { 61 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) 62 return; 63 finishing = advance = true; 64 // 處理完成后重新遍歷一遍,以免多線程操作帶來遺漏 65 i = n; // recheck before commit 66 } 67 } 68 else if ((f = tabAt(tab, i)) == null) 69 // tab[i] == null則置一個ForwardingNode 70 advance = casTabAt(tab, i, null, fwd); 71 else if ((fh = f.hash) == MOVED) 72 // ForwardingNode的hash為MOVED,說明tab[i]已經被置成ForwardingNode,已經處理過 73 advance = true; // already processed 74 else { 75 // 對tab[i]節點加鎖,鎖住了tab[i]節點上所有的Node 76 synchronized (f) { 77 // 如果AB兩個線程先后執行到這里,A線程獲取鎖,執行完遷移之后釋放鎖;B線程獲取鎖,此時tab[i]是ForwardingNode,不等於f 78 if (tabAt(tab, i) == f) { 79 Node<K,V> ln, hn; 80 // fh >= 0說明是鏈表節點。TreeBin的hash在初始化的時候被置成TREEBIN=-2 81 if (fh >= 0) { 82 // (fh = f.hash) & n 決定Node應該遷移到原下標i還是應該遷移到i+n位置 83 // 這種擴容方法參考HashMap的resize思想 http://www.cnblogs.com/snowater/p/7742287.html 84 int runBit = fh & n; 85 Node<K,V> lastRun = f; 86 // 遍歷鏈表找到最后一個與鏈表頭結點runBit不同的Node,並且將runBit置為該節點的 p.hash & n 87 for (Node<K,V> p = f.next; p != null; p = p.next) { 88 int b = p.hash & n; 89 if (b != runBit) { 90 runBit = b; 91 lastRun = p; 92 } 93 } 94 if (runBit == 0) { 95 // runBit == 0 表明該Node還應遷移到下標i的位置 96 ln = lastRun; 97 hn = null; 98 } 99 else { 100 // runBit != 0 表明該Node應遷移到下標i + n的位置 101 hn = lastRun; 102 ln = null; 103 } 104 // 遍歷鏈表,拆分之,拆分后基本是原鏈表的倒序(最后一段鏈表除外,它還是以順序的方式處於鏈表末尾) 105 for (Node<K,V> p = f; p != lastRun; p = p.next) { 106 int ph = p.hash; K pk = p.key; V pv = p.val; 107 if ((ph & n) == 0) // 該Node應該遷移到下標i位置 108 ln = new Node<K,V>(ph, pk, pv, ln); 109 else // 該Node應該遷移到下標i+n位置 110 hn = new Node<K,V>(ph, pk, pv, hn); 111 } 112 setTabAt(nextTab, i, ln); 113 setTabAt(nextTab, i + n, hn); 114 // 處理完后將tab[i]設置為ForwardingNode,其它線程發現tab[i] == ForwardingNode則會跳過tab[i]繼續往后執行 115 setTabAt(tab, i, fwd); 116 advance = true; 117 } 118 else if (f instanceof TreeBin) { // TreeBin的hash為-2 119 TreeBin<K,V> t = (TreeBin<K,V>)f; 120 TreeNode<K,V> lo = null, loTail = null; 121 TreeNode<K,V> hi = null, hiTail = null; 122 int lc = 0, hc = 0; 123 for (Node<K,V> e = t.first; e != null; e = e.next) { 124 int h = e.hash; 125 TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null); 126 if ((h & n) == 0) { 127 if ((p.prev = loTail) == null) 128 lo = p; 129 else 130 loTail.next = p; 131 loTail = p; 132 ++lc; 133 } else { 134 if ((p.prev = hiTail) == null) 135 hi = p; 136 else 137 hiTail.next = p; 138 hiTail = p; 139 ++hc; 140 } 141 } 142 // 如果長度小於UNTREEIFY_THRESHOLD=8,則將樹轉換為鏈表,否則將lo和hi重建為紅黑樹 143 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; 144 hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; 145 setTabAt(nextTab, i, ln); 146 setTabAt(nextTab, i + n, hn); 147 setTabAt(tab, i, fwd); 148 advance = true; 149 } 150 } 151 } 152 } 153 } 154 }
1.3.8 get方法
1 public V get(Object key) { 2 Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; 3 // 根據key計算得到hash 4 int h = spread(key.hashCode()); 5 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { 6 if ((eh = e.hash) == h) { 7 if ((ek = e.key) == key || (ek != null && key.equals(ek))) 8 return e.val; 9 } 10 else if (eh < 0) // 紅黑樹,從紅黑樹中查找 11 return (p = e.find(h, key)) != null ? p.val : null; 12 // 遍歷鏈表查找 13 while ((e = e.next) != null) { 14 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) 15 return e.val; 16 } 17 } 18 return null; 19 }
參考博客:
http://www.importnew.com/23610.html
http://blog.csdn.net/u010723709/article/details/48007881
http://blog.csdn.net/qq924862077/article/details/74530103
http://www.cnblogs.com/mickole/articles/3757278.html
http://www.techsite.cn/?p=5520
http://blog.csdn.net/dfdsggdgg/article/details/51538601