記一次解決netty半包問題的經歷


最近學習了netty,想寫一個簡單的rpc,結果發現發送消息時遇到難題了,網上搜了一下,這種情況是半包問題和粘包問題,主要是出現在並發高一些的時候。

talk is cheap 

客戶端編碼:

    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
            encode0(channelHandlerContext,o,byteBuf);
    }
    private void encode0(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        if(o instanceof UserInfo){
            byte[] data = Serializition.serialize((UserInfo) o,UserInfo.class);
            byteBuf.writeBytes(data);
        }
    }

服務端解碼:

protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
myDecode(channelHandlerContext,byteBuf,list);
}
public void myDecode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list){
int len = byteBuf.readableBytes();
byte[] data = new byte[len];
byteBuf.readBytes(data);
UserInfo userInfo = Serializition.deSerialize(data,UserInfo.class);
list.add(userInfo);
}

這是最初版本的,一開始以為只要讀出來反序列化成對象就ok了,進行了簡單的測試發現沒問題,但客戶端發送頻繁一些服務端就開始報錯:

警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.DecoderException: java.lang.RuntimeException: Reading from a byte array threw an IOException (should never happen).

分析一下發現對於來自同一個遠程連接來說,服務端只會分配一個bytebuf來接收消息(這里使用的是UnpooledDirectByteBuf),這個bytebuf容量是動態擴增的,如果當前的長度不夠用來存儲新的消息就會自動擴展。當客戶端發送不頻繁時,服務端有足夠的時間來做准備接收和處理消息,不會出現問題。但客戶端頻繁發送時就會出現問題了,如上,服務端的可讀的字節超過了一個對象,讀取后下一個對象反序列化就會出現問題。

解決思路:

  1.每次發送定長的消息,不夠就補全,服務端設置對應的長度(但這樣有問題:如果這樣做客戶端會發送很多無用信息,浪費性能,而且不知道設置多大的長度合適)

  2.使用netty自帶的編碼和解碼器,如使用/r/n標志符解碼,這就要繼承MessageDecoder了,也就是字符解碼,即先將消息在字節--字符串--對象將轉換(有點浪費效率,而且萬一內容中有對應的分隔符就會出問題)

  3.每次發送消息前先獲取對象字節數組的長度(我最開始使用的方法,后來在網上也找到別人一樣的思路)

  客戶端:

    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
            encode1(channelHandlerContext,o,byteBuf);
    }
    private void encode1(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        if(o instanceof UserInfo){
            byte[] data = Serializition.serialize((UserInfo) o,UserInfo.class);
            byteBuf.writeInt(data.length);
            byteBuf.writeBytes(data);
        }
    }

服務端:

    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        myDecode1(channelHandlerContext,byteBuf,list);
    }
    public void myDecode1(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list){
        if(byteBuf.readableBytes()>4){
            int len = byteBuf.readInt();
            byte[] data = new byte[len];
            byteBuf.readBytes(data);
            UserInfo userInfo = Serializition.deSerialize(data,UserInfo.class);
            list.add(userInfo);
        }
    }

這就看起來簡單了  數據流是 |int|bytes|int|bytes,但實際情況還是發生了問題,還是出現了一樣的問題。異常原因是服務端實例化數組長度后可讀字節不夠,原因是發送時客戶端是分包發送的。

因此我在這個方法的基礎上增加了一個條件:如果可讀字節數不夠就保存已創建好的字節數組,等下一次字節數夠時使用

    private volatile int len=0;
    protected void decode5(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        int length =len>0?len:(byteBuf.readableBytes()>=4?byteBuf.readInt():0);
        if(byteBuf.readableBytes()>=length&&length>0) {
            byte[] data = new byte[length];
            byteBuf.readBytes(data);
            UserInfo userInfo = Serializition.deSerialize(data, UserInfo.class);
            list.add(userInfo);
            //bytes.put(length, data);
            len=0;
        }else {
            len = length;
        }
    }

經過測試,問題得到解決。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM