Netty自定義編-解碼器解決TCP通訊粘包拆包的問題


1. TCP 粘包和拆包基本介紹

  1. TCP 是面向連接的,面向流的,提供高可靠性服務。收發兩端(客戶端和服務器端)都要有一一成對的 socket,因此,發送端為了將多個發給接收端的包,更有效的發給對方,使用了優化方法(Nagle 算法),將多次間隔較小且數據量小的數據,合並成一個大的數據塊,然后進行封包。這樣做雖然提高了效率,但是接收端就難於分辨出完整的數據包了,因為面向流的通信是無消息保護邊界的
  2. 由於 TCP 無消息保護邊界,需要在接收端處理消息邊界問題,也就是我們所說的粘包、拆包問題,看一張圖
  3. 示意圖 TCP 粘包、拆包圖解

對圖的說明: 假設客戶端分別發送了兩個數據包 D1 和 D2 給服務端,由於服務端一次讀取到字節數是不確定的,故可能存在以下四種情況:

  1. 服務端分兩次讀取到了兩個獨立的數據包,分別是 D1 和 D2,沒有粘包和拆包
  2. 服務端一次接受到了兩個數據包,D1 和 D2 粘合在一起,稱之為 TCP 粘包
  3. 服務端分兩次讀取到了數據包,第一次讀取到了完整的 D1 包和 D2 包的部分內容,第二次讀取到了 D2 包的剩余內容,這稱之為 TCP 拆包
  4. 服務端分兩次讀取到了數據包,第一次讀取到了 D1 包的部分內容 D1_1,第二次讀取到了 D1 包的剩余部分內容 D1_2 和完整的 D2 包。

2. TCP 粘包和拆包解決方案

  1. 使用自定義協議+編解碼器來解決
  2. 關鍵就是要解決服務器端每次讀取數據長度的問題,這個問題解決,就不會出現服務器多讀或少讀數據的問題,從而避免的 TCP 粘包、拆包。

3. 看一個具體的實例

這是一個真實的案例,使我們公司開發的協議,我們在和充電樁進行通訊的時候,協議報文格式長這樣:

 

 報文說明:

 報文里面有起始域域和長度域,我們可以先判斷前兩個字節是不是AAF5,再取3.4字節獲取包長度,最后按照包長度取定長的數據

解決問題之前首先說明一個問題:

首先,我自定義了一個非常簡單的解碼器,其實並不具備解碼功能,目的就是為了證實我的一個猜測:

public class DecodeHandler extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        System.out.println(byteBuf.writerIndex());
    }
}

在解碼器中,輸出的是寫索引的位置。

然后,我開始嘗試觸發解碼器,發現當我不斷向byteBuf中寫入內容后,寫索引也不斷增長,我寫入兩字節,寫索引就增大2,寫入三字節,寫索引就增加3。由此,我猜測每次接收數據准備進行解碼的bytebuf都是同一個!而不是新建的bytebuf,所以我們就可以使用這個bytebuf來實現粘包分包問題!

確定了讀索引的位置就比較好辦了,接下來的解析方式就看數據的具體格式了,在解析之前有必要檢測一下數據長度是否完整,如果不完整,可以選擇跳過這一波解析,等待數據接收完整再解析(記得要將讀索引恢復到正確的位置)

大部分協議中,數據是有開頭標識和長度域的,比如此協議。那么可以先找到數據的起始值和長度域在哪里。

具體我們看代碼,本部分代碼在解碼器里實現:

/**
 * 解碼器
 */
public class DecodeUtil extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
        try {
            //byteBuf的長度
            int bufNum = byteBuf.readableBytes();
            //byteBuf當前的讀索引
            int readerIndex = byteBuf.readerIndex();
            byte[] bytes = new byte[2];
            if (bufNum >= 4) {   //byteBuf的長度大於4,
               //查看前兩個字節判斷消息頭
                for (int index = 0; index < 2; index++) {
                    bytes[index] = byteBuf.getByte(readerIndex);
                    readerIndex++;
                }
                //將前2個字節轉換為16進制
                String header = ConvertCode.receiveHexToString(bytes);
                int length = 0;
                if (header.toUpperCase().equals("AAF5")) {
                    //獲取包長度
                    bytes = new byte[2];
                    bytes[0] = byteBuf.getByte(2);
                    bytes[1] = byteBuf.getByte(3);
                    length = ConvertCode.getShort(bytes, 0);
                } else {
                    return;
                }
                if (bufNum >= length) {
                    bytes = new byte[length];
                    byteBuf.readBytes(bytes);
                    list.add(bytes);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

工具類

public class ConvertCode {
    /**
     * @Title:bytes2HexString
     * @Description:字節數組轉16進制字符串
     * @param b
     *            字節數組
     * @return 16進制字符串
     * @throws
     */
    public static String bytes2HexString(byte[] b) {
        StringBuffer result = new StringBuffer();
        String hex;
        for (int i = 0; i < b.length; i++) {
            hex = Integer.toHexString(b[i] & 0xFF);
            if (hex.length() == 1) {
                hex = '0' + hex;
            }
            result.append(hex.toUpperCase());
        }
        return result.toString();
    }
    /**
     * @Title:hexString2Bytes
     * @Description:16進制字符串轉字節數組
     * @param src  16進制字符串
     * @return 字節數組
     */
    public static byte[] hexString2Bytes(String src) {
        int l = src.length() / 2;
        byte[] ret = new byte[l];
        for (int i = 0; i < l; i++) {
            ret[i] = (byte) Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue();
        }
        return ret;
    }
    /**
     * @Title:string2HexString
     * @Description:字符串轉16進制字符串
     * @param strPart  字符串
     * @return 16進制字符串
     */
    public static String string2HexString(String strPart) {
        StringBuffer hexString = new StringBuffer();
        for (int i = 0; i < strPart.length(); i++) {
            int ch = (int) strPart.charAt(i);
            String strHex = Integer.toHexString(ch);
            hexString.append(strHex);
        }
        return hexString.toString();
    }
    /**
     * @Title:hexString2String
     * @Description:16進制字符串轉字符串
     * @param src
     *            16進制字符串
     * @return 字節數組
     * @throws
     */
    public static String hexString2String(String src) {
        String temp = "";
        for (int i = 0; i < src.length() / 2; i++) {
            //System.out.println(Integer.valueOf(src.substring(i * 2, i * 2 + 2),16).byteValue());
            temp = temp+ (char)Integer.valueOf(src.substring(i * 2, i * 2 + 2),16).byteValue();
        }
        return temp;
    }

    /**
     * @Title:char2Byte
     * @Description:字符轉成字節數據char-->integer-->byte
     * @param src
     * @return
     * @throws
     */
    public static Byte char2Byte(Character src) {
        return Integer.valueOf((int)src).byteValue();
    }

    /**
     * @Title:intToHexString
     * @Description:10進制數字轉成16進制
     * @param a 轉化數據
     * @param len 占用字節數
     * @return
     * @throws
     */
    public static String intToHexString(int a,int len){
        len<<=1;
        String hexString = Integer.toHexString(a);
        int b = len -hexString.length();
        if(b>0){
            for(int i=0;i<b;i++)  {
                hexString = "0" + hexString;
            }
        }
        return hexString;
    }


    /**
     * 將16進制的2個字符串進行異或運算
     * http://blog.csdn.net/acrambler/article/details/45743157
     * @param strHex_X
     * @param strHex_Y
     * 注意:此方法是針對一個十六進制字符串一字節之間的異或運算,如對十五字節的十六進制字符串異或運算:1312f70f900168d900007df57b4884
    先進行拆分:13 12 f7 0f 90 01 68 d9 00 00 7d f5 7b 48 84
    13 xor 12-->1
    1 xor f7-->f6
    f6 xor 0f-->f9
    ....
    62 xor 84-->e6
    即,得到的一字節校驗碼為:e6
     * @return
     */
    public static String xor(String strHex_X,String strHex_Y){
        //將x、y轉成二進制形式
        String anotherBinary=Integer.toBinaryString(Integer.valueOf(strHex_X,16));
        String thisBinary=Integer.toBinaryString(Integer.valueOf(strHex_Y,16));
        String result = "";
        //判斷是否為8位二進制,否則左補零
        if(anotherBinary.length() != 8){
            for (int i = anotherBinary.length(); i <8; i++) {
                anotherBinary = "0"+anotherBinary;
            }
        }
        if(thisBinary.length() != 8){
            for (int i = thisBinary.length(); i <8; i++) {
                thisBinary = "0"+thisBinary;
            }
        }
        //異或運算
        for(int i=0;i<anotherBinary.length();i++){
            //如果相同位置數相同,則補0,否則補1
            if(thisBinary.charAt(i)==anotherBinary.charAt(i))
                result+="0";
            else{
                result+="1";
            }
        }
        return Integer.toHexString(Integer.parseInt(result, 2));
    }


    /**
     *  Convert byte[] to hex string.這里我們可以將byte轉換成int
     * @param src byte[] data
     * @return hex string
     */
    public static String bytes2Str(byte[] src){
        StringBuilder stringBuilder = new StringBuilder("");
        if (src == null || src.length <= 0) {
            return null;
        }
        for (int i = 0; i < src.length; i++) {
            int v = src[i] & 0xFF;
            String hv = Integer.toHexString(v);
            if (hv.length() < 2) {
                stringBuilder.append(0);
            }
            stringBuilder.append(hv);
        }
        return stringBuilder.toString();
    }
    /**
     * @return 接收字節數據並轉為16進制字符串
     */
    public static String receiveHexToString(byte[] by) {
        try {
            /*io.netty.buffer.WrappedByteBuf buf = (WrappedByteBuf)msg;
            ByteBufInputStream is = new ByteBufInputStream(buf);
            byte[] by = input2byte(is);*/
            String str = bytes2Str(by);
            str = str.toUpperCase();
            return str;
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("接收字節數據並轉為16進制字符串異常");
        }
        return null;
    }



    /**
     * "7dd",4,'0'==>"07dd"
     * @param input 需要補位的字符串
     * @param size 補位后的最終長度
     * @param symbol 按symol補充 如'0'
     * @return
     * N_TimeCheck中用到了
     */
    public static String fill(String input, int size, char symbol) {
        while (input.length() < size) {
            input = symbol + input;
        }
        return input;
    }
//    public static void main(String args[]) {
//        String productNo = "3030303032383838";
//        System.out.println(hexString2String(productNo));
//        productNo = "04050103000001070302050304";
//        System.out.println(hexString2String(productNo));
//    }

    /**
     * 獲取short,小端
     *
     * @param src
     * @param index
     * @return
     */
    public static short getShort(byte[] src, int index) {
        return (short) (((src[index + 1] << 8) | src[index] & 0xff));
    }



}

我們用真實的報文來驗證處理的結果:

完整的報文:AAF56E0010026A0000000000363130313133303032373030303031000000000000000000000000000000000000A851000001006400000001010000010A02ED0B000020210414180000FF00000000000000000000000000000000000000000000000000000035C901000000000024

1.首先模擬完整報文發送100次

 

 服務端接收解碼后的結果:沒有粘包

 

2.模擬1包完整報文+半包報文,再發后半包報文

 

 

 

 至此,完美解決粘包問題,其他情況各位可以自己模擬!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM