前言
根據業務處理部分,單體馬上就能得知錯誤與否,快速做出處理,而分布式系統,會因為各種原因,無法如同單體一樣立刻處理,所以這個時候需要 處理異常 的,做 補償、轉移、人工干預。
當然也可以直接在消費端做重試/限流和熔斷,但是個人理解,不建議,處理失敗的轉移到低優先順序的隊列,由專門處理失敗消費的部分來處理問題,在實際操作中,可以是單獨的服務器,不影響業務流程線,處理失敗則發送消息,人為干預。
消費/執行錯誤的處理流程
無論的消費訂閱還是Rpc遠程調用,一旦處理失敗,都應當有 轉移錯誤、補償以及人工干預 的異常處理
Masstransit默認是有重試的,所以可以根據獲取到的重試次數和錯誤信息,選擇是 轉移 還是直接 人工干預,補償則一般應用於支付相關服務,失敗則立刻退款/人工處理,這種失敗了,重試還是失敗
實例編寫
基於發布訂閱的失敗處理
private async Task Execute(ConsumeContext context, ExceptionInfo exceptionInfo) { await Console.Out.WriteLineAsync($"Type:{exceptionInfo.ExceptionType} Message:{exceptionInfo.Message}"); if (exceptionInfo.InnerException != null) { await Execute(context, exceptionInfo.InnerException); } } public async Task Consume(ConsumeContext<Fault<PayOrderEvent>> context) { var retryCount = context.GetRetryCount(); if (retryCount == 5) { if (EndpointConvention.TryGetDestinationAddress<PayOrderEvent>(out Uri endPointUrl)) { await context.Forward(endPointUrl); } } else { var exceptions = context.Message.Exceptions; foreach (var exceptionInfo in exceptions) { await Execute(context, exceptionInfo); } } }
這是一段很簡單的業務邏輯,PayOrderEvent 的消息消費失敗后,則會進入這個專門處理 PayOrderEvent 消費失敗的方法
當重試處理第三次,獲取錯誤信息,嘗試處理錯誤信息,當重試第五次則轉移隊列
基於Rpc的消息異常處理
以上這段代碼,只適用於發布訂閱,Masstransit的Rpc模式則 無法捕獲異常,只能在代碼段上做try catch處理,或者引入AspectCore做捕獲到異常
基於Rpc模式的異常捕獲
基於代碼而言try catch足以,但是出於項目管理的角度,應該采用AOP(Aspect-Oriented Programming)做代碼的職責分離,預留出足夠多的通用層,這里AOP部分采用Lemon的作品 AspectCore
AOP特性部分實現如下
public class RpcConsumerAttribute : AbstractInterceptorAttribute { public override async Task Invoke(AspectContext context, AspectDelegate next) { try { await next(context); } catch { var _arg = context.Parameters[0]; if (_arg is ConsumeContext consume) { var consumeType = consume.GetType().GetGenericArguments()[1]; var property = consume.GetType().GetProperty("Message").GetReflector(); var arg = property.GetValue(consume); var argType = arg.GetType(); var faultType = typeof(FaultMessage<>).MakeGenericType(argType); var constructor = faultType.GetConstructor(new[] { argType }); var constructorInfo = constructor.GetReflector(); var instance = constructorInfo.Invoke(arg); var busControl = context.ServiceProvider.GetRequiredService<IBusControl>(); await busControl.Publish(instance); } } } }
簡單的try catch,在異常處理部分使用反射獲取出未正常執行的實體,然后發布 FaultMessage<Message> ,其中 Message 是未正常處理的實體類型
其中反射里攜帶的 GetReflector 取自於Lemon的 AspectCore.Extensions.Reflection ,原理是構建Emit代碼,構建委托,然后換成下來,從單純的反射,變成調用委托執行反射代碼,避免的傳統的反射調用的嵌套反射執行
public class FaultMessage<TEntity> where TEntity:class { public TEntity Entity { get; } public FaultMessage(TEntity entity) { Entity = entity; } }
處理業務部分代碼如下
[RpcConsumer]
public virtual async Task Consume(ConsumeContext<PayOrderEvent> context) { //... 業務代碼段不再贅述 } public Task Consume(ConsumeContext<FaultMessage<PayOrderEvent>> context) { var message = context.Message; var sourceMessage = message.Entity; return Task.CompletedTask; }
調用部分遵循Rpc調用即可
var entity = new PayOrderEvent { SourceId = 1, TargetId = 2, Money = 2000 }; var response = await busControl.Request<PayOrderEvent, PayOrderResponse>(entity);
消費 PayOrderEvent 的 部分,只要正常的Ef Core原封不動的寫數據即可,執行兩次,造成 主鍵沖突,則被 RpcConsumerAttribute 捕獲異常,觸發發布到 FaultMessage<PayOrderEvent> 的消費端即可
后話
引入 AspectCore 的這部分,踩了一些雷,感謝社區的 茶姨以及檸檬的幫助 深感個人實力非常薄弱,很多東西未能深入理解,學到用時方恨少
打個小廣告
如果有技術交流可以加NCC的群 24791014、436035237,我在群里,有任何關於asp.net core/Masstransit的問題或者建議都可以與我交流,非常歡迎
示例代碼: