為什么要使用阻塞隊列
之前,介紹了一下 ThreadPoolExecutor 的各參數的含義(並發編程之線程池ThreadPoolExecutor),其中有一個 BlockingQueue,它是一個阻塞隊列。那么,小伙伴們有沒有想過,為什么此處的線程池要用阻塞隊列呢?
我們知道隊列是先進先出的。當放入一個元素的時候,會放在隊列的末尾,取出元素的時候,會從隊頭取。那么,當隊列為空或者隊列滿的時候怎么辦呢。
這時,阻塞隊列,會自動幫我們處理這種情況。
當阻塞隊列為空的時候,從隊列中取元素的操作就會被阻塞。當阻塞隊列滿的時候,往隊列中放入元素的操作就會被阻塞。
而后,一旦空隊列有數據了,或者滿隊列有空余位置時,被阻塞的線程就會被自動喚醒。
這就是阻塞隊列的好處,你不需要關心線程何時被阻塞,也不需要關心線程何時被喚醒,一切都由阻塞隊列自動幫我們完成。我們只需要關注具體的業務邏輯就可以了。
而這種阻塞隊列經常用在生產者消費者模式中。(可參看:面試官讓我手寫一個生產者消費者模式)
常用的阻塞隊列
那么,一般我們用到的阻塞隊列有哪些呢。下面,通過idea的類圖,列出來常用的阻塞隊列,然后一個一個講解(不懂怎么用的,可以參考這篇文章:怎么用IDEA快速查看類圖關系)。
阻塞隊列中,所有常用的方法都在 BlockingQueue 接口中定義。如
插入元素的方法: put,offer,add。移除元素的方法: remove,poll,take。
它們有四種不同的處理方式,第一種是在失敗時拋出異常,第二種是在失敗時返回特殊值,第三種是一直阻塞當前線程,最后一種是在指定時間內阻塞,否則返回特殊值。(以上特殊值,是指在插入元素時,失敗返回false,在取出元素時,失敗返回null)
拋異常 | 特殊值 | 阻塞 | 超時 | |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
**1) ArrayBlockingQueue**
這是一個由數組結構組成的有界阻塞隊列。首先看下它的構造方法,有三個。
第一個可以指定隊列的大小,第二個還可以指定隊列是否公平,不指定的話,默認是非公平。它是使用 ReentrantLock 的公平鎖和非公平鎖實現的(后續講解AQS時,會詳細說明)。
簡單理解就是,ReentrantLock 內部會維護一個有先后順序的等待隊列,假如有五個任務一起過來,都被阻塞了。如果是公平的,則等待隊列中等待最久的任務就會先進入阻塞隊列。如果是非公平的,那么這五個線程就需要搶鎖,誰先搶到,誰就先進入阻塞隊列。
第三個構造方法,是把一個集合的元素初始化到阻塞隊列中。
另外,ArrayBlockingQueue 沒有實現讀寫分離,也就是說,讀和寫是不能同時進行的。因為,它讀寫時用的是同一把鎖,如下圖所示:
2) LinkedBlockingQueue
這是一個由鏈表結構組成的有界阻塞隊列。它的構造方法有三個。
可以看到和 ArrayBlockingQueue 的構造方法大同小異,不過是,LinkedBlockingQueue 可以不指定隊列的大小,默認值是 Integer.MAX_VALUE 。
但是,最好不要這樣做,建議指定一個固定大小。因為,如果生產者的速度比消費者的速度大的多的情況下,這會導致阻塞隊列一直膨脹,直到系統內存被耗盡(此時,還沒達到隊列容量的最大值)。
此外,LinkedBlockingQueue 實現了讀寫分離,可以實現數據的讀和寫互不影響,這在高並發的場景下,對於效率的提高無疑是非常巨大的。
3) SynchronousQueue
這是一個沒有緩沖的無界隊列。什么意思,看一下它的 size 方法:
總是返回 0 ,因為它是一個沒有容量的隊列。
當執行插入元素的操作時,必須等待一個取出操作。也就是說,put元素的時候,必須等待 take 操作。
那么,有的同學就好奇了,這沒有容量,還叫什么隊列啊,這有什么意義呢。
我的理解是,這適用於並發任務不大,而且生產者和消費者的速度相差不多的場景下,直接把生產者和消費者對接,不用經過隊列的入隊出隊這一系列操作。所以,效率上會高一些。
可以去查看一下 Excutors.newCachedThreadPool 方法用的就是這種隊列。
這個隊列有兩個構造方法,用於傳入是公平還是非公平,默認是非公平。
4)PriorityBlockingQueue
這是一個支持優先級排序的無界隊列。有四個構造方法:
可以指定初始容量大小(注意初始容量並不代表最大容量),或者不指定,默認大小為 11。也可以傳入一個比較器,把元素按一定的規則排序,不指定比較器的話,默認是自然順序。
PriorityBlockingQueue 是基於二叉樹最小堆實現的,每當取元素的時候,就會把優先級最高的元素取出來。我們測試一下:
public class Person {
private int id;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
public Person(int id, String name) {
this.id = id;
this.name = name;
}
public Person() {
}
}
public class QueueTest {
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<Person> priorityBlockingQueue = new PriorityBlockingQueue<>(1, new Comparator<Person>() {
@Override
public int compare(Person o1, Person o2) {
return o1.getId() - o2.getId();
}
});
Person p2 = new Person(7, "李四");
Person p1 = new Person(9, "張三");
Person p3 = new Person(6, "王五");
Person p4 = new Person(2, "趙六");
priorityBlockingQueue.add(p1);
priorityBlockingQueue.add(p2);
priorityBlockingQueue.add(p3);
priorityBlockingQueue.add(p4);
//由於二叉樹最小堆實現,用這種方式直接打印元素,不能保證有序
System.out.println(priorityBlockingQueue);
System.out.println(priorityBlockingQueue.take());
System.out.println(priorityBlockingQueue);
System.out.println(priorityBlockingQueue.take());
System.out.println(priorityBlockingQueue);
}
}
打印結果:
[Person{id=2, name='趙六'}, Person{id=6, name='王五'}, Person{id=7, name='李四'}, Person{id=9, name='張三'}]
Person{id=2, name='趙六'}
[Person{id=6, name='王五'}, Person{id=9, name='張三'}, Person{id=7, name='李四'}]
Person{id=6, name='王五'}
[Person{id=7, name='李四'}, Person{id=9, name='張三'}]
可以看到,第一次取出的是 id 最小值 2, 第二次取出的是 6 。
5)DelayQueue
這是一個帶有延遲時間的無界阻塞隊列。隊列中的元素,只有等延時時間到了,才能取出來。此隊列一般用於過期數據的刪除,或任務調度。以下,模擬一下定長時間的數據刪除。
首先定義數據元素,需要實現 Delayed 接口,實現 getDelay 方法用於計算剩余時間,和 CompareTo方法用於優先級排序。
public class DelayData implements Delayed {
private int id;
private String name;
//數據到期時間
private long endTime;
private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public long getEndTime() {
return endTime;
}
public void setEndTime(long endTime) {
this.endTime = endTime;
}
public DelayData(int id, String name, long endTime) {
this.id = id;
this.name = name;
//需要把傳入的時間endTime 加上當前系統時間,作為數據的到期時間
this.endTime = endTime + System.currentTimeMillis();
}
public DelayData() {
}
@Override
public long getDelay(TimeUnit unit) {
return this.endTime - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
return o.getDelay(this.timeUnit) - this.getDelay(this.timeUnit) < 0 ? 1: -1;
}
}
模擬三條數據,分別設置不同的過期時間:
public class ProcessData {
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayData> delayQueue = new DelayQueue<>();
DelayData a = new DelayData(5, "A", 5000);
DelayData b = new DelayData(8, "B", 8000);
DelayData c = new DelayData(2, "C", 2000);
delayQueue.add(a);
delayQueue.add(b);
delayQueue.add(c);
System.out.println("開始計時時間:" + System.currentTimeMillis());
for (int i = 0; i < 3; i++) {
DelayData data = delayQueue.take();
System.out.println("id:"+data.getId()+",數據:"+data.getName()+"被移除,當前時間:"+System.currentTimeMillis());
}
}
}
最后結果:
開始計時時間:1583333583216
id:2,數據:C被移除,當前時間:1583333585216
id:5,數據:A被移除,當前時間:1583333588216
id:8,數據:B被移除,當前時間:1583333591216
可以看到,數據是按過期時間長短,按順序移除的。C的時間最短 2 秒,然后過了 3 秒 A 也過期,再過 3 秒,B 過期。