BlockingQueue深入分析


1.BlockingQueue定義的常用方法如下
  拋出異常 特殊值 阻塞 超時
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
檢查 element() peek() 不可用 不可用

1)add(anObject):anObject加到BlockingQueue,即如果BlockingQueue可以容納,則返回true,否則招聘異常

2)offer(anObject):表示如果可能的話,anObject加到BlockingQueue,即如果BlockingQueue可以容納,則返回true,否則返回false.

3)put(anObject):anObject加到BlockingQueue,如果BlockQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue里面有空間再繼續.

4)poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null

5)take():取走BlockingQueue里排在首位的對象,BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的對象被加入為止

其中:BlockingQueue 不接受null 元素。試圖addput 或offer 一個null 元素時,某些實現會拋出NullPointerExceptionnull 被用作指示poll 操作失敗的警戒值。 

2、BlockingQueue的幾個注意點

【1】BlockingQueue 可以是限定容量的。它在任意給定時間都可以有一個remainingCapacity,超出此容量,便無法無阻塞地put 附加元素。沒有任何內部容量約束的BlockingQueue 總是報告Integer.MAX_VALUE 的剩余容量。

【2】BlockingQueue 實現主要用於生產者-使用者隊列,但它另外還支持Collection 接口。因此,舉例來說,使用remove(x) 從隊列中移除任意一個元素是有可能的。然而,這種操作通常 會有效執行,只能有計划地偶爾使用,比如在取消排隊信息時。

【3】BlockingQueue 實現是線程安全的。所有排隊方法都可以使用內部鎖或其他形式的並發控制來自動達到它們的目的。然而,大量的 Collection 操作(addAllcontainsAllretainAll 和removeAll沒有 必要自動執行,除非在實現中特別說明。因此,舉例來說,在只添加了c 中的一些元素后,addAll(c) 有可能失敗(拋出一個異常)。

【4】BlockingQueue 實質上不支持使用任何一種“close”或“shutdown”操作來指示不再添加任何項。這種功能的需求和使用有依賴於實現的傾向。例如,一種常用的策略是:對於生產者,插入特殊的end-of-stream 或poison 對象,並根據使用者獲取這些對象的時間來對它們進行解釋。

3、簡要概述BlockingQueue常用的四個實現類


1)ArrayBlockingQueue:規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的.

2)LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序的

3)PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數的Comparator決定的順序.

4)SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的.

    
其中LinkedBlockingQueueArrayBlockingQueue比較起來,它們背后所用的數據結構不一樣,導致LinkedBlockingQueue的數據吞吐量要大於ArrayBlockingQueue,但在線程數量很大時其性能的可預見性低於ArrayBlockingQueue.  

下面主要看一下ArrayBlockingQueue的源碼:
public boolean offer(E e) {    
        if (e == null) throw new NullPointerException();    
        final ReentrantLock lock = this.lock;//每個對象對應一個顯示的鎖    
        lock.lock();//請求鎖直到獲得鎖(不可以被interrupte)    
        try {    
            if (count == items.length)//如果隊列已經滿了    
                return false;    
            else {    
                insert(e);    
                return true;    
            }    
        } finally {    
            lock.unlock();//    
        }    
}    
看insert方法:    
private void insert(E x) {    
        items[putIndex] = x;    
        //增加全局index的值。    
        /*   
        Inc方法體內部:   
        final int inc(int i) {   
        return (++i == items.length)? 0 : i;   
            }   
        這里可以看出ArrayBlockingQueue采用從前到后向內部數組插入的方式插入新元素的。如果插完了,putIndex可能重新變為0(在已經執行了移除操作的前提下,否則在之前的判斷中隊列為滿)   
        */   
        putIndex = inc(putIndex);     
        ++count;    
        notEmpty.signal();//wake up one waiting thread    
}    
public void put(E e) throws InterruptedException {    
        if (e == null) throw new NullPointerException();    
        final E[] items = this.items;    
        final ReentrantLock lock = this.lock;    
        lock.lockInterruptibly();//請求鎖直到得到鎖或者變為interrupted    
        try {    
            try {    
                while (count == items.length)//如果滿了,當前線程進入noFull對應的等waiting狀態    
                    notFull.await();    
            } catch (InterruptedException ie) {    
                notFull.signal(); // propagate to non-interrupted thread    
                throw ie;    
            }    
            insert(e);    
        } finally {    
            lock.unlock();    
        }    
}    
public boolean offer(E e, long timeout, TimeUnit unit)    
        throws InterruptedException {    
   
        if (e == null) throw new NullPointerException();    
    long nanos = unit.toNanos(timeout);    
        final ReentrantLock lock = this.lock;    
        lock.lockInterruptibly();    
        try {    
            for (;;) {    
                if (count != items.length) {    
                    insert(e);    
                    return true;    
                }    
                if (nanos <= 0)    
                    return false;    
                try {    
                //如果沒有被 signal/interruptes,需要等待nanos時間才返回    
                    nanos = notFull.awaitNanos(nanos);    
                } catch (InterruptedException ie) {    
                    notFull.signal(); // propagate to non-interrupted thread    
                    throw ie;    
                }    
            }    
        } finally {    
            lock.unlock();    
        }    
    }    
public boolean add(E e) {    
    return super.add(e);    
}    
父類:    
public boolean add(E e) {    
        if (offer(e))    
            return true;    
        else   
            throw new IllegalStateException("Queue full");    
    }  

該類中有幾個實例變量:takeIndex/putIndex/count

用三個數字來維護這個隊列中的數據變更:    
    /** items index for next take, poll or remove */   
    private int takeIndex;    
    /** items index for next put, offer, or add. */   
    private int putIndex;    
    /** Number of items in the queue */   
    private int count;    

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM