Netty4.x中文教程系列(四) 對象傳輸
我們在使用netty的過程中肯定會遇到傳輸對象的情況,Netty4通過ObjectEncoder和ObjectDecoder來支持。
首先我們定義一個User對象,一定要實現Serializable接口:
package mjorcen.netty.object;

import java.io.Serializable;

/**
 * Simple mutable data holder that is sent between client and server in the
 * object-transfer example. It implements {@link Serializable} so it can be
 * written with Java serialization.
 *
 * Original author note: hupeng, 14-6-3, 1:31 AM.
 */
public class User implements Serializable {

    /**
     * Explicit stream version. Without it the JVM derives a value from the
     * class shape, so an unrelated change (or a different compiler) on one
     * side could make client and server reject each other's objects.
     */
    private static final long serialVersionUID = 1L;

    private int id;
    private String name;
    private String cardNo;
    private String description;

    /** @return the numeric id */
    public int getId() {
        return id;
    }

    /** @param id the numeric id to store */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the name, or {@code null} if never set */
    public String getName() {
        return name;
    }

    /** @param name the name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the card number, or {@code null} if never set */
    public String getCardNo() {
        return cardNo;
    }

    /** @param cardNo the card number to store */
    public void setCardNo(String cardNo) {
        this.cardNo = cardNo;
    }

    /** @return the description, or {@code null} if never set */
    public String getDescription() {
        return description;
    }

    /** @param description the description to store */
    public void setDescription(String description) {
        this.description = description;
    }

    /**
     * Renders all four fields in the form
     * {@code User{id=..., name='...', cardNo='...', description='...'}}.
     */
    @Override
    public String toString() {
        return "User{"
                + "id=" + id
                + ", name='" + name + '\''
                + ", cardNo='" + cardNo + '\''
                + ", description='" + description + '\''
                + '}';
    }
}
服務端和客戶端裡,我們自定義的Handler實現如下:
server
package mjorcen.netty.object; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; public class ObjectTranferServer { private final int port; public ObjectTranferServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new ObjectEncoder(), new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)), new ObjectTransferServerHandler()); } }); // Bind and start to accept incoming connections. b.bind(port).sync().channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 11000; } new ObjectTranferServer(port).run(); } }
serverHandler
package mjorcen.netty.object;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Server-side business handler for the object-transfer example: prints every
 * inbound message and writes the same message back to the sender.
 */
public class ObjectTransferServerHandler extends ChannelInboundHandlerAdapter {

    private static final Logger LOG =
            Logger.getLogger(ObjectTransferServerHandler.class.getName());

    /**
     * Prints the received message to standard output, then writes and
     * flushes it back through the same context.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg);
        ctx.writeAndFlush(msg);
    }

    /**
     * Logs the failure at WARNING level and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOG.log(Level.WARNING, "Unexpected exception from downstream.", cause);
        ctx.close();
    }
}
client
package mjorcen.netty.object; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; public class ObjectTransferClient { private String host; private int port; private int messageSize; public ObjectTransferClient(String host, int port, int messageSize) { this.host = host; this.port = port; this.messageSize = messageSize; } public void run() throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new ObjectEncoder(), new ObjectDecoder(Integer.MAX_VALUE ,ClassResolvers.cacheDisabled(null)), new ObjectTransferClientHandler(messageSize)); } }); ChannelFuture future = bootstrap.connect(host, port).sync(); future.channel().closeFuture().sync(); } finally { eventLoopGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { final String host = "localhost"; final int port = 11000; final int messageSize = 20; new ObjectTransferClient(host, port, messageSize).run(); } }
clientHandler
package mjorcen.netty.object;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Client-side business handler for the object-transfer example. It prepares
 * a list of {@link User} objects up front, sends the whole list once the
 * channel becomes active, prints the first reply and closes the channel.
 */
public class ObjectTransferClientHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger =
            Logger.getLogger(ObjectTransferClientHandler.class.getName());

    /** Users sent to the server as a single message in {@link #channelActive}. */
    private final List<User> message;

    /**
     * Creates a client-side handler and pre-builds the users to send.
     *
     * @param messageSize number of {@link User} objects to create; must be positive
     * @throws IllegalArgumentException if {@code messageSize} is zero or negative
     */
    public ObjectTransferClientHandler(int messageSize) {
        if (messageSize <= 0) {
            // The message names the actual parameter (it used to say "firstMessageSize").
            throw new IllegalArgumentException("messageSize: " + messageSize);
        }
        message = new ArrayList<User>(messageSize);
        for (int i = 0; i < messageSize; i++) {
            User user = new User();
            user.setId(i);
            user.setCardNo("420000" + i);
            user.setName("hu" + i);
            user.setDescription("你覺得這樣好嗎??真的好嗎" + i);
            message.add(user);
        }
    }

    /**
     * Delegates to the superclass, then writes and flushes the prepared user
     * list to the server as one message.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        ctx.writeAndFlush(message);
    }

    /**
     * Prints the object received from the server and closes the channel.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg);
        ctx.close();
    }

    /**
     * Logs the failure at WARNING level and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
        ctx.close();
    }
}
簡單梳理一下思路:
- 通過Netty傳遞,都需要基於流,以ByteBuf(Netty 3中稱為ChannelBuffer)的形式傳遞。所以,Object -> ByteBuf.
- Netty提供了轉換工具,需要我們配置到Handler。
- 樣例中客戶端把對象發給服務端,服務端再原樣回寫給客戶端,屬於雙向收發,所以客戶端和服務端都同時配置了ObjectEncoder和ObjectDecoder。如果只是單向發消息,則只需在發送方配置Encoder、接收方配置Decoder。
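這裡需要注意,註冊到Server的Handler是有順序的,如果你顛倒一下註冊順序,例如把ObjectTransferServerHandler放到ObjectDecoder之前(addLast(new ObjectEncoder(), new ObjectTransferServerHandler(), new ObjectDecoder(...))):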
結果就是,會先進入我們自己的業務,再進行解碼。這自然是不行的,會強轉失敗。至此,你應該會用Netty傳遞對象了吧。