編程思想:如何設計一個好的通信網絡協議


當網絡中兩個進程需要通信時,我們往往會使用 Socket 來實現。Socket 都不陌生。當三次握手成功后,客戶端與服務端就能通信,並且,彼此之間通信的數據包格式都是二進制,由 TCP/IP 協議負責傳輸。

當客戶端和服務端取得了二進制數據包后,我們往往需要『萃取』出想要的數據,這樣才能更好的執行業務邏輯。所以,我們需要定義好數據結構來描述這些二進制數據的格式,這就是通信網絡協議。簡單講,就是需要約定好二進制數據包中每一段字節的含義,比如從第 n 字節開始的 m 長度是核心數據,有了這樣的約定后,我們就能解碼出想要的數據,執行業務邏輯,這樣我們就能暢通無阻的通信了。

網絡協議的設計

概要划分

一個最基本的網絡協議必須包含

  • 數據的長度
  • 數據

了解 TCP 協議的同學一定聽說過粘包、拆包 這兩個術語。因為TCP協議是數據流協議,它的底層根據二進制緩沖區的實際情況進行包的划分。所以,不可避免的會出現粘包,拆包 現象 。為了解決它們,我們的網絡協議往往會使用一個 4 字節的 int 類型來表示數據的大小。比如,Netty 就為我們提供了 LengthFieldBasedFrameDecoder 解碼器,它可以有效的使用自定義長度幀來解決上述問題。

同時一個好的網絡協議,還會將動作和業務數據分離。試想一下, HTTP 協議的分為請求頭,請求體——

  • 請求頭:定義了接口地址、Http MethodHTTP 版本
  • 請求體:定義了需要傳遞的數據

這就是一種分離關注點的思想。所以自定義的網絡協議也可以包含:

  • 動作指令:比如定義 code 來分門別類的代表不同的業務邏輯
  • 序列化算法:描述了 JAVA 對象和二進制之間轉換的形式,提供多種序列化/反序列化方式。比如 jsonprotobuf 等等,甚至是自定義算法。比如:rocketmq 等等。

同時,協議的開頭可以定義一個約定的魔數。這個固定值(4字節),一般用來判斷當前的數據包是否合法。比如,當我們使用 telnet 發送錯誤的數據包時,很顯然,它不合法,會導致解碼失敗。所以,為了減輕服務器的壓力,我們可以取出數據包的前4個字節與固定的魔數對比,如果是非法的格式,直接關閉連接,不繼續解碼。

網絡協議結構如下所示

+--------------+-----------+------------+-----------+----------+
| 魔數(4)       | code(1)   |序列化算法(1) |數據長度(4) |數據(n)   |
+--------------+-----------+------------+-----------+----------+ 

RocketMQ 通信網絡協議的實現

RocketMQ 網絡協議

這一小節,我們從RocketMQ 中,分析優秀通信網絡協議的實現。RocketMQ 項目中,客戶端和服務端的通信是基於 Netty 之上構建的。同時,為了更加有效的通信,往往需要對發送的消息自定義網絡協議。

RocketMQ 的網絡協議,從數據分類的角度上看,可分為兩大類

  • 消息頭數據(Header Data)
  • 消息體數據(Body Data)

從左到右

  • 第一段:4 個字節整數,等於2、3、4 長度總和

  • 第二段:4 個字節整數,等於3 的長度。特別的 byte[0] 代表序列化算法,byte[1~3]才是真正的長度

  • 第三段:代表消息頭數據,結構如下

{
    "code":0,
    "language":"JAVA",
    "version":0,
    "opaque":0,
    "flag":1,
    "remark":"hello, I am respponse /127.0.0.1:27603",
    "extFields":{
        "count":"0",
        "messageTitle":"HelloMessageTitle"
    }
}
  • 第四段:代表消息體數據

RocketMQ 消息頭協議詳細如下:

Header 字段名 類型 Request Response
code 整數 請求操作代碼,請求接收方根據不同的代碼做不同的操作 應答結果代碼,0表示成功,非0表示各種錯誤代碼
language 字符串 請求發起方實現語言,默認JAVA 應答接收方實現語言
version 整數 請求發起方程序版本 應答接收方程序版本
opaque 整數 請求發起方在同一連接上不同的請求標識代碼,多線程連接復用使用 應答方不做修改,直接返回
flag 整數 通信層的標志位 通信層的標志位
remark 字符串 傳輸自定義文本信息 錯誤詳細描述信息
extFields HashMap<String,String> 請求自定義字段 應答自定義字段

編碼過程

RocketMQ 的通信模塊是基於 Netty的。通過定義 NettyEncoder 來實現對每一個 Channel的 出棧數據進行編碼,如下所示:

@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            ByteBuffer header = remotingCommand.encodeHeader();
            out.writeBytes(header);
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
           ...
        }
    }
}

其中,核心的編碼過程位於 RemotingCommand 對象中,encodeHeader 階段,需要統計出消息總長度,即:

  • 定義消息頭長度,一個整數表示:占4個字節

  • 定義消息頭數據,並計算其長度

  • 定義消息體數據,並計算其長度

  • 額外再加 4是因為需要加入消息總長度,一個整數表示:占4個字節

public ByteBuffer encodeHeader(final int bodyLength) {
    // 1> 消息頭長度,一個整數表示:占4個字節
    int length = 4;

    // 2> 消息頭數據
    byte[] headerData;
    headerData = this.headerEncode();
    // 再加消息頭數據長度
    length += headerData.length;

    // 3> 再加消息體數據長度
    length += bodyLength;
    // 4> 額外加 4是因為需要加入消息總長度,一個整數表示:占4個字節
    ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

    // 5> 將消息總長度加入 ByteBuffer
    result.putInt(length);

    // 6> 將消息的頭長度加入 ByteBuffer
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

    // 7> 將消息頭數據加入 ByteBuffer
    result.put(headerData);

    result.flip();

    return result;
}

其中,encode 階段會將 CommandCustomHeader 數據轉換 HashMap<String,String>,方便序列化

public void makeCustomHeaderToNet() {
    if (this.customHeader != null) {
        Field[] fields = getClazzFields(customHeader.getClass());
        if (null == this.extFields) {
            this.extFields = new HashMap<String, String>();
        }

        for (Field field : fields) {
            if (!Modifier.isStatic(field.getModifiers())) {
                String name = field.getName();
                if (!name.startsWith("this")) {
                    Object value = null;
                    try {
                        field.setAccessible(true);
                        value = field.get(this.customHeader);
                    } catch (Exception e) {
                        log.error("Failed to access field [{}]", name, e);
                    }

                    if (value != null) {
                        this.extFields.put(name, value.toString());
                    }
                }
            }
        }
    }
}

特別的,消息頭序列化支持兩種算法:

  • JSON
  • RocketMQ
private byte[] headerEncode() {
    this.makeCustomHeaderToNet();
    if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
        return RocketMQSerializable.rocketMQProtocolEncode(this);
    } else {
        return RemotingSerializable.encode(this);
    }
}

這兒需要值得注意的是,encode階段將當前 RPC 類型和 headerData長度編碼到一個 byte[4] 數組中,byte[0] 位序列化類型。

public static byte[] markProtocolType(int source, SerializeType type) {
    byte[] result = new byte[4];

    result[0] = type.getCode();
    result[1] = (byte) ((source >> 16) & 0xFF);
    result[2] = (byte) ((source >> 8) & 0xFF);
    result[3] = (byte) (source & 0xFF);
    return result;
}

其中,通過與運算 & 0xFF 取低八位數據。

所以, 最終 length 長度等於序列化類型 + header length + header data + body data 的字節的長度。

解碼過程

RocketMQ 解碼通過NettyDecoder來實現,它繼承自 LengthFieldBasedFrameDecoder,其中調用了父類LengthFieldBasedFrameDecoder的構造函數

super(FRAME_MAX_LENGTH, 0, 4, 0, 4);

這些參數設置4個字節代表 length總長度,同時解碼時跳過最開始的4個字節:

frame = (ByteBuf) super.decode(ctx, in);

所以,得到的 frame= 序列化類型 + header length + header data + body data 。解碼如下所示:

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    //總長度
    int length = byteBuffer.limit();
    //原始的 header length,4位
    int oriHeaderLen = byteBuffer.getInt();
    //真正的 header data 長度。忽略 byte[0]的 serializeType
    int headerLength = getHeaderLength(oriHeaderLen);

    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);

    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

    int bodyLength = length - 4 - headerLength;
    byte[] bodyData = null;
    if (bodyLength > 0) {
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);
    }
    cmd.body = bodyData;

    return cmd;
}

private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
    switch (type) {
        case JSON:
            RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
            resultJson.setSerializeTypeCurrentRPC(type);
            return resultJson;
        case ROCKETMQ:
            RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
            resultRMQ.setSerializeTypeCurrentRPC(type);
            return resultRMQ;
        default:
            break;
    }

    return null;
}

其中,getProtocolType,右移 24位,拿到 serializeType

public static SerializeType getProtocolType(int source) {
    return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
}

getHeaderLength 拿到 0-24 位代表的 headerData length:

public static int getHeaderLength(int length) {
    return length & 0xFFFFFF;
}

小結

對於諸多中間件而言,底層的網絡通信模塊往往會使用 NettyNetty 提供了諸多的編解碼器,可以快速方便的上手。本文從如何設計一個網絡協議入手,最終切入到 RocketMQ 底層網絡協議的實現。可以看到,它並不復雜。仔細研讀幾遍變能理解其奧義。具體參考類NettyEncoderNettyDecoderRemotingCommand


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM