Thrift之TProtocol類體系原理及源碼詳細解析之緊湊協議類TCompactProtocolT(TCompactProtocol)


我的新浪微博:http://weibo.com/freshairbrucewoo

歡迎大家相互交流,共同提高技術。

 

這個協議類采用了zigzag 編碼,這種編碼是基於Variable-length quantity編碼提出來的,因為Variable-length quantity編碼對於負數的編碼都需要很長的字節數,而zigzag 編碼對於絕對值小的數字,無論正負都可以采用較少的字節來表示,充分利用了 Varint技術。所以這個協議類采用zigzag 編碼可以節省傳輸空間,使數據的傳輸效率更高。至於zigzag具體的編碼實現方式可以網上查查,其實就是把從低位到最后一個還存在1(二進制)的最高位表示出來就可以了。這個協議類對外提供的方法和上面介紹的二進制協議相同,這樣可以很方便使用者從一種協議改變到另一種協議。

下面我同樣結合scribe提供的Log方法來分析這個協議類的功能,不過不會像上面二進制協議在把整個過程分析了,我只會分析與協議相關的部分了,分析一些比較難懂的一些函數功能,分析的思路還是按照函數調用過程來分析。

首先還是分析writeMessageBegin函數,下面是這個函數的實現代碼:

 1 template <class Transport_> uint32_t TCompactProtocolT<Transport_>::writeMessageBegin(
 2 
 3     const std::string& name, const TMessageType messageType, const int32_t seqid) {
 4 
 5   uint32_t wsize = 0;
 6 
 7   wsize += writeByte(PROTOCOL_ID);//寫入這個協議的產品ID號:為0x82
 8 
 9   wsize += writeByte((VERSION_N & VERSION_MASK) | (((int32_t)messageType << TYPE_SHIFT_AMOUNT) & TYPE_MASK));//寫入此協議的版本號和消息類型:前3位是消息類型,后面5位是協議版本號
10 
11   wsize += writeVarint32(seqid);//寫入請求序列號
12 
13   wsize += writeString(name);//寫入消息名稱(也就是函數調用名稱)
14 
15   return wsize;//返回寫入的大小,多少字節
16 
17 }

 

因為這些協議類都是模板類,所以每一個函數也就是模板函數了。函數具體的功能代碼里有詳細注釋,其中的writeByte函數就是寫入一個字節到服務器。這里與二進制協議不同的是這里寫入請求序列號(也就是對於所有的整型數)都調用的writeVarint32函數,這個函數就是采用zigzag編碼寫入整型數到服務器,代碼如下:

 1 template <class Transport_> uint32_t TCompactProtocolT<Transport_>::writeVarint32(uint32_t n) {
 2 
 3   uint8_t buf[5];//對於一個整數,zigzag編碼最大采用5個字節保存
 4 
 5   uint32_t wsize = 0;
 6 
 7   while (true) {
 8 
 9     if ((n & ~0x7F) == 0) {//判斷除了最低7位是否還有其他高位為1(二進制)
10 
11       buf[wsize++] = (int8_t)n;//沒有了代表着就是最后一個字節
12 
13       break;//退出循環
14 
15     } else {
16 
17       buf[wsize++] = (int8_t)((n & 0x7F) | 0x80);//取最低7位加上第8位(為1代表后續還有字節屬於這個整數,為0代表這是這個整數的最后一個字節了。
18 
19       n >>= 7;//移走已經編碼的位數
20 
21     }
22 
23   }
24 
25   trans_->write(buf, wsize);//寫入編碼的字節數
26 
27   return wsize;//返回寫入的字節數
28 
29 }

 

這個函數的功能就是對整數進行Variable-length quantity編碼后寫入,如果為負數需要處理。如果不處理那么每一個負數都需要5個字節來編碼,因為最高位表示符號位,而負數的符號位用1表示(也就是說負數的最高位永遠為1)。處理的方式也很簡單(就是zigzag編碼),就是把最高位(符號位)移動到最低位,最低位到次高位一次向高位移動一位,代碼如下(就一句就實現了):

1 template <class Transport_>
2 
3 uint32_t TCompactProtocolT<Transport_>::i32ToZigzag(const int32_t n) {
4 
5   return (n << 1) ^ (n >> 31);
6 
7 }

 

上面寫入整數和處理負數都是針對的32位的,當然也有64位的相應函數,實現方式相同。我們在回到writeMessageBegin函數,里面還有一個writeString函數用來寫入一個字符串的,與二進制不同的是寫入字符串長度也是采用了可變長度編碼的方式寫入,然后寫入字符串的具體數據,它是調用另一個函數writeBinary寫入,writeBinary實現代碼如下:

 1 template <class Transport_>
 2 
 3 uint32_t TCompactProtocolT<Transport_>::writeBinary(const std::string& str) {
 4 
 5   uint32_t ssize = str.size();
 6 
 7   uint32_t wsize = writeVarint32(ssize) + ssize;//寫入字符串的長度並計算寫入的長度(包括字符串的長度)
 8 
 9   trans_->write((uint8_t*)str.data(), ssize);//寫入字符串的數據
10 
11   return wsize;
12 
13 }

 

寫消息函數分析完畢以后我們在來看看對應的讀消息函數readMessageBegin,看這個函數必須和寫入消息的函數對應起來看,不然就不能理解它讀取和處理的流程代碼,具體實現如下代碼:

 1 template <class Transport_> uint32_t TCompactProtocolT<Transport_>::readMessageBegin(
 2 
 3     std::string& name, TMessageType& messageType, int32_t& seqid) {
 4 
 5   uint32_t rsize = 0;
 6 
 7   int8_t protocolId;
 8 
 9   int8_t versionAndType;
10 
11   int8_t version;
12 
13   rsize += readByte(protocolId);//讀取協議產品ID號
14 
15   if (protocolId != PROTOCOL_ID) {//判斷是不是這個協議的產品ID號,不是就拋出異常
16 
17     throw TProtocolException(TProtocolException::BAD_VERSION, "Bad protocol identifier");
18 
19   }
20 
21   rsize += readByte(versionAndType);//讀取此協議的版本號和消息類型
22 
23   version = (int8_t)(versionAndType & VERSION_MASK);//取出協議版本號
24 
25   if (version != VERSION_N) {//判斷是不是對應的協議版本號,不是拋出異常
26 
27     throw TProtocolException(TProtocolException::BAD_VERSION, "Bad protocol version");
28 
29   }
30 
31   messageType = (TMessageType)((versionAndType >> TYPE_SHIFT_AMOUNT) & 0x03);//取出消息類型
32 
33   rsize += readVarint32(seqid);//讀取請求序列號
34 
35   rsize += readString(name);//讀取消息名稱(函數名稱)
36 
37   return rsize;//返回讀取的長度(字節)
38 
39 }

 

通過對照寫入消息的函數就很容易理解,因為你寫入什么我就讀什么並且判斷是不是相同協議寫入的,具體分析可以看上面的代碼和詳細的注釋。而且還有一點就是具體的寫入數據類型的函數也是采用對應類型的讀函數,例如讀可變長整型寫入就是采用可變長讀函數readVarint32,寫字符串對應讀字符串函數readString,對照相應的寫入函數來看這些讀數據函數就非常好理解了,就不具體分析這些讀函數了。

下面在分析幾個復合數據類型的寫入函數,因為這些寫入函數存在一定技巧不容易(或者說不那么直觀吧)理解清楚。首先看看struct類型的數據寫入的過程,它分為寫入開始、中間處理和寫入結束。下面是開始寫入struct的代碼:

 1 template <class Transport_>
 2 
 3 uint32_t TCompactProtocolT<Transport_>::writeStructBegin(const char* name) {
 4 
 5   (void) name;
 6 
 7   lastField_.push(lastFieldId_);//把最后寫入的字段ID壓入堆棧
 8 
 9   lastFieldId_ = 0;//重新設置為0
10 
11   return 0;
12 
13 }

 

這開始寫入的函數沒有做什么具體的工作,只是把最后寫入的字段ID壓入堆棧,這樣做的目的是處理那種struct嵌套的數據結構類型。

Struct里面的是一個一個的字段,所以根據struct的字段個數分別調用字段寫入函數依次寫入,字段寫入函數定義如下:

 1 template <class Transport_> int32_t TCompactProtocolT<Transport_>::writeFieldBeginInternal(
 2 
 3     const char* name, const TType fieldType, const int16_t fieldId, int8_t typeOverride) {
 4 
 5   (void) name;//為了防止編譯器產生警告信息
 6 
 7   uint32_t wsize = 0;
 8 
 9   // 如果存在對於對應的類型就轉換為對應的
10 
11   int8_t typeToWrite = (typeOverride == -1 ? getCompactType(fieldType) : typeOverride);
12 
13   // 檢查字段ID是否使用了增量編碼
14 
15   if (fieldId > lastFieldId_ && fieldId - lastFieldId_ <= 15) {//如果使用了增量編碼並增量且小於等於15
16 
17     wsize += writeByte((fieldId - lastFieldId_) << 4 | typeToWrite);//字段ID和數據類型一起寫入
18 
19   } else {//否則單獨寫入
20 
21     wsize += writeByte(typeToWrite);//寫入數據類型
22 
23     wsize += writeI16(fieldId);//寫入字段ID
24 
25   }
26 
27   lastFieldId_ = fieldId;//保存寫入字段ID為最后一個寫入的ID
28 
29   return wsize;//返回寫入的長度
30 
31 }

 

當結構體里面的每一個字段都寫入以后還需要調用writeStructEnd函數來處理結束一個struct的寫入,主要處理是字段ID的相關內容,實現代碼如下:

1 template <class Transport_> uint32_t TCompactProtocolT<Transport_>::writeStructEnd() {
2 
3   lastFieldId_ = lastField_.top();//取得最后一次壓入堆棧的字段ID號
4 
5   lastField_.pop();//彈出以取得的字段ID
6 
7   return 0;
8 
9 }

 

同樣的結構體也有對應的讀取函數,具體實現就不在具體分析了!下面繼續分析一些特殊的處理代碼,首先看看負數在進行zigzag編碼前怎樣處理,對於32位和64位都是一句代碼就搞定,如下代碼:

1 return (n >> 1) ^ -(n & 1);

 

這句代碼的作用就是把最高位的符號位移動到最低位,然后最低位到次高位依次向高位移動一位,這樣就避免了所有負數都需要最長的字節來編碼。在看看讀可變長編碼寫入整型數的函數,32位和64位都是相同的實現,因為32位也是調用64位的函數實現的,實現代碼如下:

 1 template <class Transport_> uint32_t TCompactProtocolT<Transport_>::readVarint64(int64_t& i64) {
 2 
 3   uint32_t rsize = 0;
 4 
 5   uint64_t val = 0;
 6 
 7   int shift = 0;
 8 
 9   uint8_t buf[10];  // 64 位采用zigzag編碼最長可能是10字節
10 
11   uint32_t buf_size = sizeof(buf);
12 
13   const uint8_t* borrowed = trans_->borrow(buf, &buf_size);//並不是所有transport都支持 
14 
15   if (borrowed != NULL) {// 快路徑,要讀的數據已經在緩存中
16 
17     while (true) {
18 
19       uint8_t byte = borrowed[rsize];
20 
21       rsize++;
22 
23       val |= (uint64_t)(byte & 0x7f) << shift;//取得對應編碼數據的7位
24 
25       shift += 7;//后7位
26 
27       if (!(byte & 0x80)) {//是否還有屬於這個數的編碼字節,字節的最高位表示:0表示沒有了
28 
29         i64 = val;//讀取解碼后的真正有效值
30 
31         trans_->consume(rsize);//消耗了多少字節,即表示這個編碼用了多少字節
32 
33         return rsize;
34 
35       }
36 
37       // 檢查編碼數據是否超過了最長限制,是就拋出一個無效的異常
38 
39       if (UNLIKELY(rsize == sizeof(buf))) {
40 
41         throw TProtocolException(TProtocolException::INVALID_DATA, "Variable-length int over 10 bytes.");
42 
43       }
44 
45     }
46 
47   } 
48 
49   else {// 慢路徑,要讀的數據還沒有存在緩存中
50 
51     while (true) {
52 
53       uint8_t byte;
54 
55       rsize += trans_->readAll(&byte, 1);//讀取一個字節
56 
57       val |= (uint64_t)(byte & 0x7f) << shift;//取得7位的編碼數據
58 
59       shift += 7;
60 
61       if (!(byte & 0x80)) {
62 
63         i64 = val;
64 
65         return rsize;
66 
67       } 
68 
69       if (UNLIKELY(rsize >= sizeof(buf))) {//同樣檢查數據的有效性:最大字節長度不超過10個字節
70 
71         throw TProtocolException(TProtocolException::INVALID_DATA, "Variable-length int over 10 bytes.");
72 
73       }
74 
75     }
76 
77   }
78 
79 }

 

由於采用了可變長度編碼的原因,所以不知道一次性應該讀取多少個字節是一個完整的數據。為了讀取效率所以一次性直接讀取最長可能的字節數量,也就是10字節,因為64位最長的可變長編碼就是10字節長,然后根據實際消耗的字節數從讀取元跳過已經消耗的字節數。不過底層的傳輸層,有些協議可能不支持這種預讀取方式,所以就只有一個字節一個字節的讀取。

這個協議最大的特點就是采用了可變長度編碼,並且采用zigzag編碼處理負數總是需要采用最長的編碼字節的問題,所以相對於比較二進制而言效率提高了不少。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM