Java並發容器——ConcurrentSkipListMap和ConcurrentHashMap


附錄:

https://www.cnblogs.com/ygj0930/p/6543901.html 

     一:ConcurrentSkipListMap

          TreeMap使用紅黑樹按照key的順序(自然順序、自定義順序)來使得鍵值對有序存儲但是只能在單線程下安全使用;多線程下想要使鍵值對按照key的順序來存儲,則需要使用ConcurrentSkipListMap。 

           ConcurrentSkipListMap的底層是通過跳表來實現的。跳表是一個鏈表,但是通過使用“跳躍式”查找的方式使得插入、讀取數據時復雜度變成了O(logn)。

           跳表(SkipList):使用“空間換時間”的算法,令鏈表的每個結點不僅記錄next結點位置,還可以按照level層級分別記錄后繼第level個結點。在查找時,首先按照層級查找,比如:當前跳表最高層級為3,即每個結點中不僅記錄了next結點(層級1),還記錄了next的next(層級2)、next的next的next(層級3)結點。現在查找一個結點,則從頭結點開始先按高層級開始查:head->head的next的next的next->。。。直到找到結點或者當前結點q的值大於所查結點,則此時當前查找層級的q的前一節點p開始,在p~q之間進行下一層級(隔1個結點)的查找......直到最終迫近、找到結點。此法使用的就是“先大步查找確定范圍,再逐漸縮小迫近”的思想進行的查找。

           例如:有當前的跳表存儲如下:有4個層級,層級1為最下面的level,是一個包含了所有結點的普通鏈表。往上數就是2,3,4層級。

           (注:圖來自 http://blog.csdn.net/sunxianghuang/article/details/52221913,如有冒犯,請見諒)

 

           現在,我們查找結點值為19的結點:

 

           明白了查找的原理后,插入、刪除就容易理解了。為了保存跳表的有序性,所以分三步:查找合適位置——進行插入/刪除——更新跳表指針,維護層級性。

    插入結點:

 

    刪除結點:

     知道了底層所用數據結構的原理后,我們來看看concurrentskiplistmap的部分源碼:

     插入:

private V doPut(K kkey, V value, boolean onlyIfAbsent) {
    Comparable<? super K> key = comparable(kkey);
    for (;;) {
        // 找到key的前繼節點
        Node<K,V> b = findPredecessor(key);
        // 設置n為“key的前繼節點的后繼節點”,即n應該是“插入節點”的“后繼節點”
        Node<K,V> n = b.next;
        for (;;) {
            if (n != null) {
                Node<K,V> f = n.next;
                // 如果兩次獲得的b.next不是相同的Node,就跳轉到”外層for循環“,重新獲得b和n后再遍歷。
                if (n != b.next)
                    break;
                // v是“n的值”
                Object v = n.value;
                // 當n的值為null(意味着其它線程刪除了n);此時刪除b的下一個節點,然后跳轉到”外層for循環“,重新獲得b和n后再遍歷。
                if (v == null) {               // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                // 如果其它線程刪除了b;則跳轉到”外層for循環“,重新獲得b和n后再遍歷。
                if (v == n || b.value == null) // b is deleted
                    break;
                // 比較key和n.key
                int c = key.compareTo(n.key);
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                if (c == 0) {
                    if (onlyIfAbsent || n.casValue(v, value))
                        return (V)v;
                    else
                        break; // restart if lost race to replace value
                }
                // else c < 0; fall through
            }

            // 新建節點(對應是“要插入的鍵值對”)
            Node<K,V> z = new Node<K,V>(kkey, value, n);
            // 設置“b的后繼節點”為z
            if (!b.casNext(n, z))
                break;         // 多線程情況下,break才可能發生(其它線程對b進行了操作)
            // 隨機獲取一個level
            // 然后在“第1層”到“第level層”的鏈表中都插入新建節點
            int level = randomLevel();
            if (level > 0)
                insertIndex(z, level);
            return null;
        }
    }
}

 

刪除:

final V doRemove(Object okey, Object value) {
    Comparable<? super K> key = comparable(okey);
    for (;;) {
        // 找到“key的前繼節點”
        Node<K,V> b = findPredecessor(key);
        // 設置n為“b的后繼節點”(即若key存在於“跳表中”,n就是key對應的節點)
        Node<K,V> n = b.next;
        for (;;) {
            if (n == null)
                return null;
            // f是“當前節點n的后繼節點”
            Node<K,V> f = n.next;
            // 如果兩次讀取到的“b的后繼節點”不同(其它線程操作了該跳表),則返回到“外層for循環”重新遍歷。
            if (n != b.next)                    // inconsistent read
                break;
            // 如果“當前節點n的值”變為null(其它線程操作了該跳表),則返回到“外層for循環”重新遍歷。
            Object v = n.value;
            if (v == null) {                    // n is deleted
                n.helpDelete(b, f);
                break;
            }
            // 如果“前繼節點b”被刪除(其它線程操作了該跳表),則返回到“外層for循環”重新遍歷。
            if (v == n || b.value == null)      // b is deleted
                break;
            int c = key.compareTo(n.key);
            if (c < 0)
                return null;
            if (c > 0) {
                b = n;
                n = f;
                continue;
            }

            // 以下是c=0的情況
            if (value != null && !value.equals(v))
                return null;
            // 設置“當前節點n”的值為null
            if (!n.casValue(v, null))
                break;
            // 設置“b的后繼節點”為f
            if (!n.appendMarker(f) || !b.casNext(n, f))
                findNode(key);                  // Retry via findNode
            else {
                // 清除“跳表”中每一層的key節點
                findPredecessor(key);           // Clean index
                // 如果“表頭的右索引為空”,則將“跳表的層次”-1。
                if (head.right == null)
                    tryReduceLevel();
            }
            return (V)v;
        }
    }
}

 

    查找:

 

private Node<K,V> findNode(Comparable<? super K> key) {
    for (;;) {
        // 找到key的前繼節點
        Node<K,V> b = findPredecessor(key);
        // 設置n為“b的后繼節點”(即若key存在於“跳表中”,n就是key對應的節點)
        Node<K,V> n = b.next;
        for (;;) {
            // 如果“n為null”,則跳轉中不存在key對應的節點,直接返回null。
            if (n == null)
                return null;
            Node<K,V> f = n.next;
            // 如果兩次讀取到的“b的后繼節點”不同(其它線程操作了該跳表),則返回到“外層for循環”重新遍歷。
            if (n != b.next)                // inconsistent read
                break;
            Object v = n.value;
            // 如果“當前節點n的值”變為null(其它線程操作了該跳表),則返回到“外層for循環”重新遍歷。
            if (v == null) {                // n is deleted
                n.helpDelete(b, f);
                break;
            }
            if (v == n || b.value == null)  // b is deleted
                break;
            // 若n是當前節點,則返回n。
            int c = key.compareTo(n.key);
            if (c == 0)
                return n;
            // 若“節點n的key”小於“key”,則說明跳表中不存在key對應的節點,返回null
            if (c < 0)
                return null;
            // 若“節點n的key”大於“key”,則更新b和n,繼續查找。
            b = n;
            n = f;
        }
    }
}

 

  

通過上面的源碼可以發現:ConcurrentSkipListMap線程安全的原理與非阻塞隊列ConcurrentBlockingQueue的原理一樣:利用底層的插入、刪除的CAS原子性操作,通過死循環不斷獲取最新的結點指針來保證不會出現競態條件。

 

    二:ConcurrentHashMap【本文concurrentHashMap是jdk1.7中的實現,jdk1.8中使用的不是Segment,特此說明】

        快速存取<Key, Value>鍵值對使用HashMap;多線程並發存取<Key, Value>鍵值對使用ConcurrentHashMap;

        我們知道,HashTable和和Collections類中提供的同步HashTable是線程安全的,但是他們線程安全是通過在進行讀寫操作時對整個map加鎖來實現的,故此性能比較低。那既然是由於鎖粒度(加鎖的范圍叫鎖粒度)太大造成的性能低下,可不可以從鎖粒度着手去改良呢?由此,就引申出了ConcurrentHashMap。

        ConcurrentHashMap采取了“鎖分段”技術來細化鎖的粒度:把整個map划分為一系列被成為segment的組成單元,一個segment相當於一個小的hashtable。這樣,加鎖的對象就從整個map變成了一個更小的范圍——一個segment。ConcurrentHashMap線程安全並且提高性能原因就在於:對map中的讀是並發的,無需加鎖;只有在put、remove操作時才加鎖,而加鎖僅是對需要操作的segment加鎖,不會影響其他segment的讀寫,由此,不同的segment之間可以並發使用,極大地提高了性能。

        1:結構分析

   Segment的結構:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    transient volatile int count;
    transient int modCount;
    transient int threshold;
    transient volatile HashEntry<K,V>[] table;
    final float loadFactor;
}

 

  • count:Segment中元素的數量,用於map.size()時統計整個map的大小使用
  • modCount:對table的大小造成影響的操作的數量(比如put或者remove操作),用於統計size時驗證結果的正確性
  • threshold:閾值,Segment里面元素的數量超過這個值依舊就會對Segment進行擴容,concurrenthashmap自身不會擴容(segment的數量在map創建后不會再增加,在容量不足時只會增加segment的容量
  • table:鏈表數組,數組中的每一個元素代表了一個鏈表的頭部,一個鏈表用於存儲相同hash值的不同元素們
  • loadFactor:負載因子,用於確定threshold,決定擴容的時機

   

    2:查詢

static final class HashEntry<K,V> {
    final K key;
    final int hash;
    volatile V value;
    final HashEntry<K,V> next;
}

final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
}

V get(Object key, int hash) {
    if (count != 0) { // read-volatile
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            if (e.hash == hash && key.equals(e.key)) {
                V v = e.value;
                if (v != null)
                    return v;
                return readValueUnderLock(e); // recheck
            }
            e = e.next;
        }
    }
    return null;
}

HashEntry<K,V> getFirst(int hash) {
    HashEntry<K,V>[] tab = table;
    return tab[hash & (tab.length - 1)];
}

 

    由上面可以看到:concurrenthashmap的查詢操作經過三步:第一次hash確定key在哪個segment中;第二次hash在segment中確定key在鏈表數組的哪個鏈表中;第三步遍歷這個鏈表,調用equals()進行比對,找到與所查找key相等的結點並讀取。

    3:插入

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    lock();
    try {
        int c = count;
        if (c++ > threshold) // ensure capacity
            rehash();
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;
  
        V oldValue;
        if (e != null) {
            oldValue = e.value;
            if (!onlyIfAbsent)
                e.value = value;
        }
        else {
            oldValue = null;
            ++modCount;
            tab[index] = new HashEntry<K,V>(key, hash, first, value);
            count = c; // write-volatile
        }
        return oldValue;
    } finally {
        unlock();
    }
}

 

       插入過程也分三步:首先由key值經過hash計算得到是哪個segment,如果segment大小以及到達閥值則擴容;然后再次hash確定key所在鏈表的數組下標,獲取鏈表頭;最后遍歷鏈表,如果找到相同的key的結點則更新value值,如果沒有則插入新結點;

    4:刪除

       segment的鏈表數組中的鏈表結構如下:

static final class HashEntry<K,V> {
    final K key;
    final int hash;
    volatile V value;
    final HashEntry<K,V> next;
}

 

      我們可以看到,鏈表中結點只有value是可修改的,因此,如果我們需要刪除結點時,是不能簡單地由前繼結點指向被刪結點的后繼結點來實現。所以,我們只能重構鏈表。

V remove(Object key, int hash, Object value) {
    lock();
    try {
        int c = count - 1;
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;
  
        V oldValue = null;
        if (e != null) {
            V v = e.value;
            if (value == null || value.equals(v)) {
                oldValue = v;
                // All entries following removed node can stay
                // in list, but all preceding ones need to be
                // cloned.
                ++modCount;
                HashEntry<K,V> newFirst = e.next;
                for (HashEntry<K,V> p = first; p != e; p = p.next)
                    newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                  newFirst, p.value);
                tab[index] = newFirst;
                count = c; // write-volatile
            }
        }
        return oldValue;
    } finally {
        unlock();
    }
}

 

       刪除過程:首先由key經過hash確定所在segment;然后再hash確定具體的數組下標,獲得鏈表頭;最后遍歷鏈表,找到被刪除結點后,以被刪除結點的next結點開始建立新的鏈表,然后再把原鏈表頭直到被刪結點的前繼結點依次復制、插入新鏈表,最后把新鏈表頭設置為當前數組下標元素取代舊鏈表。

    5:統計大小—Size()

       統計整個map的大小時,如果在統計過程中把整個map鎖住,則會造成影響讀寫。ConcurrentHashMap通過采用segment中的屬性成員來優化這個過程。

 
static final class Segment<K,V> extends ReentrantLock implements Serializable {
    transient volatile int count;
    transient int modCount;
   ....
}

       我們看到,每個segment中有一個count記錄當前segment的元素數量,每當put/remove成功就會把這個值+1/-1。因此,在統計map的大小時,我們把每個segment的count加起來就是了。但是,如果在加的過程中,發生了修改怎么辦呢?比如:把segment[2]的count加到total后,segment[2]發生了remove操作,這樣就會造成統計結果不正確。此時就需要用modCount,modCount記錄了segment的修改次數,這個值只增不減,無論是插入、刪除都會導致該值+1.

       ConcurrentHashMap在統計size時,經歷了兩次遍歷:第一次不加鎖地遍歷所以segment,統計count和modCount的總和得到C1和M1;然后再次不加鎖地遍歷,得到C2和M2,比較M1和M2,如果修改次數沒有發生變化則說明兩次遍歷期間map沒有發生數量變化,那么C1就是可用的。如果M1不等於M2,則說明在統計過程中map的數量發生了變化,此時才采取最終手段——鎖住整個map進行統計。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM