一、阻塞隊列
當阻塞隊列是空,從隊列中獲取元素的操作會被阻塞
當阻塞隊列是滿,往隊列中添加元素的操作會被阻塞
二、為什么用,有什么好處?
我們不需要關心什么時候需要阻塞線程,什么時候需要喚醒線程,因為這一切阻塞隊列都包辦了。
三、常見的阻塞隊列
ArrayBlockingQueue由數組構成的有界阻塞隊列.
LinkedBlockingQueue由鏈表構成的有界阻塞隊列(默認值為Integer.MAX_VALUE)
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
/**
* 1、拋出異常 add()/remove()
*/
// System.out.println(blockingQueue.add("a"));
// System.out.println(blockingQueue.add("b"));
// System.out.println(blockingQueue.add("c"));
// System.out.println(blockingQueue.add("d"));
// System.out.println(blockingQueue.element()); //檢查隊首元素
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
/**
* 2、返回布爾類型 offer()/pull()
*/
// System.out.println(blockingQueue.offer("a"));
// System.out.println(blockingQueue.offer("b"));
// System.out.println(blockingQueue.offer("c"));
// System.out.println(blockingQueue.offer("d"));
//
// System.out.println(blockingQueue.peek()); //檢查隊首元素
//
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
/**
* 3、阻塞 put()/take()
*/
// blockingQueue.put("a");
// blockingQueue.put("b");
// blockingQueue.put("c");
// System.out.println("############");
// blockingQueue.put("d");
//
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
/**
*4、超時
*/
System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("b",2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("c",2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("d",2L, TimeUnit.SECONDS));
}
}
SynchronousQueue是一個不存儲元素的阻塞隊列,也即單個元素的隊列
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() + " put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() + " put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "AAA").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "BBB").start();
}
}
PriorityBlockingQueue:支持優先級排序的無界阻塞隊列
DelayQueue:使用優先級隊列實現的延遲無界阻塞隊列
LinkedTransferQueue:由鏈表構成的無界阻塞隊列
LinkedBlockingDeque:由鏈表構成的雙向阻塞隊列
四、BlockQueue的核心方法
add()/remove()/element():拋出異常
offer()/pull():返回布爾類型/支持超時
put()/take():阻塞
peek() 檢查隊列首元素
五、使用場景
1、生產者-消費者模式
問題: 一個初始值為零的變量,兩個線程對其交替操作,一個加1一個減1,來5輪
1)傳統版的生產者-消費者模式
/**
* Created by wujuhong on 2019/7/3.
* 傳統版的生產者消費者模式
*/
public class ProductConsumer_TraditionDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "AA").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "BB").start();
}
}
class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws InterruptedException {
lock.lock();
try {
//1、判斷
while (number != 0) {
//等待,不能生產
condition.await();
}
//2、干活
number++;
System.out.println(Thread.currentThread().getName() + " " + number);
//3、通知喚醒
condition.signalAll();
} finally {
lock.unlock();
}
}
public void decrement() throws InterruptedException {
lock.lock();
try {
//1、判斷
while (number == 0) {
//等待,不能生產
condition.await();
}
//2、干活
number--;
System.out.println(Thread.currentThread().getName() + " " + number);
//3、通知喚醒
condition.signalAll();
} finally {
lock.unlock();
}
}
}
2)阻塞隊列的生產者-消費者模式
class MyResource {
private volatile boolean FLAG = true; //默認生產,進行生產+消費
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
public void myProduct() throws InterruptedException {
String data = "";
boolean returnValue;
while (FLAG) {
data = atomicInteger.incrementAndGet() + "";
returnValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
if (returnValue) {
System.out.println(Thread.currentThread().getName() + " 插入隊列" + data + "成功");
} else {
System.out.println(Thread.currentThread().getName() + " 插入隊列" + data + "失敗");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + " 生產動作結束,FLAG = false");
}
public void myConsume() throws InterruptedException {
String result = "";
while (FLAG) {
result = blockingQueue.poll(2, TimeUnit.SECONDS);
if (null == result || result.equalsIgnoreCase("")) {
FLAG = false;
System.out.println(Thread.currentThread().getName() + " 超過2s鍾沒有取到蛋糕,消費隊列退出");
return;
}
System.out.println(Thread.currentThread().getName() + " 消費隊列" + result + "成功");
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + " 消費動作結束,FLAG = false");
}
public void stop() {
this.FLAG = false;
}
}
public class ProductConsumer_BlockQueueDemo {
public static void main(String[] args) throws InterruptedException {
MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 生產線程啟動");
try {
myResource.myProduct();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "product").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 消費線程啟動");
try {
myResource.myConsume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "consume").start();
TimeUnit.SECONDS.sleep(5);
System.out.println("5秒鍾時間到,main線程叫停,活動結束");
myResource.stop();
}
}
2、線程池
3、消息中間件
阻塞隊列有沒有好的一面?
不得不阻塞,你如何管理?
