之前因為找實習的緣故,博客1個多月沒有寫了。找實習的經歷總算告一段落,現在重新更新博客,這次的內容是分析Java並發包中的阻塞隊列
關於阻塞隊列,我之前是一直充滿好奇,很好奇這個阻塞是怎么實現。現在我們先看一個該抽象類的實現類ArrayBlockingQueue。下面全部的代碼均在github
ArrayBlockingQueue
ArrayBlockingQueue顧名思義是一種數組形式的阻塞隊列,其自然就有數組的特點,即隊列的長度不可改變,只有初始化的時候指定。
下面,我們看一下例子。
public class ArrayBlock {
private BlockingQueue<String> blockingQueue;
public ArrayBlock(){
blockingQueue = new ArrayBlockingQueue<String>(3);
}
public BlockingQueue<String> getBlockingQueue() {
return blockingQueue;
}
}
創建一個大小為3的ArrayBlockingQueue,下面是一個生產者和消費者,通過ArrayBlockingQueue實現生產者/消費者模型。
public class Producer extends Thread {
private BlockingQueue<String> blockingQueue;
@Override
public void run() {
super.run();
for (int i = 0 ; i < 5;i++) {
try {
blockingQueue.put(i + "");
System.out.println(getName() + " 生產數據");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public Producer(ArrayBlock arrayBlock){
this.setName("Producer");
blockingQueue = arrayBlock.getBlockingQueue();
}
}
public class Costumer extends Thread{
private BlockingQueue<String> blockingQueue;
public Costumer(ArrayBlock arrayBlock) {
blockingQueue = arrayBlock.getBlockingQueue();
this.setName("Costumer");
}
@Override
public void run() {
super.run();
while (true) {
try {
Thread.sleep(6000);
String str = blockingQueue.take();
System.out.println(getName() + " 取出數據 " + str);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
測試過程就不放了,直接放出結果:
Producer 生產數據
Producer 生產數據
Producer 生產數據
Costumer 取出數據 0
Producer 生產數據
Costumer 取出數據 1
Producer 生產數據
Costumer 取出數據 2
Costumer 取出數據 3
Costumer 取出數據 4
這可以看出put方法與take方法均是阻塞的方法。當隊列已經滿的時候,就會阻塞放入方法,當隊列為空的時候,就會阻塞取出方法。
下面,我們主要看這個兩個方法,究竟是如何實現阻塞的。
** put方法 **
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
put方法是將元素放入到隊列中,這里面可以看出是用過Lock類與Condition類來實現的,即通過等待/通知機制實現的阻塞隊列。這里notFull是一個條件,當隊列已經滿的時候,就會執行await方法,如果沒有滿就執行入隊(enqueue)方法。這里,判斷隊列已滿用的是count == items.length。接下來,我們看一下take方法,來看看取數據的阻塞。
** take方法**
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
這里,與put方法類似,當元素為0時,就會執行await方法,上面方法中都沒有直接說明signal方法的執行。其實該方法是入隊與出隊的方法中實現的。也就是當執行notFull.await()時,是通過dequeue()方法來通知停止等待的,可以放入元素。當執行到notEmpty.await()時,是通過enqueue來通知結束阻塞,可以取出元素。
LinkedBlockingQueue
LinkedBlockingQueue顧名思義是一個鏈表形式的阻塞隊列,不同於ArrayBlockingQueue。如果不指定容量,則默認是Integer.MAX_VALUE。也就是說他是一個無界阻塞隊列。他的例子與上面的類似,但是其put與take方法實現不同於ArrayBlockingQueue,但兩者大致思路一致。我們只看一下put實現:
** put方法**
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
這里阻塞的本質實現也是通過Condition類的等待/通知機制。但是有幾點不同:
第一 這里用了一個原子類的count計數,官方的給的注釋是即使沒有鎖來提供保護,也能保證線程安全,實現wait guard。
第二 ArrayBlockingQueue的通知是在入隊與出隊的方法中,LinkedBlockingQueue則不是,並且插入之后不滿的時候,還有通知其他await的線程。
第三 ArrayBlockingQueue的lock一直是一個,也就是put/take是用的一個鎖,放與取無法實現並行。但是LinkedBlockingQueue是兩個鎖,放一個鎖,取一個鎖,可以實現put/take的並行,要高效一些。
SynchronousQueue
SynchronousQueue顧名思義是同步隊列,特點不同於上面的阻塞隊列,他是一個無界非緩存的隊列,准確說他不存儲元素,放入的元素,只有等待取走元素之后才能放入。也就是說任意時刻:
- isEmpty()法永遠返回是true
- remainingCapacity() 方法永遠返回是0
- remove()和removeAll() 方法永遠返回是false
- iterator()方法永遠返回空
- peek()方法永遠返回null
元素並不會被生產者存在隊列中,而是直接生產者與消費者進行交互。
其實現是利用無鎖算法,可以參考SynchronousQueue實現
還有一點需要注意,同步隊列支持公平性與非公平性。公平性是利用隊列來管理多余生產者與消費者,非公平性是利用棧來管理多余生產者與消費者。
