Java編程的邏輯 (76) - 並發容器 - 各種隊列


本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》,由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買,京東自營鏈接http://item.jd.com/12299018.html


本節,我們來探討Java並發包中的各種隊列。Java並發包提供了豐富的隊列類,可以簡單分為:

  • 無鎖非阻塞並發隊列:ConcurrentLinkedQueue和ConcurrentLinkedDeque
  • 普通阻塞隊列:基於數組的ArrayBlockingQueue,基於鏈表的LinkedBlockingQueue和LinkedBlockingDeque
  • 優先級阻塞隊列:PriorityBlockingQueue
  • 延時阻塞隊列:DelayQueue
  • 其他阻塞隊列:SynchronousQueue和LinkedTransferQueue

無鎖非阻塞是這些隊列不使用鎖,所有操作總是可以立即執行,主要通過循環CAS實現並發安全,阻塞隊列是指這些隊列使用鎖和條件,很多操作都需要先獲取鎖或滿足特定條件,獲取不到鎖或等待條件時,會等待(即阻塞),獲取到鎖或條件滿足再返回。

這些隊列迭代都不會拋出ConcurrentModificationException,都是弱一致的,后面就不單獨強調了。下面,我們來簡要探討每類隊列的用途、用法和基本實現原理。

無鎖非阻塞並發隊列

有兩個無鎖非阻塞隊列:ConcurrentLinkedQueue和ConcurrentLinkedDeque,它們適用於多個線程並發使用一個隊列的場合,都是基於鏈表實現的,都沒有限制大小,是無界的,與ConcurrentSkipListMap類似,它們的size方法不是一個常量運算,不過這個方法在並發應用中用處也不大。

ConcurrentLinkedQueue實現了Queue接口,表示一個先進先出的隊列,從尾部入隊,從頭部出隊,內部是一個單向鏈表。ConcurrentLinkedDeque實現了Deque接口,表示一個雙端隊列,在兩端都可以入隊和出隊,內部是一個雙向鏈表。它們的用法類似於LinkedList,我們就不贅述了。

這兩個類最基礎的原理是循環CAS,ConcurrentLinkedQueue的算法基於一篇論文:"Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" (https://www.research.ibm.com/people/m/michael/podc-1996.pdf),ConcurrentLinkedDeque擴展了ConcurrentLinkedQueue的技術,但它們的具體實現都非常復雜,我們就不探討了。

普通阻塞隊列

除了剛介紹的兩個隊列,其他隊列都是阻塞隊列,都實現了接口BlockingQueue,在入隊/出隊時可能等待,主要方法有:

//入隊,如果隊列滿,等待直到隊列有空間
void put(E e) throws InterruptedException;
//出隊,如果隊列空,等待直到隊列不為空,返回頭部元素
E take() throws InterruptedException;
//入隊,如果隊列滿,最多等待指定的時間,如果超時還是滿,返回false
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//出隊,如果隊列空,最多等待指定的時間,如果超時還是空,返回null
E poll(long timeout, TimeUnit unit) throws InterruptedException;

普通阻塞隊列是常用的隊列,常用於生產者/消費者模式。

ArrayBlockingQueue和LinkedBlockingQueue都是實現了Queue接口,表示先進先出的隊列,尾部進,頭部出,而LinkedBlockingDeque實現了Deque接口,是一個雙端隊列。

ArrayBlockingQueue是基於循環數組實現的,有界,創建時需要指定大小,且在運行過程中不會改變,這與我們在容器類中介紹的ArrayDeque是不同的,ArrayDeque也是基於循環數組實現的,但是是無界的,會自動擴展。

LinkedBlockingQueue是基於單向鏈表實現的,在創建時可以指定最大長度,也可以不指定,默認是無限的,節點都是動態創建的。LinkedBlockingDeque與LinkedBlockingQueue一樣,最大長度也是在創建時可選的,默認無限,不過,它是基於雙向鏈表實現的。

內部,它們都是使用顯式鎖ReentrantLock顯式條件Condition實現的。

ArrayBlockingQueue的實現很直接,有一個數組存儲元素,有兩個索引表示頭和尾,有一個變量表示當前元素個數,有一個鎖保護所有訪問,有兩個條件,"不滿"和"不空"用於協作,成員聲明如下:

final Object[] items;
int takeIndex; //
int putIndex; //
int count; //元素個數
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

實現思路與我們在72節實現的類似,就不贅述了。

與ArrayBlockingQueue類似,LinkedBlockingDeque也是使用一個鎖和兩個條件,使用鎖保護所有操作,使用"不滿"和"不空"兩個條件,LinkedBlockingQueue稍微不同,因為它使用鏈表,且只從頭部出隊、從尾部入隊,它做了一些優化,使用了兩個鎖,一個保護頭部,一個保護尾部,每個鎖關聯一個條件。

優先級阻塞隊列

普通阻塞隊列是先進先出的,而優先級隊列是按優先級出隊的,優先級高的先出,我們在容器類中介紹過優先級隊列PriorityQueue及其背后的數據結構

PriorityBlockingQueue是PriorityQueue的並發版本,與PriorityQueue一樣,它沒有大小限制,是無界的,內部的數組大小會動態擴展,要求元素要么實現Comparable接口,要么創建PriorityBlockingQueue時提供一個Comparator對象。

與PriorityQueue的區別是,PriorityBlockingQueue實現了BlockingQueue接口,在隊列為空時,take方法會阻塞等待。

另外,PriorityBlockingQueue是線程安全的,它的基本實現原理與PriorityQueue是一樣的,也是基於堆,但它使用了一個鎖ReentrantLock保護所有訪問,使用了一個條件協調阻塞等待。

延時阻塞隊列

延時阻塞隊列DelayQueue是一種特殊的優先級隊列,它也是無界的,它要求每個元素都實現Delayed接口,該接口的聲明為:

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

Delayed擴展了Comparable接口,也就是說,DelayQueue的每個元素都是可比較的,它有一個額外方法getDelay返回一個給定時間單位unit的整數,表示再延遲多長時間,如果小於等於0,表示不再延遲。

DelayQueue也是優先級隊列,它按元素的延時時間出隊,它的特殊之處在於,只有當元素的延時過期之后才能被從隊列中拿走,也就是說,take方法總是返回第一個過期的元素,如果沒有,則阻塞等待。

DelayQueue可以用於實現定時任務,我們看段簡單的示例代碼:

public class DelayedQueueDemo {
    private static final AtomicLong taskSequencer = new AtomicLong(0);

    static class DelayedTask implements Delayed {
        private long runTime;
        private long sequence;
        private Runnable task;

        public DelayedTask(int delayedSeconds, Runnable task) {
            this.runTime = System.currentTimeMillis() + delayedSeconds * 1000;
            this.sequence = taskSequencer.getAndIncrement();
            this.task = task;
        }

        @Override
        public int compareTo(Delayed o) {
            if (o == this) {
                return 0;
            }
            if (o instanceof DelayedTask) {
                DelayedTask other = (DelayedTask) o;
                if (runTime < other.runTime) {
                    return -1;
                } else if (runTime > other.runTime) {
                    return 1;
                } else if (sequence < other.sequence) {
                    return -1;
                } else {
                    return 1;
                }
            }
            throw new IllegalArgumentException();
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(runTime - System.currentTimeMillis(),
                    TimeUnit.MICROSECONDS);
        }

        public Runnable getTask() {
            return task;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> tasks = new DelayQueue<>();
        tasks.put(new DelayedTask(2, new Runnable() {
            @Override
            public void run() {
                System.out.println("execute delayed task");
            }
        }));

        DelayedTask task = tasks.take();
        task.getTask().run();
    }
}

DelayedTask表示延時任務,只有延時過期后任務才會執行,任務按延時時間排序,延時一樣的按照入隊順序排序。

內部,DelayQueue是基於PriorityQueue實現的,它使用一個鎖ReentrantLock保護所有訪問,使用一個條件available表示頭部是否有元素,當頭部元素的延時未到時,take操作會根據延時計算需睡眠的時間,然后睡眠,如果在此過程中有新的元素入隊,且成為頭部元素,則阻塞睡眠的線程會被提前喚醒然后重新檢查。以上是基本思路,DelayQueue的實現有一些優化,以減少不必要的喚醒,具體我們就不探討了。

其他阻塞隊列

Java並發包中還有兩個特殊的阻塞隊列,SynchronousQueue和LinkedTransferQueue。

SynchronousQueue

SynchronousQueue與一般的隊列不同,它不算一種真正的隊列,它沒有存儲元素的空間,存儲一個元素的空間都沒有。它的入隊操作要等待另一個線程的出隊操作,反之亦然。如果沒有其他線程在等待從隊列中接收元素,put操作就會等待。take操作需要等待其他線程往隊列中放元素,如果沒有,也會等待。SynchronousQueue適用於兩個線程之間直接傳遞信息、事件或任務。

LinkedTransferQueue

LinkedTransferQueue實現了TransferQueue接口,TransferQueue是BlockingQueue的子接口,但增加了一些額外功能,生產者在往隊列中放元素時,可以等待消費者接收后再返回,適用於一些消息傳遞類型的應用中。TransferQueue的接口定義為:

public interface TransferQueue<E> extends BlockingQueue<E> {
    //如果有消費者在等待(執行take或限時的poll),直接轉給消費者,
    //返回true,否則返回false,不入隊
    boolean tryTransfer(E e);
    //如果有消費者在等待,直接轉給消費者,
    //否則入隊,阻塞等待直到被消費者接收后再返回
    void transfer(E e) throws InterruptedException;
    //如果有消費者在等待,直接轉給消費者,返回true
    //否則入隊,阻塞等待限定的時間,如果最后被消費者接收,返回true
    boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    //是否有消費者在等待
    boolean hasWaitingConsumer();
    //等待的消費者個數
    int getWaitingConsumerCount();
}

LinkedTransferQueue是基於鏈表實現的、無界的TransferQueue,具體實現比較復雜,我們就不探討了。

小結

本節簡要介紹了Java並發包中的各種隊列,包括其基本概念和基本原理。

73節到本節,我們介紹了Java並發包的各種容器,至此,就介紹完了,在實際開發中,應該盡量使用這些現成的容器,而非重新發明輪子。

Java並發包中還提供了一種方便的任務執行服務,使用它,可以將要執行的並發任務與線程的管理相分離,大大簡化並發任務和線程的管理,讓我們下一節來探討。

(與其他章節一樣,本節所有代碼位於 https://github.com/swiftma/program-logic)

----------------

未完待續,查看最新文章,敬請關注微信公眾號“老馬說編程”(掃描下方二維碼),從入門到高級,深入淺出,老馬和你一起探索Java編程及計算機技術的本質。用心原創,保留所有版權。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM