Concurrent下的線程安全集合


1.ArrayBlockingQueue

ArrayBlockingQueue是由數組支持的線程安全的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。這是一個典型的“有界緩存區”,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。一旦創建了這樣的緩存區,就不能再增加其容量。試圖向已滿隊列中放入元素會導致操作受阻塞;試圖從空隊列中提取元素將導致類似阻塞。此類支持對等待的生產者線程和消費者線程進行排序的可選公平策略。默認情況下,不保證是這種排序。然而,通過將公平性 (fairness) 設置為 true 而構造的隊列允許按照 FIFO 順序訪問線程。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”。 

 public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

從改造方法可以看出,ArrayBlockingQueue的實現機制是ReentrantLock和Condition來實現的。

 

2、LinkedBlockingDeque

LinkedBlockingDeque是用雙向鏈表實現的,需要說明的是LinkedList也已經加入了Deque的一部分

   /** Maximum number of items in the deque */
    private final int capacity;
 /**
     * Creates a {@code LinkedBlockingDeque} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }
 public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }
  1. 要想支持阻塞功能,隊列的容量一定是固定的,否則無法在入隊的時候掛起線程。也就是capacity是final類型的。
  2. 既然是雙向鏈表,每一個結點就需要前后兩個引用,這樣才能將所有元素串聯起來,支持雙向遍歷。也即需要prev/next兩個引用。
  3. 雙向鏈表需要頭尾同時操作,所以需要first/last兩個節點,當然可以參考LinkedList那樣采用一個節點的雙向來完成,那樣實現起來就稍微麻煩點。
  4. 既然要支持阻塞功能,就需要鎖和條件變量來掛起線程。這里使用一個鎖兩個條件變量來完成此功能。

 

3、LinkedBlockingQueue

LinkedBlockingQueue是一個基於已鏈接節點的、范圍任意的blocking queue的實現,也是線程安全的。按 FIFO(先進先出)排序元素。隊列的頭部 是在隊列中時間最長的元素。隊列的尾部 是在隊列中時間最短的元素。

 /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

  可選的容量范圍構造方法參數作為防止隊列過度擴展的一種方法。如果未指定容量,則它等於 Integer.MAX_VALUE。除非插入節點會使隊列超出容量,否則每次插入后會動態地創建鏈接節點。

 此外它還不接受null值:

  public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();

 

4、PriorityBlockingQueue

PriorityBlockingQueue是一個無界的線程安全的阻塞隊列,它使用與PriorityQueue相同的順序規則,並且提供了阻塞檢索的操作。

    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
  public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

從其構造方法可以看到到,有一個Comparator的接口。沒錯,這個就是判斷元素Priority的關鍵:當前和其他對象比較,如果compare方法返回負數,那么在隊列里面的優先級就比較高。當然,你在創建PriorityBlockingQueue的時候可以不指定Comparator對象,但是你被要求在被存放元素中去實現。

  public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }
  private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }

  每次offer元素,都會有一個siftUpComparable操作,也就是排序,如果沒有構造的時候傳入自己實現的比較器,就采用自然排序,否則采用比較器規則,進行二分查找,比較,保持列頭是比較器希望的那個最大或則最小元素。

 

5、ConcurrentHashMap、ConcurrentLinkedQueue、ConcurrentLinkedDeque

ConcurrentHashMap支持高並發、高吞吐量的線程安全HashMap實現,其實現原理是鎖分離機制,將數據分Segment管理。每個Segment擁有獨立的鎖。

    /**
     * The segments, each of which is a specialized hash table
     */
    final Segment<K,V>[] segments;

 下面代碼是Hash鏈中的元素:

    static final class HashEntry<K,V> {
        final K key;
        final int hash;
        volatile V value;
        final HashEntry<K,V> next;
    }

  我們可以看到Key,hash,HashEntry都是final類型的,這就決定了ConcurrentHashMap的必須在鏈表頭插入,修改也只能從鏈表頭開始遍歷找到對應Key的元素進行修改,而刪除這就需要將要刪除節點的前面所有節點整個復制一遍,最后一個節點指向要刪除結點的下一個結點。注意到Value使用了volatile修飾,這樣程序在讀的時候就不用加鎖也能保證內存可見性。當然,在跨段操作(contains,size)中,還是會獲取全部Segment中的鎖去操作的,盡量避免跨段操作。

  ConcurrentLinkedQueue、ConcurrentLinkedDeque分別是使用單向鏈表和雙向鏈表實現,原理還是鎖分離機制。

7、ConcurrentSkipListMap

 ConcurrentSkipListMap提供了一種線程安全的並發訪問的排序映射表。內部是SkipList(跳表)結構實現,在理論上能夠在O(log(n))時間內完成查找、插入、刪除操作。 在非多線程的情況下,應當盡量使用TreeMap。此外對於並發性相對較低的並行程序可以使用Collections.synchronizedSortedMap將TreeMap進行包裝,也可以提供較好的效率。對於高並發程序,應當使用ConcurrentSkipListMap,能夠提供更高的並發度。同樣,ConcurrentSkipListMap支持Map的鍵值進行排序(參考:http://hi.baidu.com/yao1111yao/item/0f3008163c4b82c938cb306d)

 

  concurrentHashMap與ConcurrentSkipListMap性能測試   在4線程1.6萬數據的條件下,ConcurrentHashMap 存取速度是ConcurrentSkipListMap 的4倍左右。 
 但ConcurrentSkipListMap有幾個ConcurrentHashMap 不能比擬的優點:   1、ConcurrentSkipListMap 的key是有序的。 
2、ConcurrentSkipListMap 支持更高的並發。ConcurrentSkipListMap 的存取時間是log(N),和線程數幾乎無關。也就是說在數據量一定的情況下,並發的線程越多,ConcurrentSkipListMap越能體現出他的優勢(參考:http://wenku.baidu.com/link?url=n40zltjgTbXUuV2CtXX1E4sBila9SI5rBs_qK1flOkwmJThF5ICLpF1xvU504PyUYGxx5RmqDdJdnYljcMro9gQ8AQe7RXgxKVfs2MV1J7m)。 

8、ConcurrentSkipListSet

ConcurrentSkipListSet是線程安全的有序的集合,適用於高並發的場景。ConcurrentSkipListSet和TreeSet,它們雖然都是有序的集合。但是,第一,它們的線程安全機制不同,TreeSet是非線程安全的,而ConcurrentSkipListSet是線程安全的。第二,ConcurrentSkipListSet是通過ConcurrentSkipListMap實現的,而TreeSet是通過TreeMap實現的。

 

9、CopyOnWriteArrayList、CopyOnWriteArraySet

傳統的List在多線程同時讀寫的時候會拋出java.util.ConcurrentModificationException,而CopyOnWriteArrayList是使用CopyOnWrite(寫時復制)技術解決了這個問題,這一般需要很大的開銷,但是當遍歷操作的數量大大超過可變操作的數量時,這種方法可能比其他替代方法更有效。

寫時復制:

 /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

我們可以看到寫的過程中加了鎖,因為如果不加鎖的話,每條線程都會生成一個快照,造成內存消耗。先Arrays.copyOf了一份內存快照,然后寫這份內存快照,寫完最后將這份內存快照的應用轉移到CopyOnWriteArrayList中。

/** The array, accessed only via getArray/setArray. */
    private transient volatile Object[] array;

關於讀,存儲的變量使用volatile關鍵字,可以不加鎖的情況下解決內存可見性的問題。對於CopyOnWriteArraySet而言就簡單多了,只是持有一個CopyOnWriteArrayList,僅僅在add/addAll的時候檢測元素是否存在,如果存在就不加入集合中。

  最后關於CopyOnWrite的建議,由於插入會Copy內存,最后會導致垃圾回收,所以盡量少使用add操作,如果需要,盡量使用批量插入操作。對於經常插入的容器是不建議用這個的。

 

10、DelayQueue

  DelayQueue是一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素。該隊列的頭部是延遲期滿后保存時間最長的Delayed 元素。根據這個特性么我們可以使用DelayQueue來實現緩存系統、實時調度系統等。

  DelayQueue是一個BlockingQueue,其特化的參數是Delayed。Delayed擴展了Comparable接口,比較的基准為延時的時間值,Delayed接口的實現類getDelay的返回值應為固定值(final)。DelayQueue內部是使用PriorityQueue實現的。我們可以說DelayQueue = BlockingQueue + PriorityQueue + Delayed;

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();

查看器take方法的實現,可以了解到它確實是根據元素的延遲期來決定是否可讀的:

  public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

 

11、LinkedTransferQueue

LinkedTransferQueue=ConcurrentLinkedQueue+SynchronousQueue (in “fair” mode)+LinkedBlockingQueue,LinkedTransferQueue實現了一個重要的接口TransferQueue,該接口含有下面幾個重要方法:

1. transfer(E e)
   若當前存在一個正在等待獲取的消費者線程,即立刻移交之;否則,會插入當前元素e到隊列尾部,並且等待進入阻塞狀態,到有消費者線程取走該元素。
2. tryTransfer(E e)
   若當前存在一個正在等待獲取的消費者線程(使用take()或者poll()函數),使用該方法會即刻轉移/傳輸對象元素e;
   若不存在,則返回false,並且不進入隊列。這是一個不阻塞的操作。
3. tryTransfer(E e, long timeout, TimeUnit unit)
   若當前存在一個正在等待獲取的消費者線程,會立即傳輸給它; 否則將插入元素e到隊列尾部,並且等待被消費者線程獲取消費掉,
   若在指定的時間內元素e無法被消費者線程獲取,則返回false,同時該元素被移除。
4. hasWaitingConsumer()
   判斷是否存在消費者線程
5. getWaitingConsumerCount()
   獲取所有等待獲取元素的消費線程數量


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM