本篇將自定義 編碼解碼器,對數據傳輸過程進行“入站解碼,出站編碼”。
- 服務端接收的是字節數據,通過“入站解碼”,得到知道格式的數據;
- 服務器發送指定格式數據通過 “出站編碼” 轉換成字節數據,然后發送給客戶端;
- 客戶端類似;
- ChannelPipeLine 管理一系列 ChannelHandler,入站消息解碼后轉發給下一個 handler 進行處理
案例需求:客戶端或服務器發送 Long 類型數據,出站編碼成字節數據,入站解碼讀取對方發送的消息
編碼器 MyLongToByteEncoder
package com.oy.inboundandoutbound; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class MyLongToByteEncoder extends MessageToByteEncoder<Long> { @Override protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception { System.out.println("MyLongToByteEncoder encoder 被調用. msg: " + msg); out.writeLong(msg); } }
解碼器 MyByteToLongDecoder
package com.oy.inboundandoutbound; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; public class MyByteToLongDecoder extends ByteToMessageDecoder { /** * decode() 會根據接收的數據,被調用多次,知道確定沒有新的元素添加到list, * 或者是 ByteBuf 沒有更多的可讀字節為止。 * 如果 list 不為空,就會將 list 的內容傳遞給下一個 handler * @param ctx 上下文對象 * @param in 入站后的 ByteBuf * @param out 將解碼后的數據傳遞給下一個 handler * @throws Exception */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // long 類型 為 8 字節 if (in.readableBytes() >= 8) { out.add(in.readLong()); } } }
Server
package com.oy.inboundandoutbound.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class Server { public static void main(String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup(1); NioEventLoopGroup work = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap .group(boss, work) .channel(NioServerSocketChannel.class) .childHandler(new MyServerChannelInitializer()); ChannelFuture future = serverBootstrap.bind(8004).sync(); System.out.println("server started and listen " + 8004); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); work.shutdownGracefully(); } } }
package com.oy.inboundandoutbound.server; import com.oy.inboundandoutbound.MyByteToLongDecoder; import com.oy.inboundandoutbound.MyLongToByteEncoder; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; public class MyServerChannelInitializer extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel socketChannel) throws Exception { /* 向管道加入處理器 */ ChannelPipeline pipeline = socketChannel.pipeline(); // 入站的 handler 進行解碼 pipeline.addLast("decoder", new MyByteToLongDecoder()); // 添加一個出站的 handler 對數據進行編碼 pipeline.addLast("encoder", new MyLongToByteEncoder()); // 添加自定義的處理器 pipeline.addLast("MyServerHandler", new MyServerHandler()); } }
package com.oy.inboundandoutbound.server; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class MyServerHandler extends SimpleChannelInboundHandler<Long> { @Override protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception { System.out.println("從客戶端讀到的數據:" + msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 服務器返回 long 類型數據 ctx.writeAndFlush(654321L); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
Client
package com.oy.inboundandoutbound.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; public class Client { public static void main(String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap .group(group) .channel(NioSocketChannel.class) .handler(new MyClientChannelInitializer()); ChannelFuture future = bootstrap.connect("127.0.0.1", 8004).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
package com.oy.inboundandoutbound.client; import com.oy.inboundandoutbound.MyByteToLongDecoder; import com.oy.inboundandoutbound.MyLongToByteEncoder; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.nio.NioSocketChannel; public class MyClientChannelInitializer extends ChannelInitializer<NioSocketChannel> { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 入站的 handler 進行解碼 pipeline.addLast("decoder", new MyByteToLongDecoder()); // 添加一個出站的 handler 對數據進行編碼 pipeline.addLast("encoder", new MyLongToByteEncoder()); // 添加自定義 handler,處理業務邏輯 pipeline.addLast(new MyClientHandler()); } }
package com.oy.inboundandoutbound.client; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class MyClientHandler extends SimpleChannelInboundHandler<Long> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Long msg) throws Exception { // 客戶端讀取服務器發送的 long 類型數據 System.out.println("客戶端讀取服務器發送的, msg:" + msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 客戶端發送 long 類型數據 ctx.writeAndFlush(123456L); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
啟動服務器和客戶端程序, 控制台打印結果: