Java中的阻塞隊列-ArrayBlockingQueue(一)


最近在看一些java基礎的東西,看到了隊列這章,打算對復習的一些知識點做一個筆記,也算是對自己思路的一個整理,本章先聊聊java中的阻塞隊列

參考文章:

http://ifeve.com/java-blocking-queue/

https://blog.csdn.net/u014082714/article/details/52215130

由上圖可以用看出java中的阻塞隊列都實現了 BlockingQueue接口,BlockingQueue又繼承自Queue

1、什么是阻塞隊列?

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用於生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。

阻塞隊列提供了四種處理方法:

  • 拋出異常:是指當阻塞隊列滿時候,再往隊列里插入元素,會拋出IllegalStateException(“Queue full”)異常。當隊列為空時,從隊列里獲取元素時會拋出NoSuchElementException異常 。
  • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列里拿出一個元素,如果沒有則返回null
  • 一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列里put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列里take元素,隊列也會阻塞消費者線程,直到隊列可用。
  • 超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,如果超過一定的時間,生產者線程就會退出。

2.、Java里的阻塞隊列

JDK7提供了7個阻塞隊列。分別是

  • ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
  • LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
  • PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
  • DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
  • SynchronousQueue:一個不存儲元素的阻塞隊列。
  • LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
  • LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

 ArrayBlockingQueue

ArrayBlockingQueue是一個用數組實現有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認情況下不保證訪問者公平的訪問隊列,所謂公平訪問隊列是指阻塞的所有生產者線程或消費者線程,當隊列可用時,可以按照阻塞的先后順序訪問隊列,即先阻塞的生產者線程,可以先往隊列里插入元素,先阻塞的消費者線程,可以先從隊列里獲取元素。通常情況下為了保證公平性會降低吞吐量。我們可以使用以下代碼創建一個公平的阻塞隊列

ArrayBlockingQueue fairQueue = new  ArrayBlockingQueue(1000,true);

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
   throw new IllegalArgumentException();
   this.items = new Object[capacity];
   lock = new ReentrantLock(fair);
   notEmpty = lock.newCondition();
   notFull = lock.newCondition();
}

通過源碼我們可以看到,構造器第一個參數是指定有界隊列的大小(及數組的大小),第二個參數指定是否使用公平鎖,這里可以看到阻塞隊列的公平訪問隊列是通過重入鎖來實現的(關於重入鎖我們在別的章節介紹)

下邊我們結合源碼對其提供的方法做一個簡單分析

關於構造器相關說明

/**
* * 構造函數,設置隊列的初始容量 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** * 構造函數。capacity設置數組大小 ,fair設置是否為公平鎖 * capacity and the specified access policy. */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair);//是否為公平鎖,如果是的話,那么先到的線程先獲得鎖對象。 //否則,由操作系統調度由哪個線程獲得鎖,一般為false,性能會比較高 notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /** *構造函數,帶有初始內容的隊列 */ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); //要給數組設置內容,先上鎖 try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e;//依次拷貝內容 } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i;//如果putIndex大於數組大小 ,那么從0重新開始 } finally { lock.unlock();//最后一定要釋放鎖 } }
關於方法的說明

/**
       * 添加一個元素,其實super.add里面調用了offer方法       */       public boolean add(E e) {           return super.add(e);       }    
/**
* 當調用offer方法返回false時,直接拋出異常
*/
     public boolean add(E e) {
       if (offer(e))
           return true;
else
throw new IllegalStateException("Queue full");
}
}

    /** 
     *加入成功返回true,否則返回false 
     *  
     */  
    public boolean offer(E e) {  
        checkNotNull(e);  
        final ReentrantLock lock = this.lock;  
        lock.lock();//上鎖  
        try {  
            if (count == items.length) //超過數組的容量  
                return false;  
            else {  
                enqueue(e); //放入元素  
                return true;  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    /** 
     * 如果隊列已滿的話,就會等待 
     */  
    public void put(E e) throws InterruptedException {  
        checkNotNull(e);  
        final ReentrantLock lock = this.lock;  
        lock.lockInterruptibly();//和lock()方法的區別是讓它在阻塞時也可拋出異常跳出  
        try {  
            while (count == items.length)  
                notFull.await(); //這里就是阻塞了,要注意。如果運行到這里,那么它會釋放上面的鎖,一直等到notify  
            enqueue(e);  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    /** 
     * 帶有超時時間的插入方法,unit表示是按秒、分、時哪一種 
     */  
    public boolean offer(E e, long timeout, TimeUnit unit)  
        throws InterruptedException {  
  
        checkNotNull(e);  
        long nanos = unit.toNanos(timeout);  
        final ReentrantLock lock = this.lock;  
        lock.lockInterruptibly();  
        try {  
            while (count == items.length) {  
                if (nanos <= 0)  
                    return false;  
                nanos = notFull.awaitNanos(nanos);//帶有超時等待的阻塞方法  
            }  
            enqueue(e);//入隊  
            return true;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    //實現的方法,如果當前隊列為空,返回null  
    public E poll() {  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            return (count == 0) ? null : dequeue();  
        } finally {  
            lock.unlock();  
        }  
    }  
     //實現的方法,如果當前隊列為空,一直阻塞  
    public E take() throws InterruptedException {  
        final ReentrantLock lock = this.lock;  
        lock.lockInterruptibly();  
        try {  
            while (count == 0)  
                notEmpty.await();//隊列為空,阻塞方法  
            return dequeue();  
        } finally {  
            lock.unlock();  
        }  
    }  
    //帶有超時時間的取元素方法,否則返回Null  
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {  
        long nanos = unit.toNanos(timeout);  
        final ReentrantLock lock = this.lock;  
        lock.lockInterruptibly();  
        try {  
            while (count == 0) {  
                if (nanos <= 0)  
                    return null;  
                nanos = notEmpty.awaitNanos(nanos);//超時等待  
            }  
            return dequeue();//取得元素  
        } finally {  
            lock.unlock();  
        }  
    }  
    //只是看一個隊列最前面的元素,取出是不刪除隊列中的原來元素。隊列為空時返回null  
    public E peek() {  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            return itemAt(takeIndex); // 隊列為空時返回null  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    /** 
     * 返回隊列當前元素個數 
     * 
     */  
    public int size() {  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            return count;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    /** 
     * 返回當前隊列再放入多少個元素就滿隊 
     */  
    public int remainingCapacity() {  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            return items.length - count;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    /** 
     *  從隊列中刪除一個元素的方法。刪除成功返回true,否則返回false 
     */  
    public boolean remove(Object o) {  
        if (o == nullreturn false;  
        final Object[] items = this.items;  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            if (count > 0) {  
                final int putIndex = this.putIndex;  
                int i = takeIndex;  
                do {  
                    if (o.equals(items[i])) {  
                        removeAt(i); //真正刪除的方法  
                        return true;  
                    }  
                    if (++i == items.length)  
                        i = 0;  
                } while (i != putIndex);//一直不斷的循環取出來做判斷  
            }  
            return false;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    /** 
     * 是否包含一個元素 
     */  
    public boolean contains(Object o) {  
        if (o == nullreturn false;  
        final Object[] items = this.items;  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            if (count > 0) {  
                final int putIndex = this.putIndex;  
                int i = takeIndex;  
                do {  
                    if (o.equals(items[i]))  
                        return true;  
                    if (++i == items.length)  
                        i = 0;  
                } while (i != putIndex);  
            }  
            return false;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    /** 
     * 清空隊列 
     * 
     */  
    public void clear() {  
        final Object[] items = this.items;  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            int k = count;  
            if (k > 0) {  
                final int putIndex = this.putIndex;  
                int i = takeIndex;  
                do {  
                    items[i] null;  
                    if (++i == items.length)  
                        i = 0;  
                } while (i != putIndex);  
                takeIndex = putIndex;  
                count = 0;  
                if (itrs != null)  
                    itrs.queueIsEmpty();  
                for (; k > 0 && lock.hasWaiters(notFull); k--)  
                    notFull.signal();  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    /** 
     * 取出所有元素到集合 
     */  
    public int drainTo(Collection<? super E> c) {  
        return drainTo(c, Integer.MAX_VALUE);  
    }  
  
    /** 
     * 取出所有元素到集合 
     */  
    public int drainTo(Collection<? super E> c, int maxElements) {  
        checkNotNull(c);  
        if (c == this)  
            throw new IllegalArgumentException();  
        if (maxElements <= 0)  
            return 0;  
        final Object[] items = this.items;  
        final ReentrantLock lock = this.lock;  
        lock.lock();  
        try {  
            int n = Math.min(maxElements, count);  
            int take = takeIndex;  
            int i = 0;  
            try {  
                while (i < n) {  
                    @SuppressWarnings("unchecked")  
                    E x = (E) items[take];  
                    c.add(x);  
                    items[take] null;  
                    if (++take == items.length)  
                        take = 0;  
                    i++;  
                }  
                return n;  
            } finally {  
                // Restore invariants even if c.add() threw  
                if (i > 0) {  
                    count -= i;  
                    takeIndex = take;  
                    if (itrs != null) {  
                        if (count == 0)  
                            itrs.queueIsEmpty();  
                        else if (i > take)  
                            itrs.takeIndexWrapped();  
                    }  
                    for (; i > 0 && lock.hasWaiters(notFull); i--)  
                        notFull.signal();  
                }  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM