基於asp.net core 從零搭建自己的業務框架(三)


 

前言

根據業務處理部分,單體馬上就能得知錯誤與否,快速做出處理,而分布式系統,會因為各種原因,無法如同單體一樣立刻處理,所以這個時候需要 處理異常 的,做 補償轉移人工干預

當然也可以直接在消費端做重試/限流和熔斷,但是個人理解,不建議,處理失敗的轉移到低優先順序的隊列,由專門處理失敗消費的部分來處理問題,在實際操作中,可以是單獨的服務器,不影響業務流程線,處理失敗則發送消息,人為干預。


消費/執行錯誤的處理流程

 

 

無論的消費訂閱還是Rpc遠程調用,一旦處理失敗,都應當有 轉移錯誤補償以及人工干預 的異常處理

Masstransit默認是有重試的,所以可以根據獲取到的重試次數和錯誤信息,選擇是 轉移 還是直接 人工干預補償則一般應用於支付相關服務,失敗則立刻退款/人工處理,這種失敗了,重試還是失敗


實例編寫

 

基於發布訂閱的失敗處理

        private async Task Execute(ConsumeContext context, ExceptionInfo exceptionInfo)
        {
            await Console.Out.WriteLineAsync($"Type:{exceptionInfo.ExceptionType} Message:{exceptionInfo.Message}");

            if (exceptionInfo.InnerException != null)
            {
                await Execute(context, exceptionInfo.InnerException);
            }
        }

        public async Task Consume(ConsumeContext<Fault<PayOrderEvent>> context)
        {
            var retryCount = context.GetRetryCount();

            if (retryCount == 5)
            {
                if (EndpointConvention.TryGetDestinationAddress<PayOrderEvent>(out Uri endPointUrl))
                {
                    await context.Forward(endPointUrl);
                }
            }
            else
            {
                var exceptions = context.Message.Exceptions;

                foreach (var exceptionInfo in exceptions)
                {
                    await Execute(context, exceptionInfo);
                }
            }
        }

 

這是一段很簡單的業務邏輯,PayOrderEvent 的消息消費失敗后,則會進入這個專門處理 PayOrderEvent 消費失敗的方法

當重試處理第三次,獲取錯誤信息,嘗試處理錯誤信息,當重試第五次則轉移隊列

 

基於Rpc的消息異常處理

以上這段代碼,只適用於發布訂閱,Masstransit的Rpc模式則 無法捕獲異常,只能在代碼段上做try catch處理,或者引入AspectCore做捕獲到異常

基於Rpc模式的異常捕獲

基於代碼而言try catch足以,但是出於項目管理的角度,應該采用AOP(Aspect-Oriented Programming)做代碼的職責分離,預留出足夠多的通用層,這里AOP部分采用Lemon的作品 AspectCore

 

AOP特性部分實現如下

    public class RpcConsumerAttribute : AbstractInterceptorAttribute
    {
        public override async Task Invoke(AspectContext context, AspectDelegate next)
        {
            try
            {
                await next(context);
            }
            catch
            {
                var _arg = context.Parameters[0];

                if (_arg is ConsumeContext consume)
                {
                    var consumeType = consume.GetType().GetGenericArguments()[1];
                    var property = consume.GetType().GetProperty("Message").GetReflector();
                    var arg = property.GetValue(consume);
                    var argType = arg.GetType();

                    var faultType = typeof(FaultMessage<>).MakeGenericType(argType);
                    var constructor = faultType.GetConstructor(new[] { argType });
                    var constructorInfo = constructor.GetReflector();
                    var instance = constructorInfo.Invoke(arg);

                    var busControl = context.ServiceProvider.GetRequiredService<IBusControl>();
                    await busControl.Publish(instance);
                }
            }
        }
    }

 

簡單的try catch,在異常處理部分使用反射獲取出未正常執行的實體,然后發布 FaultMessage<Message> ,其中 Message 是未正常處理的實體類型

其中反射里攜帶的 GetReflector 取自於Lemon的 AspectCore.Extensions.Reflection ,原理是構建Emit代碼,構建委托,然后換成下來,從單純的反射,變成調用委托執行反射代碼,避免的傳統的反射調用的嵌套反射執行

 

    public class FaultMessage<TEntity>
        where TEntity:class
    {
        public TEntity Entity { get; }

        public FaultMessage(TEntity entity)
        {
            Entity = entity;
        }
    }

 

處理業務部分代碼如下

        [RpcConsumer]
public virtual async Task Consume(ConsumeContext<PayOrderEvent> context) { //... 業務代碼段不再贅述 } public Task Consume(ConsumeContext<FaultMessage<PayOrderEvent>> context) { var message = context.Message; var sourceMessage = message.Entity; return Task.CompletedTask; }

 

調用部分遵循Rpc調用即可

                var entity = new PayOrderEvent
                {
                    SourceId = 1,
                    TargetId = 2,
                    Money = 2000
                };
                var response = await busControl.Request<PayOrderEvent, PayOrderResponse>(entity);

消費 PayOrderEvent 的 部分,只要正常的Ef Core原封不動的寫數據即可,執行兩次,造成 主鍵沖突,則被 RpcConsumerAttribute 捕獲異常,觸發發布到 FaultMessage<PayOrderEvent> 的消費端即可

 


后話

引入 AspectCore 的這部分,踩了一些雷,感謝社區的 茶姨以及檸檬的幫助 深感個人實力非常薄弱,很多東西未能深入理解,學到用時方恨少

 

打個小廣告

如果有技術交流可以加NCC的群 24791014、436035237,我在群里,有任何關於asp.net core/Masstransit的問題或者建議都可以與我交流,非常歡迎

 

示例代碼:

https://github.com/htrlq/Crud.Sample


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM