33 | 集成事件:使用RabbitMQ來實現EventBus
這一節我們來講解如何通過 CAP 組件和 RabbitMQ 來實現 EventBus
要實現 EventBus,我們這里借助了 RabbitMQ,它的整個安裝和使用的體驗是非常人性化的,如果是在 Windows 下開發的話,它可以有 Windows 的 installer,也可以在其它的操作系統下安裝和使用,當然它也支持 Docker 的模式,我們可以在以下的地址獲取到安裝包和安裝方法的說明
https://www.rabbitmq.com/download.html
另一個就是在 .NET Core 社區比較知名的 CAP 框架,這個框架是由我們國人開發的,它實現了開箱即用的 EventBus 的實現,我們可以通過簡單的配置,就能把 RabbitMQ 集成進來,並且實現我們的集成事件的處理
https://github.com/dotnetcore/CAP
我們來看一下 CAP 框架的實現架構
它實際上實現了一個叫 OutBox 的設計模式,就是在我們的每個微服務,比如說微服務 A 的數據庫 A,在這個數據庫內部它建立了兩張表,一張叫 publish 事件表和一張叫 receiver 事件表,這兩張事件表用來記錄微服務 A 發出的和微服務 A 收到的事件
當我們要發出事件時,我們會把事件的存儲邏輯與我們的業務邏輯的事務合並,在同一個事務里提交,也就意味着當我們的業務邏輯提交成功時,我們的事件表里面的事件是一定存在的,它是與我們的業務邏輯的事務是強綁定的
如果說我們的業務邏輯失敗了,事務回滾了,這條事件是不會出現在我們的事件表里的,這樣子就可以做到說我們要發送的事件一定是與業務邏輯是一致的
接下來由我們組件來負責將事件表里的事件全部都發送到 EventBus,比如說 RabbitMQ 消息隊列里面去,由接收方訂閱
對於訂閱的事件的話,設計的模式也是同理,當我們的應用程序在消息隊列獲取到信息的時候,它就會將這些消息持久化到我們的數據庫的 Receive 事件表里,這樣我們就可以在本地進行事件的處理,失敗重試等操作,這些都是由 CAP 框架完成的,我們僅需要去做簡單的配置,關注發布和訂閱的業務邏輯即可
我們看一下代碼,剛才有提到 CAP 的架構,關鍵的一點是需要事件的存儲與我們的業務邏輯在同一個事務里,所以說我們在處理事務的邏輯部分的話,需要嵌入 CAP 的一部分代碼,我們看一下 EFContext 的定義
public EFContext(DbContextOptions options, IMediator mediator, ICapPublisher capBus) : base(options)
{
_mediator = mediator;
_capBus = capBus;
}
之前有關注到有一個叫 ICapPublisher 這個入參,關鍵的是這一行代碼我們需要關注一下
_currentTransaction = Database.BeginTransaction(_capBus, autoCommit: false);
這一行代碼的作用是創建事務,我們可以看到創建事務的過程中,我們把 ICapPublisher 也傳入給了這個方法的構造函數,實際上這個方法是由 CAP 的組件提供的,它的核心作用就是將我們要發送的事件與我們的業務的存儲都放在同一個事務內部,這樣子我們就可以使得事務提交時或者回滾時,我們的事件與業務邏輯的存取都是一致的
然后我們再來看一下配置的部分,寫在 ServiceCollectionExtensions 下面
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
services.AddTransient<ISubscriberService, SubscriberService>();
services.AddCap(options =>
{
options.UseEntityFramework<DomainContext>();
options.UseRabbitMQ(options =>
{
configuration.GetSection("RabbitMQ").Bind(options);
});
//options.UseDashboard();
});
return services;
}
我們這里定義了一個 AddEventBus,可以看到將我們之前演示的代碼訂閱服務注入進來,然后 Services 最重點的代碼是 AddCap,我們需要告訴 CAP 框架我們是針對 DomainContext 來實現我們的 EventBus,EventBus 與 DomainContext 共享我們的數據庫連接,下面一行代碼是指我們要用 RabbitMQ 來作為我們 EventBus 的消息隊列的存儲,這里可以看到使用了一個 Bind 的方法將我們的配置綁定到 RabbitMQ 的 options 上面去
我們可以看一下我們的配置
"RabbitMQ": {
"HostName": "localhost",
"UserName": "admin",
"Password": "123456",
"VirtualHost": "geektime",
"ExchangeName": "geek_queue"
}
這里可以看到我們定義了一個 RabbitMQ 的配置,然后這里面會有我們的 host,因為是本地安裝的,所以訪問地址就是 localhost,VirtualHost 是 RabbitMQ 一個比較特殊的設置,它的作用是將 RabbitMQ 的空間區分為不同的空間,你可以認為這是一個租戶,相同的 VirtualHost,大家都可以認為是一個 RabbitMQ 的集群,最下面的 ExchangeName 就是隊列需要訂閱的 Exchange 的名稱,消息的發布和訂閱都是通過這個 Exchange 來的
然后我們在 Startup 這里添加一行
services.AddEventBus(Configuration);
這樣我們就配置完成了
GitHub源碼鏈接:
https://github.com/witskeeper/geektime
本作品采用知識共享署名-非商業性使用-相同方式共享 4.0 國際許可協議進行許可。
歡迎轉載、使用、重新發布,但務必保留文章署名 鄭子銘 (包含鏈接: http://www.cnblogs.com/MingsonZheng/ ),不得用於商業目的,基於本文修改后的作品務必以相同的許可發布。
如有任何疑問,請與我聯系 (MingsonZheng@outlook.com) 。