HashMap的put,get,size等方法都不是線程安全的,而HashTable雖然保證了線程安全,但卻是用了效率極低的方法,在put,get,size等方法上加上了synchronized,這就導致所有的並發進程都要競爭同一把鎖,一個線程在進行同步操作時,其他線程都需要等待。
為了保證集合的線程安全性,Jdk提供了同步包裝器,比如說Collections.synchronizedMap,不過它們都是利用粗粒度的同步方式,高並發情況下性能較差。
可以看看Collections.synchronizedMap的實現。
與並發安全有關的代碼都使用了synchronized關鍵字進行了修飾,使用的鎖mutex可以在構造方法中看到就是this。
雖然說不在是用synchronized來修飾方法,但是還是使用了this做mutex鎖,治標不治本。
更加普遍的選擇是利用並發包提供的線程安全容器類,它提供了各種並發容器,比如ConcurrentHashMap、CopyOnWriteArrayList。
各種線程安全隊列,如ArrayBlockingQueue、SynchronousQueue。
ConcurrentHashMap分析
ConcurrentHashMap和HashMap一樣,在Java8時結構上發生了很大變化。
之前的ConcurrentHashMap的實現是基於分離鎖
,也就是內部進行分段,每一個段中都擁有HashEntry數組,hashcode相同的key也是按照鏈表的方式存儲的。
HashEntry內部使用volatile修飾的value字段來保證可見性,也利用了不可變對象的機制以改進利用Unsafe提供的底層能力,比如volatile access,去直接完成部分操作,以最優化性能,畢竟Unsafe中的很多操作都是JVM intrinsic優化過的。
Segment的初始大小由DEFAULT_CONCURRENCY_LEVEL來確定,默認是16,在構造方法中可以自定義大小,不過必須是2的冪,如果輸入7,會自動創建8。
對於get方法來說,只需要保證可見性,無需其他同步操作。
關鍵是put方法,首先是通過二次哈希避免哈希沖突,然后以Unsafe調用方式,直接獲取相應的Segment,然后進行線程安全的put操作。
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 二次哈希,以保證數據的分散性,避免哈希沖突
int hash = hash(key.hashCode());
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
put的核心實現如下。
fnal V put(K key, int hash, V value, boolean onlyIfAbsent) {
// scanAndLockForPut會去查找是否有key相同Node
// 無論如何,確保獲取鎖
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> frs = entryAt(tab, index);
for (HashEntry<K,V> e = frs;;) {
if (e != null) {
K k;
// 更新已有value...
} else {
// 放置HashEntry到特定位置,如果超過閾值,進行rehash
// ...
}
}
} fnally {
unlock();
}
return oldValue;
}
首先也是通過 Key 的 Hash 定位到具體的 Segment,在 put 之前會進行一次擴容校驗。這里比 HashMap 要好的一點是:HashMap 是插入元素之后再看是否需要擴容,有可能擴容之后后續就沒有插入就浪費了本次擴容(擴容非常消耗性能)。而 ConcurrentHashMap 不一樣,它是在將數據插入之前檢查是否需要擴容,之后再做插入操作。
ConcurrentHashMap會獲取再入鎖,以保證數據一致性,Segment本身就是基於ReentrantLock的擴展實現,所以,在並發修改期間,相應Segment是被鎖定的。
在最初階段,進行重復性的掃描,以確定相應key值是否已經在數組里面,進而決定是更新還是放置操作,你可以在代碼里看到相應的注釋。重復掃描、檢測沖突
是ConcurrentHashMap的常見技巧。
另外一個Map的size方法同樣需要關注,它的實現涉及分離鎖的一個副作用。
試想,如果不進行同步,簡單的計算所有Segment的總值,可能會因為並發put,導致結果不准確,但是直接鎖定所有Segment進行計算,就會變得非常昂貴。
所以,ConcurrentHashMap的實現是通過重試機制(RETRIES_BEFORE_LOCK,指定重試次數2),來試圖獲得可靠值。如果沒有監控到發生變化(通過對
比Segment.modCount),就直接返回,否則獲取鎖進行操作。
每個Segment都有一個volatile修飾的全局變量count,求整個 ConcurrentHashMap的size時很明顯就是將所有的count累加即可。但是volatile修飾的變量卻不能保證多線程的原子性,所有直接累加很容易出現並發問題。
通過嘗試兩次將count累加,如果容器的count發生了變化再加鎖來統計size,可以有效的避免加鎖的問題。
以上是JDK1.7及之前的ConcurrentHashMap的結構,在1.8之后,ConcurrentHashMap結構發生了很大變化,由之前的分段鎖變為了CAS+synchronized。
我在網上找到了一幅結構圖。
總體結構上,它的內部存儲和HashMap結構非常相似,同樣是大的桶(bucket)數組,然后內部也是一個個所謂的鏈表結構(bin),同步的粒度要更細致一些。
其內部仍然有Segment定義,但僅僅是為了保證序列化時的兼容性而已,不再有任何結構上的用處。
因為不再使用Segment,初始化操作大大簡化,修改為lazy-load形式,這樣可以有效避免初始開銷,解決了老版本很多人抱怨的這一點。
將1.7中存放數據的HashEntry改為Node,但作用都是相同的。其中的val next 都用了volatile修飾,保證了可見性。
satic class Node<K,V> implements Map.Entry<K,V> {
fnal int hash;
fnal K key;
volatile V val;
volatile Node<K,V> next;
// …
}
在Node中,key被定義為了final,因為照常理來說,key是不會改變了,而是value會發生改變,因此在val中添加了volatile來修飾。
來看看它的put操作。
fnal V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 利用CAS去進行無鎖線程安全操作,如果bin是空的
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break;
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else if (onlyIfAbsent // 不加鎖,進行檢查
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else {
V oldVal = null;
synchronized (f) {
// 細粒度的同步修改操作...
}
}
// Bin超過閾值,進行樹化
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
具體流程如下:
- 根據key計算出hashcode 。
- 判斷是否需要進行初始化initTable。
- f即為當前key定位出的Node,如果為空表示當前位置可以寫入數據,利用 CAS 嘗試寫入,失敗則自旋保證成功。
- 如果當前位置的 hashcode == MOVED == -1,則需要進行擴容。
- 如果都不滿足,則利用 synchronized 鎖寫入數據。
- 如果數量大於 TREEIFY_THRESHOLD 則要轉換為紅黑樹。
可以發現1.8以后的鎖的顆粒度,是加在鏈表頭上的,這是一個很重要的突破。
1.8中不依賴與segment加鎖,segment數量與桶數量一致。
首先判斷容器是否為空,為空則進行初始化利用volatile的sizeCtl作為互斥手段,如果發現競爭性的初始化,就暫停在那里,等待條件恢復,否則利用CAS設置排他標志(U.compareAndSwapInt(this, SIZECTL, sc, -1)),否則重試
對key hash計算得到該key存放的桶位置,判斷該桶是否為空,為空則利用CAS設置新節點,否則使用synchronize加鎖,遍歷桶中數據,替換或新增加點到桶中。最后判斷是否需要轉為紅黑樹,轉換之前判斷是否需要擴容。
最后來看看1.8 ConcurrentHashMap的size方法實現。
在sumCount方法中使用到了CounterCell,對於CounterCell的操作,是基於java.util.concurrent.atomic.LongAdder進行的,是一種JVM利用空間換取更高效率的方法,利用了Striped64內部的復雜邏輯。
// 一種用於分配計數的填充單元。改編自LongAdder和Striped64。請查看他們的內部文檔進行解釋。
@sun.misc.Contended
static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}