前提
最近在看JUC線程池java.util.concurrent.ThreadPoolExecutor
的源碼實現,其中了解到java.util.concurrent.Future
的實現原理。從目前java.util.concurrent.Future
的實現來看,雖然實現了異步提交任務,但是任務結果的獲取過程需要主動調用Future#get()
或者Future#get(long timeout, TimeUnit unit)
,而前者是阻塞的,后者在異步任務執行時間不確定的情況下有可能需要進行輪詢,這兩種情況和異步調用的初衷有點相違背。於是筆者想結合目前了解到的Future
實現原理的前提下擴展出支持(監聽)回調的Future
,思路上參考了Guava
增強的ListenableFuture
。本文編寫的時候使用的JDK是JDK11,其他版本可能不適合。
簡單分析Future的實現原理
虛擬例子推演
並發大師Doug Lea在設計JUC線程池的時候,提供了一個頂層執行器接口Executor
:
public interface Executor {
void execute(Runnable command);
}
實際上,這里定義的方法Executor#execute()
是整套線程池體系最核心的接口,也就是ThreadPoolExecutor
定義的核心線程、額外創建的線程(線程池最大線程容量 - 核心線程數)都是在這個接口提交任務的時候懶創建的,也就是說ExecutorService
接口擴展的功能都是基於Executor#execute()
的基礎進行擴展。Executor#execute()
方法只是單純地把任務實例Runnable
對象投放到線程池中分配合適的線程執行,但是由於方法返回值是void
類型,我們是無法感知任務什么時候執行完畢。這個時候就需要對Runnable
任務實例進行包裝(下面是偽代碼 + 偽邏輯):
// 下面這個Wrapper和Status類是筆者虛構出來
@RequiredArgsConstructor
class Wrapper implements Runnable{
private final Runnable target;
private Status status = Status.of("初始化");
@Override
public void run(){
try{
target.run();
status = Status.of("執行成功");
}catch(Throwable t){
status = Status.of("執行異常");
}
}
}
我們只需要把new Wrapper(原始Runnable實例)
投放到線程池執行,那么通過定義好的Status
狀態記錄變量就能得知異步任務執行的狀態,以及什么時候執行完畢(包括正常的執行完畢和異常的執行完畢)。這里僅僅解決了任務執行的狀態獲取,但是Executor#execute()
方法法返回值是void
類型的特點使得我們無法回調Runnable
對象執行的結果。這個時候需要定義一個可以回調執行結果的接口,其實已經有現成的接口Callable
:
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
這里遇到了一個問題:由於Executor#execute()
只接收Runnable
參數,我們需要把Callable
接口適配到Runnable
接口,這個時候,做一次簡單的委托即可:
@RequiredArgsConstructor
class Wrapper implements Runnable{
private final Callable callable;
private Status status = Status.of("初始化");
@Getter
private Object outcome;
@Override
public void run(){
try{
outcome = callable.call();
status = Status.of("執行成功");
}catch(Throwable t){
status = Status.of("執行異常");
outcome = t;
}
}
}
這里把Callable
實例直接委托給Wrapper
,而Wrapper
實現了Runnable
接口,執行結果直接存放在定義好的Object
類型的對象outcome
中即可。當我們感知到執行狀態已經結束,就可以從outcome
中提取到執行結果。
Future的實現
上面一個小結僅僅對Future
實現做一個相對合理的虛擬推演,實際上,RunnableFuture
才是JUC中常用的復合接口,它同時實現了Runnable
和Future
:
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
上一節提到的虛構出來的Wrapper
類,在JUC中類似的實現是java.util.concurrent.FutureTask
,它就是Callable
和Runnable
的適配器,FutureTask
實現了RunnableFuture
接口:
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
// 省略其他代碼
}
注意到核心屬性state
表示執行狀態,outcome
承載執行結果。接着看提交Callable
類型任務的方法ExecutorService#submit()
:
public interface ExecutorService extends Executor {
// 省略其他接口方法
<T> Future<T> submit(Callable<T> task);
}
當我們通過上述ExecutorService#submit()
方法提交Callable
類型任務的時候,實際上做了如下的步驟:
- 檢查入參
task
的存在性,如果為null
拋出NullPointerException
。 - 把
Callable
類型的task
包裝為FutureTask
實例。 - 把新建的
FutureTask
實例放到線程池中執行,也就是調用Executor#execute(FutureTask實例)
。 - 返回
FutureTask
實例的接口實例RunnableFuture
(實際上是返回子接口Future
實例)。
如果我們需要獲取結果,可以Future#get()
或者Future#get(long timeout, TimeUnit unit)
獲取,調用這兩個方法的時候參看FutureTask
里面的方法實現,得知步驟如下:
- 如果狀態
state
小於等於COMPLETING(1)
,說明任務還在執行中,獲取結果的請求線程會放入WaitNode
類型的隊列中進行阻塞。 - 如果任務執行完畢,不管異常完畢還是正常完畢,除了會更新狀態
state
和把結果賦值到outcome
之外,還會喚醒所有阻塞獲取結果的線程,然后調用鈎子方法FutureTask#done()
(具體見源碼FutureTask#finishCompletion()
)。
其實分析了這么多,筆者想指出的結論就是:Callable
類型任務提交到線程池中執行完畢(包括正常執行完畢和異常執行完畢)之后,都會回調鈎子方法FutureTask#done()
。這個就是我們擴展可監聽Future
的理論依據。
擴展可回調的Future
先做一次編碼實現,再簡單測試其功能。
編碼實現
先定義一個Future
接口的子接口ListenableFuture
,用於添加可監聽的回調:
public interface ListenableFuture<V> extends Future<V> {
void addCallback(ListenableFutureCallback<V> callback, Executor executor);
}
ListenableFutureCallback
是一個函數式回調接口:
@FunctionalInterface
public interface ListenableFutureCallback<V> {
void callback(V value, Throwable throwable);
}
對於ListenableFutureCallback
而言,回調的結果value
和throwable
是互斥的。正常執行完畢的情況下value
將會是執行結果值,throwable
為null
;異常執行完畢的情況下,value
將會是null
,throwable
將會是拋出的異常實例。如果更習慣於分開處理正常執行完畢的結果和異常執行完畢的結果,ListenableFutureCallback
可以這樣定義:
public interface ListenableFutureCallback<V> {
void onSuccess(V value);
void onError(Throwable throwable);
}
接着定義ListenableExecutorService
接口繼承ExecutorService
接口:
public interface ListenableExecutorService extends ExecutorService {
<T> ListenableFuture<T> listenableSubmit(Callable<T> callable);
/**
* 定義這個方法是因為有些時候由於任務執行時間非常短,有可能通過返回的ListenableFuture實例添加回調之前已經執行完畢,因此可以支持顯式傳入回調
*
* @param callable callable
* @param callbacks callbacks
* @param executor executor
* @return ListenableFuture
*/
<T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor);
}
然后添加一個執行單元適配器ListenableFutureCallbackRunnable
,承載每次回調觸發的調用(實現Runnable
接口,從而支持異步執行):
@RequiredArgsConstructor
public class ListenableFutureCallbackRunnable<V> implements Runnable {
private final ListenableFutureCallback<V> callback;
private final V value;
private final Throwable throwable;
@Override
public void run() {
callback.callback(value, throwable);
}
}
接着需要定義一個FutureTask
的子類ListenableFutureTask
,核心邏輯是覆蓋FutureTask#done()
方法觸發回調:
// ListenableFutureTask
public class ListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {
private final List<Execution<V>> executions = new ArrayList<>();
public ListenableFutureTask(Callable<V> callable) {
super(callable);
}
public ListenableFutureTask(Runnable runnable, V result) {
super(runnable, result);
}
public static <V> ListenableFutureTask<V> newTaskFor(Callable<V> callable) {
return new ListenableFutureTask<>(callable);
}
@Override
protected void done() {
Iterator<Execution<V>> iterator = executions.iterator();
Throwable throwable = null;
V value = null;
try {
value = get();
} catch (Throwable t) {
throwable = t;
}
while (iterator.hasNext()) {
Execution<V> execution = iterator.next();
ListenableFutureCallbackRunnable<V> callbackRunnable = new ListenableFutureCallbackRunnable<>(execution.getCallback(),
value, throwable);
// 異步回調
if (null != execution.getExecutor()) {
execution.getExecutor().execute(callbackRunnable);
} else {
// 同步回調
callbackRunnable.run();
}
}
}
@Override
public void addCallback(ListenableFutureCallback<V> callback, Executor executor) {
Execution<V> execution = new Execution<>();
execution.setCallback(callback);
execution.setExecutor(executor);
executions.add(execution);
}
}
// Execution - 承載每個回調實例和對應的Executor,Executor實例為null則進行同步回調
@Data
public class Execution <V>{
private Executor executor;
private ListenableFutureCallback<V> callback;
}
最后一步就是編寫線程池ListenableThreadPoolExecutor
,繼承自ThreadPoolExecutor
並且實現ListenableExecutorService
接口:
public class ListenableThreadPoolExecutor extends ThreadPoolExecutor implements ListenableExecutorService {
public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable) {
if (null == callable) {
throw new IllegalArgumentException("callable");
}
ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable);
execute(listenableFutureTask);
return listenableFutureTask;
}
@Override
public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor) {
if (null == callable) {
throw new IllegalArgumentException("callable");
}
if (null == callbacks) {
throw new IllegalArgumentException("callbacks");
}
ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable);
for (ListenableFutureCallback<T> callback : callbacks) {
listenableFutureTask.addCallback(callback, executor);
}
execute(listenableFutureTask);
return listenableFutureTask;
}
}
測試
引入junit
,編寫測試類如下:
public class ListenableFutureTest {
private static ListenableExecutorService EXECUTOR;
private static Executor E;
@BeforeClass
public static void before() {
EXECUTOR = new ListenableThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10), new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName(String.format("ListenableWorker-%d", counter.getAndIncrement()));
return thread;
}
});
E = Executors.newFixedThreadPool(3);
}
@Test
public void testListenableFuture1() throws Exception {
ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
Thread.sleep(1000);
return "message";
});
future.addCallback((v, t) -> {
System.out.println(String.format("Value = %s,Throwable = %s", v, t));
}, null);
Thread.sleep(2000);
}
@Test
public void testListenableFuture2() throws Exception {
ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
Thread.sleep(1000);
throw new RuntimeException("exception");
});
future.addCallback((v, t) -> {
System.out.println(String.format("Value = %s,Throwable = %s", v, t));
}, null);
Thread.sleep(2000);
}
@Test
public void testListenableFuture3() throws Exception {
ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
Thread.sleep(1000);
return "message";
});
future.addCallback((v, t) -> {
System.out.println(String.format("Value = %s,Throwable = %s", v, t));
}, E);
System.out.println("testListenableFuture3 end...");
Thread.sleep(2000);
}
@Test
public void testListenableFuture4() throws Exception {
ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
Thread.sleep(1000);
throw new RuntimeException("exception");
});
future.addCallback((v, t) -> {
System.out.println(String.format("Value = %s,Throwable = %s", v, t));
}, E);
System.out.println("testListenableFuture4 end...");
Thread.sleep(2000);
}
}
執行結果:
// testListenableFuture1
Value = message,Throwable = null
// testListenableFuture2
Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception
// testListenableFuture3
testListenableFuture3 end...
Value = message,Throwable = null
// testListenableFuture4
testListenableFuture4 end...
Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception
和預期的結果一致,注意一下如果Callable
執行拋出異常,異常被包裝為ExecutionException
,要調用Throwable#getCause()
才能得到原始的異常實例。
小結
本文通過了解ThreadPoolExecutor
和Future
的實現原理做簡單的擴展,使得異步提交任務變得更加優雅和簡便。強化了動手能力的同時,也能加深對並發編程的一些認知。當然,本文只是提供一個十分簡陋的實現,筆者其實還想到了如對回調處理的耗時做監控、回調打上分組標簽執行等等更完善的功能,等到有需要的場景再進行實現。
這里記錄一下過程中的一些領悟:
Executor#execute()
是線程池的核心接口,所有其他功能都是基於此接口做擴展,它的設計本身是無狀態的。- 靈活使用適配器模式,可以在不改變已發布的接口的功能同時實現新的接口的功能適配。
- 要善於發掘和使用JDK類庫設計者留給開發者的擴展接口。
個人博客
(本文完 c-1-d e-a-20190702)
技術公眾號(《Throwable文摘》),不定期推送筆者原創技術文章(絕不抄襲或者轉載):
娛樂公眾號(《天天沙雕》),甄選奇趣沙雕圖文和視頻不定期推送,緩解生活工作壓力: