開源項目SMSS發開指南(三)——protobuf協議設計


本文的第一部分將介紹protobuf使用基礎以及如何利用protobuf設計通信協議。第二部分會給出smss項目的協議設計規范和源碼講解。

一.Protobuf使用基礎

什么是protobuf

protobuf是谷歌研發的一種數據序列化和存儲技術。主要可以用來解決網絡通訊中異構系統的通訊和數據持久化,與同類技術相比(JSON或XML),官方宣稱的數據量長度減少3~10倍,運算速度20~100倍。由於與平台無關,因此非常適合使用在多系統的交互中。

目前常見的使用版本是2和3,個人推薦如果你打算在項目中引入protobuf技術,不妨直接選擇版本3。以下的所有介紹也都基於protobuf3作為標准。

從個人的使用感受來看,protobuf的優點還是相當明顯的。不過,也有一些問題需要注意。

如何使用protobuf以及常見問題

protobuf依賴於一個可閱讀的描述文件,后綴以.proto結束。編寫proto描述文件有固定的格式,詳細說明參照官方文檔。smss項目的doc目錄下也有提供,描述文件可閱讀,因此不存在太大難度。只是需要注意,目前如果你打算使用protobuf3版本需要在文件開頭注明:syntax="proto3"。后期不清楚谷歌是否會更改,因此建議使用者應該關注官方說明。

一般來說,protobuf依賴谷歌提供的編譯工具將描述文件(.proto)翻譯為對應語言的源碼。你也可以在項目中直接引入protobuf的依賴包利用動態編譯(反射)直接使用。建議使用前一種方式,無論是難度上還是效率上都更直接。注意:編譯工具和protobuf的開發依賴有版本對應,需要保持統一

protobuf的C++版本會生成.h和.cc兩個文件。對於C/C++程序員來說,使用的方式和Struct幾乎一致。

protobuf的Java版本會生成一系列路徑(如果有設置的話)和對應的.java文件。Java開發人員可以直接復制到項目下使用,通過構造器模式來創建對象。

官方示例:

Person john = Person.newBuilder()
    .setId(1234)
    .setName("John Doe")
    .setEmail("jdoe@example.com")
    .build();
output = new FileOutputStream(args[0]);
john.writeTo(output);

protobuf的JS版本會生成.js文件,這里不再贅述。

不同語言的構造命令可以參考smss項目的腳本文件

使用Protobuf特別需要關注你的使用場景,通常來說需要注意以下兩點:

(1)protobuf專注於對小型字符數據的序列化/反序列化操作。如果你需要傳輸大型文件或二進制數據是不適合使用的。

(2)如果你打算在TCP/IP層來應用protobuf協議,依然需要設計包解析機制。TCP/IP傳輸會發生粘包或長包,這些問題protobuf無法幫你解決。如果你傳輸的包中包含多條數據,交給protobuf解析的時候,它只能反序列化出最后一條。這就要求在數據包的設計中必須包含一些必要字段。

目前smss項目的用法是在一個包的頭部增加了4個字節的包頭標識(AB47)和4個字節的小端整型數表示protobuf序列化后的包長度。根據筆者的實踐發現,相同的包結構如果設置的數據不同可能最后序列化的數據長度有差異。因此在設置包長度的時候一定要根據實際的序列化為標准。

什么場景下適合使用protobuf

相信在了解了protobuf的基本使用后,大多數有經驗的開發人員會有自己的判斷。我在這里僅拋磚引玉提供一些個人的思考:

(1)內部系統開發:目前protobuf並未被大規模的實踐。如果你的項目需要對接外部系統,請對方提供或支持protobuf協議難度較大。因此,內部系統開發進行交互推薦使用。

(2)TCP/IP層數據通信:目前的Java微服務應用大多使用http應用層協議,好處是實現過程相對簡單。而且由於各種開源框架對JSON-POJO的映射功能非常完備,如果從開發效率上考慮,顯然protobuf還不具備優勢。如果在業務中新增一些數據中台業務,需要開發更加高效的通信過程,利用protobuf是更加合理的方案。

(3)異構系統:不少物聯網項目會涉及多種語言多種設備間的通信。例如C++直接使用struct的序列化后傳輸給Java來處理,就必要麻煩。這類需求是使用protobuf的最佳使用場景。

二.SMSS項目協議設計規范

目前smss項目利用protobuf協議作為通信的主要手段,正如前文介紹的那樣。為了提高通信效率,項目各端內部使用TCP/IP層通信。因此在包頭設計了包頭標識和包長度標識(8個字節)。另外,與http協議不同的是,TCP/IP由於是長連接且是面向連接設計,因此需要設計應用層的規范。smss將一個完整的應用數據包分為數據頭和數據體,結構如下:

message MsgHeader
{
    int32 msg_size = 1; // 消息體的長度
    int64 msg_id = 2; // 消息ID,作為服務器應答時候的對應
    MsgType msg_type = 3; // 消息類型
    // 服務器為0
    uint32 from = 4; // 消息發送方
    uint32 to = 5; // 消息接收方
    string token = 6; // 令牌
}
message LoginReq
{
    string username = 1;
    string password = 2;
    bool is_need_key = 3; // 是否需要請求私鑰
}

message LoginResp
{
    enum LoginRespType
    {
        OK = 0;        //登陸成功
        ERROR=1;    //用戶名密碼錯誤
        NOUSER=2;    //用戶不存在
    }
    LoginRespType resp = 1;
    string token = 2; // 通訊令牌
    uint32 id = 3; // 用戶id
    string alias = 4; // 用戶別名
    string prv_key = 5; // 登錄成功攜帶用戶通訊私鑰
}

MsgHeader表示數據頭,除了提供發送發接收方等常用信息外,主要依賴消息類型(MsgType)和消息體長度(msg_size)作為數據包的反序列化依據。由於在通信的過程中需要加密,消息體是用過protobuf序列化完成后再使用算法進行加密傳輸。因此服務端只需要解析頭數據即可完成對消息的轉發和處理。為解析TCP包增加的8個字節,作為包長度的標識特指數據頭的長度,數據體的長度通過反序列化數據頭來確定。

數據的序列化與反序列化源碼主要包含在服務端(socket_manager.cpp)和客戶端(smss_socket_event.js)文件中。

服務端源碼:

void SocketManager::ReadCb()
{
    char flag[5] = {0};
    int head_size = 0;
    // 判斷報文頭
    int len = bufferevent_read(buff_ev_, flag, 4);
    if (len <= 0 || strcmp(flag, PKT_FLAG) != 0)
    {
        return;
    }
    // 獲得消息頭大小
    len = bufferevent_read(buff_ev_, &head_size, 4);
    if (len <= 0)
    {
        return;
    }
    char *head = new char[head_size];
    len = bufferevent_read(buff_ev_, head, head_size);
    // 解析消息頭對象
    MsgHeader msg_header;
    msg_header.ParseFromArray(head, head_size);
    delete[] head;
    char *msg_buff = new char[msg_header.msg_size()];
    // FIX:if msg_body too large
    len = bufferevent_read(buff_ev_, msg_buff, msg_header.msg_size());
    switch (msg_header.msg_type())
    {
    case MsgType::CONNECT_REQ:
        RecvConnectReqest(&msg_header, msg_buff, len);
        break;
    case MsgType::CLIENT_LOGIN_REQ: // 登錄請求
        RecvLoginRequest(&msg_header, msg_buff, len);
        break;
    case MsgType::HEART_BEAT: // 心跳
        RecvHeartBeat(msg_buff, len);
        break;
    case MsgType::USER_INFO_REQ: // 用戶信息請求
        if (msg_header.token() != this->token_)
        {
            LOG4CPLUS_ERROR(SimpleLogger::Get()->LoggerRef(), "用戶通信令牌(token)驗證錯誤!");
            return;
        }
        RecvUserInfoRequest(&msg_header, msg_buff, len);
        break;
    case MsgType::MSG_SEND_REQ: // 消息發送請求
    {
        if (msg_header.token() != this->token_)
        {
            LOG4CPLUS_ERROR(SimpleLogger::Get()->LoggerRef(), "用戶通信令牌(token)驗證錯誤!");
            return;
        }
        LOG4CPLUS_DEBUG(SimpleLogger::Get()->LoggerRef(), "MsgType::MSG_SEND_REQ");
        work_thread_->SendToNetBus(msg_header.SerializeAsString().c_str(), msg_header.ByteSizeLong(), msg_buff, msg_header.msg_size());
    }
    break;
    case MsgType::USER_STATUS_REQ: // 用戶狀態請求
        if (msg_header.token() != this->token_)
        {
            LOG4CPLUS_ERROR(SimpleLogger::Get()->LoggerRef(), "用戶通信令牌(token)驗證錯誤!");
            return;
        }
        LOG4CPLUS_DEBUG(SimpleLogger::Get()->LoggerRef(), "MsgType::USER_STATUS_REQ");
        RecvUserStatusRequest(&msg_header, msg_buff, len);
        break;
    case MsgType::SERVICE_REGIST: // 服務注冊
        RecvServiceRegist(&msg_header, msg_buff, len);
        break;
    case MsgType::FILE_DOWNLOAD_NOTICE:
        work_thread_->SendToNetBus(msg_header.SerializeAsString().c_str(), msg_header.ByteSizeLong(), msg_buff, msg_header.msg_size());
        break;
    default:
    {
        stringstream ss;
        ss << "缺少對應的消息類型處理函數:" << msg_header.msg_type();
        LOG4CPLUS_ERROR(SimpleLogger::Get()->LoggerRef(), ss.str());
    }
    }
    delete[] msg_buff;
}

ReadCb()是數據接收的直接處理方法,首先讀取4個字節判斷包頭標識:int len = bufferevent_read(buff_ev_, flag, 4),判斷成功代表當前數據包是完整的。再讀取4個字節的整型數來判斷后續一個數據頭的長度:bufferevent_read(buff_ev_, &head_size, 4)。接下來收取數據頭的完整數據並通過protobuf反序列化:msg_header.ParseFromArray(head, head_size)。最后根據數據頭的msg_type字段判斷應該如何處理數據體。

客戶端源碼:

onData(data) {
  // 處理粘包,循環讀取
  let readSize = 0;
  while (readSize < data.length) {
    let flag = data.toString("utf8", readSize, readSize + 4);
    if (flag !== "AB47") {
      readSize += 4;
      continue;
    }
    readSize += 4;
    let headerSize = data.readInt32LE(readSize);
    readSize += 4;
    // 消息頭反向序列化
    let msgHeader = MsgHeader.deserializeBinary(
      data.subarray(readSize, headerSize + readSize)
    );
    readSize += headerSize;
    // 消息類型
    let msgType = msgHeader.getMsgType();
    // 消息大小
    let msgSize = msgHeader.getMsgSize();
    switch (msgType) {
      case MsgType.USER_INFO_RESP:
        this.onUserInfoResp(
          UserInfoResp.deserializeBinary(
            data.subarray(readSize, msgSize + readSize)
          )
        );
        break;
      case MsgType.USER_STATUS_NOTICE:
        this.onUserStatusNotice(
          UserStatus.deserializeBinary(
            data.subarray(readSize, msgSize + readSize)
          )
        );
        break;
      case MsgType.MSG_SEND_REQ:
        this.onSmsSendReq(data.subarray(readSize, msgSize + readSize));
        break;
      case MsgType.FILE_DOWNLOAD_NOTICE:
        new DownloadEvent(
          this.$store.state.User.userID,
          this.$store.state.User.userToken,
          msgHeader,
          FileDownloadNotice.deserializeBinary(
            data.subarray(readSize, msgSize + readSize)
          )
        );
        break;
      default:
    }
    readSize += msgSize;
  }
}
/**
 * 連接事件處理
 * 
 * @param {*} socket 
 * @param {*} userID 
 */
function ConnectEvent(socket, userID) {
    return new Promise((resolve, reject) => {
        fs.open("./data/.shadow/server.pem", "r", (err, fd) => {
            let connectReq = new ConnectReq();
            connectReq.setTimestamp(new Date().getTime());
            if (err) {
                connectReq.setIsNeedKey(true);
            } else {
                connectReq.setIsNeedKey(false);
            }
            let connectReqBuffer = connectReq.serializeBinary();
            let msgHeader = new MsgHeader();
            msgHeader.setMsgSize(connectReqBuffer.length);
            msgHeader.setMsgId(0);
            msgHeader.setMsgType(MsgType.CONNECT_REQ);
            msgHeader.setFrom(userID);
            msgHeader.setTo(0); // 發送給服務器
            const headerBuffer = msgHeader.serializeBinary();
            let packageHeader = Buffer.alloc(8);
            packageHeader.write("AB47");
            packageHeader.writeInt32LE(headerBuffer.length, 4);
            const packageBuffer = Buffer.concat([
                packageHeader,
                headerBuffer,
                connectReqBuffer
            ]);
            socket.write(packageBuffer, () => {
                socket.once("data", data => {
                    let flag = data.toString("utf8", 0, 4);
                    if (flag !== "AB47") {
                        return;
                    }
                    let headerSize = data.readInt32LE(4);
                    // 消息頭反向序列化
                    let msgHeader = MsgHeader.deserializeBinary(
                        data.subarray(8, headerSize + 8)
                    );
                    // 消息類型
                    let msgType = msgHeader.getMsgType();
                    // 消息大小
                    let msgSize = msgHeader.getMsgSize();
                    if (msgType !== MsgType.CONNECT_RESP) {
                        reject("ConnectEvent RES MsgType Error!");
                    } else {
                        let resp = ConnectResp.deserializeBinary(
                            data.subarray(8 + headerSize, msgSize + 8 + headerSize)
                        );
                        if (resp.getPubKey() !== "") {
                            fs.writeFile(
                                "./data/.shadow/server.pem",
                                resp.getPubKey(),
                                err => {
                                    if (err) {
                                        reject(err);
                                    };
                                    // 連接完成后進行登錄
                                    resolve(resp);
                                }
                            );
                        } else {
                            resolve(resp);
                        }
                    }
                })
            });
        });
    });
}

處理的過程和服務端的思路一致,也是從包頭到數據頭最后是數據體的解析。由於JavaScript在對網絡調用和文件讀取的時候大量需要使用回調函數,因此smss項目在客戶端利用Promise進行了封裝。學習的時候建議先熟悉一下Promise的使用方式。

 

完整源碼已經發布在碼雲上。

相關文件:《開源項目SMSS開發指南》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM