RPC(遠過程調用)在分布式系統中是很常用的基礎通訊手段,核心思想是將不同進程之間的通訊抽象為函數調用,基本的過程是調用端通過將參數序列化到流中並發送給服務端,服務端從流中反序列化出參數並完成實際的處理,然后將結果序列化后返回給調用端。通常的RPC由接口形式來定義,接口定義服務的名字,接口方法定義每個請求的輸入參數和返回結果。RPC內部的序列化、網絡通訊等實現細節則由框架來完成,對用戶來說是完全透明的。之前我使用.net開發過一套輕量級的分布式框架(PPT看這里,視頻看這里),這套框架經過2年多的持續開發和改進已經運用到數款產品中(包括網絡游戲和分布式應用),取得了不錯的效果,等未來框架成熟后會考慮開源,本文討論的RPC基於這套框架展開。
通常我們的函數調用都是同步的,也就是調用方在發起請求后就能得到結果(成功返回結果失敗則拋出異常),中間不能去干其他事情,與這種模式對應的RPC稱之為請求-響應式模式。請求-響應式的優點在於時序清晰,邏輯簡單,和普通的函數調用完全等價。比如我們可以這樣定義RPC接口:
1 [Protocol(ID=1)] 2 public interface ICalculate 3 { 4 [DispId(1)] 5 int Add(int p1, int p2); 6 }
客戶端就可以像這樣使用接口:
1 var calculate = new ICalculateProxy();//ICalculateProxy為框架生成的接口代理類 2 calculate.Connect(url); 3 var result = calculate.Add(1, 2);
但是在分布式中這種模式的缺點也是非常的明顯,第一個問題是網絡通訊的延遲會嚴重的制約請求-響應式RPC的響應速度,使得請求吞吐量無法滿足性能需要,大量的CPU時間會阻塞在等待請求的響應上;第二個問題是請求-響應式只有由客戶端向服務端發起請求,服務端不能主動向客戶端發送事件通知,也就是缺乏一種callback機制。
針對請求-響應式的缺點我們可以用雙向通訊機制來改進,首先去掉請求的返回值,所有的方法請求都定義為無返回結果,這樣調用方在發出請求之后就可以繼續干后面的事情了,而不需要再等待服務返回結果。同時針對服務接口定義一個Callback接口用於服務端向客戶端發送請求結果和事件通知,這樣服務器就可以主動向客戶端發送消息了。這種RPC模式可以稱之為雙向會話式,接口可以這樣定義:
1 [Protocol(ID=1), Callback(typeof(ICalculateCallback))] 2 public interface ICalculate 3 { 4 [DispId(1)] 5 void Add(int p1, int p2); 6 } 7 8 public interface ICalculateCallback : IServiceCallback 9 { 10 [DispId(1)] 11 void OnAdd(int result); 12 }
服務端可以這樣實現服務接口:
1 public class CaculateService : ICaculateImpl //這里ICaculateImpl為框架生成的服務實現接口 2 { 3 ICaculateImpl.Add(Session session, int p1, int p2) 4 { 5 var result = p1 + p2; 6 session.Cllback.OnAdd(result); 7 } 8 }
雙向會話式解決了請求的異步處理以及服務器的雙向通訊問題,但是也給調用者帶來了一些不便,例如上例中如果調用方發起多個Add請求,在收到OnAdd消息后如何將結果與請求關聯起來呢?一種解決方案是在Add請求中多加一個request id參數,服務器在處理完Add之后將request id放到OnAdd方法中和結果一起傳給客戶端,客戶端根據request id來關聯請求與結果。這種手工處理的方式代碼寫起來很麻煩,那么有沒有一種更好的RPC模式來解決這個問題呢?這就是下面給大家介紹的支持異步調用的RPC設計。
異步調用的主要設計思想是在雙向會話式的基礎上讓調用方通過一個回調函數來獲得請求的結果,而不再通過Callback接口來獲得結果。采用回調函數的好處在於我們可以使用閉包來隱式的關聯請求和響應之間的上下文,這樣就不需要顯式的傳遞request id來手工關聯上下文了。並且服務器仍然可以通過Callback接口向客戶端主動發送消息,保留了原來雙向通訊的優點。但是需要注意的是由於請求在服務器上可能是異步執行的,所以服務器不保證請求的響應是按順序返回的,這可能造成一些隱含的亂序問題,需要客戶端在調用時特別注意。如果響應需要嚴格的按照請求順序返回客戶端,那么服務端需要同步處理請求,或者引入隊列機制對異步的響應進行排隊有序返回響應。
之前的ICalculate就可以這樣定義:
[Protocol(ID=1), Callback(typeof(ICalculateCallback))] public interface ICalculate { [DispId(1), Async] void Add(int p1, int p2, Action<int> OnResult, Action<int,string> OnError = null); }
用Async這個標簽表示這個請求為異步請求,調用者用OnResult回調函數來接收請求的結果,OnError則為返回錯誤的回調函數,如果調用者不關心錯誤返回,那么可以不傳遞OnError,而在IServiceCallback的OnError方法中接收錯誤信息。
調用者可以很方便的使用閉包來處理結果,同時隱藏異步的實現細節,像這樣:
1 void TestAdd(ICalculateProxy calculate, int p1, int p2) 2 { 3 calculate.Add(p1, p2, result => MessageBox.Show(string.Format("{0} + {1} = {2}", p1, p2, result), (errCode, errMsg) => MessageBox.Show("Add failed:" + errMsg)); 4 }
服務器端的實現是這樣的:
1 public class CaculateService : ICaculateImpl 2 { 3 ICaculateImpl.Add(Session session, int p1, int p2, ICaculate_AddAsyncReply reply) 4 { 5 try 6 { 7 var result = p1 + p2; 8 reply.OnResult(result); 9 } 10 catch(OverflowException e) 11 { 12 reply.OnError(-1, e.Message); 13 } 14 } 15 }
ICaculate_AddAsyncReply為框架生成的返回異步結果的對象,有一個OnResult和一個OnError方法。有了這個reply對象之后,服務器的請求處理也可以實現異步處理,客戶端請求不需要在請求函數里一次完成,而是可以放到其他線程或者異步方法中處理,稍后在通過reply向客戶端返回結果。
下面我們來看看框架在背后為我們做的一些實現細節,首先是客戶端的Proxy:
1 //在Proxy中使用一個RequestContext結構保存請求的上下文信息,上下文中記錄某個請求的唯一id,在調用時一起發送到服務器: 2 struct RequestContext 3 { 4 public int reqId; 5 public Delegate OnResult; 6 public Action<int, string> OnError; 7 8 public RequestContext(int id, Delegate onResult, Action<int, string> onError) 9 { 10 reqId = id; 11 OnResult = onResult; 12 OnError = onError; 13 } 14 } 15 16 //服務器返回響應之后proxy就找出reqId對應的請求上下文,然后調用對應的回調函數傳遞結果
17 void OnAddReply(BinaryStreamReader __reader) 18 { 19 int reqId; 20 int ret; 21 __reader.Read(out reqId); 22 __reader.Read(out ret); 23 if(ret == 0) 24 { 25 int p0; 26 __reader.Read(out p0); 27 RequestContext ctx = PopAsyncRequest(reqId); 28 var __onResult = ctx.OnResult as Action<int>; 29 __onResult(p0); 30 } 31 else 32 { 33 RequestContext ctx = PopAsyncRequest(reqId); 34 string msg; 35 __reader.Read(out msg); 36 if(ctx.OnError != null) 37 ctx.OnError(ret, msg); 38 else 39 _handler.OnError(ret, msg); 40 } 41 }
服務端的一些實現細節:
1 //框架生成請求對應的異步響應類 2 public class ICaculate_AddAsyncReply : AsyncReply 3 { 4 public ICaculate_AddAsyncReply(int reqId, Connection conn) 5 { 6 _reqId = reqId; 7 _connection = conn; 8 } 9 10 public void OnError(int error, string msg) 11 { 12 var stream = new BinaryStreamWriter(); 13 stream.Write(1); 14 stream.Write(_reqId); 15 stream.Write(error); 16 stream.Write(msg); 17 _connection.Write(stream.BuildSendBuffer()); 18 } 19 public void OnResult(int result) 20 { 21 var stream = new BinaryStreamWriter(); 22 stream.Write(1); 23 stream.Write(_reqId); 24 stream.Write(0); 25 stream.Write(result); 26 _connection.Write(stream.BuildSendBuffer()); 27 } 28 }
框架生成的Stub類將收到的請求數據進行解析然后調用具體服務類來處理請求:
1 void AddInvoke(ICaculateImpl __service, Session __client, BinaryStreamReader __reader) 2 { 3 int p1; 4 int p2; 5 int __reqId; 6 __reader.Read(out __reqId); 7 __reader.Read(out p1); 8 __reader.Read(out p2); 9 var reply = new ICaculate_AddAsyncReply(__reqId, __client.Connection); 10 try 11 { 12 __service.Add(__client, p1, p2, reply); 13 } 14 catch(ServiceException e) 15 { 16 reply.OnError(e.ErrCode, e.Message); 17 Log.Info("Service Invoke Failed. clientId:{0} error message:{1}", __client.ID, e.Message); 18 } 19 catch(Exception e) 20 { 21 reply.OnError((int)ServiceErrorCode.Generic, "generic service error."); 22 Log.Error("Generic Service Invoke Failed, clientId:{0} error message:{1}\nCall Stack: {2}", __client.ID, e.Message, e.StackTrace); 23 } 24 }
由於完整的框架代碼比較龐大,所以上面只貼了關鍵部分的實現細節。從實現細節我們可以看到,框架實際上也是通過request id來關聯請求和響應函數之間的上下文的,但是通過代碼生成機制隱藏了實現的細節,給使用者提供了一種優雅的抽象。
總結:在雙向會話式的RPC基礎上,引入了一種新的異步請求調用模式,讓調用者可以通過閉包來方便的異步處理請求的響應結果,同時服務器端的請求處理也可以實現異步處理。