本例介紹一個特殊的隊列:BlockingQueue,如果BlockQueue是空的,從BlockingQueue取東西的操作將會被阻斷進入等待狀態,直到BlockingQueue進了東西才會被喚醒.同樣,如果BlockingQueue是滿的,任何試圖往里存東西的操作也會被阻斷進入等待狀態,直到BlockingQueue里有空間才會被喚醒繼續操作.
使用BlockingQueue的關鍵技術點如下:
1.BlockingQueue定義的常用方法如下:
1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則報異常
2)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false.
3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue里面有空間再繼續.
4)poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null
5)take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的對象被加入為止
2.BlockingQueue有四個具體的實現類,根據不同需求,選擇不同的實現類
1)ArrayBlockingQueue:規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的.
2)LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序的
3)PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數的Comparator決定的順序.
4)SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的.
3.LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背后所用的數據結構不一樣,導致LinkedBlockingQueue的數據吞吐量要大於ArrayBlockingQueue,但在線程數量很大時其性能的可預見性低於ArrayBlockingQueue.
下面是兩個使用BlockingQueue的例子:
1 package com.thread; 2 import java.util.concurrent.ArrayBlockingQueue; 3 import java.util.concurrent.BlockingQueue; 4 5 public class BlockingQueueTest { 6 public static void main(String[] args) { 7 final BlockingQueue queue = new ArrayBlockingQueue(3); 8 for(int i=0;i<2;i++){ 9 new Thread(){ 10 public void run(){ 11 while(true){ 12 try { 13 Thread.sleep((long)(Math.random()*1000)); 14 System.out.println(Thread.currentThread().getName() + "准備放數據!"); 15 queue.put(1); 16 System.out.println(Thread.currentThread().getName() + "已經放了數據," + 17 "隊列目前有" + queue.size() + "個數據"); 18 } catch (InterruptedException e) { 19 e.printStackTrace(); 20 } 21 22 } 23 } 24 25 }.start(); 26 } 27 28 new Thread(){ 29 public void run(){ 30 while(true){ 31 try { 32 //將此處的睡眠時間分別改為100和1000,觀察運行結果 33 Thread.sleep(1000); 34 System.out.println(Thread.currentThread().getName() + "准備取數據!"); 35 queue.take(); 36 System.out.println(Thread.currentThread().getName() + "已經取走數據," + 37 "隊列目前有" + queue.size() + "個數據"); 38 } catch (InterruptedException e) { 39 e.printStackTrace(); 40 } 41 } 42 } 43 44 }.start(); 45 } 46 }
1 package com.thread; 2 import java.util.concurrent.ArrayBlockingQueue; 3 import java.util.concurrent.BlockingQueue; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.locks.Condition; 7 import java.util.concurrent.locks.Lock; 8 import java.util.concurrent.locks.ReentrantLock; 9 10 public class BlockingQueueCondition { 11 12 public static void main(String[] args) { 13 ExecutorService service = Executors.newSingleThreadExecutor(); 14 final Business3 business = new Business3(); 15 service.execute(new Runnable(){ 16 17 public void run() { 18 for(int i=0;i<50;i++){ 19 business.sub(); 20 } 21 } 22 23 }); 24 25 for(int i=0;i<50;i++){ 26 business.main(); 27 } 28 } 29 30 } 31 32 class Business3{ 33 BlockingQueue subQueue = new ArrayBlockingQueue(1); 34 BlockingQueue mainQueue = new ArrayBlockingQueue(1); 35 //這里是匿名構造方法,只要new一個對象都會調用這個匿名構造方法,它與靜態塊不同,靜態塊只會執行一次, 36 //在類第一次加載到JVM的時候執行 37 //這里主要是讓main線程首先put一個,就有東西可以取,如果不加這個匿名構造方法put一個的話程序就死鎖了 38 { 39 try { 40 mainQueue.put(1); 41 } catch (InterruptedException e) { 42 e.printStackTrace(); 43 } 44 } 45 public void sub(){ 46 try 47 { 48 mainQueue.take(); 49 for(int i=0;i<10;i++){ 50 System.out.println(Thread.currentThread().getName() + " : " + i); 51 } 52 subQueue.put(1); 53 }catch(Exception e){ 54 55 } 56 } 57 58 public void main(){ 59 60 try 61 { 62 subQueue.take(); 63 for(int i=0;i<5;i++){ 64 System.out.println(Thread.currentThread().getName() + " : " + i); 65 } 66 mainQueue.put(1); 67 }catch(Exception e){ 68 } 69 } 70 }