本系列目錄:Abp介紹和經驗分享-目錄
前言
由於提交給ABP作者的集成消息隊列機制的PR還未Review完成,本篇以Abplus中的代碼為基准來介紹ABP集成消息隊列機制的方案。
Why
為什么需要消息隊列機制?
- 發布-訂閱模式,解耦業務
- 非必須強一致性的業務場景,借助消息隊列剝離到離線處理
- 各種通知,站內通知、郵件通知、手機短信、微信推送
以上幾點,並非互相獨立,是幾個互相聯系的特點。
開發框架擁有消息隊列機制的好處,可以通過以下幾個典型場景舉例來說明:
訂單支付成功
當第三方支付平台回調通知訂單支付成功時,如果沒有消息隊列,那么:
- 我們必須把后續的業務邏輯和修改訂單支付狀態的代碼都寫在一起,根據業務的復雜程度,這個響應時間可能會非常長,容易造成超時
- 或者第三方支付平台短時間重試多次,造成業務邏輯重復執行
- 也可能業務邏輯比較復雜,后續其他邏輯(通知推送之類)處理異常,導致整個支付成功邏輯全部回滾
而當我們有消息隊列機制支持時:
- 我們一接收到第三方支付平台回調,立馬僅處理訂單狀態和核心的業務邏輯(比如支付后扣庫存),其他業務邏輯通過訂閱消息去處理
- 甚至可以一接收到第三方支付平台回調,立即構建訂單支付成功消息放入消息隊列,這樣對於第三方支付平台可以立即收到處理成功的響應
- 后續其他類似通知推送的非關鍵需求,其成敗不影響關鍵邏輯,而且可以一直重試直到成功
物流信息同步
做電商網站時,如果訂單比較多,每個訂單都需要訂閱第三方物流信息,類似上面支付,當物流信息(一個訂單多條運送路徑記錄)推送過來時,可能會比較密集(短時間內幾千上萬個請求),直接寫入數據庫(需要先比對,只寫入增量部分)可能不是一個好方案。
這個時候,放到消息隊列里,由消費端按自己的節奏一條條處理最為保險。類似的數據同步方案,對實時性要求不太高,單個處理邏輯比較復雜,短時間內數量較大,都可以考慮排隊處理。
批量業務處理
復雜業務的批量處理一直是比較頭疼的事情(查詢條件、事務等),如果有消息隊列,只要查詢得到所有可能需要處理的對象,放隊列排隊,轉變成單個業務處理,可以避免很多麻煩,並且可以控制處理進度,也不必擔心其中可能發生幾個異常影響了其他正常處理。而且,轉成單個業務處理,可以繼續發布事件和消息,進行后續流程和業務的觸發,比如郵件通知或短信通知等供應商限制了調用頻率的服務。
高負載時,削峰填谷
如題,這一點是指將請求進行排隊的能力,可以臨時增加處理端以提高請求處理能力,此類方案大多涉及UI交互的變更,具體場景不展開。
這一塊,如果UI交互采用異步模式(比如用戶提交訂單,但不立馬告訴用戶訂單是否創建成功,只反饋創建訂單的請求提交成功,訂單真正創建成功以其他方式通知用戶,再讓用戶去支付),則屬於上面提到過的離線處理,本文介紹的集成方式可以直接支持;
如果采用同步模式(提交請求,經過較長時間后響應,需要用戶等待,處理依然可以通過消息隊列排隊),則可能需要具體挖掘下RabbitMQ或您采用的其他消息中間件的Request-Response 模式用法,當然也需要注意一下Http請求處理超時的問題。
其他好處
總的來說:
業務邏輯編碼方面,思維方式逐漸脫離面向數據 ,可以更加關注事件、關注消息,更加面向領域、面向對象 ,核心業務邏輯可維護性增強、可擴展性增強。
系統架構風格方面,支持事件驅動、消息驅動、可監控性、可擴展性,更利於系統演化,后期任何時候可通過訂閱消息無侵入式的采集數據,拓展功能。
Tips:
消息隊列的可靠性,比如RabbitMQ,是經過電信行業檢驗的,集群、持久化、自動故障轉移等特性都支持,而且集群配置和集群升級方案都比較簡單方便且成熟。
RabbitMQ安裝請參考RabbitMQ Notes
How
前面啰嗦了一大堆,下面進入正文,ABP如何集成消息隊列機制。
如果嫌本文排版不好,也可以直接看提交給ABP作者的集成消息隊列機制的PR,PR中的說明和用法示例基本和abplus擴展庫中的一致。
1.增加命名空間
這里強調一下概念區別,消息(Messages)和事件(Events)是不同的。
- 在ABP中,有個EventBus體系,Events和各種Handlers,重點是進程內的業務邏輯解耦,Handlers的代碼是可以共享工作單元(UnitOfWork)的,對於允許延后驗證的邏輯,是可以隨時添加一個單獨的Handler去校驗並拋出異常,即可以回滾工作單元(事務);
- 而Messages體系,在Abp框架內,目前並未集成。Messages的重點是向進程外的系統(外部系統)進行通知,所以有工作單元和事務的時候,一定是事務提交成功后才對外通知消息,否則一旦對外通知了,又發生了事務回滾,再追回消息或者撤銷消息就會比較麻煩(消息關聯事務或者分布式事務是另一種情況,這里不做介紹)。
所以,我們首先在Abplus程序集
中引入消息發布器的抽象概念:
namespace Abp.MqMessages
{
/// <summary>
/// 消息發布接口
/// </summary>
public interface IMqMessagePublisher : ITransientDependency
{
/// <summary>
/// 發布
/// </summary>
/// <param name="mqMessages"></param>
void Publish(object mqMessages);
/// <summary>
/// 發布
/// </summary>
/// <param name="mqMessages"></param>
/// <returns></returns>
Task PublishAsync(object mqMessages);
}
}
並且提供空實現:
限於篇幅,除簡單示意和關鍵代碼外,本文只給github源碼鏈接,不再貼出詳細代碼。
2.提供IMqMessagePublisher的RebusRabbitMQ實現
我們集成RabbitMQ,借用了Rebus框架的實現,Rebus在對接RabbitMQ上做了比較好的抽象和封裝,Rebus Wiki。
程序集Abplus.MqMessages.RebusCore
,核心實現:
其中,以反射方式,抓取了一些Session信息:
private void TryFillSessionInfo(object mqMessages)
{
if (AbpSession.UserId.HasValue)
{
var operatorUserIdProperty = mqMessages.GetType().GetProperty("OperatorUserId");
if (operatorUserIdProperty != null && (operatorUserIdProperty.PropertyType == typeof(long?)))
{
operatorUserIdProperty.SetValue(mqMessages, AbpSession.UserId);
}
}
if (AbpSession.TenantId.HasValue)
{
var tenantIdProperty = mqMessages.GetType().GetProperty("TenantId");
if (tenantIdProperty != null && (tenantIdProperty.PropertyType == typeof(int?)))
{
tenantIdProperty.SetValue(mqMessages, AbpSession.TenantId);
}
}
}
程序集Abplus.MqMessages.RebusCore
是同時被Abplus.MqMessages.RebusPublisher
和Abplus.MqMessages.RebusRabbitMqConsumer
依賴的,以便消費端依然具有發布消息的能力。
3.封裝發布模塊,便於項目使用
程序集Abplus.MqMessages.RebusPublisher
,發布模塊:
這個模塊,是封裝消息隊列的配置和啟動連接,使用時,在項目啟動模塊上配好[DependsOn(typeof(RebusRabbitMqPublisherModule))]
,並引入命名空間Abp.Configuration.Startup
,即可配置相關參數,示例如下:
namespace Sample
{
[DependsOn(typeof(RebusRabbitMqPublisherModule))]
public class SampleRebusRabbitMqPublisherModule : AbpModule
{
public override void PreInitialize()
{
Configuration.Modules.UseRebusRabbitMqPublisher()
.UseLogging(c => c.NLog())
.ConnectionTo("amqp://dev:dev@rabbitmq.local.cn/dev_host");
Configuration.BackgroundJobs.IsJobExecutionEnabled = true;
}
public override void Initialize()
{
IocManager.RegisterAssemblyByConvention(Assembly.GetExecutingAssembly());
}
public override void PostInitialize()
{
Abp.Dependency.IocManager.Instance.IocContainer.AddFacility<LoggingFacility>(f => f.UseNLog().WithConfig("nlog.config"));
var workManager = IocManager.Resolve<IBackgroundWorkerManager>();
workManager.Add(IocManager.Resolve<TestWorker>());// to send TestMqMessage every 3 seconds
}
}
}
只要項目配置好模塊依賴,即可在代碼中通過構造函數注入或者屬性注入使用IMqMessagePublisher
接口進行消息發布。
4.封裝消費端模塊
同消息發布模塊,消費模塊在程序集Abplus.MqMessages.RebusConsumer
中,代碼見:
使用方式:
namespace Sample
{
[DependsOn(typeof(RebusRabbitMqConsumerModule))]
public class SampleRebusRabbitMqConsumerModule : AbpModule
{
public override void PreInitialize()
{
Configuration.Modules.UseRebusRabbitMqConsumer()
.UseLogging(c => c.NLog())
.ConnectTo("amqp://dev:dev@rabbitmq.local.cn/dev_host")
//以當前項目名作為隊列名
.UseQueue(Assembly.GetExecutingAssembly().GetName().Name)
//register assembly whitch has rebus handlers
.RegisterHandlerInAssemblys(Assembly.GetExecutingAssembly());
}
public override void Initialize()
{
base.Initialize();
}
public override void PostInitialize()
{
Abp.Dependency.IocManager.Instance.IocContainer.AddFacility<LoggingFacility>(f => f.UseNLog().WithConfig("nlog.config"));
}
}
}
只要項目配置好模塊依賴,消費端亦可在代碼中通過構造函數注入或者屬性注入使用IMqMessagePublisher
接口進行消息發布。
消費端的RebusHanlder示例:
namespace Sample.Handlers
{
public class TestHandler : IHandleMessages<TestMqMessage>
{
public ILogger Logger { get; set; }
public IMqMessagePublisher Publisher { get; set; }
public TestHandler()
{
Publisher = NullMqMessagePublisher.Instance;
}
public async Task Handle(TestMqMessage message)
{
var msg = $"{Logger.GetType()}:{message.Name},{message.Value},{message.Time}";
Logger.Debug(msg);
await Publisher.PublishAsync(msg);//send it again!
}
}
}
5.忘了說,消息格式的定義
消息的定義,不依賴任何框架,也不依賴Abp或者Abplus,因為前面發布接口IMqMessagePublisher
定義時采用的類型是object
void Publish(object mqMessages);
例如:
namespace Sample.MqMessages
{
/// <summary>
/// Custom MqMessage Definition. No depends on any framework,this class library can be shared as nuget pkg.
/// </summary>
public class TestMqMessage
{
public string Name { get; set; }
public string Value { get; set; }
public DateTime Time { get; set; }
}
}
這個消息定義可以單獨作為一個程序集,發布到私有nuget服務器中,以便團隊共享。
Tips & Extended
Abplus中還提供了幾個消息隊列相關的常用實現:
- Abplus
IMessageTracker
定義消費端處理冪等機制的接口 - Abplus.MqMessages
EventDataPublishHandlerBase<TEventData, TMqMessage>
EventData和MqMessage一對一的泛型版抽象發布Handler,這是EventDataHandler
- Abplus.MqMessages
AbpMqHandlerBase
是包含IMessageTracker
屬性的消費端MqHandler抽象基類 - Abplus.MqMessages.AuditingStore 審計日志發布到消息隊列
- Abplus.MqMessages.RedisStoreMessageTracker 消費端消費行為的冪等支持,基於Redis存儲,Rebus Handler的重試機制和冪等處理
上述具體實現請參考Abplus代碼庫。