Protocol Buffer僅僅是提供了一套序列化和反序列化結構數據的機制,本身不具有RPC功能,但是可以基於其實現一套RPC框架。
Services
protocol buffer的Services類型是專門用來給RPC實現定義服務用的。
定義示例如下:
service SearchService { rpc Search (SearchRequest) returns (SearchResponse); }
Search是方法名,SearchRequest是參數,SearchResponse是返回類型,SearchRequest、SearchResponse分別都是預先定義的Message類型。這個Service經過編譯后會生成一個SearchService類和其對應的stub實現SearchService_Stub。SearchService_Stub把調用都轉給RpcChannel處理,RpcChannel是一個接口類,RPC系統中一般自己重載RpcChannel,例如你可以在重載類中把調用請求序列化后通過網絡傳輸到服務端。然后客戶端就可以像下面的代碼一樣進行RPC調用了:
using google::protobuf; protobuf::RpcChannel* channel; protobuf::RpcController* controller; SearchService* service; SearchRequest request; SearchResponse response; void DoSearch() { // You provide classes MyRpcChannel and MyRpcController, which implement // the abstract interfaces protobuf::RpcChannel and protobuf::RpcController. channel = new MyRpcChannel("somehost.example.com:1234"); controller = new MyRpcController; // The protocol compiler generates the SearchService class based on the // definition given above. service = new SearchService::Stub(channel); // Set up the request. request.set_query("protocol buffers"); // Execute the RPC. service->Search(controller, request, response, protobuf::NewCallback(&Done)); } void Done() { delete service; delete channel; delete controller; }
服務端這邊要實現Service接口,就是負責具體RPC函數的實現。並且在一網絡接口上監聽請求,處理請求,反序列化收到的網絡數據后轉調到這個函數的實現,之后把返回值序列化發回客戶端作為調用結果。像下面的代碼:
using google::protobuf; class ExampleSearchService : public SearchService { public: void Search(protobuf::RpcController* controller, const SearchRequest* request, SearchResponse* response, protobuf::Closure* done) { if (request->query() == "google") { response->add_result()->set_url("http://www.google.com"); } else if (request->query() == "protocol buffers") { response->add_result()->set_url("http://protobuf.googlecode.com"); } done->Run(); } }; int main() { // You provide class MyRpcServer. It does not have to implement any // particular interface; this is just an example. MyRpcServer server; protobuf::Service* service = new ExampleSearchService; server.ExportOnPort(1234, service); server.Run(); delete service; return 0; }
一個RPC實現
代碼在這(https://github.com/persistentsnail/easy_pb_rpc)
協議
package RPC; option cc_generic_services = true; message RpcRequestData { required uint32 service_id = 1; // 對應Service required uint32 method_id = 2; // 對應Service中的函數 required uint32 call_id = 3; // 對應本次調用(可能同一函數短時間有多次請求調用) required bytes content = 4; // 對應已經序列化了的函數參數Message } message RpcResponseData { required uint32 call_id = 1; // 對應請求的call_id required bytes content = 2; // 對應已經序列化了的返回值Message }
RPC請求是RpcRequestData Message,返回是RpcResponseData Message。service_id定義在一個配置文件services.cfg中,一個service_id對應一個服務名字,由服務端客戶端共享,在程序啟動時初始化一個一一映射的map。(這樣的實現不太好,后面會提到)。在網絡上傳遞的數據格式比較簡單:
| Length of Encoding Binary Data (unsigned int) | RpcRequestData or RpcResponseData |。
客戶端
支持RPC同步異步調用,例如:
void Foo(::google ::protobuf:: RpcController* controller , const ::FooRequest * request, :: FooResponse* response , :: google::protobuf ::Closure* done);
以回調參數Closure為准,若為NULL則是同步調用,反之異步回調之。內部實現上創建了一個底層工作線程,重載的RpcChannel實現把每次調用結構化一個一個msg放到msg queue中,工作線程從msg queue中取msg處理,具體來說就是把msg序列化通過網絡接口把請求傳出去。邏輯上一個RpcChannel實例代表一個網絡連接,所以可以重復使用一個RpcChannel對象。下面是同步調用一個EchoService的Foo方法的客戶端代碼示例:
RpcClient client ; RpcChannel channel(&client , "127.0.0.1:18669"); EchoService::Stub echo_clt(& channel); FooRequest request ; request.set_text ("test1"); request.set_times (1); FooResponse response ; RpcController controller ; echo_clt.Foo (&controller, & request, &response , NULL);
RpcClient管理所有連接會話,管理消息隊列,工作線程,只需要一個實例對象,RpcChannel 使用RpcClient完成連接和轉調。
服務端
首先注冊服務,就是創建Service的實現類對象,放到容器里面。然后在一個網絡端口上監聽連接,解析網絡數據包,根據不同請求在服務容器里面找合適的service調用相應method。實現的比較簡單,一個單線程服務器,同時只能處理一個請求。一個提供EchoService服務的server代碼看起來是這樣:
EchoServiceImpl *impl = new EchoServiceImpl(); RpcServer rpc_server ; rpc_server.RegisterService (impl); rpc_server.Start ();
服務端客戶端網絡數據處理使用的都是libevent。
一些protocol buffer的細節
1. 之前用到了service_id,要在雙端同時維護一份service id和name互相對應的配置文件,不利於部署和更新。protocol buffer可以通過DescriptorPool自省出自己有哪些服務和方法的,可以參見http://www.cnblogs.com/Solstice/archive/2011/04/03/2004458.html。所以在定義協議的時候可以直接用service name而不是id,而那份配置文件自然也不需要。客戶端用服務名字做一個RPC請求,服務端通過名字判斷是否自己存在這個服務。相應的method_id也可以考慮用method name。但是用id也是有好處,id是數值類型使用的Base 128 Varints變長編碼比字符串表示的name生成的數據包更小,另外數值做的哈希應該比DescriptorPool通過名字查找服務類更快。
2.應該充分使用protocol buffer錯誤處理方式,那就是使用RpcController 來做錯誤跟蹤。
3.協議字段類型多使用optional,因為required字段是必須有數據的,相反optional卻不一定需要,如果沒有就是一個默認值。optional類型通常用來升級協議,比如一個Message添加了一個新的optinal字段,以前使用老的Message格式的代碼序列出來的Message仍然能夠被使用新的Message格式的代碼正確解析,因為optional字段不存在,他會使用默認值;類似的,使用新的Message格式的代碼序列出來的Message也能夠被使用老的Message格式的代碼正確解析,因為他會忽略不認識的字段,而且他不丟掉這個字段,也就是這個Message還能被繼續正確的傳輸。