javadoc筆記點
觀察者的核心思想就是,在適當的時機回調觀察者的指定動作函數
我們知道,在使用netty創建channel時,一般都是把這個channel設置成非阻塞的模式,這意味着什么呢? 意味着所有io操作一經調用,即刻返回
這讓netty對io的吞吐量有了飛躍性的提升,但是異步編程相對於傳統的串行化的編程模式來說,控制起來可太麻煩了
jdk提供了原生的Futrue接口,意為在未來任務,其實就是把任務封裝起來交給新的線程執行,在這個線程執行任務的期間,我們的主線程可以騰出時間去做別的事情
下面的netty給出的實例代碼,我們可以看到,任務線程有返回一個Futrue對象,這個對象中封裝着任務執行的情況
* * void showSearch(final String target)
* * throws InterruptedException {
* * Future<String> future
* * = executor.submit(new Callable<String>() {
* * public String call() {
* * return searcher.search(target);
* * }});
* * displayOtherThings(); // do other things while searching
* * try {
* * displayText(future.get()); // use future
* * } catch (ExecutionException ex) { cleanup(); return; }
* * }
*
雖然jdk原生Futrue可以實現異步提交任務,並且返回了任務執行信息的Futrue,但是有一個致命的缺點,從futrue獲取任務執行情況方法,是阻塞的,這是不被允許的,因為在netty中,一條channel可能關系着上千的客戶端的鏈接,其中一個客戶端的阻塞導致幾千的客戶端不可用是不被允許的,netty的Future設計成,繼承jdk原生的future,而且進行擴展如下
// todo 這個接口繼承了 java並發包總的Futrue , 並在其基礎上增加了很多方法
// todo Future 表示對未來任務的封裝
public interface Future<V> extends java.util.concurrent.Future<V> {
// todo 判斷IO是否成功返回
boolean isSuccess();
// todo 判斷是否是 cancel()方法取消
boolean isCancellable();
// todo 返回IO 操作失敗的原因
Throwable cause();
/**
* todo 使用了觀察者設計模式, 給這個future添加監聽器, 一旦Future 完成, listenner 立即被通知
*/
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
// todo 添加多個listenner
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// todo 移除多個 listenner
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// todo sync(同步) 等待着 future 的完成, 並且,一旦future失敗了,就會拋出 future 失敗的原因
// todo bind()是個異步操作,我們需要同步等待他執行成功
Future<V> sync() throws InterruptedException;
// todo 不會被中斷的 sync等待
Future<V> syncUninterruptibly();
// todo 等待
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
// todo 無阻塞的返回Future對象, 如果沒有,返回null
// todo 有時 future成功執行后返回值為null, 這是null就是成功的標識, 如 Runable就沒有返回值, 因此文檔建議還要 通過isDone() 判斷一下真的完成了嗎
V getNow();
@Override
boolean cancel(boolean mayInterruptIfRunning);
...
netty的觀察者模式
最常用的關於異步執行的方法writeAndFlush()
就是典型的觀察者的實現, 在netty中,當一個IO操作剛開始的時候,一個ChannelFutrue
對象就會創建出來,此時,這個futrue對象既不是成功的,也不是失敗的,更不是被取消的,因為這個IO操作還沒有結束
如果我們想在IO操作結束后立刻執行其他的操作時,netty推薦我們使用addListenner()
添加監聽者的方法而不是使用await()阻塞式等待,使用監聽者,我們就不用關系具體什么時候IO操作結束,只需要提供回調方法就可以,當IO操作結束后,方法會自動被回調
在netty中,一個IO操作是狀態分為如下幾種
* +---------------------------+
* | Completed successfully |
* +---------------------------+
* +----> isDone() = true |
* +--------- -----------------+ | | isSuccess() = true |
* | Uncompleted | | +===========================+
* +--------------------------+ | | Completed with failure |
* | isDone() = false | | +---------------------------+
* | isSuccess() = false |----+----> isDone() = true |
* | isCancelled() = false | | | cause() = non-null 非空|
* | cause() = null | | +===========================+
* +--------------------------+ | | Completed by cancellation |
* | +---------------------------+
* +----> isDone() = true |
* | isCancelled() = true |
* +---------------------------+
源碼追蹤
對writeAndFlush的使用
ChannelFuture channelFuture = ctx.writeAndFlush("from client : " + UUID.randomUUID());
channelFuture.addListener(future->{
if(future.isSuccess()){
todo
}else{
todo
}
});
注意點: 我們使用writeAndFlush()
程序立即返回,隨后我們使用返回的對象添加監聽者,添加回調,這個時writeAndFlush()
有可能已經完成了,也有可能沒有完成,這是不確定的事
首先我們知道,writeAndFlush()
是出站的動作,屬於channelOutboundHandler
,而且他是從pipeline的尾部開始傳播的,源碼如下:
@Override
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
尾節點數據AbstractChannelHandlerContext
類, 繼續跟進查看源碼如下:
@Override
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
@Override
public ChannelPromise newPromise() {
return new DefaultChannelPromise(channel(), executor());
}
悄無聲息的做了一個很重要的事情,創建了Promise
,這個DefaultChannelPromise
就是被觀察者,過一會由它完成方法的回調
繼續跟進writeAndFlush()
,源碼如下, 我們可以看到promise
被返回了, DefaultChannelPromise
是ChannelPromise
的實現類,而ChannelPromise
又繼承了ChannelFuture
,這也是為什么明明每次使用writeAndFlush()
返回的都是ChannelFuture
而我們這里卻返回了DafaultChannelPromise
// todo 調用本類的 write(msg, true, promise)
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
return promise;
}
write(msg, true, promise);
return promise;
在去目標地之前,先看一下addListenner()
干了什么,我們進入到DefaultChannelPromise
源碼如下:
@Override
public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
super.addListener(listener);
return this;
}
隨機進入它的父類 DefaultChannelPromise中
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
synchronized (this) {
addListener0(listener);
}
if (isDone()) {
notifyListeners();
}
return this;
}
這個函數分兩步進行
第一步: 為什么添加監聽事件的方法需要同步?
在這種多線程並發執行的情況下,這個 addListener0(listener);
任意一個線程都能使用,存在同步添加的情況 這個動作不像將channel和EventLoop做的唯一綁定一樣,沒有任何必須使用inEventloop()
去判斷在哪個線程中,直接使用同步
接着進入 addListener0(listener)
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener; // todo 第一次添加直接在這里賦值
} else if (listeners instanceof DefaultFutureListeners) {
// todo 第三次添加調用這里
((DefaultFutureListeners) listeners).add(listener);
} else {
// todo 第二次添加來這里復制, 由這個 DefaultFutureListeners 存放觀察者
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}
第二步: 為什么接着判斷isDone()
writeAndFlush()
是異步執行的,而且在我們添加監聽者的操作之前已經開始執行了,所以在添加完監聽者之后,立即驗證一把,有沒有成功
思考一波:
回顧writeAndFlush()
的調用順序,從tail開始傳播兩波事件,第一波write,緊接着第二波flush,一直傳播到header,進入unsafe類中,由他完成把據寫入jdk原生ByteBuffer
的操作, 所以按理說,我們添加是listenner的回調就是在header的unsafe中完成的,這是我們的目標地
任何方法的回調都是提前設計好了的,就像pipeline中的handler中的方法的回調,就是通過遍歷pipeline內部的鏈表實現的,這里的通知觀察者,其實也是調用觀察者的方法,而且他使用的一定是觀察的父類及以上的引用實現的方法回調
回到我們的writeAndFlush()
這個方法,在第二波事務傳遞完成,將數據真正寫入jdk原生的ByteBuffer
之前,只有進行的所有回調都是設置失敗的狀態,直到把數據安全發出后才可能是 回調成功的操作
此外,想要進行回調的操作,就得有被觀察的對象的引用,所以一會我就回看到,Promise
一路被傳遞下去
我們進入的unsafe的write()
就可以看到與回調相關的操作safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
,源碼如下
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) { // todo 緩存 寫進來的 buffer
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
ReferenceCountUtil.release(msg);
return;
}
我們繼續跟進本類方法safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
, 源碼如下:
protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
}
}
其中重要的方法,就是回調 被觀察者的 tryFailure(cause)
, 這個被觀察者的類型是ChannelPromise
, 我們去看它的實現,源碼如下
@Override
public boolean tryFailure(Throwable cause) {
if (setFailure0(cause)) {
notifyListeners();
return true;
}
return false;
}
調用本類方法notifyListeners()
繼續跟進本類方法notifyListenersNow();
接着跟進本類方法 notifyListener0(this, (GenericFutureListener<?>) listeners);
繼續 l.operationComplete(future);
終於看到了調用了監聽者的完成操作,實際上就是回調用戶的方法,雖然是完成的,但是失敗了
下面我們去flush()
中去查看通知成功的回調過程, 方法的調用順序如下
flush();
flush0();
doWrite(outboundBuffer);
在doWrite()方法中,就會使用自旋的方式往嘗試把數據寫出去, 數據被寫出去后,有一個標識 done=true, 證明是成功寫出了, 緊接着就是把當前的盛放ByteBuf的entry從鏈表上移除,源碼出下
if (done) {
// todo 跟進去
in.remove();
} else {
我們繼續跟進remove()
, 終於我們找到了成功回調的標志,在remove()
的底端safeSuccess(promise);, 下一步就是用回調用戶添加的監聽者操作完成了,並且完成的狀態是Success
成功的
public boolean remove() {
// todo 獲取當前的 Entry
Entry e = flushedEntry;
if (e == null) {
clearNioBuffers();
return false;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
// todo 將當前的Entry進行移除
removeEntry(e);
if (!e.cancelled) {
// only release message, notify and decrement if it was not canceled before.
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
decrementPendingOutboundBytes(size, false, true);
}