一、介紹
ExecutorService是java.util.concurrent包中的一個線程池實現接口。其有兩個實現類:
1)ThreadPoolExecutor:普通線程池通過配置線程池大小,能有效管理線程的調度,在執行大量異步線程時提高程序的性能。
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
- corePoolSize:核心線程數,如果運行的線程少於corePoolSize,則創建新線程來執行新任務,即使線程池中的其他線程是空閑的
- maximumPoolSize:最大線程數,可允許創建的線程數,corePoolSize和maximumPoolSize設置的邊界自動調整池大小:
- corePoolSize <運行的線程數< maximumPoolSize:僅當隊列滿時才創建新線程
- corePoolSize=運行的線程數= maximumPoolSize:創建固定大小的線程池
- keepAliveTime:如果線程數多於corePoolSize,則這些多余的線程的空閑時間超過keepAliveTime時將被終止
- unit:keepAliveTime參數的時間單位
- workQueue:保存任務的阻塞隊列,與線程池的大小有關:
- 當運行的線程數少於corePoolSize時,在有新任務時直接創建新線程來執行任務而無需再進隊列
- 當運行的線程數等於或多於corePoolSize,在有新任務添加時則選加入隊列,不直接創建線程
- 當隊列滿時,在有新任務時就創建新線程
- threadFactory:使用ThreadFactory創建新線程,默認使用defaultThreadFactory創建線程
- handle:定義處理被拒絕任務的策略,默認使用ThreadPoolExecutor.AbortPolicy,任務被拒絕時將拋出RejectExecutorException
2)ScheduledThreadPoolExecutor:執行延遲任務和周期性任務。
二、ExecutorService種類
1、newSingleThreadExecutor
由數可知,創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,沒有被執行的線程先排在等待隊列中,而且先放入線程池的先執行
示例:
package executorservice.demo; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author boshen * @date 2018/12/20 */ public class SingleThreadExecutorTest { class StudentThread implements Runnable{ private String name; StudentThread(String name){ this.name = name; } public void run(){ System.out.println("學生:" + name + " 開始吃飯"); try { Thread.sleep(3000); System.out.println("學生:" + name + " 吃完飯了"); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args){ SingleThreadExecutorTest cb = new SingleThreadExecutorTest(); ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(cb.new StudentThread("張三")); executorService.submit(cb.new StudentThread("李四")); executorService.shutdown(); } }
學生:張三 開始吃飯
學生:張三 吃完飯了
學生:李四 開始吃飯
學生:李四 吃完飯了
2、newFixedThreadPool
創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待
示例:
package executorservice.demo; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author boshen * @date 2018/12/20 */ public class FixedThreadPoolTest { class StudentThread implements Runnable{ private String name; StudentThread(String name){ this.name = name; } public void run(){ System.out.println("學生:" + name + " 開始吃飯"); try { Thread.sleep(2000); System.out.println("學生:" + name + " 吃完飯了"); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args){ FixedThreadPoolTest cb = new FixedThreadPoolTest(); ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(cb.new StudentThread("張三")); executorService.submit(cb.new StudentThread("李四")); executorService.submit(cb.new StudentThread("王五")); executorService.submit(cb.new StudentThread("馬六")); executorService.shutdown(); } }
學生:李四 開始吃飯
學生:張三 開始吃飯
學生:李四 吃完飯了
學生:張三 吃完飯了
學生:王五 開始吃飯
學生:馬六 開始吃飯
學生:馬六 吃完飯了
學生:王五 吃完飯了
3、newCachedThreadPool
創建可緩存的線程池,如果線程池中的線程在60秒未被使用就將被移除,在執行新的任務時,當線程池中有之前創建的可用線程就重用可用線程,否則就新建一條線程
示例:
package executorservice.demo; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author boshen * @date 2018/12/20 */ public class CachedThreadPoolTest { class StudentThread1 implements Runnable{ private String name; StudentThread1(String name){ this.name = name; } public void run(){ System.out.println("學生:" + name + " 開始吃飯,線程名為:"+Thread.currentThread().getName()); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } class StudentThread2 implements Runnable{ private String name; StudentThread2(String name){ this.name = name; } public void run(){ System.out.println("學生:" + name + " 開始吃飯,線程名為:"+Thread.currentThread().getName()); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args){ CachedThreadPoolTest cb = new CachedThreadPoolTest(); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.submit(cb.new StudentThread1("張三")); executorService.submit(cb.new StudentThread1("李四")); executorService.submit(cb.new StudentThread2("王五")); executorService.submit(cb.new StudentThread2("馬六")); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } executorService.submit(cb.new StudentThread1("趙七")); executorService.submit(cb.new StudentThread1("楊八")); executorService.shutdown(); } }
學生:張三 開始吃飯,線程名為:pool-1-thread-1 學生:王五 開始吃飯,線程名為:pool-1-thread-3 學生:馬六 開始吃飯,線程名為:pool-1-thread-4 學生:李四 開始吃飯,線程名為:pool-1-thread-2 學生:趙七 開始吃飯,線程名為:pool-1-thread-2 學生:楊八 開始吃飯,線程名為:pool-1-thread-1
由結果可知:
張三和李四執行時間為2秒,王五和馬六執行時間為10秒,提交了前4個線程之后隔了4秒提交趙七和楊八的線程,這時候張三和李四已經執行完了。
所以張三的線程pool-1-thread-1繼續執行楊八,李四的線程pool-1-thread-2繼續執行趙七。並沒有多創建出來pool-1-thread-5和pool-1-thread-6
4、newScheduledThreadPool
創建一個定長線程池,支持定時及周期性任務執行
- Executors.newScheduledThreadPool(int corePoolSize),corePoolSize表示線程容量。
- schedule(Callable/Runnable command,long initialDelay,TimeUnit unit):第一個參數任務,第二個參數表示執行任務前等待的時間,第三參數表示時間單位。
- scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit):第一個參數表示周期線執行的任務,第二個參數表示第一次執行前的延遲時間,第三個參數表示任務啟動間隔時間,第四個參數表示時間單位。雖然任務類型是Runnable但該方法有返回值ScheduledFuture。可以通過該對象獲取線程信息。
- scheduleWithFixedDelay(Runnable command,long initialDelay,long period,TimeUnit unit):與scheduleAtFixedRate方法類似,不同的是第三個參數表示前一次結束的時間和下一次任務啟動的間隔時間
示例
package executorservice.demo; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * @author boshen * @date 2018/12/20 */ public class ScheduledThreadPoolTest { class StudentThread implements Runnable{ private String name; StudentThread(String name){ this.name = name; } public void run(){ try { System.out.println("學生:" + name + " 開始吃飯,線程名為:"+Thread.currentThread().getName()); Thread.sleep(1000); }catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args){ ScheduledThreadPoolTest cb = new ScheduledThreadPoolTest(); ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2); //當以下線程提交后要2秒后才執行,只執行一次 executorService.schedule(cb.new StudentThread("張三"),2000, TimeUnit.MILLISECONDS); //當以下線程提交后要2秒后才執行,每3秒執行一次,直到調用了executorService.shutdown(); executorService.scheduleWithFixedDelay(cb.new StudentThread("李四"),2,3,TimeUnit.SECONDS); try { Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); } executorService.shutdown(); } }
學生:李四 開始吃飯,線程名為:pool-1-thread-2 學生:張三 開始吃飯,線程名為:pool-1-thread-1 學生:李四 開始吃飯,線程名為:pool-1-thread-2 學生:李四 開始吃飯,線程名為:pool-1-thread-2 學生:李四 開始吃飯,線程名為:pool-1-thread-2 學生:李四 開始吃飯,線程名為:pool-1-thread-2
三、ExecutorService的幾個方法區別
1、execute(Runnable),無法取得返回值
public static void main(String[] args){ ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(new Runnable() { public void run() { System.out.println("該異步任務無返回值"); } }); executorService.shutdown(); }
2、submit(Runnable),返回一個 Future 對象。這個 Future 對象可以用來檢查 Runnable 是否已經執行完畢,但是也無法取得run方法里面想要返回的值因為run方法為void
public static void main(String[] args){ ExecutorService executorService = Executors.newSingleThreadExecutor(); Future future = executorService.submit(new Runnable() { public void run() { try { Thread.sleep(10000); System.out.println("該任務執行了10秒"); } catch (InterruptedException e) { e.printStackTrace(); } } }); System.out.println("主線程中獲取子線程的執行狀態:如果返回null表示執行正確完成"); try { System.out.println(future.get());//線程沒有執行完之前,會阻塞在這里 } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } executorService.shutdown(); }
主線程中獲取子線程的執行狀態:如果返回null表示執行正確完成 該任務執行了10秒 null
3、submit(Callable),返回一個 Future 對象。這個 Future 對象可以返回線程中call方法里面return的對象
public static void main(String[] args){ ExecutorService executorService = Executors.newSingleThreadExecutor(); Future future = executorService.submit(new Callable() { public Object call() throws Exception { Thread.sleep(10000); System.out.println("該任務執行了10秒"); return "call 返回的值"; } }); System.out.println("主線程中獲取子線程的結果:"); try { System.out.println(future.get());//線程沒有執行完之前,會阻塞在這里 } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } executorService.shutdown(); }
主線程中獲取子線程的結果:
該任務執行了10秒
call 返回的值
4、invokeAll(Collection<? extends Callable<T>> tasks),參數是加入線程池的所有Callable,返值是List<Future<T>>,表示返回執行后的一系列Callable的執行結果
package executorservice.demo; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * @author Administrator * @date 2018/12/27 */ public class InvokeAllTest { class StudentThread implements Callable{ private String name; StudentThread(String name){ this.name = name; } public Object call() throws Exception{ System.out.println("學生:" + name + " 開始吃飯,線程名為:"+Thread.currentThread().getName()); return "result: 學生"+name+"吃完飯了"; } } public static void main(String[] args){ InvokeAllTest invokeAllTest = new InvokeAllTest(); ExecutorService executorService = Executors.newCachedThreadPool(); List<Callable<String>> callables = new ArrayList<Callable<String>>(); callables.add(invokeAllTest.new StudentThread("張三")); callables.add(invokeAllTest.new StudentThread("李四")); callables.add(invokeAllTest.new StudentThread("王五")); callables.add(invokeAllTest.new StudentThread("馬六")); try { List<Future<String>> futures = executorService.invokeAll(callables); for(Future<String> future:futures){ System.out.println(future.get()); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } executorService.shutdown(); } }
學生:張三 開始吃飯,線程名為:pool-1-thread-1 學生:王五 開始吃飯,線程名為:pool-1-thread-3 學生:李四 開始吃飯,線程名為:pool-1-thread-2 學生:馬六 開始吃飯,線程名為:pool-1-thread-4 result: 學生張三吃完飯了 result: 學生李四吃完飯了 result: 學生王五吃完飯了 result: 學生馬六吃完飯了
四、ExecutorService 關閉
ExecutorService 的 shutdown() 方法。並不會立即關閉,但它將不再接受新的任務,而且一旦所有線程都完成了當前任務的時候,ExecutorService 將會關閉。在 shutdown() 被調用之前所有提交給 ExecutorService 的任務都被執行。
如果你想要立即關閉 ExecutorService,你可以調用 shutdownNow() 方法。這樣會立即嘗試停止所有執行中的任務,並忽略掉那些已提交但尚未開始處理的任務。無法擔保執行任務的正確執行。可能它們被停止了,也可能已經執行結束。