java.util.concurrent.ExecutorService接口提供了許多線程管理的方法
Method | 說明 |
shutdown | 拒絕接收新的任務,待已提交的任務執行后關閉,且宿主線程不阻塞,若需要阻塞可借助awaitTermination實現 |
shutdownNow | 嘗試停止所有正在執行的任務(通常通過中斷工作線程實現,不保證一定能停止),不再啟動隊列中等待的任務並將其以列表形式返回,然後關閉,且宿主線程不阻塞,若需要阻塞可借助awaitTermination實現 |
awaitTermination | 當發生shutdown時,阻塞宿主線程直到約定的時間已過或者所有任務完成 |
submit | 提交任務Callable/Runnable,可利用Future的get()方法使宿主線程阻塞直到任務結束后返回結果 |
有了以上方法,便可以基於此接口實現線程池的各種功能(例如java.util.concurrent.ThreadPoolExecutor/java.util.concurrent.ScheduledThreadPoolExecutor),以java.util.concurrent.ThreadPoolExecutor為例,其參數的詳解
Name | Type | 說明 |
corePoolSize | int | 線程池中的核心線程數,即使線程空閑也會保留在池中(除非設置了allowCoreThreadTimeOut) |
maximumPoolSize | int | 線程池中最大的線程數 |
keepAliveTime | long | 線程空閑時間,若線程數大於corePoolSize,空閑時間超過該值的線程將被終止回收 |
unit | TimeUnit | keepAliveTime的時間單位 |
workQueue | BlockingQueue<Runnable> | 已提交但未執行的任務隊列 |
threadFactory | ThreadFactory | 創建新線程的工廠 |
handler | RejectedExecutionHandler | 當線程池或隊列達到上限拒絕新任務拋出異常時的處理類 |
同時,java.util.concurrent.Executors類提供的常用方法有
Method | 說明 | 基類 |
newFixedThreadPool | 線程池中含固定數量的線程 | 基於java.util.concurrent.ThreadPoolExecutor類 |
newSingleThreadExecutor | 線程池中僅含一個工作線程 | |
newCachedThreadPool | 按需創建線程,若線程池中無可用線程,則創建新的線程並加入,直到線程數達到上限值(Integer.MAX_VALUE) | |
newWorkStealingPool | 按照可用CPU數創建線程池 | 基於java.util.concurrent.ForkJoinPool類 |
java.util.concurrent.ForkJoinPool類是Fork/Join框架的實現類,Fork/Join框架是Java7提供的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架,該類在有遞歸實現的場景有更優異的表現。
package com.concurrent.test;

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;
import org.junit.Test;

/**
 * Exercises the core {@link ExecutorService} operations: the three
 * {@code submit} overloads, {@code shutdown}, {@code awaitTermination},
 * and task submission to a {@link ForkJoinPool}.
 *
 * <p>Every helper that creates an executor shuts it down before returning so
 * that no worker thread outlives the test.
 */
public class ThreadExecutorServiceTest {

    private static final String THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION = "This is shutdownWithAwaitTermination";

    private static final int RESULT = 111;

    /** How long the deliberately slow tasks sleep before producing output. */
    private static final long SLOW_TASK_MILLIS = 10000L;

    /**
     * Submits a plain {@link Runnable}; its future completes with {@code null}.
     *
     * @return {@code true} if {@code Future.get()} returned {@code null}
     */
    private static boolean submitRunnable() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("This is submitRunnable");
                }
            });
            return future.get() == null;
        } finally {
            // Without this the non-daemon worker thread would be leaked.
            executorService.shutdown();
        }
    }

    /**
     * Submits a {@link Runnable} together with a fixed result value.
     *
     * @return the value handed to {@code submit}, i.e. {@link #RESULT}
     */
    private static Integer submitRunnableWithResult() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("This is submitRunnableWithResult");
                }
            }, RESULT);
            return future.get();
        } finally {
            executorService.shutdown();
        }
    }

    /**
     * Submits a {@link Callable} and blocks on {@code Future.get()} for its result.
     *
     * @return the value returned by the callable
     */
    private static Integer submitBlockCallable() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("This is submitBlockCallable");
                    return RESULT;
                }
            });
            return future.get(); // blocks until the task has finished
        } finally {
            executorService.shutdown();
        }
    }

    /**
     * Submits a {@link Callable} and polls {@code Future.isDone()} instead of
     * blocking on {@code get()}.
     *
     * @return {@code true} once the future reports completion
     */
    private static boolean submitNonBlockCallable() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("This is submitNonBlockCallable");
                    return RESULT;
                }
            });
            while (!future.isDone()) { // non-blocking poll
                System.out.println(new Date());
            }
            return future.isDone();
        } finally {
            executorService.shutdown();
        }
    }

    /**
     * Shows that {@code shutdown()} does not wait for submitted tasks: the
     * buffer is read right after {@code shutdown()} while the task is still
     * sleeping, so nothing has been appended yet.
     *
     * @return the buffer content at the time {@code shutdown()} returned
     */
    private static String shutdown() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        final StringBuilder sb = new StringBuilder();
        executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(SLOW_TASK_MILLIS);
                sb.append("This is shutdown");
                return RESULT;
            }
        });
        executorService.shutdown();
        return sb.toString();
    }

    /**
     * Shows that {@code awaitTermination} after {@code shutdown()} blocks the
     * caller until the submitted task has finished, so its output is visible.
     *
     * @return the buffer content after the executor has terminated
     */
    private static String shutdownWithAwaitTermination() throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        final StringBuilder sb = new StringBuilder();
        executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(SLOW_TASK_MILLIS);
                sb.append(THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION);
                return RESULT;
            }
        });
        executorService.shutdown();
        executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
        return sb.toString();
    }

    /**
     * Sums {@code list} by submitting a {@code SumTask} to a dedicated
     * {@link ForkJoinPool}.
     *
     * @param list the numbers to add up
     * @return the result reported by the task
     */
    private static int testForkJoinPool(List<Integer> list) throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(8);
        try {
            Future<Integer> future = forkJoinPool.submit(new SumTask(list));
            return future.get();
        } finally {
            forkJoinPool.shutdown();
        }
    }

    @Test
    public void test() throws InterruptedException, ExecutionException {
        Assert.assertTrue(submitRunnable());
        Assert.assertEquals(RESULT, submitRunnableWithResult().intValue());
        Assert.assertEquals(RESULT, submitBlockCallable().intValue());
        Assert.assertTrue(submitNonBlockCallable());
        Assert.assertTrue(shutdown().isEmpty());
        Assert.assertEquals(THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION, shutdownWithAwaitTermination());
        // Expected values are the plain arithmetic sums of the inputs
        // (1..4 = 10, 1..10 = 55, 1..11 = 66). A smaller result for the longer
        // lists means an element was lost while the work was being divided.
        Assert.assertEquals(10, testForkJoinPool(Arrays.asList(1, 2, 3, 4)));
        Assert.assertEquals(55, testForkJoinPool(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
        Assert.assertEquals(66, testForkJoinPool(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)));
    }
}
SumTask類如下:
package com.concurrent.test;

import java.util.List;
import java.util.concurrent.RecursiveTask;

/**
 * Fork/Join task that adds up a list of integers.
 *
 * <p>Lists shorter than {@link #SPLIT_THRESHOLD} are summed directly. Longer
 * lists are cut into two contiguous halves that together cover every element;
 * each half is summed by its own {@code SumTask} and the two partial sums are
 * added.
 */
public class SumTask extends RecursiveTask<Integer> {

    private static final long serialVersionUID = -5468389855825594053L;

    /** Lists with fewer elements than this are summed without further splitting. */
    private static final int SPLIT_THRESHOLD = 10;

    private final List<Integer> list;

    /**
     * @param list the numbers to sum; read through {@code subList} views, never modified
     */
    public SumTask(List<Integer> list) {
        this.list = list;
    }

    /**
     * Sums the list, dividing the work when it is large enough to be worth it.
     *
     * @return the sum of all elements of the list
     */
    @Override
    protected Integer compute() {
        int size = list.size();
        if (size < SPLIT_THRESHOLD) {
            return sum(list);
        }

        int middle = size / 2;
        // subList's upper bound is exclusive, so the second half must start
        // at 'middle' itself; starting at middle + 1 would drop one element.
        SumTask firstHalf = new SumTask(list.subList(0, middle));
        SumTask secondHalf = new SumTask(list.subList(middle, size));

        // Fork one half and compute the other in the current thread so this
        // worker keeps doing useful work instead of only waiting on joins.
        firstHalf.fork();
        int secondSum = secondHalf.compute();
        return firstHalf.join() + secondSum;
    }

    /** Sequentially adds up every element of {@code values}. */
    private int sum(List<Integer> values) {
        int total = 0;
        for (Integer value : values) {
            total += value;
        }
        return total;
    }
}