最近在看一些java基礎的東西,看到了隊列這章,打算對復習的一些知識點做一個筆記,也算是對自己思路的一個整理,本章先聊聊java中的阻塞隊列
參考文章:
http://ifeve.com/java-blocking-queue/
https://blog.csdn.net/u014082714/article/details/52215130
由上圖可以用看出java中的阻塞隊列都實現了 BlockingQueue接口,BlockingQueue又繼承自Queue
1、什么是阻塞隊列?
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用於生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。
阻塞隊列提供了四種處理方法:
- 拋出異常:是指當阻塞隊列滿時候,再往隊列里插入元素,會拋出IllegalStateException(“Queue full”)異常。當隊列為空時,從隊列里獲取元素時會拋出NoSuchElementException異常 。
- 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列里拿出一個元素,如果沒有則返回null
- 一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列里put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列里take元素,隊列也會阻塞消費者線程,直到隊列可用。
- 超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,如果超過一定的時間,生產者線程就會退出。
2.、Java里的阻塞隊列
JDK7提供了7個阻塞隊列。分別是
- ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
- LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
- PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
- DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列。
- LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
- LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。
ArrayBlockingQueue
ArrayBlockingQueue是一個用數組實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認情況下不保證訪問者公平的訪問隊列,所謂公平訪問隊列是指阻塞的所有生產者線程或消費者線程,當隊列可用時,可以按照阻塞的先后順序訪問隊列,即先阻塞的生產者線程,可以先往隊列里插入元素,先阻塞的消費者線程,可以先從隊列里獲取元素。通常情況下為了保證公平性會降低吞吐量。我們可以使用以下代碼創建一個公平的阻塞隊列:
ArrayBlockingQueue fairQueue =
new
ArrayBlockingQueue(
1000
,
true
);
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
通過源碼我們可以看到,構造器第一個參數是指定有界隊列的大小(及數組的大小),第二個參數指定是否使用公平鎖,這里可以看到阻塞隊列的公平訪問隊列是通過重入鎖來實現的(關於重入鎖我們在別的章節介紹)
下邊我們結合源碼對其提供的方法做一個簡單分析
關於構造器相關說明
/** * * 構造函數,設置隊列的初始容量 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** * 構造函數。capacity設置數組大小 ,fair設置是否為公平鎖 * capacity and the specified access policy. */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair);//是否為公平鎖,如果是的話,那么先到的線程先獲得鎖對象。 //否則,由操作系統調度由哪個線程獲得鎖,一般為false,性能會比較高 notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /** *構造函數,帶有初始內容的隊列 */ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); //要給數組設置內容,先上鎖 try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e;//依次拷貝內容 } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i;//如果putIndex大於數組大小 ,那么從0重新開始 } finally { lock.unlock();//最后一定要釋放鎖 } }
關於方法的說明
/** * 添加一個元素,其實super.add里面調用了offer方法 */ public boolean add(E e) { return super.add(e); }
/**
* 當調用offer方法返回false時,直接拋出異常
*/
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
}
/** *加入成功返回true,否則返回false * */ public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock();//上鎖 try { if (count == items.length) //超過數組的容量 return false; else { enqueue(e); //放入元素 return true; } } finally { lock.unlock(); } } /** * 如果隊列已滿的話,就會等待 */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly();//和lock()方法的區別是讓它在阻塞時也可拋出異常跳出 try { while (count == items.length) notFull.await(); //這里就是阻塞了,要注意。如果運行到這里,那么它會釋放上面的鎖,一直等到notify enqueue(e); } finally { lock.unlock(); } } /** * 帶有超時時間的插入方法,unit表示是按秒、分、時哪一種 */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos);//帶有超時等待的阻塞方法 } enqueue(e);//入隊 return true; } finally { lock.unlock(); } } //實現的方法,如果當前隊列為空,返回null public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } //實現的方法,如果當前隊列為空,一直阻塞 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await();//隊列為空,阻塞方法 return dequeue(); } finally { lock.unlock(); } } //帶有超時時間的取元素方法,否則返回Null public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos);//超時等待 } return dequeue();//取得元素 } finally { lock.unlock(); } } //只是看一個隊列最前面的元素,取出是不刪除隊列中的原來元素。隊列為空時返回null public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // 隊列為空時返回null } finally { lock.unlock(); } } /** * 返回隊列當前元素個數 * */ public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } } /** * 返回當前隊列再放入多少個元素就滿隊 */ public int remainingCapacity() { final ReentrantLock lock = this.lock; lock.lock(); try { return items.length - count; } finally { lock.unlock(); } } /** * 從隊列中刪除一個元素的方法。刪除成功返回true,否則返回false */ public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { if (o.equals(items[i])) { removeAt(i); //真正刪除的方法 return true; } if (++i == items.length) i = 0; } while (i != putIndex);//一直不斷的循環取出來做判斷 } return false; } finally { lock.unlock(); } } /** * 是否包含一個元素 */ public boolean contains(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { if (o.equals(items[i])) return true; if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } } /** * 清空隊列 * */ public void clear() { final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int k = count; if (k > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { items[i] = null; if (++i == items.length) i = 0; } while (i != putIndex); takeIndex = putIndex; count = 0; if (itrs != null) itrs.queueIsEmpty(); for (; k > 0 && lock.hasWaiters(notFull); k--) notFull.signal(); } } finally { lock.unlock(); } } /** * 取出所有元素到集合 */ public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); } /** * 取出所有元素到集合 */ public int drainTo(Collection<? super E> c, int maxElements) { checkNotNull(c); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int n = Math.min(maxElements, count); int take = takeIndex; int i = 0; try { while (i < n) { @SuppressWarnings("unchecked") E x = (E) items[take]; c.add(x); items[take] = null; if (++take == items.length) take = 0; i++; } return n; } finally { // Restore invariants even if c.add() threw if (i > 0) { count -= i; takeIndex = take; if (itrs != null) { if (count == 0) itrs.queueIsEmpty(); else if (i > take) itrs.takeIndexWrapped(); } for (; i > 0 && lock.hasWaiters(notFull); i--) notFull.signal(); } } } finally { lock.unlock(); } }