前一篇文章我們已經完成了基於RabbitMq實現的的消息總線,這篇文章就來看看生產者(訂單微服務)與消費者(經銷商微服務)如何接入消息總線實現消息的發送與消息的接收處理。
定義需要發送的消息:
下單消息要被發送到消息總線,並被經銷商微服務的處理器處理。經銷商微服務處理時,需要知道要對哪個經銷商處理多少的PV值與電子幣余額。這些信息就是事件消息需要承載的重要信息。
public class OrderCreatedProcessDealerEvent:BaseEvent { public decimal OrderTotalPrice { get; set; } public decimal OrderTotalPV { get; set; } public Guid DealerId { get; set; } public Guid OrderId { get; set; } public OrderCreatedProcessDealerEvent(Guid dealerid,Guid orderid,decimal ordertotalprice,decimal ordertotalpv) { this.OrderTotalPrice = ordertotalprice; this.OrderTotalPV = ordertotalpv; this.DealerId = dealerid; this.OrderId = orderid; } }
生產者(訂單微服務)連接到消息總線:
生產者-訂單微服務通過Asp.net core WebApi自帶的依賴注入,連接到RabbitMq消息總線。
services.AddSingleton<IEventHandlerExecutionContext>(new EventHandlerExecutionContext(services)); var connectionFactory = new ConnectionFactory { HostName = "localhost" }; services.AddSingleton<IEventBus>(sp => new RabbitMqEB(connectionFactory, sp.GetRequiredService<IEventHandlerExecutionContext>(), "exchange2", "direct", "ordereventqueue", 1));
從上面代碼可以看出,生產者連接到了localhost的Rabbit服務器,並通過調用消息總線的構造函數,定義了發送消息的通道。構造函數具體內容可以查看上一篇文章。
生產者(訂單微服務)發送消息到消息總線:
ieventbus.Publish(new OrderCreatedProcessDealerEvent(orderdto.DealerId, orderid, order.OrderTotalPrice.TotalPrice, order.OrderTotalPV.TotalPV));
ieventbus是注入到訂單微服務的構造函數中,並傳遞到訂單創建的用例中。
實現消費者(經銷商微服務)的消息處理器:
消費者會連接到消息總線,接收到特定類型的消息(這里是OrderCreatedProcessDealerEvent),會交給特定的處理器進行處理,所以需要先定義並實現消息處理器。
public class OrderCreatedEventHandler : IEventHandler { ServiceLocator servicelocator = new ServiceLocator(); public Task<bool> HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent { var idealercontext = servicelocator.GetService<IDealerContext>(); var irepository = servicelocator.GetService<IRepository>(new ParameterOverrides { { "context", idealercontext } }); var idealerrepository = servicelocator.GetService<IDealerRepository>(new ParameterOverrides { { "context", idealercontext } }); //先將接收到的消息轉換成特定類型 var ordercreatedevent = @event as OrderCreatedProcessDealerEvent; using (irepository) { try { //根據消息內容,處理自己的邏輯與持久化 idealerrepository.SubParentEleMoney(ordercreatedevent.DealerId, ordercreatedevent.OrderTotalPrice); idealerrepository.AddDealerPV(ordercreatedevent.DealerId, ordercreatedevent.OrderTotalPV); irepository.Commit(); } catch (EleMoneyNotEnoughException) { //先不處理電子幣余額不足的情況 } } return Task.FromResult(true); } }
消費者(經銷商微服務)連接到消息總線:
需要在經銷商微服務指定需要連接到的消息總線,並訂閱哪個類型的消息交給哪個事件處理器進行處理。
//用於偵聽訂單上下文傳遞的消息 services.AddSingleton<IEventHandlerExecutionContext>(new EventHandlerExecutionContext(services)); var connectionFactory = new ConnectionFactory { HostName = "localhost" }; services.AddSingleton<IEventBus>(sp => new RabbitMqEB(connectionFactory, sp.GetRequiredService<IEventHandlerExecutionContext>(), "exchange2", "direct", "ordereventqueue", 2));
var eventbus = app.ApplicationServices.GetService<IEventBus>(); //訂閱消息 eventbus.Subscribe<OrderCreatedProcessDealerEvent, OrderCreatedEventHandler>();
這樣,兩個微服務直接就能通過RabbitMq消息總線進行消息的發送、消息的接收與處理了,實現了解耦。
QQ討論群:309287205
微服務實戰視頻請關注微信公眾號:
