
ArrayBlockingQueue:由數組結構組成的有界阻塞隊列;
LinkedBlockingQueue:由鏈表結構組成的有界阻塞隊列(但大小默認值為:Integer.MAX_VALUE);
PriorityBlockingQueue:支持優先級排序的無界阻塞隊列;
DelayQueue:使用優先級隊列實現的延遲無界阻塞隊列;
SynchronousQueue:不存儲元素的阻塞隊列,也即單個元素的隊列;
LinkedTransferQueue:由鏈表結構組成的無界阻塞隊列;
LinkedBlockingDeque:由鏈表結果組成的雙向阻塞隊列;



阻塞隊列核心方法


add(e):為阻塞隊列添加一個元素e,添加成功返回 true,當阻塞隊列滿時,拋出異常 java.lang.IllegalStateException: Queue full
remove():為阻塞隊列移除一個元素,移除成功時返回移除的元素,當阻塞隊列為空時,拋出異常 java.util.NoSuchElementException
element():檢查當前阻塞隊列的首個元素,成功時返回首個元素,當阻塞隊列為空時,拋出異常 java.util.NoSuchElementException
offer(e):為阻塞隊列添加一個元素e,添加成功返回 true,當阻塞隊列滿時返回 false
poll():為阻塞隊列移除一個元素,移除成功時返回移除的元素,當阻塞隊列為空時,返回 null
peek():檢查當前阻塞隊列的首個元素,成功時返回首個元素,當阻塞隊列為空時,返回 null
put(e):為阻塞隊列添加一個元素e,無返回值,當阻塞隊列滿時會阻塞線程,直到操作成功為止
take():為阻塞隊列移除一個元素,成功時返回首個元素,當阻塞隊列為空時會阻塞線程,直到操作成功為止
offer(e,time,unit):為阻塞隊列添加一個元素e,添加成功返回 true,當阻塞隊列滿時會阻塞隊列一段時間(time:long類型的時間,unit為時間單位),失敗返回 false
poll(time,unit):為阻塞隊列移除一個元素,移除成功時返回移除的元素,當阻塞隊列為空時,會阻塞隊列一段時間(time:long類型的時間,unit為時間單位),失敗返回 null
SynchronousQueue:不存儲元素的阻塞隊列

import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; public class SynchronousQueueDemo { public static void main(String[] args) { BlockingQueue queue = new SynchronousQueue<>(); new Thread(() ->{ try { System.out.println(Thread.currentThread().getName()+"\t put 1"); queue.put(1); System.out.println(Thread.currentThread().getName()+"\t put 2"); queue.put(2); System.out.println(Thread.currentThread().getName()+"\t put 3"); queue.put(3); } catch (InterruptedException e) { e.printStackTrace(); } },"A").start(); new Thread(() ->{ try { try { TimeUnit.SECONDS.sleep(1); }catch (Exception e){ e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" 第一次移除隊列元素"); queue.take(); try { TimeUnit.SECONDS.sleep(1); }catch (Exception e){ e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" 第二次移除隊列元素"); queue.take(); try { TimeUnit.SECONDS.sleep(1); }catch (Exception e){ e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" 第三次移除隊列元素"); queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } },"B").start(); } }

生產者與消費者實例:(生產一個消費一個)
第一版(使用synchronized、wait 、 notify等方法進行實現)
第二版(使用ReentrantLock、await、signal / signalAll等方法進行進行實現)
第二版示例代碼:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class ShareData {//資源類 private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment() throws Exception { lock.lock(); try { //1、判斷 while (number != 0) { //等待 condition.await(); } //2、干活 number++; System.out.println(Thread.currentThread().getName() + "\t" + number); //3、喚醒 condition.signalAll();//喚醒所有等待線程 } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement() throws Exception { lock.lock(); try { //1、判斷 while (number == 0) { //等待 condition.await(); } //2、干活 number--; System.out.println(Thread.currentThread().getName() + "\t" + number); //3、喚醒 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } public class ProdConsumer_TraditionDemo { public static void main(String[] args) { ShareData shareData = new ShareData(); new Thread(() ->{ for (int i = 1; i <= 5; i++) { try { shareData.increment(); } catch (Exception e) { e.printStackTrace(); } } },"A").start(); new Thread(() ->{ for (int i = 1; i <= 5; i++) { try { shareData.decrement(); } catch (Exception e) { e.printStackTrace(); } } },"B").start(); } }

第三版:(volatile/CAS/AtomicInteger/BlockingQueue/線程交互/原子引用)等進行實現
第三版示例代碼:
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; class MyResource{ private volatile boolean flag = true;//默認開啟,進行生產+消費 volatile(可見性、不保證原子性、禁止指令重排) private AtomicInteger atomicInteger = new AtomicInteger();//原子整型類(保證原子性)、默認值:0 BlockingQueue<String> queue = null; public MyResource(BlockingQueue<String> queue) { this.queue = queue; System.out.println("當前傳入的BlockingQueue:" + queue.getClass().getName()); } public void myProd()throws Exception{//生產蛋糕 String data = null; boolean retValue;//默認值為 false while (flag){ //atomicInteger.incrementAndGet();// ++atomicInteger //atomicInteger.getAndIncrement();// atomicInteger++ data = atomicInteger.incrementAndGet() + ""; retValue = queue.offer(data,2L, TimeUnit.SECONDS);//添加 data,若失敗會阻塞2秒 返回false if (retValue){ System.out.println(Thread.currentThread().getName()+"\t 插入隊列:"+data+" 成功"); }else { System.out.println(Thread.currentThread().getName()+"\t 插入隊列:"+data+" 失敗"); } TimeUnit.SECONDS.sleep(1);//每一秒生產 1個 } System.out.println(Thread.currentThread().getName()+"\t flag = false 生產結束"); } public void myConsumer()throws Exception{ String data = null; while (flag){ data = queue.poll(2L,TimeUnit.SECONDS);//獲取 data,若失敗會阻塞2秒 返回 null if (data == null || data.equalsIgnoreCase("")){ System.out.println(Thread.currentThread().getName()+"\t 2秒沒有取到蛋糕 消費退出"); return; } System.out.println(Thread.currentThread().getName()+"\t 消費隊列蛋糕:"+data+" 成功"); } } public void stop()throws Exception{ this.flag = false; } } /** * volatile/CAS/AtomicInteger/BlockingQueue/線程交互/原子引用 */ public class ProConsumer_BlockingQueueDemo { public static void main(String[] args) throws Exception { MyResource resource = new MyResource(new ArrayBlockingQueue<>(10)); new Thread(() ->{ try { resource.myProd(); } catch (Exception e) { e.printStackTrace(); } },"prod").start(); new Thread(() ->{ try { resource.myConsumer(); } catch (Exception e) { e.printStackTrace(); } },"consumer").start(); //睡眠5秒 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(); System.out.println("5秒鍾到,活動結束"); resource.stop(); } }

