微軟微服務eShopOnContainers示例之EventBusRabbitMq解析與實踐


eShopOnContainers

eShopOnContainers是微軟官方的微服務架構示例,GitHub地址https://github.com/dotnet-architecture/eShopOnContainers

eShopOnContainers架構中有一個使用RabbitMQ實現的EventBus(事件總線),EventBus使用的是發布訂閱模式, 使用事件驅動,使得有了一個新需求后直接添加對應Handler並注冊訂閱即可,符合單一職責原則。下在就詳細說說我是如果把eShopOnContainers中的EventBus用到自己的項目中的

項目結構圖

   

持久連接類

面向接口是主流的開發方式,使用rabbitmq我們需要有一個管理與rabbtimq服務器連接的類,也就是

IRabbitMqPersistentConnection 接口與 DefaultRabbitMqPersistentConnection

   

1 /// <summary>

2 /// Rabbitmq持久連接

3 /// </summary>

4 public interface IRabbitMqPersistentConnection : IDisposable

5 {

6   /// <summary>

7    /// 是否連接

8   /// </summary>

9   bool IsConnected { get; }

10

11   /// <summary>

12   /// 嘗試連接

13    /// </summary>

14    /// <returns></returns>

15    bool TryConnect();

16

17    IModel CreateModel();

18 }

   

在接口里定義了三個方法(屬性也是方法),第一個判斷是否已經連接,第二個嘗試連接,第二個方法創建一個channel

DefaultRabbitMqPersistentConnection類里實現了接口,它需要一個類型為IConnectionFactory的參數

   

   

首先來看看TryConnect方法

在這里使用了傳入的_connectionFactory創建了連接,並且注冊了連接成功、失敗等事件進行了log輸出

1 public bool TryConnect()

2 {

3    Logger.Info("RabbitMQ客戶端正在嘗試連接");

4

5    lock (_syncRoot)

6   {

7      var policy = Policy.Handle<SocketException>().Or<BrokerUnreachableException>()

9 .    WaitAndRetry(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>

10      {

11       Logger.Warn(ex.ToString());

12      }

13 );

14

15    policy.Execute(() =>

16   {

17     _connection = _connectionFactory.CreateConnection();

19    });

20

21   if (IsConnected)

22    {

23      _connection.ConnectionShutdown += OnConnectionShutdown;

24      _connection.CallbackException += OnCallbackException;

25      _connection.ConnectionBlocked += OnConnectionBlocked;

26

27      Logger.Info($"連接到 {_connection.Endpoint.HostName} 並注冊了異常失敗事件");

28

29      return true;

30   }

31    Logger.Fatal("致命錯誤:無法創建和打開RabbitMQ連接");

32

33    return false;

34    }

35 }

   

訂閱管理器

連接類其它的已經沒有什么值得一說的了,下面我們把眼光轉到訂閱管理器

這是管理器的接口,作用是來用維護EventDataEventHandler的關系,圖中箭頭指向的代碼是我自己添加的,是為了定義一個命名約定把EventDataEventHandler的關系統一添加到管理器中

訂閱管理器接口的默認實現是InMemoryEventBusSubscriptionsManager類放到內存中,也可以輕松添加redissql server等實現,InMemoryEventBusSubscriptionsManager內部維護了一個字典,對應關系都放在這個字典中,可以進行添加訂閱、取消訂閱

   

在說EventBus類之前先說說兩種Handler,一種是普通的事件處理,泛型接口接收IntegrationEvent(EventData)的派生類做為參數並且定義了Handler方法來進行處理

第二種是動態EventDataHandler,它並不會要求必須是IntegrationEvent的派生類,可以靈活的處理其它的類型

   

事件總線

下面就到了EventBus的接口,定義很簡單,進行訂閱事件,和dynamic的事件以及取消訂閱和最重要的發布事件

1   public interface IEventBus

2    {

3      /// <summary>

4      /// 訂閱事件

5      /// </summary>

6     /// <typeparam name="T"></typeparam>

7      /// <typeparam name="TH"></typeparam>

8      void Subscribe<T, TH>()

9      where T : IntegrationEvent

10     where TH : IIntegrationEventHandler<T>;

11

12      /// <summary>

13      /// 訂閱動態事件

14      /// </summary>

15      /// <typeparam name="TH"></typeparam>

16      /// <param name="eventName"></param>

17      void SubscribeDynamic<TH>(string eventName)

18      where TH : IDynamicIntegrationEventHandler;

19

20     /// <summary>

21     /// 取消訂閱動態事件

22      /// </summary>

23     /// <typeparam name="TH"></typeparam>

24     /// <param name="eventName"></param>

25      void UnsubscribeDynamic<TH>(string eventName)

26      where TH : IDynamicIntegrationEventHandler;

27

28      /// <summary>

29     /// 訂閱事件

30      /// </summary>

31      /// <param name="event"></param>

32      /// <param name="handler"></param>

33      void Subscribe(Type @event, Type handler);

34

35      /// <summary>

36      /// 取消訂閱事件

37      /// </summary>

38     /// <typeparam name="T"></typeparam>

39      /// <typeparam name="TH"></typeparam>

40      void Unsubscribe<T, TH>()

41      where TH : IIntegrationEventHandler<T>

42      where T : IntegrationEvent;

43

44     /// <summary>

45      /// 發布事件

46     /// </summary>

47      /// <param name="event"></param>

48      void Publish(IntegrationEvent @event);

49 }

我使用的是RabbitMQEventBus實現,RabbitMQAMQP協議的企業級消息隊列,可靠性與性能都非常高。具體大家可以去了解一下RbbaitMQ

EventBusRabbitMq類需要三個參數

RabbitMqPersistentConnection 持久連接類

IEventBusSubscriptionsManager 訂閱管理器

ILifetimeScope 以及autofac (依賴注入)

實際上還需要一個Logger參數,在我的項目中使用了全局的Logger類,所以就不需要這個參數了,在構造函數中可以看到如果沒有傳入訂閱管理器,那么將會使用默認的InMemry,而且在構造函數中就創建了消費者(和Queue), 在示例中是沒有常量QueueName的,使用的是斷開連接后自動刪除的隊列,但是這樣會丟失消息,所以我改成了持久的隊列

   

下面就是創建消費者的方法了,首先聲明一個direct類型的交換機,我對方法進行了修改,添加了對消費者的公平分發,以及手動交付和消息持久化來確保消息會被成功的消費,Received方法中調用了ProcessEvent方法,在這個方法里就是對消息的消費了

1   private IModel CreateConsumerChannel()

2   {

3      if (!_persistentConnection.IsConnected)

4     {

5     _persistentConnection.TryConnect();

6     }

7

8     var channel = _persistentConnection.CreateModel();

9

10      channel.ExchangeDeclare(exchange: BrokerName,

11      type: "direct");

12      //均發,同一時間只處理一個消息

13      channel.BasicQos(0, 1, false);

14      _queueName = channel.QueueDeclare(QueueName, true, false, false);

15

16      var consumer = new EventingBasicConsumer(channel);

17     consumer.Received += async (model, ea) =>

18      {

19      var eventName = ea.RoutingKey;

20      var message = Encoding.UTF8.GetString(ea.Body);

21

22      await ProcessEvent(eventName, message);

23      //交付

24      channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

25    };

26

27      channel.BasicConsume(queue: _queueName,

28      autoAck: false,

29      consumer: consumer);

30

31      channel.CallbackException += (sender, ea) =>

32     {

33        _consumerChannel.Dispose();

34       _consumerChannel = CreateConsumerChannel();

35     };

36

37      return channel;

38    }

   

方法很簡單,首先判斷事件名稱在訂閱管理器中是否存在,然后創建一個autofac的生命周期

使用EventData的名稱拿到所有的Hnalder

var subscriptions = _subsManager.GetHandlersForEvent(eventName);

然后進行判斷是否是Dynamic,后面代碼基本一致就是從autofac中拿到EventHandler然后調用Handler方法

   

   

下面就是依賴注入的部分了

首先是注冊訂閱管理器

在這里創建ConnectionFactory類,並且讀取config中的配置項,拿到hostuser以及pwd,我又添加了斷線重連和工作進程恢復

builder.RegisterInstance(new DefaultRabbitMqPersistentConnection(new ConnectionFactory()

{

  HostName = ConfigurationManager.AppSettings["rabbitmqHost"],

  UserName = ConfigurationManager.AppSettings["rabbitmqUser"],

  Password = ConfigurationManager.AppSettings["rabbitmqPwd"],

  //斷線重連並恢復工作進程

  AutomaticRecoveryEnabled = true,

  TopologyRecoveryEnabled = true

})).As<IRabbitMqPersistentConnection>()

.SingleInstance();

   

然后就是注冊事件總統與訂閱管理器

builder.RegisterType<EventBusRabbitMq.EventBusRabbitMq>().As<IEventBus>().SingleInstance();

builder.RegisterType<InMemoryEventBusSubscriptionsManager>().As<IEventBusSubscriptionsManager>().SingleInstance();

   

最后我拿到了EventDataEventHandler所在程序中,掃描所有的Type並為所有以Handler結尾的類型添加到容器了,然后拿到了派生於IntegrationEvent的所有子類然后與對應的Handler進行訂閱,這樣基本上大多數的開發場景都不需要單獨進行訂閱了,只要符合命名約定就會自動訂閱

//配置EventBus

var ass = Assembly.GetAssembly(typeof(Response));

builder.RegisterAssemblyTypes(ass).Where(t => t.Name.EndsWith("Handler")).InstancePerDependency();

var build = builder.Build();

var bus = build.Resolve<IEventBus>();

var allEvent = ass.GetTypes().Where(o => o.IsSubclassOf(typeof(IntegrationEvent)));

foreach (var t in allEvent)

{

  var handler = ass.GetTypes().FirstOrDefault(o => o.Name.StartsWith(t.Name) && o.Name.EndsWith("Handler"));

  if (handler != null)

  bus.Subscribe(t, handler);

}

   

   

   

最后就是使用了,我定義了TestEventTestEventHandler類,在Handler中輸出TestEvent的屬性 Test

1     public class TestEvent : IntegrationEvent

2     {

3        public string Test { get; set; }

4      }

5

6     public class TestEventHandler : IIntegrationEventHandler<TestEvent>

7      {

8        private readonly Repository<Lawfirm> _lawfirmRepository;

9

10       public TestEventHandler(Repository<Lawfirm> lawfirmRepository)

11       {

12          _lawfirmRepository = lawfirmRepository;

13        }

14

15      public Task Handle(TestEvent @event)

16       {

17         Console.WriteLine(@event.Test);

18        return Task.FromResult(0);

19         }

20   }

   

使用非常簡單,下面我使用事件總線發布了一個消息,然后把程序運行起來

1 _eventBus = container.Resolve<IEventBus>();

2

3 _eventBus.Publish(new TestEvent

4 {

5 Test = "Hello World"

6 });

   

我們看到了剛才發布的消息HelloWorld ,哈哈,輸出是不是很奇怪,不家什么成功導入x條,這是因為我的爬蟲項目,在博文的同時正在采集着數據,消息被發到了這個消費者下並且被成功消費。安利一下我搭的爬蟲框架,使用了.net core 2.0+Ef core 2.0,集成了無頭瀏覽器與anglesharp還有EventBus那就不用說了,相關技術棧有 依賴注入、倉儲模式、AutoMapper、還有一個Web的管理界面以及下一步要集成的Hangfire任務調度。有感興趣的可以留言,我會根據反饋信息決定是否開源分享給大家

   


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM