java並發編程——並發容器


概述

java cocurrent包提供了很多並發容器,在提供並發控制的前提下,通過優化,提升性能。本文主要討論常見的並發容器的實現機制和絕妙之處,但並不會對所有實現細節面面俱到。

為什么JUC需要提供並發容器?

java collection framework提供了豐富的容器,有map、list、set、queue、deque。但是其存在一個不足:多數容器類都是非線程安全的,即使部分容器是線程安全的,由於使用sychronized進行鎖控制,導致讀/寫均需進行鎖操作,性能很低。

java collection framework可以通過以下兩種方式實現容器對象讀寫的並發控制,但是都是基於sychronized鎖控制機制,性能低:

1. 使用sychronized方法進行並發控制,如HashTable 和 Vector。以下代碼為Vector.add(e)的java8實現代碼:

    public synchronized boolean add(E e) {
        modCount++;
        ensureCapacityHelper(elementCount + 1);
        elementData[elementCount++] = e;
        return true;
    }

2.使用工具類Collections將非線程安全容器包裝成線程安全容器。以下代碼是Collections.synchronizedMap(Map<K,V> m)將原始Map包裝為線程安全的SynchronizedMap,但是實際上最終操作時,仍然是在被包裝的原始m上進行,只是SynchronizedMap的所有方法都加上了synchronized鎖控制。

    public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
        return new SynchronizedMap<>(m);   //將原始Map包裝為線程安全的SynchronizedMap
    }
    private static class SynchronizedMap<K,V>
        implements Map<K,V>, Serializable {

        private final Map<K,V> m;       // Backing Map 原始的非線程安全的map對象
        final Object      mutex;        // Object on which to synchronize  加鎖對象

        SynchronizedMap(Map<K,V> m) {
            this.m = Objects.requireNonNull(m);
            mutex = this;
        }

        public V get(Object key) {      
            synchronized (mutex) {return m.get(key);} //所有方法加上synchronized鎖控制
        }

        public V put(K key, V value) {
            synchronized (mutex) {return m.put(key, value);} //所有方法加上synchronized鎖控制
        }
......
}

為了提供高效地並發容器,java 5在java.util.cocurrent包中 引入了並發容器。

JUC並發容器

本節對juc常用的幾個並發容器進行代碼分析,重點看下這些容器是如何高效地實現並發控制的。在進行具體的並發容器介紹之前,我們提前搞清楚CAS理論是什么東西。因為在juc並發容器的很多地方都使用到了CAS,他比加鎖處理更加高效。

CAS

CAS是一種無鎖的非阻塞算法,全稱為:Compare-and-swap(比較並交換),大致思路是:先比較目標對象現值是否和舊值一致,如果一致,則更新對象為新值;如果不一致,則表明對象已經被其他線程修改,直接返回。算法實現的偽碼如下:

function cas(p : pointer to int, old : int, new : int) returns bool {
    if *p ≠ old {
        return false
    }
    *p ← new
    return true
}

參考自wiki:Compare-and-swap

ConcurrentHashMap

ConcurrentHashMap實現了HashTable的所有功能,線程安全,但卻在檢索元素時不需要鎖定,因此效率更高。

ConcurrentHashMap的key 和 value都不允許null出現。原因在於ConcurrentHashMap不能區分出value是null還是沒有map上,相對的HashMap卻可以允許null值,在於其使用在單線程環境下,可以使用containKey(key)方法提前判定是否能map上,從而區分這兩種情況,但是ConcurrentHashMap在多線程使用上下文中則不能這么判定。參考:關於ConcurrentHashMap為什么不能put null

A hash table supporting full concurrency of retrievals and high expected concurrency for updates. This class obeys the same functional specification as Hashtable, and includes versions of methods corresponding to each method of Hashtable. However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access. This class is fully interoperable with Hashtable in programs that rely on its thread safety but not on its synchronization details.

ConcurrentHashMap個put和get方法,細節請看代碼對應位置的注釋。

public V put(K key, V value) {
        return putVal(key, value, false);
    }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin,當前hash對應的bin(桶)還不存在時,使用cas寫入; 寫入失敗,則再次嘗試。
            }
            else if ((fh = f.hash) == MOVED) //如果tab[i]不為空並且hash值為MOVED,說明該鏈表正在進行transfer操作,返回擴容完成后的table
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {          // 加鎖保證線程安全,但不是對整個table加鎖,只對當前的Node加鎖,避免其他線程對當前Node進行寫操作。 if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) { //如果在鏈表中找到值為key的節點e,直接設置e.val = value即可
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e; //如果沒有找到值為key的節點,直接新建Node並加入鏈表即可
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {  //如果首節點為TreeBin類型,說明為紅黑樹結構,執行putTreeVal操作
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD) //如果節點數大於閾值,則轉換鏈表結構為紅黑樹結構
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount); //計數增加1,有可能觸發transfer操作(擴容)
     return null; 
    }
  transient volatile Node<K,V>[] table; //元素所在的table是volatile類型,線程間可見


public V get(Object key) { //get無需更改size和count等公共屬性,加上table是volatile類型,故而無需加鎖。 Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }

思考一個問題:為什么當新加Node對應的‘桶’不存在時可以直接使用CAS操作新增該桶,並插入新節點,但是當新增Node對應的‘桶’存在時,則必須加鎖處理?

參考資料:Java並發編程總結4——ConcurrentHashMap在jdk1.8中的改進

               java1.7 ConcurrentHashMap實現細節

附上HashMap jdk 1.8版本中的實現原理講解,講的很細也很通俗易懂:Jdk1.8中的HashMap實現原理

ConcurrentLinkedQueue

ConcurrentLinkedQueue使用鏈表作為數據結構,它采用無鎖操作,可以任務是高並發環境下性能最好的隊列。

ConcurrentLinkedQueue是非阻塞線程安全隊列,無界,故不太適合做生產者消費者模式,而LinkedBlockingQueue是阻塞線程安全隊列,可以做到有界,通常用於生產者消費者模式。

下面看下其offer()方法的源碼,體會下:不使用鎖,只是用CAS操作來保證線程安全。細節參考代碼對應位置的注釋。

    
/**
* 不斷嘗試:找到最新的tail節點,不斷嘗試想最新的tail節點后面添加新節點
     */
public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { //不斷嘗試:找到最新的tail節點,不斷嘗試想最新的tail節點后面添加新節點。 Node<E> q = p.next; if (q == null) { // p is last node if (p.casNext(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time //t引用有可能並不是真實的tail節點的引用,多線程操作時,允許該情況出現,只要能保證每次新增元素是在真實的tail節點上添加的即可。 casTail(t, newNode); // Failure is OK. 即使失敗,也不影響下次offer新的元素,反正后面會試圖尋找到最新的真實tail元素 return true; } // Lost CAS race to another thread; re-read next CAS競爭失敗,再次嘗試 } else if (p == q) //遇到哨兵節點(next和item相同,空節點或者刪除節點),從head節點重新遍歷。確保找到最新的tail節點 // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; //java中'!='運算符不是原子操作,故使用t != (t = tail)做一次判定,如果tail被其他線程更改,則直接使用最新的tail節點返回。 } }

CopyOnWriteArrayList

CopyOnWriteArrayList提供高效地讀取操作,使用在讀多寫少的場景。CopyOnWriteArrayList讀取操作不用加鎖,且是安全的;寫操作時,先copy一份原有數據數組,再對復制數據進行寫入操作,最后將復制數據替換原有數據,從而保證寫操作不影響讀操作。

下面看下CopyOnWriteArrayList的核心代碼,體會下CopyOnWrite的思想:

public class CopyOnWriteArrayList<E>    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    /** The array, accessed only via getArray/setArray. */
    private transient volatile Object[] array;
    /** The lock protecting all mutators */
    final transient ReentrantLock lock = new ReentrantLock();
    /**
     * Sets the array.
     */
    final void setArray(Object[] a) {
        array = a;
    }

    /**
     * Gets the array.  Non-private so as to also be accessible
     * from CopyOnWriteArraySet class.
     */
    final Object[] getArray() {
        return array;
    }

    public E get(int index) {
        return get(getArray(), index);
    }

    /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();   //寫 互斥 讀
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;  //對副本進行修改操作
            setArray(newElements); //將修改后的副本替換原有的數據
            return true;
        } finally {
            lock.unlock();
        }
    }

}

 ConcurrentSkipListMap

SkipList(跳表)是一種隨機性的數據結構,用於替代紅黑樹,因為它在高並發的情況下,性能優於紅黑樹。跳表實際上是以空間換取時間。跳表的基本模型示意圖如下:

ConcurrentSkipListMap的實現就是實現了一個無鎖版的跳表,主要是利用無鎖的鏈表的實現來管理跳表底層,同樣利用CAS來完成替換。

參考資料

從零單排 Java Concurrency, SkipList&ConcurrnetSkipListMap

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM