粘包和分包出現的原因是:沒有一個穩定數據結構
解決辦法(兩種):
1. 分隔符
2. 長度 + 數據
* <pre>
* 數據包格式
* +——----——+——-----——+——----——+——----——+——-----——+
* | 包頭 | 模塊號 | 命令號 | 長度 | 數據 |
* +——----——+——-----——+——----——+——----——+——-----——+
* </pre>
* 包頭4字節
* 模塊號2字節short
* 命令號2字節short
* 長度4字節(描述數據部分字節長度)
創建encoder 和 decoder 分別 加入pipeline 中
/**
 * Decodes length-prefixed frames: a 4-byte signed length followed by that many
 * payload bytes, which are handed to {@code SerializationUtil.deserialize}
 * together with the configured target class.
 */
public class RpcDecoder extends ByteToMessageDecoder {

    /** Number of bytes occupied by the length prefix (one {@code int}). */
    private static final int LENGTH_FIELD_SIZE = 4;

    /** Class passed to the deserializer for every decoded payload. */
    private final Class<?> genericClass;

    /**
     * @param genericClass class passed to {@code SerializationUtil.deserialize}
     *                     for each decoded payload
     */
    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Tries to decode one frame from {@code in}.
     *
     * <p>If the length prefix or the payload is not fully buffered yet, the
     * reader index is left at the start of the frame and nothing is emitted,
     * so the same frame is retried once more bytes arrive.
     *
     * @param ctx channel context; closed when a negative length is read
     * @param in  accumulated inbound bytes
     * @param out receives the deserialized object when a full frame is available
     * @throws Exception propagated from deserialization
     */
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Not enough bytes for the length prefix yet.
        if (in.readableBytes() < LENGTH_FIELD_SIZE) {
            return;
        }

        // Remember the frame start so a partial frame can be rewound.
        in.markReaderIndex();
        int dataLength = in.readInt();

        if (dataLength < 0) {
            // A negative length means the stream is corrupt. Close the channel,
            // drop whatever is still buffered so the remaining bytes are not
            // re-interpreted as further frames, and stop here instead of
            // falling through to allocate a negatively sized array.
            ctx.close();
            in.skipBytes(in.readableBytes());
            return;
        }

        // Payload not complete yet: rewind to the length prefix and wait.
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }

        byte[] data = new byte[dataLength];
        in.readBytes(data);

        Object obj = SerializationUtil.deserialize(data, genericClass);
        out.add(obj);
    }
}
/**
 * Encodes outbound messages as length-prefixed frames: a 4-byte length
 * followed by the bytes produced by {@code SerializationUtil.serialize}.
 */
public class RpcEncoder extends MessageToByteEncoder<Object> {

    /** Only messages that are instances of this class are encoded. */
    private final Class<?> genericClass;

    /**
     * @param genericClass type of message this encoder serializes
     */
    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Serializes {@code in} and writes it to {@code out} as length + payload.
     *
     * <p>Messages that are not instances of the configured class are not
     * serialized; nothing is written to {@code out} for them.
     *
     * @param ctx channel context (unused)
     * @param in  message to encode
     * @param out buffer receiving the encoded frame
     * @throws Exception propagated from serialization
     */
    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (genericClass.isInstance(in)) {
            byte[] data = SerializationUtil.serialize(in);
            // The length prefix lets the peer know how many payload bytes follow.
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() >= BASE_LENGTH) { //防止socket字節流攻擊 if (in.readableBytes() > 2048) { in.skipBytes(in.readableBytes()); } //記錄包頭開始的index int beginReader; //讀取包頭 while (true) { beginReader = in.readerIndex(); in.markReaderIndex(); if (in.readInt() == Constantvalue.FLAG) { break; } //未讀到包頭, 略過一個字節 in.resetReaderIndex(); in.readByte(); //長度又變得不滿足 if (in.readableBytes() < BASE_LENGTH) { return; } } } // 模塊號 short module = in.readShort(); //命令好 short cmd = in.readShort(); // 長度 int dataLength = in.readInt(); if (in.readableBytes() < dataLength) { //還原讀指針 in.resetReaderIndex(); return; } byte[] data = new byte[dataLength]; in.readBytes(data); Request request = new Request(); request.setModule(module); request.setCmd(cmd); request.setData(data); //繼續往下傳遞 out.add(request); }
buffer 裡面的數據未被讀取完怎麼辦?
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) { int outSize = out.size(); int oldInputLength = in.readableBytes(); decode(ctx, in, out); // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } if (outSize == out.size()) { // 這里會對照長度 先判斷讀到東西了沒有, 沒有跳出 if (oldInputLength == in.readableBytes()) { // 讀取位置變化沒 break; } else { continue; } } if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } }
數據緩存在 cumulation中
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { RecyclableArrayList out = RecyclableArrayList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; //第一次請求 cumulation 為 null true if (first) { cumulation = data; } else { cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); // 第二次請求時進入 將新的信息追加到cumulation后面 } callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) { cumulation.release(); cumulation = null; } int size = out.size(); for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); } out.recycle(); } } else { ctx.fireChannelRead(msg); } }