Multithreading Part 11: ConcurrentHashMap Source Code Analysis (JDK 7)


Introduction

  This article analyzes the implementation of ConcurrentHashMap in JDK 7. The code in this version is relatively clear, only 1,622 lines including comments, which makes it well suited for study.
  ConcurrentHashMap is essentially a multithreaded version of HashMap that is free of thread-safety problems. Using HashMap in a multithreaded environment can lead to issues such as infinite loops, which the blog post 老生常談,HashMap的死循環 explains well. Besides HashMap there is also the thread-safe Hashtable. Hashtable's implementation principle is the same as HashMap's, except that every Hashtable method is marked synchronized to guarantee thread safety, which is very inefficient under heavy contention. ConcurrentHashMap instead uses lock striping: the whole hash table is split into multiple segments, and thread safety is enforced per segment. Here is the JDK documentation's description of ConcurrentHashMap:

A hash table supporting full concurrency of retrievals and high expected concurrency for updates. This class obeys the same functional specification as Hashtable, and includes versions of methods corresponding to each method of Hashtable. However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access. This class is fully interoperable with Hashtable in programs that rely on its thread safety but not on its synchronization details.

In short, ConcurrentHashMap supports concurrent reads and writes, provides a counterpart for every Hashtable method, and never locks the entire table to achieve that concurrency.
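
As a quick illustration of that guarantee, here is a minimal usage sketch (the class name BasicUsage is made up for this article; the map methods shown are the standard JDK API):

    import java.util.concurrent.ConcurrentHashMap;

    public class BasicUsage {
        public static void main(String[] args) {
            ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<String, Integer>();
            map.put("a", 1);                 // thread-safe write
            map.putIfAbsent("a", 2);         // inserts only when the key is absent; "a" stays 1
            Integer v = map.get("a");        // a read that takes no lock in the JDK 7 implementation
            System.out.println(v);           // prints 1
        }
    }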

ConcurrentHashMap Data Structure

  Recall the data structure of HashMap (JDK 7): its core is an array of Entry key-value pairs, and each pair is mapped to a slot in the array by the hash of its key.

  When a ConcurrentHashMap is initialized, it takes a concurrencyLevel that determines the length of the Segment array, i.e. the concurrency level: roughly how many threads can update the ConcurrentHashMap at the same time, 16 by default. Each Segment holds a HashEntry array, which is where the key-value pairs are actually stored. That is the data structure of ConcurrentHashMap.

Source Code Analysis

  As the structure above shows, ConcurrentHashMap is built around Segment. Segment is a static inner class of ConcurrentHashMap, and it extends the reentrant lock ReentrantLock, so a thread must acquire the segment's lock before it can modify that segment. Its structure is as follows:

static final class Segment<K,V> extends ReentrantLock implements Serializable {

    // maximum number of attempts to acquire the lock by spinning before blocking
    static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

    // the HashEntry array, i.e. the array of key-value pairs
    transient volatile HashEntry<K, V>[] table;
    // number of elements in this segment
    transient int count;
    // number of operations that modify elements in this segment, such as put/remove
    transient int modCount;
    // when the table size exceeds this threshold the table is resized; equals capacity * loadFactor
    transient int threshold;
    // load factor
    final float loadFactor;

    Segment(float lf, int threshold, HashEntry<K, V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }
}

  HashEntry, the key-value node, is the basic data structure of ConcurrentHashMap; multiple HashEntry nodes can form a linked list to resolve hash collisions.

static final class HashEntry<K,V> {
    // hash value
    final int hash;
    // key
    final K key;
    // value
    volatile V value;
    // next key-value node in the chain
    volatile HashEntry<K, V> next;

    HashEntry(int hash, K key, V value, HashEntry<K, V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }
}

  The fields and constructor of ConcurrentHashMap are as follows:

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable {

    private static final long serialVersionUID = 7249069246763182397L;

    // default initial capacity
    static final int DEFAULT_INITIAL_CAPACITY = 16;

    // default load factor
    static final float DEFAULT_LOAD_FACTOR = 0.75f;

    // default concurrency level, i.e. the default length of the Segment array
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    // maximum capacity of the map
    static final int MAXIMUM_CAPACITY = 1 << 30;
    
    // minimum length of each segment's table array; must be a power of two, at least 2
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

    // maximum number of segments, used to bound concurrencyLevel; must be a power of two
    static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

    // number of unsynchronized retries in size and containsValue before resorting to locking, to avoid unbounded retries when the table is being modified continuously
    static final int RETRIES_BEFORE_LOCK = 2;

    // mask value used to compute the segment index
    final int segmentMask;

    // shift count: how many high-order bits of the hash participate when computing the segment index
    final int segmentShift;

    // the Segment array
    final Segment<K,V>[] segments;


    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        // validate the arguments
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        // find the smallest power of two that is >= the concurrencyLevel passed in
        // ssize becomes the length of the Segment array
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        // compute the capacity of each segment's table
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        // make sure cap is a power of two
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        // create the segments array and initialize segments[0]; the remaining segments are initialized lazily
        Segment<K,V> s0 =
                new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                        (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

}

The concurrencyLevel parameter is the expected number of threads that will concurrently modify the ConcurrentHashMap, and it determines the number of segments. From the algorithm above, the actual segment count is the smallest power of two that is not less than the concurrencyLevel passed in. segmentMask and segmentShift look somewhat harder to understand; their job is to locate which Segment a key falls into based on the key's hash value.
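
To make segmentShift and segmentMask concrete, here is a small, self-contained sketch (not JDK source; the class name SegmentIndexDemo and the sample hash value are made up for illustration) that reproduces the computation for the default concurrencyLevel of 16 and shows how the high-order bits of a hash select the segment:

    public class SegmentIndexDemo {
        public static void main(String[] args) {
            int concurrencyLevel = 16;              // the default
            int sshift = 0, ssize = 1;
            while (ssize < concurrencyLevel) {      // smallest power of two >= concurrencyLevel
                ++sshift;
                ssize <<= 1;
            }
            int segmentShift = 32 - sshift;         // 28 for the default
            int segmentMask = ssize - 1;            // 15 (binary 1111) for the default

            int hash = 0xA1B2C3D4;                  // an arbitrary example hash value
            int j = (hash >>> segmentShift) & segmentMask;  // top 4 bits give segment index 10
            System.out.println("segment index = " + j);     // prints "segment index = 10"
        }
    }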

For a hash table, the most important methods are put and get. Let's analyze the implementation of each in turn.

put(K key, V value)

  The put method really has only two steps: 1. locate which segment the key-value pair belongs to based on the key's hash; 2. call that Segment's put method.

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        // compute the hash of the key
        int hash = hash(key);
        // the hash locates the key-value pair on segment[j]
        int j = (hash >>> segmentShift) & segmentMask;
        // check whether segment[j] has been initialized; if not, call ensureSegment to initialize it
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        // insert the key-value pair into the segment
        return s.put(key, hash, value, false);
    }
  • ensureSegment(int k)

  As the ConcurrentHashMap constructor shows, only segments[0] of the Segment array is initialized up front; the remaining segments are initialized when first used, following a lazy-loading strategy, and that lazy initialization is performed by ensureSegment.

    private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        // initialize segments[k] using segments[0]'s HashEntry array length and load factor
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }
  • put(K key, int hash, V value, boolean onlyIfAbsent)

  Segment's put method inserts the key-value pair into the segment's HashEntry array.

    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        // Segment extends ReentrantLock; try to acquire the exclusive lock
        HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;
            // locate the slot of the key-value pair in the HashEntry array
            int index = (tab.length - 1) & hash;
            // get the first key-value node at that slot
            HashEntry<K,V> first = entryAt(tab, index);
            for (HashEntry<K,V> e = first;;) {
                if (e != null) {  // the slot holds a linked list; keep looping until e == null
                    K k;
                    // a pair with the same key as the one being inserted exists; replace its value
                    if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {  // onlyIfAbsent is false for a plain put
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                }
                else {
                    // node is not null (pre-created by scanAndLockForPut); set its next to first, making node the new head of the chain
                    if (node != null)
                        node.setNext(first);
                    // node is null; create the head node with next pointing to first, making node the new head of the chain
                    else
                        node = new HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
                    // resize conditions: (1) the entry count exceeds the threshold and (2) the current table length is below the maximum capacity; resize when both hold
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        // resize
                        rehash(node);
                    else
                        // set tab[index] to node
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }

  • scanAndLockForPut(K key, int hash, V value)

  Tries to acquire the lock by spinning (tryLock, a CAS-based attempt) at most MAX_SCAN_RETRIES times before blocking.

    private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
        // first, e: the first key-value node of the bucket that the hash maps to
        HashEntry<K,V> first = entryForHash(this, hash);
        HashEntry<K,V> e = first;
        HashEntry<K,V> node = null;
        int retries = -1; // negative while locating node
        // the thread spins, trying to acquire the lock via CAS
        while (!tryLock()) {
            HashEntry<K,V> f; // to recheck first below
            if (retries < 0) {
                // when e == null or key.equals(e.key), set retries = 0 and leave this branch
                if (e == null) {
                    if (node == null) // speculatively create node
                        // create the key-value node up front, with next pointing to null
                        node = new HashEntry<K,V>(hash, key, value, null);
                    retries = 0;
                }
                else if (key.equals(e.key))
                    retries = 0;
                else
                    e = e.next;
            }
            // exceeded the maximum spin count; block on the lock
            else if (++retries > MAX_SCAN_RETRIES) {
                lock();
                break;
            }
            // the head node changed; re-traverse
            else if ((retries & 1) == 0 &&
                    (f = entryForHash(this, hash)) != first) {
                e = first = f; // re-traverse if entry changed
                retries = -1;
            }
        }
        return node;
    }
  • rehash(HashEntry<K,V> node)

  Resizes the segment's table array; the new array is twice the length of the old one.

    private void rehash(HashEntry<K,V> node) {
        // the old tab array before resizing
        HashEntry<K,V>[] oldTable = table;
        // array length before resizing
        int oldCapacity = oldTable.length;
        // array length after resizing (twice the old length)
        int newCapacity = oldCapacity << 1;
        // compute the new threshold
        threshold = (int)(newCapacity * loadFactor);
        // the new tab array
        HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
        // the new index mask
        int sizeMask = newCapacity - 1;
        // iterate over the old array
        for (int i = 0; i < oldCapacity ; i++) {
            // the head node of bucket i
            HashEntry<K,V> e = oldTable[i];
            if (e != null) {
                // the node following e; it is non-null when the bucket holds a chain (hash collisions)
                HashEntry<K,V> next = e.next;
                // compute the node's index in the new array
                int idx = e.hash & sizeMask;
                // the bucket contains only one node; put e directly into the new table
                if (next == null)   //  Single node on list
                    newTable[idx] = e;
                // the bucket contains a chain of more than one node
                else { // Reuse consecutive sequence at same slot
                    HashEntry<K,V> lastRun = e;
                    // idx is the new position of the current chain's head node e
                    int lastIdx = idx;
                    for (HashEntry<K,V> last = next;
                         last != null;
                         last = last.next) {
                        // k is the node's position in the new array
                        int k = last.hash & sizeMask;
                        // lastRun ends up being the first node of the trailing run whose nodes all map to the same new bucket
                        if (k != lastIdx) {
                            lastIdx = k;
                            lastRun = last;
                        }
                    }
                    // lastRun and every node after it land in the same new bucket, so the whole suffix is reused
                    newTable[lastIdx] = lastRun;
                    // Clone remaining nodes
                    // only the nodes before lastRun need to be cloned
                    for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                        V v = p.value;
                        int h = p.hash;
                        int k = h & sizeMask;
                        HashEntry<K,V> n = newTable[k];
                        newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                    }
                }
            }
        }
        // finally insert the new node that triggered this resize
        int nodeIndex = node.hash & sizeMask; // add the new node
        node.setNext(newTable[nodeIndex]);
        newTable[nodeIndex] = node;
        // point the segment's table at the resized table
        table = newTable;
    }
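
  To make the lastRun reuse concrete, here is a small, self-contained sketch (not JDK source; the Node class and the sample hashes are made up for illustration). With an old capacity of 4 and a new capacity of 8, a chain whose hashes are 1, 5, 13 and 5 all lives in old bucket 1; the loop finds that every node from the second one onwards maps to new bucket 5, so that whole suffix can be relinked in one step and only the head node needs to be cloned:

    public class LastRunDemo {
        static final class Node {
            final int hash; final String key; final Node next;
            Node(int hash, String key, Node next) { this.hash = hash; this.key = key; this.next = next; }
        }

        public static void main(String[] args) {
            int newCapacity = 8, sizeMask = newCapacity - 1;
            // old bucket 1 (old capacity 4) holds the chain A(1) -> B(5) -> C(13) -> D(5)
            Node head = new Node(1, "A", new Node(5, "B", new Node(13, "C", new Node(5, "D", null))));

            // find lastRun: the first node of the trailing run whose members all share one new index
            Node lastRun = head;
            int lastIdx = head.hash & sizeMask;            // A maps to new index 1
            for (Node p = head.next; p != null; p = p.next) {
                int k = p.hash & sizeMask;                 // B, C, D all map to new index 5
                if (k != lastIdx) { lastIdx = k; lastRun = p; }
            }
            // lastRun is B: the suffix B -> C -> D goes into newTable[5] as a whole,
            // and only A (the nodes before lastRun) would be cloned into newTable[1]
            System.out.println(lastRun.key + " -> newTable[" + lastIdx + "]");  // prints "B -> newTable[5]"
        }
    }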

get(Object key)

  get does not acquire any lock, so it is efficient. Both when locating the segment the key maps to and when traversing the HashEntry nodes of the table array, UNSAFE.getObjectVolatile is used, which guarantees that the latest value of the volatile variable is read without locking.

    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        // compute the hash of the key
        int h = hash(key);
        // compute which segment the key falls into from its hash
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        // get the table array of segments[u]
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            // traverse the HashEntry nodes in the table bucket
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                // found an equal key; return its value
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

size()

  The size method computes the number of elements stored in the ConcurrentHashMap. Does counting the elements of every segment require locking them all? Without locking, other threads may concurrently insert or remove elements while we count; with locking, read/write throughput drops. ConcurrentHashMap takes a compromise: it makes up to three unlocked passes over all segments, adding each segment's modCount into sum. If sum is unchanged from the previous pass, no other thread modified the ConcurrentHashMap between the two passes, and the accumulated count of the segments is returned; if every pass differs from the previous one, it falls back to locking every segment and counting under the locks.

    public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                // the retry count has reached RETRIES_BEFORE_LOCK; lock every segment
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    // accumulate each segment's modCount and count
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        // has the size overflowed the int range?
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                // last is the sum from the previous pass; if they are equal, break out of the loop
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            // unlock
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

remove(Object key)

  Delegates to the Segment's remove method.

    public V remove(Object key) {
        int hash = hash(key);
        Segment<K,V> s = segmentForHash(hash);
        return s == null ? null : s.remove(key, hash, null);
    }
  • remove(Object key, int hash, Object value)

  Acquires the segment lock and removes the specified key-value pair.

    final V remove(Object key, int hash, Object value) {
        // acquire the lock
        if (!tryLock())
            scanAndLock(key, hash);
        V oldValue = null;
        try {
            HashEntry<K,V>[] tab = table;
            int index = (tab.length - 1) & hash;
            HashEntry<K,V> e = entryAt(tab, index);
            // pred tracks the node preceding the current one while traversing the chain
            HashEntry<K,V> pred = null;
            while (e != null) {
                K k;
                HashEntry<K,V> next = e.next;
                // found the key-value node for the key
                if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                    V v = e.value;
                    // the node's value equals the value passed in (or no value was specified)
                    if (value == null || value == v || value.equals(v)) {
                        // the node is the head; make its successor the new head of the bucket
                        if (pred == null)
                            setEntryAt(tab, index, next);
                        // not the head; point the predecessor's next at the node's successor
                        else
                            pred.setNext(next);
                        ++modCount;
                        --count;
                        oldValue = v;
                    }
                    break;
                }
                pred = e;
                e = next;
            }
        } finally {
            unlock();
        }
        return oldValue;
    }

  • scanAndLock(Object key, int hash)

  Scans for the given key while acquiring the lock. When the method finishes, that is, when the loop exits, the lock has definitely been acquired. The loop exits in one of two ways: 1. the tryLock attempt at the exclusive lock succeeds; 2. the spin count exceeds MAX_SCAN_RETRIES, the thread blocks on lock(), and it breaks out of the loop once it is woken from the wait queue holding the lock.

    private void scanAndLock(Object key, int hash) {
        // similar to but simpler than scanAndLockForPut
        HashEntry<K,V> first = entryForHash(this, hash);
        HashEntry<K,V> e = first;
        int retries = -1;
        while (!tryLock()) {
            HashEntry<K,V> f;
            if (retries < 0) {
                if (e == null || key.equals(e.key))
                    retries = 0;
                else
                    e = e.next;
            }
            else if (++retries > MAX_SCAN_RETRIES) {
                lock();
                break;
            }
            else if ((retries & 1) == 0 &&
                    (f = entryForHash(this, hash)) != first) {
                e = first = f;
                retries = -1;
            }
        }
    }

isEmpty()

  Checks whether the ConcurrentHashMap is empty, again without acquiring any lock, using up to two passes: 1. check each segment's count; if any segment's count is non-zero, return false; if all are zero, accumulate each segment's modCount into sum. 2. If the first loop finishes without returning, the map may be empty; if sum is non-zero, make a second pass: return false if any segment's count is non-zero, and subtract each segment's modCount from sum along the way. If this loop also finishes without returning, check whether sum is 0: if it is, no element was inserted between the two passes and the map really is empty; otherwise the map is not considered empty.

    public boolean isEmpty() {
        // accumulate the segments' modCount values
        long sum = 0L;
        final Segment<K,V>[] segments = this.segments;
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> seg = segmentAt(segments, j);
            if (seg != null) {
                if (seg.count != 0)
                    return false;
                sum += seg.modCount;
            }
        }
        // check again
        if (sum != 0L) { // recheck unless no modifications
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    if (seg.count != 0)
                        return false;
                    sum -= seg.modCount;
                }
            }
            if (sum != 0L)
                return false;
        }
        return true;
    }

Summary

ConcurrentHashMap introduces segment locking to raise concurrency. Whenever a thread needs to modify the hash table, it does not lock the whole table; it operates on a single segment and synchronizes only on that segment. By narrowing the lock granularity it improves throughput, which makes it much better suited to multithreaded environments than Hashtable, which synchronizes the entire table.
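
As a closing example, here is a minimal sketch (the class name ConcurrentPutDemo and the thread counts are made up for illustration) of several threads updating a single ConcurrentHashMap concurrently; because each segment synchronizes its own writers, no updates are lost and the final size is deterministic:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;

    public class ConcurrentPutDemo {
        public static void main(String[] args) throws InterruptedException {
            final ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<Integer, Integer>();
            final int threads = 4;
            final int perThread = 1000;
            final CountDownLatch done = new CountDownLatch(threads);
            for (int t = 0; t < threads; t++) {
                final int offset = t * perThread;
                new Thread(new Runnable() {
                    public void run() {
                        for (int i = 0; i < perThread; i++)
                            map.put(offset + i, i);     // distinct keys per thread, no lost updates
                        done.countDown();
                    }
                }).start();
            }
            done.await();
            System.out.println(map.size());             // prints 4000
        }
    }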

