微服務實戰(二):落地微服務架構到直銷系統(構建消息總線框架接口)


從上一篇文章大家可以看出,實現一個自己的消息總線框架是非常重要的內容,消息總線可以將界限上下文之間進行解耦,也可以為大並發訪問提供必要的支持。

消息總線的作用:

1.界限上下文解耦:在DDD第一波文章中,當更新了訂單信息后,我們通過調用經銷商界限上下文的領域模型和倉儲,進行了經銷商信息的更新,這造成了耦合。通過一個消息總線,可以在訂單界限上下文的WebApi服務(來源微服務-生產者)更新了訂單信息后,發布一個事件消息到消息總線的某個隊列中,經銷商界限上下文的WebApi服務(消費者)訂閱這個事件消息,然后交給自己的Handler進行消息處理,更新自己的經銷商信息。這樣就實現了訂單界限上下文與經銷商界限上下文解耦。

2.大並發支持:可以通過消息總線進一步提升下單的性能。我們可以將用戶下單的操作直接交給一個下單命令WebApi接收,下單命令WebApi接收到命令后,直接丟給一個消息總線的隊列,然后立即給前端返回下單結果。這樣用戶就不用等待后續的復雜訂單業務邏輯,加快速度。后續訂單的一系列處理交給消息的Handler進行后續的處理與消息的進一步投遞。

 

消息總線設計重點:

1.定義消息(事件)的接口:所有需要投遞與處理的消息,都從這個消息接口繼承,因為需要約束消息中必須包含的內容,比如消息的ID、消息產生的時間等。

public interface IEvent
    {
        Guid Id { get; set; }
        DateTime CreateDate { get; set; }
    }

2.定義消息(事件)處理器接口:當消息投遞到消息總線隊列中后,一定有消費者WebApi接收並處理這個消息,具體的處理方法邏輯在訂閱方處理器中實現,這里先需要定義處理器的接口,便於在消息總線框架中使用。

public interface IEventHandler
    {
        Task<bool> HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent;
    }

從上面代碼可以看出,消息(事件)處理器處理的類型就是從IEvent接口繼承的消息類。

3.定義消息(事件)與消息(事件)處理器關聯接口:一種類型的消息被投遞后,一定要在訂閱方找到這種消息的處理器進行處理,所以一定要定義二者的關聯接口,這樣才能將消息與消息處理器對應起來,才能實現消息被訂閱后的處理。

 public interface IEventHandlerExecutionContext
    {
        void RegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent
            where TEventHandler : IEventHandler;
        bool IsRegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent
            where TEventHandler : IEventHandler;
        Task HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent;
    }

RegisterEventHandler方法就是建立消息與消息處理器的關聯,這個方法其實是在訂閱方使用,訂閱方告訴消息總線,什么樣的消息應該交給我的哪個處理器進行處理。

IsRegisterEventHandler方法是判斷消息與處理器之間是否已經存在關聯。

HandleAsync方法是通過查找到消息對應的處理器后,然后調用處理器自己的Handle方法進行消息的處理.

4.定義消息發布、訂閱與消息總線接口:消息總線至少要支持兩個功能,一個是生產者能夠發布消息到我的消息總線,另一個是訂閱方需要能夠從我這個消息總線訂閱消息。

 public interface IEventPublisher
    {
        void Publish<TEvent>(TEvent @event) where TEvent : IEvent;
    }

從上面代碼可以看出,生產者發布的消息仍然要從IEvent繼承的類型。

 public interface IEventSubscriber
    {
        void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent
            where TEventHandler : IEventHandler;
    }

上面代碼是訂閱方用於從消息總線訂閱消息,從代碼中可以看出,它的最終的實現其實就是建立消息與處理器之間的關聯。

 public interface IEventBus:IEventPublisher,IEventSubscriber
    {
    }

消息(事件)總線從兩個接口繼承下來,同時支持消息的發布與消息的訂閱。

5.實現事件基類:上面已經訂閱了消息(事件)的接口,這里來實現事件的基類,其實就是實現消息ID與產生的時間:

  public class BaseEvent : IEvent
    {
        public Guid Id { get; set; }
        public DateTime CreateDate { get; set; }
        public BaseEvent()
        {
            this.Id = Guid.NewGuid();
            this.CreateDate = DateTime.Now;
        }
    }

6.實現消息總線基類:消息總線底層的依賴可以是各種消息代理產品,比如RabbitMq、Kafaka或第三方雲平台提供的消息代理產品,通常我們要封裝這些消息代理產品。在封裝之前,我們需要定義頂層的消息總線基類實現,主要的目的是未來依賴於它的具體實現可替換,另外也將消息與消息處理器的關聯接口傳遞進來,便於訂閱方使用。

public abstract class BaseEventBus : IEventBus
    {
        protected readonly IEventHandlerExecutionContext eventHandlerExecutionContext;
        protected BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext)
        {
            this.eventHandlerExecutionContext = eventHandlerExecutionContext;
        }
        public abstract void Publish<TEvent>(TEvent @event)
            where TEvent : IEvent;
        public abstract void Subscribe<TEvent, TEventHandler>()
            where TEvent : IEvent
            where TEventHandler : IEventHandler;
    }

7.實現消息與處理器關聯:消息必須與處理器關聯,訂閱方收到特定類型的消息后,才知道交給哪個處理器處理。

 public class EventHandlerExecutionContext : IEventHandlerExecutionContext
    {
        private readonly IServiceCollection registry;
        private readonly IServiceProvider serviceprovider;
        private Dictionary<Type, List<Type>> registrations = new Dictionary<Type, List<Type>>();
        public EventHandlerExecutionContext(IServiceCollection registry,Func<IServiceCollection,
            IServiceProvider> serviceProviderFactory = null)
        {
            this.registry = registry;
            this.serviceprovider = this.registry.BuildServiceProvider();
        }

       //查找消息關聯的處理器,然后調用處理器的處理方法
        public async Task HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent
        {
            var eventtype = @event.GetType();
            if(registrations.TryGetValue(eventtype,out List<Type> handlertypes) && handlertypes.Count > 0)
            {
                using(var childscope = this.serviceprovider.CreateScope())
                {
                    foreach(var handlertype in handlertypes)
                    {
                        var handler = Activator.CreateInstance(handlertype) as IEventHandler;
                        await handler.HandleAsync(@event);
                    }
                }
            }
        }

       //判斷消息與處理器之間是否有關聯
        public bool IsRegisterEventHandler<TEvent, TEventHandler>()
            where TEvent : IEvent
            where TEventHandler : IEventHandler
        {
            if(registrations.TryGetValue(typeof(TEvent),out List<Type> handlertypelist))
            {
                return handlertypelist != null && handlertypelist.Contains(typeof(IEventHandler));
            }
            return false;
        }
       
      //將消息與處理器關聯起來,可以在內存中建立關聯,也可以建立在數據庫單獨表中
        public void RegisterEventHandler<TEvent, TEventHandler>()
            where TEvent : IEvent
            where TEventHandler : IEventHandler
        {
            Utils.DictionaryRegister(typeof(TEvent), typeof(TEventHandler), registrations);
        }
    }

上面我們基本上就將消息總線的架子搭建起來了,也實現了基本的功能,下一章我們基於它來實現RabbitMq的消息總線。

QQ討論群:309287205 

微服務實戰視頻請關注微信公眾號:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM