NServiceBus+Saga開發分布式應用


前言

當你在處理異步消息時,每個單獨的消息處理程序都是一個單獨的handler,每個handler之間互不影響。這時如果一個消息依賴另一個消息的狀態呢? 這時業務邏輯怎么處理?

image.png

借用我們上篇文章的業務場景,如果在Ship項目里需要發送一個ShipOrder Command。這個ShipOrder需要依賴Sales.OrderPlaced和Bill.OrderBilled Command的狀態,目前我們的兩個單獨的Message Handler都沒有保持任何的狀態字段,所以這時如果我們需要完成這個業務模型,就需要跟蹤他們的狀態。

什么是Saga

這個就是本篇文章要提的saga,定義在NServiceBus框架里,他的本質是一個消息驅動模型里的狀態機,或者也可以理解為一系列消息處理程序用來共享狀態的業務模型。我理解在消息隊列里如果我們要保證消息一致性通常會自己創建一張Event表,這里saga維持狀態的角色有點像我們這里的Event表。
   
屏幕快照 2019-10-23 20.46.15.png
   好的,回到正題上,如果我們需要在Shipping Service里發送一個ShipOrder,發送他之前需要確定OrderPlaced和OrderBilled的狀態,確保這兩個消息都收到以后才能發送ShipOrder。

如何使用Saga

當然,我暫且理解Saga的目的是為了處理在長時間運行的任務里保證數據一致性這樣的一個角色。

Saga狀態

saga狀態主要是告訴NServiceBus在處理數據一致性的判斷邏輯,這里需要繼承抽象類ContainSagaData,在我們這個業務場景中則主要是判斷OrderPlaced和OrderBilled消息是否已經接收到並處理。

public class ShippingPolicyData:ContainSagaData
{
   public string OrderId { get; set; }
   public bool IsOrderPlaced { get; set; }
   public bool IsOrderBilled { get; set; }
}

Saga如何工作

有了狀態以后,我們還需要一個“handler”來告訴NServiceBus,在這個handler里主要用來處理消息數據一致性,我看了官方文檔后,他們建議我們這里的handler角色使用Policy后綴命名,當然我覺的也可以用Saga后綴命名,比如ShippingPolicy或者ShippingSaga。
   同時這里我們這個handler覺色還要繼承Saga 類,Saga類主要重寫方法ConfigureHowToFindSaga,這個方法的作用主要是在接受的消息和我們的Saga實體之間建立映射關系。

 public class ShipPolicy:Saga<ShippingPolicyData>,
        IAmStartedByMessages<OrderPlaced>,
        IAmStartedByMessages<OrderBilled> //都可以創建Saga實例
    {
        private static ILog log = LogManager.GetLogger<ShipPolicy>();

        protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShippingPolicyData> mapper)
        {
            mapper.ConfigureMapping<OrderPlaced>(t=>t.OrderId).ToSaga(sagaData=>sagaData.OrderId);
            mapper.ConfigureMapping<OrderBilled>(t=>t.OrderId).ToSaga(sagaData=>sagaData.OrderId);
            
        }

        public Task Handle(OrderPlaced message, IMessageHandlerContext context)
        {
            log.Info("OrderPlaced message received ");
            this.Data.IsOrderPlaced = true;
            return ProcessOrder(context);
        }

        public Task Handle(OrderBilled message, IMessageHandlerContext context)
        {
            
            log.Info("OrderBilled message received");
            this.Data.IsOrderBilled = true;
            return ProcessOrder(context);
        }

        private async Task ProcessOrder(IMessageHandlerContext context)
        {
            if (Data.IsOrderBilled && Data.IsOrderPlaced)
            {
                await context.SendLocal(new ShipOrder()
                {
                    OrderId = Data.OrderId
                });
                
                MarkAsComplete();
            }
        }
    }

這個類里你會發現還實現了接口**IAmStartedByMessages , **這個接口主要是告訴Saga,不論是那種消息類型先進來,都可以創建一個Saga實例,就比如是Event表,不管那個消息進來,都需要先插入一條數據,后續消息再進來時要更新數據狀態,當然,這里的Saga實例也好,Event表也好,關鍵問題就是有效標識,或者叫主鍵,我們這個業務模型里,OrderPlaced和OrderBilled都包含一個屬性OrderId, 這里Saga實例則使用這個OrderId做關鍵屬性。

發送ShipOrder Command

到這里也就是我們的OrderPlaced和OrderBIlled消息都收到了,業務邏輯符合要求,可以發送ShipOrder消息了,也就是用戶創建了訂單,付了款,可以發貨了。
image.png

新建ShipOrder類

public class ShipOrder:ICommand
{
    public string OrderId { get; set; }
}

新建ShipOrderHandler

public class ShipOrderHandler:IHandleMessages<ShipOrder>
{
   private static ILog log = LogManager.GetLogger<ShipOrderHandler>();
   public Task Handle(ShipOrder message, IMessageHandlerContext context)
   {
       log.Info($"Order [{message.OrderId}] - Successfully shipped");
       return Task.CompletedTask;
   }
}

運行Shipping項目,看到下圖,則說明程序運行成功,我們這個業務場景里OrderPlaced消息肯定先接受到,OrderBilled消息后接受到。
屏幕快照 2019-10-23 20.29.05.png

參考鏈接

https://docs.particular.net/tutorials/nservicebus-sagas/1-getting-started/
https://docs.particular.net/nservicebus/sagas/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM