1. 什么是阻塞隊列
阻塞隊列(BlockingQueue)是 Java 5 並發新特性中的內容,阻塞隊列的接口是 java.util.concurrent.BlockingQueue,它提供了兩個附加操作:當隊列中為空時,從隊列中獲取元素的操作將被阻塞;當隊列滿時,向隊列中添加元素的操作將被阻塞。
阻塞隊列常用於生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器。
阻塞隊列提供了四種操作方法:
- 拋出異常:當隊列滿時,再向隊列中插入元素,則會拋出IllegalStateException異常。當隊列空時,再向隊列中獲取元素,則會拋出NoSuchElementException異常。
- 返回特殊值:當隊列滿時,向隊列中添加元素,則返回false,否則返回true。當隊列為空時,向隊列中獲取元素,則返回null,否則返回元素。
- 一直阻塞:當阻塞隊列滿時,如果生產者向隊列中插入元素,則隊列會一直阻塞當前線程,直到隊列可用或響應中斷退出。當阻塞隊列為空時,如果消費者線程向阻塞隊列中獲取數據,則隊列會一直阻塞當前線程,直到隊列空閑或響應中斷退出。
- 超時退出:當隊列滿時,如果生產線程向隊列中添加元素,則隊列會阻塞生產線程一段時間,超過指定的時間則退出返回false。當隊列為空時,消費線程從隊列中移除元素,則隊列會阻塞一段時間,如果超過指定時間退出返回null。
2. Java中的阻塞隊列
JDK7提供了7個阻塞隊列。分別是
下面分別簡單介紹一下:
-
ArrayBlockingQueue:是一個用數組實現的有界阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。支持公平鎖和非公平鎖。【注:每一個線程在獲取鎖的時候可能都會排隊等待,如果在等待時間上,先獲取鎖的線程的請求一定先被滿足,那么這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當前等待時間最長的線程先獲取鎖】
- LinkedBlockingQueue:一個由鏈表結構組成的有界隊列,此隊列的長度為Integer.MAX_VALUE。此隊列按照先進先出的順序進行排序。
- PriorityBlockingQueue: 一個支持線程優先級排序的無界隊列,默認自然序進行排序,也可以自定義實現compareTo()方法來指定元素排序規則,不能保證同優先級元素的順序。
- DelayQueue: 一個實現PriorityBlockingQueue實現延遲獲取的無界隊列,在創建元素時,可以指定多久才能從隊列中獲取當前元素。只有延時期滿后才能從隊列中獲取元素。(DelayQueue可以運用在以下應用場景:1.緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。2.定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。)
- SynchronousQueue: 一個不存儲元素的阻塞隊列,每一個put操作必須等待take操作,否則不能添加元素。支持公平鎖和非公平鎖。SynchronousQueue的一個使用場景是在線程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據需要(新任務到來時)創建新的線程,如果有空閑線程則會重復使用,線程空閑了60秒后會被回收。
- LinkedTransferQueue: 一個由鏈表結構組成的無界阻塞隊列,相當於其它隊列,LinkedTransferQueue隊列多了transfer和tryTransfer方法。
-
LinkedBlockingDeque: 一個由鏈表結構組成的雙向阻塞隊列。隊列頭部和尾部都可以添加和移除元素,多線程並發時,可以將鎖的競爭最多降到一半。
Java中線程安全的內置隊列還有兩個:ConcurrentLinkedQueue和LinkedTransferQueue,它們使用了CAS這種無鎖的方式來實現了線程安全的隊列。無鎖的方式性能好,但是隊列是無界的,用在生產系統中,生產者生產速度過快,可能導致內存溢出。有界的阻塞隊列ArrayBlockingQueue和LinkedBlockingQueue,為了減少Java的垃圾回收對系統性能的影響,會盡量選擇array/heap格式的數據結構。這樣的話就只剩下ArrayBlockingQueue。(先埋個坑在這兒,近來接觸到了disruptor,感覺妙不可言。disruptor)
3. 阻塞隊列的實現原理
這里分析下ArrayBlockingQueue的實現原理。
構造方法:
ArrayBlockingQueue(int capacity); ArrayBlockingQueue(int capacity, boolean fair); ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
ArrayBlockingQueue提供了三種構造方法,參數含義如下:
- capacity:容量,即隊列大小。
- fair:是否公平鎖。
- c:隊列初始化元素,順序按照Collection遍歷順序。
插入元素:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
從源碼可以看出,生產者首先獲得鎖lock,然后判斷隊列是否已經滿了,如果滿了,則等待,直到被喚醒,然后調用enqueue插入元素。
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null;
final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
以上是enqueue的實現,實現的操作是插入元素到一個環形數組,然后喚醒notEmpty上阻塞的線程。
獲取元素:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
從源碼可以看出,消費者首先獲得鎖,然后判斷隊列是否為空,為空,則等待,直到被喚醒,然后調用dequeue獲取元素。
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null;
final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
以上是dequeue的實現,獲取環形數組當前takeIndex的元素,並及時將當前元素置為null,設置下一次takeIndex的值takeIndex++,然后喚醒notFull上阻塞的線程。
還有其他方法offer(E e)
、poll()
、add(E e)
、remove()
、 offer(E e, long timeout, TimeUnit unit)
等的實現,因為常用take和put,這些方法就不一一贅述了。
4. 阻塞隊列的基本使用
使用阻塞隊列實現生產者-消費者模式:
/** * Created by noly on 2017/5/19. */
public class BlockingQueueTest { public static void main (String[] args) { ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10); Consumer consumer = new Consumer(queue); Producer producer = new Producer(queue); producer.start(); consumer.start(); } } class Consumer extends Thread { private ArrayBlockingQueue<Integer> queue; public Consumer(ArrayBlockingQueue<Integer> queue){ this.queue = queue; } @Override public void run() { while(true) { try { Integer i = queue.take(); System.out.println("消費者從隊列取出元素:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread { private ArrayBlockingQueue<Integer> queue; public Producer(ArrayBlockingQueue<Integer> queue){ this.queue = queue; } @Override public void run() { for (int i = 0; i < 100; i++) { try { queue.put(i); System.out.println("生產者向隊列插入元素:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } } }
如果不使用阻塞隊列,使用Object.wait()和Object.notify()、非阻塞隊列實現生產者-消費者模式,考慮線程間的通訊,會非常麻煩。