1.基本說明
1) netty 的組件設計: Netty 的主要組件有 Channel、 EventLoop、 ChannelFuture、 ChannelHandler、 ChannelPipe 等
2) ChannelHandler 充當了處理入站和出站數據的應用程序邏輯的容器。 例如, 實現 ChannelInboundHandler 接口(或
ChannelInboundHandlerAdapter) , 你就可以接收入站事件和數據, 這些數據會被業務邏輯處理。 當要給客戶端
發 送 響 應 時 , 也 可 以 從 ChannelInboundHandler 沖 刷 數 據 。 業 務 邏 輯 通 常 寫 在 一 個 或 者 多 個
ChannelInboundHandler 中。 ChannelOutboundHandler 原理一樣, 只不過它是用來處理出站數據的
3) ChannelPipeline 提供了 ChannelHandler 鏈的容器。 以客戶端應用程序為例, 如果事件的運動方向是從客戶端到
服務端的, 那么我們稱這些事件為出站的, 即客戶端發送給服務端的數據會通過 pipeline 中的一系列ChannelOutboundHandler, 並被這些 Handler 處理, 反之則稱為入站的
2.編碼解碼器
1) 當 Netty 發送或者接受一個消息的時候, 就將會發生一次數據轉換。 入站消息會被解碼: 從字節轉換為另一種格式(比如 java 對象) ; 如果是出站消息, 它會被編碼成字節。
2) Netty 提供一系列實用的編解碼器, 他們都實現了 ChannelInboundHadnler 或者 ChannelOutboundHandler 接口。在這些類中, channelRead 方法已經被重寫了。 以入站為例, 對於每個從入站 Channel 讀取的消息, 這個方法會
被調用。 隨后, 它將調用由解碼器所提供的 decode()方法進行解碼, 並將已經解碼的字節轉發給 ChannelPipeline中的下一個 ChannelInboundHandler。
注意:進站和出站都是想對而言
3 解碼器-ByteToMessageDecoder
1) 關系繼承圖
2) 由於不可能知道遠程節點是否會一次性發送一個完整的信息, tcp 有可能出現粘包拆包的問題, 這個類會對入站數據進行緩沖, 直到它准備好被處理.
3) 一個關於 ByteToMessageDecoder 實例分析
4 Netty 的 handler 鏈的調用機制
實例要求:
1) 使用自定義的編碼器和解碼器來說明 Netty 的 handler 調用機制
客戶端發送 long -> 服務器
服務端發送 long -> 客戶端
2) 案例演示
3) 結論
不論解碼器 handler 還是 編碼器 handler 即接收的消息類型必須與待處理的消息類型一致, 否則該 handler 不會被執行
在解碼器 進行數據解碼時, 需要判斷 緩存區(ByteBuf)的數據是否足夠 , 否則接收到的結果會期望結果可能不一致
代碼:
MyServer

import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class MyServer { public static void main(String[] args) throws Exception{ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定義一個初始化類 ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
MyServerInitializer

import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; public class MyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline();//一會下斷點 //入站的handler進行解碼 MyByteToLongDecoder //pipeline.addLast(new MyByteToLongDecoder()); pipeline.addLast(new MyByteToLongDecoder2()); //出站的handler進行編碼 pipeline.addLast(new MyLongToByteEncoder()); //自定義的handler 處理業務邏輯 pipeline.addLast(new MyServerHandler()); System.out.println("xx"); } }
MyServerHandler

import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class MyServerHandler extends SimpleChannelInboundHandler<Long> { @Override protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception { System.out.println("從客戶端" + ctx.channel().remoteAddress() + " 讀取到long " + msg); //給客戶端發送一個long ctx.writeAndFlush(98765L); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
MyClient

import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; public class MyClient { public static void main(String[] args) throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .handler(new MyClientInitializer()); //自定義一個初始化類 ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { group.shutdownGracefully(); } } }
MyClientInitializer

import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; public class MyClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //加入一個出站的handler 對數據進行一個編碼 pipeline.addLast(new MyLongToByteEncoder()); //這時一個入站的解碼器(入站handler ) //pipeline.addLast(new MyByteToLongDecoder()); pipeline.addLast(new MyByteToLongDecoder2()); //加入一個自定義的handler , 處理業務 pipeline.addLast(new MyClientHandler()); } }
MyClientHandler

import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; import java.nio.charset.Charset; public class MyClientHandler extends SimpleChannelInboundHandler<Long> { @Override protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception { System.out.println("服務器的ip=" + ctx.channel().remoteAddress()); System.out.println("收到服務器消息=" + msg); } //重寫channelActive 發送數據 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("MyClientHandler 發送數據"); //ctx.writeAndFlush(Unpooled.copiedBuffer("")) ctx.writeAndFlush(123456L); //發送的是一個long //分析 //1. "abcdabcdabcdabcd" 是 16個字節 //2. 該處理器的前一個handler 是 MyLongToByteEncoder //3. MyLongToByteEncoder 父類 MessageToByteEncoder //4. 父類 MessageToByteEncoder /* public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) { //判斷當前msg 是不是應該處理的類型,如果是就處理,不是就跳過encode @SuppressWarnings("unchecked") I cast = (I) msg; buf = allocateBuffer(ctx, cast, preferDirect); try { encode(ctx, cast, buf); } finally { ReferenceCountUtil.release(cast); } if (buf.isReadable()) { ctx.write(buf, promise); } else { buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } 4. 因此我們編寫 Encoder 是要注意傳入的數據類型和處理的數據類型一致 */ // ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd",CharsetUtil.UTF_8)); } }
MyByteToLongDecoder
解碼器

import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; public class MyByteToLongDecoder extends ByteToMessageDecoder { /** * * decode 會根據接收的數據,被調用多次, 直到確定沒有新的元素被添加到list * , 或者是ByteBuf 沒有更多的可讀字節為止 * 如果list out 不為空,就會將list的內容傳遞給下一個 channelinboundhandler處理, 該處理器的方法也會被調用多次 * * @param ctx 上下文對象 * @param in 入站的 ByteBuf * @param out List 集合,將解碼后的數據傳給下一個handler * @throws Exception */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MyByteToLongDecoder 被調用"); //因為 long 8個字節, 需要判斷有8個字節,才能讀取一個long if(in.readableBytes() >= 8) { out.add(in.readLong()); } } }
解碼器-ReplayingDecoder
1) public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
2) ReplayingDecoder 擴展了 ByteToMessageDecoder 類, 使用這個類, 我們不必調用 readableBytes()方法。 參數 S指定了用戶狀態管理的類型, 其中 Void 代表不需要狀態管理
3) 應用實例: 使用 ReplayingDecoder 編寫解碼器, 對前面的案例進行簡化 [案例演示
MyByteToLongDecoder2

import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; import java.util.List; public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MyByteToLongDecoder2 被調用"); //在 ReplayingDecoder 不需要判斷數據是否足夠讀取,內部會進行處理判斷 out.add(in.readLong()); } }
4) ReplayingDecoder 使用方便, 但它也有一些局限性:
1. 並 不 是 所 有 的 ByteBuf 操 作 都 被 支 持 , 如 果 調 用 了 一 個 不 被 支 持 的 方 法 , 將 會 拋 出 一 個UnsupportedOperationException。
2. ReplayingDecoder 在某些情況下可能稍慢於 ByteToMessageDecoder, 例如網絡緩慢並且消息格式復雜時,
消息會被拆成了多個碎片, 速度變慢 。
6 其它編解碼器
1) LineBasedFrameDecoder: 這個類在 Netty 內部也有使用, 它使用行尾控制字符(\n 或者\r\n) 作為分隔符來解析數據。
2) DelimiterBasedFrameDecoder: 使用自定義的特殊字符作為消息的分隔符。
3) HttpObjectDecoder: 一個 HTTP 數據的解碼器
4) LengthFieldBasedFrameDecoder: 通過指定長度來標識整包消息, 這樣就可以自動的處理黏包和半包消息。
7 其它編碼器