ConcurrentHashMap1.7和1.8對比
數據結構
1.7中采用Segment
+HashEntry
的方式實現
ConcurrentHashMap
初始化時,計算出Segment
數組的大小ssize
和每個Segment
中HashEntry
數組的大小cap
,並初始化Segment
數組的第一個元素;
其中ssize
大小為2的冪次方,默認為16,cap
大小也是2的冪次方,最小值為2,最終結果根據初始化容量initialCapacity
進行計算,計算過程如下
if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1;
因為Segment
繼承了ReentrantLock,所有segment是線程安全的
1.8中放棄了Segment
分段鎖的設計,使用的是Node
+CAS
+Synchronized
來保證線程安全性
只有在第一次執行put
方法是才會初始化Node
數組
put操作
1.7 put
當執行put
方法插入數據的時候,根據key的hash值,在Segment
數組中找到對應的位置
如果當前位置沒有值,則通過CAS進行賦值,接着執行Segment
的put
方法通過加鎖機制插入數據
假如有線程AB同時執行相同Segment
的put
方法
線程A 執行
tryLock
方法成功獲取鎖,然后把HashEntry
對象插入到相應位置線程B 嘗試獲取鎖失敗,則執行
scanAndLockForPut()
方法,通過重復執行tryLock()
方法嘗試獲取鎖在多處理器環境重復64次,單處理器環境重復1次,當執行
tryLock()
方法的次數超過上限時,則執行lock()
方法掛起線程B當線程A執行完插入操作時,會通過
unlock
方法施放鎖,接着喚醒線程B繼續執行
1.8 put
當執行put
方法插入數據的時候,根據key的hash值在Node
數組中找到相應的位置
如果當前位置的Node
還沒有初始化,則通過CAS插入數據
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //如果當前位置的`Node`還沒有初始化,則通過CAS插入數據 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin }
如果當前位置的Node已經有值,則對該節點加synchronized
鎖,然后從該節點開始遍歷,直到插入新的節點或者更新新的節點
if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } }
如果當前節點是TreeBin
類型,說明該節點下的鏈表已經進化成紅黑樹結構,則通過putTreeVal
方法向紅黑樹中插入新的節點
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
如果binCount
不為0,說明put
操作對數據產生了影響,如果當前鏈表的節點個數達到了8個,則通過treeifyBin
方法將鏈表轉化為紅黑樹
size 操作
1.7 size實現
統計每個segment
對象中的元素個數,然后進行累加
但是這種方式計算出來的結果不一定准確
因為在計算后面的segment
的元素個數時
前面計算過了的segment
可能有數據的新增或刪除
計算方式為:
先采用不加鎖的方式,連續計算兩次
如果兩次結果相等,說明計算結果准確
如果兩次結果不相等,說明計算過程中出現了並發新增或者刪除操作
於是給每個
segment
加鎖,然后再次計算
try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } }
1.8 size實現
使用一個volatile
類型的變量baseCount
記錄元素的個數
當新增或者刪除節點的時候會調用,addCount()
更新baseCount
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); }
初始化時counterCells
為空
在並發量很高時,如果存在兩個線程同時執行CAS
修改baseCount
值
則失敗的線程會繼續執行方法體中的邏輯
使用CounterCell
記錄元素個數的變化
如果CounterCell
數組counterCells
為空
調用fullAddCount()
方法進行初始化
並插入對應的記錄數,通過CAS
設置cellsBusy字段
只有設置成功的線程才能初始化CounterCell
數組
else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table if (counterCells == as) { CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; }
如果通過CAS
設置cellsBusy字段失敗的話
則繼續嘗試通過CAS
修改baseCount
字段
如果修改baseCount
字段成功的話,就退出循環
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break;
否則繼續循環插入CounterCell
對象;
所以在1.8中的size
實現比1.7簡單多,因為元素個數保存baseCount
中,部分元素的變化個數保存在CounterCell
數組中,實現如下:
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
通過累加baseCount
和CounterCell
數組中的數量,即可得到元素的總個數;