java.util.ConcurrentHashMap (JDK 1.8)


1.1 java.util.ConcurrentHashMap繼承結構

 

ConcurrentHashMap和HashMap的實現有很大的相似性,建議先看HashMap源碼,再來理解ConcurrentHashMap。

1.2 java.util.ConcurrentHashMap屬性

這里僅展示幾個關鍵的屬性

 1     // ConcurrentHashMap核心數組
 2     transient volatile Node<K,V>[] table;
 3 
 4     // 擴容時才會用的一個臨時數組
 5     private transient volatile Node<K,V>[] nextTable;
 6     
 7     /**
 8      * table初始化和resize控制字段
 9      * 負數表示table正在初始化或resize。-1表示正在初始化,-N表示有N-1個線程正在resize操作
10      * 當table為null的時候,保存初始化表的大小以用於創建時使用,或者直接采用默認值0
11      * table初始化之后,保存下一次擴容的的大小,跟HashMap的threshold = loadFactor*capacity作用相同
12      */
13     private transient volatile int sizeCtl;
14 
15     // resize的時候下一個需要處理的元素下標為index=transferIndex-1
16     private transient volatile int transferIndex;
17 
18     // 通過CAS無鎖更新,ConcurrentHashMap元素總數,但不是准確值
19     // 因為多個線程同時更新會導致部分線程更新失敗,失敗時會將元素數目變化存儲在counterCells中
20     private transient volatile long baseCount;
21 
22     // resize或者創建CounterCells時的一個標志位
23     private transient volatile int cellsBusy;
24 
25     // 用於存儲元素變動
26     private transient volatile CounterCell[] counterCells;

1.3 java.util.ConcurrentHashMap方法

1.3.1 Unsafe.compareAndSwapXXX方法

Unsafe.compareAndSwapXXX方法是sun.misc.Unsafe類中的方法,因為在ConcurrentHashMap中大量使用了這些方法。其聲明如下:

public final native boolean compareAndSwapXXX(type1 object, type2 offset, type4 expect, type5 update);

object為待修改的對象,offset為偏移量(數組可以理解為下標),expect為期望值,update為更新值。這個方法執行的邏輯偽代碼如下:

1 if (object[offset].value equal expect) {
2     object[offset].value = update;
3     return true;
4 } else {
5     return false   
6 }

object[offset].value 等於expect更新value值並返回true,否則不更新並且返回false。之所以不更新是因為多線程執行時有其它線程已經修改該值,expect已經不是最新的值,如果強行修改必然會覆蓋之前的修改,造成臟數據。

CAS方法都是native方法,可以保證原子性,並且效率比synchronized高。 

1.3.2 hash方法

1     static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
2     int hash = spread(key.hashCode());
3 
4     static final int spread(int h) {
5         return (h ^ (h >>> 16)) & HASH_BITS;
6     }

上面源碼為計算hash算法,h ^ (h >>> 16)在計算hash的時候key.hashCode()的高位也參與運算,這部分跟HashMap計算方法一致,不同的是h ^ (h >>> 16)計算結果“與”上 0x7fffffff,從而保證結果一定為正整數。獲得hash之后,通過hash & (n -1)計算下標。 

ConcurrentHashMap中的元素節點總結一下有這么幾種可能:

(1) null 暫無元素

(2) Node<K, V> 普通節點,可以組成單向鏈表,hash > 0

(3) TreeBin<K,V> 紅黑樹節點,TreeBin是對TreeNode的封裝,其hash為TREEBIN = -2。

HashMap和ConcurrentHashMap的TreeNode實現並不相同。

在HashMap中TreeNode封裝了紅黑樹所有的操作方法,而ConcurrentHashMap中紅黑樹操作的方法都封裝在TreeBin中,TreeBin相當於一個紅黑樹容器,容器中的紅黑樹節點為TreeNode。

HashMap可以直接在tab[i]存入TreeNode,而ConCurrentHashMap只能在tab[i]存入TreeBin。

(4) ForwardingNode<K,V> key和value都為null的一個特殊節點,用於resize操作填充已經完成遷移操作的節點。FrowardingNode的hash在初始化的時候被置成MOVED = -1

在resize過程中當發現tab[i]上是ForwardingNode的時候(通過hash判斷)就可知tab[i]已經遷移完了,直接跳過該節點去處理其它節點。

ConcurrentHashMap禁止node的key或value為null或許跟該節點的存在也是有一定關系的。

(5)ReservationNode<K,V>只在compute和computeIfAbsent中使用,其hash為RESERVED = -3

從上面的總結可以看出普通節點hash為正整數是有意義的,hash > 0是判斷該節點是否為鏈表節點(普通節點)的一個重要依據。

1.3.3 get/set/update tab[i] 方法

 1     // 獲取tab[i]節點
 2     static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
 3         return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
 4     }
 5 
 6     // compare and swap tab[i],期望值是c,tab[i].value == c ? tab[i] = v : return false
 7     static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
 8         return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
 9     }
10 
11     // 設置tab[i] = v
12     static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
13         U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
14     }

1.3.4 size() 方法

 1     public int size() {
 2         long n = sumCount();
 3         return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
 4     }
 5 
 6     final long sumCount() {
 7         CounterCell[] as = counterCells; CounterCell a;
 8         long sum = baseCount;
 9         // 除了baseCount以外,部分元素變化存儲在counterCells數組中
10         if (as != null) {
11             // 遍歷數組累加獲得結果
12             for (int i = 0; i < as.length; ++i) {
13                 if ((a = as[i]) != null)
14                     sum += a.value;
15             }
16         }
17         return sum;
18     }

ConcurrentHashMap中baseCount用於保存tab中元素總數,但是並不准確,因為多線程同時增刪改,會導致baseCount修改失敗,此時會將元素變動存儲於counterCells數組內。

當需要統計當前的size的時候,除了要統計baseCount之外,還需要統計counterCells中的元素變化。

值得一提的是即使如此,統計出來的依舊不是當前tab中元素的准確值,在多線程環境下統計前后並不能stop the world暫停線程操作,因此無法保證准確性。

1.3.5 put/putIfAbsent方法

 1     public V put(K key, V value) {
 2         // 核心是調用putVal方法
 3         return putVal(key, value, false);
 4     }
 5 
 6     public V putIfAbsent(K key, V value) {
 7         // 如果key存在就不更新value
 8         return putVal(key, value, true);
 9     }
10 
11     /** Implementation for put and putIfAbsent */
12     final V putVal(K key, V value, boolean onlyIfAbsent) {
13         // key或value 為null都是不允許的,因為Forwarding Node就是key和value都為null,是用作標志位的。
14         if (key == null || value == null) throw new NullPointerException();
15         // 根據key計算hash值,有了hash就可以計算下標了
16         int hash = spread(key.hashCode());
17         int binCount = 0;
18         // 可能需要初始化或擴容,因此一次未必能完成插入操作,所以添加上for循環
19         for (Node<K,V>[] tab = table;;) {
20             Node<K,V> f; int n, i, fh;
21             // 表還沒有初始化,先初始化,lazily initialized
22             if (tab == null || (n = tab.length) == 0)
23                 tab = initTable();
24             // 根據hash計算應該插入的index,該位置上還沒有元素,則直接插入
25             else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
26                 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
27                     break;                   // no lock when adding to empty bin
28             }
29             // static final int MOVED = -1; // hash for forwarding nodes
30             // 說明f為ForwardingNode,只有擴容的時候才會有ForwardingNode出現在tab中,因此可以斷定該tab正在進行擴容
31             else if ((fh = f.hash) == MOVED)  
32                 // 協助擴容            
33                 tab = helpTransfer(tab, f);   
34             else {
35                 V oldVal = null;
36                 // 節點上鎖,hash值相同的節點組成的鏈表頭結點
37                 synchronized (f) {
38                     if (tabAt(tab, i) == f) {
39                         if (fh >= 0) { // 是鏈表節點
40                             binCount = 1;
41                             for (Node<K,V> e = f;; ++binCount) {
42                                 K ek;
43                                 // 遍歷鏈表查找是否包含該元素
44                                 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
45                                     oldVal = e.val;  // 保存舊的值用於當做返回值
46                                     if (!onlyIfAbsent)
47                                         e.val = value;  // 替換舊的值為新值
48                                     break;
49                                 }
50                                 Node<K,V> pred = e;
51                                 if ((e = e.next) == null) {
52                                     // 遍歷鏈表,如果一直沒找到,則新建一個Node放到鏈表結尾
53                                     pred.next = new Node<K,V>(hash, key, value, null);
54                                     break;
55                                 }
56                             }
57                         }
58                         else if (f instanceof TreeBin) { // 是紅黑樹節點
59                             Node<K,V> p;
60                             binCount = 2;
61                             // 去紅黑樹查找該元素,如果沒找到就添加,找到了就返回該節點
62                             if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
63                                 // 保存舊的value用於返回
64                                 oldVal = p.val;
65                                 if (!onlyIfAbsent)
66                                     p.val = value; // 替換舊的值
67                             }
68                         }
69                     }
70                 }
71                 if (binCount != 0) {
72                     if (binCount >= TREEIFY_THRESHOLD)
73                         // 鏈表長度超過閾值(默認為8),則需要將鏈表轉為一棵紅黑樹
74                         treeifyBin(tab, i);
75                     if (oldVal != null)
76                         // 如果只是替換,並未帶來節點的增加則直接返回舊的value即可
77                         return oldVal;
78                     break;
79                 }
80             }
81         }
82         // 元素總數加1,並且判斷是否需要擴容
83         addCount(1L, binCount);
84         return null;
85     }

1.3.6 addCount方法

在putVal方法中調用了若干其它方法,下面來看下addCount方法。

 1     // check<0不檢查resize, check<=1只在沒有線程競爭的情況下檢查resize
 2     private final void addCount(long x, int check) {
 3         CounterCell[] as; long b, s;
 4         // counterCells數組不為null       
 5         if ((as = counterCells) != null ||
 6             // CAS更新BASECOUNT失敗(有其它線程更新了BASECOUNT,baseCount已經不是最新值)
 7             !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
 8             CounterCell a; long v; int m;
 9             boolean uncontended = true;
10             // counterCells為null
11             if (as == null || (m = as.length - 1) < 0 ||
12                 // counterCells對應位置為null,這里不是很懂,有沒有大神解答下?
13                 // ThreadLocalRandom.getProbe() 獲得線程探測值,什么用途?
14                 (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
15                 // 更新CELLVALUE失敗
16                 !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
17                 // 初始化counterCells
18                 fullAddCount(x, uncontended);
19                 return;
20             }
21             // counterCells != null 或者 BASECOUNT CAS更新失敗都是因為有線程競爭,因此不檢查resize
22             if (check <= 1)
23                 return;
24             // 統計下ConcurrentHashMap元素總數    
25             s = sumCount();
26         }
27         if (check >= 0) {
28             Node<K,V>[] tab, nt; int n, sc;
29             // 元素總數大於sizeCtl
30             while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
31                 // 獲取一個resize標志位   
32                 int rs = resizeStamp(n);
33                 // sizeCtl < 0 表示table正在初始化或者resize
34                 if (sc < 0) {
35                     if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
36                         sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0)
37                         break;
38                     if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
39                         transfer(tab, nt);
40                 }
41                 // 當前線程是第一個發起擴容操作
42                 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
43                     transfer(tab, null);
44                 s = sumCount();
45             }
46         }
47     }

1.3.7 resize相關方法:resizeStamp、helpTransfer、transfer

1     // 返回一個標志位,該標志位經過RESIZE_STAMP_SHIFT左移必定為負數
2     static final int resizeStamp(int n) {
3         // Integer.numberOfLeadingZeros返回n對應32位二進制數左側0的個數,如9(1001)返回28  
4         // 1 << (RESIZE_STAMP_BITS - 1) = 2^15,其中RESIZE_STAMP_BITS固定為16
5         return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
6     }
7     

helpTransfer方法:輔助擴容方法,直接進入transfer方法的遷移元素階段

 1     final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
 2         Node<K,V>[] nextTab; int sc;
 3         // 在tab中發現了ForwardingNode,在ForwardingNode初始化的時候保存了nextTable引用
 4         // 因此可以通過f找到nextTable,並且可以斷定nextTable!=null
 5         if (tab != null && (f instanceof ForwardingNode) &&
 6             (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
 7             int rs = resizeStamp(tab.length);
 8             while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
 9                 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
10                     sc == rs + MAX_RESIZERS || transferIndex <= 0)
11                     break;
12                 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
13                     transfer(tab, nextTab);
14                     break;
15                 }
16             }
17             return nextTab;
18         }
19         return table;
20     }  

transfer方法:resize的核心操作。基本思路是先new一個double capacity的nextTable數組,然后將tab中的元素一個一個遷移到nextTable中。遷移完成后將tab = nextTable操作替換掉tab。

  1     // 擴容操作方法
  2     private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  3         int n = tab.length, stride;
  4         if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  5             stride = MIN_TRANSFER_STRIDE; // subdivide range
  6         // 剛開始resize,需要初始化nextTab
  7         if (nextTab == null) {
  8             try {
  9                 @SuppressWarnings("unchecked")
 10                 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];  // 擴容為兩倍
 11                 nextTab = nt;
 12             } catch (Throwable ex) {      // try to cope with OOME
 13                 sizeCtl = Integer.MAX_VALUE;
 14                 return;
 15             }
 16             nextTable = nextTab;
 17             transferIndex = n;  // 倒序transfer tab
 18         }
 19         int nextn = nextTab.length; // 擴容后表的length
 20         // 預先定義一個頭節點ForwardingNode,其hash被置成MOVED=-1
 21         // 當線程發現某個元素hash==MOVED則表明該節點已經被處理過
 22         ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
 23         boolean advance = true;
 24         // 是否完成元素遷移的標志
 25         boolean finishing = false; // to ensure sweep before committing nextTab
 26         for (int i = 0, bound = 0;;) {
 27             Node<K,V> f; int fh;
 28             // 這個while循環是為了找到下一個准備處理的下標
 29             while (advance) {
 30                 int nextIndex, nextBound;
 31                 // --i還未越界,准備處理tab[i]
 32                 // finishing==true,resize完成,可能處於提交前的檢查階段,檢查tab[--i]
 33                 if (--i >= bound || finishing)
 34                     advance = false;
 35                 // 下一個准備處理的元素下標為transferIndex-1<0, 可以斷定tab已經完成了transfer操作    
 36                 else if ((nextIndex = transferIndex) <= 0) {
 37                     i = -1;
 38                     advance = false;
 39                 }
 40                 else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
 41                           nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
 42                     bound = nextBound;
 43                     i = nextIndex - 1; // 下一個准備處理的index
 44                     advance = false;
 45                 }
 46             }
 47             // i越界,可能已經完成元素遷移操作
 48             if (i < 0 || i >= n || i + n >= nextn) {
 49                 int sc;
 50                 if (finishing) { // 擴容完成,替換table
 51                     // 擴容完成nextTable置空
 52                     nextTable = null;
 53                     // 替換table為擴容后的nextTab
 54                     table = nextTab;
 55                     // sizeCtl設置為0.75 * capacity,即為下一次需要擴容的閾值
 56                     sizeCtl = (n << 1) - (n >>> 1);
 57                     return;
 58                 }
 59                 // CAS更新sizeCtl,sc-1表示新加入一個線程參與擴容操作
 60                 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
 61                     if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
 62                         return;
 63                     finishing = advance = true;
 64                     // 處理完成后重新遍歷一遍,以免多線程操作帶來遺漏
 65                     i = n; // recheck before commit
 66                 }
 67             }
 68             else if ((f = tabAt(tab, i)) == null)
 69                 // tab[i] == null則置一個ForwardingNode
 70                 advance = casTabAt(tab, i, null, fwd);
 71             else if ((fh = f.hash) == MOVED)
 72                 // ForwardingNode的hash為MOVED,說明tab[i]已經被置成ForwardingNode,已經處理過
 73                 advance = true; // already processed
 74             else {
 75                 // 對tab[i]節點加鎖,鎖住了tab[i]節點上所有的Node
 76                 synchronized (f) {
 77                     // 如果AB兩個線程先后執行到這里,A線程獲取鎖,執行完遷移之后釋放鎖;B線程獲取鎖,此時tab[i]是ForwardingNode,不等於f
 78                     if (tabAt(tab, i) == f) {
 79                         Node<K,V> ln, hn;
 80                         // fh >= 0說明是鏈表節點。TreeBin的hash在初始化的時候被置成TREEBIN=-2
 81                         if (fh >= 0) {
 82                             // (fh = f.hash) & n 決定Node應該遷移到原下標i還是應該遷移到i+n位置
 83                             // 這種擴容方法參考HashMap的resize思想 http://www.cnblogs.com/snowater/p/7742287.html
 84                             int runBit = fh & n;
 85                             Node<K,V> lastRun = f;
 86                             // 遍歷鏈表找到最后一個與鏈表頭結點runBit不同的Node,並且將runBit置為該節點的 p.hash & n
 87                             for (Node<K,V> p = f.next; p != null; p = p.next) {
 88                                 int b = p.hash & n;
 89                                 if (b != runBit) {
 90                                     runBit = b;
 91                                     lastRun = p;
 92                                 }
 93                             }
 94                             if (runBit == 0) {
 95                                 // runBit == 0 表明該Node還應遷移到下標i的位置
 96                                 ln = lastRun;
 97                                 hn = null;
 98                             }
 99                             else {
100                                 // runBit != 0 表明該Node應遷移到下標i + n的位置
101                                 hn = lastRun;
102                                 ln = null;
103                             }
104                             // 遍歷鏈表,拆分之,拆分后基本是原鏈表的倒序(最后一段鏈表除外,它還是以順序的方式處於鏈表末尾)
105                             for (Node<K,V> p = f; p != lastRun; p = p.next) {
106                                 int ph = p.hash; K pk = p.key; V pv = p.val;
107                                 if ((ph & n) == 0) // 該Node應該遷移到下標i位置
108                                     ln = new Node<K,V>(ph, pk, pv, ln);
109                                 else // 該Node應該遷移到下標i+n位置
110                                     hn = new Node<K,V>(ph, pk, pv, hn);
111                             }
112                             setTabAt(nextTab, i, ln);
113                             setTabAt(nextTab, i + n, hn);
114                             // 處理完后將tab[i]設置為ForwardingNode,其它線程發現tab[i] == ForwardingNode則會跳過tab[i]繼續往后執行
115                             setTabAt(tab, i, fwd); 
116                             advance = true;
117                         }
118                         else if (f instanceof TreeBin) { // TreeBin的hash為-2
119                             TreeBin<K,V> t = (TreeBin<K,V>)f;
120                             TreeNode<K,V> lo = null, loTail = null;
121                             TreeNode<K,V> hi = null, hiTail = null;
122                             int lc = 0, hc = 0;
123                             for (Node<K,V> e = t.first; e != null; e = e.next) {
124                                 int h = e.hash;
125                                 TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);
126                                 if ((h & n) == 0) {
127                                     if ((p.prev = loTail) == null)
128                                         lo = p;
129                                     else
130                                         loTail.next = p;
131                                     loTail = p;
132                                     ++lc;
133                                 } else {
134                                     if ((p.prev = hiTail) == null)
135                                         hi = p;
136                                     else
137                                         hiTail.next = p;
138                                     hiTail = p;
139                                     ++hc;
140                                 }
141                             }
142                             // 如果長度小於UNTREEIFY_THRESHOLD=8,則將樹轉換為鏈表,否則將lo和hi重建為紅黑樹
143                             ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t;
144                             hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t;
145                             setTabAt(nextTab, i, ln);
146                             setTabAt(nextTab, i + n, hn);
147                             setTabAt(tab, i, fwd);
148                             advance = true;
149                         }
150                     }
151                 }
152             }
153         }
154     }        

1.3.8 get方法

 1     public V get(Object key) {
 2         Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
 3         // 根據key計算得到hash
 4         int h = spread(key.hashCode());
 5         if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
 6             if ((eh = e.hash) == h) {
 7                 if ((ek = e.key) == key || (ek != null && key.equals(ek)))
 8                     return e.val;
 9             }
10             else if (eh < 0)  // 紅黑樹,從紅黑樹中查找
11                 return (p = e.find(h, key)) != null ? p.val : null;
12             // 遍歷鏈表查找
13             while ((e = e.next) != null) {
14                 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
15                     return e.val;
16             }
17         }
18         return null;
19     }

參考博客:

http://www.importnew.com/23610.html
http://blog.csdn.net/u010723709/article/details/48007881
http://blog.csdn.net/qq924862077/article/details/74530103
http://www.cnblogs.com/mickole/articles/3757278.html
http://www.techsite.cn/?p=5520
http://blog.csdn.net/dfdsggdgg/article/details/51538601


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM