基於傳輸層TCP協議,自定義實現一個應用層協議
一:回顧JsonCpp
C++通過JsonCpp讀取Json文件
網絡編程字節序轉換問題
二:實現自定義應用層
(一)協議分類
1.按編碼方式
二進制協議:比如網絡通信運輸層中的tcp協議。
明文的文本協議:比如應用層的http、redis協議。
混合協議(二進制+明文):比如蘋果公司早期的APNs推送協議。
2.按協議邊界
固定邊界協議:能夠明確得知一個協議報文的長度,這樣的協議易於解析,比如tcp協議。
模糊邊界協議:無法明確得知一個協議報文的長度,這樣的協議解析較為復雜,通常需要通過某些特定的字節來界定報文是否結束,比如http協議。
(二)協議設計
本協議采用固定邊界+混合編碼策略。用於傳輸Json數據(命令)
1.協議頭
8字節的定長協議頭。支持版本號,基於魔數的快速校驗,不同服務的復用。定長協議頭使協議易於解析且高效。
2.協議體
變長json作為協議體。json使用明文文本編碼,可讀性強、易於擴展、前后兼容、通用的編解碼算法。json協議體為協議提供了良好的擴展性和兼容性
3.協議圖

(三)設計協議結構
const uint8_t MY_PROTO_MAGIC = 8; //協議魔數:通過魔數進行簡單對比校驗,也可以像之前學的CRC校驗替換 const uint32_t MY_PROTO_MAX_SIZE = 10*1024*1024; //10M協議中數據最大 const uint32_t MY_PROTO_HEAD_SIZE = 8; //協議頭大小
//協議頭部 struct MyProtoHead { uint8_t version; //協議版本號 uint8_t magic; //協議魔數 uint16_t server; //協議復用的服務號,用於標識協議中的不同服務,比如向服務器獲取get 設置set 添加add ... 都是不同服務(由我們指定) uint32_t len; //協議長度(協議頭部+變長json協議體=總長度) }; //協議消息體 struct MyProtoMsg { MyProtoHead head; //協議頭 Json::Value body; //協議體 };
(四)實現協議封裝函數
//協議封裝類 class MyProtoEncode { public: //協議消息體封裝函數:傳入的pMsg里面只有部分數據,比如Json協議體,服務號,我們對消息編碼后會修改長度信息,這時需要重新編碼協議 uint8_t* encode(MyProtoMsg* pMsg, uint32_t& len); //返回長度信息,用於后面socket發送數據 private: //協議頭封裝函數 void headEncode(uint8_t* pData,MyProtoMsg* pMsg); };
//----------------------------------協議頭封裝函數---------------------------------- //pData指向一個新的內存,需要pMsg中數據對pData進行填充 void MyProtoEncode::headEncode(uint8_t* pData,MyProtoMsg* pMsg) { //設置協議頭版本號為1 *pData = 1; ++pData; //向前移動一個字節位置到魔數 //設置協議頭魔數 *pData = MY_PROTO_MAGIC; //用於簡單校驗數據,只要發送方和接受方的魔數號一致,則接受認為數據正常 ++pData; //向前移動一個字節位置,到server服務字段(16位大小) //設置協議服務號,服務號,用於標識協議中的不同服務,比如向服務器獲取get 設置set 添加add ... 都是不同服務(由我們指定) //外部設置,存放在pMsg中,其實可以不用修改,直接跳過該地址 *(uint16_t*)pData = pMsg->head.server; //原文是打算轉換為網絡字節序(但是沒必要)網絡中不會查看應用層數據的 pData+=2; //向前移動兩個字節,到len長度字段 //設置協議頭長度字段(協議頭+協議消息體),其實在消息體編碼中已經被修正了,這里也可以直接跳過 *(uint32_t*)pData = pMsg->head.len; //原文也是進行了字節序轉化,無所謂了。反正IP網絡層也不看 } //協議消息體封裝函數:傳入的pMsg里面只有部分數據,比如Json協議體,服務號,版本號,我們對消息編碼后會修改長度信息,這時需要重新編碼協議 //len返回長度信息,用於后面socket發送數據 uint8_t* MyProtoEncode::encode(MyProtoMsg* pMsg, uint32_t& len) { uint8_t* pData = NULL; //用於開辟新的空間,存放編碼后的數據 Json::FastWriter fwriter; //讀取Json::Value數據,轉換為可以寫入文件的字符串 //協議Json體序列化 string bodyStr = fwriter.write(pMsg->body); //計算消息序列化以后的新長度 len = MY_PROTO_HEAD_SIZE + (uint32_t)bodyStr.size(); pMsg->head.len = len; //一會編碼協議頭部時,會用到 //申請一塊新的空間,用於保存消息(這里可以不用,直接使用原來空間也可以) pData = new uint8_t[len]; //編碼協議頭 headEncode(pData,pMsg); //函數內部沒有通過二級指針修改pData的數據,修改的是臨時數據 //打包協議體 memcpy(pData+MY_PROTO_HEAD_SIZE,bodyStr.data(),bodyStr.size()); return pData; //返回消息首部地址 }
(五)實現協議解析函數
typedef enum MyProtoParserStatus //協議解析的狀態 { ON_PARSER_INIT = 0, //初始狀態 ON_PARSER_HEAD = 1, //解析頭部 ON_PARSER_BODY = 2, //解析數據 }MyProtoParserStatus;
//協議解析類 class MyProtoDecode { private: MyProtoMsg mCurMsg; //當前解析中的協議消息體 queue<MyProtoMsg*> mMsgQ; //解析好的協議消息隊列 vector<uint8_t> mCurReserved; //未解析的網絡字節流,可以緩存所有沒有解析的數據(按字節) MyProtoParserStatus mCurParserStatus; //當前接受方解析狀態 public: void init(); //初始化協議解析狀態 void clear(); //清空解析好的消息隊列 bool empty(); //判斷解析好的消息隊列是否為空 void pop(); //出隊一個消息 MyProtoMsg* front(); //獲取一個解析好的消息 bool parser(void* data,size_t len); //從網絡字節流中解析出來協議消息,len是網絡中的字節流長度,通過socket可以獲取 private: bool parserHead(uint8_t** curData,uint32_t& curLen, uint32_t& parserLen,bool& parserBreak); //用於解析消息頭 bool parserBody(uint8_t** curData,uint32_t& curLen, uint32_t& parserLen,bool& parserBreak); //用於解析消息體 };
//----------------------------------協議解析類---------------------------------- //初始化協議解析狀態 void MyProtoDecode::init() { mCurParserStatus = ON_PARSER_INIT; } //清空解析好的消息隊列 void MyProtoDecode::clear() { MyProtoMsg* pMsg=NULL; while(!mMsgQ.empty()) { pMsg = mMsgQ.front(); delete pMsg; mMsgQ.pop(); } } //判斷解析好的消息隊列是否為空 bool MyProtoDecode::empty() { return mMsgQ.empty(); } //出隊一個消息 void MyProtoDecode::pop() { mMsgQ.pop(); } //獲取一個解析好的消息 MyProtoMsg* MyProtoDecode::front() { return mMsgQ.front(); } //從網絡字節流中解析出來協議消息,len由socket函數recv返回 bool MyProtoDecode::parser(void* data,size_t len) { if(len<=0) return false; uint32_t curLen = 0; //用於保存未解析的網絡字節流長度(是對vector) uint32_t parserLen = 0; //保存vector中已經被解析完成的字節流,一會用於清除vector中數據 uint8_t* curData = NULL; //指向data,當前未解析的網絡字節流 curData = (uint8_t*)data; //將當前要解析的網絡字節流寫入到vector中 while(len--) { mCurReserved.push_back(*curData); ++curData; } curLen = mCurReserved.size(); curData = (uint8_t*)&mCurReserved[0]; //獲取數據首地址 //只要還有未解析的網絡字節流,就持續解析 while(curLen>0) { bool parserBreak = false; //解析頭部 if(ON_PARSER_INIT == mCurParserStatus || //注意:標識很有用,當數據沒有完全達到,會等待下一次接受數據以后繼續解析頭部 ON_PARSER_BODY == mCurParserStatus) //可以進行頭部解析 { if(!parserHead(&curData,curLen,parserLen,parserBreak)) return false; if(parserBreak) break; //退出循環,等待下一次數據到達,一起解析頭部 } //解析完成協議頭,開始解析協議體 if(ON_PARSER_HEAD == mCurParserStatus) { if(!parserBody(&curData,curLen,parserLen,parserBreak)) return false; if(parserBreak) break; } //如果成功解析了消息,就把他放入消息隊列 if(ON_PARSER_BODY == mCurParserStatus) { MyProtoMsg* pMsg = NULL; pMsg = new MyProtoMsg; *pMsg = mCurMsg; mMsgQ.push(pMsg); } if(parserLen>0) { //刪除已經被解析的網絡字節流 mCurReserved.erase(mCurReserved.begin(),mCurReserved.begin()+parserLen); } return true; } } //用於解析消息頭 bool MyProtoDecode::parserHead(uint8_t** curData,uint32_t& curLen, uint32_t& parserLen,bool& parserBreak) { if(curLen < MY_PROTO_HEAD_SIZE) { parserBreak = true; //由於數據沒有頭部長,沒辦法解析,跳出即可 return true; //但是數據還是有用的,我們沒有發現出錯,返回true。等待一會數據到了,再解析頭部。由於標志沒變,一會還是解析頭部 } uint8_t* pData = *curData; //從網絡字節流中,解析出來協議格式數據。保存在MyProtoMsg mCurMsg; //當前解析中的協議消息體 //解析出來版本號 mCurMsg.head.version = *pData; pData++; //解析出用於校驗的魔數 mCurMsg.head.magic = *pData; pData++; //判斷校驗信息 if(MY_PROTO_MAGIC != mCurMsg.head.magic) return false; //數據出錯 //解析服務號 mCurMsg.head.server = *(uint16_t*)pData; pData+=2; //解析協議消息體長度 mCurMsg.head.len = *(uint32_t*)pData; //判斷數據長度是否超過指定的大小 if(mCurMsg.head.len > MY_PROTO_MAX_SIZE) return false; //將解析指針向前移動到消息體位置,跳過消息頭大小 (*curData) += MY_PROTO_HEAD_SIZE; curLen -= MY_PROTO_HEAD_SIZE; parserLen += MY_PROTO_HEAD_SIZE; mCurParserStatus = ON_PARSER_HEAD; return true; } //用於解析消息體 bool MyProtoDecode::parserBody(uint8_t** curData,uint32_t& curLen, uint32_t& parserLen,bool& parserBreak) { uint32_t JsonSize = mCurMsg.head.len - MY_PROTO_HEAD_SIZE; //消息體的大小 if(curLen<JsonSize) { parserBreak = true; //數據還沒有完全到達,我們還要等待一會數據到了,再解析消息體。由於標志沒變,一會還是解析消息體 return true; } Json::Reader reader; //Json解析類 if(!reader.parse((char*)(*curData), (char*)((*curData)+JsonSize),mCurMsg.body,false)) //false表示丟棄注釋 return false; //解析數據到body中 //數據指針向前移動 (*curData)+=JsonSize; curLen -= JsonSize; parserLen += JsonSize; mCurParserStatus = ON_PARSER_BODY; return true; }
(六)實現對應用層封裝、解析的測試
int main(int argc,char* argv[]) { uint32_t len=0; uint8_t* pData = NULL; MyProtoMsg msg1; MyProtoMsg msg2; MyProtoDecode myDecode; MyProtoEncode myEncode; //------放入第一個消息 msg1.head.server = 1; msg1.body["op"] = "set"; msg1.body["key"] = "id"; msg1.body["value"] = "6666"; pData = myEncode.encode(&msg1,len); myDecode.init(); if(!myDecode.parser(pData,len)) { cout<<"parser msg1 failed!"<<endl; } else { cout<<"parser msg1 successful!"<<endl; } //------放入第二個消息 msg2.head.server = 2; msg2.body["op"] = "get"; msg2.body["key"] = "id"; pData = myEncode.encode(&msg2,len); if(!myDecode.parser(pData,len)) { cout<<"parser msg2 failed!"<<endl; } else { cout<<"parser msg2 successful!"<<endl; } //------解析兩個消息 MyProtoMsg* pMsg = NULL; while(!myDecode.empty()) { pMsg = myDecode.front(); printMyProtoMsg(*pMsg); myDecode.pop(); } return 0; }
文件結構:

編譯:
g++ testApp.cpp ./myproto.cpp ./lib_json/*.cpp -I ./ -o test

三:實現傳輸層TCP編程
(一)TCP回顧

(二)客戶端代碼實現
#include <sys/types.h> #include <sys/socket.h> #include <unistd.h> #include <stdlib.h> #include <netinet/in.h> #include <arpa/inet.h> #include <stdio.h> #include <string.h> #include "myproto.h" int myprotoSend(int sock); int main(int argc,char* argv[]) { if(argc != 3) { printf("USage:%s ip port\n", argv[0]); return 0; } //開始創建socket int sock = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(sock < 0) { printf("socket create failure\n"); return -1; } //使用connect與服務器地址,端口連接,需要定義服務端信息:地址結構體 struct sockaddr_in server; server.sin_family = AF_INET; //IPV4 server.sin_port = htons(atoi(argv[2])); //atoi將字符串轉數字 server.sin_addr.s_addr = inet_addr(argv[1]); //不直接使用htonl,因為傳入的是字符串IP地址,使用inet_addr正好對字符串IP,轉網絡大端所用字節序 unsigned int len = sizeof(struct sockaddr_in); //獲取socket地址結構體長度 if(connect(sock,(struct sockaddr*)&server,len)<0) { printf("socket connect failure\n"); return -2; } //連接成功,進行數據發送-------------這里可以改為循環發送 len = myprotoSend(sock); close(sock); return 0; } int myprotoSend(int sock) //-----------這里改為字符串解析,發送自己解析的Json數據 { uint32_t len=0; uint8_t* pData = NULL; MyProtoMsg msg1; MyProtoEncode myEncode; //------放入消息 msg1.head.server = 1; msg1.body["op"] = "set"; msg1.body["key"] = "id"; msg1.body["value"] = "6666"; pData = myEncode.encode(&msg1,len); return send(sock,pData,len,0); }
補充:如果不進行解析,直接按照一般的服務端接收程序接收我們的自定義數據:

其中47是輸出的應用層數據大小(協議頭+協議體),但是沒有對協議進行解碼,所以無法顯示!!
(三)服務器端實現
#include<stdio.h> #include<sys/types.h> #include<sys/socket.h> #include<stdlib.h> #include<unistd.h> #include <netinet/in.h> #include <arpa/inet.h> #include "myproto.h" int startup(char* _port,char* _ip); int myprotoRecv(int sock,char* buf,int max_len); int main(int argc,char* argv[]) { if(argc!=3) { printf("Usage:%s local_ip local_port\n",argv[0]); return 1; } //獲取監聽socket信息 int listen_sock = startup(argv[2],argv[1]); //設置結構體,用於接收客戶端的socket地址結構體 struct sockaddr_in remote; unsigned int len = sizeof(struct sockaddr_in); while(1) { //開始阻塞方式接收客戶端鏈接 int sock = accept(listen_sock,(struct sockaddr*)&remote,&len); if(sock<0) { printf("client accept failure!\n"); continue; } //開始接收客戶端消息 printf("get connect from %s:%d\n",inet_ntoa(remote.sin_addr),ntohs(remote.sin_port)); //inet_ntoa將網絡地址轉換成“.”點隔的字符串格式 char buf[1024]; len = myprotoRecv(sock,buf,1024); //len復用,這里作為接收長度------這里可以改為循環 close(sock); } return 0; } int startup(char* _port,char* _ip) { int sock = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(sock < 0) { printf("socket create failure!\n"); exit(-1); } //綁定服務端的地址信息,用於監聽當前服務的某網卡、端口 struct sockaddr_in local; local.sin_family = AF_INET; local.sin_port = htons(atoi(_port)); local.sin_addr.s_addr = inet_addr(_ip); int len = sizeof(local); if(bind(sock,(struct sockaddr*)&local,len)<0) { printf("socket bind failure!\n"); exit(-2); } //開始監聽sock,設置同時並發數量 if(listen(sock,5)<0) //允許最大連接數量5 { printf("socket listen failure!\n"); exit(-3); } return sock; //返回文件句柄 } int myprotoRecv(int sock,char* buf,int max_len) { unsigned int len; len = recv(sock,buf,sizeof(char)*max_len,0); MyProtoDecode myDecode; myDecode.init(); if(!myDecode.parser(buf,len)) { cout<<"parser msg failed!"<<endl; } else { cout<<"parser msg successful!"<<endl; } //------解析消息 MyProtoMsg* pMsg = NULL; while(!myDecode.empty()) { pMsg = myDecode.front(); printMyProtoMsg(*pMsg); myDecode.pop(); } return len; } /* inet_addr 將字符串形式的IP地址 -> 網絡字節順序 的整型值 inet_ntoa 網絡字節順序的整型值 ->字符串形式的IP地址 */
四:編譯測試自定義協議
(一)編譯TCP程序
g++ tcpServer.cpp ./myproto.cpp ./lib_json/*.cpp -I ./ -o ts g++ tcpClient.cpp ./myproto.cpp ./lib_json/*.cpp -I ./ -o tc

(二)進行測試


完成自定義協議!!!
(三)全部代碼見:GitHub(500行不到)
五:補充協議頭設計
(一)如果基於UDP實現,則需要在服務端設置應答(含有包序號、返回接受的數據大小...),以防止數據丟失
(二)協議頭的其它設計方案
方案1:包含大多數信息,但是出現:如果length數據丟失或者移位....

方案2:設置開始標志(同我們設置的magic標識),符合標志以后,開始解析協議


(三)數據類型type

