[2017-10-25]Abp系列——集成消息隊列功能(基於Rebus.Rabbitmq)



本系列目錄:Abp介紹和經驗分享-目錄

前言

由於提交給ABP作者的集成消息隊列機制的PR還未Review完成,本篇以Abplus中的代碼為基准來介紹ABP集成消息隊列機制的方案。

Why

為什么需要消息隊列機制?

  1. 發布-訂閱模式,解耦業務
  2. 非必須強一致性的業務場景,借助消息隊列剝離到離線處理
  3. 各種通知,站內通知、郵件通知、手機短信、微信推送

以上幾點,並非互相獨立,是幾個互相聯系的特點。
開發框架擁有消息隊列機制的好處,可以通過以下幾個典型場景舉例來說明:

訂單支付成功

當第三方支付平台回調通知訂單支付成功時,如果沒有消息隊列,那么:

  • 我們必須把后續的業務邏輯和修改訂單支付狀態的代碼都寫在一起,根據業務的復雜程度,這個響應時間可能會非常長,容易造成超時
  • 或者第三方支付平台短時間重試多次,造成業務邏輯重復執行
  • 也可能業務邏輯比較復雜,后續其他邏輯(通知推送之類)處理異常,導致整個支付成功邏輯全部回滾

而當我們有消息隊列機制支持時:

  • 我們一接收到第三方支付平台回調,立馬僅處理訂單狀態和核心的業務邏輯(比如支付后扣庫存),其他業務邏輯通過訂閱消息去處理
  • 甚至可以一接收到第三方支付平台回調,立即構建訂單支付成功消息放入消息隊列,這樣對於第三方支付平台可以立即收到處理成功的響應
  • 后續其他類似通知推送的非關鍵需求,其成敗不影響關鍵邏輯,而且可以一直重試直到成功

物流信息同步

做電商網站時,如果訂單比較多,每個訂單都需要訂閱第三方物流信息,類似上面支付,當物流信息(一個訂單多條運送路徑記錄)推送過來時,可能會比較密集(短時間內幾千上萬個請求),直接寫入數據庫(需要先比對,只寫入增量部分)可能不是一個好方案。
這個時候,放到消息隊列里,由消費端按自己的節奏一條條處理最為保險。

類似的數據同步方案,對實時性要求不太高,單個處理邏輯比較復雜,短時間內數量較大,都可以考慮排隊處理。

批量業務處理

復雜業務的批量處理一直是比較頭疼的事情(查詢條件、事務等),如果有消息隊列,只要查詢得到所有可能需要處理的對象,放隊列排隊,轉變成單個業務處理,可以避免很多麻煩,並且可以控制處理進度,也不必擔心其中可能發生幾個異常影響了其他正常處理。而且,轉成單個業務處理,可以繼續發布事件和消息,進行后續流程和業務的觸發,比如郵件通知或短信通知等供應商限制了調用頻率的服務。

高負載時,削峰填谷

如題,這一點是指將請求進行排隊的能力,可以臨時增加處理端以提高請求處理能力,此類方案大多涉及UI交互的變更,具體場景不展開。

這一塊,如果UI交互采用異步模式(比如用戶提交訂單,但不立馬告訴用戶訂單是否創建成功,只反饋創建訂單的請求提交成功,訂單真正創建成功以其他方式通知用戶,再讓用戶去支付),則屬於上面提到過的離線處理,本文介紹的集成方式可以直接支持;

如果采用同步模式(提交請求,經過較長時間后響應,需要用戶等待,處理依然可以通過消息隊列排隊),則可能需要具體挖掘下RabbitMQ或您采用的其他消息中間件的Request-Response 模式用法,當然也需要注意一下Http請求處理超時的問題。

其他好處

總的來說:

業務邏輯編碼方面,思維方式逐漸脫離面向數據 ,可以更加關注事件關注消息,更加面向領域面向對象 ,核心業務邏輯可維護性增強、可擴展性增強。

系統架構風格方面,支持事件驅動消息驅動可監控性可擴展性,更利於系統演化,后期任何時候可通過訂閱消息無侵入式的采集數據,拓展功能。

Tips:
消息隊列的可靠性,比如RabbitMQ,是經過電信行業檢驗的,集群、持久化、自動故障轉移等特性都支持,而且集群配置和集群升級方案都比較簡單方便且成熟。

RabbitMQ安裝請參考RabbitMQ Notes

How

前面啰嗦了一大堆,下面進入正文,ABP如何集成消息隊列機制。

如果嫌本文排版不好,也可以直接看提交給ABP作者的集成消息隊列機制的PR,PR中的說明和用法示例基本和abplus擴展庫中的一致。

1.增加命名空間

這里強調一下概念區別,消息(Messages)和事件(Events)是不同的。

  1. 在ABP中,有個EventBus體系,Events和各種Handlers,重點是進程內的業務邏輯解耦,Handlers的代碼是可以共享工作單元(UnitOfWork)的,對於允許延后驗證的邏輯,是可以隨時添加一個單獨的Handler去校驗並拋出異常,即可以回滾工作單元(事務);
  2. 而Messages體系,在Abp框架內,目前並未集成。Messages的重點是向進程外的系統(外部系統)進行通知,所以有工作單元和事務的時候,一定是事務提交成功后才對外通知消息,否則一旦對外通知了,又發生了事務回滾,再追回消息或者撤銷消息就會比較麻煩(消息關聯事務或者分布式事務是另一種情況,這里不做介紹)。

所以,我們首先在Abplus程序集中引入消息發布器的抽象概念:

IMqMessagePublisher

namespace Abp.MqMessages
{
    /// <summary>
    /// 消息發布接口
    /// </summary>
    public interface IMqMessagePublisher : ITransientDependency
    {
        /// <summary>
        /// 發布
        /// </summary>
        /// <param name="mqMessages"></param>
        void Publish(object mqMessages);

        /// <summary>
        /// 發布
        /// </summary>
        /// <param name="mqMessages"></param>
        /// <returns></returns>
        Task PublishAsync(object mqMessages);
    }
}

並且提供空實現:

NullMqMessagePublisher

限於篇幅,除簡單示意和關鍵代碼外,本文只給github源碼鏈接,不再貼出詳細代碼。

2.提供IMqMessagePublisher的RebusRabbitMQ實現

我們集成RabbitMQ,借用了Rebus框架的實現,Rebus在對接RabbitMQ上做了比較好的抽象和封裝,Rebus Wiki

程序集Abplus.MqMessages.RebusCore,核心實現:

RebusRabbitMqPublisher

其中,以反射方式,抓取了一些Session信息:

private void TryFillSessionInfo(object mqMessages)
{
    if (AbpSession.UserId.HasValue)
    {
        var operatorUserIdProperty = mqMessages.GetType().GetProperty("OperatorUserId");
        if (operatorUserIdProperty != null && (operatorUserIdProperty.PropertyType == typeof(long?)))
        {
            operatorUserIdProperty.SetValue(mqMessages, AbpSession.UserId);
        }
    }

    if (AbpSession.TenantId.HasValue)
    {
        var tenantIdProperty = mqMessages.GetType().GetProperty("TenantId");
        if (tenantIdProperty != null && (tenantIdProperty.PropertyType == typeof(int?)))
        {
            tenantIdProperty.SetValue(mqMessages, AbpSession.TenantId);
        }
    }
}

程序集Abplus.MqMessages.RebusCore是同時被Abplus.MqMessages.RebusPublisherAbplus.MqMessages.RebusRabbitMqConsumer依賴的,以便消費端依然具有發布消息的能力。

3.封裝發布模塊,便於項目使用

程序集Abplus.MqMessages.RebusPublisher,發布模塊:

RebusRabbitMqPublisherModule

這個模塊,是封裝消息隊列的配置和啟動連接,使用時,在項目啟動模塊上配好[DependsOn(typeof(RebusRabbitMqPublisherModule))],並引入命名空間Abp.Configuration.Startup,即可配置相關參數,示例如下:

namespace Sample
{
    [DependsOn(typeof(RebusRabbitMqPublisherModule))]
    public class SampleRebusRabbitMqPublisherModule : AbpModule
    {
        public override void PreInitialize()
        {
            Configuration.Modules.UseRebusRabbitMqPublisher()
                .UseLogging(c => c.NLog())
                .ConnectionTo("amqp://dev:dev@rabbitmq.local.cn/dev_host");

            Configuration.BackgroundJobs.IsJobExecutionEnabled = true;
        }

        public override void Initialize()
        {
            IocManager.RegisterAssemblyByConvention(Assembly.GetExecutingAssembly());
        }

        public override void PostInitialize()
        {
            Abp.Dependency.IocManager.Instance.IocContainer.AddFacility<LoggingFacility>(f => f.UseNLog().WithConfig("nlog.config"));

            var workManager = IocManager.Resolve<IBackgroundWorkerManager>();
            workManager.Add(IocManager.Resolve<TestWorker>());// to send TestMqMessage every 3 seconds
        }
    }
}

只要項目配置好模塊依賴,即可在代碼中通過構造函數注入或者屬性注入使用IMqMessagePublisher接口進行消息發布。

4.封裝消費端模塊

同消息發布模塊,消費模塊在程序集Abplus.MqMessages.RebusConsumer中,代碼見:

RebusRabbitMqConsumerModule

使用方式:

namespace Sample
{
    [DependsOn(typeof(RebusRabbitMqConsumerModule))]
    public class SampleRebusRabbitMqConsumerModule : AbpModule
    {
        public override void PreInitialize()
        {
            Configuration.Modules.UseRebusRabbitMqConsumer()
                .UseLogging(c => c.NLog())
                .ConnectTo("amqp://dev:dev@rabbitmq.local.cn/dev_host")
                //以當前項目名作為隊列名
                .UseQueue(Assembly.GetExecutingAssembly().GetName().Name)
                //register assembly whitch has rebus handlers
                .RegisterHandlerInAssemblys(Assembly.GetExecutingAssembly());
        }

        public override void Initialize()
        {
            base.Initialize();
        }

        public override void PostInitialize()
        {
            Abp.Dependency.IocManager.Instance.IocContainer.AddFacility<LoggingFacility>(f => f.UseNLog().WithConfig("nlog.config"));
        }
    }
}

只要項目配置好模塊依賴,消費端亦可在代碼中通過構造函數注入或者屬性注入使用IMqMessagePublisher接口進行消息發布。

消費端的RebusHanlder示例:

namespace Sample.Handlers
{
    public class TestHandler : IHandleMessages<TestMqMessage>
    {
        public ILogger Logger { get; set; }
        public IMqMessagePublisher Publisher { get; set; }
        public TestHandler()
        {
            Publisher = NullMqMessagePublisher.Instance;
        }

        public async Task Handle(TestMqMessage message)
        {
            var msg = $"{Logger.GetType()}:{message.Name},{message.Value},{message.Time}";
            Logger.Debug(msg);
            await Publisher.PublishAsync(msg);//send it again!
        }
    }
}

5.忘了說,消息格式的定義

消息的定義,不依賴任何框架,也不依賴Abp或者Abplus,因為前面發布接口IMqMessagePublisher定義時采用的類型是object

void Publish(object mqMessages);

例如:

namespace Sample.MqMessages
{
    /// <summary>
    /// Custom MqMessage Definition. No depends on any framework,this class library can be shared as nuget pkg.
    /// </summary>
    public class TestMqMessage
    {
        public string Name { get; set; }
        public string Value { get; set; }
        public DateTime Time { get; set; }
    }
}

這個消息定義可以單獨作為一個程序集,發布到私有nuget服務器中,以便團隊共享

Tips & Extended

Abplus中還提供了幾個消息隊列相關的常用實現:

  1. Abplus IMessageTracker 定義消費端處理冪等機制的接口
  2. Abplus.MqMessages EventDataPublishHandlerBase<TEventData, TMqMessage> EventData和MqMessage一對一的泛型版抽象發布Handler,這是EventDataHandler
  3. Abplus.MqMessages AbpMqHandlerBase是包含IMessageTracker屬性的消費端MqHandler抽象基類
  4. Abplus.MqMessages.AuditingStore 審計日志發布到消息隊列
  5. Abplus.MqMessages.RedisStoreMessageTracker 消費端消費行為的冪等支持,基於Redis存儲,Rebus Handler的重試機制和冪等處理

上述具體實現請參考Abplus代碼庫


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM