RocketMq通信協議格式及編解碼 (源碼分析)


一、RocketMq broker服務器與客戶端的網絡通信是基於netty4.x實現的,重點分析  RocketMq設計的通信協議及對應的編解碼 開發。

        名字解釋
                        編碼:將java對象轉換成二進制數據,用於放到網絡中進行傳輸
                        解碼:將從網絡中讀取到的二進制數據轉換成相應的java對象
 
二、Remoting設計的通信協議格式如下(重點理解,能根據通信協議格式來對網絡中讀取的二進制數據進行編解碼):
        
        協議格式 <length> <header length> <header data> <body data>
                                1                      2                        3                      4
 
        1、4個字節的int型數據來存儲2、3、4的總長度
        2、4個字節的int型數據來存儲報文頭部的字節長度等於3的長度
        3、存儲報文頭部的數據
        4、存儲報文體的數據
 
三、在通信過程中,服務器與客戶端通過傳遞 RemotingCommand 對象來進行交互,下面將根據代碼實現來分析通信協議的編解碼開發
 
         3.1  分析RemotingCommand 類encode()方法,它將按照上述定義的協議格式進行對象的編碼操作:
 
public ByteBuffer encode() {
        // 1> header length size
        int length = 4;    //表示用4個字節來存儲頭部長度
 
        // 2> header data length
        byte[] headerData = this.buildHeader();   //報文頭部的數據
        length += headerData.length;                  //加上頭部報文的字節長度
 
        // 3> body data length
        if (this.body != null) {
            length += body.length;                         //如果報文體body有數據則加上報文體的字節長度
        }
 
ByteBuffer result = ByteBuffer.allocate(4 + length);     //分配一個  (4+length)這么大的字節緩沖區,這個緩沖區就用來存儲上述協議格式的整個報文的數據
 
         //下面代碼開始往緩沖區存放數據
 
        // length                                         
        result.putInt(length);                               //緩沖區的最開始的4個字節用來存儲總的長度length
 
        // header length
        result.putInt(headerData.length);         //緩沖區接下來4個字節用來存儲報文頭部的長度
 
        // header data
        result.put(headerData);                         //緩沖區接下來存儲報文頭部數據
 
        // body data;
        if (this.body != null) {
            result.put(this.body);                          //緩沖區最后用來存儲報文體的數據
        }
 
        result.flip();                                             //將緩沖區翻轉,用於將ByteBuffer放到網絡通道中進行傳輸
 
        return result;
    }
       
        3.2  分析RemotingCommand 類decode()方法,它將按照上述定義的協議格式進行各個報文段的字節數據讀取,然后轉換成RemotingCommand對象:
        
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.RemotingLogName);
    private static final int FRAME_MAX_LENGTH = //
            Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "8388608"));
 
 
    public NettyDecoder() {
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);        //0,4,0,4   每一個0,4,這個是表示存放長度的變量的字節所占的長度,為4個,第二個4表示就是解碼之后的數據包跳過的字節數為4,表示就將數據包的頭部給去掉了。
    }
 
//此處省略一萬字
}
 
                    // 在調用decode()方法解碼之前,會調用上述NettyDecoder 類的decode()方法,在上述構造方法中,會先去掉報文的前4個字節,這4個字節是存儲的后面報文的長度.
 
 public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        int length = byteBuffer.limit();                              //獲取字節緩沖區的整個長度,這個長度等於通信協議格式的2、3、4段的總長度
        int headerLength = byteBuffer.getInt();               //從緩沖區中讀取4個字節的int類型的數據值 ,這個值就是報文頭部的長度
 
        byte[] headerData = new byte[headerLength];  
        byteBuffer.get(headerData);                                 //接下來從緩沖區中讀取headerLength個字節的數據,這個數據就是報文頭部的數據
 
        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;                                       
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);                                //接下來讀取length-4-headerLength  個字節的數據,這個數據就是報文體的數據
        }
  
//接下來將讀取到的數據轉換成   RemotingCommand 對象
        RemotingCommand cmd = RemotingSerializable.decode(headerData, RemotingCommand.class);      
        cmd.body = bodyData;
 
        return cmd;
     }
 
 
 
四 、總結
開發任何的socket 長連接的網絡程序,涉及服務器與客戶端的開發,首先要定義服務端與客戶端的通信協議格式,第二根據定義的通信協議格式來進行
傳輸數據的編解碼操作。  上述的通信協議格式為常用的通信協議格式。當長連接在設定的間隔時間范圍內沒有數據傳輸時,需要按照協議發送心跳包,
心跳包的協議格式也可以按照這種協議格式發送,也可以另外針對發送的心跳包來定義通信協議格式。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM