這是java高並發系列第25篇文章。
環境:jdk1.8。
本文內容
- 掌握Queue、BlockingQueue接口中常用的方法
- 介紹6中阻塞隊列,及相關場景示例
- 重點掌握4種常用的阻塞隊列
Queue接口
隊列是一種先進先出(FIFO)的數據結構,java中用Queue
接口來表示隊列。
Queue
接口中定義了6個方法:
public interface Queue<E> extends Collection<E> {
boolean add(e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
}
每個Queue
方法都有兩種形式:
(1)如果操作失敗則拋出異常,
(2)如果操作失敗,則返回特殊值(null
或false
,具體取決於操作),接口的常規結構如下表所示。
操作類型 | 拋出異常 | 返回特殊值 |
---|---|---|
插入 | add(e) |
offer(e) |
移除 | remove() |
poll() |
檢查 | element() |
peek() |
Queue
從Collection
繼承的add
方法插入一個元素,除非它違反了隊列的容量限制,在這種情況下它會拋出IllegalStateException
;offer
方法與add
不同之處僅在於它通過返回false
來表示插入元素失敗。
remove
和poll
方法都移除並返回隊列的頭部,確切地移除哪個元素是由具體的實現來決定的,僅當隊列為空時,remove
和poll
方法的行為才有所不同,在這些情況下,remove
拋出NoSuchElementException
,而poll
返回null
。
element
和peek
方法返回隊列頭部的元素,但不移除,它們之間的差異與remove
和poll
的方式完全相同,如果隊列為空,則element
拋出NoSuchElementException
,而peek
返回null
。
隊列一般不要插入空元素。
BlockingQueue接口
BlockingQueue
位於juc中,熟稱阻塞隊列, 阻塞隊列首先它是一個隊列,繼承Queue
接口,是隊列就會遵循先進先出(FIFO)的原則,又因為它是阻塞的,故與普通的隊列有兩點區別:
- 當一個線程向隊列里面添加數據時,如果隊列是滿的,那么將阻塞該線程,暫停添加數據
- 當一個線程從隊列里面取出數據時,如果隊列是空的,那么將阻塞該線程,暫停取出數據
BlockingQueue
相關方法:
操作類型 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入 | add(e) |
offer(e) |
put(e) | offer(e,timeuout,unit) |
移除 | remove() |
poll() |
take() | poll(timeout,unit) |
檢查 | element() |
peek() |
不支持 | 不支持 |
重點,再來解釋一下,加深印象:
- 3個可能會有異常的方法,add、remove、element;這3個方法不會阻塞(是說隊列滿或者空的情況下是否會阻塞);隊列滿的情況下,add拋出異常;隊列為空情況下,remove、element拋出異常
- offer、poll、peek 也不會阻塞(是說隊列滿或者空的情況下是否會阻塞);隊列滿的情況下,offer返回false;隊列為空的情況下,pool、peek返回null
- 隊列滿的情況下,調用put方法會導致當前線程阻塞
- 隊列為空的情況下,調用take方法會導致當前線程阻塞
offer(e,timeuout,unit)
,超時之前,插入成功返回true,否者返回falsepoll(timeout,unit)
,超時之前,獲取到頭部元素並將其移除,返回true,否者返回false- 以上一些方法希望大家都記住,方便以后使用
BlockingQueue常見的實現類
看一下相關類圖
ArrayBlockingQueue
基於數組的阻塞隊列實現,其內部維護一個定長的數組,用於存儲隊列元素。線程阻塞的實現是通過ReentrantLock來完成的,數據的插入與取出共用同一個鎖,因此ArrayBlockingQueue並不能實現生產、消費同時進行。而且在創建ArrayBlockingQueue時,我們還可以控制對象的內部鎖是否采用公平鎖,默認采用非公平鎖。
LinkedBlockingQueue
基於單向鏈表的阻塞隊列實現,在初始化LinkedBlockingQueue的時候可以指定大小,也可以不指定,默認類似一個無限大小的容量(Integer.MAX_VALUE),不指隊列容量大小也是會有風險的,一旦數據生產速度大於消費速度,系統內存將有可能被消耗殆盡,因此要謹慎操作。另外LinkedBlockingQueue中用於阻塞生產者、消費者的鎖是兩個(鎖分離),因此生產與消費是可以同時進行的。
PriorityBlockingQueue
一個支持優先級排序的無界阻塞隊列,進入隊列的元素會按照優先級進行排序
SynchronousQueue
同步阻塞隊列,SynchronousQueue沒有容量,與其他BlockingQueue不同,SynchronousQueue是一個不存儲元素的BlockingQueue,每一個put操作必須要等待一個take操作,否則不能繼續添加元素,反之亦然
DelayQueue
DelayQueue是一個支持延時獲取元素的無界阻塞隊列,里面的元素全部都是“可延期”的元素,列頭的元素是最先“到期”的元素,如果隊列里面沒有元素到期,是不能從列頭獲取元素的,哪怕有元素也不行,也就是說只有在延遲期到時才能夠從隊列中取元素
LinkedTransferQueue
LinkedTransferQueue是基於鏈表的FIFO無界阻塞隊列,它出現在JDK7中,Doug Lea 大神說LinkedTransferQueue是一個聰明的隊列,它是ConcurrentLinkedQueue、SynchronousQueue(公平模式下)、無界的LinkedBlockingQueues等的超集,
LinkedTransferQueue
包含了ConcurrentLinkedQueue、SynchronousQueue、LinkedBlockingQueues
三種隊列的功能
下面我們來介紹每種阻塞隊列的使用。
ArrayBlockingQueue
有界阻塞隊列,內部使用數組存儲元素,有2個常用構造方法:
//capacity表示容量大小,默認內部采用非公平鎖
public ArrayBlockingQueue(int capacity)
//capacity:容量大小,fair:內部是否是使用公平鎖
public ArrayBlockingQueue(int capacity, boolean fair)
需求:業務系統中有很多地方需要推送通知,由於需要推送的數據太多,我們將需要推送的信息先丟到阻塞隊列中,然后開一個線程進行處理真實發送,代碼如下:
package com.itsoku.chat25;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import sun.text.normalizer.NormalizerBase;
import java.util.Calendar;
import java.util.concurrent.*;
/**
* 跟着阿里p7學並發,微信公眾號:javacode2018
*/
public class Demo1 {
//推送隊列
static ArrayBlockingQueue<String> pushQueue = new ArrayBlockingQueue<String>(10000);
static {
//啟動一個線程做真實推送
new Thread(() -> {
while (true) {
String msg;
try {
long starTime = System.currentTimeMillis();
//獲取一條推送消息,此方法會進行阻塞,直到返回結果
msg = pushQueue.take();
long endTime = System.currentTimeMillis();
//模擬推送耗時
TimeUnit.MILLISECONDS.sleep(500);
System.out.println(String.format("[%s,%s,take耗時:%s],%s,發送消息:%s", starTime, endTime, (endTime - starTime), Thread.currentThread().getName(), msg));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
//推送消息,需要發送推送消息的調用該方法,會將推送信息先加入推送隊列
public static void pushMsg(String msg) throws InterruptedException {
pushQueue.put(msg);
}
public static void main(String[] args) throws InterruptedException {
for (int i = 1; i <= 5; i++) {
String msg = "一起來學java高並發,第" + i + "天";
//模擬耗時
TimeUnit.SECONDS.sleep(i);
Demo1.pushMsg(msg);
}
}
}
輸出:
[1565595629206,1565595630207,take耗時:1001],Thread-0,發送消息:一起來學java高並發,第1天
[1565595630208,1565595632208,take耗時:2000],Thread-0,發送消息:一起來學java高並發,第2天
[1565595632208,1565595635208,take耗時:3000],Thread-0,發送消息:一起來學java高並發,第3天
[1565595635208,1565595639209,take耗時:4001],Thread-0,發送消息:一起來學java高並發,第4天
[1565595639209,1565595644209,take耗時:5000],Thread-0,發送消息:一起來學java高並發,第5天
代碼中我們使用了有界隊列ArrayBlockingQueue
,創建ArrayBlockingQueue
時候需要制定容量大小,調用pushQueue.put
將推送信息放入隊列中,如果隊列已滿,此方法會阻塞。代碼中在靜態塊中啟動了一個線程,調用pushQueue.take();
從隊列中獲取待推送的信息進行推送處理。
注意:ArrayBlockingQueue
如果隊列容量設置的太小,消費者發送的太快,消費者消費的太慢的情況下,會導致隊列空間滿,調用put方法會導致發送者線程阻塞,所以注意設置合理的大小,協調好消費者的速度。
LinkedBlockingQueue
內部使用單向鏈表實現的阻塞隊列,3個構造方法:
//默認構造方法,容量大小為Integer.MAX_VALUE
public LinkedBlockingQueue();
//創建指定容量大小的LinkedBlockingQueue
public LinkedBlockingQueue(int capacity);
//容量為Integer.MAX_VALUE,並將傳入的集合丟入隊列中
public LinkedBlockingQueue(Collection<? extends E> c);
LinkedBlockingQueue
的用法和ArrayBlockingQueue
類似,建議使用的時候指定容量,如果不指定容量,插入的太快,移除的太慢,可能會產生OOM。
PriorityBlockingQueue
無界的優先級阻塞隊列,內部使用數組存儲數據,達到容量時,會自動進行擴容,放入的元素會按照優先級進行排序,4個構造方法:
//默認構造方法,默認初始化容量是11
public PriorityBlockingQueue();
//指定隊列的初始化容量
public PriorityBlockingQueue(int initialCapacity);
//指定隊列的初始化容量和放入元素的比較器
public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator);
//傳入集合放入來初始化隊列,傳入的集合可以實現SortedSet接口或者PriorityQueue接口進行排序,如果沒有實現這2個接口,按正常順序放入隊列
public PriorityBlockingQueue(Collection<? extends E> c);
優先級隊列放入元素的時候,會進行排序,所以我們需要指定排序規則,有2種方式:
- 創建
PriorityBlockingQueue
指定比較器Comparator
- 放入的元素需要實現
Comparable
接口
上面2種方式必須選一個,如果2個都有,則走第一個規則排序。
需求:還是上面的推送業務,目前推送是按照放入的先后順序進行發送的,比如有些公告比較緊急,優先級比較高,需要快點發送,怎么搞?此時PriorityBlockingQueue
就派上用場了,代碼如下:
package com.itsoku.chat25;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 跟着阿里p7學並發,微信公眾號:javacode2018
*/
public class Demo2 {
//推送信息封裝
static class Msg implements Comparable<Msg> {
//優先級,越小優先級越高
private int priority;
//推送的信息
private String msg;
public Msg(int priority, String msg) {
this.priority = priority;
this.msg = msg;
}
@Override
public int compareTo(Msg o) {
return Integer.compare(this.priority, o.priority);
}
@Override
public String toString() {
return "Msg{" +
"priority=" + priority +
", msg='" + msg + '\'' +
'}';
}
}
//推送隊列
static PriorityBlockingQueue<Msg> pushQueue = new PriorityBlockingQueue<Msg>();
static {
//啟動一個線程做真實推送
new Thread(() -> {
while (true) {
Msg msg;
try {
long starTime = System.currentTimeMillis();
//獲取一條推送消息,此方法會進行阻塞,直到返回結果
msg = pushQueue.take();
//模擬推送耗時
TimeUnit.MILLISECONDS.sleep(100);
long endTime = System.currentTimeMillis();
System.out.println(String.format("[%s,%s,take耗時:%s],%s,發送消息:%s", starTime, endTime, (endTime - starTime), Thread.currentThread().getName(), msg));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
//推送消息,需要發送推送消息的調用該方法,會將推送信息先加入推送隊列
public static void pushMsg(int priority, String msg) throws InterruptedException {
pushQueue.put(new Msg(priority, msg));
}
public static void main(String[] args) throws InterruptedException {
for (int i = 5; i >= 1; i--) {
String msg = "一起來學java高並發,第" + i + "天";
Demo2.pushMsg(i, msg);
}
}
}
輸出:
[1565598857028,1565598857129,take耗時:101],Thread-0,發送消息:Msg{priority=1, msg='一起來學java高並發,第1天'}
[1565598857162,1565598857263,take耗時:101],Thread-0,發送消息:Msg{priority=2, msg='一起來學java高並發,第2天'}
[1565598857263,1565598857363,take耗時:100],Thread-0,發送消息:Msg{priority=3, msg='一起來學java高並發,第3天'}
[1565598857363,1565598857463,take耗時:100],Thread-0,發送消息:Msg{priority=4, msg='一起來學java高並發,第4天'}
[1565598857463,1565598857563,take耗時:100],Thread-0,發送消息:Msg{priority=5, msg='一起來學java高並發,第5天'}
main中放入了5條推送信息,i作為消息的優先級按倒敘放入的,最終輸出結果中按照優先級由小到大輸出。注意Msg實現了Comparable
接口,具有了比較功能。
SynchronousQueue
同步阻塞隊列,SynchronousQueue沒有容量,與其他BlockingQueue不同,SynchronousQueue是一個不存儲元素的BlockingQueue,每一個put操作必須要等待一個take操作,否則不能繼續添加元素,反之亦然。SynchronousQueue 在現實中用的不多,線程池中有用到過,
Executors.newCachedThreadPool()
實現中用到了這個隊列,當有任務丟入線程池的時候,如果已創建的工作線程都在忙於處理任務,則會新建一個線程來處理丟入隊列的任務。
來個示例代碼:
package com.itsoku.chat25;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* 跟着阿里p7學並發,微信公眾號:javacode2018
*/
public class Demo3 {
static SynchronousQueue<String> queue = new SynchronousQueue<>();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
try {
long starTime = System.currentTimeMillis();
queue.put("java高並發系列,路人甲Java!");
long endTime = System.currentTimeMillis();
System.out.println(String.format("[%s,%s,take耗時:%s],%s", starTime, endTime, (endTime - starTime), Thread.currentThread().getName()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
//休眠5秒之后,從隊列中take一個元素
TimeUnit.SECONDS.sleep(5);
System.out.println(System.currentTimeMillis() + "調用take獲取並移除元素," + queue.take());
}
}
輸出:
1565600421645調用take獲取並移除元素,java高並發系列,路人甲Java!
[1565600416645,1565600421645,take耗時:5000],Thread-0
main方法中啟動了一個線程,調用queue.put
方法向隊列中丟入一條數據,調用的時候產生了阻塞,從輸出結果中可以看出,直到take方法被調用時,put方法才從阻塞狀態恢復正常。
DelayQueue
DelayQueue是一個支持延時獲取元素的無界阻塞隊列,里面的元素全部都是“可延期”的元素,列頭的元素是最先“到期”的元素,如果隊列里面沒有元素到期,是不能從列頭獲取元素的,哪怕有元素也不行,也就是說只有在延遲期到時才能夠從隊列中取元素。
需求:還是推送的業務,有時候我們希望早上9點或者其他指定的時間進行推送,如何實現呢?此時DelayQueue
就派上用場了。
我們先看一下DelayQueue
類的聲明:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>
元素E需要實現接口Delayed
,我們看一下這個接口的代碼:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
Delayed
繼承了Comparable
接口,這個接口是用來做比較用的,DelayQueue
內部使用PriorityQueue
來存儲數據的,PriorityQueue
是一個優先級隊列,丟入的數據會進行排序,排序方法調用的是Comparable
接口中的方法。下面主要說一下Delayed
接口中的getDelay
方法:此方法在給定的時間單位內返回與此對象關聯的剩余延遲時間。
對推送我們再做一下處理,讓其支持定時發送(定時在將來某個時間也可以說是延遲發送),代碼如下:
package com.itsoku.chat25;
import java.util.Calendar;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 跟着阿里p7學並發,微信公眾號:javacode2018
*/
public class Demo4 {
//推送信息封裝
static class Msg implements Delayed {
//優先級,越小優先級越高
private int priority;
//推送的信息
private String msg;
//定時發送時間,毫秒格式
private long sendTimeMs;
public Msg(int priority, String msg, long sendTimeMs) {
this.priority = priority;
this.msg = msg;
this.sendTimeMs = sendTimeMs;
}
@Override
public String toString() {
return "Msg{" +
"priority=" + priority +
", msg='" + msg + '\'' +
", sendTimeMs=" + sendTimeMs +
'}';
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.sendTimeMs - Calendar.getInstance().getTimeInMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
if (o instanceof Msg) {
Msg c2 = (Msg) o;
return Integer.compare(this.priority, c2.priority);
}
return 0;
}
}
//推送隊列
static DelayQueue<Msg> pushQueue = new DelayQueue<Msg>();
static {
//啟動一個線程做真實推送
new Thread(() -> {
while (true) {
Msg msg;
try {
//獲取一條推送消息,此方法會進行阻塞,直到返回結果
msg = pushQueue.take();
//此處可以做真實推送
long endTime = System.currentTimeMillis();
System.out.println(String.format("定時發送時間:%s,實際發送時間:%s,發送消息:%s", msg.sendTimeMs, endTime, msg));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
//推送消息,需要發送推送消息的調用該方法,會將推送信息先加入推送隊列
public static void pushMsg(int priority, String msg, long sendTimeMs) throws InterruptedException {
pushQueue.put(new Msg(priority, msg, sendTimeMs));
}
public static void main(String[] args) throws InterruptedException {
for (int i = 5; i >= 1; i--) {
String msg = "一起來學java高並發,第" + i + "天";
Demo4.pushMsg(i, msg, Calendar.getInstance().getTimeInMillis() + i * 2000);
}
}
}
輸出:
定時發送時間:1565603357198,實際發送時間:1565603357198,發送消息:Msg{priority=1, msg='一起來學java高並發,第1天', sendTimeMs=1565603357198}
定時發送時間:1565603359198,實際發送時間:1565603359198,發送消息:Msg{priority=2, msg='一起來學java高並發,第2天', sendTimeMs=1565603359198}
定時發送時間:1565603361198,實際發送時間:1565603361199,發送消息:Msg{priority=3, msg='一起來學java高並發,第3天', sendTimeMs=1565603361198}
定時發送時間:1565603363198,實際發送時間:1565603363199,發送消息:Msg{priority=4, msg='一起來學java高並發,第4天', sendTimeMs=1565603363198}
定時發送時間:1565603365182,實際發送時間:1565603365183,發送消息:Msg{priority=5, msg='一起來學java高並發,第5天', sendTimeMs=1565603365182}
可以看出時間發送時間,和定時發送時間基本一致,代碼中Msg
需要實現Delayed接口
,重點在於getDelay
方法,這個方法返回剩余的延遲時間,代碼中使用this.sendTimeMs
減去當前時間的毫秒格式時間,得到剩余延遲時間。
LinkedTransferQueue
LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對於其他阻塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。
LinkedTransferQueue類繼承自AbstractQueue抽象類,並且實現了TransferQueue接口:
public interface TransferQueue<E> extends BlockingQueue<E> {
// 如果存在一個消費者已經等待接收它,則立即傳送指定的元素,否則返回false,並且不進入隊列。
boolean tryTransfer(E e);
// 如果存在一個消費者已經等待接收它,則立即傳送指定的元素,否則等待直到元素被消費者接收。
void transfer(E e) throws InterruptedException;
// 在上述方法的基礎上設置超時時間
boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// 如果至少有一位消費者在等待,則返回true
boolean hasWaitingConsumer();
// 獲取所有等待獲取元素的消費線程數量
int getWaitingConsumerCount();
}
再看一下上面的這些方法,transfer(E e)
方法和SynchronousQueue的put方法
類似,都需要等待消費者取走元素,否者一直等待。其他方法和ArrayBlockingQueue、LinkedBlockingQueue
中的方法類似。
總結
- 重點需要了解
BlockingQueue
中的所有方法,以及他們的區別 - 重點掌握
ArrayBlockingQueue
、LinkedBlockingQueue
、PriorityBlockingQueue
、DelayQueue
的使用場景 - 需要處理的任務有優先級的,使用
PriorityBlockingQueue
- 處理的任務需要延時處理的,使用
DelayQueue
java高並發系列目錄
- 第1天:必須知道的幾個概念
- 第2天:並發級別
- 第3天:有關並行的兩個重要定律
- 第4天:JMM相關的一些概念
- 第5天:深入理解進程和線程
- 第6天:線程的基本操作
- 第7天:volatile與Java內存模型
- 第8天:線程組
- 第9天:用戶線程和守護線程
- 第10天:線程安全和synchronized關鍵字
- 第11天:線程中斷的幾種方式
- 第12天JUC:ReentrantLock重入鎖
- 第13天:JUC中的Condition對象
- 第14天:JUC中的LockSupport工具類,必備技能
- 第15天:JUC中的Semaphore(信號量)
- 第16天:JUC中等待多線程完成的工具類CountDownLatch,必備技能
- 第17天:JUC中的循環柵欄CyclicBarrier的6種使用場景
- 第18天:JAVA線程池,這一篇就夠了
- 第19天:JUC中的Executor框架詳解1
- 第20天:JUC中的Executor框架詳解2
- 第21天:java中的CAS,你需要知道的東西
- 第22天:JUC底層工具類Unsafe,高手必須要了解
- 第23天:JUC中原子類,一篇就夠了
- 第24天:ThreadLocal、InheritableThreadLocal(通俗易懂)
java高並發系列連載中,總計估計會有四五十篇文章。
阿里p7一起學並發,公眾號:路人甲java,每天獲取最新文章!