前言
簡單介紹一下EventBus.
正文
EventBus 也就是集成事件,用於服務與服務之間的通信。
比如說我們的訂單處理事件,當訂單處理完畢后,我們如果通過api馬上去調用后續接口。
比如說訂單完成給用戶通知的話,如果是大量訂單,即使我們使用異步async await 這種模式,在這個訂單服務中將會大量占用資源,因為async await 本身是線程池。
因為里面的資源是有限的,如果創建訂單還有完成訂單占用大量資源的話,發送郵件還加入到競爭中,那么可以想象,機器掉入資源用盡就更大了。如果是訂單完成之后要調用多個服務,那么可想而知,壓力多大。
那么EventBus 就是通過發布訂閱這種模式來緩存起來,EventBus 發布一個事件,然后其他微服務可以進行訂閱,這樣就緩解了機器的負擔。
.net core 實現EventBus 的框架如下:https://github.com/dotnetcore/CAP
可以去看下上面這個地址,這里就不詳細介紹了,因為沒有什么比文檔更加詳細了。
在.net core 中加入EventBus:
// 注冊 EventBus
services.AddEventBus(Configuration);
AddEventBus 如下:
/// <summary>
/// 注冊EventBus(集成事件處理服務)
/// </summary>
/// <param name="services"></param>
/// <param name="configuration"></param>
/// <returns></returns>
public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
{
// 注入集成事件訂閱服務
services.AddTransient<ISubscriberService, SubscriberService>();
// 注入CAP服務
services.AddCap(options =>
{
// 指定CAP組件所使用的數據庫上下文,當前設置表示EventBus與領域驅動共享數據庫鏈接
options.UseEntityFramework<DomainContext>();
// package: DotNetCore.CAP.RabbitMQ
// 指定RabbitMQ作為我們EventBus的消息隊列的存儲,並注入配置
options.UseRabbitMQ(options =>
{
configuration.GetSection("RabbitMQ").Bind(options);
});
// options.UseDashboard();
});
return services;
}
可以看到上面使用RabbitMQ作為Eventbus的消息隊列處理。當然除了RabbitMq,這個框架還支持其他的,如Kafka, AzureService, AmazonSQS等,看公司或者項目需要什么就行。
因為使用了RabbitMq,那么上面的寫了加載RabbitMq:configuration.GetSection("RabbitMQ").Bind(options),配置RabbitMQ:
"RabbitMQ": {
"HostName": "127.0.0.1",
"UserName": "mq",
"Password": "123456",
"VirtualHost": "blog",
"ExchangeName": "blog_queue"
}
那么看一下EventBus發布:
/// <summary>
/// 創建Order領域事件處理
/// </summary>
public class OrderCreatedDomainEventHandler : IDomainEventHandler<OrderCreatedDomainEvent>
{
ICapPublisher _capPublisher;
public OrderCreatedDomainEventHandler(ICapPublisher capPublisher)
{
_capPublisher = capPublisher;
}
/// <summary>
/// 領域事件處理
/// </summary>
/// <param name="notification"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task Handle(OrderCreatedDomainEvent notification, CancellationToken cancellationToken)
{
// 當創建新訂單時,向 EventBus 發布一個事件
await _capPublisher.PublishAsync("OrderCreated", new OrderCreatedIntegrationEvent(notification.Order.Id));
}
}
前面訂單創建完畢的時候,經過調用相關的領域事件,然后向EventBus 發送了一個事件。那么先看下這個事件。
/// <summary>
/// 集成事件:訂單創建完成
/// </summary>
public class OrderCreatedIntegrationEvent
{
public long OrderId { get; }
public OrderCreatedIntegrationEvent(long orderId)
{
OrderId = orderId;
}
}
這樣就創建了一個集成事件,並且發布出去了。
然后創建訂閱服務:
/// <summary>
/// 集成事件訂閱服務
/// </summary>
public class SubscriberService : ISubscriberService, ICapSubscribe
{
IMediator _mediator;
public SubscriberService(IMediator mediator)
{
_mediator = mediator;
}
/// <summary>
/// 訂閱訂單創建成功集成事件
/// </summary>
/// <param name="event"></param>
[CapSubscribe("OrderCreated")]
public void OrderCreatedSucceeded(OrderCreatedIntegrationEvent @event)
{
// do something...
}
/// <summary>
/// 訂閱訂單支付成功集成事件
/// </summary>
/// <param name="event"></param>
[CapSubscribe("OrderPaymentSucceeded")]
public void OrderPaymentSucceeded(OrderPaymentSucceededIntegrationEvent @event)
{
// do something...
}
}
這樣即可,框架會幫我們處理。
總結
-
集成事件是跨服務的事件,在領域模式中,集成事件也是跨服務的領域事件
-
在領域模型中,集成事件一般由領域事件驅動觸發
-
集成事件可以實現一致性,補充事務不能跨服務
-
集成事件會有一些額外的負擔,使用的時候思考是否有必要
結
因為這個文檔比較清晰,直接看文檔更加通常,就不演示具體運行。
再次放一下文檔:https://github.com/dotnetcore/CAP
下一節HttpClientFactory,以上只是個人整理一下,如有錯誤,望請指出。