Java並發容器--ConcurrentHashMap


引子

  1、不安全:大家都知道HashMap不是線程安全的,在多線程環境下,對HashMap進行put操作會導致死循環。是因為多線程會導致Entry鏈表形成環形數據結構,這樣Entry的next節點將永遠不為空,就會產生死循環獲取Entry。具體內容見HashMap隨筆。

  2、不高效:Collections.synchronizedMap(hashMap)和HashTable的線程安全原理都是對方法進行同步,所有操作競爭同一把鎖,性能比較低。

  如何構造一個線程安全且高效的HashMap?ConcurrentHashMap登場。

鎖分段技術

  ConcurrentHashMap將數據分為很多段(Segment),Segment繼承了ReentrantLock,每個段都是一把鎖。每個Segment都包含一個HashEntry數組,HashEntry數組存放鍵值對數據。當一個線程要訪問Entry數組時,需要獲取所在Segment鎖,保證在同一個Segment的操作是線程安全的,但其他Segment的數據的訪問不受影響,可以實現並發的訪問不同的Segment。同一個段中才存在競爭關系,不同的段之間沒有競爭關系。

ConcurrentHashMap源碼分析

  源碼分析基於jdk1.7,不同版本實現有所不同。

 

  類圖

  

 

  初始化

    segmentShift和segmentMask的作用是定位Segment索引。以默認值為例,concurrencyLevel為16,需要移位4次(sshift為4),segmentShift就等於28,segmentMask等於15。

    concurrencyLevel是指並發級別,即Segment數組的大小。concurrencyLevel值得設定應該根據並發線程數決定。如果並發級別設置的太小,同一個Segment的元素數量過多,會引起鎖競爭的加重;如果太大,原本屬於同一個Segment的元素會被分配到不同的Segment,會引起Cpu緩存命中率下降,進而導致程序性能下降。

 1             //initialCapacity:初始容量,默認16。
 2             //loadFactor:負載因子,默認0.75。當元素個數大於loadFactor*最大容量時需要擴容(rehash)
 3             //concurrencyLevel:並發級別,默認16。確定Segment的個數,Segment的個數為大於等於concurrencyLevel的第一個2^n。
 4             public ConcurrentHashMap(int initialCapacity,
 5                              float loadFactor, int concurrencyLevel) {
 6                 //判斷參數是否合法
 7                 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
 8                     throw new IllegalArgumentException();
 9                 //Segment最大個數MAX_SEGMENTS = 1 << 16,即65536;
10                 if (concurrencyLevel > MAX_SEGMENTS)
11                     concurrencyLevel = MAX_SEGMENTS;
12                 
13                 // Find power-of-two sizes best matching arguments
14                 int sshift = 0;
15                 int ssize = 1;
16                 //使用循環找到大於等於concurrencyLevel的第一個2^n。ssize就表示Segment的個數。
17                 while (ssize < concurrencyLevel) {
18                     ++sshift;    //記錄移位的次數,
19                     ssize <<= 1;//左移1位
20                 }
21                 this.segmentShift = 32 - sshift;    //用於定位hash運算的位數,之所以用32是因為ConcurrentHashMap里的hash()方法輸出的最大數是32位的
22                 this.segmentMask = ssize - 1;        //hash運算的掩碼,ssize為2^n,所以segmentMask每一位都為1。目的是之后可以通過key的hash值與這個值做&運算確定Segment的索引。
23                 //最大容量MAXIMUM_CAPACITY = 1 << 30;
24                 if (initialCapacity > MAXIMUM_CAPACITY)
25                     initialCapacity = MAXIMUM_CAPACITY;
26                 //計算每個Segment所需的大小,向上取整
27                 int c = initialCapacity / ssize;
28                 if (c * ssize < initialCapacity)
29                     ++c;
30                 int cap = MIN_SEGMENT_TABLE_CAPACITY;//每個Segment最小容量MIN_SEGMENT_TABLE_CAPACITY = 2;
31                 //cap表示每個Segment的容量,也是大於等於c的2^n。
32                 while (cap < c)
33                     cap <<= 1;
34                 //創建一個Segment實例,作為Segment數組ss的第一個元素
35                 // create segments and segments[0]
36                 Segment<K,V> s0 =
37                     new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
38                                      (HashEntry<K,V>[])new HashEntry[cap]);
39                 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
40                 UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
41                 this.segments = ss;
42             }
View Code

 

    

  插入元素(put)

    可以分為三步:

      1、定位Segment:通過Hash值與segmentShift、segmentMask的計算定位到對應的Segment;

      2、鎖獲取:獲取對應Segment的鎖,如果獲取鎖失敗,需要自旋重新獲取鎖;如果自旋超過最大重試次數,則阻塞。

      3、插入元素:如果key已經存在,直接更新;如果key不存在,先判斷是否需要擴容,若需要則執行rehash()后插入原因,否則直接存入元素。

    為了高效,ConcurrentHashMap不會對整個容器進行擴容,而只對某個segment進行擴容。

    Segment的擴容判斷比HashMap更恰當,因為HashMap是在插入元素后判斷元素是否已經到達容量的,如果到達了就進行擴容,但是很有可能擴容之后沒有新元素插入,這時HashMap就進行了一次無效的擴容。

    與HashMap不同ConcurrentHashMap並不允許key或者value為null。

  1             /**ConcurrentHashMap中方法**/
  2             public V put(K key, V value) {
  3                 Segment<K,V> s;
  4                 if (value == null)
  5                     throw new NullPointerException();
  6                 int hash = hash(key);    //計算hash值,hash值是一個32位的整數
  7                 //計算Segment索引
  8                 //在默認情況下,concurrencyLevel為16,segmentShift為28,segmentMask為15。
  9                 //先右移28位,hash值變為0000 0000 0000 0000 0000 0000 0000 xxxx,
 10                 //與segmentMask做&運算,就是取最后四位的值。這個值就是Segment的索引
 11                 int j = (hash >>> segmentShift) & segmentMask; 
 12                 //通過UNSAFE的方式獲取索引j對應的Segment對象。
 13                 if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
 14                      (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
 15                     //Segment采用延遲初始化機制,如果sement為null,則調用ensureSegment創建Segment
 16                     s = ensureSegment(j);
 17                 //向Segment中put元素
 18                 return s.put(key, hash, value, false);
 19             }
 20 
 21             /**ConcurrentHashMap$Segment中方法**/
 22             //向Segment中put元素
 23             final V put(K key, int hash, V value, boolean onlyIfAbsent) {
 24                 //獲取鎖。如果獲取鎖成功,插入元素,和普通的hashMap差不多。
 25                 //如果獲取鎖失敗,執行scanAndLockForPut進行重試。重試設計見scanAndLockForPut方法源碼。
 26                 HashEntry<K,V> node = tryLock() ? null :
 27                     scanAndLockForPut(key, hash, value);
 28                 V oldValue;
 29                 try {
 30                     HashEntry<K,V>[] tab = table;
 31                     int index = (tab.length - 1) & hash;//計算HashEntry數組索引
 32                     HashEntry<K,V> first = entryAt(tab, index);
 33                     for (HashEntry<K,V> e = first;;) {
 34                         if (e != null) {    //該索引處已經有元素
 35                             K k;
 36 
 37                             //如果key相同,替換value。
 38                             if ((k = e.key) == key ||
 39                                 (e.hash == hash && key.equals(k))) {
 40                                 oldValue = e.value;
 41                                 //onlyIfAbsent=true參數表示如果key存在,則不更新value值,只有在key不存在的情況下,才更新。
 42                                 //在putIfAbsent方法中onlyIfAbsent=true
 43                                 //在put方法中onlyIfAbsent=false
 44                                 if (!onlyIfAbsent) {Scans
 45                                     e.value = value;
 46                                     ++modCount;//修改次數
 47                                 }
 48                                 break;
 49                             }
 50                             e = e.next;//繼續找下一個元素
 51                         }
 52                         else {    
 53                             if (node != null)
 54                                 node.setNext(first);
 55                             else
 56                                 node = new HashEntry<K,V>(hash, key, value, first);
 57                             int c = count + 1;    //count為ConcurrentHashMap$Segment中的域
 58                             if (c > threshold && tab.length < MAXIMUM_CAPACITY)
 59                                 //如果元素數量超過閾值且表長度小於MAXIMUM_CAPACITY,擴容
 60                                 rehash(node);
 61                             else
 62                                 setEntryAt(tab, index, node);//將node節點更新到table中
 63                             ++modCount;
 64                             count = c;
 65                             oldValue = null;
 66                             break;
 67                         }
 68                     }
 69                 } finally {
 70                     unlock();
 71                 }
 72                 return oldValue;
 73             }
 74             
 75             /**ConcurrentHashMap$Segment中方法**/
 76             //自旋獲取鎖
 77             private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
 78                 //entryForHash根據hash值找到當前segment中對應的HashEntry數組索引。
 79                 HashEntry<K,V> first = entryForHash(this, hash);
 80                 HashEntry<K,V> e = first;
 81                 HashEntry<K,V> node = null;
 82                 int retries = -1; // negative while locating node
 83                 //自旋獲取鎖。若獲取到鎖,則跳出循環;否則一直循環直到獲取到鎖或retries大於MAX_SCAN_RETRIES。
 84                 while (!tryLock()) {
 85                     HashEntry<K,V> f; // to recheck first below
 86                     //當retries = -1時(即第一次循環或更新操作導致的first節點發生變化),會遍歷該Segment的HashEntry數組中hash對應的鏈表,如果key對應的HashEntry不存在,則創建該節點。
 87                     //此處遍歷鏈表的原因:希望遍歷的鏈表被CPU cache所緩存,為后續實際put過程中的鏈表遍歷操作提升性能。怎么理解呢?放在put時再去遍歷不行嗎?因為此時當前線程沒有獲取到Segment鎖,所以不能進行put操作,但可以為put操作做一些准備工作(有可能加載到緩存),使put的操作更快,從而減少鎖競爭。這種思想在remove()方法中也有體現。
 88                     if (retries < 0) {
 89                         if (e == null) {
 90                             //如果key不存在創建node,然后進入下一個循環
 91                             if (node == null) // speculatively create node
 92                                 node = new HashEntry<K,V>(hash, key, value, null);
 93                             retries = 0;
 94                         }
 95                         else if (key.equals(e.key))
 96                             //如果key存在直接進入下一個循環
 97                             retries = 0;
 98                         else
 99                             e = e.next;    //鏈表的下一個節點
100                     }
101                     else if (++retries > MAX_SCAN_RETRIES) {
102                         //每次循環,retries加1,判斷是否大於最大重試次數MAX_SCAN_RETRIES.
103                         //static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
104                         //為了防止自旋鎖大量消耗CPU的缺點。如果超過MAX_SCAN_RETRIES,使用lock方法獲取鎖。如果獲取不到鎖則當前線程阻塞並跳出循環。
105                         //ReentrantLock的lock()和tryLock()方法的區別。
106                         lock();
107                         break;
108                     }
109                     else if ((retries & 1) == 0 &&
110                              (f = entryForHash(this, hash)) != first) {
111                         //每隔一次循環,檢查所在數組索引的鏈表頭結點有沒有變化(其他線程有更新Map的操作,如put,rehash或者remove操作)。
112                         //如果改變,retries更新為-1,重新遍歷
113                         e = first = f; // re-traverse if entry changed
114                         retries = -1;
115                     }
116                 }
117                 return node;
118             }
119 
120             /**ConcurrentHashMap$Segment中方法**/
121             //rehash
122             private void rehash(HashEntry<K,V> node) {
123                 HashEntry<K,V>[] oldTable = table;
124                 int oldCapacity = oldTable.length;
125                 int newCapacity = oldCapacity << 1;    //新容量為舊容量的2倍
126                 threshold = (int)(newCapacity * loadFactor);    //新閾值
127                 HashEntry<K,V>[] newTable =
128                     (HashEntry<K,V>[]) new HashEntry[newCapacity];    //新表
129                 int sizeMask = newCapacity - 1;    //新掩碼
130                 //對舊表做遍歷
131                 for (int i = 0; i < oldCapacity ; i++) {
132                     HashEntry<K,V> e = oldTable[i];
133                     if (e != null) {
134                         HashEntry<K,V> next = e.next;
135                         int idx = e.hash & sizeMask;
136                         if (next == null)   //  Single node on list 鏈表中只存在一個節點
137                             newTable[idx] = e;
138                         else { // Reuse consecutive sequence at same slot
139                             //鏈表中存在多個節點.
140                             /*
141                             相對於HashMap的resize,ConcurrentHashMap的rehash原理類似,但是Doug Lea為rehash做了一定的優化,避免讓所有的節點都進行復制操作:由於擴容是基於2的冪指來操作,假設擴容前某HashEntry對應到Segment中數組的index為i,數組的容量為capacity,那么擴容后該HashEntry對應到新數組中的index只可能為i或者i+capacity,因此大多數HashEntry節點在擴容前后index可以保持不變。基於此,rehash方法中會定位第一個后續所有節點在擴容后index都保持不變的節點,然后將這個節點之前的所有節點重排即可
142                             */
143                             HashEntry<K,V> lastRun = e;
144                             int lastIdx = idx;
145                             //找到第一個在擴容后index都保持不變的節點lastRun
146                             for (HashEntry<K,V> last = next;
147                                  last != null;
148                                  last = last.next) {
149                                 int k = last.hash & sizeMask;
150                                 if (k != lastIdx) {
151                                     lastIdx = k;
152                                     lastRun = last;
153                                 }
154                             }
155                             newTable[lastIdx] = lastRun;
156                             // Clone remaining nodes 
157                             //將這個節點之前的所有節點重排
158                             for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
159                                 V v = p.value;
160                                 int h = p.hash;
161                                 int k = h & sizeMask;
162                                 HashEntry<K,V> n = newTable[k];
163                                 newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
164                             }
165                         }
166                     }
167                 }
168                 int nodeIndex = node.hash & sizeMask; // add the new node
169                 node.setNext(newTable[nodeIndex]);
170                 newTable[nodeIndex] = node;
171                 table = newTable;
172             }
View Code

 

    

    Segment延遲初始化機制

      Segment采用延遲初始化機制,如果sement為null,則調用ensureSegment確保創建Segment。

      ensureSegment方法可能被多個線程調用,ensureSegment()是怎么保證線程安全的呢?

      通過源代碼可看出ensureSegment方法並未使用鎖來控制競爭,而是使用了Unsafe對象的getObjectVolatile()提供的原子讀語義結合CAS來確保Segment創建的原子性。

      ensureSegment()源代碼:

 1                 @SuppressWarnings("unchecked")
 2                 private Segment<K,V> ensureSegment(int k) {
 3                     final Segment<K,V>[] ss = this.segments;
 4                     long u = (k << SSHIFT) + SBASE; // raw offset
 5                     Segment<K,V> seg;
 6                     if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
 7                         //使用第一個segment作為模板來創建segment,第一個segment在Map初始化時已經被創建
 8                         Segment<K,V> proto = ss[0]; // use segment 0 as prototype
 9                         int cap = proto.table.length;
10                         float lf = proto.loadFactor;
11                         int threshold = (int)(cap * lf);
12                         HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
13                         if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
14                             == null) { // recheck
15                             Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);    //根據第一個segment的參數創建新的Segment
16                             //自旋CAS。如果seg!=null,說明該segment已經被其他線程創建,則方法結束;如果seg==null,說明該segment還沒有被創建,則當前線程采用CAS更新Segment數組,如果CAS成功,則結束,否則說明其他線程對Segment數組有過更新,繼續下一個循環指定該segment創建成功。
17                             while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
18                                    == null) {
19                                 if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
20                                     break;
21                             }
22                         }
23                     }
24                     return seg;
25                 }
View Code

 

 

    

    scanAndLockForPut方法

      自旋獲取鎖中,當第一次循環或更新操作導致的first節點發生變化時,會遍歷該Segment的HashEntry數組中hash對應的鏈表,如果key對應的HashEntry不存在,則創建該節點。

      此處遍歷鏈表的原因:希望遍歷的鏈表被CPU cache所緩存,為后續實際put過程中的鏈表遍歷操作提升性能。怎么理解呢?put還是要再去遍歷一次(即使鏈表在緩存中)?因為此時當前線程沒有獲取到Segment鎖,所以不能進行put操作,但可以為put操作做一些准備工作(有可能加載到緩存,在緩存中執行遍歷更快),使put的操作更快,從而減少鎖競爭。這種思想在remove()方法中也有體現。

  

 

 

  獲取元素(get)

    get操作不需要加鎖,當拿到的值為空時才會加鎖重讀。讀操作不用加鎖的原因是它的get方法里將要使用的共享變量都定義成volatile類型,如volatile V value。定義成volatile的變量,能夠在線程之間保持可見性,能夠被多線程同時讀,並且保證不會讀到過期的值,但是只能被單線程寫(有一種情況可以被多線程寫,就是寫入的值不依賴於原值)。get方法使用UNSAFE提供的原子讀語義來獲的Segmnet和對應的鏈表。

    containsKey方法和get相似,都不用加鎖。

 1             public V get(Object key) {
 2                 Segment<K,V> s; // manually integrate access methods to reduce overhead
 3                 HashEntry<K,V>[] tab;
 4                 int h = hash(key);
 5                 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
 6                 //通過Hash值找到相應的Segment
 7                 if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
 8                     (tab = s.table) != null) {
 9                     //找到HashEntry鏈表的索引,遍歷鏈表找到對應的key
10                     for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
11                              (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
12                          e != null; e = e.next) {
13                         K k;
14                         if ((k = e.key) == key || (e.hash == h && key.equals(k)))
15                             return e.value;
16                     }
17                 }
18                 return null;
19             }
View Code

 

 

 

  統計大小(size)

    統計Map的大小需要統計所有Segment的大小然后求和。

    問題:累加的過程中Segment的大小可能會發生變化,導致統計的結果不准確。

    解決方案:1)簡單的方法就是對所有的Segment加鎖,但方法低效。

         2)考慮到累加的過程中Segment的大小變化的可能性很小,作者給出了更高效的方案,首先嘗試幾次在不對Segment加鎖的情況下統計各個Segment的大小,如果累加期間Map的大小發生了變化,再使用加鎖的方式統計各個Segment的大小。判斷Map的大小是否發生了變化,需要通過Segment的modCount變量實現。modCount表示對Segment的修改次數。相同的思想也用在了containsValue操作。

    注意事項:使用加鎖方式進行統計大小時,對每一個Segment加鎖,需要強制創建所有的Segment,這么做的目的是防止其他線程創建Segment並進行更新操作。所以應盡量避免在多線程環境下使用size和containsValue方法。

 1             public int size() {
 2                 // Try a few times to get accurate count. On failure due to
 3                 // continuous async changes in table, resort to locking.
 4                 final Segment<K,V>[] segments = this.segments;
 5                 int size;
 6                 boolean overflow; // true if size overflows 32 bits
 7                 long sum;         // sum of modCounts
 8                 long last = 0L;   // previous sum
 9                 int retries = -1; // first iteration isn't retry
10                 try {
11                     for (;;) {
12                         //static final int RETRIES_BEFORE_LOCK = 2;
13                         //判斷是否到達無鎖統計map大小的最大次數,若達到最大次數需要鎖所有Segment
14                         if (retries++ == RETRIES_BEFORE_LOCK) {
15                             //對每一個Segment加鎖,此時需要強制創建所有的Segment,這么做的目的是防止其他線程創建Segment並進行更新操作。
16                             //所以應避免在多線程環境下使用size和containsValue方法。
17                             for (int j = 0; j < segments.length; ++j)
18                                 ensureSegment(j).lock(); // force creation
19                         }
20                         sum = 0L;
21                         size = 0;
22                         overflow = false;
23                         for (int j = 0; j < segments.length; ++j) {
24                             Segment<K,V> seg = segmentAt(segments, j);
25                             if (seg != null) {
26                                 sum += seg.modCount;
27                                 int c = seg.count;
28                                 if (c < 0 || (size += c) < 0)
29                                     overflow = true;
30                             }
31                         }
32                         //判斷前后兩次統計的modCount之和是否相等,若相等則說明沒有被修改郭
33                         //由於last初始值為0,如果該Map從創建到現在都沒有被修改過,即所有Segment的modCount都為0,則只執行一次循環;否則至少執行兩次循環,比較兩次統計的sum有沒有發生變化。又因為retries初始值-1,所以可以說重試無鎖統計大小的次數為3次。
34                         if (sum == last)
35                             break;
36                         last = sum;
37                     }
38                 } finally {
39                     //重試次數大於最大次數,需要釋放鎖
40                     if (retries > RETRIES_BEFORE_LOCK) {
41                         for (int j = 0; j < segments.length; ++j)
42                             segmentAt(segments, j).unlock();
43                     }
44                 }
45                 return overflow ? Integer.MAX_VALUE : size;
46             }
View Code

 

 Java8的ConcurrentHashMap

 

  相對於Java7中的實現,主要有以下兩點改進:

    1)取消segment分段,直接使用數組transient volatile Node<K,V>[] table存儲數據,將table數組元素作為鎖,實現對數組中每一個桶進行加鎖,進一步減少並發沖突的概率。

    2)類似於Java8中的HashMap,將數組+鏈表的結構變更為數組+鏈表+紅黑樹的結構。當鏈表的長度大於8時,將鏈表轉換為紅黑樹,原因見HashMap。

  通過 Node + CAS + Synchronized 來保證線程安全。

  Fields

  

 1     transient volatile Node<K,V>[] table;//存放元素的數組,懶加載,大小是2的n次方
 2     private transient volatile Node<K,V>[] nextTable;//擴容時用到
 3     //基本計數器,通過CAS更新
 4     private transient volatile long baseCount;
 5     /*控制標識符,用來控制table的初始化和擴容的操作,不同的值有不同的含義
 6      *當為負數時:-1代表正在初始化,-N代表有N-1個線程正在進行擴容
 7      *當為0時:代表當時的table還沒有被初始化
 8      *當為正數時:表示初始化或者下一次進行擴容的大小
 9      */
10     private transient volatile int sizeCtl;
11 
12     /**
13      * The next table index (plus one) to split while resizing.
14      */
15     private transient volatile int transferIndex;
16 
17     /**
18      * Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
19      */
20     private transient volatile int cellsBusy;
21 
22     /**
23      * Table of counter cells. When non-null, size is a power of 2.
24      */
25     private transient volatile CounterCell[] counterCells;
26 
27     // views
28     private transient KeySetView<K,V> keySet;
29     private transient ValuesView<K,V> values;
30     private transient EntrySetView<K,V> entrySet;
View Code

 

 

  Node

    hash值和key都是final的,不可更改;val和next都是volatile的保證可見性和禁止指令重排序。

 1     static class Node<K,V> implements Map.Entry<K,V> {
 2         //hash值和key都是final的,不可更改;val和next都是volatile的保證可見性和禁止指令重排序。
 3         final int hash;
 4         final K key;
 5         volatile V val;
 6         volatile Node<K,V> next;
 7 
 8         Node(int hash, K key, V val, Node<K,V> next) {
 9             this.hash = hash;
10             this.key = key;
11             this.val = val;
12             this.next = next;
13         }
14 
15         public final K getKey()       { return key; }
16         public final V getValue()     { return val; }
17         public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
18         public final String toString(){ return key + "=" + val; }
19         //不允許更改值
20         public final V setValue(V value) {
21             throw new UnsupportedOperationException();
22         }
23         public final boolean equals(Object o) {
24             Object k, v, u; Map.Entry<?,?> e;
25             return ((o instanceof Map.Entry) &&
26                     (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
27                     (v = e.getValue()) != null &&
28                     (k == key || k.equals(key)) &&
29                     (v == (u = val) || v.equals(u)));
30         }
31         //用於map中的get()方法,子類重寫
32         Node<K,V> find(int h, Object k) {
33             Node<K,V> e = this;
34             if (k != null) {
35                 do {
36                     K ek;
37                     if (e.hash == h &&
38                         ((ek = e.key) == k || (ek != null && k.equals(ek))))
39                         return e;
40                 } while ((e = e.next) != null);
41             }
42             return null;
43         }
44     }
View Code

 

 

  put

    添加元素的大致過程如下:

      1)如果table沒有初始化,先通過initTable()方法進行初始化;

      2)計算hash值,找到對應的桶,如果該桶的首節點f為null(即不存在hash沖突),使用CAS直接將新Node放入該桶;

      3)如果首節點f的hash值為MOVED,說明正在擴容,先進行擴容;

      4)如果存在hash沖突,則通過加鎖(獲取首節點f的監視器鎖)來保證線程安全,分兩種情況:鏈表和紅黑樹;如果是鏈表,則遍歷鏈表,存在相同的key就進行覆蓋,否則插入到鏈表的尾部;如果是紅黑樹,則向紅黑樹中插入新節點;

      5)判斷是否需要將鏈表轉化為紅黑樹,如果需要,調用treeifyBin方法;

      6)如果添加成功,就調用addCount方法統計size,並檢查是否需要擴容。

 1     public V put(K key, V value) {
 2         return putVal(key, value, false);
 3     }
 4     
 5     final V putVal(K key, V value, boolean onlyIfAbsent) {
 6         if (key == null || value == null) throw new NullPointerException();
 7         int hash = spread(key.hashCode());//計算hash值
 8         //用於記錄相應鏈表的長度
 9         int binCount = 0;
10         for (Node<K,V>[] tab = table;;) {
11             Node<K,V> f; int n, i, fh;
12             if (tab == null || (n = tab.length) == 0)
13                 //如果數組為空,則進行初始化
14                 tab = initTable();
15             //找到該hash值對應的下標i,得到第一個節點f
16             else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
17                 //如果第一個節點f為null,使用CAS直接將新Node放入該桶
18                 //如果CAS成功,跳出循環結束;如果失敗,進入下一個循環
19                 if (casTabAt(tab, i, null,
20                              new Node<K,V>(hash, key, value, null)))
21                     break;                   // no lock when adding to empty bin
22             }
23             //如果f的哈希值為MOVED,則進行數據遷移(擴容)
24             else if ((fh = f.hash) == MOVED)
25                 tab = helpTransfer(tab, f);
26             else {
27                 //這種情況下,說明f是第一個節點且不為null
28                 V oldVal = null;
29                 //獲取該桶第一個節點f的監視器鎖
30                 synchronized (f) {
31                     if (tabAt(tab, i) == f) {
32                         //第一個節點f的hash值大於0,說明是鏈表
33                         if (fh >= 0) {
34                             binCount = 1;
35                             //遍歷鏈表,
36                             for (Node<K,V> e = f;; ++binCount) {
37                                 K ek;
38                                 //如果找到同樣的key,判斷onlyIfAbsent然后進行覆蓋,跳出循環
39                                 if (e.hash == hash &&
40                                     ((ek = e.key) == key ||
41                                      (ek != null && key.equals(ek)))) {
42                                     oldVal = e.val;
43                                     if (!onlyIfAbsent)
44                                         e.val = value;
45                                     break;
46                                 }
47                                 Node<K,V> pred = e;
48                                 //如果遍歷到鏈表尾部沒有找到相同的key,則將新Node插入到鏈表的尾部
49                                 if ((e = e.next) == null) {
50                                     pred.next = new Node<K,V>(hash, key,
51                                                               value, null);
52                                     break;
53                                 }
54                             }
55                         }
56                         //如果第一個節點是紅黑樹節點
57                         else if (f instanceof TreeBin) {
58                             Node<K,V> p;
59                             binCount = 2;
60                             // 調用紅黑樹的插值方法插入新節點
61                             if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
62                                                            value)) != null) {
63                                 oldVal = p.val;
64                                 if (!onlyIfAbsent)
65                                     p.val = value;
66                             }
67                         }
68                     }
69                 }
70                 //binCount != 0說明上面做了鏈表操作
71                 if (binCount != 0) {
72                     //判斷是否將鏈表轉化為紅黑樹,TREEIFY_THRESHOLD為8
73                     if (binCount >= TREEIFY_THRESHOLD)
74                         //可能轉化為紅黑樹
75                         //如果當前數組的長度小於64,那么會選擇進行數組擴容,而不是轉換為紅黑樹
76                         treeifyBin(tab, i);
77                     if (oldVal != null)
78                         return oldVal;
79                     break;
80                 }
81             }
82         }
83         addCount(1L, binCount);
84         return null;
85     }
View Code

 

 

  initTable

    初始化table

 1     private final Node<K,V>[] initTable() {
 2         Node<K,V>[] tab; int sc;
 3         while ((tab = table) == null || tab.length == 0) {
 4             //如果sizeCtl小於0,說明其他線程已經初始化了
 5             if ((sc = sizeCtl) < 0)
 6                 //yield()使線程由運行狀態變為就緒狀態,把CPU讓出來,讓自己或者其它的線程運行。
 7                 Thread.yield(); // lost initialization race; just spin
 8             //通過CAS操作將sizeCtl設置為-1,返回true代表搶到鎖
 9             else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
10                 try {
11                     if ((tab = table) == null || tab.length == 0) {
12                         //默認容量DEFAULT_CAPACITY為16
13                         int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
14                         //初始化指定容量的數組,並賦給table,table為volatile的
15                         @SuppressWarnings("unchecked")
16                         Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
17                         table = tab = nt;
18                         //>>>為無符號右移運算,無符號右移2位,相當於除以2
19                         //即相當於sc=0.75n
20                         sc = n - (n >>> 2);
21                     }
22                 } finally {
23                     //將sc賦值給sizeCtl
24                     sizeCtl = sc;
25                 }
26                 break;
27             }
28         }
29         return tab;
30     }
View Code

 

 

  treeifyBin

    鏈表轉紅黑樹

 1     private final void treeifyBin(Node<K,V>[] tab, int index) {
 2         Node<K,V> b; int n, sc;
 3         if (tab != null) {
 4             //如果數組長度小於MIN_TREEIFY_CAPACITY(64)的時候,進行擴容。
 5             if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
 6                 tryPresize(n << 1);
 7             //b是該桶中的第一個節點
 8             else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
 9                 //獲取鎖
10                 synchronized (b) {
11                     if (tabAt(tab, index) == b) {
12                         TreeNode<K,V> hd = null, tl = null;
13                         //遍歷鏈表,創建一顆紅黑樹
14                         for (Node<K,V> e = b; e != null; e = e.next) {
15                             TreeNode<K,V> p =
16                                 new TreeNode<K,V>(e.hash, e.key, e.val,
17                                                   null, null);
18                             if ((p.prev = tl) == null)
19                                 hd = p;
20                             else
21                                 tl.next = p;
22                             tl = p;
23                         }
24                         //將紅黑樹設置到數組的相應桶中
25                         setTabAt(tab, index, new TreeBin<K,V>(hd));
26                     }
27                 }
28             }
29         }
30     }
View Code

 

 

  tryPresize

    擴容,每次都是擴容為原來的2倍,size是已經翻完倍的數值。

  1     private final void tryPresize(int size) {
  2         int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
  3             tableSizeFor(size + (size >>> 1) + 1);//取大於1.5倍的size+1的最近的2的n次方的值
  4         int sc;
  5         while ((sc = sizeCtl) >= 0) {
  6             Node<K,V>[] tab = table; int n;
  7             //如果數組為空,先初始化數組
  8             if (tab == null || (n = tab.length) == 0) {
  9                 n = (sc > c) ? sc : c;
 10                 if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
 11                     try {
 12                         if (table == tab) {
 13                             @SuppressWarnings("unchecked")
 14                             Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
 15                             table = nt;
 16                             sc = n - (n >>> 2);
 17                         }
 18                     } finally {
 19                         sizeCtl = sc;
 20                     }
 21                 }
 22             }
 23             else if (c <= sc || n >= MAXIMUM_CAPACITY)
 24                 break;
 25             else if (tab == table) {
 26                 int rs = resizeStamp(n);
 27                 if (sc < 0) {
 28                     Node<K,V>[] nt;
 29                     if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
 30                         sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
 31                         transferIndex <= 0)
 32                         break;
 33                     if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
 34                         transfer(tab, nt);
 35                 }
 36                 else if (U.compareAndSwapInt(this, SIZECTL, sc,
 37                                              (rs << RESIZE_STAMP_SHIFT) + 2))
 38                     transfer(tab, null);
 39             }
 40         }
 41     }
 42 
 43 
 44     private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
 45         int n = tab.length, stride;
 46         if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
 47             stride = MIN_TRANSFER_STRIDE; // subdivide range
 48         if (nextTab == null) {            // initiating
 49             try {
 50                 @SuppressWarnings("unchecked")
 51                 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
 52                 nextTab = nt;
 53             } catch (Throwable ex) {      // try to cope with OOME
 54                 sizeCtl = Integer.MAX_VALUE;
 55                 return;
 56             }
 57             nextTable = nextTab;
 58             transferIndex = n;
 59         }
 60         int nextn = nextTab.length;
 61         ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
 62         boolean advance = true;
 63         boolean finishing = false; // to ensure sweep before committing nextTab
 64         for (int i = 0, bound = 0;;) {
 65             Node<K,V> f; int fh;
 66             while (advance) {
 67                 int nextIndex, nextBound;
 68                 if (--i >= bound || finishing)
 69                     advance = false;
 70                 else if ((nextIndex = transferIndex) <= 0) {
 71                     i = -1;
 72                     advance = false;
 73                 }
 74                 else if (U.compareAndSwapInt
 75                          (this, TRANSFERINDEX, nextIndex,
 76                           nextBound = (nextIndex > stride ?
 77                                        nextIndex - stride : 0))) {
 78                     bound = nextBound;
 79                     i = nextIndex - 1;
 80                     advance = false;
 81                 }
 82             }
 83             if (i < 0 || i >= n || i + n >= nextn) {
 84                 int sc;
 85                 if (finishing) {
 86                     nextTable = null;
 87                     table = nextTab;
 88                     sizeCtl = (n << 1) - (n >>> 1);
 89                     return;
 90                 }
 91                 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
 92                     if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
 93                         return;
 94                     finishing = advance = true;
 95                     i = n; // recheck before commit
 96                 }
 97             }
 98             else if ((f = tabAt(tab, i)) == null)
 99                 advance = casTabAt(tab, i, null, fwd);
100             else if ((fh = f.hash) == MOVED)
101                 advance = true; // already processed
102             else {
103                 synchronized (f) {
104                     if (tabAt(tab, i) == f) {
105                         Node<K,V> ln, hn;
106                         if (fh >= 0) {
107                             int runBit = fh & n;
108                             Node<K,V> lastRun = f;
109                             for (Node<K,V> p = f.next; p != null; p = p.next) {
110                                 int b = p.hash & n;
111                                 if (b != runBit) {
112                                     runBit = b;
113                                     lastRun = p;
114                                 }
115                             }
116                             if (runBit == 0) {
117                                 ln = lastRun;
118                                 hn = null;
119                             }
120                             else {
121                                 hn = lastRun;
122                                 ln = null;
123                             }
124                             for (Node<K,V> p = f; p != lastRun; p = p.next) {
125                                 int ph = p.hash; K pk = p.key; V pv = p.val;
126                                 if ((ph & n) == 0)
127                                     ln = new Node<K,V>(ph, pk, pv, ln);
128                                 else
129                                     hn = new Node<K,V>(ph, pk, pv, hn);
130                             }
131                             setTabAt(nextTab, i, ln);
132                             setTabAt(nextTab, i + n, hn);
133                             setTabAt(tab, i, fwd);
134                             advance = true;
135                         }
136                         else if (f instanceof TreeBin) {
137                             TreeBin<K,V> t = (TreeBin<K,V>)f;
138                             TreeNode<K,V> lo = null, loTail = null;
139                             TreeNode<K,V> hi = null, hiTail = null;
140                             int lc = 0, hc = 0;
141                             for (Node<K,V> e = t.first; e != null; e = e.next) {
142                                 int h = e.hash;
143                                 TreeNode<K,V> p = new TreeNode<K,V>
144                                     (h, e.key, e.val, null, null);
145                                 if ((h & n) == 0) {
146                                     if ((p.prev = loTail) == null)
147                                         lo = p;
148                                     else
149                                         loTail.next = p;
150                                     loTail = p;
151                                     ++lc;
152                                 }
153                                 else {
154                                     if ((p.prev = hiTail) == null)
155                                         hi = p;
156                                     else
157                                         hiTail.next = p;
158                                     hiTail = p;
159                                     ++hc;
160                                 }
161                             }
162                             ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
163                                 (hc != 0) ? new TreeBin<K,V>(lo) : t;
164                             hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
165                                 (lc != 0) ? new TreeBin<K,V>(hi) : t;
166                             setTabAt(nextTab, i, ln);
167                             setTabAt(nextTab, i + n, hn);
168                             setTabAt(tab, i, fwd);
169                             advance = true;
170                         }
171                     }
172                 }
173             }
174         }
175     }
View Code

 

  

  addCount

    在添加完元素之后,調用addCount方法進行計數。

    addCount方法主要完成兩個功能:

      1)對table的長度計數+1,有兩種情況:一是通過修改 baseCount,二是通過使用 CounterCell。當 CounterCell 被初始化后,就優先使用他,不再使用 baseCount了;

      2)檢查是否需要擴容,或者是否正在擴容。如果需要擴容,就調用擴容方法,如果正在擴容,就幫助其擴容。

 1     //從putVal傳入的參數x是1,參數check為binCount,binCount>=0,默認要檢查是否需要擴容
 2     private final void addCount(long x, int check) {
 3         CounterCell[] as; long b, s;
 4         //如果counterCells不為null或者更新baseCount失敗
 5         if ((as = counterCells) != null ||
 6             !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
 7             CounterCell a; long v; int m;
 8             boolean uncontended = true;
 9             //如果counterCells的大小為0,
10             //或者隨機取其中一個元素為null,
11             //或者修改這個槽位的變量失敗,則執行fullAddCount方法
12             if (as == null || (m = as.length - 1) < 0 ||
13                 (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
14                 !(uncontended =
15                   U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
16                 fullAddCount(x, uncontended);
17                 return;
18             }
19             if (check <= 1)
20                 return;
21             s = sumCount();//計算map的size賦值給s
22         }
23         //判斷是否需要擴容,在putVal中調用,默認都是要檢查的
24         if (check >= 0) {
25             Node<K,V>[] tab, nt; int n, sc;
26             //如果map的size大於sizeCtl(擴容閾值),
27             //且table不是null,
28             //且table的長度小於MAXIMUM_CAPACITY,則擴容
29             while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
30                    (n = tab.length) < MAXIMUM_CAPACITY) {
31                 int rs = resizeStamp(n);
32                 //sizeCtl小於0表示正在擴容
33                 if (sc < 0) {
34                     if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
35                         sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
36                         transferIndex <= 0)
37                         break;
38                     // 如果可以幫助擴容,那么將 sc 加 1. 表示多了一個線程在幫助擴容
39                     if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
40                         transfer(tab, nt);
41                 }
42                 //如果沒有在擴容,將 sc 更新為負數,更新成功就進行擴容
43                 else if (U.compareAndSwapInt(this, SIZECTL, sc,
44                                              (rs << RESIZE_STAMP_SHIFT) + 2))
45                     transfer(tab, null);//進行擴容。
46                 s = sumCount();
47             }
48         }
49     }
View Code

 

 

  get

    獲取元素的大致過程如下:

      1)計算hash值,找到數組table中對應的桶;

      2)如果該桶的首節點為null,直接返回null;

      3)如果該桶的首節點的key就是要找的key,直接返回其value;

      4)如果該桶的首節點的hash值<0,說明正在擴容或者該位置是紅黑樹,通過find方法找到想要的值;

      5)如果是鏈表,遍歷鏈表查找相同的key。

 1     public V get(Object key) {
 2         Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
 3         int h = spread(key.hashCode());//計算hash值
 4         if ((tab = table) != null && (n = tab.length) > 0 &&
 5             (e = tabAt(tab, (n - 1) & h)) != null) {
 6             //如果該桶的第一個節點就是要找的key,直接返回value
 7             if ((eh = e.hash) == h) {
 8                 if ((ek = e.key) == key || (ek != null && key.equals(ek)))
 9                     return e.val;
10             }
11             //第一個節點的hash值<0,說明正在擴容或者該位置是紅黑樹
12             else if (eh < 0)
13                 return (p = e.find(h, key)) != null ? p.val : null;
14             //這種情況下,肯定是鏈表
15             while ((e = e.next) != null) {
16                 if (e.hash == h &&
17                     ((ek = e.key) == key || (ek != null && key.equals(ek))))
18                     return e.val;
19             }
20         }
21         return null;
22     }
View Code

 

 

  size

    size = baseCount + CounterCell數組中元素的個數。因為在addCount方法中,使用CAS更新baseCount,有可能在並發情況下更新失敗。即節點已經被添加到數組table中,但數量沒有被統計。當更新失敗時,會調用fullAddCount方法將這些失敗的節點包裝成一個CounterCell對象,並保存在CounterCell數組中。

 1     public int size() {
 2         long n = sumCount();
 3         return ((n < 0L) ? 0 :
 4                 (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
 5                 (int)n);
 6     }
 7 
 8     final long sumCount() {
 9         CounterCell[] as = counterCells; CounterCell a;
10         long sum = baseCount;
11         if (as != null) {
12             for (int i = 0; i < as.length; ++i) {
13                 if ((a = as[i]) != null)
14                     sum += a.value;
15             }
16         }
17         return sum;
18     }
View Code

 

 

總結

    Java8版本的ConcurrentHashMap相對於Java7有什么優勢:

    1)Java7中鎖的粒度為segment,每個segment中包含多個HashEntry,而Java8中鎖的粒度就是HashEntry(首節點);

    2)Java7中鎖使用的是ReentrantLock,而Java8中使用的是synchronized;

      為什么使用內置鎖synchronized來代替重入鎖ReentrantLock?

      1 在低粒度的加鎖方式中,synchronized的性能不比ReentrantLock差;Java8中ConcurrentHashMap的鎖粒度更低了,發生沖突的概率更低,JVM對synchronized進行了大量的優化(自旋鎖、偏向鎖、輕量級鎖等等),只要線程在可以在自旋過程中拿到鎖,那么就不會升級為重量級鎖,就避免了線程掛起和喚醒的上下文開銷。但使用ReentrantLock不會自旋,而是直接被掛起,當然,也可以使用tryLock(),但是這樣又出現了一個問題,你怎么知道tryLock的時間呢?在時間范圍里還好,假如超過了呢?

      所以在低粒度的加鎖方式中,synchronized是最好的選擇。Synchronized和ReentrantLock他們的開銷差距是在釋放鎖時喚醒線程的數量,Synchronized是喚醒鎖池里所有的線程+剛好來訪問的線程,而ReentrantLock則是當前線程后進來的第一個線程+剛好來訪問的線程。

      2 synchronized內置鎖使用起來更加簡便、易懂、程序可讀性好;

      3 ReentrantLock需要消耗更多的內存

    3)Java8中使用鏈表+紅黑樹的數據結構,代替Java7中的鏈表,當鏈表長度比較長時,紅黑樹的查找速度更快;

 

 

 

 

參考資料:

  《java並發編程的藝術》

   ConcurrentHashMap原理分析

   ConcurrentHashMap總結

   ConcurrentHashMap 1.8為什么要使用CAS+Synchronized取代Segment+ReentrantLock

   ConcurrentHashMap原理分析(1.7與1.8)

  Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析

  Map 綜述(三):徹頭徹尾理解 ConcurrentHashMap


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM