微服務實戰(三):落地微服務架構到直銷系統(構建基於RabbitMq的消息總線)


從前面文章可以看出,消息總線是EDA(事件驅動架構)與微服務架構的核心部件,沒有消息總線,就無法很好的實現微服務之間的解耦與通訊。通常我們可以利用現有成熟的消息代理產品或雲平台提供的消息服務來構建自己的消息總線;也可以自己完全寫一個消息代理產品,然后基於它構建自己的消息總線。通常我們不用重復造輪子(除非公司有特殊的要求,比如一些大型互聯網公司考慮到自主可控的白盒子),可以利用比如像RabbitMq這樣成熟的消息代理產品作為消息總線的底層支持。

RabbitMq核心組件解釋:

Connection:消息的發送方或訂閱方通過它連接到RabbitMq服務器。

Channel:消息的發送方或訂閱方通過Connection連接到RabbitMq服務器后,通過Channel建立會話通道。

Exchange:消息的發送方向Exchange發送消息,通過RabbitMq服務器中Exchange與Queue的綁定關系,Exchange會將消息路由到匹配的Queue中。

Queue:消息的承載者,消息的發送者的消息最終通過Exchange路由到匹配的Queue,消息的接收者從Queue接收消息並進行處理。

Exchange模式:在消息發送到Exchange時,需要路由到匹配的Queue中,至於如何路由,則是由Exchange模式決定的。

1.Direct模式:特定的路由鍵(消息類型)轉發到該Exchange的指定Queue中。

2.Fanout模式:發送到該Exchange的消息,被同時發送到Exchange下綁定的所有Queue中。

3.Topic模式:具有某種特征的消息轉發到該Exchange的指定Queue中。

我們最常見的使用是Direct模式,如果消息要被多個消費者消費,則可以使用Fanout模式。

 

實現基於RabbitMq的消息總線:
我們首先需要安裝Erlang與RabbitMq到服務器上,然后就可以進行基於RabbitMq的消息總線的開發了,開發的總體思路與步驟如下:

1.首先建立一個項目作為消息總線,然后引入Rabbitmq.Client 這個nuget包,這樣就有了RabbitMq開發的支持。

2.前面實現了基本的消息總線,所有基於RabbitMq的消息總線是從它繼承下來的,並需要傳入特定的參數到消息總線的構造函數中:

 public RabbitMqEB(IConnectionFactory connectionFactory,IEventHandlerExecutionContext context,
            string exchangeName,string exchangeType,string queueName,int publisherorconsumer,
            bool autoAck = true) : base(context)
        {
            this.connectionFactory = connectionFactory;
            this.connection = this.connectionFactory.CreateConnection();
            this.exchangeName = exchangeName;
            this.exchangeType = exchangeType;
            this.autoAck = autoAck;
            this.queueName = queueName;
            if (publisherorconsumer == 2)
            {
                this.channel = CreateComsumerChannel();
            }
        }

connectionFactory:RabbitMq.Client中的類型,用於與RabbitMq服務器建立連接時需要使用的對象。

context:消息與消息處理器之間的關聯關系的對象。

exchangeName:生產者或消費者需要連接到的Exchange的名字。

exchangeType:前面所描述的Exchange模式。

queueName:生產者或消費者發送或接收消息時的Queue的名字。

publisherorconsumer:指定連接到消息總線的組件是消息總線的生產者還是消費者,消費者和生產者會有不同,消費者(publisherorconsumer==2)會構建一個消費通道,用於從Queue接收消息並調用父類的ieventHandlerExecutionContext的HandleAsync方法來處理消息。

3.建立到RabbitMq的連接: 

//判斷是否已經建立了連接
public bool IsConnected
        {
            get { return this.connection != null && this.connection.IsOpen; }
        }
public bool TryConnect() { //出現連接異常時的重試策略,通常通過第三方nuget包實現重試功能,這里出現連接異常時,每個1秒重試一次,共重試5次 var policy = RetryPolicy.Handle<SocketException>().Or<BrokerUnreachableException>() .WaitAndRetry(5, p => TimeSpan.FromSeconds(1),(ex,time)=> { //記錄錯誤日志 }); policy.Execute(() => { //建立RabbitMq Server的連接 this.connection = this.connectionFactory.CreateConnection(); }); if (IsConnected) { return true; } return false; }

 4.創建消費者通道:

private IModel CreateComsumerChannel()
        {
            if (!IsConnected)
            {
                TryConnect();
            }
            var channel = this.connection.CreateModel();            
            channel.ExchangeDeclare(exchange: exchangeName, type: exchangeType, durable: true);
            channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false,
                arguments: null);
            var consumer = new EventingBasicConsumer(channel);
          //消費者接收到消息的處理
            consumer.Received += async (model, ea) =>
                {
                    var eventbody = ea.Body;
                    var json = Encoding.UTF8.GetString(eventbody);
                    var @event = (IEvent)JsonConvert.DeserializeObject(json);
                   //調用關聯對象中消息對應的處理器的處理方法
                    await this.eventHandlerExecutionContext.HandleAsync(@event);
                   //向會話通道確認此消息已被處理
                    channel.BasicAck(ea.DeliveryTag, multiple: false);
                };
            channel.BasicConsume(queue: this.queueName, autoAck: false, consumer: consumer);
            
            channel.CallbackException += (sender, ea) =>
            {
                this.channel.Dispose();
                this.channel = CreateComsumerChannel();
             };
            return channel;
        }

5.對生產者發布消息到交換機隊列的支持:

 public override void Publish<TEvent>(TEvent @event)
        {
            if (!IsConnected)
            {
                TryConnect();
            }
            using(var channel = this.connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: exchangeName, type: exchangeType, durable: true);
                var message = JsonConvert.SerializeObject(@event);
                var body = Encoding.UTF8.GetBytes(message);
              //發布到交換機,根據交換機與隊列的綁定以及交換機模式,最終發布到指定的隊列中
                channel.BasicPublish(this.exchangeName, @event.GetType().FullName,null, body);
            }
        }

6.對訂閱者從交換機隊列中訂閱消息的支持:

 public override void Subscribe<TEvent, TEventHandler>()
        {
           //注冊接收到的消息類型到訂閱方的處理器之間的關系
            if (!this.eventHandlerExecutionContext.IsRegisterEventHandler < TEvent,TEventHandler>()){
                this.eventHandlerExecutionContext.RegisterEventHandler<TEvent, TEventHandler>();
              //消費者進行隊列綁定
                this.channel.QueueBind(this.queueName, this.exchangeName, typeof(TEvent).FullName);
            }
        }

從上面的6個步驟,我們基本上就完成了基於RabbitMq消息總線的基本功能,這里需要說明的是,上述代碼只是演示,在實際生產環境中,不能直接使用以上代碼,還需要小心的重構此代碼以保證可靠性與性能。

 

QQ討論群:309287205 

微服務實戰視頻請關注微信公眾號:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM