源碼分析 CurrentHashMap 1.7


1.0 數據結構

  

 

ConcurrentHashMap 是由 Segment 數組 結構和 HashEntry 數組 結構組成。

  • Segment 是一種可重入鎖 ReentrantLock,在 ConcurrentHashMap 里扮演鎖的角色,HashEntry 則用於存儲鍵值對數據。
  • ConcurrentHashMap 里包含一個 Segment 數組,Segment 的結構和 HashMap 類似,一個 Segment 里包含一個 HashEntry 數組,每個 HashEntry 是一個鏈表結構的元素, 每個 Segment 守護者一個 HashEntry 數組里的元素,當對 HashEntry 數組的數據進行修改時,必須首先獲得它對應的 Segment 鎖。

2.0 構造函數

屬性 說明
concurrencyLevel 並發度,程序運行時能夠同時更新 ConcurrentHashMap 且不產生鎖競爭的最大線程數,分段鎖個數,即 Segment[] 的數組長度,默認為 16。用戶也可以在構造函數中設置並發度。
initialCapacity 初始容量,指的是整個 ConcurrentHashMap 的初始容量,實際操作的時候需要平均分給每個 Segment。
loadFactor 負載因子,Segment 數組不可以擴容,負載因子供每個 Segment 內部使用。

 

  • 和 JDK 1. 6 不同,JDK 1. 7 中除了第一個 Segment 之外,剩余的 Segments 采用的是 延遲初始化 機制:每次 put 之前都需要檢查 key 對應的 Segment 是否為 null,如果是則調用 ensureSegment() 以確保對應的 Segment 被創建。
  • ensureSegment() 可能在並發環境下被調用,但並未使用鎖來控制競爭,而是使用了 Unsafe 對象的 getObjectVolatile() 提供的原子讀語義結合 CAS 來確保 Segment 創建的原子性

   

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // 計算並行級別 ssize,因為要保持並行級別是 2 的 n 次方
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 默認值,concurrencyLevel 為 16,sshift 為 4
    // 那么計算出 segmentShift 為 28,segmentMask 為 15,后面會用到這兩個值
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
 
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
 
    // initialCapacity 是設置整個 map 初始的大小,
    // 這里根據 initialCapacity 計算 Segment 數組中每個位置可以分到的大小
    // 如 initialCapacity 為 64,那么每個 Segment 或稱之為"槽"可以分到 4 個
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 默認 MIN_SEGMENT_TABLE_CAPACITY 是 2,這個值也是有講究的,因為這樣的話,對於具體的槽上,
    // 插入一個元素不至於擴容,插入第二個的時候才會擴容
    int cap = MIN_SEGMENT_TABLE_CAPACITY; 
    while (cap < c)
        cap <<= 1;
 
    // 創建 Segment 數組,
    // 並創建數組的第一個元素 segment[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // 往數組寫入 segment[0]
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
  • Segment 數組長度為 16,不可以擴容。
  • Segment[i] 的默認大小為 2,負載因子是 0.75,得出初始閾值為 1.5,也就是以后插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容。
  • 只初始化了 segment[0],其他位置仍然是 null。
  • 當前 segmentShift 的值為 32 - 4 = 28,segmentMask 為 16 - 1 = 15,為移位數和掩碼。

3.0 put方法

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 1. 計算 key 的 hash 值
    int hash = hash(key);
    // 2. 根據 hash 值找到 Segment 數組中的位置 j
    //    hash 是 32 位,無符號右移 segmentShift(28) 位,剩下低 4 位,
    //    然后和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的最后 4 位,也就是槽的數組下標
    int j = (hash >>> segmentShift) & segmentMask;
    // 初始化的時候只初始化了 segment[0],其他位置還是 null,
    // ensureSegment(j) 對 segment[j] 進行初始化
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);  //初始化槽 // 3. 插入新值到 槽 s 中
    return s.put(key, hash, value, false);  //開始插入
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 在往該 segment 寫入前,需要先獲取該 segment 的獨占鎖,獲取失敗嘗試獲取自旋鎖
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // segment 內部的數組
        HashEntry<K,V>[] tab = table;
        // 利用 hash 值,求應該放置的數組下標
        int index = (tab.length - 1) & hash;
        // first 是數組該位置處的鏈表的表頭
        HashEntry<K,V> first = entryAt(tab, index);
 
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆蓋舊值
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // 繼續順着鏈表走
                e = e.next;
            }
            else {
                // node 是不是 null,這個要看獲取鎖的過程。
                // 如果不為 null,那就直接將它設置為鏈表表頭;如果是 null,初始化並設置為鏈表表頭。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
 
                int c = count + 1;
                // 如果超過了該 segment 的閾值,這個 segment 需要擴容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 擴容
                else
                    // 沒有達到閾值,將 node 放到數組 tab 的 index 位置,
                    // 將新的結點設置成原鏈表的表頭
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解鎖
        unlock();
    }
    return oldValue;
}
private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 使用當前 segment[0] 處的數組長度和負載因子來初始化 segment[k],這就是之前要初始化 segment[0] 的原因。
        // 為什么要用 " 當前 ",因為 segment[0] 可能早就擴容過了。
        Segment<K,V> proto = ss[0];
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
 
        // 初始化 segment[k] 內部的數組
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // 再次檢查一遍該槽是否被其他線程初始化。
 
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // 使用 while 循環,內部用 CAS,當前線程成功設值或其他線程成功設值后,退出
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}
                                                                        
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
 
    // 循環獲取鎖
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    // 進到這里說明數組該位置的鏈表是空的,沒有任何元素
                    // 當然,進到這里的另一個原因是 tryLock() 失敗,所以該槽存在並發,不一定是該位置
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                // 順着鏈表往下走
                e = e.next;
        }
        // 重試次數如果超過 MAX_SCAN_RETRIES(單核 1 次多核 64 次),那么不搶了,進入到阻塞隊列等待鎖
        //    lock() 是阻塞方法,直到獲取鎖后返回
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 // 進入這里,說明有新的元素進到了鏈表,並且成為了新的表頭
                 // 這邊的策略是,重新執行 scanAndLockForPut 方法
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

  segment 數組不能擴容,是對 segment 數組某個位置內部的數組 HashEntry[] 進行擴容,擴容后容量為原來的 2 倍,該方法沒有考慮並發,因為執行該方法之前已經獲取了鎖

// 方法參數上的 node 是這次擴容后,需要添加到新的數組中的數據。
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 2 倍
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    // 創建新數組
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 新的掩碼,如從 16 擴容到 32,那么 sizeMask 為 31,對應二進制 ‘000...00011111’
    int sizeMask = newCapacity - 1;
 
    // 遍歷原數組,將原數組位置 i 處的鏈表拆分到 新數組位置 i 和 i+oldCap 兩個位置
    for (int i = 0; i < oldCapacity ; i++) {
        // e 是鏈表的第一個元素
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 計算應該放置在新數組中的位置,
            // 假設原數組長度為 16,e 在 oldTable[3] 處,那么 idx 只可能是 3 或者是 3 + 16 = 19
            int idx = e.hash & sizeMask;
            if (next == null)   // 該位置處只有一個元素
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // e 是鏈表表頭
                HashEntry<K,V> lastRun = e;
                // idx 是當前鏈表的頭結點 e 的新位置
                int lastIdx = idx;
 
                // for 循環找到一個 lastRun 結點,這個結點之后的所有元素是將要放到一起的
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 將 lastRun 及其之后的所有結點組成的這個鏈表放到 lastIdx 這個位置
                newTable[lastIdx] = lastRun;
                // 下面的操作是處理 lastRun 之前的結點,
                //    這些結點可能分配在另一個鏈表中,也可能分配到上面的那個鏈表中
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 將新來的 node 放到新數組中剛剛的 兩個鏈表之一 的 頭部
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

  總結:

  put 方法的流程。

  1. 將當前 Segment 中的 table 通過 key 的 hashcode 定位到 HashEntry。
  2. 遍歷該 HashEntry,如果不為空則判斷傳入的 key 和當前遍歷的 key 是否相等,相等則覆蓋舊的 value。
  3. 不為空則需要新建一個 HashEntry 並加入到 Segment 中,同時會先判斷是否需要擴容。
  4. 最后再解除在第 1 步中所獲取當前 Segment 的鎖。

4.0 get方法流程

  

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    // 1. hash 值
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 2. 根據 hash 找到對應的 segment
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // 3. 找到segment 內部數組相應位置的鏈表,遍歷
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

  總結:

  • get 方法的流程。
  1. 計算 hash 值,找到 segment 數組中的具體位置,獲使用的槽。
  2. 槽中也是一個數組,根據 hash 找到數組中具體的位置。
  3. 順着鏈表進行查找即可。
  • 因為 get 過程中沒有加鎖,因此需要考慮並發問題

5.0 其它

size

  • 要統計整個 ConcurrentHashMap 里元素的大小,就必須統計所有 Segment 里元素的大小后求和。

    • Segment 里的全局變量 count 是一個 volatile 變量。
  • ConcurrentHashMap 的做法是先嘗試 2 次通過不鎖住 Segment 的方式統計各個 Segment 大小,如果統計的過程中,容器的 count 發生了變化,則再采用加鎖的方式來統計所有 Segment 的大小

    • 使用 modCount 變量,在 put、remove 和 clean 方法里操作元素前都會將變量 modCount 進行加 1,在統計 size 前后比較 modCount 是否發生變化,從而得知容器的大小是否發生變化。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM