多線程批量執行等待全部結果


來自:http://blog.csdn.net/wxwzy738/article/details/8497853

       http://blog.csdn.net/cutesource/article/details/6061229

 

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class Test17 {
    public static void main(String[] args) throws Exception {
        Test17 t = new Test17();
        t.count1();
        t.count2();
    }
//使用阻塞容器保存每次Executor處理的結果,在后面進行統一處理
    public void count1() throws Exception{
        ExecutorService exec = Executors.newCachedThreadPool();
        BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();
        for(int i=0; i<10; i++){
            Future<Integer> future =exec.submit(getTask());
            queue.add(future);
        }
        int sum = 0;
        int queueSize = queue.size();
        for(int i=0; i<queueSize; i++){
            sum += queue.take().get();
        }
        System.out.println("總數為:"+sum);
        exec.shutdown();
    }
//使用CompletionService(完成服務)保持Executor處理的結果
    public void count2() throws InterruptedException, ExecutionException{
        ExecutorService exec = Executors.newCachedThreadPool();
        CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec);
        for(int i=0; i<10; i++){
            execcomp.submit(getTask());
        }
        int sum = 0;
        for(int i=0; i<10; i++){
//檢索並移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則等待。
            Future<Integer> future = execcomp.take();
            sum += future.get();
        }
        System.out.println("總數為:"+sum);
        exec.shutdown();
    }
    //得到一個任務
    public Callable<Integer> getTask(){
        final Random rand = new Random();
        Callable<Integer> task = new Callable<Integer>(){
            @Override
            public Integer call() throws Exception {
                int i = rand.nextInt(10);
                int j = rand.nextInt(10);
                int sum = i*j;
                System.out.print(sum+"\t");
                return sum;
            }
        };
        return task;
    }
    /**
     * 執行結果:
        6    6    14    40    40    0    4    7    0    0    總數為:106
        12    6    12    54    81    18    14    35    45    35    總數為:312
     */
}

先看一下新建一個ThreadPoolExecutor的構建參數:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

 

看這個參數很容易讓人以為是線程池里保持corePoolSize個線程,如果不夠用,就加線程入池直至maximumPoolSize大小,如果 還不夠就往workQueue里加,如果workQueue也不夠就用RejectedExecutionHandler來做拒絕處理。

但實際情況不是這樣,具體流程如下:

1)當池子大小小於corePoolSize就新建線程,並處理請求

2)當池子大小等於corePoolSize,把請求放入workQueue中,池子里的空閑線程就去從workQueue中取任務並處理

3)當workQueue放不下新入的任務時,新建線程入池,並處理請求,如果池子大小撐到了maximumPoolSize就用RejectedExecutionHandler來做拒絕處理

4)另外,當池子的線程數大於corePoolSize的時候,多余的線程會等待keepAliveTime長的時間,如果無請求可處理就自行銷毀

內部結構如下所示:

從中可以發現ThreadPoolExecutor就是依靠BlockingQueue的阻塞機制來維持線程池,當池子里的線程無事可干的時候就通過workQueue.take()阻塞住。

其實可以通過Executes來學學幾種特殊的ThreadPoolExecutor是如何構建的。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

 

newFixedThreadPool就是一個固定大小的ThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

 

newCachedThreadPool比較適合沒有固定大小並且比較快速就能完成的小任務,沒必要維持一個Pool,這比直接new Thread來處理的好處是能在60秒內重用已創建的線程。

其他類型的ThreadPool看看構建參數再結合上面所說的特性就大致知道它的特性


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM