關於ThreadPoolExecutor的源碼解讀,請參考我的最新博客《ThreadPoolExecutor源碼解讀》。
Java中的線程即是工作單元也是執行機制,從JDK 5后,工作單元與執行機制被分離。工作單元包括Runnable和Callable,執行機制由JDK 5中增加的java.util.concurrent包中Executor框架提供。
HotSpot VM的線程模型中將java的線程映射為本地操作系統的線程,java線程的啟動意味着一個本地操作系統線程的創建,而java線程的終止也就意味着對應的系統線程的回收。
Executor框架主要包含三個部分:
任務:包括Runnable和Callable,其中Runnable表示一個可以異步執行的任務,而Callable表示一個會產生結果的任務
任務的執行:包括Executor框架的核心接口Executor以及其子接口ExecutorService。在Executor框架中有兩個關鍵類ThreadPoolExecutor和ScheduledThreadPoolExecutor實現了ExecutorService接口。
異步計算的結果:包括接口Future和其實現類FutureTask。
下面是對Executor框架中的一些關鍵接口與類的簡介
Executor接口(java.util.concurrent.Executor)
它是Executor的基礎與核心,其定義如下:
public interface Executor { void execute(Runnable command); }
它包含了一個方法execute,參數為一個Runnable接口引用。
Executor接口將任務的提交與執行分離開來。
ThreadPoolExecutor類(java.util.concurrent.ThreadPoolExecutor)
它是線程池的核心實現類,用來執行被提交的任務。
它通常由工廠類Executors來創建,Executors可以創建SingleThreadExecutor,FixedThreadPool以及CachedThreadPool等不同的ThreadPoolExecutor。
SingleThreadExecutor使用單線程執行任務,Executors提供的API有如下兩個
public static ExecutorService newSingleThreadExecutor(); public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory);
SingleThreadExecutor保證了任務執行的順序,不會存在多線程活動。
FixedThreadPool是使用固定線程數的線程池,Executors提供的API有如下兩個
1 public static ExecutorService newFixedThreadPool(int nThreads); 2 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory);
FixedThreadPool滿足了資源管理的需求,可以限制當前線程數量。適用於負載較重的服務器環境。
CachedThreadPool是無界線程池,Executors提供的API有如下兩個
public static ExecutorService newCachedThreadPool(); public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory);
CachedThreadPool適用於執行很多短期異步任務的小程序,適用於負載較輕的服務器。
ScheduledThreadPoolExecutor類(java.util.concurrent.ScheduledThreadPoolExecutor)
它是ThreadPoolExecutor的子類且實現了ScheduledExecutorService接口,它可以在給定的延遲時間后執行命令,或者定期執行命令,它比Timer更強大更靈活。
Executors可以創建的ScheduledThreadPoolExecutor的類型有ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor等
ScheduledThreadPoolExecutor具有固定線程個數,適用於需要多個后台線程執行周期任務,並且為了滿足資源管理需求而限制后台線程數量的場景,Executors中提供的API有如下兩個:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize); public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory);
SingleThreadScheduledExecutor具有單個線程,Executors提供的創建API有如下兩個:
public static ScheduledExecutorService newSingleThreadScheduledExecutor(); public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory);
它適用於單個后台線程執行周期任務,並且保證順序一致執行的場景。
上述的ThreadPoolExecutor和ScheduledThreadPoolExecutor都可以用於執行Runnable與Callable接口的實現類
Future接口(Java.concurrent.Future)
Future代表着提交的任務的計算狀態與結果,可以對其進行取消,查詢是否取消,查詢是否完成,查詢結果等操作。
首先來看一下Future接口的定義:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
FutureTask類間接實現了Future接口,它用來表示異步計算的結果。當ThreadPoolExecutor或者ScheduledThreadPoolExecutor執行Runnable接口或者Callable接口的實現類時,它們會返回一個Future接口引用(實現類為FutureTask)。
Runnable接口或者Callable接口的實現類在被上述兩者執行的區別是,前者沒有返回結果,而后者可以返回結果。Runnable可以通過Executors提供的API(Executors#callable)包裝為Callable。
本文參考資料:《Java並發編程的藝術》以及其他網上文檔
-----------------------------------------------------------------------------------------------------------------
下面是一些簡單的Demo程序
1 import java.util.concurrent.Executor; 2 import java.util.concurrent.Executors; 3 4 public class ExecutorTest { 5 public static void main(String[] args) { 6 7 Runnable hello = () -> { 8 for (int i = 0; i < 100; i++) { 9 System.out.println(i + " hello"); 10 } 11 }; 12 Runnable bye = () -> { 13 for (int i = 0; i < 100; i++) { 14 System.out.println(i + " bye"); 15 } 16 }; 17 18 Executor executor = Executors.newCachedThreadPool(); 19 20 executor.execute(hello); 21 executor.execute(bye); 22 23 } 24 }
上面程序使用了兩個Runnable任務hello和bye來打印相應語句,程序將會交錯打印hello和bye。如果將executor改為SingleThreadExecutor,將會先打印100個"hello",再打印100個"bye"。
1 import java.util.ArrayList; 2 import java.util.List; 3 import java.util.Random; 4 import java.util.concurrent.Callable; 5 import java.util.concurrent.ExecutionException; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8 import java.util.concurrent.Future; 9 10 public class ExecutorTest { 11 public static void main(String[] args) { 12 Random random = new Random(); 13 List<Integer> numbers = new ArrayList<>(); 14 for (int i = 0; i < 100000; i++) { 15 numbers.add(random.nextInt(100000)); 16 } 17 int result = calculate(numbers, 3); 18 System.out.println(result); 19 } 20 21 public static int calculate(List<Integer> numbers,int digit) { 22 List<Callable<Integer>> tasks = new ArrayList<>(); 23 for (Integer x : numbers) { 24 tasks.add(() -> { 25 int count=0; 26 int y=x; 27 do { 28 if (y % 10 == digit) { 29 count++; 30 } 31 y /= 10; 32 } while (y > 0); 33 return count; 34 }); 35 } 36 ExecutorService service = Executors.newFixedThreadPool(10); 37 int answer=0; 38 try { 39 List<Future<Integer>> results = service.invokeAll(tasks); 40 for (Future<Integer> result : results) { 41 try { 42 answer+=result.get(); 43 } catch (ExecutionException e) { 44 e.printStackTrace(); 45 } 46 } 47 } catch (InterruptedException e) { 48 e.printStackTrace(); 49 } 50 return answer; 51 } 52 }
上面的程序隨機生成了100000個隨機數,然后統計這些數字中每個數字10進制中具有多少個數位3的數量和。使用具有10個線程的線程池進行計算,最終通過Future引用對象的結果來統計答案。