1.前言
第7節講解JAVA的線程模型中就說到了Future,並解釋了為什么可以主線程可以獲得線程池任務的執行后結果,變成一種同步狀態。秘密就在於Java將所有的runnable和callable任務,統一變成了callable,最終包裝成了FutureTask對象,該類實現了Runnable接口和Future接口,所以FutureTask能夠被線程執行。最終異步執行過程全部由該類控制邏輯,所以在get的時候鎖住了該類,run方法執行的時候釋放了鎖,這樣就滿足了能夠在異步線程執行完畢獲取相關結果的能力。
本章介紹一下Netty對Future的設計,Netty的聲明就是一個異步事件驅動框架,上一節學習了整個線程調度的過程,並在最后給出了前幾節的一個綜合流程圖,雖然圖中提到了幾種Future,但是沒有具體介紹細節,這些將在本節得到解釋。
2.相關概念
2.1 Future
雖然Java中已經定義了Future,但是滿足不了Netty的需求,所以Netty新寫了一個Future接口,繼承了JDK的Future。額外方法定義如下圖:

接口主要追加了兩個功能:1.增加了判斷任務是否成功失敗的方法,以及失敗獲取異常信息;2.增加了任務完成時觸發的監聽器
2.2 Promise

該類繼承自Future,自然是增加了額外的功能了:這是一個可寫的Future。什么意思呢?通過之前的知識,我們知道Future都是由異步線程控制的,主線程是無法控制線程執行的。Promise的作用就是主線程能夠控制一下執行的任務。
setSuccess():標記任務成功,並觸發所有listener。如果任務早就成功或失敗,則拋出異常
trySuccess():同上,但是失敗只是返回false,而不是拋出異常
這里就解釋這兩個方法,其他方法是覆蓋了父接口的方法,確定返回的具體類型而已。根據方法我們也能大體明白可寫的含義了。
3 主要Future詳解
3.1 DefaultChannelPromise
該類是在注冊channel時創建的,SingleThreadEventLoop的register方法。和Java的FutureTask不同,FutureTask是作為一個任務交給線程池,在內部控制任務執行。DefaultChannelPromise則是持有了Channel和EventExecutor二者,在外邊處理邏輯。其上層有2個抽象父類:
1.AbstractFuture:
該類就實現了get方法,原理是調用了await()方法,await之后喚醒肯定就是任務結束了,判斷有無異常,最終返回結果還是拋出異常。
2.DefaultPromise:
該類提供了一個Promise應該具備的基本實現。對任務標記結果,觸發listener等。其主要有個result,對結果進行CAS操作來判斷任務是否完成。
setSuccess過程如下:1.設置成功的結果;2.觸發所有的listener。設置結果主要是CAS更新result字段,然后判斷是否有get請求等待任務執行完,直接notifyAll即可。觸發listener的過程在於先判斷當前線程是否是事件線程中,觸發方法必須由EventLoop線程執行,然后就是遍歷觸發listener的operationComplete方法。
await過程如下:1.判斷是否執行完,執行完直接返回;2.判斷線程是否中斷,中斷拋出異常;3.檢查死鎖,即wait操作不能在EventLoop的線程中執行;4.如果沒執行完,等待者計數,然后wait。
DefaultChannelPromise相對於DefaultPromise而言只是增加了一個channel字段,其它的方法都是調用父類方法。接下來我們看看register過程是怎么使用這個Promise的吧。
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
可以看到在注冊過程中實際上就是使用setXXX方法來處理相關邏輯的,這個和Java的FutureTask采取了不同的方式。
3.2 SucceededFuture
上面講了一個Promise控制主線程和線程池的同步狀態,那個是依靠promise才有的setXXX接口來觸發的。那么Future是怎么控制的呢?答案是Future不需要控制,返回Future的時候就已經有結果了,並且返回一定是一個同步過程。
以SucceededFuture為例。Bootstrap中的doResolveAndConnect0方法有段:final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);其解析成功就會返回帶有結果的SucceededFuture。看這個類的sync方法也和Promise的不一樣,Promise是await,Future是直接返回。這個可以說明Future和Promise的區別:Future用於同步任務,Promise用於異步任務。不知道Netty為什么會設計成這樣,讓人會有些疑惑。但是記住這點,再加上Promise的set方法達成的效果,就可以理解Netty的Future了。
4.總結
Netty的Future設計采取了和Java的FutureTask不同的設計思路。Java的思路是將Futuren包裝成一個任務,這樣異步線程執行這個FutureTask的時候,其就可以知道任務的執行狀態。Netty將Future擴展成了Promise。Future作為同步方法直接返回的結果類,使用較少。Promise提供了setXXX方法,給異步線程調用該方法告知執行狀態。相同的地方在於Promise也必須被異步線程持有,才能使用set方法。
