Java線程之CompletionService批處理任務


如果你向Executor提交了一個批處理任務,並且希望在它們完成后獲得結果,怎么辦呢?

為此你可以保存與每個任務相關聯的Future,然后不斷地調用 timeout為零的get,來檢驗Future是否完成。這樣做固然可以,但卻相當乏味。幸運的是,還有一個更好的方法:完成服務 (Completion service)。

CompletionService整合了Executor和BlockingQueue的功能。

你可以將Callable任務提交給它去執行,然 后使用類似於隊列中的take和poll方法,在結果完整可用時獲得這個結果,像一個打包的Future。 ExecutorCompletionService是實現CompletionService接口的一個類,並將計算任務委托給一個Executor。

 

ExecutorCompletionService的實現相當直觀。它在構造函數中創建一個BlockingQueue,用它去保持完成的結果。 計算完成時會調用FutureTask中的done方法。

當提交一個任務后,首先把這個任務包裝為一個QueueingFuture,它是 FutureTask的一個子類,然后覆寫done方法,將結果置入BlockingQueue中,take和poll方法委托給了 BlockingQueue,它會在結果不可用時阻塞。

直接看demo:

package javademo;

import java.util.Random;  

import java.util.concurrent.BlockingQueue;  

import java.util.concurrent.Callable;  

import java.util.concurrent.CompletionService;  

import java.util.concurrent.ExecutionException;  

import java.util.concurrent.ExecutorCompletionService;  

import java.util.concurrent.ExecutorService;  

import java.util.concurrent.Executors;  

import java.util.concurrent.Future;  

import java.util.concurrent.LinkedBlockingQueue;  

  /***

   * 兩鍾方式出來線程運行結果

   * @author think

   *

   */

public class CompletionServiceTest {  

    public static void main(String[] args) throws Exception {  

    CompletionServiceTest cst = new CompletionServiceTest();  

    cst.count1();  

    cst.count2();  

    }  

      /***

        * 使用阻塞容器保存每次Executor處理的結果,在后面進行統一處理  

        * @throws Exception

        */

    public void count1() throws Exception{  

        ExecutorService exec = Executors.newCachedThreadPool();  

        BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();  

        for(int i=0; i<10; i++){  

            Future<Integer> future =exec.submit(getTask());  

            queue.add(future);  

        }  

        int sum = 0;  

        int queueSize = queue.size();  

        for(int i=0; i<queueSize; i++){  

            sum += queue.take().get();  

        }  

        System.out.println("總數為:"+sum);  

        exec.shutdown();  

    }  

/***

 * 使用CompletionService(完成服務)保持Executor處理的結果  

 * @throws InterruptedException

 * @throws ExecutionException

 */

    public void count2() throws InterruptedException, ExecutionException{  

        ExecutorService exec = Executors.newCachedThreadPool();  

        CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec);  

        for(int i=0; i<10; i++){  

            execcomp.submit(getTask());  

        }  

        int sum = 0;  

        for(int i=0; i<10; i++){  

           //檢索並移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則等待。  

            Future<Integer> future = execcomp.take();  

            sum += future.get();  

        }  

        System.out.println("總數為:"+sum);  

        exec.shutdown();  

    }  

    /**

     * 得到一個任務  

     * @return Callable

     */

    public Callable<Integer> getTask(){  

        final Random rand = new Random();  

        Callable<Integer> task = new Callable<Integer>(){  

            @Override  

            public Integer call() throws Exception {  

                int i = rand.nextInt(10);  

                int j = rand.nextInt(10);  

                int sum = i*j;  

                System.out.print(sum+"\t");  

                return sum;  

            }  

        };  

        return task;  

    }  

}  

 

  1. import java.util.Random;  
  2. import java.util.concurrent.BlockingQueue;  
  3. import java.util.concurrent.Callable;  
  4. import java.util.concurrent.CompletionService;  
  5. import java.util.concurrent.ExecutionException;  
  6. import java.util.concurrent.ExecutorCompletionService;  
  7. import java.util.concurrent.ExecutorService;  
  8. import java.util.concurrent.Executors;  
  9. import java.util.concurrent.Future;  
  10. import java.util.concurrent.LinkedBlockingQueue;  
  11.   
  12. public class Test17 {  
  13.     public static void main(String[] args) throws Exception {  
  14.         Test17 t = new Test17();  
  15.         t.count1();  
  16.         t.count2();  
  17.     }  
  18. //使用阻塞容器保存每次Executor處理的結果,在后面進行統一處理  
  19.     public void count1() throws Exception{  
  20.         ExecutorService exec = Executors.newCachedThreadPool();  
  21.         BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();  
  22.         for(int i=0; i<10; i++){  
  23.             Future<Integer> future =exec.submit(getTask());  
  24.             queue.add(future);  
  25.         }  
  26.         int sum = 0;  
  27.         int queueSize = queue.size();  
  28.         for(int i=0; i<queueSize; i++){  
  29.             sum += queue.take().get();  
  30.         }  
  31.         System.out.println("總數為:"+sum);  
  32.         exec.shutdown();  
  33.     }  
  34. //使用CompletionService(完成服務)保持Executor處理的結果  
  35.     public void count2() throws InterruptedException, ExecutionException{  
  36.         ExecutorService exec = Executors.newCachedThreadPool();  
  37.         CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec);  
  38.         for(int i=0; i<10; i++){  
  39.             execcomp.submit(getTask());  
  40.         }  
  41.         int sum = 0;  
  42.         for(int i=0; i<10; i++){  
  43. //檢索並移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則等待。  
  44.             Future<Integer> future = execcomp.take();  
  45.             sum += future.get();  
  46.         }  
  47.         System.out.println("總數為:"+sum);  
  48.         exec.shutdown();  
  49.     }  
  50.     //得到一個任務  
  51.     public Callable<Integer> getTask(){  
  52.         final Random rand = new Random();  
  53.         Callable<Integer> task = new Callable<Integer>(){  
  54.             @Override  
  55.             public Integer call() throws Exception {  
  56.                 int i = rand.nextInt(10);  
  57.                 int j = rand.nextInt(10);  
  58.                 int sum = i*j;  
  59.                 System.out.print(sum+"\t");  
  60.                 return sum;  
  61.             }  
  62.         };  
  63.         return task;  
  64.     }  
  65.     /** 
  66.      * 執行結果: 
  67.         6   6   14  40  40  0   4   7   0   0   總數為:106 
  68.         12  6   12  54  81  18  14  35  45  35  總數為:312 
  69.      */  
  70. }  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM