1. 什么是阻塞隊列
阻塞隊列(BlockingQueue)是 Java 5 並發新特性中的內容,阻塞隊列的接口是 java.util.concurrent.BlockingQueue,它提供了兩個附加操作:當隊列中為空時,從隊列中獲取元素的操作將被阻塞;當隊列滿時,向隊列中添加元素的操作將被阻塞。
阻塞隊列常用於生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器。
阻塞隊列提供了四種操作方法:
- 拋出異常:當隊列滿時,再向隊列中插入元素,則會拋出IllegalStateException異常。當隊列空時,再向隊列中獲取元素,則會拋出NoSuchElementException異常。
- 返回特殊值:當隊列滿時,向隊列中添加元素,則返回false,否則返回true。當隊列為空時,向隊列中獲取元素,則返回null,否則返回元素。
- 一直阻塞:當阻塞隊列滿時,如果生產者向隊列中插入元素,則隊列會一直阻塞當前線程,直到隊列可用或響應中斷退出。當阻塞隊列為空時,如果消費者線程向阻塞隊列中獲取數據,則隊列會一直阻塞當前線程,直到隊列空閑或響應中斷退出。
- 超時退出:當隊列滿時,如果生產線程向隊列中添加元素,則隊列會阻塞生產線程一段時間,超過指定的時間則退出返回false。當隊列為空時,消費線程從隊列中移除元素,則隊列會阻塞一段時間,如果超過指定時間退出返回null。
2. Java中的阻塞隊列
JDK7提供了7個阻塞隊列。分別是
下面分別簡單介紹一下:
-
ArrayBlockingQueue:是一個用數組實現的有界阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。支持公平鎖和非公平鎖。【注:每一個線程在獲取鎖的時候可能都會排隊等待,如果在等待時間上,先獲取鎖的線程的請求一定先被滿足,那么這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當前等待時間最長的線程先獲取鎖】
-
LinkedBlockingQueue:一個由鏈表結構組成的有界隊列,此隊列的長度為Integer.MAX_VALUE。此隊列按照先進先出的順序進行排序。
-
PriorityBlockingQueue: 一個支持線程優先級排序的無界隊列,默認自然序進行排序,也可以自定義實現compareTo()方法來指定元素排序規則,不能保證同優先級元素的順序。
-
DelayQueue: 一個實現PriorityBlockingQueue實現延遲獲取的無界隊列,在創建元素時,可以指定多久才能從隊列中獲取當前元素。只有延時期滿后才能從隊列中獲取元素。(DelayQueue可以運用在以下應用場景:1.緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。2.定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。)
-
SynchronousQueue: 一個不存儲元素的阻塞隊列,每一個put操作必須等待take操作,否則不能添加元素。支持公平鎖和非公平鎖。SynchronousQueue的一個使用場景是在線程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據需要(新任務到來時)創建新的線程,如果有空閑線程則會重復使用,線程空閑了60秒后會被回收。
-
LinkedTransferQueue: 一個由鏈表結構組成的無界阻塞隊列,相當於其它隊列,LinkedTransferQueue隊列多了transfer和tryTransfer方法。
-
LinkedBlockingDeque: 一個由鏈表結構組成的雙向阻塞隊列。隊列頭部和尾部都可以添加和移除元素,多線程並發時,可以將鎖的競爭最多降到一半。
Java中線程安全的內置隊列還有兩個:ConcurrentLinkedQueue和LinkedTransferQueue,它們使用了CAS這種無鎖的方式來實現了線程安全的隊列。無鎖的方式性能好,但是隊列是無界的,用在生產系統中,生產者生產速度過快,可能導致內存溢出。有界的阻塞隊列ArrayBlockingQueue和LinkedBlockingQueue,為了減少Java的垃圾回收對系統性能的影響,會盡量選擇array/heap格式的數據結構。這樣的話就只剩下ArrayBlockingQueue。(先埋個坑在這兒,近來接觸到了disruptor,感覺妙不可言。disruptor)
3. 阻塞隊列的實現原理
這里分析下ArrayBlockingQueue的實現原理。
構造方法:
ArrayBlockingQueue(int capacity);
ArrayBlockingQueue(int capacity, boolean fair);
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
ArrayBlockingQueue提供了三種構造方法,參數含義如下:
- capacity:容量,即隊列大小。
- fair:是否公平鎖。
- c:隊列初始化元素,順序按照Collection遍歷順序。
插入元素:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
從源碼可以看出,生產者首先獲得鎖lock,然后判斷隊列是否已經滿了,如果滿了,則等待,直到被喚醒,然后調用enqueue插入元素。
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
以上是enqueue的實現,實現的操作是插入元素到一個環形數組,然后喚醒notEmpty上阻塞的線程。
獲取元素:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
從源碼可以看出,消費者首先獲得鎖,然后判斷隊列是否為空,為空,則等待,直到被喚醒,然后調用dequeue獲取元素。
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
以上是dequeue的實現,獲取環形數組當前takeIndex的元素,並及時將當前元素置為null,設置下一次takeIndex的值takeIndex++,然后喚醒notFull上阻塞的線程。
還有其他方法offer(E e)
、poll()
、add(E e)
、remove()
、 offer(E e, long timeout, TimeUnit unit)
等的實現,因為常用take和put,這些方法就不一一贅述了。
4. 阻塞隊列的基本使用
使用阻塞隊列實現生產者-消費者模式:
/**
* Created by noly on 2017/5/19.
*/
public class BlockingQueueTest {
public static void main (String[] args) {
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
Consumer consumer = new Consumer(queue);
Producer producer = new Producer(queue);
producer.start();
consumer.start();
}
}
class Consumer extends Thread {
private ArrayBlockingQueue<Integer> queue;
public Consumer(ArrayBlockingQueue<Integer> queue){
this.queue = queue;
}
@Override
public void run() {
while(true) {
try {
Integer i = queue.take();
System.out.println("消費者從隊列取出元素:" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Producer extends Thread {
private ArrayBlockingQueue<Integer> queue;
public Producer(ArrayBlockingQueue<Integer> queue){
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
queue.put(i);
System.out.println("生產者向隊列插入元素:" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
如果不使用阻塞隊列,使用Object.wait()和Object.notify()、非阻塞隊列實現生產者-消費者模式,考慮線程間的通訊,會非常麻煩。
參考資料: