《java.util.concurrent 包源碼閱讀》04 ConcurrentMap


Java集合框架中的Map類型的數據結構是非線程安全,在多線程環境中使用時需要手動進行線程同步。因此在java.util.concurrent包中提供了一個線程安全版本的Map類型數據結構:ConcurrentMap。本篇文章主要關注ConcurrentMap接口以及它的Hash版本的實現ConcurrentHashMap。

ConcurrentMap是Map接口的子接口

public interface ConcurrentMap<K, V> extends Map<K, V>

與Map接口相比,ConcurrentMap多了4個方法:

1)putIfAbsent方法:如果key不存在,添加key-value。方法會返回與key關聯的value。

V putIfAbsent(K key, V value);

2)remove方法

boolean remove(Object key, Object value);

Map接口中也有一個remove方法:

V remove(Object key);

ConcurrentMap中的remove方法需要比較原有的value和參數中的value是否一致,只有一致才會刪除。

3)Replace方法:有2個重載

boolean replace(K key, V oldValue, V newValue);
V replace(K key, V value);

兩個重載的區別和2)中的兩個remove方法的區別很類似,多了一個檢查value一致。

 

通過ConcurrentMap多出來的方法可以看到多線程中一個很重要的概念:compare。compare的作用就是為了保證value的一致性。

 

重頭戲來了:ConcurrentHashMap。

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable

ConcurrentHashMap和HashMap類似,這里重點關注的是如何實現線程安全,也就是如何加鎖。

對於HashMap來說,有一個Entry數組,根據Key的hash值對數組長度取模得到數組下標,找到Entry,遍歷整個Entry鏈表,用equals比較來確定key所在的Entry。

ConcurrentHashMap的基本思想是采取分塊的方式加鎖,分塊數由參數“concurrencyLevel”來決定(和HashMap中的“initialCapacity”類似,實際塊數是第一個大於concurrencyLevel的2的n次方)。每個分塊被稱為Segment,Segment的索引方式和HashMap中的Entry索引方式一致(hash值對數組長度取模)。

對Segment加鎖的方式很簡單,直接把Segment定義為ReentrantLock的子類。同時Segment又是一個特定實現的hash table。

static final class Segment<K,V> extends ReentrantLock implements Serializable

下面分析ConcurrentHashMap讀寫時如何加鎖。

首先是讀操作類的方法,來看get方法:

public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

可以看到,讀取的時候沒有調用的Segment的獲取鎖的方法,而是通過hash值定位到Entry,然后遍歷Entry的鏈表。為什么這里不用加鎖呢?看看HashEntry的代碼就會明白了。

    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

value和next屬性是帶有volatile修飾符的,可以大膽放心的遍歷和比較。

 

接着是寫操作,寫操作是肯定要加鎖的。因為Segment可以看成是一個hash table,因此ConcurrentHashMap直接調用Segment的對應的寫入方法如put,replace等。

比如put方法

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

因此這里直接關注Segment的對應寫操作方法即可。在每個寫操作的方法開頭都這樣的類似代碼:

        final V remove(Object key, int hash, Object value) {
            if (!tryLock())
                scanAndLock(key, hash);
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);

也就是,首先嘗試獲取鎖,如果成功則會帶鎖繼續操作,失敗則要通過scanAndLockscanAndLockForPut獲取鎖,因此這里關注的重點也就轉移到這兩個方法了。

按照多線程環境的規則,如果嘗試獲取鎖失敗的話就會進入阻塞等待狀態,那么這兩個方法的作用應該是類似的。

        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

這兩個方法的邏輯:在等待的時候閑着沒事兒干把該做好的准備做好,查找一下目標entry,如果是新建entry就把entry創建好,然后如果一切沒問題就用lock()方法把自己給阻塞了,也就是做好准備然后去等着了。

 

因為Segment本身就可以看成一個hash table,因此必然涉及rehash的問題,因為和HashMap中的rehash類似,在這里就省略了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM