分布式事件總線
分布式事件總線系統允許發布和訂閱跨應用/服務邊界傳輸的事件. 你可以使用分布式事件總線在微服務或應用程序之間異步發送和接收消息.
提供程序
分布式事件總線系統提供了一個可以被任何提供程序實現的抽象. 有兩種開箱即用的提供程序:
LocalDistributedEventBus
是默認實現,實現作為進程內工作的分布式事件總線. 是的!如果沒有配置真正的分布式提供程序,默認實現的工作方式與本地事件總線一樣.RabbitMqDistributedEventBus
通過RabbitMQ實現分布式事件總線. 請參閱RabbitMQ集成文檔了解如何配置它.
使用本地事件總線作為默認具有一些重要的優點. 最重要的是:它允許你編寫與分布式體系結構兼容的代碼. 您現在可以編寫一個整體應用程序,以后可以拆分成微服務. 最好通過分布式事件而不是本地事件在邊界上下文之間(或在應用程序模塊之間)進行通信.
例如,預構建的應用模塊被設計成在分布式系統中作為服務工作,同時它們也可以在獨立應用程序中作為模塊工作,而不依賴於外部消息代理.
發布事件
以下介紹了兩種發布分布式事件的方法.
IDistributedEventBus
可以注入 IDistributedEventBus
並且使用發布分布式事件.
示例: 產品的存貨數量發生變化時發布分布式事件
using System;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
namespace AbpDemo
{
public class MyService : ITransientDependency
{
private readonly IDistributedEventBus _distributedEventBus;
public MyService(IDistributedEventBus distributedEventBus)
{
_distributedEventBus = distributedEventBus;
}
public virtual async Task ChangeStockCountAsync(Guid productId, int newCount)
{
await _distributedEventBus.PublishAsync(
new StockCountChangedEvent
{
ProductId = productId,
NewCount = newCount
}
);
}
}
}
PublishAsync
方法需要一個參數:事件對象,它負責保持與事件相關的數據,是一個簡單的普通類:
using System;
namespace AbpDemo
{
[EventName("MyApp.Product.StockChange")]
public class StockCountChangedEto
{
public Guid ProductId { get; set; }
public int NewCount { get; set; }
}
}
即使你不需要傳輸任何數據也需要創建一個類(在這種情況下為空類).
Eto
是我們按照約定使用的Event Transfer Objects(事件傳輸對象)的后綴. s雖然這不是必需的,但我們發現識別這樣的事件類很有用(就像應用層上的DTO 一樣).
事件名稱
EventName
attribute是可選的,但建議使用. 如果不聲明,事件名將事件名稱將是事件類的全名. 這里是 AbpDemo.StockCountChangedEto
.
關於序列化的事件對象
事件傳輸對象必須是可序列化的,因為將其傳輸到流程外時,它們將被序列化/反序列化為JSON或其他格式.
避免循環引用,多態,私有setter,並提供默認(空)構造函數,如果你有其他的構造函數.(雖然某些序列化器可能會正常工作),就像DTO一樣.
實體/聚合根類
實體不能通過依賴注入注入服務,但是在實體/聚合根類中發布分布式事件是非常常見的.
示例: 在聚合根方法內發布分布式事件
using System;
using Volo.Abp.Domain.Entities;
namespace AbpDemo
{
public class Product : AggregateRoot<Guid>
{
public string Name { get; set; }
public int StockCount { get; private set; }
private Product() { }
public Product(Guid id, string name)
: base(id)
{
Name = name;
}
public void ChangeStockCount(int newCount)
{
StockCount = newCount;
//ADD an EVENT TO BE PUBLISHED
AddDistributedEvent(
new StockCountChangedEto
{
ProductId = Id,
NewCount = newCount
}
);
}
}
}
AggregateRoot
類定義了 AddDistributedEvent
來添加一個新的分布式事件,事件在聚合根對象保存(創建,更新或刪除)到數據庫時發布.
如果實體發布這樣的事件,以可控的方式更改相關屬性是一個好的實踐,就像上面的示例一樣 -
StockCount
只能由保證發布事件的ChangeStockCount
方法來更改.
IGeneratesDomainEvents 接口
實際上添加分布式事件並不是 AggregateRoot
類獨有的. 你可以為任何實體類實現 IGeneratesDomainEvents
. 但是 AggregateRoot
默認實現了它簡化你的工作.
不建議為不是聚合根的實體實現此接口,因為它可能不適用於此類實體的某些數據庫提供程序. 例如它適用於EF Core,但不適用於MongoDB.
它是如何實現的?
調用 AddDistributedEvent
不會立即發布事件. 當你將更改保存到數據庫時發布該事件;
- 對於 EF Core, 它在
DbContext.SaveChanges
中發布. - 對於 MongoDB, 它在你調用倉儲的
InsertAsync
,UpdateAsync
或DeleteAsync
方法時發由 (因為MongoDB沒有更改跟蹤系統).
訂閱事件
一個服務可以實現 IDistributedEventHandler<TEvent>
來處理事件.
示例: 處理上面定義的StockCountChangedEto
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
namespace AbpDemo
{
public class MyHandler
: IDistributedEventHandler<StockCountChangedEto>,
ITransientDependency
{
public async Task HandleEventAsync(StockCountChangedEto eventData)
{
var productId = eventData.ProductId;
}
}
}
這就是全部.
MyHandler
由ABP框架自動發現,並在發生StockCountChangedEto
事件時調用HandleEventAsync
.- 如果你使用的是分布式消息代理,比如RabbitMQ,ABP會自動訂閱消息代理上的事件,獲取消息執行處理程序.
- 如果事件處理程序成功執行(沒有拋出任何異常),它將向消息代理發送確認(ACK).
你可以在處理程序注入任何服務來執行所需的邏輯. 一個事件處理程序可以訂閱多個事件,但是需要為每個事件實現 IDistributedEventHandler<TEvent>
接口.
事件處理程序類必須注冊到依賴注入(DI),示例中使用了
ITransientDependency
. 參閱DI文檔了解更多選項.
預定義的事件
如果你配置,ABP框架會為實體自動發布創建,更新和刪除分布式事件.
事件類型
有三種預定義的事件類型:
EntityCreatedEto<T>
是實體T
創建后發布.EntityUpdatedEto<T>
是實體T
更新后發布.EntityDeletedEto<T>
是實體T
刪除后發布.
這些都是泛型的, T
實際上是Event Transfer Object (ETO)的類型,而不是實體的類型,因為實體對象不能做為事件數據傳輸,所以通常會為實體類定義一個ETO類,如為 Product
實體定義 ProductEto
.
訂閱事件
訂閱自動事件與訂閱常規分布式事件相同.
示例: 產品更新后獲取通知
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities.Events.Distributed;
using Volo.Abp.EventBus.Distributed;
namespace AbpDemo
{
public class MyHandler :
IDistributedEventHandler<EntityUpdatedEto<ProductEto>>,
ITransientDependency
{
public async Task HandleEventAsync(EntityUpdatedEto<ProductEto> eventData)
{
var productId = eventData.Entity.Id;
//TODO
}
}
}
MyHandler
實現了IDistributedEventHandler<EntityUpdatedEto<ProductEto>>
.
配置
你可以在模塊的 ConfigureServices
中配置 AbpDistributedEntityEventOptions
添加選擇器.
示例: 配置示例
Configure<AbpDistributedEntityEventOptions>(options =>
{
//Enable for all entities
options.AutoEventSelectors.AddAll();
//Enable for a single entity
options.AutoEventSelectors.Add<IdentityUser>();
//Enable for all entities in a namespace (and child namespaces)
options.AutoEventSelectors.AddNamespace("Volo.Abp.Identity");
//Custom predicate expression that should return true to select a type
options.AutoEventSelectors.Add(
type => type.Namespace.StartsWith("MyProject.")
);
});
- 最后一個提供了靈活性來決定是否應該針對給定的實體類型發布事件. 返回
true
代表為該Type
發布事件.
你可以添加多個選擇器. 如果選擇器之一與實體類型匹配,則將其選中.
事件傳輸對象
一旦你為一個實體啟用了自動事件,ABP框架就會為實體上的更改發布事件. 如果你沒有為實體指定對應的Event Transfer Object(ETO), ABP框架會使用一個標准類型 EntityEto
,它只有兩個屬性:
EntityType
(string
): 實體類的全名(包括命令空間).KeysAsString
(string
): 已更改實體的主鍵.如果它只有一個主鍵,這個屬性將是主鍵值. 對於復合鍵,它包含所有用,
(逗號)分隔的鍵.
因此可以實現 IDistributedEventHandler<EntityUpdatedEto<EntityEto>>
訂閱事件. 但是訂閱這樣的通用事件不是一個好方法,你可以為實體類型定義對應的ETO.
示例: 為 Product
聲明使用 ProductDto
Configure<AbpDistributedEntityEventOptions>(options =>
{
options.AutoEventSelectors.Add<Product>();
options.EtoMappings.Add<Product, ProductEto>();
});
在這個示例中;
- 添加選擇器允許發布
Product
實體的創建,更新和刪除事件. - 配置為使用
ProductEto
作為事件傳輸對象來發布與Product
相關的事件.
分布式事件系統使用對象到對象的映射系統來映射 Product
對象到 ProductEto
對象,你需要配置映射. 請參閱可以對象到對象映射文檔了解所有選項,下面的示例展示了如何使用AutoMapper庫配置它.
示例: 使用AutoMapper配置 Product
到 ProductEto
映射
using System;
using AutoMapper;
using Volo.Abp.Domain.Entities.Events.Distributed;
namespace AbpDemo
{
[AutoMap(typeof(Product))]
public class ProductEto : EntityEto
{
public Guid Id { get; set; }
public string Name { get; set; }
}
}
此示例使用AutoMapper的 AutoMap
屬性配置的映射. 你可以創建一個配置文件類代替. 請參閱AutoMapper文檔了解更多選項.