概念
BlockingQueue 翻譯成中文阻塞隊列,顧名思義就是線程使用隊列時會阻塞當前線程;
BlockingQueue 繼承了Collection,具有一般集合所具有的數據存取功能
BlockingQueue 是線程安全的隊列,多線程訪問時不會出現同一個數據集中的數據被多次取出,或者覆蓋存放的事件
使用場景
可用於一個快速反饋的消息隊列,無消息時阻塞線程讓出CPU,有數據存入時通知線程取出數據,取完后繼續阻塞,
比如用戶下單后立刻在大屏上顯示有客戶下單,比較簡單的做法是開啟一個定時任務,定期掃訂單表;或者接入消息中間件,下單時發送消息,大屏服務監聽消息;或者借用reddis隊列 解決方式有很多種不一一列舉
示例模擬數據的存取 設置隊列的容量為1 是為了更好展示 存取的阻塞特性
public static void main(String[] args) throws InterruptedException { ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(1); //模擬存入數據線程 new Thread(()->{ int i=0; while (true){ try { //每次循環+1 i++; queue.put(i); System.out.println("存入數據"+i); } catch (InterruptedException e) { e.printStackTrace(); } } }, "存入數據線程").start(); //模擬取出數據線程 1秒鍾取一個 new Thread(()->{ while (true){ try { //一秒鍾取一個數據 Thread.sleep(1000); Integer result = queue.take(); System.out.println("取出數據"+result); } catch (InterruptedException e) { e.printStackTrace(); } } }, "取數據線程").start(); } 打印結果: 存入數據1 取出數據1 存入數據2 取出數據2 存入數據3 取出數據3 存入數據4 取出數據4 存入數據5 取出數據5 存入數據6 取出數據6 存入數據7 取出數據7
方法示例
阻塞隊列的使用非常簡單,基本上和普通集合一樣對數據進行存和取
public static void main(String[] args) throws InterruptedException { ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(2); //存入一個數據 如果隊列滿了則一直阻塞到有數據取出 queue.put(1); //取出一個數據 如果隊列空了則一直阻塞到有數據存入 queue.take(); //存入一個數據 如果隊列滿了則阻塞若干時長(示例為10秒),超時則返回offerResult=false boolean offerResult = queue.offer(1, 10, TimeUnit.SECONDS); //取出一個數據 如果隊列空了則阻塞若干時長(示例為10秒),超時則返回pollResult=null Integer pollResult = queue.poll(10, TimeUnit.SECONDS); }
源碼分析
1、接口繼承結構
2、接口代碼
public interface BlockingQueue<E> extends Queue<E> { //向隊列中添加元素, 若超過給定隊列長度拋出異常 boolean add(E e); //向隊列中添加元素, 若超過給定隊列長度拋出異常 boolean offer(E e); //向隊列中添加元素, 若超過隊列長度則等待隊列有剩余容量再加入元素 void put(E e) throws InterruptedException; //向隊列中添加元素, 若超過給定隊列長度則等待給定時長 boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //獲取隊列頭部元素,並從隊列頭部移除,若隊列為空,則阻塞當前獲取線程,並等待新元素加入 E take() throws InterruptedException; //獲取隊列頭部元素,並從隊列頭部移除,若隊列為空,則阻塞當前獲取線程,並等待元素給定時長 E poll(long timeout, TimeUnit unit) throws InterruptedException; //返回隊列剩余容量 int remainingCapacity(); //移除指定元素 boolean remove(Object o); //返回是否存在指定元素 public boolean contains(Object o); //將隊列中的元素全部移除到給定的集合c中 int drainTo(Collection<? super E> c); //將隊列中的元素全部移除到給定的集合c中(最多不超過maxElements個) int drainTo(Collection<? super E> c, int maxElements); }
3、實現類 ArrayBlockingQueue 分析
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //數據集 用於存放元素 初始化固定數組長度 不再擴容 final Object[] items; //數據集下一次取數據的下標 //具體操作為 每次take加1 若take+1==items.length即take最后一個元素 //則takeIndex重置為0 如此往復 int takeIndex; //數據集下一次存數據下標 int putIndex; //數據集中 存放元素的個數 即items[i]!=null的個數 int count; //重入鎖 可選公平與非公平 非本文重點 final ReentrantLock lock; //Condition //使用流程 1取數據為空(count==0) 則阻塞等待數據集存入數據 執行等待notEmpty.await // 2存數據數據集肯定不為空(count!=0), 則通知取數據線程繼續取數據 執行通知notEmpty.signal private final Condition notEmpty; //Condition //使用流程 1存數據數據集存滿(count==items.length)則等待消耗后重新存入 執行等待notFull.await // 2取數據后則數據集未滿肯定不滿(count<items.length) 則通知存入數據 執行通知notFull.signal private final Condition notFull; //用戶維護ArrayBlockingQueue 作為集合的迭代(Iterator)功能 //調用ArrayBlockingQueue.iterator()是初始化此屬性 非本文重點 transient Itrs itrs = null; //--------------------重點方法------------------------ /** * 從隊列中取一個元素 * @return [description] * @throws InterruptedException [description] */ public E take() throws InterruptedException { //對操作進行加鎖 多線程時輪流取元素 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //如果隊列中沒有對象 則阻塞線程等待 while (count == 0) //重點:等待存數據的線程通知 notEmpty.await(); //代碼運行到此處說明count!=null 執行從隊列中取元素 return dequeue(); } finally { lock.unlock(); } } private E dequeue() { final Object[] items = this.items; //從數據集數組items 取出下標takeIndex的數據 @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; //取完數據之后 將數組對應下標應用置為空(GC對象) items[takeIndex] = null; //takeIndex+1等於數組長度表示當前下標為數組最后一個對象 //則takeIndex重新歸0 if (++takeIndex == items.length) takeIndex = 0; //每次取數據 數據總量減1 count--; //迭代器維護操作 if (itrs != null) itrs.elementDequeued(); //重點:通知存數據的線程 可以執行數據存放 notFull.signal(); return x; } /** * 存入一個數據 * @param e [description] * @throws InterruptedException [description] */ public void put(E e) throws InterruptedException { //校驗數據非空 checkNotNull(e); //加鎖 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //若數據集數組items滿了 則阻塞線程等待 while (count == items.length) //重點:等待取出數據的線程通知 notFull.await(); //存入數據 enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { final Object[] items = this.items; //存入數據到下標putIndex items[putIndex] = x; //如果存數據的下標已經到數據最后一個下標 則putIndex重新歸0 if (++putIndex == items.length) putIndex = 0; //數據總量加1 count++; //重點:存入數據后通知等待取數據的線程 notEmpty.signal(); } }
總結:
BlockingQueue 重點關注
1、阻塞方式
Condition notFull 和 Condition notEmpty 的使用,存通知取,取通知存;
從而達到存滿阻塞,取完阻塞,存入通知取,取出通知存的功能
2、存取游標
takeIndex 和 putIndex的使用,每次取數據takeIndex加1,到了數據末尾則重新回到數組開始下標0,存數據原理相似逐次加1,到末尾歸0
對於LinkedBlockingQueue實現方式則略有不同,鏈表式集合多線程取數據時只需要排隊從頭部節點獲取,從末尾存數據,有個小優化,創建LinkedBlockingQueue
時創建一個虛擬頭部節點,不做深究