線程池使用FutureTask的時候如果拒絕策略設置為了DiscardPolicy和DiscardOldestPolicy並且在被拒絕的任務的Future對象上調用無參get方法那么調用線程會一直被阻塞。
問題復現
public static void main(String[] args) throws Exception {
//(1)線程池單個線程,線程池隊列元素個數為1
ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1,
1L, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.DiscardPolicy());
//(2)添加任務one
Future futureOne = executorService.submit(new Runnable() {
@Override public void run() {
System.out.println("start runable one");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//(3)添加任務two
Future futureTwo = executorService.submit(new Runnable() {
@Override public void run() {
System.out.println("start runable two");
}
});
//(4)添加任務three
Future futureThree = null;
try {
futureThree = executorService.submit(new Runnable() {
@Override public void run() {
System.out.println("start runable three");
}
});
} catch (Exception e) {
e.printStackTrace();
System.out.println(e.getLocalizedMessage());
}
System.out.println("task one finish " + futureOne.get());//(5)等待任務one執行完畢
System.out.println("task two finish " + futureTwo.get());//(6)等待任務two執行完畢
System.out.println("task three finish " + (futureThree == null ? null : futureThree.get()));// (7)等待任務three執行完畢
executorService.shutdown();//(8)關閉線程池,阻塞直到所有任務執行完畢
}
運行結果:
start runable one
task one finish null
start runable two
task two finish null
創建了一個單線程並且隊列元素個數為1的線程池,並且拒絕策略設置為了DiscardPolicy;先提交任務one,這個任務會使用唯一的一個線程進行執行,任務在打印 start runable one后會阻塞該線程5s;再向線程池提交了一個任務two,這時候會把任務two放入到阻塞隊列;提交任務three時,由於隊列已經滿了則會觸發拒絕策略丟棄任務three。
從運行結果看在任務one阻塞的5s內,主線程執行到了代碼(5)等待任務one執行完畢,當任務one執行完畢后代碼(5)返回,主線程打印出task one finish null。之后線程池的唯一線程會去隊列里面取出任務two並執行所以輸出start runable two然后代碼(6)會返回,這時候主線程輸出task two finish null,然后執行代碼(7)等待任務three執行完畢,從執行結果看代碼(7)會一直阻塞不會返回。
至此問題產生,如果把拒絕策略修改為DiscardOldestPolicy也會存在有一個任務的get方法一直阻塞只是現在是任務two被阻塞:
start runable one
task one finish null
start runable three
但是如果拒絕策略設置為默認的AbortPolicy則會拋出RejectedExecutionException並正常返回。
問題分析
要分析這個問題需要看下線程池的submit方法里面做了什么,submit方法代碼如下:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
newTaskFor()把Runnable轉為FutureTask對象,FutureTask實現RunnableFuture接口,繼續跟execute:
public void execute(Runnable command) {
// ...
// 如果線程個數消息核心線程數則新增處理線程處理
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果當前線程個數已經達到核心線程數則任務放入隊列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 嘗試新增處理線程進行處理
else if (!addWorker(command, false))
reject(command);// 新增失敗則調用拒絕策略
}
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
再來看下拒絕策略DiscardPolicy的代碼:
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
這里rejectedExecution方法里面什么都沒做,所以代碼(4)調用submit后會返回一個future對象,即FutureTask:
public class FutureTask<V> implements RunnableFuture<V> {
/** * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
state標識FutureTask的狀態,初始狀態是New。因此使用DiscardPolicy策略提交后返回了一個狀態為NEW的FutureTask對象。
那么下面就需要看下當調用future的無參get方法時候當future變為什么狀態時候才會返回:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL) // 狀態值為NORMAL正常返回
return (V)x;
if (s >= CANCELLED) // 狀態值大於等於CANCELLED則拋異常
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
也就是說當future的狀態>COMPLETING時候調用get方法才會返回,而明顯DiscardPolicy策略在拒絕元素的時候並沒有設置該future的狀態,后面也沒有其他機會可以設置該future的狀態,所以future的狀態一直是NEW,所以一直不會返回,同理DiscardOldestPolicy策略也是這樣的問題,最老的任務被淘汰時候沒有設置被淘汰任務對於future的狀態,也會導致一直不會返回。
那么默認的AbortPolicy策略為啥沒問題那?來看AbortPolicy策略代碼:
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
看代碼就應該明白了吧。
所以當使用Future的時候,盡量使用帶超時時間的get方法,這樣即使使用了DiscardPolicy拒絕策略也不至於一直等待,等待超時時間到了會自動返回的,如果非要使用不帶參數的get方法則可以重寫DiscardPolicy的拒絕策略在執行策略時候設置該Future的狀態大於COMPLETING即可,但是查看FutureTask提供的方法發現只有cancel方法是public的並且可以設置FutureTask的狀態大於COMPLETING,重寫拒絕策略具體代碼可以如下:
public static void main(String[] args) throws Exception {
//(1)線程池單個線程,線程池隊列元素個數為1
ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1,
1L, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.DiscardPolicy() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
if (r != null && r instanceof FutureTask) {
((FutureTask) r).cancel(true);
}
}
}
}
);
//(2)添加任務one
Future futureOne = executorService.submit(new Runnable() {
@Override public void run() {
System.out.println("start runable one");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//(3)添加任務two
Future futureTwo = executorService.submit(new Runnable() {
@Override public void run() {
System.out.println("start runable two");
}
});
//(4)添加任務three
Future futureThree = null;
try {
futureThree = executorService.submit(new Runnable() {
@Override public void run() {
System.out.println("start runable three");
}
});
} catch (Exception e) {
e.printStackTrace();
System.out.println(e.getLocalizedMessage());
}
try {
System.out.println("task one finish " + futureOne.get());//(5)等待任務one執行完畢
System.out.println("task two finish " + futureTwo.get());//(6)等待任務two執行完畢
System.out.println("task three finish " + (futureThree == null ? null : futureThree.get()));// (7)等待任務three執行完畢
} catch (Exception e) {
e.printStackTrace();
}
executorService.shutdown();//(8)關閉線程池,阻塞直到所有任務執行完畢
}
使用這個策略時候,由於report方法中對cancel的任務上會拋出CancellationException異常,所以在get()時使用try-catch捕獲異常。運行后發現程序能正常退出。