來自:http://blog.csdn.net/wxwzy738/article/details/8497853
http://blog.csdn.net/cutesource/article/details/6061229
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; public class Test17 { public static void main(String[] args) throws Exception { Test17 t = new Test17(); t.count1(); t.count2(); } //使用阻塞容器保存每次Executor處理的結果,在后面進行統一處理 public void count1() throws Exception{ ExecutorService exec = Executors.newCachedThreadPool(); BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>(); for(int i=0; i<10; i++){ Future<Integer> future =exec.submit(getTask()); queue.add(future); } int sum = 0; int queueSize = queue.size(); for(int i=0; i<queueSize; i++){ sum += queue.take().get(); } System.out.println("總數為:"+sum); exec.shutdown(); } //使用CompletionService(完成服務)保持Executor處理的結果 public void count2() throws InterruptedException, ExecutionException{ ExecutorService exec = Executors.newCachedThreadPool(); CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec); for(int i=0; i<10; i++){ execcomp.submit(getTask()); } int sum = 0; for(int i=0; i<10; i++){ //檢索並移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則等待。 Future<Integer> future = execcomp.take(); sum += future.get(); } System.out.println("總數為:"+sum); exec.shutdown(); } //得到一個任務 public Callable<Integer> getTask(){ final Random rand = new Random(); Callable<Integer> task = new Callable<Integer>(){ @Override public Integer call() throws Exception { int i = rand.nextInt(10); int j = rand.nextInt(10); int sum = i*j; System.out.print(sum+"\t"); return sum; } }; return task; } /** * 執行結果: 6 6 14 40 40 0 4 7 0 0 總數為:106 12 6 12 54 81 18 14 35 45 35 總數為:312 */ }
先看一下新建一個ThreadPoolExecutor的構建參數:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
看這個參數很容易讓人以為是線程池里保持corePoolSize個線程,如果不夠用,就加線程入池直至maximumPoolSize大小,如果 還不夠就往workQueue里加,如果workQueue也不夠就用RejectedExecutionHandler來做拒絕處理。
但實際情況不是這樣,具體流程如下:
1)當池子大小小於corePoolSize就新建線程,並處理請求
2)當池子大小等於corePoolSize,把請求放入workQueue中,池子里的空閑線程就去從workQueue中取任務並處理
3)當workQueue放不下新入的任務時,新建線程入池,並處理請求,如果池子大小撐到了maximumPoolSize就用RejectedExecutionHandler來做拒絕處理
4)另外,當池子的線程數大於corePoolSize的時候,多余的線程會等待keepAliveTime長的時間,如果無請求可處理就自行銷毀
內部結構如下所示:
從中可以發現ThreadPoolExecutor就是依靠BlockingQueue的阻塞機制來維持線程池,當池子里的線程無事可干的時候就通過workQueue.take()阻塞住。
其實可以通過Executes來學學幾種特殊的ThreadPoolExecutor是如何構建的。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
newFixedThreadPool就是一個固定大小的ThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
newCachedThreadPool比較適合沒有固定大小並且比較快速就能完成的小任務,沒必要維持一個Pool,這比直接new Thread來處理的好處是能在60秒內重用已創建的線程。
其他類型的ThreadPool看看構建參數再結合上面所說的特性就大致知道它的特性