BlockingQueue介紹及使用


1.BlockingQueue隊列和平常隊列一樣都可以用來作為存儲數據的容器,但有時候在線程當中
涉及到數據存儲的時候就會出現問題,而 BlockingQueue是空的話,如果一個線程要從BlockingQueue
里取數據的時候,該線程將會被阻斷,並進入等待狀態,直到BlockingQueue里面有數據存入了后,就會
喚醒線程進行數據的去除。若BlockingQueue是滿的,如果一個線程要將數據存入BlockQueue,該線程
將會被阻斷,並進入等待狀態,直到BlcokQueue里面的數據被取出有空間后,線程被喚醒后在將數據存入、
2.主要涉及的方法:
      BlockingQueue  方法以四種形式出現,對於不能立即滿足但可能在將來某一時刻可以滿足的操作,這四種形式的處理方式不同:第一種是拋出一個異常,第二種是返回一個特殊值(null 或 false,具體取決於操作),第三種是在操作可以成功前,無限期地阻塞當前線程,第四種是在放棄前只在給定的最大時間限制內阻塞。下表中總結了這些方法:
 
拋出異常 特殊值 阻塞 超時
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
檢查 element() peek() 不可用 不可用

3.BlockingQueue定義的常用方法詳解: 
        1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則報異常 
        2)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false. 
        3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue里面有空間再繼續. 
        4)poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null 
        5)take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的對象被加入為止 
    2.BlockingQueue有四個具體的實現類,根據不同需求,選擇不同的實現類 
        1)ArrayBlockingQueue:規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的. 
        2)LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的 BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含 的對象是以FIFO(先入先出)順序排序的 
        3)PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數的Comparator決定的順序. 
        4)SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的. 
    3.LinkedBlockingQueueArrayBlockingQueue比較起來,它們背后所用的數據結構不一樣,導致 LinkedBlockingQueue的數據吞吐量要大於ArrayBlockingQueue,但在線程數量很大時其性能的可預見性低於 ArrayBlockingQueue.     

  在網上找到兩個例子  :

  

 1 package com.thread;
 2 import java.util.concurrent.ArrayBlockingQueue;
 3 import java.util.concurrent.BlockingQueue;
 4 
 5 public class BlockingQueueTest {
 6     public static void main(String[] args) {
 7         final BlockingQueue queue = new ArrayBlockingQueue(3);
 8         for(int i=0;i<2;i++){
 9             new Thread(){
10                 public void run(){
11                     while(true){
12                         try {
13                             Thread.sleep((long)(Math.random()*1000));
14                             System.out.println(Thread.currentThread().getName() + "准備放數據!");                            
15                             queue.put(1);
16                             System.out.println(Thread.currentThread().getName() + "已經放了數據," +                             
17                                         "隊列目前有" + queue.size() + "個數據");
18                         } catch (InterruptedException e) {
19                             e.printStackTrace();
20                         }
21 
22                     }
23                 }
24                 
25             }.start();
26         }
27         
28         new Thread(){
29             public void run(){
30                 while(true){
31                     try {
32                         //將此處的睡眠時間分別改為100和1000,觀察運行結果
33                         Thread.sleep(1000);
34                         System.out.println(Thread.currentThread().getName() + "准備取數據!");
35                         queue.take();
36                         System.out.println(Thread.currentThread().getName() + "已經取走數據," +                             
37                                 "隊列目前有" + queue.size() + "個數據");                    
38                     } catch (InterruptedException e) {
39                         e.printStackTrace();
40                     }
41                 }
42             }
43             
44         }.start();            
45     }
46 }

***********************************

 1 public class BlockingQueueCondition {
 2 
 3     public static void main(String[] args) {
 4         ExecutorService service = Executors.newSingleThreadExecutor();
 5         final Business3 business = new Business3();
 6         service.execute(new Runnable(){
 7 
 8             public void run() {
 9                 for(int i=0;i<50;i++){
10                     business.sub();
11                 }
12             }
13             
14         });
15         
16         for(int i=0;i<50;i++){
17             business.main();
18         }
19     }
20 
21 }
22 
23 class Business3{
24     BlockingQueue subQueue = new ArrayBlockingQueue(1);
25     BlockingQueue mainQueue = new ArrayBlockingQueue(1);
26     //這里是匿名構造方法,只要new一個對象都會調用這個匿名構造方法,它與靜態塊不同,靜態塊只會執行一次,
27     //在類第一次加載到JVM的時候執行
28     //這里主要是讓main線程首先put一個,就有東西可以取,如果不加這個匿名構造方法put一個的話程序就死鎖了
29     {
30         try {
31             mainQueue.put(1);
32         } catch (InterruptedException e) {
33             e.printStackTrace();
34         }
35     }
36     public void sub(){
37         try
38         {
39             mainQueue.take();
40             for(int i=0;i<10;i++){
41                 System.out.println(Thread.currentThread().getName() + " : " + i);
42             }
43             subQueue.put(1);
44         }catch(Exception e){
45 
46         }
47     }
48     
49     public void main(){
50         
51         try
52         {
53             subQueue.take();
54             for(int i=0;i<5;i++){
55                 System.out.println(Thread.currentThread().getName() + " : " + i);
56             }
57             mainQueue.put(1);
58         }catch(Exception e){
59         }        
60     }
61 }

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM