這個用例的邏輯很簡單, 服務器運行一個管理個人信息的服務, 提供如下的四個服務:
(1) 添加一個個人信息
注: 對應於Unary RPCs, 客戶端發送單一消息給服務器, 服務器返回單一消息
(2) 添加多個個人信息
注: 對應於Client streaming RPCs, 客戶端使用提供的stream發送多個消息給服務端, 等客戶端寫完了所有的消息, 就會等待服務器讀取這些消息, 然后返回響應消息. gRPC保證在一次RPC調用中, 消息是順序的.
(3) 獲取最多N個個人信息
注: 對應於Server streaming RPCs, 客戶端發送一條消息給服務端, 然后獲取一個stream來讀取一系列的返回消息. 客戶端會一直讀取消息, 知道沒有消息可讀為止, gRPC保證在一次RPC調用中,消息是順序的.
(4) 獲取指定名字的所有個人信息
注: 對應於Bidirectional streaming RPCs, 這種rcp, 客戶端和服務端通過一個read-write stream來發送一系列的消息. 這兩個消息流可以獨立操作, 就是說, 客戶端和服務端可以以任意它們所想的順序操作這兩個消息流. 例如, 服務器可以等待接收到所有的客戶端消息時,才開始向客戶端發送消息, 或者它可以讀一條消息, 然后給客戶端發送一條消息, 或者別的想要的方式. 在兩個消息流的其中一個中, 消息是順序的.
在給出代碼之前, 先說明一件事, 在grpc中, 請求參數和返回值類型都需要是message類型, 而不能是string, int32等類型.下面給出proto文件的定義:
// [START declaration] syntax = "proto3"; package tutorial; import "google/protobuf/timestamp.proto"; // [END declaration] // [START messages] message Person { string name = 1; int32 id = 2; // Unique ID number for this person. string email = 3; enum PhoneType { MOBILE = 0; HOME = 1; WORK = 2; } message PhoneNumber { string number = 1; PhoneType type = 2; } repeated PhoneNumber phones = 4; google.protobuf.Timestamp last_updated = 5; } // Our address book file is just one of these. message AddressBook { repeated Person people = 1; } // rpc調用的結果 message Result { bool success = 1; } // rpc請求的個數 message ReqNum { int32 num = 1; } message ReqName { string name = 1; } // [END messages] // Interface exported by the server. service Manage { // 添加一個人 rpc AddPerson(Person) returns (Result) {} // 添加很多人 rpc AddPersons(stream Person) returns (Result) {} // 獲取指定數目的個人列表 rpc GetPersonsLimit(ReqNum) returns (stream Person) {} // 獲取名字為輸入的個人列表 rpc GetPersons(stream ReqName) returns (stream Person) {} }
Person的定義和之前的protobuf中一致, 新加了一些用於grpc調用的結構體, 這些結構體很簡單, 就不講了. service Manage中定義的是這個服務提供的rpc調用接口.
(1) 添加一個個人信息 對應的是 AddPerson
(2) 添加多個個人信息 對應的是 AddPersons
(3) 獲取最多N個個人信息 對應的是 GetPersonsLimit
(4) 獲取指定名字的所有個人信息 對應的是 GetPersons
rpc定義很直觀, 應該可以參照寫出需要的rpc, 按照我了解的, 每個rpc有一個輸入參數和一個輸出參數, 這個需要注意.
person.proto文件生成文件包括person.pb.h與 person.pb.cc和person.grpc.pb.h與person.grpc.pb.cc, 其中的person.pb.h和person.pb.cc文件是proto文件中的結構體等生成的文件, 所以主要關注person.grpc.pb.h和person.grpc.pb.cc文件.
我們查看一下person.grpc.pb.*文件中的內容, 這個文件中只有一個類, 就是class Manage, 這個類名和proto文件中的Service是同一個名字. 下面我們查看Manage類中的內容:
(1) 函數service_full_name用來返回這個服務的名字, 命名方式是: package + “.” + service_name(包名+”.”+服務名).
(2) class StubInterface內部類, 這個類是定義客戶端操作的存根(stub)的接口類. 這個類中有如下函數:
1) AddPerson相關的函數, 對應於proto文件中的rpc AddPerson(Person) returns (Result) {}函數:
virtual Status AddPerson(ClientContext *context, const tutorial::Person& request, ::tutorial::Result* response) = 0; std::unique_ptr<ClientAsyncResponseReaderInterface<tutorial::Result>> AsyncAddPerson(ClientContext* context, const tutorial::Person& request, CompletionQueue *cq) { return unique_ptr<ClientAsyncResponseReaderInterface<tutorial::Result>>(AsyncAddPersonRaw(context, request, cq)); } unique_ptr<ClientAsyncResponseReaderInterface<tutorial::Result>> PrepareAsyncAddPerson(ClientContext *context, const tutorial::Person& request, Completion* cq) { return unique_ptr<ClientAsyncResponseReaderInterface<tutorial::Result>>(PrepareAsyncAddPersonRaw(context, request, cq)); }
2) AddPersons相關函數, 對應於proto文件中的rpc AddPersons(stream Person) returns (Result) {}函數:
unique_ptr<ClientWriteInterface<tutorial::Person>> AddPersons(ClientConext* context, tutorial::Result *response) { return unique_ptr<ClientWriterInterface<tutorial::Person>(AddPersonsRaw(context, response); } ...
3) GetPersonsLimit相關函數, 對應於proto文件中的rpc GetPersonsLimit(ReqNum) returns (stream Person) {}函數:
unique_ptr<ClientReaderInterface<tutorial::Person>> GetPersonsLimit(ClientContext* context, const tutorial::ReqNum& request) { return unique_ptr<ClientReaderInterface<tutorial::Person>>(GetPersonsLimitRaw(context, request)); } ...
4) GetPersons相關函數, 對應於proto文件中的rpc GetPersons(stream ReqName) returns (stream Person) {}函數:
unique_ptr<ClientReaderWriterInterface<tutorial::ReqNum, tutorial::Person>> GetPersons(ClientContext *context) { return unique_ptr<ClientReaderWriterInterface<tutorial::ReqName, tutorial::Person>>(GetPersonsRaw(context)); } ...
5) class experimental_async_interface應該是實驗性的異步調用類, 以及獲取這個類對象的函數, experimental_async.
6) 實現用的虛函數: AsyncAddPersonRaw, PrepareAsyncAddPersonRaw, AddPersonsRaw, AsyncAddPersonsRaw, PrepareAsyncAddPersonsRaw, AsyncGetPersonsLimitRaw, PrepareAsyncGetPersonsLimitRaw, GetPersonsRaw, AsyncGetPersonsRaw, PrepareAsyncGetPersonsRaw.
(3) class Stub是Manage類的內部類. 這個類是定義客戶端操作的存根(stub)的具體實現類. 實現了上面的StubInterface類的各種接口.
(4) 創建客戶端存根的函數:
static std::unique_ptr<Stub> NewStub(const shared_ptr<ChannelInterface>& channel, const StubOptions& options = StubOptions()); unique_ptr<Manage::Stub> Manage::NewStub(const shared_ptr<ChannelInterface>& channel, const StubOptions& options) { (void)options; unique_ptr<Manage::Stub> stub(new Manage::Stub(channel)); return stub; }
(1) class Service內部類, 這個是生成的grpc服務端接口, 服務端主要需要實現的就是這個接口類的接口. 這個類的函數包括:
1) 構造函數與析構函數: Service和~Service虛函數, 下面是構造函數實現:
Manage::Service::Service() { AddMethod(new internal::RpcServiceMethod( Manage_method_names[0], internal::RpcMethod::NORMAL_RPC, new internal::RcpMethodHandler<Manage::Service, tutorial::Person, tutorial::Result> ( std::mem_fn(&Manage::Service::AddPerson), this))); AddMethod(new internal::RpcServiceMethod( Manage_method_names[1], internal::RpcMethod::CLIENT_STREAMING, new internal::ClientStreamingHandler<Manage::Service, tutorial::Person, tutorial::Result>( std::mem_fn(&Manage::Service::AddPersons), this))); AddMethod(new internal::RpcServiceMethod( Manage_method_names[2], internal::RpcMethod::SERVER_STREAMING, new internal::ServerStreamingHandler<Manage::Service, tutorial::ReqNum, tutorial::Person>( std::mem_fn(&Manage::Service::GetPersonsLimit), this))); AddMethod(new internal::RpcServiceMethod( Manage_method_names[3], internal::RpcMethod::BIDI_STREAMING, new internal::BidiStreamingHandler<Manage::Service, tutorial::ReqName, tutorial::Person>( std;:mem_fn(&Manage::Service::GetPersons), this))); }
2) 虛接口函數:
virtual grpc::Status AddPerson(grpc::ServerContext *context, const tutorial::Person* request, tutorial::Result* response); virtual grpc::Status AddPersons(grpc::ServerContext *context, grpc::ServerReader<tutorial::Person>* reader, tutorial::Result* response); virtual grpc::Status GetPersonsLimit(grpc::ServerContext *context, const tutorial::ReqNum* request, grpc::ServerWriter<tutorial::Person> *writer); virtual grpc::Status GetPersons(grpc::ServerContext* context, grpc::ServerReaderWriter<tutorial::Person, tutorial::ReqName>* stream);
(6) 內部模板類WithAsyncMethod_AddPerson, WithAsyncMethod_AddPersons, WithAsyncMethod_GetPersonsLimit, WithAsyncMethod_GetPersons:
template <class BaseClass> class WithAsyncMethod_AddPerson : public BaseClass template <class BaseClass> class WithAsyncMethod_AddPerson : public BaseClass template <class BaseClass> class WithAsyncMethod_GetPersonsLimit : public BaseClass template <class BaseClass> class WithAsyncMethod_GetPersons : public BaseClass
(7) 異步的服務類:
typedef WithAsyncMethod_AddPerson<WithAsyncMethod_AddPersons<WithAsyncMethod_GetPersonsLimit<WithAsyncMethod_GetPersons<Service>>>> AsyncService;
(8) 內部模板類 ExperimentalWithCallbackMethod_AddPerson, ExperimentalWithCallbackMethod_AddPersons, ExperimentalWithCallback_GetPersonsLimit, ExperimentalWithCallbackMethod_GetPersons:
template <class BaseClass> class ExperimentalWithCallbackMethod_AddPerson : public BaseClass template <class BaseClass> class ExperimentalWithCallbackMethod_AddPersons : public BaseClass template <class BaseClass> class ExperimentalWithCallbackMethod_GetPersonsLimit : public BaseClass template <class BaseClass> class ExperimentalWithCallbackMethod_GetPersons : public BaseClass
(9) 實驗性的帶回調函數的服務類:
typedef ExperimentalWithCallbackMethod_AddPerson<ExperimentalWithCallbackMethod_AddPersons<ExperimentalWithCallbackMethod_GetPersonsLimit<ExperimentalWithCallbackMethod_GetPersons<Service>>>> ExperimentalCallbackService;
(10) 內部模板類, WithGenericMethod_AddPerson, WithGenericMethod_AddPersons, WithGenericMethod_GetPersonsLimit, WithGenericMethod_GetPersons:
template <class BaseClass> class WithGenericMethod_AddPerson : public BaseClass template <class BaseClass> class WithGenericMethod_AddPersons : public BaseClass template <class BaseClass> class WithGenericMethod_GetPersonsLimit : public BaseClass template <class BaseClass> class WithGenericMethod_GetPersons : public BaseClass
(11) 內部模板類, WithRawMethod_AddPerson, WithRawMethod_AddPersons, WithRawMethod_GetPersonsLimit, WithRawMethod_GetPersons:
template <class BaseClass> class WithRawMethod_AddPerson : public BaseClass template <class BaseClass> class WithRawMethod_AddPersons : public BaseClass template <class BaseClass> class WithRawMethod_GetPersonsLimit : public BaseClass template <class BaseClass> class WithRawMethod_GetPersons : public BaseClass
(12) 內部模板類, ExperimentalWithRawCallbackMethod_AddPerson, ExperimentalWithRawCallbackMethod_AddPersons, ExperimentalWithRawCallbackMethod_GetPersonsLimit, ExperimentalWithRawCallbackMethod_GetPersons:
template <class BaseClass> class ExperimentalWithRawCallbackMethod_AddPerson : public BaseClass template <class BaseClass> class ExperimentalWithRawCallbackMethod_AddPersons : public BaseClass template <class BaseClass> class ExperimentalWithRawCallbackMethod_GetPersonsLimit : public BaseClass template <class BaseClass> class ExperimentalWithRawCallbackMethod_GetPersons : public BaseClass
(13) 內部模板類, WithStreamedUnaryMethod_AddPerson, WithSplitStreamingMethod_GetPersonsLimit:
template <class BaseClass> class WithStreamedUnaryMethod_AddPerson : public BaseClass template <class BaseClass> class WithSplitStreamingMethod_GetPersonsLimit : public BaseClass
(14) 額外類型的服務定義:
typedef WithStreamedUnaryMethod_AddPerson<Service > StreamedUnaryService; typedef WithSplitStreamingMethod_GetPersonsLimit<Service > SplitStreamedService; typedef WithStreamedUnaryMethod_AddPerson<WithSplitStreamingMethod_GetPersonsLimit<Service > > StreamedService;
關於生成文件的講解, 就差不多這些了, 有空應該講一下grpc內部調用的邏輯.
下面給出服務端重載proto的Manage服務的代碼:
#include <string> #include <grpc/grpc.h> #include <grpcpp/server.h> #include <grpcpp/server_builder.h> #include <grpcpp/server_context.h> #include <folly/concurrency/ConcurrentHashMap.h> #include "person.grpc.pb.h" class PersonManager { public: explicit PersonManager() { } // AddPerson 用來添加一個人 bool AddPerson(const tutorial::Person& p) { m_persons.insert(p.name(), p); return true; } // GetPerson 用來查找一個人 tutorial::Person GetPerson(const std::string& name) const { return m_persons.at(name); } // GetPersons 用來獲取多個人 std::vector<tutorial::Person> GetPersons(int num) const { std::vector<tutorial::Person> personList; auto it = m_persons.begin(); while (it != m_persons.end()) { if (static_cast<int>(personList.size()) > num) { return personList; } personList.push_back(it->second); ++it; } return personList; } private: folly::ConcurrentHashMap<std::string, tutorial::Person> m_persons; }; class PersonService : public tutorial::Manage::Service { public: explicit PersonService() { } // AddPerson 用來添加一個人 grpc::Status AddPerson(grpc::ServerContext* context, const tutorial::Person *person, tutorial::Result *res) override { m_mgr.AddPerson(*person); res->set_success(true); return grpc::Status::OK; } // AddPersons 用來添加多個用戶 grpc::Status AddPersons(grpc::ServerContext* context, grpc::ServerReader<tutorial::Person>* reader, tutorial::Result *res) override { tutorial::Person person; while (reader->Read(&person)) { m_mgr.AddPerson(person); } res->set_success(true); return grpc::Status::OK; } // GetPersonsLimit 用來查詢一個人 grpc::Status GetPersonsLimit(grpc::ServerContext* context, const tutorial::ReqNum *num, grpc::ServerWriter<tutorial::Person>* writer) override { auto persons = m_mgr.GetPersons(num->num()); for (const auto& person : persons) { writer->Write(person); } return grpc::Status::OK; } // GetPersons 用來根據人名獲取所有的人 grpc::Status GetPersons(grpc::ServerContext *context, grpc::ServerReaderWriter<tutorial::Person, tutorial::ReqName>* stream) override { tutorial::ReqName name; while (stream->Read(&name)) { try { stream->Write(m_mgr.GetPerson(name.name())); } catch (const std::out_of_range& ex) { // 如果出現越界的問題, 則說明不存在 } } return grpc::Status::OK; } private: PersonManager m_mgr; };
下面給出創建grpc服務器的代碼:
#include <grpcpp/resource_quota.h> #include "person_manage.h" // maxThreadNum 根據計算機硬件設置 const int maxThreadNum = 20; void RunServer() { std::string server_address("localhost:50001"); PersonService service; grpc::ServerBuilder builder; grpc::ResourceQuota quota; quota.SetMaxThreads(maxThreadNum); builder.SetResourceQuota(quota); builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.RegisterService(&service); std::unique_ptr<grpc::Server> server(builder.BuildAndStart()); std::cout << "Server listening on " << server_address << std::endl; server->Wait(); } int main(int argc, char** argv) { RunServer(); return 0; }
下面給出客戶端對proto中的Manage服務的封裝代碼:
#include <memory> #include <vector> #include <thread> #include <grpc/grpc.h> #include <grpcpp/channel.h> #include <grpcpp/client_context.h> #include <grpcpp/create_channel.h> #include <grpcpp/security/credentials.h> #include "person.grpc.pb.h" class PersonManip { public: PersonManip(std::shared_ptr<grpc::Channel> channel) : m_stub(tutorial::Manage::NewStub(channel)) { } // 添加一個用戶 bool AddPerson(const tutorial::Person& person) { grpc::ClientContext context; tutorial::Result res; grpc::Status status = m_stub->AddPerson(&context, person, &res); if (!status.ok()) { std::cout << "status error: " << status.error_message() << std::endl; return false; } return res.success(); } // 添加多個用戶, 當前的服務端實現可能造成部分插入的情況 bool AddPersons(const std::vector<tutorial::Person>& persons) { grpc::ClientContext context; tutorial::Result res; std::unique_ptr<grpc::ClientWriter<tutorial::Person>> writer( m_stub->AddPersons(&context, &res)); for (const auto& person : persons) { if (!writer->Write(person)) { // Broken stream. break; } } writer->WritesDone(); grpc::Status status = writer->Finish(); if (!status.ok()) { std::cout << "status error: " << status.error_message() << std::endl; return false; } return res.success(); } // 獲取限定數目的用戶 bool GetPersonsLimit(int limitNum, std::vector<tutorial::Person>& persons) { grpc::ClientContext context; tutorial::ReqNum limit; limit.set_num(limitNum); std::unique_ptr<grpc::ClientReader<tutorial::Person>> reader( m_stub->GetPersonsLimit(&context, limit)); tutorial::Person person; while (reader->Read(&person)) { persons.push_back(person); } grpc::Status status = reader->Finish(); if (!status.ok()) { std::cout << "status error: " << status.error_message() << std::endl; return false; } return true; } // 獲取所有指定名字的用戶 bool GetPersons(const std::vector<std::string>& personNames, std::vector<tutorial::Person>& persons) { grpc::ClientContext context; std::shared_ptr<grpc::ClientReaderWriter<tutorial::ReqName, tutorial::Person>> stream( m_stub->GetPersons(&context)); std::thread writer([stream, &personNames]() { for (const auto& personName : personNames) { tutorial::ReqName name; name.set_name(personName); stream->Write(name); } stream->WritesDone(); }); tutorial::Person person; while (stream->Read(&person)) { persons.push_back(person); } writer.join(); grpc::Status status = stream->Finish(); if (!status.ok()) { std::cout << "status error: " << status.error_message() << std::endl; return false; } return true; } private: std::unique_ptr<tutorial::Manage::Stub> m_stub; };
下面給出客戶端測試的代碼:
#include "person_manip.h" tutorial::Person makePerson(const std::string& name, int id, const std::string& email) { tutorial::Person person; person.set_name(name); person.set_id(id); person.set_email(email); return person; } void printPersons(const std::vector<tutorial::Person>& persons) { for (const auto& p : persons) { std::cout << "name: " << p.name() << " " << "id: " << p.id() << " " << "email: " << p.email() << std::endl; } std::cout << std::endl; } int main(int argc, char **argv) { PersonManip manip( grpc::CreateChannel("localhost:50001", grpc::InsecureChannelCredentials())); auto person = makePerson("Tom", 1, "tom@gmail.com"); auto suc = manip.AddPerson(person); if (!suc) { std::cout << "manip.AddPerson failed." << std::endl; return -1; } person = makePerson("Lilly", 2, "lilly@gmail.com"); auto person2 = makePerson("Jim", 3, "jim@gmail.com"); std::vector<tutorial::Person> persons{person, person2}; suc = manip.AddPersons(persons); if (!suc) { std::cout << "manip.AddPersons failed." << std::endl; return -1; } std::vector<tutorial::Person> resPersons; suc = manip.GetPersonsLimit(5, resPersons); if (!suc) { std::cout << "manip.GetPersonsLimit failed." << std::endl; return -1; } std::cout << "manip.GetPersonsLimit output:" << std::endl; printPersons(resPersons); resPersons.clear(); std::vector<std::string> personNames; for (const auto& p : persons) { personNames.push_back(p.name()); } suc = manip.GetPersons(personNames, resPersons); if (!suc) { std::cout << "manip.GetPersons failed." << std::endl; return -1; } std::cout << "manip.GetPersons output:" << std::endl; printPersons(resPersons); return 0; }
這個我沒有使用單元測試, 可能使用單元測試會更好, 不過根據客戶端代碼和輸出, 也可以驗證服務的正確性.
完整的代碼參考: https://github.com/ss-torres/person-service.git
如果有什么建議或者提議, 歡迎提出