一般tars客戶端使用方式:
我們用客戶端進行tars rpc調用時候,一般如下面這樣寫:
方式一、 //直連方式 TC_Endpoint ep; AdminFPrx pAdminPrx; //服務管理代理 string sAdminPrx = "AdminObj@"+_serverObjectPtr->getLocalEndpoint().toString(); pAdminPrx = Application::getCommunicator()->stringToProxy<AdminFPrx>(sAdminPrx); sResult = pAdminPrx->notify(_msg); 方式二、 PatchPrx proxy = Application::getCommunicator()->stringToProxy<PatchPrx>(_patchRequest.patchobj); proxy->tars_timeout(60000); 方式三、 NotifyPrx pNotifyPrx = Application::getCommunicator()->stringToProxy<NotifyPrx>(ServerConfig::Notify); if (pNotifyPrx && sResult != "") { pNotifyPrx->async_reportServer(NULL, sServerId, "", sResult); }
這里只是列舉了幾種,還有更多的寫法。其實都是分成兩步:
1由Application::getCommunicator()->stringToProxy(strObjName)得到一個proxyPtr。
2再用proxyPtr,同步,或者異步等各種rpc調用對應的方法.
其實上面的調用,也等同於:
"ApplicationName.ServerName.ServerObjName@tcp -h ip1 -t 60000 -p port1;tcp -h ip2 -t 60000 -p port1"
可以根據servantname去直連對應的機器
那么核心流程就是stringToProxy和proxyPtr里面的實現。
獲取Proxy的實現
stringToProxy的偽代碼實現是: stringToProxy(const string& objectName,const string& setName="") { return ServantProxyFactory->getServantProxy(objectName,setName); } getServantProxy的實現偽代碼是:
getServantProxy(const string& name,const string& setName) { if _servantProxy.find(name+":"+setName) return findvalue; ObjectProxy ** ppObjectProxy = new ObjectProxy * [_comm->getClientThreadNum()]; for(size_t i = 0; i < _comm->getClientThreadNum(); ++i) { ppObjectProxy[i] = _comm->getCommunicatorEpoll(i)->getObjectProxy(name, setName); }
ServantPrx sp = new ServantProxy(_comm, ppObjectProxy, _comm->getClientThreadNum()); //設置同步調用超時3ss。異步調用超時5s。連接超時1.5s sp->tars_timeout(_comm->getProperty("sync-invoke-timeout", "3000")); sp->tars_async_timeout(_comm->getProperty("async-invoke-timeout", "5000")); sp->tars_connect_timeout(_comm->getProperty("connect-timeout", "1500")); _servantProxy[name+":"+setName]=sp; return sp; }
上面代碼說白了就是建立一個ServantProxy.並且他上面會帶getClientThreadNum個數的ObjectProxy.一個ObjectProxy對應一個線程。然后緩存起來。
那么就有個問題getClientThreadNum和sync-invoke-timeout這些東東是配在哪的呢?
其實是在#define CONFIG_ROOT_PATH "/tars/application/client" 下面的對應內容.
getClientThreadNum值是netthread字段。默認是1。也就是基本上大部分tars服務一般就一個客戶端線程.
ServantProxy部分內容查看 《ServantProxy部分模塊》
rpc調用的實現
每個.tars文件最終都會被tars parser編譯生成一個.h及.hpp文件,單就interface部分而已,會生成帶對應interfacename字符的一些類:
客戶端用:
PrxCallback類(普通異步回調處理類),
PrxCallbackPromise類(promise異步回調處理類),
CoroPrxCallback類(協程異步回調處理類),
客戶端Proxy實現類。上面3個類的處理流程都會在這個類的異步調用中用到
服務端用:
Servant類。包括 下面幾個部分:
目標接口的純虛函數。
以及目標接口的被調用回復結果async_response函數(里面自動生成代碼 實現了對響應結果進行統計)。
onDispatch函數,此函數自動生成代碼,實現了對發送過來的請求,根據請求TarsCurrentPtr中的接口名找到對應的函數(用的是一個vector來 switch case 函數在接口組中的序號,加快匹配速度);從TarsCurrentPtr中解包出對應結構體;執行剛才找到的函數,並將執行結果返回給結果參數
這幾塊類的具體實現的內容,參考下面AdminReg部分的介紹
比如AdminReg.tars中的interface AdminReg生成有:
//這3個是異步回調處理類
class AdminRegPrxCallback: public tars::ServantProxyCallback;
class AdminRegPrxCallbackPromise: public tars::ServantProxyCallback;
class AdminRegCoroPrxCallback: public AdminRegPrxCallback;
//下面是主要功能實現類
class AdminRegProxy : public tars::ServantProxy; 客戶端類
class AdminReg : public tars::Servant; 服務端類
這一大坨代碼很長很長。都是.tars文件自動生成的一套通用模板類。里面的具體實現介紹如下:
class AdminRegProxy : public tars::ServantProxy;類
給每個接口都實現了如下模式代碼,詳解看下面注釋:
typedef map<string, string> TARS_CONTEXT; //同步調用接口. tars::Int32 addTaskReq(const tars::TaskReq & taskReq,const map<string, string> &context = TARS_CONTEXT(),map<string, string> * pResponseContext = NULL) { //將請求參數打包到TarsOutputStream<tars::BufferWriter>,其實是序列化 tars::TarsOutputStream<tars::BufferWriter> _os; _os.write(taskReq, 1); tars::ResponsePacket rep; std::map<string, string> _mStatus; //調用tars_invoke。將對應函數名和請求,返回等參數傳入 tars_invoke(tars::TARSNORMAL,"addTaskReq", _os.getByteBuffer(), context, _mStatus, rep); //返回請求context結果..具體作用后面再分析 if(pResponseContext) { *pResponseContext = rep.context; } //將返回結果從TarsOutputStream<tars::BufferWriter>反序列化出來 tars::TarsInputStream<tars::BufferReader> _is; _is.setBuffer(rep.sBuffer); tars::Int32 _ret; _is.read(_ret, 0, true); //返回調用返回值 return _ret; } //異步調用接口. void async_addTaskReq(AdminRegPrxCallbackPtr callback,const tars::TaskReq &taskReq,const map<string, string>& context = TARS_CONTEXT()) { //打包參數 tars::TarsOutputStream<tars::BufferWriter> _os; _os.write(taskReq, 1); std::map<string, string> _mStatus; //調用tars_invoke_async.異步調用 接口函數,將參數傳入 tars_invoke_async(tars::TARSNORMAL,"addTaskReq", _os.getByteBuffer(), context, _mStatus, callback); } //promise調用. promise::Future< AdminRegPrxCallbackPromise::PromiseaddTaskReqPtr > promise_async_addTaskReq(const tars::TaskReq &taskReq,const map<string, string>& context) { promise::Promise< AdminRegPrxCallbackPromise::PromiseaddTaskReqPtr > promise; AdminRegPrxCallbackPromisePtr callback = new AdminRegPrxCallbackPromise(promise); tars::TarsOutputStream<tars::BufferWriter> _os; _os.write(taskReq, 1); std::map<string, string> _mStatus; tars_invoke_async(tars::TARSNORMAL,"addTaskReq", _os.getByteBuffer(), context, _mStatus, callback); return promise.getFuture(); } //協程調用接口. void coro_addTaskReq(AdminRegCoroPrxCallbackPtr callback,const tars::TaskReq &taskReq,const map<string, string>& context = TARS_CONTEXT()) { tars::TarsOutputStream<tars::BufferWriter> _os; _os.write(taskReq, 1); std::map<string, string> _mStatus; tars_invoke_async(tars::TARSNORMAL,"addTaskReq", _os.getByteBuffer(), context, _mStatus, callback, true); }
看上面的實現方式。雖然看起來有4種方式,但實際應該只算是兩種:
通過 tars_invoke同步調用 或 tars_invoke_async 異步調用實現。只是異步調用時候,回調處理的模式分三種類,AdminRegPrxCallback,AdminRegPrxCallbackPromise,AdminRegCoroPrxCallback
另外還包括:
tars_hash,tars_consistent_hash,tars_set_timeout這幾個默認接口
class AdminRegPrxCallback: public tars::ServantProxyCallback;
class AdminRegPrxCallbackPromise: public tars::ServantProxyCallback;
class AdminRegCoroPrxCallback: public AdminRegPrxCallback;
這幾個類都實現了每個函數的默認回調處理函數和異常處理函數,並且也實現了OnDispatch。
類似於上面Servant類的OnDispatch處理流程。
如果結果報錯,將對應回調的異常處理函數執行下;結果正確,把返回結果反序列化出來,執行下前面生成的回調處理函數即可。3種異步處理類都類似
其中 tars的作者大佬 更推薦用協程方式處理異步調用,這樣使用方便很多。所以親,基本上你用的時候,看同步調用,普通異步調用及協程調用就可。
ServantProxy::invoke的具體實現流程
在ServantProxy類中,tars_invoke與tars_invoke_async其實都只是個中間函數,其實都是將參數打包到ReqMessage msg,然后傳給invoke.重點流程都是在invoke實現
在invoke流程中:
1、先讀取ServantProxyThreadData線程共享數據pSptd.並將對應 染色,哈希方式,超時配置 設置給msg參數. ServantProxyThreadData類的實現細節,參照
2、根據pSptd,調用selectNetThreadInfo選取一個ObjectProxy和ReqInfoQueue。具體實現參考ServantProxy::selectNetThreadInfo部分章節.
3、如果當前objProxy是set調用模式,更改msg中參數成objProxy中對應set
4、若是同步調用。。若是pSptd->_sched協程有值,則設置msg中協程為pSptd->_sched;若非協程,創建一個ReqMonitor並賦值給msg中pMonitor(這個好像就是一個lock)。
5、若是異步調用並且也是協程調用:錯誤檢測;設置msg中協程為pSptd->_sched
6、往ReqInfoQueue隊尾插入msg並調用pObjProxy->getCommunicatorEpoll()->notify(序號,ReqInfoQueue)通知epoll去消費;如果隊列滿了插入msg失敗,打log,還是會通知epoll去消費隊此隊列,並拋出異常
7、如果是異步調用,流程已完成,返回;
如果是同步調用,阻塞在此等結果返回。如果是協程模式,yield阻塞在此;如果是普通阻塞,pMonitor->wait()在此等待直到網絡線程通知過來。
退出阻塞狀態時,檢查退出原因。正常退出返回;超時拋出超時異常,異常拋出對應異常。
而CommunicatorEpoll->notify()中的代碼也簡單。如果CommunicatorEpoll中的對應序號的請求通知NotifyInfo是有效的,則將傳入的ReqInfoQueue用epoll的EPOLL_CTL_ADD傳入EPOLLIN事件插入epoll那邊隊列;如果NotifyInfo是無效的,則直接調用EPOLL_CTL_MOD傳入一個EPOLLIN事件。
至此,關鍵流程就走到CommunicatorEpoll那邊的流程了。
CommunicatorEpoll中對應的調用流程:
CommunicatorEpoll類的成員描述和參數細節,可看《Communicator通信器相關部分》中CommunicatorEpoll部分的內容。此處只介紹上面消息包到此處后的處理流程。
CommunicatorEpoll::run().這是線程循環總流程.偽代碼如下:
while(){ try{ //epoll.wait() int num = _ep.wait(iTimeout); //先處理epoll的網絡事件 for (int i = 0; i < num; ++i){ const epoll_event& ev = _ep.get(i); handle((FDInfo*)ev.data.u64, ev.events); } //處理超時請求 doTimeout(); //數據上報 doStat(); } catch (...){ ... } }
無論是ServantProxy::invoke()中投遞到epoll中要發送的內容,還是收到服務器那邊給的返回,都會在此epoll隊列中等待處理.很明顯,handle()函數很重要。
CommunicatorEpoll::handle()的偽代碼如下:
try{ //隊列有消息通知過來。這類消息就是對ServantProxy::invoke()中投遞過來消息的處理環節 if(FDInfo::ET_C_NOTIFY == pFDInfo->iType) { ReqInfoQueue * pInfoQueue=(ReqInfoQueue*)pFDInfo->p; ReqMessage * msg = NULL; try { while(pInfoQueue->pop_front(msg)) { //線程退出 if(ReqMessage::THREAD_EXIT == msg->eType) { _ep.del(_notify[pFDInfo->iSeq].notify.getfd(),(long long)&_notify[pFDInfo->iSeq].stFDInfo, EPOLLIN); delete pInfoQueue; 清 _notify[pFDInfo->iSeq]相關操作..完全看不懂 這么設置有啥好的 .... return; } try { //真正處理 msg->pObjectProxy->invoke(msg); } catch(...) { ... } } } catch{ ..... } } else { Transceiver *pTransceiver = (Transceiver*)pFDInfo->p; //先收包 if (events & EPOLLIN) { try { handleInputImp(pTransceiver); } catch.... } //發包 if (events & EPOLLOUT) { try { handleOutputImp(pTransceiver); } catch.... } //連接出錯 直接關閉連接 if(events & EPOLLERR) { try { pTransceiver->close(); } catch... } } } catch{ .... }
tars代碼中,這些重要環節全部用try-catch 挨個環節包起來,這樣哪步報錯,出啥錯,都一清二楚.
上面流程中,收包發包后續再分析,先看下pObjectProxy->invoke(msg)流程。
ObjectProxy::invoke流程的實現
偽代碼如下:
//選擇一個遠程服務的Adapter來調用 AdapterProxy * pAdapterProxy = NULL; //根據請求策略從可用的服務列表選擇一個服務節點 bool bFirst = _endpointManger->selectAdapterProxy(msg, pAdapterProxy); //這段是啥意思呢?其實是初始化的時候是無效的數據 //只有請求過主控或者從文件緩存加載的數據才是有效數據,這里判斷是否請求過主控 if(bFirst) { //未請求過主控,無效數據. //則把數據緩存在obj _reqTimeoutQueue里面 bool bRet = _reqTimeoutQueue.push(msg,msg->request.iTimeout+msg->iBeginTime); assert(bRet); return; } //未選到AdapterProxy if(!pAdapterProxy) { msg->response.iRet = TARSADAPTERNULL; doInvokeException(msg); return ; } msg->adapter = pAdapterProxy; pAdapterProxy->invoke(msg);
selectAdapterProxy很重要,是關鍵流程。一般我們rpc調用,被調服務端可能會有很多台機器,具體選擇哪一台(通過哈希?還是輪詢?還是set? 都是通過selectAdapterProxy來實現),這塊內容,看《selectAdapterProxy實現流程》章節詳細介紹。
另外這里還有個函數叫,ObjectProxy::doInvoke,跟ObjectProxy::invoke大體一致。其實這個函數就是上面selectAdapterProxy返回為true時。這個東東其實是,第一次請求(或者此servant信息被清除掉時){完全看不懂系列,此處貌似有問題,重看代碼,感覺是每次ObjectProxy::invoke都會跑到此處並refreshReg(),這個就有點奇怪了。}這個servant時,並沒有建立好與此Servant后面提供功能的服務器相關的內容,那么就要先去registry請求調用servant相關信息,才能找到servant提供功能的服務器上。那么去registry請求數據,這個過程是阻塞式的,不可能一直在此等着。所以_reqTimeoutQueue.push先把消息扔隊列里,等請求結果返回后QueryEpBase::doEndpoints(),再回ObjectProxy::doInvoke,這里執行一下該有的發送的流程(跟ObjectProxy::invoke處理一致)。
如果未選到AdapterProxy,先給msg->response.iRet = TARSADAPTERNULL 表示"客戶端選路為空,服務不存在或者所有服務down掉了";再走到ObjectProxy::doInvokeException().
這個函數的實現,簡單來說就是同步的發通知給msg->pMonitor之前的wait解除;異步的調用之前msg上設置的回調函數onDispatch;協程的調用msg->sched->put(msg->iCoroId)。反正就是對應通知的結果已搞完的意思。
如果選擇好正確的pAdapterProxy之后,調用pAdapterProxy->invoke(msg)。
這個函數的流程:
1、自身未發鏈表長度檢查,長度跟communicatorEpoll中的nosendqueuelimit一樣,默認是1000。過了就丟掉本次請求,並執行finishInvoke()跟上面doInvokeException差不多流程的代碼通知本次調用結果。
2、調用_objectProxy->getProxyProtocol().requestFunc。此函數是ObjectProxy章節介紹中的協議解析器設置的 請求發包前 協議的一些處理函數,默認requestFunc=ProxyProtocol::tarsRequest()。如果你有定制協議,此處就會走到你定制協議的壓請求包流程。
3、如果超時隊列中有數據未發送完,則走下面失敗流程;隊列空,立即執行Transceiver::sendRequest()發送(這個東東就是tcp執行tcp的,udp執行udp的了send),如果發送失敗,則走下面失敗流程,成功,刪掉msg指針,返回成功。注意這里真正執行發送時候,數據過長會緩存到發送TcpTransceiver的_sendBuffer中。在下次發送之前,或者是下次epoll的EPOLLOUT寫請求時候,會再次發送,清空此發送隊列。這個不是重點,細節就不講了
4、第3步如果失敗,走到這里,把這個消息塞入超時隊列。如果插入隊列失敗,那估計是過載了,調用finishInvoke()。返回失敗碼
5、到這里 還剩下2個問題沒弄清楚:
1]超時隊列中數據 后面會怎么處理.
超時隊列中有兩種數據,一種是 未發送成功的數據,其包含在未發送數據列表中;一種是 已發送數據。
對於未發送數據。這里有個很坑的地方,我看了很久才看懂。在發送成功時候往超時列表里塞數據時,這個超時列表的未發送數據隊列是不會把這個數據加進未發送標記的,只有發送失敗的才會加入未發送標記隊列,然后被AdapterProxy::doInvoke中會循環去讀取超時列表中的未發送數據,調用Transceiver發送。(第一遍看代碼很容易認為此次寫的有問題,發送成功的也往這個隊列里塞再發一次,其實是不會的)。
對於已發送數據。因為Transceiver::connect的時候,會把操作的fd設置到CommunicatorEpoll對應的epoll中。所以如果服務端請求結果回來了,首先原有套接字會收到一個EPOLLIN,再調用到CommunicatorEpoll::handleInputImp,這里最終會調用到Transceiver::doResponse。
這里會執行readv(tcp)或者recv(udp)收數據;再調用前面設的ProxyProtocol.responseFunc解包函數將數據內容讀取到一個list<ResponsePacket>中;再調用finishInvoke(ResponsePacket)輪詢處理每個ResponsePacket,找到對應的ReqMessage,將請求結果狀態,及結果設定好。后面流程跟前面講過的finishInvoke(msg)一致了。
另外,對這個超時隊列的處理,還有個超時流程,在AdapterProxy::doTimeout()中會把超時沒處理的數據,不管是否發送成功過的,都移除掉,並置異常標志,再調用finishInvoke()。
2]epoll中的發包事件EPOLLOUT 從哪而來.
EPOLLOUT 發包事件,在鏈接創建成功時會設置epoll監聽此事件,除此之外。沒看到啥rpc調用的邏輯會跑到這里來。。有點怪呀。完全沒看懂系列。
至此,客戶端請求發送rpc的流程,基本上走通了。