Java並發編程筆記之ConcurrentHashMap原理探究


在多線程環境下,使用HashMap進行put操作時存在丟失數據的情況,為了避免這種bug的隱患,強烈建議使用ConcurrentHashMap代替HashMap。

HashTable是一個線程安全的類,它使用synchronized來鎖住整張Hash表來實現線程安全,即每次鎖住整張表讓線程獨占,相當於所有線程進行讀寫時都去競爭一把鎖,導致效率非常低下。ConcurrentHashMap可以做到讀取數據不加鎖,並且其內部的結構可以讓其在進行寫操作的時候能夠將鎖的粒度保持地盡量地小,允許多個修改操作並發進行,其關鍵在於使用了鎖分離技術。它使用了多個鎖來控制對hash表的不同部分進行的修改。ConcurrentHashMap內部使用段(Segment)來表示這些不同的部分,每個段其實就是一個小的Hashtable,它們有自己的鎖。只要多個修改操作發生在不同的段上,它們就可以並發進行。

 

CouncurrentHashMap實現原理

ConcurrentHashMap 為了提高本身的並發能力,在內部采用了一個叫做 Segment 的結構,一個 Segment 其實就是一個類 Hash Table 的結構,Segment 內部維護了一個鏈表數組,我們用下面這一幅圖來看下 ConcurrentHashMap 的內部結構,從下面的結構我們可以了解到,ConcurrentHashMap 定位一個元素的過程需要進行兩次Hash操作,第一次 Hash 定位到 Segment,第二次 Hash 定位到元素所在的鏈表的頭部,因此,這一種結構的帶來的副作用是 Hash 的過程要比普通的 HashMap 要長,但是帶來的好處是寫操作的時候可以只對元素所在的 Segment 進行操作即可,不會影響到其他的 Segment,這樣,在最理想的情況下,ConcurrentHashMap 可以最高同時支持 Segment 數量大小的寫操作(剛好這些寫操作都非常平均地分布在所有的 Segment上),所以,通過這一種結構,ConcurrentHashMap 的並發能力可以大大的提高。我們用下面這一幅圖來看下ConcurrentHashMap的內部結構詳情圖,如下:

不難看出,ConcurrentHashMap采用了二次hash的方式,第一次hash將key映射到對應的segment,而第二次hash則是映射到segment的不同桶(bucket)中。

為什么要用二次hash,主要原因是為了構造分離鎖,使得對於map的修改不會鎖住整個容器,提高並發能力。當然,沒有一種東西是絕對完美的,二次hash帶來的問題是整個hash的過程比hashmap單次hash要長,所以,如果不是並發情形,不要使用concurrentHashmap。

JAVA7之前ConcurrentHashMap主要采用鎖機制,在對某個Segment進行操作時,將該Segment鎖定,不允許對其進行非查詢操作,而在JAVA8之后采用CAS無鎖算法,這種樂觀操作在完成前進行判斷,如果符合預期結果才給予執行,對並發操作提供良好的優化.

讓我們先看JDK1.7的ConcurrentHashMap的原理分析

JDK1.7的ConcurrentHashMap

如上所示,是由 Segment 數組、HashEntry 組成,和 HashMap 一樣,仍然是數組加鏈表。

讓我們看看Segment里面的成員變量,源碼如下:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    transient volatile int count;    //Segment中元素的數量
    transient int modCount;          //對table的大小造成影響的操作的數量(比如put或者remove操作)
    transient int threshold;        //閾值,Segment里面元素的數量超過這個值那么就會對Segment進行擴容
    final float loadFactor;         //負載因子,用於確定threshold
    transient volatile HashEntry<K,V>[] table;    //鏈表數組,數組中的每一個元素代表了一個鏈表的頭部
}

 

接着再看看HashEntry中的組成,源碼如下:

  /**
     * ConcurrentHashMap列表Entry。注意,這不會作為用戶可見的Map.Entry導出。
     */
    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

        /**
         * 設置具有volatile寫語義的next字段。
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

        // Unsafe mechanics
        static final sun.misc.Unsafe UNSAFE;
     //下一個HashEntry的偏移量
static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = HashEntry.class;
          //獲取HashEntry next在內存中的偏移量 nextOffset
= UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }

和 HashMap 非常類似,唯一的區別就是其中的核心數據如 value ,以及鏈表都是 volatile 修飾的,保證了獲取時的可見性。

原理上來說:ConcurrentHashMap 采用了分段鎖技術,其中 Segment 繼承於 ReentrantLock。不會像 HashTable 那樣不管是 put 還是 get 操作都需要做同步處理,理論上 ConcurrentHashMap 支持 CurrencyLevel (Segment 數組數量)的線程並發。每當一個線程占用鎖訪問一個 Segment 時,不會影響到其他的 Segment。

接着讓我們繼續看看JDK1.7中ConcurrentHashMap的成員變量和構造函數,源碼如下:

// 默認初始容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
// 默認加載因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 默認segment層級
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
// segment最小容量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
// 一個segment最大容量
static final int MAX_SEGMENTS = 1 << 16;
// 鎖之前重試次數
static final int RETRIES_BEFORE_LOCK = 2;

public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity) {
        this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // 找到兩種大小的最匹配參數
        int sshift = 0;
        // segment數組的長度是由concurrentLevel計算來的,segment數組的長度是2的N次方,
        // 默認concurrencyLevel = 16, 所以ssize在默認情況下也是16,此時 sshift = 4
        // sshift相當於ssize從1向左移的次數
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift; 
            ssize <<= 1;
        }
        // 段偏移量,默認值情況下此時segmentShift = 28
        this.segmentShift = 32 - sshift;
        // 散列算法的掩碼,默認值情況下segmentMask = 15
        this.segmentMask = ssize - 1;

        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;

        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        // 創建ssize長度的Segment數組
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
 }

其中,concurrencyLevel 一經指定,不可改變,后續如果ConcurrentHashMap的元素數量增加導致ConrruentHashMap需要擴容,ConcurrentHashMap不會增加Segment的數量,而只會增加Segment中鏈表數組的容量大小,這樣的好處是擴容過程不需要對整個ConcurrentHashMap做rehash,而只需要對Segment里面的元素做一次rehash就可以了。

  整個ConcurrentHashMap的初始化方法還是非常簡單的,先是根據concurrencyLevel來new出Segment,這里Segment的數量是不大於concurrencyLevel的最大的2的指數,就是說Segment的數量永遠是2的指數個,這樣的好處是方便采用移位操作來進行hash,加快hash的過程。接下來就是根據intialCapacity確定Segment的容量的大小,每一個Segment的容量大小也是2的指數,同樣使為了加快hash的過程。

注意一下兩個變量segmentShift和segmentMask,這兩個變量在后面將會起到很大的作用,假設構造函數確定了Segment的數量是2的n次方,那么segmentShift就等於32減去n,而segmentMask就等於2的n次方減一。

接下來讓我們看看JDK1.7中的ConcurrentHashMap的核心方法 put 方法和get 方法。

 public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
     //(1)
int hash = hash(key);
     //(2)
int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j);
     //(3)
return s.put(key, hash, value, false); }

代碼(1)計算key的hash值

代碼(2)根據hash值,segmentShift,segmentMask定位到哪個Segment。

代碼(3)將鍵值對保存到對應的segment中。

可以看到首先是通過 key 定位到 Segment,之后在對應的 Segment 中進行具體的 put。 Segment 中進行具體的 put的源碼如下:

  final V put(K key, int hash, V value, boolean onlyIfAbsent) {
       //(1) HashEntry
<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try {
          //(2) HashEntry
<K,V>[] tab = table;
          //(3)
int index = (tab.length - 1) & hash;
          //(4) HashEntry
<K,V> first = entryAt(tab, index);
          //(5)
for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; }
            //(6)
else {
              
if (node != null)
                 //(7) node.setNext(first);
else //(8) node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1;
              //(9)
if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else //(10) setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally {
         //(11) unlock(); }
return oldValue; }

雖然 HashEntry 中的 value 是用 volatile 關鍵詞修飾的,但是並不能保證並發的原子性,所以 put 操作時仍然需要加鎖處理。

代碼(1)首先第一步的時候會嘗試獲取鎖,如果獲取失敗肯定就有其他線程存在競爭,則利用 scanAndLockForPut() 自旋獲取鎖。

代碼(2)每一個Segment對應一個HashEntry[ ]數組。

代碼(3)計算對應HashEntry數組的下標 ,每個segment中數組的長度都是2的N次方,所以這里經過運算之后,取的是hash的低幾位數據。

代碼(4)定位到HashEntry的某一個結點(對應鏈表的表頭結點)。

代碼(5)遍歷鏈表。

代碼(6)如果鏈表為空(即表頭為空)

代碼(7)將新節點插入到鏈表作為鏈表頭。、

代碼(8)根據key和value 創建結點並插入鏈表。

代碼(9)判斷元素個數是否超過了閾值或者segment中數組的長度超過了MAXIMUM_CAPACITY,如果滿足條件則rehash擴容!

代碼(10)不需要擴容時,將node放到數組(HashEntry[])中對應的位置

代碼(11)最后釋放鎖。

總的來說,put 的流程如下:

  1. 將當前 Segment 中的 table 通過 key 的 hashcode 定位到 HashEntry。
  2. 遍歷該 HashEntry,如果不為空則判斷傳入的 key 和當前遍歷的 key 是否相等,相等則覆蓋舊的 value。
  3. 不為空則需要新建一個 HashEntry 並加入到 Segment 中,同時會先判斷是否需要擴容。
  4. 最后會解除在 代碼(1) 中所獲取當前 Segment 的鎖。

接着讓我們看看其擴容,rehash源碼如下:

       /**
         * 兩倍於之前的容量
         */
        @SuppressWarnings("unchecked")
        private void rehash(HashEntry<K,V> node) {

            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            // 擴大1倍(左移一位)
            int newCapacity = oldCapacity << 1;
            // 計算新的閾值
            threshold = (int)(newCapacity * loadFactor);
            // 創建新的數組
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            // mask
            int sizeMask = newCapacity - 1;
            // 遍歷舊數組數據
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i]; // 對應一個鏈表的表頭結點
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    // 計算e對應的這條鏈表在新數組中對應的下標
                    int idx = e.hash & sizeMask; 
                    if (next == null)   //  只有一個結點時直接放入(新的)數組中
                        newTable[idx] = e;
                    else { // 鏈表有多個結點時:
                        HashEntry<K,V> lastRun = e; // 就鏈表的表頭結點做為新鏈表的尾結點
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            // 舊數組中一個鏈表中的數據並不一定在新數組中屬於同一個鏈表,所以這里需要每次都重新計算
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        // lastRun(和之后的元素)插入數組中。
                        newTable[lastIdx] = lastRun;
                        // 從(舊鏈表)頭結點向后遍歷,遍歷到最后一組不同於前面hash值的組頭。
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n); // 拼接鏈表
                        }
                    }
                }
            }
            // 將之前的舊數據都添加到新的結構中之后,才會插入新的結點(依舊是插入表頭)
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
      }

 

接着,再看看scanAndLockForPut() 自旋獲取鎖,源碼如下:

 

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // 定位節點時為負數
       //(1) while (!tryLock()) { HashEntry<K,V> f; // 首先在下面重新檢查 if (retries < 0) { if (e == null) { if (node == null) // 推測性地創建節點 node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; }
          //(2)
else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // 如果Entry改變則重新遍歷 retries = -1; } } return node; }

掃描包含給定key的節點,同時嘗試獲取鎖,如果沒有找到,則創建並返回一個。

返回時,保證鎖被持有。

與大多數方法不同,對方法equals的調用不進行篩選:由於遍歷速度無關緊要,我們還可以幫助預熱相關代碼和訪問。

代碼(1)嘗試自旋獲取鎖。

代碼(2)如果重試的次數達到了 MAX_SCAN_RETRIES 則改為阻塞鎖獲取,保證能獲取成功。

 

接下來,再讓我們看看JDK1.7中的get方法,源碼如下:

   public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        // 首先計算出segment數組的下標  ((h >>> segmentShift) & segmentMask))
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) { // 根據下標找到segment
            // 然后(tab.length - 1) & h) 得到對應HashEntry數組的下標
            // 遍歷鏈表
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;

                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

可以看到get 邏輯沒有前面的方法復雜:

只需要將 Key 通過 Hash 之后定位到具體的 Segment ,再通過一次 Hash 定位到具體的元素上。

由於 HashEntry 中的 value 屬性是用 volatile 關鍵詞修飾的,保證了內存可見性,所以每次獲取時都是最新值。

ConcurrentHashMap 的 get 方法是非常高效的,因為整個過程都不需要加鎖

 

接着再看看remove方法,源碼如下:

public V remove(Object key) {
        // 計算hash值
        int hash = hash(key);
        // 根據hash值找到對應的segment
        Segment<K,V> s = segmentForHash(hash);
        // 調用Segment.remove 函數
        return s == null ? null : s.remove(key, hash, null);
}
public boolean remove(Object key, Object value) {
        int hash = hash(key);
        Segment<K,V> s;
        return value != null && (s = segmentForHash(hash)) != null &&
            s.remove(key, hash, value) != null;
}

Segment.remove函數的源碼如下:

     /**
         * Remove; match on key only if value null, else match both.
         */
        final V remove(Object key, int hash, Object value) {
            if (!tryLock())
                scanAndLock(key, hash);
            V oldValue = null;
            try {
                HashEntry<K,V>[] tab = table;
                // 計算HashEntry數組下標
                int index = (tab.length - 1) & hash;
                // 找到頭結點
                HashEntry<K,V> e = entryAt(tab, index);
                HashEntry<K,V> pred = null;
                while (e != null) {
                    K k;
                    HashEntry<K,V> next = e.next;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) { // 找到對應節點
                        V v = e.value;
                        if (value == null || value == v || value.equals(v)) {
                            if (pred == null)
                                // 當pred為空時,表示要移除的是鏈表的表頭節點,重新設置鏈表
                                setEntryAt(tab, index, next);
                            else
                                pred.setNext(next);
                            ++modCount;
                            --count;
                            // 記錄舊value值
                            oldValue = v;
                        }
                        break;
                    }
                    pred = e;
                    e = next;
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

 

JDK1.8中的ConcurrentHashMap原理分析

1.7 已經解決了並發問題,並且能支持 N 個 Segment 這么多次數的並發,但依然存在 HashMap 在 1.7 版本中的問題。那么是什么問題呢?

  很明顯那就是查詢遍歷鏈表效率太低。

因此 1.8 做了一些數據結構上的調整。,在 JAVA8 中它摒棄了 Segment(鎖段)的概念,而是啟用了一種全新的方式實現,利用 CAS 算法。底層依然由“數組”+鏈表+紅黑樹的方式思想,但是為了做到並發,又增加了很多輔助的類,例如 TreeBin、Traverser等對象內部類。

如何讓多線程之間,對象的狀態對於各線程的“可視性”是順序一致的:ConcurrentHashMap 使用了 happens-before 規則來實現。 happens-before規則(摘取自 JAVA 並發編程):

  • 程序次序法則:線程中的每個動作A都 happens-before 於該線程中的每一個動作B,其中,在程序中,所有的動作B都能出現在A之后。
  • 監視器鎖法則:對一個監視器鎖的解鎖 happens-before 於每一個后續對同一監視器鎖的加鎖。
  • volatile 變量法則:對 volatile 域的寫入操作 happens-before 於每一個后續對同一個域的讀寫操作。
  • 線程啟動法則:在一個線程里,對 Thread.start 的調用會 happens-before 於每個啟動線程的動作。
  • 線程終結法則:線程中的任何動作都 happens-before 於其他線程檢測到這個線程已經終結、或者從 Thread.join 調用中成功返回,或 Thread.isAlive 返回 false。
  • 中斷法則:一個線程調用另一個線程的 interrupt happens-before 於被中斷的線程發現中斷。
  • 終結法則:一個對象的構造函數的結束 happens-before 於這個對象 finalizer 的開始。
  • 傳遞性:如果 A happens-before 於 B,且 B happens-before 於 C,則 A happens-before於C:

        假設代碼有兩條語句,代碼順序是語句1先於語句2執行;那么只要語句之間不存在依賴關系,那么打亂它們的順序對最終的結果沒有影響的話,那么真正交給CPU去執行時,他們的執行順序可以是先執行語句2然后語句1。

首先來看下底層的組成結構(下圖是百度來的,懶得畫了):

可以看到JDK1.8ConcurrentHashMap 和JDK1.8的HashMap是很相似的。其中拋棄了原有的 Segment 分段鎖,而采用了 CAS + synchronized 來保證並發安全性。

//鍵值輸入。 此類永遠不會作為用戶可變的Map.Entry導出(即,一個支持setValue;請參閱下面的MapEntry),
//但可以用於批量任務中使用的只讀遍歷。 具有負哈希字段的節點的子類是特殊的,並且包含空鍵和值(但永遠不會導出)。 否則,鍵和val永遠不會為空。
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString(){ return key + "=" + val; } public final V setValue(V value) { throw new UnsupportedOperationException(); } public final boolean equals(Object o) { Object k, v, u; Map.Entry<?,?> e; return ((o instanceof Map.Entry) && (k = (e = (Map.Entry<?,?>)o).getKey()) != null && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u))); } /** * 對map.get()的虛擬化支持; 在子類中重寫。 */ Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } }

也將 1.7 中存放數據的 HashEntry 改為 Node,但作用都是相同的。

其中的 val next 都用了 volatile 修飾,保證了可見性。

接着再看看put方法的源碼,源碼如下:

   public V put(K key, V value) {
        return putVal(key, value, false);
   }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
     //(1)
if (key == null || value == null) throw new NullPointerException();
     //(2)
int hash = spread(key.hashCode()); int binCount = 0;
     //(3)
for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh;
       //(4)
if (tab == null || (n = tab.length) == 0) tab = initTable();
       //(5)
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin }
       //(6)
else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null;
          //(7) synchronized (f) {
if (tabAt(tab, i) == f) {
              //(8)
if (fh >= 0) { binCount = 1;
                 //(9)
for (Node<K,V> e = f;; ++binCount) { K ek;
                   //(10)
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e;
                   //(11)如果遍歷到了最后一個結點,那么就證明新的節點需要插入 就把它插入在鏈表尾部
if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } }
              //(12)
else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) {
            //(13)
if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } }
     //代碼(14) addCount(
1L, binCount); return null; }

代碼(1)若為空 拋異常

代碼(2)計算hash值

代碼(3)

代碼(4)判斷是否需要進行初始化。

代碼(5)f 即為當前 key 定位出的 Node,如果為空表示當前位置可以寫入數據,利用 CAS 嘗試寫入,失敗則自旋保證成功。

代碼(6)如果當前位置的 hashcode == MOVED == -1,則需要進行擴容。

代碼(7)如果都不滿足,則利用 synchronized 鎖寫入數據。結點上鎖 這里的結點可以理解為hash值相同組成的鏈表的頭結點

代碼(8)fh〉0 說明這個節點是一個鏈表的節點 不是樹的節點.

代碼(9)在這里遍歷鏈表所有的結點

代碼(10)如果hash值和key值相同 則修改對應結點的value值

代碼(11)如果遍歷到了最后一個結點,那么就證明新的節點需要插入 就把它插入在鏈表尾部

代碼(12)如果這個節點是樹節點,就按照樹的方式插入值

代碼(13)如果鏈表長度已經達到臨界值8 就需要把鏈表轉換為樹結構。如果數量大於 TREEIFY_THRESHOLD 則要轉換為紅黑樹。

代碼(14)將當前ConcurrentHashMap的元素數量+1

 

接着我我們在看看JDK1.8中ConcurrentHashMap的get方法源碼,源碼如下:

// GET方法(JAVA8)
public V get(Object key) {  
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;  
    //計算hash值  
    int h = spread(key.hashCode());  
    //根據hash值確定節點位置  
    if ((tab = table) != null && (n = tab.length) > 0 &&  
        (e = tabAt(tab, (n - 1) & h)) != null) {  
        //如果搜索到的節點key與傳入的key相同且不為null,直接返回這個節點    
        if ((eh = e.hash) == h) {  
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))  
                return e.val;  
        }  
        //如果eh<0 說明這個節點在樹上 直接尋找  
        else if (eh < 0)  
            return (p = e.find(h, key)) != null ? p.val : null;  
         //否則遍歷鏈表 找到對應的值並返回  
        while ((e = e.next) != null) {  
            if (e.hash == h &&  
                ((ek = e.key) == key || (ek != null && key.equals(ek))))  
                return e.val;  
        }  
    }  
    return null;  
}  

 

接着再看看JDK1.8中ConcurrentHashMap的remove方法源碼,源碼如下:

// REMOVE OR REPLACE方法(JAVA8)
 final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode());
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 數組不為空,長度不為0,指定hash碼值為0
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        // 是一個 forwardNode
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            boolean validated = false;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        validated = true;
                        // 循環尋找
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            // equal 相同 取出
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                 // value為null或value和查到的值相等  
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    // 若是樹 紅黑樹高效查找/刪除
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

可以看出 JDK1.8 和 JDK1.7 對 ConcurrentHashMap 的實現改變,筆者更喜歡 CAS 無鎖機制,如果只是看我寫以上代碼注釋明顯不足以了解 JAVA8 的 ConcurrentHashMap 的實現,我也僅僅提供源碼閱讀的思路,其中 cas、volatile、final 等注意已經給解釋,所以如果大家真的感興趣還是寫程序,打斷點,一步步看看這個代碼的實現.

1.8 在 1.7 的數據結構上做了大的改動,采用紅黑樹之后可以保證查詢效率(O(logn)),甚至取消了 ReentrantLock 改為了 synchronized,這樣可以看出在新版的 JDK 中對 synchronized 優化是很到位的。

 

相信到這里為止,理解上面的內容,遇到面試,問題都迎刃而解,下面是網上找的面試題,如下:

(1)你知道 HashMap 的工作原理嗎?你知道 HashMap 的 get() 方法的工作原理嗎?

HashMap 是基於 hashing 的原理,我們使用 put(key, value) 存儲對象到 HashMap 中,使用 get(key) 從 HashMap 中獲取對象。當我們給 put() 方法傳遞鍵和值時,我們先對鍵調用 hashCode() 方法,返回的 hashCode 用於找到 bucket 位置來儲存 Entry 對象。

(2)你知道 ConcurrentHashMap 的工作原理嗎?你知道 ConcurrentHashMap 在 JAVA8 和 JAVA7 對比有哪些不同呢?

ConcurrentHashMap 為了提高本身的並發能力,在內部采用了一個叫做 Segment 的結構,一個 Segment 其實就是一個類 Hash Table 的結構,Segment 內部維護了一個鏈表數組,我們用下面這一幅圖來看下 ConcurrentHashMap 的內部結構,從下面的結構我們可以了解到,ConcurrentHashMap 定位一個元素的過程需要進行兩次Hash操作,第一次 Hash 定位到 Segment,第二次 Hash 定位到元素所在的鏈表的頭部,因此,這一種結構的帶來的副作用是 Hash 的過程要比普通的 HashMap 要長,但是帶來的好處是寫操作的時候可以只對元素所在的 Segment 進行操作即可,不會影響到其他的 Segment,這樣,在最理想的情況下,ConcurrentHashMap 可以最高同時支持 Segment 數量大小的寫操作(剛好這些寫操作都非常平均地分布在所有的 Segment上),所以,通過這一種結構,ConcurrentHashMap 的並發能力可以大大的提高。

JAVA7之前ConcurrentHashMap主要采用鎖機制,在對某個Segment進行操作時,將該Segment鎖定,不允許對其進行非查詢操作,而在JAVA8之后采用CAS無鎖算法,這種樂觀操作在完成前進行判斷,如果符合預期結果才給予執行,對並發操作提供良好的優化

(3)當兩個對象的hashcode相同會發生什么?

因為hashcode相同,所以它們的bucket位置相同,‘碰撞’會發生。因為Map使用LinkedList存儲對象,這個Entry(包含有鍵值對的Map.Entry對象)會存儲在LinkedList中。(當向 Map 中添加 key-value 對,由其 key 的 hashCode() 返回值決定該 key-value 對(就是 Entry 對象)的存儲位置。當兩個 Entry 對象的 key 的 hashCode() 返回值相同時,將由 key 通過 eqauls() 比較值決定是采用覆蓋行為(返回 true),還是產生 Entry 鏈(返回 false)),此時若你能講解JDK1.8紅黑樹引入,面試官或許會刮目相看。

(4)如果兩個鍵的 hashcode 相同,你如何獲取值對象?

當我們調用get()方法,HashMap 會使用鍵對象的 hashcode 找到 bucket 位置,然后獲取值對象。如果有兩個值對象儲存在同一個 bucket,將會遍歷 LinkedList 直到找到值對象。找到 bucket 位置之后,會調用 keys.equals() 方法去找到 LinkedList 中正確的節點,最終找到要找的值對象。(當程序通過 key 取出對應 value 時,系統只要先計算出該 key 的 hashCode() 返回值,在根據該 hashCode 返回值找出該 key 在 table 數組中的索引,然后取出該索引處的 Entry,最后返回該 key 對應的 value 即可)。

(5)如果HashMap的大小超過了負載因子(load factor)定義的容量,怎么辦?

當一個map填滿了75%的bucket時候,和其它集合類(如ArrayList等)一樣,將會創建原來HashMap大小的兩倍的bucket數組,來重新調整map的大小,並將原來的對象放入新的bucket數組中。這個過程叫作rehashing,因為它調用hash方法找到新的bucket位置。

(6)你了解重新調整HashMap大小存在什么問題嗎?

當重新調整HashMap大小的時候,確實存在條件競爭,因為如果兩個線程都發現HashMap需要重新調整大小了,它們會同時試着調整大小。在調整大小的過程中,存儲在LinkedList中的元素的次序會反過來,因為移動到新的bucket位置的時候,HashMap並不會將元素放在LinkedList的尾部,而是放在頭部,這是為了避免尾部遍歷(tail traversing)。如果條件競爭發生了,那么就死循環了。這個時候,你可以質問面試官,為什么這么奇怪,要在多線程的環境下使用HashMap呢?

(7)請問ConcurrentHashMap中變量使用final和volatile修飾有什么用呢?其中鏈表是final的next屬性,那么發生刪除某個元素,如何實現的?

使用final來實現不變模式(immutable),他是多線程安全里最簡單的一種保障方式。因為你拿他沒有辦法,想改變它也沒有機會。不變模式主要通過final關鍵字來限定的。在JMM中final關鍵字還有特殊的語義。Final域使得確保初始化安全性(initialization safety)成為可能,初始化安全性讓不可變形對象不需要同步就能自由地被訪問和共享。

使用volatile來保證某個變量內存的改變對其他線程即時可見,在配合CAS可以實現不加鎖對並發操作的支持

remove執行的開始就將table賦給一個局部變量tab,將tab依次復制出來,最后直到該刪除位置,將指針指向下一個變量。

(8)描述一下ConcurrentHashMap中remove操作,有什么需要注意的?

需要注意如下幾點。第一,當要刪除的結點存在時,刪除的最后一步操作要將count的值減一。這必須是最后一步操作,否則讀取操作可能看不到之前對段所做的結構性修改。第二,remove執行的開始就將table賦給一個局部變量tab,這是因為table是volatile變量,讀寫volatile變量的開銷很大。編譯器也不能對volatile變量的讀寫做任何優化,直接多次訪問非volatile實例變量沒有多大影響,編譯器會做相應優化。

(9)HashTable與ConcurrentHashMap有什么區別,描述鎖分段技術。

HashTable容器在競爭激烈的並發環境下表現出效率低下的原因,是因為所有訪問HashTable的線程都必須競爭同一把鎖,那假如容器里有多把鎖,每一把鎖用於鎖容器其中一部分數據,那么當多線程訪問容器里不同數據段的數據時,線程間就不會存在鎖競爭,從而可以有效的提高並發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術,首先將數據分成一段一段的存儲,然后給每一段數據配一把鎖,當一個線程占用鎖訪問其中一個段數據的時候,其他段的數據也能被其他線程訪問。有些方法需要跨段,比如size()和containsValue(),它們可能需要鎖定整個表而而不僅僅是某個段,這需要按順序鎖定所有段,操作完畢后,又按順序釋放所有段的鎖。這里“按順序”是很重要的,否則極有可能出現死鎖,在ConcurrentHashMap內部,段數組是final的,並且其成員變量實際上也是final的,但是,僅僅是將數組聲明為final的並不保證數組成員也是final的,這需要實現上的保證。這可以確保不會出現死鎖,因為獲得鎖的順序是固定的。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM