微服務實戰(四):落地微服務架構到直銷系統(將生產者與消費者接入消息總線)


前一篇文章我們已經完成了基於RabbitMq實現的的消息總線,這篇文章就來看看生產者(訂單微服務)與消費者(經銷商微服務)如何接入消息總線實現消息的發送與消息的接收處理。

 

定義需要發送的消息:

下單消息要被發送到消息總線,並被經銷商微服務的處理器處理。經銷商微服務處理時,需要知道要對哪個經銷商處理多少的PV值與電子幣余額。這些信息就是事件消息需要承載的重要信息。

public class OrderCreatedProcessDealerEvent:BaseEvent
    {
        public decimal OrderTotalPrice { get; set; }
        public decimal OrderTotalPV { get; set; }
        public Guid DealerId { get; set; }
        public Guid OrderId { get; set; }
        public OrderCreatedProcessDealerEvent(Guid dealerid,Guid orderid,decimal ordertotalprice,decimal
            ordertotalpv)
        {
            this.OrderTotalPrice = ordertotalprice;
            this.OrderTotalPV = ordertotalpv;
            this.DealerId = dealerid;
            this.OrderId = orderid;
        }
    }

 

生產者(訂單微服務)連接到消息總線:

生產者-訂單微服務通過Asp.net core WebApi自帶的依賴注入,連接到RabbitMq消息總線。

            services.AddSingleton<IEventHandlerExecutionContext>(new EventHandlerExecutionContext(services));
            var connectionFactory = new ConnectionFactory { HostName = "localhost" };
            services.AddSingleton<IEventBus>(sp => new RabbitMqEB(connectionFactory,
                sp.GetRequiredService<IEventHandlerExecutionContext>(), "exchange2", "direct", "ordereventqueue", 1));

從上面代碼可以看出,生產者連接到了localhost的Rabbit服務器,並通過調用消息總線的構造函數,定義了發送消息的通道。構造函數具體內容可以查看上一篇文章。

 

生產者(訂單微服務)發送消息到消息總線:

ieventbus.Publish(new OrderCreatedProcessDealerEvent(orderdto.DealerId,
                        orderid, order.OrderTotalPrice.TotalPrice, order.OrderTotalPV.TotalPV));

ieventbus是注入到訂單微服務的構造函數中,並傳遞到訂單創建的用例中。 

 

實現消費者(經銷商微服務)的消息處理器:

消費者會連接到消息總線,接收到特定類型的消息(這里是OrderCreatedProcessDealerEvent),會交給特定的處理器進行處理,所以需要先定義並實現消息處理器。

public class OrderCreatedEventHandler : IEventHandler
    {
        ServiceLocator servicelocator = new ServiceLocator();
        public Task<bool> HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent
        {
            var idealercontext = servicelocator.GetService<IDealerContext>();
            var irepository =
                servicelocator.GetService<IRepository>(new ParameterOverrides { { "context", idealercontext } });
            var idealerrepository = servicelocator.GetService<IDealerRepository>(new ParameterOverrides { { "context", idealercontext } });
  //先將接收到的消息轉換成特定類型          
var ordercreatedevent = @event as OrderCreatedProcessDealerEvent;
            using (irepository)
            {
                try
                {
                  //根據消息內容,處理自己的邏輯與持久化
                    idealerrepository.SubParentEleMoney(ordercreatedevent.DealerId, ordercreatedevent.OrderTotalPrice);
                    idealerrepository.AddDealerPV(ordercreatedevent.DealerId, ordercreatedevent.OrderTotalPV);
                    irepository.Commit();
                }
                catch (EleMoneyNotEnoughException)
                {
                     //先不處理電子幣余額不足的情況                   

                }
            }
            return Task.FromResult(true);
        }
    }

 

消費者(經銷商微服務)連接到消息總線:

需要在經銷商微服務指定需要連接到的消息總線,並訂閱哪個類型的消息交給哪個事件處理器進行處理。

            //用於偵聽訂單上下文傳遞的消息
            services.AddSingleton<IEventHandlerExecutionContext>(new EventHandlerExecutionContext(services));
            var connectionFactory = new ConnectionFactory { HostName = "localhost" };
            services.AddSingleton<IEventBus>(sp => new RabbitMqEB(connectionFactory,
                sp.GetRequiredService<IEventHandlerExecutionContext>(), "exchange2", "direct", "ordereventqueue", 2));
            var eventbus = app.ApplicationServices.GetService<IEventBus>();
//訂閱消息
                        eventbus.Subscribe<OrderCreatedProcessDealerEvent, OrderCreatedEventHandler>();

這樣,兩個微服務直接就能通過RabbitMq消息總線進行消息的發送、消息的接收與處理了,實現了解耦。

 

QQ討論群:309287205 

微服務實戰視頻請關注微信公眾號:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM