Callable、Future與線程池
在創建新線程的三種方式中,繼承Thread
和實現Runnable接口
兩種方式都都沒有返回值,因此當我們想要獲取子線程計算結果時只能設置共享數據,同時還需要考慮同步的問題,比較麻煩。而Callable接口就是解決這個問題的存在。
Callable
Callable和Runnable類似,都是只有一個方法的標志性接口:V call()
只不過Callable是有返回值的,並且聲明了異常Expection,即當計算正常進行則返回v,計算出錯則拋出異常。
單獨一個Callable並沒有什么可說的,該接口一般都是配合Future
接口進行使用。
Future接口
Future是對Callable任務進行處理的接口,里面有對任務操作的方法:
//獲取結果,若無結果會阻塞至異步計算完成
V get()
//獲取結果,超時返回null
V get(long timeOut, TimeUnit unit)
//執行結束(完成/取消/異常)返回true
boolean isDone()
//任務完成前被取消返回true
boolean isCancelled()
//取消任務,未開始或已完成返回false,參數表示是否中斷執行中的線程
boolean cancel(boolean mayInterruptRunning)
其中,對於
boolean cancel(boolean mayInterruptRunning)
方法的參數:
簡單來說,傳入false參數只能取消還沒有開始的任務,若任務已經開始了,就任由其運行下去。
當創建了Future實例,任務可能有以下三種狀態:
等待狀態。此時調用cancel()方法不管傳入true還是false都會標記為取消,任務依然保存在任務隊列中,但當輪到此任務運行時會直接跳過。
完成狀態。此時cancel()不會起任何作用,因為任務已經完成了。
運行中。此時傳入true會中斷正在執行的任務,傳入false則不會中斷。
Future的實現子類為FutureTask<V>
,該類即實現了Future接口,也實現了Runnable接口,因此Callable可配合FutureTask使用:
Callable<Integer> c = ()->{
Thread.sleep(3000);//模擬計算
return 100;//返回計算結果
};
//實例化FutureTask,注意這里不能使用Future的多態形式,因為只有FutureTask實現了Runnable接口
FutureTask<Integer> ft = new FutureTask<>(c);
//啟動線程
new Thread(ft).start();
//獲取計算結果,注意這里會阻塞
System.out.println(ft.get());
FutureTask提供了兩種構造方法:
//上例使用的就是這個,參數為Callable
public FutureTask(Callable<V> callable) {
}
//當Runnable執行成功時返回result(這有個毛用啊。。。)
public FutureTask(Runnable runnable, V result) {
}
FutureTask可以很方便的啟動線程。
線程池
除了使用FutureTask,還可以使用Callable + Future + 線程池
的方式執行Callable:
Callable<Integer> c = ()->{
Thread.sleep(3000);
return 100;
};
//構建定長線程池
ExecutorService service = Executors.newFixedThreadPool(10);
//在線程池中提交Callable時會返回Future對象
Future<Integer> future = service.submit(c);
System.out.println(future.get());
上例的線程池創建方式只是為了方便,在阿里開發手冊中要求:
【強制】線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。
說明:Executors 返回的線程池對象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。
對於Executors提供的幾種線程池分別為:
newSingleThreadExecutor
創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。
此線程池保證所有任務的執行順序按照任務的提交順序執行。
new ThreadPoolExecutor(1, 1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())
newFixedThreadPool
創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
newCachedThreadPool
創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。
此線程池不會對線程池大小做限制,線程池大小完全依賴於操作系統(或者說JVM)能夠創建的最大線程大小。
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
newScheduledThreadPool
創建一個定長線程池,支持定時及周期性任務執行。除了延遲執行之外和newFixedThreadPool基本相同,可以用來執行定時任務
上面四種方式只是比較方便,阿里開發手冊要求不能使用這些方式,那么來看看阿里要求的線程池方式:
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
)
參數分別為:
corePoolSize
- 線程池核心池的大小。maximumPoolSize
- 線程池的最大線程數。keepAliveTime
- 當線程數大於核心時,此為終止前多余的空閑線程等待新任務的最長時間。unit
- keepAliveTime 的時間單位。workQueue
- 用來儲存等待執行任務的隊列。threadFactory
- 線程工廠。handler
- 拒絕策略
關注點1 線程池大小
線程池有兩個線程數的設置,一個為核心池線程數,一個為最大線程數。
在創建了線程池后,默認情況下,線程池中並沒有任何線程,等到有任務來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法
當創建的線程數等於 corePoolSize 時,會加入設置的阻塞隊列。當隊列滿時,會創建線程執行任務直到線程池中的數量等於maximumPoolSize。
關注點2 適當的阻塞隊列
java.lang.IllegalStateException: Queue full
方法 拋出異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek()
ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
DelayQueue: 一個使用優先級隊列實現的無界阻塞隊列。
SynchronousQueue: 一個不存儲元素的阻塞隊列。
LinkedTransferQueue: 一個由鏈表結構組成的無界阻塞隊列。
LinkedBlockingDeque: 一個由鏈表結構組成的雙向阻塞隊列。
關注點3 明確拒絕策略
ThreadPoolExecutor.AbortPolicy: 丟棄任務並拋出
RejectedExecutionException異常。 (默認)
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務