Netty之Channel*


Netty之Channel*

本文內容主要參考**<<Netty In Action>> ** 和Netty的文檔和源碼,偏筆記向.

先簡略了解一下ChannelPipelineChannelHandler.

想象一個流水線車間.當組件從流水線頭部進入,穿越流水線,流水線上的工人按順序對組件進行加工,到達流水線尾部時商品組裝完成.

可以將ChannelPipeline當做流水線,ChannelHandler當做流水線工人.源頭的組件當做event,如read,write等等.

1.1 Channel

Channel連接了網絡套接字或能夠進行I/O操作的組件,如 read, write, connect, bind.

我們可以通過Channel獲取一些信息.

  • Channel的當前狀態(如,是否連接,是否打開)
  • Channel的配置參數,如buffer的size
  • 支持的I/O操作
  • 處理所有I/O事件的ChannelPipeline和與通道相關的請求

Channel接口定義了一組和ChannelInboundHandler API密切相關的狀態模型.

52896437562

Channel的狀態改變,會生成對應的event.這些event會轉發給ChannelPipeline中的ChannelHandler,handler會對其進行響應.

1.2 ChannelHandler生命周期

下面列出了 interface ChannelHandler 定義的生命周期操作, 在 ChannelHandler被添加到 ChannelPipeline 中或者被從 ChannelPipeline 中移除時會調用這些操作。這些方法中的每一個都接受一個 ChannelHandlerContext 參數

1.3 ChannelInboundHandler 接口

ChannelInboundHandler處理入站數據以及各種狀態變化,當Channel狀態發生改變會調用ChannelInboundHandler中的一些生命周期方法.這些方法與Channel的生命密切相關.

入站數據,就是進入socket的數據.下面展示一些該接口的生命周期API

當某個 ChannelInboundHandler 的實現重寫 channelRead()方法時,它將負責顯式地
釋放與池化的 ByteBuf 實例相關的內存。 Netty 為此提供了一個實用方法ReferenceCountUtil.release() .

@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		ReferenceCountUtil.release(msg);
	}
}

這種方式還挺繁瑣的,Netty提供了一個SimpleChannelInboundHandler ,重寫channelRead0()方法,就可以在調用過程中會自動釋放資源.

public class SimpleDiscardHandler
	extends SimpleChannelInboundHandler<Object> {
	@Override
	public void channelRead0(ChannelHandlerContext ctx,
									Object msg) {
			// 不用調用ReferenceCountUtil.release(msg)也會釋放資源
	}
}

原理就是這樣,channelRead方法包裝了channelRead0方法.

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }

1.4 ChannelOutboundHandler

出站操作和數據將由 ChannelOutboundHandler 處理。它的方法將被 Channel、 ChannelPipeline 以及 ChannelHandlerContext 調用。
ChannelOutboundHandler 的一個強大的功能是可以按需推遲操作或者事件,這使得可以通過一些復雜的方法來處理請求。例如, 如果到遠程節點的寫入被暫停了, 那么你可以推遲沖刷操作並在稍后繼續。

ChannelPromiseChannelFuture: ChannelOutboundHandler中的大部分方法都需要一個ChannelPromise參數, 以便在操作完成時得到通知。 ChannelPromiseChannelFuture的一個子類,其定義了一些可寫的方法,如setSuccess()和setFailure(), 從而使ChannelFuture不可變.

1.5 ChannelHandler適配器

ChannelHandlerAdapter顧名思義,就是handler的適配器.你需要知道什么是適配器模式,假設有一個A接口,我們需要A的subclass實現功能,但是B類中正好有我們需要的功能,不想復制粘貼B中的方法和屬性了,那么可以寫一個適配器類Adpter繼承B實現A,這樣一來Adpter是A的子類並且能直接使用B中的方法,這種模式就是適配器模式.

就比如Netty中的SslHandler類,想使用ByteToMessageDecoder中的方法進行解碼,但是必須是ChannelHandler子類對象才能加入到ChannelPipeline中,通過如下簽名和其實現細節(SslHandler實現細節就不貼了)就能夠作為一個Handler去處理消息了.

public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler

下圖是ChannelHandler和Adpter的UML圖示.

ChannelHandlerAdapter提供了一些實用方法isSharable() 如果其對應的實現被標注為 Sharable, 那么這個方法將返回 true, 表示它可以被添加到多個 ChannelPipeline中 .

如果想在自己的ChannelHandler中使用這些適配器類,只需要擴展他們,重寫那些想要自定義的方法即可.

1.6 資源管理

在使用ChannelInboundHandler.channelRead() ChannelOutboundHandler.write() 方法處理數據時要避免資源泄露,ByteBuf那篇文章提到過引用計數,當使用完某個ByteBuf之后記得調整引用計數.

Netty提供了一個class ResourceLeakDetector 來幫助診斷資源泄露,這能夠幫助你判斷應用的運行情況,但是如果希望提高吞吐量(比如搞一些競賽),關閉內存診斷可以提高吞吐量.

泄露檢測級別可以通過將下面的 Java 系統屬性設置為表中的一個值來定義:
java -Dio.netty.leakDetectionLevel=ADVANCED

如果帶着該 JVM 選項重新啟動你的應用程序,你將看到自己的應用程序最近被泄漏的緩沖
區被訪問的位置。下面是一個典型的由單元測試產生的泄漏報告:

Running io.netty.handler.codec.xml.XmlFrameDecoderTest
15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK:
ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 1
#1: io.netty.buffer.AdvancedLeakAwareByteBuf.toString(
AdvancedLeakAwareByteBuf.java:697)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(
XmlFrameDecoderTest.java:157)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(
XmlFrameDecoderTest.java:133)
...

應用程序處理消息釋放資源

消費入站消息釋放資源

@Sharable
public class DiscardInboundHandler extends ChannelInboundHandlerAdapter {
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		ReferenceCountUtil.release(msg);// 用於釋放資源的工具類
	}
}

SimpleChannelInboundHandler 中的channelRead0()會消費消息之后自動釋放資源.

出站釋放資源

@Sharable
public class DiscardOutboundHandler
						extends ChannelOutboundHandlerAdapter {
	@Override
	public void write(ChannelHandlerContext ctx,
        Object msg, ChannelPromise promise) {
        // 還是通過util工具類釋放資源
        ReferenceCountUtil.release(msg);
        // 通知ChannelPromise,消息已經處理
        promise.setSuccess();
	}
}

重要的是, 不僅要釋放資源,還要通知 ChannelPromise。否則可能會出現 ChannelFutureListener 收不到某個消息已經被處理了的通知的情況。總之,如果一個消息被消費或者丟棄了, 並且沒有傳遞給 ChannelPipeline 中的下一個ChannelOutboundHandler, 那么用戶就有責任調用ReferenceCountUtil.release()。如果消息到達了實際的傳輸層, 那么當它被寫入時或者 Channel 關閉時,都將被自動釋放。

2 ChannelPipelin接口

Channel和ChannelPipeline

每一個新創建的 Channel 都將會被分配一個新的 ChannelPipeline。這項關聯是永久性的; Channel 既不能附加另外一個 ChannelPipeline,也不能分離其當前的。在 Netty 組件的生命周期中,這是一項固定的操作,不需要開發人員的任何干預。

ChannelHandler和ChannelHandlerContext

根據事件的起源,事件將會被 ChannelInboundHandler 或者 ChannelOutboundHandler 處理。隨后, 通過調用 ChannelHandlerContext 實現,它將被轉發給同一超類型的下一個ChannelHandler。

ChannelHandlerContext使得ChannelHandler能夠和它的ChannelPipeline以及其他的ChannelHandler 交 互 。 ChannelHandler 可 以 通 知 其 所 屬 的 ChannelPipeline 中 的 下 一 個ChannelHandler,甚至可以動態修改它所屬的ChannelPipeline.

ChannelPipelin和ChannelHandler

這是一個同時具有入站和出站 ChannelHandler 的 ChannelPipeline 的布局,並且印證了我們之前的關於 ChannelPipeline 主要由一系列的 ChannelHandler 所組成的說法。 ChannelPipeline 還提供了通過 ChannelPipeline 本身傳播事件的方法。如果一個入站事件被觸發,它將被從 ChannelPipeline 的頭部開始一直被傳播到 Channel Pipeline 的尾端。

你可能會說, 從事件途經 ChannelPipeline 的角度來看, ChannelPipeline 的頭部和尾端取決於該事件是入站的還是出站的。然而 Netty 總是將 ChannelPipeline 的入站口(圖 的左側)作為頭部,而將出站口(該圖的右側)作為尾端。
當你完成了通過調用 ChannelPipeline.add*()方法將入站處理器( ChannelInboundHandler)和 出 站 處 理 器 ( ChannelOutboundHandler ) 混 合 添 加 到 ChannelPipeline 之 后 , 每 一 個ChannelHandler 從頭部到尾端的順序位置正如同我們方才所定義它們的一樣。因此,如果你將圖 6-3 中的處理器( ChannelHandler)從左到右進行編號,那么第一個被入站事件看到的 ChannelHandler 將是1,而第一個被出站事件看到的 ChannelHandler 將是 5。

在 ChannelPipeline 傳播事件時,它會測試 ChannelPipeline 中的下一個 ChannelHandler 的類型是否和事件的運動方向相匹配。如果不匹配, ChannelPipeline 將跳過該ChannelHandler 並前進到下一個,直到它找到和該事件所期望的方向相匹配的為止。 (當然, ChannelHandler 也可以同時實現ChannelInboundHandler 接口和 ChannelOutboundHandler 接口。)

2.1 修改ChannelPipeline

修改指的是添加或刪除ChannelHandler

代碼示例

ChannelPipeline pipeline = ..;
FirstHandler firstHandler = new FirstHandler();
// 先添加一個Handler到ChannelPipeline中
pipeline.addLast("handler1", firstHandler);
// 這個Handler放在了first,意味着放在了handler1之前
pipeline.addFirst("handler2", new SecondHandler());
// 這個Handler被放到了last,意味着在handler1之后
pipeline.addLast("handler3", new ThirdHandler());
...
// 通過名稱刪除
pipeline.remove("handler3");
// 通過對象刪除
pipeline.remove(firstHandler);
// 名稱"handler2"替換成名稱"handler4",並切handler2的實例替換成了handler4的實例
pipeline.replace("handler2", "handler4", new ForthHandler());

這種方式非常靈活,按照需要更換或插入handler達到我們想要的效果.

ChannelHandler的執行和阻塞

通常 ChannelPipeline 中的每一個 ChannelHandler 都是通過它的 EventLoop( I/O 線程)來處理傳遞給它的事件的。所以至關重要的是不要阻塞這個線程,因為這會對整體的 I/O 處理產生負面的影響。

但有時可能需要與那些使用阻塞 API 的遺留代碼進行交互。對於這種情況, ChannelPipeline 有一些接受一個 EventExecutorGroup 的 add()方法。如果一個事件被傳遞給一個自定義的 EventExecutorGroup ,它將被包含在這個 EventExecutorGroup 中的某個 EventExecutor 所處理,從而被從該Channel 本身的 EventLoop 中移除。對於這種用例, Netty 提供了一個叫 DefaultEventExecutorGroup 的默認實現。

pipeline對handler的操作

2.2 ChannelPipeline的出入站api

入站

出站

  • ChannelPipeline 保存了與 Channel 相關聯的 ChannelHandler
  • ChannelPipeline 可以根據需要,通過添加或者刪除 ChannelHandler 來動態地修改
  • ChannelPipeline 有着豐富的 API 用以被調用,以響應入站和出站事件

3 ChannelHandlerContext接口

每當有ChannelHandler添加到ChannelPipeline中,都會創建ChannelHandlerContext.如果調用ChannelChannelPipeline上的方法,會沿着整個ChannelPipeline傳播,如果調用ChannelHandlerContext上的相同方法,則會從對應的當前ChannelHandler進行傳播.

API

  • ChannelHandlerContextChannelHandler 之間的關聯(綁定)是永遠不會改變的,所以緩存對它的引用是安全的;
  • 如同我們在本節開頭所解釋的一樣,相對於其他類的同名方法,ChannelHandlerContext的方法將產生更短的事件流, 應該盡可能地利用這個特性來獲得最大的性能。

3.1 使用CHannelHandlerContext

從ChannelHandlerContext訪問channel

ChannelHandlerContext ctx = ..;
// 獲取channel引用
Channel channel = ctx.channel();
// 通過channel寫入緩沖區
channel.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));

從ChannelHandlerContext訪問ChannelPipeline

ChannelHandlerContext ctx = ..;
// 獲取ChannelHandlerContext
ChannelPipeline pipeline = ctx.pipeline();
// 通過ChannelPipeline寫入緩沖區
pipeline.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));

有時候我們不想從頭傳遞數據,想跳過幾個handler,從某個handler開始傳遞數據.我們必須獲取目標handler之前的handler關聯的ChannelHandlerContext.

ChannelHandlerContext ctx = ..;
// 直接通過ChannelHandlerContext寫數據,發送到下一個handler
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

好了,ChannelHandlerContext的基本使用應該掌握了,但是你真的理解ChannelHandlerContext,ChannelPipeline和Channelhandler之間的關系了嗎.我們老看一下Netty的源碼.

先看一下AbstractChannelHandlerContext類,這個類像不像雙向鏈表中的一個Node,

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
        ...
        volatile AbstractChannelHandlerContext next;
    	volatile AbstractChannelHandlerContext prev;
    	...
    	}

再來看一看DefaultChannelPipeline,ChannelPipeline中擁有ChannelHandlerContext這個節點的head和tail,

而且DefaultChannelPipeline類中並沒有ChannelHandler成員或handler數組.

public class DefaultChannelPipeline implements ChannelPipeline {
    ...
        
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    ...

所以addFirst向pipeline中添加了handler到底添加到哪了呢.看一下pipeline中的addFirst方法

    @Override
    public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
        return addFirst(null, name, handler);
    }

    @Override
    public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 檢查handler是否具有復用能力,不重要
            checkMultiplicity(handler);
			// 名稱,不重要.
            name = filterName(name, handler);
// 這個方法創建了DefaultChannelHandlerContext,handler是其一個成員屬性
// 你現在應該明白了上面說的添加handler會創建handlerContext了吧
            newCtx = newContext(group, name, handler);
// 這個方法
            addFirst0(newCtx);
// 這個方法是調整pipeline中HandlerContext的指針,
// 就是更新HandlerContext鏈表節點之間的位置
private void addFirst0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext nextCtx = head.next;
        newCtx.prev = head;
        newCtx.next = nextCtx;
        head.next = newCtx;
        nextCtx.prev = newCtx;
    }

簡單總結一下,pipeline擁有context(本身像一個鏈表的節點)組成的節點的雙向鏈表首尾,可以看做pipeline擁有一個context鏈表,context擁有成員handler,這便是三者之間的關系.實際上,handler作為消息處理的主要組件,實現了和pipeline的解耦,我們可以只有一個handler,但是被封裝進不同的context能夠被不同的pipeline使用.

3.2 handler和context高級用法

緩存ChannelHandlerContext引用

@Sharable
public class WriteHandler extends ChannelHandlerAdapter {
	private ChannelHandlerContext ctx;
	@Override
	public void handlerAdded(ChannelHandlerContext ctx) {
		this.ctx = ctx;
	}
    public void send(String msg) {
    	ctx.writeAndFlush(msg);
    }
}

因為一個 ChannelHandler 可以從屬於多個 ChannelPipeline,所以它也可以綁定到多個 ChannelHandlerContext 實例。 對於這種用法指在多個ChannelPipeline 中共享同一個 ChannelHandler, 對應的 ChannelHandler 必須要使用@Sharable 注解標注; 否則,試圖將它添加到多個 ChannelPipeline 時將會觸發異常。

@Sharable錯誤用法

@Sharable
public class UnsharableHandler extends ChannelInboundHandlerAdapter {
    private int count;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    	count++;
    	System.out.println("channelRead(...) called the "
    		+ count + " time");
    	ctx.fireChannelRead(msg);
    }
}

這段代碼的問題在於它擁有狀態 , 即用於跟蹤方法調用次數的實例變量count。將這個類的一個實例添加到ChannelPipeline將極有可能在它被多個並發的Channel訪問時導致問題。(當然,這個簡單的問題可以通過使channelRead()方法變為同步方法來修正。)

總之,只應該在確定了你的 ChannelHandler 是線程安全的時才使用@Sharable 注解。

4.1 入站異常處理

處理入站事件的過程中有異常被拋出,那么它將從它在ChannelInboundHandler里被觸發的那一點開始流經 ChannelPipeline。要想處理這種類型的入站異常,你需要在你的 ChannelInboundHandler 實現中重寫下面的方法。

public void exceptionCaught(
ChannelHandlerContext ctx, Throwable cause) throws Exception 
// 基本處理方式
public class InboundExceptionHandler extends 		ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
    								Throwable cause) {
    	cause.printStackTrace();
    	ctx.close();
    }
}

因為異常將會繼續按照入站方向流動(就像所有的入站事件一樣), 所以實現了前面所示邏輯的 ChannelInboundHandler 通常位於 ChannelPipeline 的最后。這確保了所有的入站異常都總是會被處理,無論它們可能會發生在ChannelPipeline 中的什么位置。

  • ChannelHandler.exceptionCaught()的默認實現是簡單地將當前異常轉發給ChannelPipeline 中的下一個 ChannelHandler;

  • 如果異常到達了 ChannelPipeline 的尾端,它將會被記錄為未被處理;

  • 要想定義自定義的處理邏輯,你需要重寫 exceptionCaught()方法。然后你需要決定是否需要將該異常傳播出去。

4.2 出站異常處理

  • 每個出站操作都將返回一個 ChannelFuture。 注冊到 ChannelFuture 的 ChannelFutureListener 將在操作完成時被通知該操作是成功了還是出錯了。
  • 幾乎所有的 ChannelOutboundHandler 上的方法都會傳入一個 ChannelPromise
    的實例。作為 ChannelFuture 的子類, ChannelPromise 也可以被分配用於異步通
    知的監聽器。但是, ChannelPromise 還具有提供立即通知的可寫方法:
ChannelPromise setSuccess();
ChannelPromise setFailure(Throwable cause);

1.添加ChannelFutureListener到ChannelFuture

    ChannelFuture future = channel.write(someMessage);
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture f) {
            if (!f.isSuccess()) {
                f.cause().printStackTrace();
                f.channel().close();
            }
         }
    });

2.添加ChannelFutureListener到ChannelPromise

public class OutboundExceptionHandler extends 			ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg,
        ChannelPromise promise) {
            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) {
                    if (!f.isSuccess()) {
                        f.cause().printStackTrace();
                        f.channel().close();
                    }
            	}
        });
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM