Netty自定義Encoder/Decoder進行對象傳遞


轉載:http://blog.csdn.net/top_code/article/details/50901623

在上一篇文章中,我們使用Netty4本身自帶的ObjectDecoder,ObjectEncoder來實現POJO對象的傳輸,但其使用的是Java內置的序列化,由於Java序列化的性能並不是很好,所以很多時候我們需要用其他高效的序列化方式,例如 protobuf,Hessian, Kryo,Jackson,fastjson等。

本文中Java序列化不是重點,對Java序列化不熟悉的同學的請自行查找資料學習,本篇我們重點介紹如何構造我們的Encoder和Decoder 。

流式傳輸特點

In a stream-based transport such as TCP/IP, received data is stored into a socket receive buffer. Unfortunately, the buffer of a stream-based transport is not a queue of packets but a queue of bytes. It means, even if you sent two messages as two independent packets, an operating system will not treat them as two messages but as just a bunch of bytes. Therefore, there is no guarantee that what you read is exactly what your remote peer wrote. For example, let us assume that the TCP/IP stack of an operating system has received three packets: 
這里寫圖片描述 
Because of this general property of a stream-based protocol, there’s high chance of reading them in the following fragmented form in your application: 
這里寫圖片描述

Therefore, a receiving part, regardless it is server-side or client-side, should defrag the received data into one or more meaningful frames that could be easily understood by the application logic. In case of the example above, the received data should be framed like the following: 
這里寫圖片描述

通常情況下有下面幾種解決方案:

  1. 消息定長
  2. 在包尾增加一個標識,通過這個標志符進行分割
  3. 將消息分為兩部分,也就是消息頭和消息尾,消息頭中寫入要發送數據的總長度,通常是在消息頭的第一個字段使用int值(如果消息很大可以考慮用long值)來標識發送數據的長度。

本文中采用第三種方案,自定義Encoder/Decoder進行對象的傳輸。

准備工作

JDK 7 
Eclipse Juno 
Maven 3.3

序列化框架

本篇我們使用Kryo對POJO對象進行序列化,當然也可以采用protobuf,Hessian做序列化,有興趣的同學可以自己動手試試。

1、添加Kyro 依賴

<dependency> <groupId>com.esotericsoftware</groupId> <artifactId>kryo</artifactId> <version>3.0.3</version> </dependency>

2、自定義Encoder 
首先我們實現一個Encoder,繼承自MessageToByteEncoder

package com.ricky.codelab.netty.ch3.serialiaztion; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import java.io.ByteArrayOutputStream; import org.apache.commons.io.IOUtils; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.io.Output; import com.ricky.codelab.netty.model.Car; /** * 自定義Encoder * @author Ricky * */ public class KyroMsgEncoder extends MessageToByteEncoder<Car> { private Kryo kryo = new Kryo(); @Override protected void encode(ChannelHandlerContext ctx, Car msg, ByteBuf out) throws Exception { byte[] body = convertToBytes(msg); //將對象轉換為byte int dataLength = body.length; //讀取消息的長度 out.writeInt(dataLength); //先將消息長度寫入,也就是消息頭 out.writeBytes(body); //消息體中包含我們要發送的數據 } private byte[] convertToBytes(Car car) { ByteArrayOutputStream bos = null; Output output = null; try { bos = new ByteArrayOutputStream(); output = new Output(bos); kryo.writeObject(output, car); output.flush(); return bos.toByteArray(); } catch (KryoException e) { e.printStackTrace(); }finally{ IOUtils.closeQuietly(output); IOUtils.closeQuietly(bos); } return null; } }

在KyroMsgEncoder中我們需要覆蓋 encode(ChannelHandlerContext ctx, Car msg, ByteBuf out) 方法,其主要用來將要傳輸的對象轉換為byte數組。

3、自定義Decoder 
自定義Decoder 需繼承ByteToMessageDecoder類,並覆蓋其decode方法。

package com.ricky.codelab.netty.ch3.serialiaztion; import java.io.ByteArrayInputStream; import java.util.List; import org.apache.commons.io.IOUtils; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.io.Input; import com.ricky.codelab.netty.model.Car; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; /** * 自定義Decoder * @author Ricky * */ public class KyroMsgDecoder extends ByteToMessageDecoder { public static final int HEAD_LENGTH = 4; private Kryo kryo = new Kryo(); @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() < HEAD_LENGTH) { //這個HEAD_LENGTH是我們用於表示頭長度的字節數。 由於Encoder中我們傳的是一個int類型的值,所以這里HEAD_LENGTH的值為4. return; } in.markReaderIndex(); //我們標記一下當前的readIndex的位置 int dataLength = in.readInt(); // 讀取傳送過來的消息的長度。ByteBuf 的readInt()方法會讓他的readIndex增加4 if (dataLength < 0) { // 我們讀到的消息體長度為0,這是不應該出現的情況,這里出現這情況,關閉連接。 ctx.close(); } if (in.readableBytes() < dataLength) { //讀到的消息體長度如果小於我們傳送過來的消息長度,則resetReaderIndex. 這個配合markReaderIndex使用的。把readIndex重置到mark的地方 in.resetReaderIndex(); return; } byte[] body = new byte[dataLength]; //傳輸正常 in.readBytes(body); Object o = convertToObject(body); //將byte數據轉化為我們需要的對象 out.add(o); } private Object convertToObject(byte[] body) { Input input = null; ByteArrayInputStream bais = null; try { bais = new ByteArrayInputStream(body); input = new Input(bais); return kryo.readObject(input, Car.class); } catch (KryoException e) { e.printStackTrace(); }finally{ IOUtils.closeQuietly(input); IOUtils.closeQuietly(bais); } return null; } }

在KyroMsgDecoder中覆蓋父類的decode(ChannelHandlerContext ctx, ByteBuf in, List out)方法,將byte數組轉換為對象。

服務端程序

KyroTransferServer.java

package com.ricky.codelab.netty.ch3; import com.ricky.codelab.netty.ch3.serialiaztion.KyroMsgDecoder; import com.ricky.codelab.netty.ch3.serialiaztion.KyroMsgEncoder; import com.ricky.codelab.netty.util.Constant; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Netty4.x 自定義Decoder,Encoder進行對象傳遞 * @author Ricky * */ public class KyroTransferServer { private final int port; public KyroTransferServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new KyroMsgEncoder(), new KyroMsgDecoder(), new KyroServerHandler()); } }); // Bind and start to accept incoming connections. b.bind(port).sync().channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new KyroTransferServer(Constant.PORT).run(); } }

KyroServerHandler.java

package com.ricky.codelab.netty.ch3; import com.ricky.codelab.netty.model.Car; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class KyroServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server receive msg:"+msg); Car car = new Car(); car.setName("K5"); car.setBrand("KIA"); car.setPrice(24.5); car.setSpeed(196); System.out.println("server write msg:"+car); ctx.writeAndFlush(car); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

客戶端程序

KyroTransferClient.java

package com.ricky.codelab.netty.ch3; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import com.ricky.codelab.netty.ch3.serialiaztion.KyroMsgDecoder; import com.ricky.codelab.netty.ch3.serialiaztion.KyroMsgEncoder; import com.ricky.codelab.netty.model.Car; import com.ricky.codelab.netty.util.Constant; public class KyroTransferClient { private String host; private int port; private Car message; public KyroTransferClient(String host, int port, Car message) { this.host = host; this.port = port; this.message = message; } public void send() throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new KyroMsgEncoder(), new KyroMsgDecoder(), new KyroClientHandler(message)); } }); ChannelFuture future = bootstrap.connect(host, port).sync(); future.channel().closeFuture().sync(); } finally { eventLoopGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { Car message = new Car(); message.setName("X5"); message.setBrand("BMW"); message.setPrice(52.6); message.setSpeed(200); new KyroTransferClient(Constant.HOST, Constant.PORT, message).send(); } }

 

KyroClientHandler.java

package com.ricky.codelab.netty.ch3; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import com.ricky.codelab.netty.model.Car; public class KyroClientHandler extends ChannelInboundHandlerAdapter { private final Car message; /** * Creates a client-side handler. */ public KyroClientHandler(Car message) { this.message = message; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // Send the message to Server super.channelActive(ctx); System.out.println("client send message:"+message); ctx.writeAndFlush(message); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // you can use the Object from Server here System.out.println("client receive msg:"+msg); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

運行測試

先運行服務端程序,然后運行客戶端程序,將看到控制台有內容輸出

服務端程序輸出

server receive msg:Car [name=X5, brand=BMW, price=52.6, speed=200.0] 
server write msg:Car [name=K5, brand=KIA, price=24.5, speed=196.0]

客戶端程序輸出

client send message:Car [name=X5, brand=BMW, price=52.6, speed=200.0] 
client receive msg:Car [name=K5, brand=KIA, price=24.5, speed=196.0]

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM