一、阻塞隊列
當阻塞隊列是空,從隊列中獲取元素的操作會被阻塞
當阻塞隊列是滿,往隊列中添加元素的操作會被阻塞
二、為什么用,有什么好處?
我們不需要關心什么時候需要阻塞線程,什么時候需要喚醒線程,因為這一切阻塞隊列都包辦了。
三、常見的阻塞隊列
ArrayBlockingQueue由數組構成的有界阻塞隊列.
LinkedBlockingQueue由鏈表構成的有界阻塞隊列(默認值為Integer.MAX_VALUE)
public class BlockingQueueDemo { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3); /** * 1、拋出異常 add()/remove() */ // System.out.println(blockingQueue.add("a")); // System.out.println(blockingQueue.add("b")); // System.out.println(blockingQueue.add("c")); // System.out.println(blockingQueue.add("d")); // System.out.println(blockingQueue.element()); //檢查隊首元素 // System.out.println(blockingQueue.remove()); // System.out.println(blockingQueue.remove()); // System.out.println(blockingQueue.remove()); // System.out.println(blockingQueue.remove()); /** * 2、返回布爾類型 offer()/pull() */ // System.out.println(blockingQueue.offer("a")); // System.out.println(blockingQueue.offer("b")); // System.out.println(blockingQueue.offer("c")); // System.out.println(blockingQueue.offer("d")); // // System.out.println(blockingQueue.peek()); //檢查隊首元素 // // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll()); /** * 3、阻塞 put()/take() */ // blockingQueue.put("a"); // blockingQueue.put("b"); // blockingQueue.put("c"); // System.out.println("############"); // blockingQueue.put("d"); // // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); /** *4、超時 */ System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("b",2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("c",2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("d",2L, TimeUnit.SECONDS)); } }
SynchronousQueue是一個不存儲元素的阻塞隊列,也即單個元素的隊列
public class SynchronousQueueDemo { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " put 1"); blockingQueue.put("1"); System.out.println(Thread.currentThread().getName() + " put 2"); blockingQueue.put("2"); System.out.println(Thread.currentThread().getName() + " put 3"); blockingQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } }, "AAA").start(); new Thread(() -> { try { TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take()); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take()); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }, "BBB").start(); } }
PriorityBlockingQueue:支持優先級排序的無界阻塞隊列
DelayQueue:使用優先級隊列實現的延遲無界阻塞隊列
LinkedTransferQueue:由鏈表構成的無界阻塞隊列
LinkedBlockingDeque:由鏈表構成的雙向阻塞隊列
四、BlockQueue的核心方法
add()/remove()/element():拋出異常
offer()/pull():返回布爾類型/支持超時
put()/take():阻塞
peek() 檢查隊列首元素
五、使用場景
1、生產者-消費者模式
問題: 一個初始值為零的變量,兩個線程對其交替操作,一個加1一個減1,來5輪
1)傳統版的生產者-消費者模式
/** * Created by wujuhong on 2019/7/3. * 傳統版的生產者消費者模式 */ public class ProductConsumer_TraditionDemo { public static void main(String[] args) { ShareData shareData = new ShareData(); new Thread(() -> { for (int i = 0; i < 5; i++) { try { shareData.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "AA").start(); new Thread(() -> { for (int i = 0; i < 5; i++) { try { shareData.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "BB").start(); } } class ShareData { private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment() throws InterruptedException { lock.lock(); try { //1、判斷 while (number != 0) { //等待,不能生產 condition.await(); } //2、干活 number++; System.out.println(Thread.currentThread().getName() + " " + number); //3、通知喚醒 condition.signalAll(); } finally { lock.unlock(); } } public void decrement() throws InterruptedException { lock.lock(); try { //1、判斷 while (number == 0) { //等待,不能生產 condition.await(); } //2、干活 number--; System.out.println(Thread.currentThread().getName() + " " + number); //3、通知喚醒 condition.signalAll(); } finally { lock.unlock(); } } }
2)阻塞隊列的生產者-消費者模式
class MyResource { private volatile boolean FLAG = true; //默認生產,進行生產+消費 private AtomicInteger atomicInteger = new AtomicInteger(); BlockingQueue<String> blockingQueue = null; public MyResource(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName()); } public void myProduct() throws InterruptedException { String data = ""; boolean returnValue; while (FLAG) { data = atomicInteger.incrementAndGet() + ""; returnValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS); if (returnValue) { System.out.println(Thread.currentThread().getName() + " 插入隊列" + data + "成功"); } else { System.out.println(Thread.currentThread().getName() + " 插入隊列" + data + "失敗"); } TimeUnit.SECONDS.sleep(1); } System.out.println(Thread.currentThread().getName() + " 生產動作結束,FLAG = false"); } public void myConsume() throws InterruptedException { String result = ""; while (FLAG) { result = blockingQueue.poll(2, TimeUnit.SECONDS); if (null == result || result.equalsIgnoreCase("")) { FLAG = false; System.out.println(Thread.currentThread().getName() + " 超過2s鍾沒有取到蛋糕,消費隊列退出"); return; } System.out.println(Thread.currentThread().getName() + " 消費隊列" + result + "成功"); TimeUnit.SECONDS.sleep(1); } System.out.println(Thread.currentThread().getName() + " 消費動作結束,FLAG = false"); } public void stop() { this.FLAG = false; } } public class ProductConsumer_BlockQueueDemo { public static void main(String[] args) throws InterruptedException { MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10)); new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 生產線程啟動"); try { myResource.myProduct(); } catch (InterruptedException e) { e.printStackTrace(); } }, "product").start(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 消費線程啟動"); try { myResource.myConsume(); } catch (InterruptedException e) { e.printStackTrace(); } }, "consume").start(); TimeUnit.SECONDS.sleep(5); System.out.println("5秒鍾時間到,main線程叫停,活動結束"); myResource.stop(); } }
2、線程池
3、消息中間件
阻塞隊列有沒有好的一面?
不得不阻塞,你如何管理?