本文整理自漫畫:什么是ConcurrentHashMap? - 小灰的文章 - 知乎 。已獲得作者授權。
HashMap 在高並發下會出現鏈表環,從而導致程序出現死循環。高並發下避免HashMap 出問題的方法有兩種。一是使用HashTable,二是使用Collections.syncronizedMap
但是這兩種方法的性能都能差。因為這兩個在執行讀寫操作時都是將整個集合加鎖,導致多個線程無法同時讀寫集合。高並發下的HashMap出現的問題就需要ConcurrentHashMap 來解決了。
什么是ConcurrentHashMap?
ConcurrentHashMap 中有一個Segment 的概念。
Segment是什么呢?Segment本身就相當於一個HashMap對象。
同HashMap一樣,Segment包含一個HashEntry數組,數組中的每一個HashEntry既是一個鍵值對,也是一個鏈表的頭節點。
單一的Segment結構如下:
像這樣的Segment對象,在ConcurrentHashMap集合中有多少個呢?有2的N次方個,共同保存在一個名為segments的數組當中。
因此整個ConcurrentHashMap的結構如下:
可以說,ConcurrentHashMap 是一個二級哈希表。在一個總的哈希表下面,有若干個子哈希表。
這樣的二級結構,和數據庫的水平拆分有些相似。
ConcurrentHashMap 的優勢
ConcurrentHashMap 的優勢就是采取了鎖分段技術,每一個Segment 的讀寫操作高度自治,Segment 之間互不影響。
Case1:不同Segment 的並發寫入
不同Segment的寫入是可以並發執行的。
Case2:同一Segment的一寫一讀
同一Segment的寫和讀是可以並發執行的。
Case3:同一Segment的並發寫入
Segment的寫入是需要上鎖的,因此對同一Segment的並發寫入會被阻塞。
由此可見,ConcurrentHashMap當中每個Segment各自持有一把鎖。在保證線程安全的同時降低了鎖的粒度,讓並發操作效率更高。
Concurrent 的讀寫過程
Get方法:
1.為輸入的Key做Hash運算,得到hash值。
2.通過hash值,定位到對應的Segment對象
3.再次通過hash值,定位到Segment當中數組的具體位置。
Put方法:
1.為輸入的Key做Hash運算,得到hash值。
2.通過hash值,定位到對應的Segment對象
3.獲取可重入鎖
4.再次通過hash值,定位到Segment當中數組的具體位置。
5.插入或覆蓋HashEntry對象。
6.釋放鎖。
讀寫操作均需要二次定位到Segment
ConcurrentHashMap 在調用Size 方法時如何解決一致性問題?
Size方法的目的是統計ConcurrentHashMap的總元素數量, 自然需要把各個Segment內部的元素數量匯總起來。
但是,如果在統計Segment元素數量的過程中,已統計過的Segment瞬間插入新的元素,這時候該怎么辦呢?
ConcurrentHashMap的Size方法是一個嵌套循環,大體邏輯如下:
1.遍歷所有的Segment。
2.把Segment的元素數量累加起來。
3.把Segment的修改次數累加起來。
4.判斷所有Segment的總修改次數是否大於上一次的總修改次數。如果大於,說明統計過程中有修改,重新統計,嘗試次數+1;如果不是。說明沒有修改,統計結束。
5.如果嘗試次數超過閾值,則對每一個Segment加鎖,再重新統計。
6.再次判斷所有Segment的總修改次數是否大於上一次的總修改次數。由於已經加鎖,次數一定和上次相等。
7.釋放鎖,統計結束。
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
為什么這樣設計呢?這種思想和樂觀鎖悲觀鎖的思想如出一轍。
為了盡量不鎖住所有Segment,首先樂觀地假設Size過程中不會有修改。當嘗試一定次數,才無奈轉為悲觀鎖,鎖住所有Segment保證強一致性。
想看原作者的文章可以看看這個公眾號。