引子
1、不安全:大家都知道HashMap不是線程安全的,在多線程環境下,對HashMap進行put操作會導致死循環。是因為多線程會導致Entry鏈表形成環形數據結構,這樣Entry的next節點將永遠不為空,就會產生死循環獲取Entry。具體內容見HashMap隨筆。
2、不高效:Collections.synchronizedMap(hashMap)和HashTable的線程安全原理都是對方法進行同步,所有操作競爭同一把鎖,性能比較低。
如何構造一個線程安全且高效的HashMap?ConcurrentHashMap登場。
鎖分段技術
ConcurrentHashMap將數據分為很多段(Segment),Segment繼承了ReentrantLock,每個段都是一把鎖。每個Segment都包含一個HashEntry數組,HashEntry數組存放鍵值對數據。當一個線程要訪問Entry數組時,需要獲取所在Segment鎖,保證在同一個Segment的操作是線程安全的,但其他Segment的數據的訪問不受影響,可以實現並發的訪問不同的Segment。同一個段中才存在競爭關系,不同的段之間沒有競爭關系。
ConcurrentHashMap源碼分析
源碼分析基於jdk1.7,不同版本實現有所不同。
類圖
初始化
segmentShift和segmentMask的作用是定位Segment索引。以默認值為例,concurrencyLevel為16,需要移位4次(sshift為4),segmentShift就等於28,segmentMask等於15。
concurrencyLevel是指並發級別,即Segment數組的大小。concurrencyLevel值得設定應該根據並發線程數決定。如果並發級別設置的太小,同一個Segment的元素數量過多,會引起鎖競爭的加重;如果太大,原本屬於同一個Segment的元素會被分配到不同的Segment,會引起Cpu緩存命中率下降,進而導致程序性能下降。

1 //initialCapacity:初始容量,默認16。 2 //loadFactor:負載因子,默認0.75。當元素個數大於loadFactor*最大容量時需要擴容(rehash) 3 //concurrencyLevel:並發級別,默認16。確定Segment的個數,Segment的個數為大於等於concurrencyLevel的第一個2^n。 4 public ConcurrentHashMap(int initialCapacity, 5 float loadFactor, int concurrencyLevel) { 6 //判斷參數是否合法 7 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) 8 throw new IllegalArgumentException(); 9 //Segment最大個數MAX_SEGMENTS = 1 << 16,即65536; 10 if (concurrencyLevel > MAX_SEGMENTS) 11 concurrencyLevel = MAX_SEGMENTS; 12 13 // Find power-of-two sizes best matching arguments 14 int sshift = 0; 15 int ssize = 1; 16 //使用循環找到大於等於concurrencyLevel的第一個2^n。ssize就表示Segment的個數。 17 while (ssize < concurrencyLevel) { 18 ++sshift; //記錄移位的次數, 19 ssize <<= 1;//左移1位 20 } 21 this.segmentShift = 32 - sshift; //用於定位hash運算的位數,之所以用32是因為ConcurrentHashMap里的hash()方法輸出的最大數是32位的 22 this.segmentMask = ssize - 1; //hash運算的掩碼,ssize為2^n,所以segmentMask每一位都為1。目的是之后可以通過key的hash值與這個值做&運算確定Segment的索引。 23 //最大容量MAXIMUM_CAPACITY = 1 << 30; 24 if (initialCapacity > MAXIMUM_CAPACITY) 25 initialCapacity = MAXIMUM_CAPACITY; 26 //計算每個Segment所需的大小,向上取整 27 int c = initialCapacity / ssize; 28 if (c * ssize < initialCapacity) 29 ++c; 30 int cap = MIN_SEGMENT_TABLE_CAPACITY;//每個Segment最小容量MIN_SEGMENT_TABLE_CAPACITY = 2; 31 //cap表示每個Segment的容量,也是大於等於c的2^n。 32 while (cap < c) 33 cap <<= 1; 34 //創建一個Segment實例,作為Segment數組ss的第一個元素 35 // create segments and segments[0] 36 Segment<K,V> s0 = 37 new Segment<K,V>(loadFactor, (int)(cap * loadFactor), 38 (HashEntry<K,V>[])new HashEntry[cap]); 39 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; 40 UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] 41 this.segments = ss; 42 }
插入元素(put)
可以分為三步:
1、定位Segment:通過Hash值與segmentShift、segmentMask的計算定位到對應的Segment;
2、鎖獲取:獲取對應Segment的鎖,如果獲取鎖失敗,需要自旋重新獲取鎖;如果自旋超過最大重試次數,則阻塞。
3、插入元素:如果key已經存在,直接更新;如果key不存在,先判斷是否需要擴容,若需要則執行rehash()后插入原因,否則直接存入元素。
為了高效,ConcurrentHashMap不會對整個容器進行擴容,而只對某個segment進行擴容。
Segment的擴容判斷比HashMap更恰當,因為HashMap是在插入元素后判斷元素是否已經到達容量的,如果到達了就進行擴容,但是很有可能擴容之后沒有新元素插入,這時HashMap就進行了一次無效的擴容。
與HashMap不同ConcurrentHashMap並不允許key或者value為null。

1 /**ConcurrentHashMap中方法**/ 2 public V put(K key, V value) { 3 Segment<K,V> s; 4 if (value == null) 5 throw new NullPointerException(); 6 int hash = hash(key); //計算hash值,hash值是一個32位的整數 7 //計算Segment索引 8 //在默認情況下,concurrencyLevel為16,segmentShift為28,segmentMask為15。 9 //先右移28位,hash值變為0000 0000 0000 0000 0000 0000 0000 xxxx, 10 //與segmentMask做&運算,就是取最后四位的值。這個值就是Segment的索引 11 int j = (hash >>> segmentShift) & segmentMask; 12 //通過UNSAFE的方式獲取索引j對應的Segment對象。 13 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck 14 (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment 15 //Segment采用延遲初始化機制,如果sement為null,則調用ensureSegment創建Segment 16 s = ensureSegment(j); 17 //向Segment中put元素 18 return s.put(key, hash, value, false); 19 } 20 21 /**ConcurrentHashMap$Segment中方法**/ 22 //向Segment中put元素 23 final V put(K key, int hash, V value, boolean onlyIfAbsent) { 24 //獲取鎖。如果獲取鎖成功,插入元素,和普通的hashMap差不多。 25 //如果獲取鎖失敗,執行scanAndLockForPut進行重試。重試設計見scanAndLockForPut方法源碼。 26 HashEntry<K,V> node = tryLock() ? null : 27 scanAndLockForPut(key, hash, value); 28 V oldValue; 29 try { 30 HashEntry<K,V>[] tab = table; 31 int index = (tab.length - 1) & hash;//計算HashEntry數組索引 32 HashEntry<K,V> first = entryAt(tab, index); 33 for (HashEntry<K,V> e = first;;) { 34 if (e != null) { //該索引處已經有元素 35 K k; 36 37 //如果key相同,替換value。 38 if ((k = e.key) == key || 39 (e.hash == hash && key.equals(k))) { 40 oldValue = e.value; 41 //onlyIfAbsent=true參數表示如果key存在,則不更新value值,只有在key不存在的情況下,才更新。 42 //在putIfAbsent方法中onlyIfAbsent=true 43 //在put方法中onlyIfAbsent=false 44 if (!onlyIfAbsent) {Scans 45 e.value = value; 46 ++modCount;//修改次數 47 } 48 break; 49 } 50 e = e.next;//繼續找下一個元素 51 } 52 else { 53 if (node != null) 54 node.setNext(first); 55 else 56 node = new HashEntry<K,V>(hash, key, value, first); 57 int c = count + 1; //count為ConcurrentHashMap$Segment中的域 58 if (c > threshold && tab.length < MAXIMUM_CAPACITY) 59 //如果元素數量超過閾值且表長度小於MAXIMUM_CAPACITY,擴容 60 rehash(node); 61 else 62 setEntryAt(tab, index, node);//將node節點更新到table中 63 ++modCount; 64 count = c; 65 oldValue = null; 66 break; 67 } 68 } 69 } finally { 70 unlock(); 71 } 72 return oldValue; 73 } 74 75 /**ConcurrentHashMap$Segment中方法**/ 76 //自旋獲取鎖 77 private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { 78 //entryForHash根據hash值找到當前segment中對應的HashEntry數組索引。 79 HashEntry<K,V> first = entryForHash(this, hash); 80 HashEntry<K,V> e = first; 81 HashEntry<K,V> node = null; 82 int retries = -1; // negative while locating node 83 //自旋獲取鎖。若獲取到鎖,則跳出循環;否則一直循環直到獲取到鎖或retries大於MAX_SCAN_RETRIES。 84 while (!tryLock()) { 85 HashEntry<K,V> f; // to recheck first below 86 //當retries = -1時(即第一次循環或更新操作導致的first節點發生變化),會遍歷該Segment的HashEntry數組中hash對應的鏈表,如果key對應的HashEntry不存在,則創建該節點。 87 //此處遍歷鏈表的原因:希望遍歷的鏈表被CPU cache所緩存,為后續實際put過程中的鏈表遍歷操作提升性能。怎么理解呢?放在put時再去遍歷不行嗎?因為此時當前線程沒有獲取到Segment鎖,所以不能進行put操作,但可以為put操作做一些准備工作(有可能加載到緩存),使put的操作更快,從而減少鎖競爭。這種思想在remove()方法中也有體現。 88 if (retries < 0) { 89 if (e == null) { 90 //如果key不存在創建node,然后進入下一個循環 91 if (node == null) // speculatively create node 92 node = new HashEntry<K,V>(hash, key, value, null); 93 retries = 0; 94 } 95 else if (key.equals(e.key)) 96 //如果key存在直接進入下一個循環 97 retries = 0; 98 else 99 e = e.next; //鏈表的下一個節點 100 } 101 else if (++retries > MAX_SCAN_RETRIES) { 102 //每次循環,retries加1,判斷是否大於最大重試次數MAX_SCAN_RETRIES. 103 //static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; 104 //為了防止自旋鎖大量消耗CPU的缺點。如果超過MAX_SCAN_RETRIES,使用lock方法獲取鎖。如果獲取不到鎖則當前線程阻塞並跳出循環。 105 //ReentrantLock的lock()和tryLock()方法的區別。 106 lock(); 107 break; 108 } 109 else if ((retries & 1) == 0 && 110 (f = entryForHash(this, hash)) != first) { 111 //每隔一次循環,檢查所在數組索引的鏈表頭結點有沒有變化(其他線程有更新Map的操作,如put,rehash或者remove操作)。 112 //如果改變,retries更新為-1,重新遍歷 113 e = first = f; // re-traverse if entry changed 114 retries = -1; 115 } 116 } 117 return node; 118 } 119 120 /**ConcurrentHashMap$Segment中方法**/ 121 //rehash 122 private void rehash(HashEntry<K,V> node) { 123 HashEntry<K,V>[] oldTable = table; 124 int oldCapacity = oldTable.length; 125 int newCapacity = oldCapacity << 1; //新容量為舊容量的2倍 126 threshold = (int)(newCapacity * loadFactor); //新閾值 127 HashEntry<K,V>[] newTable = 128 (HashEntry<K,V>[]) new HashEntry[newCapacity]; //新表 129 int sizeMask = newCapacity - 1; //新掩碼 130 //對舊表做遍歷 131 for (int i = 0; i < oldCapacity ; i++) { 132 HashEntry<K,V> e = oldTable[i]; 133 if (e != null) { 134 HashEntry<K,V> next = e.next; 135 int idx = e.hash & sizeMask; 136 if (next == null) // Single node on list 鏈表中只存在一個節點 137 newTable[idx] = e; 138 else { // Reuse consecutive sequence at same slot 139 //鏈表中存在多個節點. 140 /* 141 相對於HashMap的resize,ConcurrentHashMap的rehash原理類似,但是Doug Lea為rehash做了一定的優化,避免讓所有的節點都進行復制操作:由於擴容是基於2的冪指來操作,假設擴容前某HashEntry對應到Segment中數組的index為i,數組的容量為capacity,那么擴容后該HashEntry對應到新數組中的index只可能為i或者i+capacity,因此大多數HashEntry節點在擴容前后index可以保持不變。基於此,rehash方法中會定位第一個后續所有節點在擴容后index都保持不變的節點,然后將這個節點之前的所有節點重排即可 142 */ 143 HashEntry<K,V> lastRun = e; 144 int lastIdx = idx; 145 //找到第一個在擴容后index都保持不變的節點lastRun 146 for (HashEntry<K,V> last = next; 147 last != null; 148 last = last.next) { 149 int k = last.hash & sizeMask; 150 if (k != lastIdx) { 151 lastIdx = k; 152 lastRun = last; 153 } 154 } 155 newTable[lastIdx] = lastRun; 156 // Clone remaining nodes 157 //將這個節點之前的所有節點重排 158 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { 159 V v = p.value; 160 int h = p.hash; 161 int k = h & sizeMask; 162 HashEntry<K,V> n = newTable[k]; 163 newTable[k] = new HashEntry<K,V>(h, p.key, v, n); 164 } 165 } 166 } 167 } 168 int nodeIndex = node.hash & sizeMask; // add the new node 169 node.setNext(newTable[nodeIndex]); 170 newTable[nodeIndex] = node; 171 table = newTable; 172 }
Segment延遲初始化機制
Segment采用延遲初始化機制,如果sement為null,則調用ensureSegment確保創建Segment。
ensureSegment方法可能被多個線程調用,ensureSegment()是怎么保證線程安全的呢?
通過源代碼可看出ensureSegment方法並未使用鎖來控制競爭,而是使用了Unsafe對象的getObjectVolatile()提供的原子讀語義結合CAS來確保Segment創建的原子性。
ensureSegment()源代碼:

1 @SuppressWarnings("unchecked") 2 private Segment<K,V> ensureSegment(int k) { 3 final Segment<K,V>[] ss = this.segments; 4 long u = (k << SSHIFT) + SBASE; // raw offset 5 Segment<K,V> seg; 6 if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { 7 //使用第一個segment作為模板來創建segment,第一個segment在Map初始化時已經被創建 8 Segment<K,V> proto = ss[0]; // use segment 0 as prototype 9 int cap = proto.table.length; 10 float lf = proto.loadFactor; 11 int threshold = (int)(cap * lf); 12 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; 13 if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) 14 == null) { // recheck 15 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); //根據第一個segment的參數創建新的Segment 16 //自旋CAS。如果seg!=null,說明該segment已經被其他線程創建,則方法結束;如果seg==null,說明該segment還沒有被創建,則當前線程采用CAS更新Segment數組,如果CAS成功,則結束,否則說明其他線程對Segment數組有過更新,繼續下一個循環指定該segment創建成功。 17 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) 18 == null) { 19 if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) 20 break; 21 } 22 } 23 } 24 return seg; 25 }
scanAndLockForPut方法
自旋獲取鎖中,當第一次循環或更新操作導致的first節點發生變化時,會遍歷該Segment的HashEntry數組中hash對應的鏈表,如果key對應的HashEntry不存在,則創建該節點。
此處遍歷鏈表的原因:希望遍歷的鏈表被CPU cache所緩存,為后續實際put過程中的鏈表遍歷操作提升性能。怎么理解呢?put還是要再去遍歷一次(即使鏈表在緩存中)?因為此時當前線程沒有獲取到Segment鎖,所以不能進行put操作,但可以為put操作做一些准備工作(有可能加載到緩存,在緩存中執行遍歷更快),使put的操作更快,從而減少鎖競爭。這種思想在remove()方法中也有體現。
獲取元素(get)
get操作不需要加鎖,當拿到的值為空時才會加鎖重讀。讀操作不用加鎖的原因是它的get方法里將要使用的共享變量都定義成volatile類型,如volatile V value。定義成volatile的變量,能夠在線程之間保持可見性,能夠被多線程同時讀,並且保證不會讀到過期的值,但是只能被單線程寫(有一種情況可以被多線程寫,就是寫入的值不依賴於原值)。get方法使用UNSAFE提供的原子讀語義來獲的Segmnet和對應的鏈表。
containsKey方法和get相似,都不用加鎖。

1 public V get(Object key) { 2 Segment<K,V> s; // manually integrate access methods to reduce overhead 3 HashEntry<K,V>[] tab; 4 int h = hash(key); 5 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; 6 //通過Hash值找到相應的Segment 7 if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && 8 (tab = s.table) != null) { 9 //找到HashEntry鏈表的索引,遍歷鏈表找到對應的key 10 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile 11 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); 12 e != null; e = e.next) { 13 K k; 14 if ((k = e.key) == key || (e.hash == h && key.equals(k))) 15 return e.value; 16 } 17 } 18 return null; 19 }
統計大小(size)
統計Map的大小需要統計所有Segment的大小然后求和。
問題:累加的過程中Segment的大小可能會發生變化,導致統計的結果不准確。
解決方案:1)簡單的方法就是對所有的Segment加鎖,但方法低效。
2)考慮到累加的過程中Segment的大小變化的可能性很小,作者給出了更高效的方案,首先嘗試幾次在不對Segment加鎖的情況下統計各個Segment的大小,如果累加期間Map的大小發生了變化,再使用加鎖的方式統計各個Segment的大小。判斷Map的大小是否發生了變化,需要通過Segment的modCount變量實現。modCount表示對Segment的修改次數。相同的思想也用在了containsValue操作。
注意事項:使用加鎖方式進行統計大小時,對每一個Segment加鎖,需要強制創建所有的Segment,這么做的目的是防止其他線程創建Segment並進行更新操作。所以應盡量避免在多線程環境下使用size和containsValue方法。

1 public int size() { 2 // Try a few times to get accurate count. On failure due to 3 // continuous async changes in table, resort to locking. 4 final Segment<K,V>[] segments = this.segments; 5 int size; 6 boolean overflow; // true if size overflows 32 bits 7 long sum; // sum of modCounts 8 long last = 0L; // previous sum 9 int retries = -1; // first iteration isn't retry 10 try { 11 for (;;) { 12 //static final int RETRIES_BEFORE_LOCK = 2; 13 //判斷是否到達無鎖統計map大小的最大次數,若達到最大次數需要鎖所有Segment 14 if (retries++ == RETRIES_BEFORE_LOCK) { 15 //對每一個Segment加鎖,此時需要強制創建所有的Segment,這么做的目的是防止其他線程創建Segment並進行更新操作。 16 //所以應避免在多線程環境下使用size和containsValue方法。 17 for (int j = 0; j < segments.length; ++j) 18 ensureSegment(j).lock(); // force creation 19 } 20 sum = 0L; 21 size = 0; 22 overflow = false; 23 for (int j = 0; j < segments.length; ++j) { 24 Segment<K,V> seg = segmentAt(segments, j); 25 if (seg != null) { 26 sum += seg.modCount; 27 int c = seg.count; 28 if (c < 0 || (size += c) < 0) 29 overflow = true; 30 } 31 } 32 //判斷前后兩次統計的modCount之和是否相等,若相等則說明沒有被修改郭 33 //由於last初始值為0,如果該Map從創建到現在都沒有被修改過,即所有Segment的modCount都為0,則只執行一次循環;否則至少執行兩次循環,比較兩次統計的sum有沒有發生變化。又因為retries初始值-1,所以可以說重試無鎖統計大小的次數為3次。 34 if (sum == last) 35 break; 36 last = sum; 37 } 38 } finally { 39 //重試次數大於最大次數,需要釋放鎖 40 if (retries > RETRIES_BEFORE_LOCK) { 41 for (int j = 0; j < segments.length; ++j) 42 segmentAt(segments, j).unlock(); 43 } 44 } 45 return overflow ? Integer.MAX_VALUE : size; 46 }
Java8的ConcurrentHashMap
相對於Java7中的實現,主要有以下兩點改進:
1)取消segment分段,直接使用數組transient volatile Node<K,V>[] table存儲數據,將table數組元素作為鎖,實現對數組中每一個桶進行加鎖,進一步減少並發沖突的概率。
2)類似於Java8中的HashMap,將數組+鏈表的結構變更為數組+鏈表+紅黑樹的結構。當鏈表的長度大於8時,將鏈表轉換為紅黑樹,原因見HashMap。
通過 Node + CAS + Synchronized 來保證線程安全。
Fields

1 transient volatile Node<K,V>[] table;//存放元素的數組,懶加載,大小是2的n次方 2 private transient volatile Node<K,V>[] nextTable;//擴容時用到 3 //基本計數器,通過CAS更新 4 private transient volatile long baseCount; 5 /*控制標識符,用來控制table的初始化和擴容的操作,不同的值有不同的含義 6 *當為負數時:-1代表正在初始化,-N代表有N-1個線程正在進行擴容 7 *當為0時:代表當時的table還沒有被初始化 8 *當為正數時:表示初始化或者下一次進行擴容的大小 9 */ 10 private transient volatile int sizeCtl; 11 12 /** 13 * The next table index (plus one) to split while resizing. 14 */ 15 private transient volatile int transferIndex; 16 17 /** 18 * Spinlock (locked via CAS) used when resizing and/or creating CounterCells. 19 */ 20 private transient volatile int cellsBusy; 21 22 /** 23 * Table of counter cells. When non-null, size is a power of 2. 24 */ 25 private transient volatile CounterCell[] counterCells; 26 27 // views 28 private transient KeySetView<K,V> keySet; 29 private transient ValuesView<K,V> values; 30 private transient EntrySetView<K,V> entrySet;
Node
hash值和key都是final的,不可更改;val和next都是volatile的保證可見性和禁止指令重排序。

1 static class Node<K,V> implements Map.Entry<K,V> { 2 //hash值和key都是final的,不可更改;val和next都是volatile的保證可見性和禁止指令重排序。 3 final int hash; 4 final K key; 5 volatile V val; 6 volatile Node<K,V> next; 7 8 Node(int hash, K key, V val, Node<K,V> next) { 9 this.hash = hash; 10 this.key = key; 11 this.val = val; 12 this.next = next; 13 } 14 15 public final K getKey() { return key; } 16 public final V getValue() { return val; } 17 public final int hashCode() { return key.hashCode() ^ val.hashCode(); } 18 public final String toString(){ return key + "=" + val; } 19 //不允許更改值 20 public final V setValue(V value) { 21 throw new UnsupportedOperationException(); 22 } 23 public final boolean equals(Object o) { 24 Object k, v, u; Map.Entry<?,?> e; 25 return ((o instanceof Map.Entry) && 26 (k = (e = (Map.Entry<?,?>)o).getKey()) != null && 27 (v = e.getValue()) != null && 28 (k == key || k.equals(key)) && 29 (v == (u = val) || v.equals(u))); 30 } 31 //用於map中的get()方法,子類重寫 32 Node<K,V> find(int h, Object k) { 33 Node<K,V> e = this; 34 if (k != null) { 35 do { 36 K ek; 37 if (e.hash == h && 38 ((ek = e.key) == k || (ek != null && k.equals(ek)))) 39 return e; 40 } while ((e = e.next) != null); 41 } 42 return null; 43 } 44 }
put
添加元素的大致過程如下:
1)如果table沒有初始化,先通過initTable()方法進行初始化;
2)計算hash值,找到對應的桶,如果該桶的首節點f為null(即不存在hash沖突),使用CAS直接將新Node放入該桶;
3)如果首節點f的hash值為MOVED,說明正在擴容,先進行擴容;
4)如果存在hash沖突,則通過加鎖(獲取首節點f的監視器鎖)來保證線程安全,分兩種情況:鏈表和紅黑樹;如果是鏈表,則遍歷鏈表,存在相同的key就進行覆蓋,否則插入到鏈表的尾部;如果是紅黑樹,則向紅黑樹中插入新節點;
5)判斷是否需要將鏈表轉化為紅黑樹,如果需要,調用treeifyBin方法;
6)如果添加成功,就調用addCount方法統計size,並檢查是否需要擴容。

1 public V put(K key, V value) { 2 return putVal(key, value, false); 3 } 4 5 final V putVal(K key, V value, boolean onlyIfAbsent) { 6 if (key == null || value == null) throw new NullPointerException(); 7 int hash = spread(key.hashCode());//計算hash值 8 //用於記錄相應鏈表的長度 9 int binCount = 0; 10 for (Node<K,V>[] tab = table;;) { 11 Node<K,V> f; int n, i, fh; 12 if (tab == null || (n = tab.length) == 0) 13 //如果數組為空,則進行初始化 14 tab = initTable(); 15 //找到該hash值對應的下標i,得到第一個節點f 16 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 17 //如果第一個節點f為null,使用CAS直接將新Node放入該桶 18 //如果CAS成功,跳出循環結束;如果失敗,進入下一個循環 19 if (casTabAt(tab, i, null, 20 new Node<K,V>(hash, key, value, null))) 21 break; // no lock when adding to empty bin 22 } 23 //如果f的哈希值為MOVED,則進行數據遷移(擴容) 24 else if ((fh = f.hash) == MOVED) 25 tab = helpTransfer(tab, f); 26 else { 27 //這種情況下,說明f是第一個節點且不為null 28 V oldVal = null; 29 //獲取該桶第一個節點f的監視器鎖 30 synchronized (f) { 31 if (tabAt(tab, i) == f) { 32 //第一個節點f的hash值大於0,說明是鏈表 33 if (fh >= 0) { 34 binCount = 1; 35 //遍歷鏈表, 36 for (Node<K,V> e = f;; ++binCount) { 37 K ek; 38 //如果找到同樣的key,判斷onlyIfAbsent然后進行覆蓋,跳出循環 39 if (e.hash == hash && 40 ((ek = e.key) == key || 41 (ek != null && key.equals(ek)))) { 42 oldVal = e.val; 43 if (!onlyIfAbsent) 44 e.val = value; 45 break; 46 } 47 Node<K,V> pred = e; 48 //如果遍歷到鏈表尾部沒有找到相同的key,則將新Node插入到鏈表的尾部 49 if ((e = e.next) == null) { 50 pred.next = new Node<K,V>(hash, key, 51 value, null); 52 break; 53 } 54 } 55 } 56 //如果第一個節點是紅黑樹節點 57 else if (f instanceof TreeBin) { 58 Node<K,V> p; 59 binCount = 2; 60 // 調用紅黑樹的插值方法插入新節點 61 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, 62 value)) != null) { 63 oldVal = p.val; 64 if (!onlyIfAbsent) 65 p.val = value; 66 } 67 } 68 } 69 } 70 //binCount != 0說明上面做了鏈表操作 71 if (binCount != 0) { 72 //判斷是否將鏈表轉化為紅黑樹,TREEIFY_THRESHOLD為8 73 if (binCount >= TREEIFY_THRESHOLD) 74 //可能轉化為紅黑樹 75 //如果當前數組的長度小於64,那么會選擇進行數組擴容,而不是轉換為紅黑樹 76 treeifyBin(tab, i); 77 if (oldVal != null) 78 return oldVal; 79 break; 80 } 81 } 82 } 83 addCount(1L, binCount); 84 return null; 85 }
initTable
初始化table

1 private final Node<K,V>[] initTable() { 2 Node<K,V>[] tab; int sc; 3 while ((tab = table) == null || tab.length == 0) { 4 //如果sizeCtl小於0,說明其他線程已經初始化了 5 if ((sc = sizeCtl) < 0) 6 //yield()使線程由運行狀態變為就緒狀態,把CPU讓出來,讓自己或者其它的線程運行。 7 Thread.yield(); // lost initialization race; just spin 8 //通過CAS操作將sizeCtl設置為-1,返回true代表搶到鎖 9 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 10 try { 11 if ((tab = table) == null || tab.length == 0) { 12 //默認容量DEFAULT_CAPACITY為16 13 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; 14 //初始化指定容量的數組,並賦給table,table為volatile的 15 @SuppressWarnings("unchecked") 16 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; 17 table = tab = nt; 18 //>>>為無符號右移運算,無符號右移2位,相當於除以2 19 //即相當於sc=0.75n 20 sc = n - (n >>> 2); 21 } 22 } finally { 23 //將sc賦值給sizeCtl 24 sizeCtl = sc; 25 } 26 break; 27 } 28 } 29 return tab; 30 }
treeifyBin
鏈表轉紅黑樹

1 private final void treeifyBin(Node<K,V>[] tab, int index) { 2 Node<K,V> b; int n, sc; 3 if (tab != null) { 4 //如果數組長度小於MIN_TREEIFY_CAPACITY(64)的時候,進行擴容。 5 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) 6 tryPresize(n << 1); 7 //b是該桶中的第一個節點 8 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { 9 //獲取鎖 10 synchronized (b) { 11 if (tabAt(tab, index) == b) { 12 TreeNode<K,V> hd = null, tl = null; 13 //遍歷鏈表,創建一顆紅黑樹 14 for (Node<K,V> e = b; e != null; e = e.next) { 15 TreeNode<K,V> p = 16 new TreeNode<K,V>(e.hash, e.key, e.val, 17 null, null); 18 if ((p.prev = tl) == null) 19 hd = p; 20 else 21 tl.next = p; 22 tl = p; 23 } 24 //將紅黑樹設置到數組的相應桶中 25 setTabAt(tab, index, new TreeBin<K,V>(hd)); 26 } 27 } 28 } 29 } 30 }
tryPresize
擴容,每次都是擴容為原來的2倍,size是已經翻完倍的數值。

1 private final void tryPresize(int size) { 2 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : 3 tableSizeFor(size + (size >>> 1) + 1);//取大於1.5倍的size+1的最近的2的n次方的值 4 int sc; 5 while ((sc = sizeCtl) >= 0) { 6 Node<K,V>[] tab = table; int n; 7 //如果數組為空,先初始化數組 8 if (tab == null || (n = tab.length) == 0) { 9 n = (sc > c) ? sc : c; 10 if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 11 try { 12 if (table == tab) { 13 @SuppressWarnings("unchecked") 14 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; 15 table = nt; 16 sc = n - (n >>> 2); 17 } 18 } finally { 19 sizeCtl = sc; 20 } 21 } 22 } 23 else if (c <= sc || n >= MAXIMUM_CAPACITY) 24 break; 25 else if (tab == table) { 26 int rs = resizeStamp(n); 27 if (sc < 0) { 28 Node<K,V>[] nt; 29 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || 30 sc == rs + MAX_RESIZERS || (nt = nextTable) == null || 31 transferIndex <= 0) 32 break; 33 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) 34 transfer(tab, nt); 35 } 36 else if (U.compareAndSwapInt(this, SIZECTL, sc, 37 (rs << RESIZE_STAMP_SHIFT) + 2)) 38 transfer(tab, null); 39 } 40 } 41 } 42 43 44 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { 45 int n = tab.length, stride; 46 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) 47 stride = MIN_TRANSFER_STRIDE; // subdivide range 48 if (nextTab == null) { // initiating 49 try { 50 @SuppressWarnings("unchecked") 51 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; 52 nextTab = nt; 53 } catch (Throwable ex) { // try to cope with OOME 54 sizeCtl = Integer.MAX_VALUE; 55 return; 56 } 57 nextTable = nextTab; 58 transferIndex = n; 59 } 60 int nextn = nextTab.length; 61 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); 62 boolean advance = true; 63 boolean finishing = false; // to ensure sweep before committing nextTab 64 for (int i = 0, bound = 0;;) { 65 Node<K,V> f; int fh; 66 while (advance) { 67 int nextIndex, nextBound; 68 if (--i >= bound || finishing) 69 advance = false; 70 else if ((nextIndex = transferIndex) <= 0) { 71 i = -1; 72 advance = false; 73 } 74 else if (U.compareAndSwapInt 75 (this, TRANSFERINDEX, nextIndex, 76 nextBound = (nextIndex > stride ? 77 nextIndex - stride : 0))) { 78 bound = nextBound; 79 i = nextIndex - 1; 80 advance = false; 81 } 82 } 83 if (i < 0 || i >= n || i + n >= nextn) { 84 int sc; 85 if (finishing) { 86 nextTable = null; 87 table = nextTab; 88 sizeCtl = (n << 1) - (n >>> 1); 89 return; 90 } 91 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { 92 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) 93 return; 94 finishing = advance = true; 95 i = n; // recheck before commit 96 } 97 } 98 else if ((f = tabAt(tab, i)) == null) 99 advance = casTabAt(tab, i, null, fwd); 100 else if ((fh = f.hash) == MOVED) 101 advance = true; // already processed 102 else { 103 synchronized (f) { 104 if (tabAt(tab, i) == f) { 105 Node<K,V> ln, hn; 106 if (fh >= 0) { 107 int runBit = fh & n; 108 Node<K,V> lastRun = f; 109 for (Node<K,V> p = f.next; p != null; p = p.next) { 110 int b = p.hash & n; 111 if (b != runBit) { 112 runBit = b; 113 lastRun = p; 114 } 115 } 116 if (runBit == 0) { 117 ln = lastRun; 118 hn = null; 119 } 120 else { 121 hn = lastRun; 122 ln = null; 123 } 124 for (Node<K,V> p = f; p != lastRun; p = p.next) { 125 int ph = p.hash; K pk = p.key; V pv = p.val; 126 if ((ph & n) == 0) 127 ln = new Node<K,V>(ph, pk, pv, ln); 128 else 129 hn = new Node<K,V>(ph, pk, pv, hn); 130 } 131 setTabAt(nextTab, i, ln); 132 setTabAt(nextTab, i + n, hn); 133 setTabAt(tab, i, fwd); 134 advance = true; 135 } 136 else if (f instanceof TreeBin) { 137 TreeBin<K,V> t = (TreeBin<K,V>)f; 138 TreeNode<K,V> lo = null, loTail = null; 139 TreeNode<K,V> hi = null, hiTail = null; 140 int lc = 0, hc = 0; 141 for (Node<K,V> e = t.first; e != null; e = e.next) { 142 int h = e.hash; 143 TreeNode<K,V> p = new TreeNode<K,V> 144 (h, e.key, e.val, null, null); 145 if ((h & n) == 0) { 146 if ((p.prev = loTail) == null) 147 lo = p; 148 else 149 loTail.next = p; 150 loTail = p; 151 ++lc; 152 } 153 else { 154 if ((p.prev = hiTail) == null) 155 hi = p; 156 else 157 hiTail.next = p; 158 hiTail = p; 159 ++hc; 160 } 161 } 162 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : 163 (hc != 0) ? new TreeBin<K,V>(lo) : t; 164 hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : 165 (lc != 0) ? new TreeBin<K,V>(hi) : t; 166 setTabAt(nextTab, i, ln); 167 setTabAt(nextTab, i + n, hn); 168 setTabAt(tab, i, fwd); 169 advance = true; 170 } 171 } 172 } 173 } 174 } 175 }
addCount
在添加完元素之后,調用addCount方法進行計數。
addCount方法主要完成兩個功能:
1)對table的長度計數+1,有兩種情況:一是通過修改 baseCount,二是通過使用 CounterCell。當 CounterCell 被初始化后,就優先使用他,不再使用 baseCount了;
2)檢查是否需要擴容,或者是否正在擴容。如果需要擴容,就調用擴容方法,如果正在擴容,就幫助其擴容。

1 //從putVal傳入的參數x是1,參數check為binCount,binCount>=0,默認要檢查是否需要擴容 2 private final void addCount(long x, int check) { 3 CounterCell[] as; long b, s; 4 //如果counterCells不為null或者更新baseCount失敗 5 if ((as = counterCells) != null || 6 !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { 7 CounterCell a; long v; int m; 8 boolean uncontended = true; 9 //如果counterCells的大小為0, 10 //或者隨機取其中一個元素為null, 11 //或者修改這個槽位的變量失敗,則執行fullAddCount方法 12 if (as == null || (m = as.length - 1) < 0 || 13 (a = as[ThreadLocalRandom.getProbe() & m]) == null || 14 !(uncontended = 15 U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { 16 fullAddCount(x, uncontended); 17 return; 18 } 19 if (check <= 1) 20 return; 21 s = sumCount();//計算map的size賦值給s 22 } 23 //判斷是否需要擴容,在putVal中調用,默認都是要檢查的 24 if (check >= 0) { 25 Node<K,V>[] tab, nt; int n, sc; 26 //如果map的size大於sizeCtl(擴容閾值), 27 //且table不是null, 28 //且table的長度小於MAXIMUM_CAPACITY,則擴容 29 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && 30 (n = tab.length) < MAXIMUM_CAPACITY) { 31 int rs = resizeStamp(n); 32 //sizeCtl小於0表示正在擴容 33 if (sc < 0) { 34 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || 35 sc == rs + MAX_RESIZERS || (nt = nextTable) == null || 36 transferIndex <= 0) 37 break; 38 // 如果可以幫助擴容,那么將 sc 加 1. 表示多了一個線程在幫助擴容 39 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) 40 transfer(tab, nt); 41 } 42 //如果沒有在擴容,將 sc 更新為負數,更新成功就進行擴容 43 else if (U.compareAndSwapInt(this, SIZECTL, sc, 44 (rs << RESIZE_STAMP_SHIFT) + 2)) 45 transfer(tab, null);//進行擴容。 46 s = sumCount(); 47 } 48 } 49 }
get
獲取元素的大致過程如下:
1)計算hash值,找到數組table中對應的桶;
2)如果該桶的首節點為null,直接返回null;
3)如果該桶的首節點的key就是要找的key,直接返回其value;
4)如果該桶的首節點的hash值<0,說明正在擴容或者該位置是紅黑樹,通過find方法找到想要的值;
5)如果是鏈表,遍歷鏈表查找相同的key。

1 public V get(Object key) { 2 Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; 3 int h = spread(key.hashCode());//計算hash值 4 if ((tab = table) != null && (n = tab.length) > 0 && 5 (e = tabAt(tab, (n - 1) & h)) != null) { 6 //如果該桶的第一個節點就是要找的key,直接返回value 7 if ((eh = e.hash) == h) { 8 if ((ek = e.key) == key || (ek != null && key.equals(ek))) 9 return e.val; 10 } 11 //第一個節點的hash值<0,說明正在擴容或者該位置是紅黑樹 12 else if (eh < 0) 13 return (p = e.find(h, key)) != null ? p.val : null; 14 //這種情況下,肯定是鏈表 15 while ((e = e.next) != null) { 16 if (e.hash == h && 17 ((ek = e.key) == key || (ek != null && key.equals(ek)))) 18 return e.val; 19 } 20 } 21 return null; 22 }
size
size = baseCount + CounterCell數組中元素的個數。因為在addCount方法中,使用CAS更新baseCount,有可能在並發情況下更新失敗。即節點已經被添加到數組table中,但數量沒有被統計。當更新失敗時,會調用fullAddCount方法將這些失敗的節點包裝成一個CounterCell對象,並保存在CounterCell數組中。

1 public int size() { 2 long n = sumCount(); 3 return ((n < 0L) ? 0 : 4 (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : 5 (int)n); 6 } 7 8 final long sumCount() { 9 CounterCell[] as = counterCells; CounterCell a; 10 long sum = baseCount; 11 if (as != null) { 12 for (int i = 0; i < as.length; ++i) { 13 if ((a = as[i]) != null) 14 sum += a.value; 15 } 16 } 17 return sum; 18 }
總結
Java8版本的ConcurrentHashMap相對於Java7有什么優勢:
1)Java7中鎖的粒度為segment,每個segment中包含多個HashEntry,而Java8中鎖的粒度就是HashEntry(首節點);
2)Java7中鎖使用的是ReentrantLock,而Java8中使用的是synchronized;
為什么使用內置鎖synchronized來代替重入鎖ReentrantLock?
1 在低粒度的加鎖方式中,synchronized的性能不比ReentrantLock差;Java8中ConcurrentHashMap的鎖粒度更低了,發生沖突的概率更低,JVM對synchronized進行了大量的優化(自旋鎖、偏向鎖、輕量級鎖等等),只要線程在可以在自旋過程中拿到鎖,那么就不會升級為重量級鎖,就避免了線程掛起和喚醒的上下文開銷。但使用ReentrantLock不會自旋,而是直接被掛起,當然,也可以使用tryLock(),但是這樣又出現了一個問題,你怎么知道tryLock的時間呢?在時間范圍里還好,假如超過了呢?
所以在低粒度的加鎖方式中,synchronized是最好的選擇。Synchronized和ReentrantLock他們的開銷差距是在釋放鎖時喚醒線程的數量,Synchronized是喚醒鎖池里所有的線程+剛好來訪問的線程,而ReentrantLock則是當前線程后進來的第一個線程+剛好來訪問的線程。
2 synchronized內置鎖使用起來更加簡便、易懂、程序可讀性好;
3 ReentrantLock需要消耗更多的內存
3)Java8中使用鏈表+紅黑樹的數據結構,代替Java7中的鏈表,當鏈表長度比較長時,紅黑樹的查找速度更快;
參考資料:
《java並發編程的藝術》
ConcurrentHashMap 1.8為什么要使用CAS+Synchronized取代Segment+ReentrantLock
ConcurrentHashMap原理分析(1.7與1.8)