我的新浪微博:http://weibo.com/freshairbrucewoo。
歡迎大家相互交流,共同提高技術。
這個協議是Thrift支持的默認二進制協議,它以二進制的格式寫所有的數據,基本上直接發送原始數據。因為它直接從TVirtualProtocol類繼承,而且是一個模板類。它的模板參數就是一個封裝具體傳輸發送的類,這個類才是真正實現數據傳輸的。這個類的定義上一節舉例已經出現過了就不在列出來了。
下面我就結合scribe的Log函數執行的具體過程來分析使用這個協議所執行的功能,看看二進制協議是怎樣工作的。
RPC調用使用到協議部分主要是在發送函數相關信息到服務器和接收服務器返回結果。現在我就結合Log函數的實現代碼具體分析。首先看看Log函數的發送相關信息函數send_log(在文件scribe.cpp):
1 void scribeClient::send_Log(const std::vector<LogEntry> & messages) 2 3 { 4 5 int32_t cseqid = 0; 6 7 oprot_->writeMessageBegin("Log", ::apache::thrift::protocol::T_CALL, cseqid);//寫入函數調用消息 8 9 scribe_Log_pargs args; 10 11 args.messages = &messages; 12 13 args.write(oprot_);//調用參數類自己的寫入函數寫入參數到服務器 14 15 oprot_->writeMessageEnd();//寫入消息調用寫入 16 17 oprot_->getTransport()->writeEnd();//結束傳輸層的寫入 18 19 oprot_->getTransport()->flush();//刷新傳輸流,讓寫入馬上執行,因為RPC調用需要馬上得到結果 20 21 }
從上面代碼可以看出:首先調用具體一個協議的writeMessageBegin函數,當然這個我們分析的是二進制協議,那就看看二進制協議這個函數的實現,代碼如下:
1 template <class Transport_> 2 3 uint32_t TBinaryProtocolT<Transport_>::writeMessageBegin(const std::string& name, 4 5 const TMessageType messageType, const int32_t seqid) { 6 7 if (this->strict_write_) {//判斷是否需要強制寫入版本號 8 9 int32_t version = (VERSION_1) | ((int32_t)messageType);//本版號是協議號和消息類型的與結果 10 11 uint32_t wsize = 0;//記錄寫入的長度 12 13 wsize += writeI32(version);//寫版本號 14 15 wsize += writeString(name);//寫消息名稱,這就是函數名稱Log 16 17 wsize += writeI32(seqid);//寫調用序列號 18 19 return wsize;//返回寫入的長度 20 21 } else { 22 23 uint32_t wsize = 0; 24 25 wsize += writeString(name); 26 27 wsize += writeByte((int8_t)messageType); 28 29 wsize += writeI32(seqid); 30 31 return wsize; 32 33 } 34 35 }
根據上面代碼和注釋可以看出,根據是否需要寫入協議版本號寫入的內有所差別,寫入協議號的目的是可以堅持客戶端和服務器端是否使用相同的協議來傳輸的數據,保證數據格式的正確性。二進制的協議定義如下:
1 static const int32_t VERSION_MASK = 0xffff0000;//取得協議的掩碼 2 3 static const int32_t VERSION_1 = 0x80010000;//具體協議本版號
具體寫入又調用了自己實現的相應的數據類型寫入函數,看看writeString是怎么實現的,如下:
1 template <class Transport_> 2 3 uint32_t TBinaryProtocolT<Transport_>::writeString(const std::string& str) { 4 5 uint32_t size = str.size();//取得字符串的長度(大小) 6 7 uint32_t result = writeI32((int32_t)size);//寫入字符串的長度到服務器 8 9 if (size > 0) { 10 11 this->trans_->write((uint8_t*)str.data(), size);//調用具體某一個傳輸方式的寫入函數寫入字符串數據 12 13 } 14 15 return result + size;//返回寫入的大小 16 17 }
從上面代碼可以看出這些類型的函數就是將對應的數據類型寫入服務器,而且具體寫入在這里還沒有真正的進行,因為后面會講到的Transport相關類還會對傳輸方式進行包裝。
現在我們繼續回到send_Log函數,寫入函數調用的消息以后就開始寫函數調用需要的參數,函數參數的寫入是通過函數參數對應的封裝類進行的,Log函數的參數封裝類是scribe_Log_pargs,把對應的參數傳遞給這個類的對象,然后調用它自己的寫入函數寫入參數到服務器,代碼如下:
1 uint32_t scribe_Log_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const { 2 3 uint32_t xfer = 0; 4 5 xfer += oprot->writeStructBegin("scribe_Log_pargs");//寫入參數類的名稱 6 7 xfer += oprot->writeFieldBegin("messages", ::apache::thrift::protocol::T_LIST, 1);//寫入字段名稱和類型 8 9 { 10 11 //開始寫入鏈表類型 12 13 xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, (*(this->messages)).size()); 14 15 std::vector<LogEntry> ::const_iterator _iter6; 16 17 for (_iter6 = (*(this->messages)).begin(); _iter6 != (*(this->messages)).end(); ++_iter6) 18 19 { 20 21 xfer += (*_iter6).write(oprot);//依次寫入鏈表參數類型里面的每一個 22 23 } 24 25 xfer += oprot->writeListEnd();//結束鏈表類型寫入 26 27 } 28 29 xfer += oprot->writeFieldEnd();//寫入字段結束 30 31 xfer += oprot->writeFieldStop();//停止寫入字段 32 33 xfer += oprot->writeStructEnd();//寫入參數結束 34 35 return xfer; 36 37 }
具體參數的寫入函數根據參數的類型具體處理並寫入到服務器端。這樣整個函數調用就做完了,剩下的就是處理寫入后的一些善后處理,看具體代碼有注釋。
當函數調用的消息發送出去以后就開始准備接收函數遠程調用的結果(異步調用除外),這里接收Log函數調用返回結果的函數是recv_log,代碼如下:
1 ResultCode scribeClient::recv_Log() 2 3 { 4 5 int32_t rseqid = 0; 6 7 std::string fname; 8 9 ::apache::thrift::protocol::TMessageType mtype;//接收返回消息的類型 10 11 iprot_->readMessageBegin(fname, mtype, rseqid);//讀取返回結果的消息 12 13 if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {//處理返回消息是異常的情況 14 15 ::apache::thrift::TApplicationException x; 16 17 x.read(iprot_);//讀取異常信息 18 19 iprot_->readMessageEnd(); 20 21 iprot_->getTransport()->readEnd(); 22 23 throw x;//拋出異常信息 24 25 } 26 27 if (mtype != ::apache::thrift::protocol::T_REPLY) {//處理不是正常回復的結果 28 29 iprot_->skip(::apache::thrift::protocol::T_STRUCT); 30 31 iprot_->readMessageEnd(); 32 33 iprot_->getTransport()->readEnd(); 34 35 } 36 37 if (fname.compare("Log") != 0) {//比較是否是Log函數調用返回的結果 38 39 iprot_->skip(::apache::thrift::protocol::T_STRUCT); 40 41 iprot_->readMessageEnd(); 42 43 iprot_->getTransport()->readEnd(); 44 45 } 46 47 ResultCode _return; 48 49 scribe_Log_presult result; 50 51 result.success = &_return; 52 53 result.read(iprot_);//讀取結果信息 54 55 iprot_->readMessageEnd(); 56 57 iprot_->getTransport()->readEnd(); 58 59 if (result.__isset.success) {//成功就正常返回,否則拋出異常信息 60 61 return _return; 62 63 } 64 65 throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, 66 67 "Log failed: unknown result");//拋出不知道結果的異常信息,調用失敗了 68 69 }
接收RPC調用結果的函數都是根據返回消息的類型做相應處理,不成功就拋出相應的異常信息。首先這里調用二進制協議的readMessageBegin函數讀取由二進制寫入的消息(這個當然是服務器端寫入的),這個函數代碼實現如下:
1 template <class Transport_> 2 3 uint32_t TBinaryProtocolT<Transport_>::readMessageBegin(std::string& name, 4 5 TMessageType& messageType, int32_t& seqid) { 6 7 uint32_t result = 0; 8 9 int32_t sz; 10 11 result += readI32(sz);//讀取消息的頭部(可能是協議版本號和消息類型的組合,也可能直接是消息) 12 13 if (sz < 0) {//如果小於0(就是二進制為第一位以1開頭,說明是帶有協議版本號的 14 15 // Check for correct version number 16 17 int32_t version = sz & VERSION_MASK;//取得消息的版本號 18 19 if (version != VERSION_1) {//如果不匹配二進制協議的版本號就拋出一個壞的協議異常 20 21 throw TProtocolException(TProtocolException::BAD_VERSION, "Bad version identifier"); 22 23 } 24 25 messageType = (TMessageType)(sz & 0x000000ff);//取得消息類型 26 27 result += readString(name);//取得消息名稱(也就是函數名稱) 28 29 result += readI32(seqid);//取得函數調用ID號 30 31 } else { 32 33 if (this->strict_read_) {//要求讀協議本版號,但是這種情況是不存在協議版本號的所以拋出異常 34 35 throw TProtocolException(TProtocolException::BAD_VERSION, 36 37 "No version identifier... old protocol client in strict mode?"); 38 39 } else { 40 41 int8_t type; 42 43 result += readStringBody(name, sz);//讀取消息名稱(也就是函數名稱) 44 45 result += readByte(type);//讀取消息類型 46 47 messageType = (TMessageType)type; 48 49 result += readI32(seqid);//讀取函數調用ID號 50 51 } 52 53 } 54 55 return result;//f返回讀取數據的長度 56 57 }
上面的函數代碼向我們展示了怎樣處理基於二進制協議消息的讀取和處理的過程,當然這個過程必須是建立在相應的寫入消息的過程,只有按照相應的格式才能正確的處理。還有一點需要強調一下,就是每一種數據類型的寫入和讀取函數也是相對應的,在這里我沒有具體分析每一個數據類型的寫入函數了,其實也沒有必要,也是這些代碼都是很容易的,關鍵是讀和寫必須配合起來。
到此一個完整的基於二進制協議的RPC調用分析完畢,下面對這個二進制協議進行一下簡單的總結。
(1)如果需要傳輸協議版本號,那么0-4字節就是協議版本號和消息類型;否則0-4字節就直接是消息名稱(其實就是函數的名稱)的長度,假設長度為len。
(2)如果0-4字節是協議版本號和消息類型,那么5-8字節就是消息名稱的長度,同樣假設長度為len,然后再跟着len字節的消息名稱;否則就是len字節的消息名稱。
(3)接下來如果沒有帶協議版本號的還有1字節的消息類型;
(4)然后都是4字節的請求的序列號;
(5)接着繼續寫入參數類型的結構體(但是二進制協議並沒有真正寫入,所以沒有占用字節);
(6)如果真正的有參數的話就繼續一次為每一個參數寫入1字節的參數類型(在前面已經給出了參數類型的定義,就是一個枚舉)、2字節的參數序號和具體參數需要的長度;
(7)具體參數長度的需求如下:
a) 對於以下具有固定長度的簡單數據類型的參數:
簡單數據類型 |
長度(字節) |
備注 |
T_STOP = 0 |
1 |
|
T_VOID = 1, |
1 |
|
T_BOOL = 2 |
1 |
|
T_BYTE = 3 |
1 |
|
T_I08 = 3 |
1 |
|
T_I16 = 6 |
2 |
|
T_I32 = 8 |
4 |
|
T_U64 = 9 |
8 |
二進制協議沒有實現 |
T_I64 = 10 |
8 |
|
T_DOUBLE = 4 |
8 |
b) 復合數據類型:
復合數據類型 |
長度說明 |
T_STRING = 11 |
前面4個字節:字符串的長度stringLen; 接下來的stringLen個字節:字符串的內容 |
T_STRUCT = 12 |
假設這個結構體包含m個字段:(為了便於說明問題,下面所說的字節偏移是相對於struct內部結構而言的); 0-1字節:字段的數據類型; 1-3字節:字段序號,取決於你定義的idl文件中參數所定義的序號; 接下來的k個字節:看簡單數據類型; 以此類推,直至m個字段。其實,struct的字段和函數參數具有一樣的編碼方式 |
T_MAP = 13 |
(為了便於說明問題,下面所說的字節偏移是相對於map內部結構而言的): 0-1字節:key的數據類型。注意,可能是復合數據類型; 1-2字節:value的數據類型。 假設key的數據類型的長度為k個字節,value的數據類型的長度為v個字節,那么接下來每k+v個字節作為一個key-value值,至於key的值的分析和value的值的分析,看簡單數據類型 |
T_SET = 14 |
(為了便於說明問題,下面所說的字節偏移是相對於set內部結構而言的): 0-1字節:set里面的元素的數據類型; 1-5字節:元素個數; 假設元素的數據類型的長度為k個字節,那么接下來每k個字節作為一個元素值,至於元素值的分析,看簡單數據類型; 注意,這里和函數參數/struct的區別在於,這里不存在元素的序號值 |
T_LIST = 15 |
和set類似,這里就不重復累贅了。 |