grpc使用記錄(三)簡單異步服務實例
目錄
編寫異步服務和編寫同步服務的基本流程都差不多,稍有點區別。
同步服務你只需要實現相關服務接口的實現即可,不需要管理太多東西。異步服務GRPC運行時會把讀取到的客戶端請求放入CompletionQueue中,需要主動從中取出,然后進行相關的處理,可以多線程也可以單線程。
1、編寫proto文件,定義服務
這里和grpc使用記錄(二)簡單同步服務實例中的一樣,這里就不多說了。
2、編譯proto文件,生成代碼
這里也是和grpc使用記錄(二)簡單同步服務實例中的一樣的。
3、編寫服務端代碼
這里可以復用前面同步服務的代碼,只需要做簡單的修改即可。
簡單說一下創建一個GRPC異步服務的要點:
- 1、創建服務對象的時候要創建
AsyncService
,而不是Service
。 - 2、至少需要添加一個
grpc::ServerCompletionQueue
用於異步任務操作。 - 3、必須要通過
AsyncService::RequestXXXX
來注冊XXXX
接口的處理。 - 4、一個客戶端請求的處理可簡單的分為兩個步驟:1、構建返回給客戶端的響應數據;2、發送響應數據給客戶端。
- 5、完成隊列和注冊請求處理都可以有多個,不一定非得是一個。
async_service.cpp
下面代碼簡單的創建了3個HandlerContext
的結構體類型,用於保存三個接口請求處理過程中的數據,實際的請求處理還是和之前同步服務的一樣,這里只是寫成了Test1
、Test2
、Test3
三個函數的形式。
// > g++ -o aservice async_service.cpp simple.grpc.pb.cc simple.pb.cc -std=c++11 -I. -lgrpc++ -lgrpc -lprotobuf -lgpr -lz -lcares -laddress_sorting -lpthread -Wno-deprecated
#include "simple.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#include <memory>
#include <iostream>
#include <strstream>
struct HandlerContext {
// 當前處理狀態(處理分為兩步:1處理請求構建響應數據;2發送響應)
// 這里記錄一下完成到哪一步了,以便進行相關操作
int status_; // (1構建響應完成;2發送完成)
// rpc的上下文,允許通過它進行諸如壓縮、身份驗證,以及把元數據發回客戶端等。
grpc::ServerContext ctx_;
};
struct HandlerTest1Context:public HandlerContext {
// 用於接收客戶端發送的請求
Simple::TestRequest req_;
// 用於發送響應給客戶端
Simple::TestNull rep_;
// 發送到客戶端的方法對象
grpc::ServerAsyncResponseWriter<Simple::TestNull> responder_;
// 構造函數
HandlerTest1Context()
:responder_(&ctx_)
{}
};
struct HandlerTest2Context:public HandlerContext {
// 用於接收客戶端發送的請求
Simple::TestNull req_;
// 用於發送響應給客戶端
Simple::TestReply rep_;
// 發送到客戶端的方法對象
grpc::ServerAsyncResponseWriter<Simple::TestReply> responder_;
// 構造函數
HandlerTest2Context()
:responder_(&ctx_)
{}
};
struct HandlerTest3Context:public HandlerContext {
// 用於接收客戶端發送的請求
Simple::TestRequest req_;
// 用於發送響應給客戶端
Simple::TestReply rep_;
// 發送到客戶端的方法對象
grpc::ServerAsyncResponseWriter<Simple::TestReply> responder_;
// 構造函數
HandlerTest3Context()
:responder_(&ctx_)
{}
};
// Test1 實現都是差不都的,這里只是為了測試,就隨便返回點數據了
grpc::Status Test1(grpc::ServerContext* context,
const Simple::TestRequest* request,
Simple::TestNull* response)
{
printf("%s %d\n",__func__,__LINE__);
std::ostrstream os;
os << "Client Name = " << request->name() << '\n';
os << "Clinet ID = " << request->id() << '\n';
os << "Clinet Value= " << request->value()<< '\n';
std::string message = os.str();
// grpc狀態可以設置message,所以也可以用來返回一些信息
return grpc::Status(grpc::StatusCode::OK,message);
}
// Test2
grpc::Status Test2(grpc::ServerContext* context,
const Simple::TestNull* request,
Simple::TestReply* response)
{
printf("%s %d\n",__func__,__LINE__);
response->set_tid(100);
response->set_svrname("Simple Server");
response->set_takeuptime(0.01);
return grpc::Status::OK;
}
// Test3
grpc::Status Test3(grpc::ServerContext* context,
const Simple::TestRequest* request,
Simple::TestReply* response)
{
printf("%s %d\n",__func__,__LINE__);
std::ostrstream os;
os << "Client Name = " << request->name() << '\n';
os << "Clinet ID = " << request->id() << '\n';
os << "Clinet Value= " << request->value()<< '\n';
std::string message = os.str();
response->set_tid(__LINE__);
response->set_svrname(__FILE__);
response->set_takeuptime(1.234);
// grpc狀態可以設置message
return grpc::Status(grpc::StatusCode::OK,std::move(message));
}
int main()
{
// 服務構建器,用於構建同步或者異步服務
grpc::ServerBuilder builder;
// 添加監聽的地址和端口,后一個參數用於設置認證方式,這里選擇不認證
builder.AddListeningPort("0.0.0.0:33333",grpc::InsecureServerCredentials());
// 創建一個異步服務對象
Simple::Server::AsyncService service;
// 注冊服務
builder.RegisterService(&service);
// 添加一個完成隊列,用於與 gRPC 運行時異步通信
std::unique_ptr<grpc::ServerCompletionQueue> cq_ptr = builder.AddCompletionQueue();
// 構建服務器
std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
std::cout<<"Server Runing"<<std::endl;
// 這里用一個map來記錄一下下面要進行處理的請求
// 因為這里也是單線程的,所以不加鎖了
std::map<HandlerContext*,int> handlerMap; // value用於記錄是Test1還是2、3
{
// 先創建三個類型接口的請求處理上下文對象
HandlerTest1Context* htc1 = new HandlerTest1Context;
htc1->status_ = 1; // 設置狀態為1(因為只需要區分是否已經發送響應完成)
HandlerTest2Context* htc2 = new HandlerTest2Context;
htc2->status_ = 1;
HandlerTest3Context* htc3 = new HandlerTest3Context;
htc3->status_ = 1;
// 將三個上下文對象存入map中
handlerMap[htc1] = 1; // 值用於區分是哪個類型
handlerMap[htc2] = 2;
handlerMap[htc3] = 3;
// 進入下面死循環前需要先注冊一下請求
service.RequestTest1(
&htc1->ctx_ /*服務上下文對象*/,
&htc1->req_ /*用於接收請求的對象*/,
&htc1->responder_ /*異步寫響應對象*/,
cq_ptr.get() /*新的調用使用的完成隊列*/,
cq_ptr.get() /*通知使用的完成隊列*/,
htc1 /*唯一標識tag*/);
service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,cq_ptr.get(),cq_ptr.get(),htc2);
service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,cq_ptr.get(),cq_ptr.get(),htc3);
}
// 異步服務這里不能使用 server.Wait() 來等待處理,因為是異步服務
// 服務器會把到達的請求放入隊列,需要自己從完成隊列取出請求進行處理
// 所以這里需要一個死循環來獲取請求並進行處理
while(true){
// 前面已經注冊了請求處理,這里阻塞從完成隊列中取出一個請求進行處理
HandlerContext* htc = NULL;
bool ok = false;
GPR_ASSERT(cq_ptr->Next((void**)&htc, &ok));
GPR_ASSERT(ok);
// 根據tag判斷是哪一個請求
// 因為前面注冊請求處理的時候使用的就是對象地址
// 所以這里直接從map里面取出來判斷即可
int type = handlerMap[htc];
// 判斷狀態,看是不是已經響應發送了
if(htc->status_ == 2) {
// 從map中移除
handlerMap.erase(htc);
// 因為這里並不是多態類,必須根據類型操作
switch(type) {
case 1:
{
// 釋放對象(這里未對這個對象進行復用)
delete (HandlerTest1Context*)htc;
}
break;
case 2:
{
delete (HandlerTest2Context*)htc;
}
break;
case 3:
{
delete (HandlerTest3Context*)htc;
}
break;
}
continue; // 回到從完成隊列獲取下一個
}
// 根據type進行相應的處理
switch(type) {
case 1: /*Test1的處理*/
{
// 重新創建一個請求處理上下文對象(以便不影響下一個請求的處理)
HandlerTest1Context* htc1 = new HandlerTest1Context;
htc1->status_ = 1; // 設置狀態為1
handlerMap[htc1] = 1; // 保存到handlerMap中
service.RequestTest1(&htc1->ctx_,&htc1->req_,&htc1->responder_,
cq_ptr.get(),cq_ptr.get(),htc1);
HandlerTest1Context* h = (HandlerTest1Context*)htc;
grpc::Status status = Test1(&h->ctx_,&h->req_,&h->rep_);
// 設置狀態為發送響應
h->status_ = 2;
// 調用responder_進行響應發送(異步)
h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的唯一tag*/);
}
break;
case 2: /*Test2的處理*/
{
HandlerTest2Context* htc2 = new HandlerTest2Context;
htc2->status_ = 1; // 設置狀態為1
handlerMap[htc2] = 2; // 保存到handlerMap中
service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,
cq_ptr.get(),cq_ptr.get(),htc2);
HandlerTest2Context* h = (HandlerTest2Context*)htc;
grpc::Status status = Test2(&h->ctx_,&h->req_,&h->rep_);
// 設置狀態為發送響應
h->status_ = 2;
// 調用responder_進行響應發送(異步)
h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的唯一tag*/);
}
break;
case 3: /*Test3的處理*/
{
HandlerTest3Context* htc3 = new HandlerTest3Context;
htc3->status_ = 1; // 設置狀態為1
handlerMap[htc3] = 3; // 保存到handlerMap中
service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,
cq_ptr.get(),cq_ptr.get(),htc3);
HandlerTest3Context* h = (HandlerTest3Context*)htc;
grpc::Status status = Test3(&h->ctx_,&h->req_,&h->rep_);
// 設置狀態為發送響應
h->status_ = 2;
// 調用responder_進行響應發送(異步)
h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的唯一tag*/);
}
break;
}
}
return 0;
}
async_service2.cpp
上面雖然是使用到了grpc的異步服務機制,但是只是為了描述清楚異步服務的創建過程,是一個單線程的簡陋實現。下面寫一個使用線程池的實現。
// > g++ -o aservice2 async_service2.cpp simple.grpc.pb.cc simple.pb.cc -std=c++11 -I. -lgrpc++ -lgrpc -lprotobuf -lgpr -lz -lcares -laddress_sorting -lpthread -Wno-deprecated
// 線程池的代碼可見 https://www.cnblogs.com/oloroso/p/5881863.html
#include "threadpool.h"
#include "simple.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#include <memory>
#include <iostream>
#include <strstream>
#include <chrono>
struct HandlerContextBase {
// 當前對象類型,用於確定是Test1/2/3哪一個請求的
int type_;
// 當前處理狀態(處理分為兩步:1處理請求構建響應數據;2發送響應)
// 這里記錄一下完成到哪一步了,以便進行相關操作
int status_; // (1構建響應完成;2發送完成)
// rpc的上下文,允許通過它進行諸如壓縮、身份驗證,以及把元數據發回客戶端等。
grpc::ServerContext ctx_;
};
template<typename RequestType,typename ReplyType>
struct HandlerContext:public HandlerContextBase {
// 用於接收客戶端發送的請求
RequestType req_;
// 用於發送響應給客戶端
ReplyType rep_;
// 發送到客戶端的方法對象
grpc::ServerAsyncResponseWriter<ReplyType> responder_;
//================================================
// 構造函數
HandlerContext()
:responder_(&ctx_)
{}
};
typedef HandlerContext<Simple::TestRequest,Simple::TestNull> HandlerTest1Context;
typedef HandlerContext<Simple::TestNull,Simple::TestReply> HandlerTest2Context;
typedef HandlerContext<Simple::TestRequest,Simple::TestReply> HandlerTest3Context;
unsigned long get_tid()
{
std::thread::id tid = std::this_thread::get_id();
std::ostrstream os;
os << tid;
unsigned long tidx = std::stol(os.str());
return tidx;
}
// Test1 實現都是差不都的,這里只是為了測試,就隨便返回點數據了
grpc::Status Test1(grpc::ServerContext* context,
const Simple::TestRequest* request,
Simple::TestNull* response)
{
printf("%s %d\n",__func__,__LINE__);
std::ostrstream os;
os << "Client Name = " << request->name() << '\n';
os << "Clinet ID = " << request->id() << '\n';
os << "Clinet Value= " << request->value()<< '\n';
std::string message = os.str();
// grpc狀態可以設置message,所以也可以用來返回一些信息
return grpc::Status(grpc::StatusCode::OK,message);
}
// Test2
grpc::Status Test2(grpc::ServerContext* context,
const Simple::TestNull* request,
Simple::TestReply* response)
{
printf("%s %d\n",__func__,__LINE__);
response->set_tid(100);
response->set_svrname("Simple Server");
response->set_takeuptime(0.01);
return grpc::Status::OK;
}
// Test3
grpc::Status Test3(grpc::ServerContext* context,
const Simple::TestRequest* request,
Simple::TestReply* response)
{
printf("%s %d\n",__func__,__LINE__);
int tid = get_tid();
std::ostrstream os;
os << "Client Name = " << request->name() << '\n';
os << "Clinet ID = " << request->id() << '\n';
os << "Clinet Value= " << request->value()<< '\n';
os << "Server TID = " << tid<<'\n';
std::string message = os.str();
// 休眠0.5秒,以便觀察異步執行的效果
std::this_thread::sleep_for(std::chrono::milliseconds(500));
response->set_tid(tid);
response->set_svrname(__FILE__);
response->set_takeuptime(1.234);
// grpc狀態可以設置message
return grpc::Status(grpc::StatusCode::OK,std::move(message));
}
int main()
{
// 服務構建器,用於構建同步或者異步服務
grpc::ServerBuilder builder;
// 添加監聽的地址和端口,后一個參數用於設置認證方式,這里選擇不認證
builder.AddListeningPort("0.0.0.0:33333",grpc::InsecureServerCredentials());
// 創建一個異步服務對象
Simple::Server::AsyncService service;
// 注冊服務
builder.RegisterService(&service);
// 添加一個完成隊列,用於與 gRPC 運行時異步通信
std::unique_ptr<grpc::ServerCompletionQueue> cq_ptr = builder.AddCompletionQueue();
// 構建服務器
std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
std::cout<<"Server Runing"<<std::endl;
// 下面可以有幾個工作線程就先注冊幾個,也可以僅注冊一個(至少一個)
/*for(int i=0;i<4;++i)*/ {
// 先創建三個類型接口的請求處理上下文對象
HandlerTest1Context* htc1 = new HandlerTest1Context;
htc1->status_ = 1; // 設置狀態為1(因為只需要區分是否已經發送響應完成)
htc1->type_ = 1; // 設置類型為1
HandlerTest2Context* htc2 = new HandlerTest2Context;
htc2->status_ = 1;
htc2->type_ = 2;
HandlerTest3Context* htc3 = new HandlerTest3Context;
htc3->status_ = 1;
htc3->type_ = 3;
// 進入下面死循環前需要先注冊一下請求
service.RequestTest1(
&htc1->ctx_ /*服務上下文對象*/,
&htc1->req_ /*用於接收請求的對象*/,
&htc1->responder_ /*異步寫響應對象*/,
cq_ptr.get() /*新的調用使用的完成隊列*/,
cq_ptr.get() /*通知使用的完成隊列*/,
htc1 /*唯一標識tag*/);
service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,cq_ptr.get(),cq_ptr.get(),htc2);
service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,cq_ptr.get(),cq_ptr.get(),htc3);
}
// 創建線程池,使用4個工作線程,用於構建請求的響應
ThreadPool pool(4);
// 異步服務這里不能使用 server->Wait() 來等待處理,因為是異步服務
// 服務器會把到達的請求放入隊列,需要自己從完成隊列取出請求進行處理
// 所以這里需要一個死循環來獲取請求並進行處理
while(true){
// 前面已經注冊了請求處理,這里阻塞從完成隊列中取出一個請求進行處理
HandlerContextBase* htc = NULL;
bool ok = false;
GPR_ASSERT(cq_ptr->Next((void**)&htc, &ok));
GPR_ASSERT(ok);
// 根據tag判斷是哪一個請求
// 因為前面注冊請求處理的時候使用的就是對象地址
// 所以這里直接從map里面取出來判斷即可
int type = htc->type_;
// 判斷狀態,看是不是已經響應發送了
if(htc->status_ == 2) {
// 因為這里並不是多態類,必須根據類型操作
switch(type) {
case 1:
{
// 釋放對象(這里未對這個對象進行復用)
delete (HandlerTest1Context*)htc;
}
break;
case 2:
{
delete (HandlerTest2Context*)htc;
}
break;
case 3:
{
delete (HandlerTest3Context*)htc;
}
break;
}
continue; // 回到從完成隊列獲取下一個
}
// 重新創建一個請求處理上下文對象(以便能夠接受下一個請求進行處理)
switch(type) {
case 1:
{
HandlerTest1Context* htc1 = new HandlerTest1Context;
htc1->status_ = 1; // 設置狀態為1
htc1->type_ = 1; // 設置類型為1
service.RequestTest1(&htc1->ctx_,&htc1->req_,&htc1->responder_,
cq_ptr.get(),cq_ptr.get(),htc1);
}
break;
case 2:
{
HandlerTest2Context* htc2 = new HandlerTest2Context;
htc2->status_ = 1; // 設置狀態為1
htc2->type_ = 1; // 設置類型為2
service.RequestTest2(&htc2->ctx_,&htc2->req_,&htc2->responder_,
cq_ptr.get(),cq_ptr.get(),htc2);
}
break;
case 3:
{
HandlerTest3Context* htc3 = new HandlerTest3Context;
htc3->status_ = 1; // 設置狀態為1
htc3->type_ = 3; // 設置類型為3
service.RequestTest3(&htc3->ctx_,&htc3->req_,&htc3->responder_,
cq_ptr.get(),cq_ptr.get(),htc3);
}
break;
}
pool.enqueue([type,htc](){
// 根據type進行相應的處理
switch(type) {
case 1: /*Test1的處理*/
{
HandlerTest1Context* h = (HandlerTest1Context*)htc;
grpc::Status status = Test1(&h->ctx_,&h->req_,&h->rep_);
// 設置狀態為發送響應
h->status_ = 2;
// 調用responder_進行響應發送(異步)
h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的唯一tag*/);
}
break;
case 2: /*Test2的處理*/
{
HandlerTest2Context* h = (HandlerTest2Context*)htc;
grpc::Status status = Test2(&h->ctx_,&h->req_,&h->rep_);
// 設置狀態為發送響應
h->status_ = 2;
// 調用responder_進行響應發送(異步)
h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的唯一tag*/);
}
break;
case 3: /*Test3的處理*/
{
HandlerTest3Context* h = (HandlerTest3Context*)htc;
grpc::Status status = Test3(&h->ctx_,&h->req_,&h->rep_);
// 設置狀態為發送響應
h->status_ = 2;
// 調用responder_進行響應發送(異步)
h->responder_.Finish(h->rep_/*發送的響應*/,status/*狀態碼*/,htc/*請求處理的唯一tag*/);
}
break;
}
});
}
return 0;
}