一、RocketMq broker服務器與客戶端的網絡通信是基於netty4.x實現的,重點分析 RocketMq設計的通信協議及對應的編解碼 開發。
名字解釋
編碼:將java對象轉換成二進制數據,用於放到網絡中進行傳輸
解碼:將從網絡中讀取到的二進制數據轉換成相應的java對象
二、Remoting設計的通信協議格式如下(重點理解,能根據通信協議格式來對網絡中讀取的二進制數據進行編解碼):
協議格式 <length> <header length> <header data> <body data>
1 2 3 4
1、4個字節的int型數據來存儲2、3、4的總長度
2、4個字節的int型數據來存儲報文頭部的字節長度等於3的長度
3、存儲報文頭部的數據
4、存儲報文體的數據
三、在通信過程中,服務器與客戶端通過傳遞 RemotingCommand 對象來進行交互,下面將根據代碼實現來分析通信協議的編解碼開發
3.1 分析RemotingCommand 類encode()方法,它將按照上述定義的協議格式進行對象的編碼操作:
public ByteBuffer encode() {// 1> header length sizeint length = 4; //表示用4個字節來存儲頭部長度// 2> header data lengthbyte[] headerData = this.buildHeader(); //報文頭部的數據length += headerData.length; //加上頭部報文的字節長度// 3> body data lengthif (this.body != null) {length += body.length; //如果報文體body有數據則加上報文體的字節長度}ByteBuffer result = ByteBuffer.allocate(4 + length); //分配一個 (4+length)這么大的字節緩沖區,這個緩沖區就用來存儲上述協議格式的整個報文的數據//下面代碼開始往緩沖區存放數據// lengthresult.putInt(length); //緩沖區的最開始的4個字節用來存儲總的長度length// header lengthresult.putInt(headerData.length); //緩沖區接下來4個字節用來存儲報文頭部的長度// header dataresult.put(headerData); //緩沖區接下來存儲報文頭部數據// body data;if (this.body != null) {result.put(this.body); //緩沖區最后用來存儲報文體的數據}result.flip(); //將緩沖區翻轉,用於將ByteBuffer放到網絡通道中進行傳輸return result;}
3.2 分析RemotingCommand 類decode()方法,它將按照上述定義的協議格式進行各個報文段的字節數據讀取,然后轉換成RemotingCommand對象:
public class NettyDecoder extends LengthFieldBasedFrameDecoder {private static final Logger log = LoggerFactory.getLogger(RemotingHelper.RemotingLogName);private static final int FRAME_MAX_LENGTH = //Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "8388608"));public NettyDecoder() {super(FRAME_MAX_LENGTH, 0, 4, 0, 4); //0,4,0,4 每一個0,4,這個是表示存放長度的變量的字節所占的長度,為4個,第二個4表示就是解碼之后的數據包跳過的字節數為4,表示就將數據包的頭部給去掉了。}//此處省略一萬字
}
//
在調用decode()方法解碼之前,會調用上述NettyDecoder 類的decode()方法,在上述構造方法中,會先去掉報文的前4個字節,這4個字節是存儲的后面報文的長度.
public static RemotingCommand decode(final ByteBuffer byteBuffer) {int length = byteBuffer.limit(); //獲取字節緩沖區的整個長度,這個長度等於通信協議格式的2、3、4段的總長度int headerLength = byteBuffer.getInt(); //從緩沖區中讀取4個字節的int類型的數據值 ,這個值就是報文頭部的長度byte[] headerData = new byte[headerLength];byteBuffer.get(headerData); //接下來從緩沖區中讀取headerLength個字節的數據,這個數據就是報文頭部的數據int bodyLength = length - 4 - headerLength;byte[] bodyData = null;if (bodyLength > 0) {bodyData = new byte[bodyLength];byteBuffer.get(bodyData); //接下來讀取length-4-headerLength 個字節的數據,這個數據就是報文體的數據}
//接下來將讀取到的數據轉換成 RemotingCommand 對象
RemotingCommand cmd = RemotingSerializable.decode(headerData, RemotingCommand.class);cmd.body = bodyData;return cmd;}四 、總結
開發任何的socket 長連接的網絡程序,涉及服務器與客戶端的開發,首先要定義服務端與客戶端的通信協議格式,第二根據定義的通信協議格式來進行傳輸數據的編解碼操作。 上述的通信協議格式為常用的通信協議格式。當長連接在設定的間隔時間范圍內沒有數據傳輸時,需要按照協議發送心跳包,心跳包的協議格式也可以按照這種協議格式發送,也可以另外針對發送的心跳包來定義通信協議格式。