領域事件、集成事件、事件總線區別與關系


微軟eShopOnContainers

eShopOnContainers是一個簡化版的基於.NET Core和Docker等技術開發的面向微服務架構的參考應用
其中不僅包含了很多術語、設計模式、架構風格,還使用了一系列的常見技術(RabbitMQ、EventBus、IdentityServer4、Polly、Api Gateway、Redis、CQRS、CAP、CI/CD等),還有一些相關工具(Docker、K8S等)。可以說是一份全面的技術整合實現的應用參考。學習它會對涉及到的技術的實際運用有更加清晰的了解。

相對於eshoponcontainer的其他內容,它的領域事件、集成(整合)事件、事件總線之間的協作關系還是比較難懂的。接下來我們來分析一下。

領域事件(DomainEvent)

使用域事件顯式實現域中的更改的副作用。 如果使用 DDD 術語表述,即使用域事件跨多個聚合顯式實現副作用。 (可選)為了提高可伸縮性並減小對數據庫鎖定的影響,可在相同域的聚合之間使用最終一致性。

例如在 eShopOnContainers 應用程序中,當創建訂單時,用戶會成為買家,因此 OrderStartedDomainEvent 會被引發並在 ValidateOrAddBuyerAggregateWhenOrderStartedDomainEventHandler 中進行處理
eshop中,一個領域事件對應一個handler,使用mediator的INotificationHandler實現

域事件和集成事件是相同的:都是對已發生事件的通知。但是,它們的實現必須不同。域事件是推送到域事件調度程序的消息,可基於IoC 容器或任何其他方法作為內存中轉存進程實現(就是Mediator)

集成事件的目的是將已提交事務和更新傳播到其他子系統,無論它們是其他微服務、綁定上下文,還是外部應用程序。(集成事件是跨服務的,領域事件則不是)

例如:

  1. 當用戶發起訂單時,訂單聚合將發送 OrderStarted 域事件。 OrderStarted 域事件基於標識微服務中的原始用戶信息(包含 CreateOrder 命令中提供的信息),由買方聚合處理,以在訂購微服務時創建買家對象。
  2. 每個 OrderItem 子實體可以在項目價格高於特定金額,或產品項目金額過高時,引發事件。 然后,聚合根可以接收這些事件,並執行全局計算Order的總額。
  3. 某系統中,當文章類目新增或者刪除時,需要刷新緩存,從而實現統一,則可以在新增類目或者刪除類目時,出發刷新換緩存的一個事件:
/// <summary>
/// 添加類目
/// </summary>
/// <param name="request"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<CommandResult> Handle(CreateCategoryCommand request, CancellationToken cancellationToken)
{
    var rep = _uow.GetBaseRepository<Category>();
    var model = rep.GetFirstOrDefault(predicate:x => x.DisplayName == request.Name,disableTracking:true);
    if (model!=null)
    {
        return CommandResult.Fail("名稱重復");
    }
    var entity = new Category(categoryName: request.Name, displayName: request.Name);
    var res= await rep.InsertAsync(entity);
    if (res.Entity!=null)
    {
        entity.AddCacheChangeDomainEvent(new List<string> { _cacheKeyMgr.PostCategoryListlKey() }); // 發出事件后,對應的handler會刷新緩存
        return CommandResult.Success();
    }
    return CommandResult.Fail("添加失敗");
}

集成事件(IntegrationEvent)

基於事件的通信時,當值得注意的事件發生時,微服務會發布事件,例如更新業務實體時。 其他微服務訂閱這些事件。 微服務收到事件時,可以更新其自己的業務實體,這可能會導致發布更多事件。這是最終一致性概念的本質。 通常通過使用事件總線實現來執行此發布/訂閱系統。最終一致事務由一系列分布式操作組成。在每個操作中,微服務會更新業務實體,並發布可觸發下一個操作的事件。

集成事件用於跨多個微服務或外部系統保持域狀態同步。這可通過在微服務外發布集成事件完成。 將事件發布到多個接收方微服務時,每個接收方微服務中的相應事件處理程序會處理該事件。(消息隊列的生產者和消費者)

集成事件是單個應用程序級別的,不建議跨應用使用同一個集成事件,這將導致事件來源混亂(微服務必須獨立)

事件總線(EventBus)

事件總線可實現發布/訂閱式通信,無需組件之間相互顯式識別,
微服務 A 發布到事件總線,這會分發到訂閱微服務 B 和 C,發布服務器無需知道訂閱服務器。:

如何實現發布服務器和訂閱服務器之間的匿名? 一個簡單方法是讓中轉站處理所有通信。事件總線是一個這樣的中轉站。

事件總線通常由兩部分組成:

  1. 抽象或接口。
  2. 一個或多個實現。(RabbitMQ\Azure.ServiceBus\kafka等)

接口的功能很簡單,就是只有發布(發布本系統的集成事件)和訂閱(訂閱其余子系統的事件),發布和訂過程中自身無需知道時那些個子系統進行了參與。

領域事件、集成事件、事件總線的協作(以eshop中的實現為例)

三種事件相互作用,最終是為了解決整個微服務系統的最終一致性,微服務A自身數據發生了變化,那個這個變化所引起的一系列反應有可能導致整個系統產生個各種不同結果。為了確保結果與期望一致,就需要實現一致性,還有就是分布式系統當中的CAP原則。

eShopOnContainers中,領域事件激發時,對應的handler將此次事件的信息保存到集成事件的日志表中,保存這個操作,使用到了對應發生事件的領域實體所在的上下文的事務對象,以保證內部強一致性(在領域事件的handler中,通過獲取實體db上下文的事務對象,將事件保存到日志記錄表中)
具體實現:

// 此處OrderCancelledDomainEvent被激發時的處理程序 OrderCancelledDomainEventHandler.cs
public async Task Handle(OrderCancelledDomainEvent orderCancelledDomainEvent, CancellationToken cancellationToken)
        {
            _logger.CreateLogger<OrderCancelledDomainEvent>()
                .LogTrace("Order with Id: {OrderId} has been successfully updated to status {Status} ({Id})",
                    orderCancelledDomainEvent.Order.Id, nameof(OrderStatus.Cancelled), OrderStatus.Cancelled.Id);

            var order = await _orderRepository.GetAsync(orderCancelledDomainEvent.Order.Id);
            var buyer = await _buyerRepository.FindByIdAsync(order.GetBuyerId.Value.ToString());

            var orderStatusChangedToCancelledIntegrationEvent = new OrderStatusChangedToCancelledIntegrationEvent(order.Id, order.OrderStatus.Name, buyer.Name);
            // 通過集成事件服務來保存此次領域事件的信息
            await _orderingIntegrationEventService.AddAndSaveEventAsync(orderStatusChangedToCancelledIntegrationEvent);
        }
        
// 繼承事件服務OrderingIntegrationEventService.cs
public async Task AddAndSaveEventAsync(IntegrationEvent evt)
{
    _logger.LogInformation("----- Enqueuing integration event {IntegrationEventId} to repository ({@IntegrationEvent})",
evt.Id, evt);
    // _orderingContext.GetCurrentTransaction() 獲取實體發出域事件時當前上下問的事務對象
    // 由於獲取上下文事務對象的存在,導致eshop中使用了TransactionBehaviour。確保了當前事務對象的存在。
    await _eventLogService.SaveEventAsync(evt, _orderingContext.GetCurrentTransaction());
}

然后再TransactionBehaviour中,提交事務,實體的更改和對應域事件都會被記錄到數據庫中。然后通過集成事件服務,根據次事務id查詢出需要激發的領域事件數據,然后遍歷操作:

  1. 標記為處理中
  2. 通過事件總線(消息隊列或者其他組件進行發布)
  3. 標記為處理完成、遇到異常標記為失敗(此處可增加policy的策略重試功能,但是會影響本次操作的響應時間),由后台任務定時重試這些失敗的事件。確保最終一致性。但是消費者端的一致性無法保證(需要其他策略機行處理)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM