Thrift之TProcess類體系原理及源碼詳細解析


 

我的新浪微博:http://weibo.com/freshairbrucewoo

歡迎大家相互交流,共同提高技術。

  之前對Thrift自動生成代碼的實現細節做了詳細的分析,下面進行處理層的實現做詳細分析了!會利用到自動代碼生成的知識。

 

  這部分是協議層和用戶提供的服務實現之間的紐帶,定義了調用服務實現的接口框架,真正實現某種服務接口是通過上一章介紹的代碼生成工具生成的代碼。本章將介紹這個框架的基本原理,然后通過生成的一個實例來具體介紹怎樣完成一次完整的服務,這個可能涉及到下面章節的一些知識,對於這些知識不詳細分析其功能,只是介紹它在其中起什么作用。選擇的實例是Facebook內部用這個框架實現的一個分布式日志收集系統scribe。下面是這部分相關類的類關系圖:

 

  從上圖中可以看出TProcessor是這個部分的頂層基類,其他之類基本上都是通過Thrift代碼生成工具生成的,只有少數是為了擴展一些功能而直接寫代碼實現,如PeekProcessor類就增加了一些對原始數據處理的功能。scribeProcessor和FacebookServiceProcessor類就是用代碼生成器根據IDL文件生成的,也是我們后面需要分析的一個實例。

第一節 服務接口調用框架分析

  這個基本的框架包括三個類,一個就是抽象類TProcessor,負責調用用戶定義的服務接口,從一個接口讀入數據,寫入一個輸出接口。一個最主要的函數定義如下:

1 virtual bool process(boost::shared_ptr<protocol::TProtocol> in,
2 
3                        boost::shared_ptr<protocol::TProtocol> out, void* connectionContext) = 0;

 

這個函數是一個純虛函數,所以繼承這個類的子類都必須實現這個函數,這個函數就是最主要的數據傳輸功能。

第二個類就是負責處理TProcessor類產生的事件的類TProcessorEventHandler,主要定義了一些當某事件發生時的處理函數,例如當讀取參數之前可以做一些處理功能。下面是這個類定義的各個成員函數,每一個函數都處理一種事件發送時的情況:

函數名稱

函數功能

getContext

調用其他回調函數之前調用,期望返回一些有序的上下文對象以便傳遞給其他回調函數使用

freeContext

期望釋放一個上下文有關的資源

preRead

在讀參數以前調用

postRead

在讀參數和處理函數之間調用

preWrite

在處理和寫響應之間調用

postWrite

在寫響應之后調用

asyncComplete

當一個異步函數成功完成調用時調用

handlerError

如果處理函數拋出沒有定義的異常就會調用此函數

最后一個類就是TProcessorContextFreer類,這個類是一個幫助類,幫助生成的代碼來釋放上下文資源。

第二節 基於框架生成的服務實例分析

本節將對scribe服務器采用的服務實現進行詳細分析。

1 接口定義語言文件(IDL)

(1)Facebook內部共用服務協議

主要有兩個文件,一個是在Thrift中定義,是用於Facebook內部的一些接口服務定義,這個不僅僅用於scribe服務器,可能還用於Facebook內部其他系統,這個文件內容如下:

 1 namespace java com.facebook.fb303
 2 
 3 namespace cpp facebook.fb303
 4 
 5 namespace perl Facebook.FB303
 6 
 7 enum fb_status {
 8 
 9   DEAD = 0,
10 
11   STARTING = 1,
12 
13   ALIVE = 2,
14 
15   STOPPING = 3,
16 
17   STOPPED = 4,
18 
19   WARNING = 5,
20 
21 }
22 
23 service FacebookService {
24 
25   string getName(),
26 
27   string getVersion(),
28 
29   fb_status getStatus(),
30 
31   string getStatusDetails(),
32 
33   map<string, i64> getCounters(),
34 
35   i64 getCounter(1: string key),
36 
37   void setOption(1: string key, 2: string value),
38 
39   string getOption(1: string key),
40 
41   map<string, string> getOptions(),
42 
43   string getCpuProfile(1: i32 profileDurationInSec),
44 
45   i64 aliveSince(),
46 
47   oneway void reinitialize(),
48 
49   oneway void shutdown(),
50 
51 }

 

上面這個IDL文件定義了一個枚舉類型用於表示服務的狀態,還定義了一個名位FacebookService的服務,里面定義了各種操作,如獲取服務狀態的操作、得到計數的操作等等。

下面我們來看看根據這個IDL文件生成的C++代碼是什么樣的一個架構。首先生成了一個基於上面服務定義的抽象類如下:

class FacebookServiceIf {

 public:

  virtual ~FacebookServiceIf() {}

  virtual void getName(std::string& _return) = 0;

  virtual void getVersion(std::string& _return) = 0;

  virtual fb_status getStatus() = 0;

  virtual void getStatusDetails(std::string& _return) = 0;

  virtual void getCounters(std::map<std::string, int64_t> & _return) = 0;

  virtual int64_t getCounter(const std::string& key) = 0;

  virtual void setOption(const std::string& key, const std::string& value) = 0;

  virtual void getOption(std::string& _return, const std::string& key) = 0;

  virtual void getOptions(std::map<std::string, std::string> & _return) = 0;

  virtual void getCpuProfile(std::string& _return, const int32_t profileDurationInSec) = 0;

  virtual int64_t aliveSince() = 0;

  virtual void reinitialize() = 0;

  virtual void shutdown() = 0;

};

 

注意觀察,除了這個類多了一個虛析構函數,其他函數就是IDL中定義的。接着定義了類FacebookServiceNull,這個是上面那個抽象類的空實現(就是所有方法都沒有做具體的事情),這樣做的好處就是我們需要重寫一些函數的時候只需要關注我們需要寫的函數,而不是重寫所有函數。接着又定義了封裝每一個函數參數的相應類,就是一個函數的參數都用一個類來封裝定義,函數的返回值也是這樣處理。這樣做的目的是統一遠程調用的實現接口,因為傳遞參數都只需要這個封裝類的對象就可以了。所以你會看到每一個服務里面定義的函數都有下面一組類的定義:

 11class FacebookService_getName_args {…}
 2 
 32class FacebookService_getName_pargs {…}
 4 
 53)typedef struct _FacebookService_getName_result__isset {…} _FacebookService_getName_result__isset;
 6 
 74class FacebookService_getName_result{…}
 8 
 95)typedef struct _FacebookService_getName_presult__isset {…} _FacebookService_getName_presult__isset;
10 
116class FacebookService_getName_presult{…}

 

上面這六個類定義就是為服務中的getName函數服務的,相應的每一個函數都會有這種類似的定義和實現。接下來就會定義三個具體實現IDL定義的功能的類,一個客戶端的類,它繼承定義的服務抽象類,每一個具體的函數實現都是同樣的方式和思路,同樣我結合getName函數的實現來看看這個過程,其他函數都是這樣實現的,代碼如下:

1 send_getName();
2 
3 recv_getName(_return);

 

由上面代碼可以看出首先調用函數發送函數名稱及相關信息到遠程,然后接受函數調用的返回值,發送函數send_getName()的代碼如下:

 1 int32_t cseqid = 0;
 2 
 3 oprot_->writeMessageBegin("getName", ::apache::thrift::protocol::T_CALL, cseqid);//寫一個函數調用消息RPC
 4 
 5 FacebookService_getName_pargs args;
 6 
 7 args.write(oprot_);//寫入參數
 8 
 9 oprot_->writeMessageEnd();
10 
11 oprot_->getTransport()->writeEnd();
12 
13 oprot_->getTransport()->flush();//保證這次寫入過程立即生效

 

上面代碼就完成了函數名稱以及參數的傳輸,調用的是TProtocol相關的類的函數實現,具體的實現內容和方式會在TProtocol部分介紹。下面接着看一下接收返回值的函數recv_getName的代碼:

 1   int32_t rseqid = 0;//接收的消息序列號
 2 
 3   std::string fname;//函數名稱
 4 
 5   ::apache::thrift::protocol::TMessageType mtype;//消息的類型(調用(T_CALL)、異常(T_EXCEPTION)等)
 6 
 7   iprot_->readMessageBegin(fname, mtype, rseqid);//從返回消息讀取函數名稱、消息類型
 8 
 9   if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {//處理異常消息
10 
11     ::apache::thrift::TApplicationException x;
12 
13     x.read(iprot_);
14 
15     iprot_->readMessageEnd();
16 
17     iprot_->getTransport()->readEnd();
18 
19     throw x;
20 
21   }
22 
23   if (mtype != ::apache::thrift::protocol::T_REPLY) {//處理返回消息
24 
25     iprot_->skip(::apache::thrift::protocol::T_STRUCT);
26 
27     iprot_->readMessageEnd();
28 
29     iprot_->getTransport()->readEnd();
30 
31   }
32 
33   if (fname.compare("getName") != 0) {//看是否是我們需要的函數名,不是就跳過消息讀取
34 
35     iprot_->skip(::apache::thrift::protocol::T_STRUCT);
36 
37     iprot_->readMessageEnd();
38 
39     iprot_->getTransport()->readEnd();
40 
41   }
42 
43   FacebookService_getName_presult result;
44 
45   result.success = &_return;
46 
47   result.read(iprot_);//讀取函數返回值
48 
49   iprot_->readMessageEnd();
50 
51   iprot_->getTransport()->readEnd();
52 
53   if (result.__isset.success) {//成功就返回結果(已經在_return里面),否則拋出異常
54 
55     return;
56 
57   }
58 
59   throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "getName failed: unknown result");

 

上面代碼就是處理遠程調用的返回結果,代碼里面有注釋。一個服務函數的實現大概流程已經展現在我們面前了,處理的過程也已經清晰。這個只是用於客戶端的處理流程,必須通過有效的機制來通知服務器端調用相應的函數(這就是RPC)在服務器端完成相應功能並將結果返回。這種機制就是通過我們這部分介紹的TProcessor類實現,這就是上面提到三個類中的第二個類,在這個實例中是FacebookServiceProcessor類,它從TProcessor類繼承,重點實現兩個函數process和process_fn,其中process會調用process_fn函數來處理客戶端具體調用的那個服務函數,process函數定義如下:

 1 bool FacebookServiceProcessor::process(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, 
 2 
 3 boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot, void* callContext) {
 4 
 5 ::apache::thrift::protocol::TProtocol* iprot = piprot.get();
 6 
 7 ::apache::thrift::protocol::TProtocol* oprot = poprot.get();
 8 
 9 std::string fname;
10 
11 ::apache::thrift::protocol::TMessageType mtype;
12 
13 int32_t seqid;
14 
15   iprot->readMessageBegin(fname, mtype, seqid);//讀取得到函數名稱、消息類型和函數序列號
16 
17 //處理不是函數調用消息的情況
18 
19 if (mtype != ::apache::thrift::protocol::T_CALL && mtype != ::apache::thrift::protocol::T_ONEWAY) {
20 
21     iprot->skip(::apache::thrift::protocol::T_STRUCT);
22 
23     iprot->readMessageEnd();
24 
25     iprot->getTransport()->readEnd();
26 
27     ::apache::thrift::TApplicationException x(::apache::thrift::TApplicationException::INVALID_MESSAGE_TYPE);
28 
29 //寫入(返回)一個異常信息給調用客戶端,客戶端會根據返回結果處理異常
30 
31     oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);
32 
33     x.write(oprot);
34 
35     oprot->writeMessageEnd();
36 
37     oprot->getTransport()->writeEnd();
38 
39     oprot->getTransport()->flush();
40 
41     return true;
42 
43 }
44 
45 return process_fn(iprot, oprot, fname, seqid, callContext);//調用實際的函數處理
46 
47 }

 

上面代碼有比較詳細的注釋,還需要說明一點的就是如果傳遞的不是函數調用的消息類型就會返回給客戶端一個異常的消息,客戶端的接收返回值的函數就會根據收到的異常消息做相應處理,上面getName函數的接收返回值函數就是拋出一個服務器端給的異常信息。下面繼續看最終服務器端調用相應映射函數的處理,這個是通過process_fn函數實現:具體定義如下:

 1 bool FacebookServiceProcessor::process_fn(::apache::thrift::protocol::TProtocol* iprot,
 2 
 3 ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid, void* callContext) {
 4 
 5 //定義個map的迭代器,用於接收在函數映射查找到的映射函數
 6 
 7 std::map<std::string, void (FacebookServiceProcessor::*)(int32_t, ::apache::thrift::protocol::TProtocol*, 
 8 
 9 ::apache::thrift::protocol::TProtocol*, void*)>::iterator pfn;
10 
11   pfn = processMap_.find(fname);//根據函數名稱查找對應的映射處理函數
12 
13   if (pfn == processMap_.end()) {//如果沒有找到,做下面的處理
14 
15     iprot->skip(::apache::thrift::protocol::T_STRUCT);
16 
17     iprot->readMessageEnd();
18 
19     iprot->getTransport()->readEnd();
20 
21 //拋出一個不知道的方法的異常
22 
23     ::apache::thrift::TApplicationException x(::apache::thrift::TApplicationException::UNKNOWN_METHOD, 
24 
25 "Invalid method name: '"+fname+"'");
26 
27 //寫入到調用客戶端
28 
29     oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);
30 
31     x.write(oprot);
32 
33     oprot->writeMessageEnd();
34 
35     oprot->getTransport()->writeEnd();
36 
37     oprot->getTransport()->flush();
38 
39     return true;
40 
41   }
42 
43   (this->*(pfn->second))(seqid, iprot, oprot, callContext);//調用具體的函數(RPC過程完成)
44 
45   return true;
46 
47 }

 

上面這個函數最終完成了RPC的過程,那個函數與映射函數的對應關系的map結構是在構造函數中初始化的,所以可以找到,例如我們舉例的getName函數是下面這樣初始化的:

1 processMap_["getName"] = &FacebookServiceProcessor::process_getName;

 

和getName函數一樣,對於IDL定義的每一個函數在FacebookServiceProcessor類中都有一個映射的處理函數,為了展示一個完整的處理過程我們在看看getName函數的映射處理函數process_getName,它的定義如下:

 1 void FacebookServiceProcessor::process_getName(int32_t seqid,
 2 
 3 ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext)
 4 
 5 {
 6 
 7 void* ctx = NULL;
 8 
 9 if (eventHandler_.get() != NULL) {
10 
11 //得到上下文調用環境
12 
13     ctx = eventHandler_->getContext("FacebookService.getName", callContext);
14 
15   }
16 
17 //定義並初始化一個用於釋放資源的幫助類對象
18 
19   ::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, "FacebookService.getName");
20 
21   if (eventHandler_.get() != NULL) {
22 
23     eventHandler_->preRead(ctx, "FacebookService.getName");//讀之前事件處理
24 
25   }
26 
27   FacebookService_getName_args args;
28 
29   args.read(iprot);
30 
31   iprot->readMessageEnd();
32 
33   uint32_t bytes = iprot->getTransport()->readEnd();
34 
35   if (eventHandler_.get() != NULL) {
36 
37     eventHandler_->postRead(ctx, "FacebookService.getName", bytes);//讀取和讀完之間的事件處理
38 
39   }
40 
41   FacebookService_getName_result result;
42 
43   try {
44 
45     iface_->getName(result.success);//這是重點:調用服務器端的getName函數
46 
47     result.__isset.success = true;
48 
49   } catch (const std::exception& e) {
50 
51     if (eventHandler_.get() != NULL) {
52 
53       eventHandler_->handlerError(ctx, "FacebookService.getName");//錯誤處理
54 
55     }
56 
57 //寫入具體的異常到客戶端
58 
59     ::apache::thrift::TApplicationException x(e.what());
60 
61     oprot->writeMessageBegin("getName", ::apache::thrift::protocol::T_EXCEPTION, seqid);
62 
63     x.write(oprot);
64 
65     oprot->writeMessageEnd();
66 
67     oprot->getTransport()->writeEnd();
68 
69     oprot->getTransport()->flush();
70 
71     return;
72 
73   }
74 
75   if (eventHandler_.get() != NULL) {
76 
77     eventHandler_->preWrite(ctx, "FacebookService.getName");//寫入之前事件處理
78 
79   }
80 
81 //寫入調用返回值(T_REPLY)消息到調用客戶端
82 
83   oprot->writeMessageBegin("getName", ::apache::thrift::protocol::T_REPLY, seqid);
84 
85   result.write(oprot);
86 
87   oprot->writeMessageEnd();
88 
89   bytes = oprot->getTransport()->writeEnd();
90 
91   oprot->getTransport()->flush();
92 
93   if (eventHandler_.get() != NULL) {
94 
95     eventHandler_->postWrite(ctx, "FacebookService.getName", bytes);//寫相應之后處理
96 
97   }
98 
99 }

 

上面這個函數就是真正完成服務器端調用客戶端傳遞過來的函數的處理過程,有事件處理類處理相應的事件(不過,目前都還是空實現,以后可以繼承這個處理類重寫需要處理事件的函數,例如:在調用服務器真正的處理函數之前可以先處理一下參數,驗證參數是否正確之類的),也有幫助釋放資源的幫助類。

(2)scribe服務IDL文件

 1 include "/home/brucewoo/thrift-0.6.1/contrib/fb303/if/fb303.thrift"
 2 
 3 namespace cpp scribe.thrift
 4 
 5 namespace java scribe.thrift
 6 
 7 namespace perl Scribe.Thrift
 8 
 9 enum ResultCode
10 
11 {
12 
13   OK,
14 
15   TRY_LATER
16 
17 }
18 
19 struct LogEntry
20 
21 {
22 
23   1:  string category,
24 
25   2:  string message
26 
27 }
28 
29 service scribe extends fb303.FacebookService
30 
31 {
32 
33   ResultCode Log(1: list<LogEntry> messages);
34 
35 }

 

這個IDL文件只定義了一個服務接口,就是用完成日志文件傳輸的幾個Log,不過這個服務繼承FacebookService服務,所以上面介紹FacebookService服務的功能它也具備,傳輸日志的結構就是分類和具體的消息。這個服務的具體實現和上面介紹的FacebookService流程都是一樣的,不在詳細介紹,只要知道一點就是:客戶端在調用Log寫日志到scribe服務器的時候就會傳遞到服務器端來調用同名的函數處理日志。

第三節 總結

TProcessor類體系主要定義一個服務生產的框架,通過這個框架生產的各種語言的代碼可以實現RPC調用,具體的傳輸細節、協議和方式是通過后面講解的內容實現的。

第二節對一個具體服務的實現內容做詳細分析,不過都是基於文字描述和代碼分析,下面根據scribe服務提供的Log函數怎樣完成一次具體的處理過程用下面的圖形展示:

 

這個圖形並沒有展示內部數據通信的細節,只是簡單的說明了一個客戶端的調用是怎樣完成的,服務器處理還涉及到很多相關細節,將在后面章節中詳細分析。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM