1、netty的編碼和解碼:在數據傳輸的時候,考慮數據安全和數據完整性都是很有必要的。這裡主要介紹netty3和netty5的編解碼方式。其實從StringEncoder和StringDecoder的源碼中也可以了解編解碼規則,然後改寫成自己的編解碼規則也是可以的。
2、netty3和netty5的編解碼方式還是存在一定差別的。個人感覺netty5來的更加實用和方便。
3、netty3的編解碼規則
1)數據編碼規則(我這裡只是用於演示,數據規則很簡單)
包頭+模塊+數據(請求編解碼)
包頭+模塊+狀態+數據(響應編解碼)
2)目錄
3)請求和響應對象
package com.troy.data.domain; //請求數據 public class Request { //模塊類型 private int model; //數據 private byte[] data; public int getModel() { return model; } public void setModel(int model) { this.model = model; } public byte[] getData() { return data; } public void setData(byte[] data) { this.data = data; } @Override public String toString() { return "Request{" + "model=" + model + ", data=" + new String(data) + '}'; } }
package com.troy.data.domain; import java.util.Arrays; //響應數據 public class Response { //模塊類型 private int model; //狀態碼 private int status; //數據 private byte[] data; public int getModel() { return model; } public void setModel(int model) { this.model = model; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public byte[] getData() { return data; } public void setData(byte[] data) { this.data = data; } @Override public String toString() { return "Response{" + "model=" + model + ", status=" + status + ", data=" + new String(data) + '}'; } }
4)常量(這裡的包頭因為是固定的,所以就寫成了一個常量)
package com.troy.data.constant;

/**
 * Protocol constants shared by the encoders and decoders.
 */
public class ConsantUtil {

    /**
     * Fixed marker that the encoders write as the first int of every frame and that the
     * decoders compare against before reading the rest of the frame.
     */
    public static final int PACKAGE_HEADER = -32523523;

    /** Constants holder; not meant to be instantiated. */
    private ConsantUtil() {
    }
}
5)請求編解碼
package com.troy.data.codec; import com.troy.data.constant.ConsantUtil; import com.troy.data.domain.Request; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; public class RequestEncode extends OneToOneEncoder { protected Object encode(ChannelHandlerContext channelHandlerContext, Channel channel, Object o) throws Exception { Request request = (Request) o; ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer(); channelBuffer.writeInt(ConsantUtil.PACKAGE_HEADER); channelBuffer.writeInt(ConsantUtil.PACKAGE_HEADER); channelBuffer.writeBytes(request.getData()); return channelBuffer; } }
package com.troy.data.codec; import com.troy.data.constant.ConsantUtil; import com.troy.data.domain.Request; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.oneone.OneToOneDecoder; //請求數據解碼 public class RequestDecode extends OneToOneDecoder { protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, Object o) throws Exception { ChannelBuffer channelBuffer = (ChannelBuffer) o; if (ConsantUtil.PACKAGE_HEADER == channelBuffer.readInt()) { Request request = new Request(); request.setModel(channelBuffer.readInt()); byte[] bytes = new byte[channelBuffer.readableBytes()]; channelBuffer.readBytes(bytes); request.setData(bytes); return request; } return null; } }
6)響應編解碼
package com.troy.data.codec; import com.troy.data.constant.ConsantUtil; import com.troy.data.domain.Response; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; //響應編碼器 public class ResponseEncode extends OneToOneEncoder { protected Object encode(ChannelHandlerContext channelHandlerContext, Channel channel, Object o) throws Exception { Response response = (Response) o; ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer(); channelBuffer.writeInt(ConsantUtil.PACKAGE_HEADER); channelBuffer.writeInt(response.getModel()); channelBuffer.writeInt(response.getStatus()); channelBuffer.writeBytes(response.getData()); return channelBuffer; } }
package com.troy.data.codec; import com.troy.data.constant.ConsantUtil; import com.troy.data.domain.Response; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.oneone.OneToOneDecoder; //響應解碼 public class ResponseDecode extends OneToOneDecoder{ protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, Object o) throws Exception { ChannelBuffer channelBuffer = (ChannelBuffer) o; if (ConsantUtil.PACKAGE_HEADER == channelBuffer.readInt()) { Response response = new Response(); response.setModel(channelBuffer.readInt()); response.setStatus(channelBuffer.readInt()); byte[] bytes = new byte[channelBuffer.readableBytes()]; channelBuffer.readBytes(bytes); response.setData(bytes); return response; } return null; } }
7)設置對應的管道編解碼就可以了
a、客戶端
//設置管道工廠 clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline channelPipeline = Channels.pipeline(); channelPipeline.addLast("decode",new RequestEncode()); channelPipeline.addLast("encode",new ResponseDecode()); channelPipeline.addLast("client",new ClientHandler()); return channelPipeline; } });
/**
 * Prints the received {@link Response} to standard output, then hands the event to the
 * superclass implementation.
 *
 * @param ctx handler context
 * @param e   message event whose payload is cast to {@link Response}
 * @throws Exception propagated from the superclass handler
 */
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    final Object message = e.getMessage();
    final String text = ((Response) message).toString();
    System.out.println(text);
    super.messageReceived(ctx, e);
}
b、服務端
//設置管道流 serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline channelPipeline = Channels.pipeline(); //添加處理方式 channelPipeline.addLast("idle",new IdleStateHandler(new HashedWheelTimer(),60,60,60)); channelPipeline.addLast("decode",new RequestDecode()); channelPipeline.addLast("encode",new ResponseEncode()); channelPipeline.addLast("server",new ServerHandler()); return channelPipeline; } });
/**
 * Prints the received {@link Request}, writes a fixed {@link Response} back on the same
 * channel, then hands the event to the superclass implementation.
 *
 * @param ctx handler context used to obtain the channel
 * @param e   message event whose payload is cast to {@link Request}
 * @throws Exception propagated from the superclass handler
 */
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    final Request incoming = (Request) e.getMessage();
    System.out.println("client:" + incoming.toString());

    // Build the fixed reply: module 1, status 1, greeting payload.
    final Response reply = new Response();
    reply.setData("hello client".getBytes());
    reply.setStatus(1);
    reply.setModel(1);
    ctx.getChannel().write(reply);

    super.messageReceived(ctx, e);
}
4、netty5的編解碼規則
1)數據結構、目錄結構、對象、常量都是一樣。
2)編解碼的編寫方式有些不一樣
a、請求編解碼
package com.troy.data.codec; import com.troy.data.constant.ConsantUtil; import com.troy.data.domain.Request; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; import java.util.List; @ChannelHandler.Sharable public class RequestEncode extends MessageToMessageEncoder<Request> { protected void encode(ChannelHandlerContext channelHandlerContext, Request request, List<Object> list) throws Exception { ByteBuf byteBuf = Unpooled.buffer(); byteBuf.writeInt(ConsantUtil.PACKAGE_HEADER); byteBuf.writeInt(request.getModel()); byteBuf.writeBytes(request.getData()); list.add(byteBuf); } }
package com.troy.data.codec; import com.troy.data.constant.ConsantUtil; import com.troy.data.domain.Request; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import java.util.List; //請求數據解碼 @ChannelHandler.Sharable public class RequestDecode extends MessageToMessageDecoder<ByteBuf>{ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { if (ConsantUtil.PACKAGE_HEADER == byteBuf.readInt()) { //當數據超過指定值的時候跳過這部分數據 if (byteBuf.readableBytes() > 2048) { byteBuf.skipBytes(byteBuf.readableBytes()); } //一個字節一個字節的讀取,知道讀取到包頭 while(true) { byteBuf.markReaderIndex(); if (ConsantUtil.PACKAGE_HEADER == byteBuf.readInt()) { break; } byteBuf.resetReaderIndex(); byteBuf.readByte(); } Request request = new Request(); request.setModel(byteBuf.readInt()); byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); request.setData(bytes); list.add(request); } } }
b、響應編解碼
package com.troy.data.codec; import com.troy.data.constant.ConsantUtil; import com.troy.data.domain.Response; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; import java.util.List; //響應編碼器 @ChannelHandler.Sharable public class ResponseEncode extends MessageToMessageEncoder<Response> { protected void encode(ChannelHandlerContext channelHandlerContext, Response response, List<Object> list) throws Exception { ByteBuf byteBuf = Unpooled.buffer(); byteBuf.writeInt(ConsantUtil.PACKAGE_HEADER); byteBuf.writeInt(response.getModel()); byteBuf.writeInt(response.getStatus()); byteBuf.writeBytes(response.getData()); list.add(byteBuf); } }
package com.troy.data.codec; import com.troy.data.constant.ConsantUtil; import com.troy.data.domain.Response; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import java.util.List; //響應解碼 @ChannelHandler.Sharable public class ResponseDecode extends MessageToMessageDecoder<ByteBuf>{ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { if (ConsantUtil.PACKAGE_HEADER == byteBuf.readInt()) { //當數據超過指定值的時候跳過這部分數據 if (byteBuf.readableBytes() > 2048) { byteBuf.skipBytes(byteBuf.readableBytes()); } //一個字節一個字節的讀取,知道讀取到包頭 while(true) { byteBuf.markReaderIndex(); if (ConsantUtil.PACKAGE_HEADER == byteBuf.readInt()) { break; } byteBuf.resetReaderIndex(); byteBuf.readByte(); } Response response = new Response(); response.setModel(byteBuf.readInt()); response.setStatus(byteBuf.readInt()); byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); response.setData(bytes); list.add(response); } } }
3)管道設置和消息處理的方式基本上都是一樣的。
5、netty的編解碼,主要目的就是處理通訊問題,對數據進行自定義處理!