一、拆包/粘包的問題
正常情況下客戶端發上來的報文都是單獨,一條報文就是一個完善的。但是特殊情況下會出現2個報文粘在一起發上來。
正常情況的報文:
757200501011313130323630383237374137393738323030303532000000000000000055012238393836303242343130313638303038333035320000000000000000000000000000000000000000D58A
7572 是幀起始序列,也就是包頭
0050 是報文的長度
D58A 為CRC16 檢驗
粘包:
757200501011313130323630383237374137393738323030303532000000000000000055012238393836303242343130313638303038333035320000000000000000000000000000000000000000D58A757200501011313130323630383237374137393738323030303532000000000000000055
這是報文第二條是個不完整的包,我們服務端需要做到將包拆成完整的包,並且第二個包需要等到下一條報文拼接成完整的包。
二、 netty的解決方案
1.消息定長,報文大小固定長度,發送和接收方遵循相同的約定,這樣即使粘包了通過接收方編程實現獲取定長報文也能區分。
2.包尾添加特殊分隔符,例如每條報文結束都添加回車換行符(例如FTP協議)或者指定特殊字符作為報文分隔符,接收方通過特殊分隔符切分報文區分。
3.將消息分為消息頭和消息體,消息頭中包含表示信息的總長度(或者消息體長度)的字段
三、實現方法
創建一個實現類繼承netty的 MessageToMessageDecoder方法
public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> { private byte[] remainingBytes; private static byte[] HEAD_DATA = new byte[]{0x75, 0x72}; //協議幀起始序列 7572 2個字節 @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { ByteBuf currBB = null; if(remainingBytes == null) { currBB = msg; }else { byte[] tb = new byte[remainingBytes.length + msg.readableBytes()]; System.arraycopy(remainingBytes, 0, tb, 0, remainingBytes.length); byte[] vb = new byte[msg.readableBytes()]; msg.readBytes(vb); System.arraycopy(vb, 0, tb, remainingBytes.length, vb.length); currBB = Unpooled.copiedBuffer(tb); } while(currBB.readableBytes() > 0) { if(!doDecode(ctx, currBB, out)) { break; } } if(currBB.readableBytes() > 0) { remainingBytes = new byte[currBB.readableBytes()]; currBB.readBytes(remainingBytes); }else { remainingBytes = null; } // out.add(remainingBytes); // remainingBytes=null; } private boolean doDecode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) { if(msg.readableBytes() < 2) return false; msg.markReaderIndex(); byte[] header = new byte[2]; msg.readBytes(header); byte[] dataLength=new byte[2]; //報文的長度 msg.readBytes(dataLength); if (!Arrays.equals(header, HEAD_DATA)) { return false; // throw new DecoderException("errorMagic: " + Arrays.toString(header)); } int len = Integer.parseInt(DatatypeConverter.printHexBinary(dataLength), 16); // int len =msg.readInt(); if(msg.readableBytes() < len-4) { msg.resetReaderIndex(); return false; } msg.resetReaderIndex(); byte[] body = new byte[len]; msg.readBytes(body); out.add(Unpooled.copiedBuffer(body)); if(msg.readableBytes() > 0) return true; return false; } }
netty 客戶端ChannelPipeline加入創建的 MessageDecoder 類
public synchronized boolean openDev() { if(isOpen()){ return true; } if(group ==null){ group =new NioEventLoopGroup(); } Bootstrap b =new Bootstrap(); final MessageHandler hander =new MessageHandler(); hander.setMedia(this); final ServerHandler serverHandler =new ServerHandler(); serverHandler.setMedia(this); b.handler(new HeartbeatHandlerInitializer(this));//心跳 b.group(group).channel(NioSocketChannel.class). option(ChannelOption.TCP_NODELAY, true). handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline=socketChannel.pipeline(); //pipeline.addLast(new MessageEncoder());//協議編碼器 pipeline.addLast(new MessageDecoder());//協議解碼器 pipeline.addLast(hander); pipeline.addLast(new ClientHandler(TcpClient.this)); } }); try { f =b.connect(mediaPara.getIp(),mediaPara.getPort()).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); return false; } return channel.isActive(); }
