1. 概述
接上一篇 學習 ConcurrentHashMap1.8 並發寫機制, 本文主要學習 Segment分段鎖
的實現原理。
雖然 JDK1.7
在生產環境已逐漸被 JDK1.8
替代,然而一些好的思想還是需要進行學習的。比方說位圖中尋找 bit
位的思路是不是和 ConcurrentHashMap1.7
有點相似?
接下來,本文基於 OpenJDK7
來做源碼解析。
2. ConcurrentHashMap1.7 初認識
ConcurrentHashMap 中 put()是線程安全的。但是很多時候, 由於業務需求, 需要先 get()
操作再 put()
操作,這 2 個操作無法保證原子性,這樣就會產生線程安全問題了。大家在開發中一定要注意。
ConcurrentHashMap 的結構示意圖如下:
在進行數據的定位時,會首先找到 segment
, 然后在 segment
中定位 bucket
。如果多線程操作同一個 segment
, 就會觸發 segment
的鎖 ReentrantLock
, 這就是分段鎖的基本實現原理。
3. 源碼分析
3.1 HashEntry
HashEntry
是 ConcurrentHashMap
的基礎單元(節點),是實際數據的載體。
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
/**
* Sets next field with volatile write semantics. (See above
* about use of putOrderedObject.)
*/
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
// Unsafe mechanics
static final sun.misc.Unsafe UNSAFE;
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
3.2 Segment
Segment
繼承 ReentrantLock
鎖,用於存放數組 HashEntry[]
。在這里可以看出, 無論 1.7 還是 1.8 版本, ConcurrentHashMap
底層並不是對 HashMap
的擴展, 而是同樣從底層基於數組+鏈表進行功能實現。
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
// 數據節點存儲在這里(基礎單元是數組)
transient volatile HashEntry<K,V>[] table;
transient int count;
transient int modCount;
transient int threshold;
final float loadFactor;
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
// 具體方法不在這里討論...
}
3.3 構造方法
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 對於concurrencyLevel的理解, 可以理解為segments數組的長度,即理論上多線程並發數(分段鎖), 默認16
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
// 默認concurrencyLevel = 16, 所以ssize在默認情況下也是16,此時 sshift = 4
// ssize = 2^sshift 即 ssize = 1 << sshift
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 段偏移量,32是因為hash是int值,int值32位,默認值情況下此時segmentShift = 28
this.segmentShift = 32 - sshift;
// 散列算法的掩碼,默認值情況下segmentMask = 15, 定位segment的時候需要根據segment[]長度取模, 即hash(key)&(ssize - 1)
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// 計算每個segment中table的容量, 初始容量=16, 並發數=16, 則segment中的Entry[]長度為1。
int c = initialCapacity / ssize;
// 處理無法整除的情況,取上限
if (c * ssize < initialCapacity)
++c;
// MIN_SEGMENT_TABLE_CAPACITY默認時2,cap是2的n次方
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
// 創建segments並初始化第一個segment數組,其余的segment延遲初始化
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
// 默認並發數=16
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
由圖和源碼可知,當用默認構造函數時,最大並發數是 16,即最大允許 16 個線程同步寫操作,且無法擴展。所以如果我們的場景數據量比較大時,應該設置合適的並發數,避免頻繁鎖沖突。
3.4 put()操作
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 根據key的hash再次進行hash運算
int hash = hash(key.hashCode());
// 基於hash定位segment數組的索引。
// hash值是int值,32bits。segmentShift=28,無符號右移28位,剩下高4位,其余補0。
// segmentMask=15,二進制低4位全部是1,所以j相當於hash右移后的低4位。
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
// 找到對應segment
s = ensureSegment(j);
// 將新節點插入segment中
return s.put(key, hash, value, false);
}
找出對應 segment,如果不存在就創建並初始化
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
// 當前的segments數組
final Segment<K,V>[] ss = this.segments;
// 計算原始偏移量,在segments數組的位置
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
// 判斷沒有被初始化
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 獲取第一個segment ss[0]作為原型
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length; // 容量
float lf = proto.loadFactor; // 負載因子
int threshold = (int)(cap * lf); // 閾值
// 初始化ss[k] 內部的tab數組 // recheck
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
// 再次檢查這個ss[k] 有沒有被初始化
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
// 自旋。getObjectVolatile 保證了讀的可見性,所以一旦有一個線程初始化了,那么就結束自旋
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
3.5 segment 插入節點
上一步找到 segment 位置后計算節點在 segment 中的位置。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 是否獲取鎖,失敗自旋獲取鎖(直到成功)
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value); // 失敗了才會scanAndLockForPut
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
// 獲取到bucket位置的第一個節點
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
// hash沖突
if (e != null) {
K k;
// key相等則覆蓋
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
// 不相等則遍歷鏈表
e = e.next;
}
else {
if (node != null)
// 將新節點插入鏈表作為表頭
node.setNext(first);
else
// 創建新節點並插入表頭
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 判斷元素個數是否超過了閾值或者segment中數組的長度超過了MAXIMUM_CAPACITY,如果滿足條件則rehash擴容!
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
// 擴容
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 解鎖
unlock();
}
return oldValue;
}
如果加鎖失敗則先走 scanAndLockForPut()
方法。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
// 根據hash獲取頭結點
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
// 嘗試獲取鎖,成功就返回,失敗就開始自旋
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
// 如果頭結點不存在
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
// 和頭結點key相等
else if (key.equals(e.key))
retries = 0;
else
// 下一個節點 直到為null
e = e.next;
}
// 達到自旋的最大次數
else if (++retries > MAX_SCAN_RETRIES) {
// lock()是阻塞方法。進入加鎖方法,失敗進入隊列,阻塞當前線程
lock();
break;
}
// TODO (retries & 1) == 0 沒理解
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
// 頭結點變化,需要重新遍歷,說明有新的節點加入或者移除
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
(retries & 1) == 0 沒理解是在做什么,有小伙伴看明白了請賜教。
最后
本文到此結束,主要是學習分段鎖是如何工作的。謝謝大家的觀看。