java自1.5版本之后,提供線程池,供開發人員快捷方便的創建自己的多線程任務。下面簡單的線程池的方法及說明。
1、Executor
線程池的頂級接口。定義了方法execute(Runnable),該方法接收一個Runnable實例,用來執行一個任務,該任務即是一個實現Runnable接口的類。
此服務方法無返回值,原因是因為實現Runnable接口的類的run方法是無返回(void)的。
常用方法 : void execute(execute)
作用 : 啟動並執行線程任務
2、ExecutorService
繼承自Executor接口,提供了更多的方法調用,例如優雅關閉方法shutdown,有返回值的submit。
2.1、ExecutorService生命周期
運行 - Running 、關閉 - shuttingdown、終止 - terminated
Running : 線程池正在執行中,活動狀態。創建后即進入此狀態
shuttingdown : 優雅關閉,線程池正在關閉中。不再接收新的線程任務,已有的任務(正在處理的 + 隊列中阻塞的),處理完畢后,關閉線程池。
調用shutdown()方法,即進入此狀態
terminated : 線程池已關閉。
2.2、submit方法
有返回值,Future類型。重載了方法,submit(Runnable)不需要提供返回值。submit(Callable)、submit(Runnable,T)可以提供線程執行后的結果返回值。
2.3、Future
線程執行完畢結果。獲取線程執行結果是通過get()方法獲取。get()無參,阻塞等待線程執行結束。
get(long timeout, TimeUnit unit)有參,阻塞等待固定時長,超時未獲取,則拋出異常。
2.4、Callable
類似Runnable的一個線程接口。其中的對應run的方法是call方法。此接口提供了線程執行完畢返回值。
package com.cn.cfang.executor; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class Test { public static void main(String[] args) throws Exception{ ExecutorService executorService = Executors.newFixedThreadPool(1); Data data = new Data(); // Future<Data> future = executorService.submit(new Task(data), data); //runnable Future<Data> future = executorService.submit(new Task1(data)); //callable System.out.println(future.get().getName()); executorService.shutdown(); } } class Data { String name; public String getName() { return name; } public void setName(String name) { this.name = name; } } class Task implements Runnable{ Data data; public Task(Data data) { this.data = data; } @Override public void run() { data.setName("hello world"); } } class Task1 implements Callable<Data>{ Data data; public Task1(Data data) { this.data = data; } @Override public Data call() throws Exception { data.setName("hello world"); return data; } }
3、Executors工具類
提供了很多的工廠方法用於創建線程池,返回的線程池都實現了ExecutorService接口。
線程池屬於進程級的重量級資源,默認的生命周期同JVM一致,當開啟線程池后,直到jvm關閉,是線程池的默認的生命周期。
如果手動調用shutdown方法,可優雅關閉線程池,在當前所有任務執行結束后,關閉線程池。
4、幾種常用的線程池
4.1、FixedThreadPool
容量固定的線程池。使用LinkedBlockingQueue作為任務隊列,當任務數量大於線程池容量的時候,未執行的任務進入任務等待隊列LinkedBlockingQueue中,
當線程有空閑的時候,自動從隊列中取出任務執行。
使用場景: 大多數情況下,推薦使用的線程池。因為os系統和硬件是有線程上限限制的,不可能去無限的提供線程池操作。
4.2、CachedThreadPool
緩存線程池。容量 0-Integer.MAX_VALUE,自動根據任務數擴容:如果線程池中的線程數不滿足任務執行需求,則創建新的線程並添加到池中。
生命周期默認60s,當線程空閑時長到60s的時候,自動終止銷毀釋放線程,移除線程池。
使用場景 : 可用於測試最高負載量,用於對FixedThreadPool容量的參考。
注意,放入CachedThreadPool的線程不必擔心其結束,超過TIMEOUT(默認60s)不活動,其會自動被終止。
4.3、ScheduledThreadPool
定時及周期性的任務執行的線程池,多數情況下可用來替代Timer類。
public static void main(String[] args) { ScheduledExecutorService service = Executors.newScheduledThreadPool(3); System.out.println(service); // 定時完成任務。 scheduleAtFixedRate(Runnable, start_limit, limit, timeunit) // runnable - 要執行的任務。
// start_limit - 第一次執行任務的時間間隔
// limit - 多次任務執行的時間間隔
// timeunit - 時間單位 service.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } }, 0, 300, TimeUnit.MILLISECONDS); }
4.4、SingleThreadExceutor 單一容量線程池。
4.5、自定義線程池
自定義線程池,可以使用ThreadPoolExecutor類來進行創建管理。線程池中,除了ForkJoinPool外,其他常用的線程池底層,都是使用ThreadPoolExecutor實現的。
參數說明:
corePoolSize:核心線程數,也是最少線程數。在創建線程池時,默認情況下,是不會創建線程池的,也即此時的線程池中線程數為0,直到有任務來臨時,才會去創建線程。當然,手動調用prestartCoreThread()或者prestartAllCoreThreads()方法,可以初始化創建線程池中的線程。默認情況下,當有任務來臨時,就會創建新的線程去處理執行,即使此時線程池中有空閑的線程。當線程數達到corePoolSize時,線程數不增加,此時任務會放入等待隊列BlockingQueue中。
workQueue:阻塞隊列,用來存儲等待執行的任務資源。
maximumPoolSize:最大線程數。當阻塞隊列滿了,開始擴充線程池中的線程數。直到達到此最大值的時候。
handler:當線程池中的線程數等於maximumPoolSize的時候,此時再來任務的話,交由此拒絕策略執行。
keepAliveTime:表示的線程在空閑多長時間后會被終止。默認是在線程數大於corePoolSize才生效,也可以手動設置allowCoreThreadTimeOut()方法讓線程數在不大於 corePoolSize也生效。
public ThreadPoolExecutor( int corePoolSize, //核心容量,創建線程池的時候,默認有多少的線程數。也是最少線程數 int maximumPoolSize, //最大線程數 long keepAliveTime, //線程生命周期,0為永久。當線程空閑多長時間,自動回收。 TimeUnit unit, //生命周期時間單位。 BlockingQueue<Runnable> workQueue, //任務阻塞隊列。
RejectedExecutionHandler handler
) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
簡單例子:
package com.cn.cfang.executor; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Test2 { public static void main(String[] args){ //創建等待隊列 BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20); //創建線程池,池中保存的線程數為3,允許的最大線程數為5 ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue); //創建七個任務 Runnable t1 = new MyThread(); Runnable t2 = new MyThread(); Runnable t3 = new MyThread(); Runnable t4 = new MyThread(); Runnable t5 = new MyThread(); Runnable t6 = new MyThread(); Runnable t7 = new MyThread(); //每個任務會在一個線程上執行 pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); pool.execute(t6); pool.execute(t7); //關閉線程池 pool.shutdown(); } } class MyThread implements Runnable{ @Override public void run(){ System.out.println(Thread.currentThread().getName() + "正在執行。。。"); try{ Thread.sleep(100); }catch(InterruptedException e){ e.printStackTrace(); } } }
5、forkjoin框架
拆分合並,將一個大的任務,拆分成若干子任務,並最終匯總子任務的執行結果,得到大任務的執行結果。並行執行,采用工作竊取機制,更加有效的利用cpu資源。
5.1、主要類
ForkJoinPool : 用於執行Task。任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部。
當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他未完成工作線程的隊列的尾部獲取一個任務。
ForkJoinTask:ForkJoin任務,提供在任務中執行fork()和join()操作的機制(二叉分邏輯),通常不直接繼承ForkJoinTask類,
而是繼承抽象子類RecursiveTask(有返回結果) 或者 RecursiveAction (無返回結果)。
ForkJoinWorkerThread:ForkJoinPool 內部的worker thread,用來具體執行ForkJoinTask。內部有 ForkJoinPool.WorkQueue,來保存要執行的 ForkJoinTask。
ForkJoinPool.WorkQueue:保存要執行的ForkJoinTask。
5.2、工作竊取機制
1、大任務分割成N個子任務,為避免線程競爭,於是分開幾個隊列去保存這些子任務,並為每個隊列提供一個工作線程去處理其中的任務。工作線程與任務隊列一一對應。
2、如果A線程執行完自己隊列中的所有任務,如果此時其他隊列中還有未執行的任務,則A線程會去竊取一個其他隊列的任務來執行。但是,此時兩個線程同時訪問,
可能會產生競爭問題,所以,任務隊列設計成了雙向隊列。A線程竊取的時候,從另一端開始執行,盡可能的去避免線程競爭問題。
3、工作竊取機制,充分的利用線程資源,並盡可能的去避免線程間的競爭問題。但是,只能是盡可能避免,並不能規避。例如,雙向隊列只有一個任務。
5.3、簡單使用
例:求和 0 - 10000000000L。
package com.cn.cfang.executor; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; /** * 步驟: * 1、建立任務類Task,繼承RecursiveTask或者RecursiveAction。需要返回值則選用RecursiveTask,無需返回值選用RecursiveAction * 2、任務類Task,滿足一定的閾值,則對子任務進行計算,不滿足,則二叉分后,遞歸調用自身 * 3、調用中,新建ForkJoinPool對象,新建任務類對象Task,將任務類對象Task放入ForkJoinPool中執行。 * 如果需要返回值,則可以invoke或者Future-submit。 * @author cfang * 2018年5月15日 上午10:51:03 */ public class Test3 { public static void main(String[] args) throws Exception{ ForkJoinPool pool = new ForkJoinPool(); ForkJoinWorkTask task = new ForkJoinWorkTask(0l, 10000000l); // Long result = pool.invoke(task); // System.out.println(result); Future<Long> future = pool.submit(task); System.out.println(future.get()); } } class ForkJoinWorkTask extends RecursiveTask<Long>{ private static final long serialVersionUID = 1L; private Long start; //起始 private Long end; //終止 private static final Long THRESHOLD = 10000L; //子任務分割閾值 public ForkJoinWorkTask(Long start, Long end){ this.start = start; this.end = end; } @Override protected Long compute() { Long sum = 0l; if(end - start <= THRESHOLD){ //足夠小的子任務,進行計算求和 for(Long i = start; i < end; i++){ sum += i; } }else{ //任務拆分不滿足,繼續拆分(二叉分邏輯) Long middle = (start + end) / 2; ForkJoinWorkTask rightTask = new ForkJoinWorkTask(start, middle); rightTask.fork(); ForkJoinWorkTask leftTask = new ForkJoinWorkTask(middle + 1, end); leftTask.fork(); sum = rightTask.join() + leftTask.join(); } return sum; } }