自定義應用層通信協議


基於傳輸層TCP協議,自定義實現一個應用層協議

一:回顧JsonCpp

C++通過JsonCpp讀取Json文件

網絡編程字節序轉換問題

二:實現自定義應用層

(一)協議分類

1.按編碼方式

二進制協議:比如網絡通信運輸層中的tcp協議。

明文的文本協議:比如應用層的http、redis協議。

混合協議(二進制+明文):比如蘋果公司早期的APNs推送協議。

2.按協議邊界

固定邊界協議:能夠明確得知一個協議報文的長度,這樣的協議易於解析,比如tcp協議。

模糊邊界協議:無法明確得知一個協議報文的長度,這樣的協議解析較為復雜,通常需要通過某些特定的字節來界定報文是否結束,比如http協議。

(二)協議設計

本協議采用固定邊界+混合編碼策略。用於傳輸Json數據(命令)

1.協議頭

8字節的定長協議頭。支持版本號,基於魔數的快速校驗,不同服務的復用。定長協議頭使協議易於解析且高效。

2.協議體

變長json作為協議體。json使用明文文本編碼,可讀性強、易於擴展、前后兼容、通用的編解碼算法。json協議體為協議提供了良好的擴展性和兼容性

3.協議圖

(三)設計協議結構

const uint8_t MY_PROTO_MAGIC = 8; //協議魔數:通過魔數進行簡單對比校驗,也可以像之前學的CRC校驗替換
const uint32_t MY_PROTO_MAX_SIZE = 10*1024*1024; //10M協議中數據最大
const uint32_t MY_PROTO_HEAD_SIZE = 8; //協議頭大小
//協議頭部
struct MyProtoHead
{
    uint8_t version; //協議版本號
    uint8_t magic; //協議魔數
    uint16_t server; //協議復用的服務號,用於標識協議中的不同服務,比如向服務器獲取get 設置set 添加add ... 都是不同服務(由我們指定)
    uint32_t len; //協議長度(協議頭部+變長json協議體=總長度)
};

//協議消息體
struct MyProtoMsg
{
    MyProtoHead head; //協議頭
    Json::Value body; //協議體
};

(四)實現協議封裝函數

//協議封裝類
class MyProtoEncode
{
public:
    //協議消息體封裝函數:傳入的pMsg里面只有部分數據,比如Json協議體,服務號,我們對消息編碼后會修改長度信息,這時需要重新編碼協議
    uint8_t* encode(MyProtoMsg* pMsg, uint32_t& len); //返回長度信息,用於后面socket發送數據
private:
    //協議頭封裝函數
    void headEncode(uint8_t* pData,MyProtoMsg* pMsg);
};
//----------------------------------協議頭封裝函數----------------------------------
//pData指向一個新的內存,需要pMsg中數據對pData進行填充
void MyProtoEncode::headEncode(uint8_t* pData,MyProtoMsg* pMsg)
{
    //設置協議頭版本號為1
    *pData = 1;
    ++pData; //向前移動一個字節位置到魔數

    //設置協議頭魔數
    *pData = MY_PROTO_MAGIC; //用於簡單校驗數據,只要發送方和接受方的魔數號一致,則接受認為數據正常
    ++pData; //向前移動一個字節位置,到server服務字段(16位大小)

    //設置協議服務號,服務號,用於標識協議中的不同服務,比如向服務器獲取get 設置set 添加add ... 都是不同服務(由我們指定)
    //外部設置,存放在pMsg中,其實可以不用修改,直接跳過該地址
    *(uint16_t*)pData = pMsg->head.server; //原文是打算轉換為網絡字節序(但是沒必要)網絡中不會查看應用層數據的
    pData+=2; //向前移動兩個字節,到len長度字段

    //設置協議頭長度字段(協議頭+協議消息體),其實在消息體編碼中已經被修正了,這里也可以直接跳過
    *(uint32_t*)pData = pMsg->head.len; //原文也是進行了字節序轉化,無所謂了。反正IP網絡層也不看
}

//協議消息體封裝函數:傳入的pMsg里面只有部分數據,比如Json協議體,服務號,版本號,我們對消息編碼后會修改長度信息,這時需要重新編碼協議
//len返回長度信息,用於后面socket發送數據
uint8_t* MyProtoEncode::encode(MyProtoMsg* pMsg, uint32_t& len)
{
    uint8_t* pData = NULL; //用於開辟新的空間,存放編碼后的數據
    Json::FastWriter fwriter; //讀取Json::Value數據,轉換為可以寫入文件的字符串

    //協議Json體序列化
    string bodyStr = fwriter.write(pMsg->body);

    //計算消息序列化以后的新長度
    len = MY_PROTO_HEAD_SIZE + (uint32_t)bodyStr.size();
    pMsg->head.len = len; //一會編碼協議頭部時,會用到
    //申請一塊新的空間,用於保存消息(這里可以不用,直接使用原來空間也可以)
    pData = new uint8_t[len];
    //編碼協議頭
    headEncode(pData,pMsg); //函數內部沒有通過二級指針修改pData的數據,修改的是臨時數據
    //打包協議體
    memcpy(pData+MY_PROTO_HEAD_SIZE,bodyStr.data(),bodyStr.size());

    return pData; //返回消息首部地址
}

(五)實現協議解析函數

typedef enum MyProtoParserStatus //協議解析的狀態
{
    ON_PARSER_INIT = 0, //初始狀態
    ON_PARSER_HEAD = 1, //解析頭部
    ON_PARSER_BODY = 2, //解析數據
}MyProtoParserStatus;
//協議解析類
class MyProtoDecode
{
private:
    MyProtoMsg mCurMsg; //當前解析中的協議消息體
    queue<MyProtoMsg*> mMsgQ; //解析好的協議消息隊列
    vector<uint8_t> mCurReserved; //未解析的網絡字節流,可以緩存所有沒有解析的數據(按字節)
    MyProtoParserStatus mCurParserStatus; //當前接受方解析狀態
public:
    void init(); //初始化協議解析狀態
    void clear(); //清空解析好的消息隊列
    bool empty(); //判斷解析好的消息隊列是否為空
    void pop();  //出隊一個消息

    MyProtoMsg* front(); //獲取一個解析好的消息
    bool parser(void* data,size_t len); //從網絡字節流中解析出來協議消息,len是網絡中的字節流長度,通過socket可以獲取
private:
    bool parserHead(uint8_t** curData,uint32_t& curLen,
        uint32_t& parserLen,bool& parserBreak); //用於解析消息頭
    bool parserBody(uint8_t** curData,uint32_t& curLen,
        uint32_t& parserLen,bool& parserBreak); //用於解析消息體
};
//----------------------------------協議解析類----------------------------------
//初始化協議解析狀態
void MyProtoDecode::init()
{
    mCurParserStatus = ON_PARSER_INIT;
}

//清空解析好的消息隊列
void MyProtoDecode::clear()
{
    MyProtoMsg* pMsg=NULL;
    while(!mMsgQ.empty())
    {
        pMsg = mMsgQ.front();
        delete pMsg;
        mMsgQ.pop();
    }
}

//判斷解析好的消息隊列是否為空
bool MyProtoDecode::empty()
{
    return mMsgQ.empty();
}

//出隊一個消息
void MyProtoDecode::pop()
{
    mMsgQ.pop();
}  

//獲取一個解析好的消息
MyProtoMsg* MyProtoDecode::front()
{
    return mMsgQ.front();
}

//從網絡字節流中解析出來協議消息,len由socket函數recv返回
bool MyProtoDecode::parser(void* data,size_t len)
{
    if(len<=0)
        return false;

    uint32_t curLen = 0; //用於保存未解析的網絡字節流長度(是對vector)
    uint32_t parserLen = 0; //保存vector中已經被解析完成的字節流,一會用於清除vector中數據
    uint8_t* curData = NULL; //指向data,當前未解析的網絡字節流

    curData = (uint8_t*)data;
    
    //將當前要解析的網絡字節流寫入到vector中    
    while(len--)
    {
        mCurReserved.push_back(*curData);
        ++curData;
    }

    curLen = mCurReserved.size();
    curData = (uint8_t*)&mCurReserved[0]; //獲取數據首地址

    //只要還有未解析的網絡字節流,就持續解析
    while(curLen>0)
    {
        bool parserBreak = false;

        //解析頭部
        if(ON_PARSER_INIT == mCurParserStatus || //注意:標識很有用,當數據沒有完全達到,會等待下一次接受數據以后繼續解析頭部
            ON_PARSER_BODY == mCurParserStatus) //可以進行頭部解析
        {
            if(!parserHead(&curData,curLen,parserLen,parserBreak))
                return false;
            if(parserBreak)
                break; //退出循環,等待下一次數據到達,一起解析頭部
        }
        
        //解析完成協議頭,開始解析協議體
        if(ON_PARSER_HEAD == mCurParserStatus)
        {
            if(!parserBody(&curData,curLen,parserLen,parserBreak))
                return false;
            if(parserBreak)
                break;
        }

        //如果成功解析了消息,就把他放入消息隊列
        if(ON_PARSER_BODY == mCurParserStatus)
        {
            MyProtoMsg* pMsg = NULL;
            pMsg = new MyProtoMsg;
            *pMsg = mCurMsg;
            mMsgQ.push(pMsg);
        }

        if(parserLen>0)
        {
            //刪除已經被解析的網絡字節流
            mCurReserved.erase(mCurReserved.begin(),mCurReserved.begin()+parserLen);
        }

        return true;
    }
}

//用於解析消息頭
bool MyProtoDecode::parserHead(uint8_t** curData,uint32_t& curLen,
    uint32_t& parserLen,bool& parserBreak)
{
    if(curLen < MY_PROTO_HEAD_SIZE)
    {
        parserBreak = true; //由於數據沒有頭部長,沒辦法解析,跳出即可
        return true; //但是數據還是有用的,我們沒有發現出錯,返回true。等待一會數據到了,再解析頭部。由於標志沒變,一會還是解析頭部
    }

    uint8_t* pData = *curData;
    
    //從網絡字節流中,解析出來協議格式數據。保存在MyProtoMsg mCurMsg; //當前解析中的協議消息體
    //解析出來版本號
    mCurMsg.head.version = *pData;
    pData++;
    //解析出用於校驗的魔數
    mCurMsg.head.magic = *pData;
    pData++;

    //判斷校驗信息
    if(MY_PROTO_MAGIC != mCurMsg.head.magic)
        return false; //數據出錯

    //解析服務號
    mCurMsg.head.server = *(uint16_t*)pData;
    pData+=2;

    //解析協議消息體長度
    mCurMsg.head.len = *(uint32_t*)pData;

    //判斷數據長度是否超過指定的大小
    if(mCurMsg.head.len > MY_PROTO_MAX_SIZE)
        return false;

    //將解析指針向前移動到消息體位置,跳過消息頭大小
    (*curData) += MY_PROTO_HEAD_SIZE;
    curLen -= MY_PROTO_HEAD_SIZE;
    parserLen += MY_PROTO_HEAD_SIZE;
    mCurParserStatus = ON_PARSER_HEAD;

    return true;
}

//用於解析消息體
bool MyProtoDecode::parserBody(uint8_t** curData,uint32_t& curLen,
    uint32_t& parserLen,bool& parserBreak)
{
    uint32_t JsonSize = mCurMsg.head.len - MY_PROTO_HEAD_SIZE; //消息體的大小
    if(curLen<JsonSize)
    {
        parserBreak = true; //數據還沒有完全到達,我們還要等待一會數據到了,再解析消息體。由於標志沒變,一會還是解析消息體
        return true;
    }

    Json::Reader reader; //Json解析類
    if(!reader.parse((char*)(*curData),
        (char*)((*curData)+JsonSize),mCurMsg.body,false)) //false表示丟棄注釋
        return false; //解析數據到body中

    //數據指針向前移動
    (*curData)+=JsonSize;
    curLen -= JsonSize;
    parserLen += JsonSize;
    mCurParserStatus = ON_PARSER_BODY;

    return true;
}

(六)實現對應用層封裝、解析的測試

int main(int argc,char* argv[])
{
    uint32_t len=0;
    uint8_t* pData = NULL;

    MyProtoMsg msg1;
    MyProtoMsg msg2;

    MyProtoDecode myDecode;
    MyProtoEncode myEncode;

    //------放入第一個消息
    msg1.head.server = 1;
    msg1.body["op"] = "set";
    msg1.body["key"] = "id";
    msg1.body["value"] = "6666";

    pData = myEncode.encode(&msg1,len);

    myDecode.init();

    if(!myDecode.parser(pData,len))
    {
        cout<<"parser msg1 failed!"<<endl;
    }
    else
    {
        cout<<"parser msg1 successful!"<<endl;
    }
    
    //------放入第二個消息

    msg2.head.server = 2;
    msg2.body["op"] = "get";
    msg2.body["key"] = "id";
    pData = myEncode.encode(&msg2,len);

    if(!myDecode.parser(pData,len))
    {
        cout<<"parser msg2 failed!"<<endl;
    }
    else
    {
        cout<<"parser msg2 successful!"<<endl;
    }

    //------解析兩個消息
    MyProtoMsg* pMsg = NULL;

    while(!myDecode.empty())
    {
        pMsg = myDecode.front();
        printMyProtoMsg(*pMsg);
        myDecode.pop();
    }

    return 0;
}

文件結構:

編譯:

g++ testApp.cpp ./myproto.cpp ./lib_json/*.cpp -I ./ -o test 

三:實現傳輸層TCP編程

(一)TCP回顧

 

(二)客戶端代碼實現

#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include "myproto.h"

int myprotoSend(int sock);

int main(int argc,char* argv[])
{
    if(argc != 3)
    {
        printf("USage:%s ip port\n", argv[0]);
        return 0;
    }

    //開始創建socket
    int sock = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
    if(sock < 0)
    {
        printf("socket create failure\n");
        return -1;
    }

    //使用connect與服務器地址,端口連接,需要定義服務端信息:地址結構體
    struct sockaddr_in server;
    server.sin_family = AF_INET; //IPV4
    server.sin_port = htons(atoi(argv[2])); //atoi將字符串轉數字
    server.sin_addr.s_addr = inet_addr(argv[1]); //不直接使用htonl,因為傳入的是字符串IP地址,使用inet_addr正好對字符串IP,轉網絡大端所用字節序

    unsigned int len = sizeof(struct sockaddr_in); //獲取socket地址結構體長度

    if(connect(sock,(struct sockaddr*)&server,len)<0)
    {
        printf("socket connect failure\n");
        return -2;
    }

    //連接成功,進行數據發送-------------這里可以改為循環發送
    len = myprotoSend(sock);

    close(sock);
    return 0;
}

int myprotoSend(int sock) //-----------這里改為字符串解析,發送自己解析的Json數據
{

    uint32_t len=0;
    uint8_t* pData = NULL;

    MyProtoMsg msg1;

    MyProtoEncode myEncode;

    //------放入消息
    msg1.head.server = 1;
    msg1.body["op"] = "set";
    msg1.body["key"] = "id";
    msg1.body["value"] = "6666";

    pData = myEncode.encode(&msg1,len);

    return send(sock,pData,len,0);
}

補充:如果不進行解析,直接按照一般的服務端接收程序接收我們的自定義數據:

其中47是輸出的應用層數據大小(協議頭+協議體),但是沒有對協議進行解碼,所以無法顯示!!

(三)服務器端實現

#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<stdlib.h>
#include<unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "myproto.h"

int startup(char* _port,char* _ip);
int myprotoRecv(int sock,char* buf,int max_len);

int main(int argc,char* argv[])
{
    if(argc!=3)
    {
        printf("Usage:%s local_ip local_port\n",argv[0]);
        return 1;
    }

    //獲取監聽socket信息
    int listen_sock = startup(argv[2],argv[1]); 

    //設置結構體,用於接收客戶端的socket地址結構體
    struct sockaddr_in remote;
    unsigned int len = sizeof(struct sockaddr_in);

    while(1)
    {
        //開始阻塞方式接收客戶端鏈接
        int sock = accept(listen_sock,(struct sockaddr*)&remote,&len);
        if(sock<0)
        {
            printf("client accept failure!\n");
            continue;
        }
        //開始接收客戶端消息
        printf("get connect from %s:%d\n",inet_ntoa(remote.sin_addr),ntohs(remote.sin_port)); //inet_ntoa將網絡地址轉換成“.”點隔的字符串格式
        char buf[1024];

        len = myprotoRecv(sock,buf,1024); //len復用,這里作為接收長度------這里可以改為循環
        
        close(sock);
    }
    return 0;
}

int startup(char* _port,char* _ip)
{
    int sock = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
    if(sock < 0)
    {
        printf("socket create failure!\n");
        exit(-1);
    }

    //綁定服務端的地址信息,用於監聽當前服務的某網卡、端口
    struct sockaddr_in local;
    local.sin_family = AF_INET;
    local.sin_port = htons(atoi(_port));
    local.sin_addr.s_addr = inet_addr(_ip);

    int len = sizeof(local);

    if(bind(sock,(struct sockaddr*)&local,len)<0)
    {
        printf("socket bind failure!\n");
        exit(-2);
    }

    //開始監聽sock,設置同時並發數量
    if(listen(sock,5)<0) //允許最大連接數量5
    {
        printf("socket listen failure!\n");
        exit(-3);
    }

    return sock; //返回文件句柄
}

int myprotoRecv(int sock,char* buf,int max_len)
{
    unsigned int len;

    len = recv(sock,buf,sizeof(char)*max_len,0);

    MyProtoDecode myDecode;
    myDecode.init();

    if(!myDecode.parser(buf,len))
    {
        cout<<"parser msg failed!"<<endl;
    }
    else
    {
        cout<<"parser msg successful!"<<endl;
    }

    //------解析消息
    MyProtoMsg* pMsg = NULL;

    while(!myDecode.empty())
    {
        pMsg = myDecode.front();
        printMyProtoMsg(*pMsg);
        myDecode.pop();
    }

    return len;
}


/*
inet_addr 將字符串形式的IP地址 -> 網絡字節順序  的整型值
inet_ntoa 網絡字節順序的整型值 ->字符串形式的IP地址
*/

四:編譯測試自定義協議

(一)編譯TCP程序

g++ tcpServer.cpp ./myproto.cpp ./lib_json/*.cpp -I ./ -o ts 

g++ tcpClient.cpp ./myproto.cpp ./lib_json/*.cpp -I ./ -o tc

(二)進行測試

完成自定義協議!!!

(三)全部代碼見:GitHub(500行不到)

五:補充協議頭設計

(一)如果基於UDP實現,則需要在服務端設置應答(含有包序號、返回接受的數據大小...),以防止數據丟失

(二)協議頭的其它設計方案

方案1:包含大多數信息,但是出現:如果length數據丟失或者移位....

方案2:設置開始標志(同我們設置的magic標識),符合標志以后,開始解析協議

(三)數據類型type

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM