MassTransit 介紹
先看下masstransit 官網介紹:MassTransit 是一個自由、開源、輕量級的消息總線, 用於使用. NET 框架創建分布式應用程序。MassTransit 在現有消息傳輸上提供了一組廣泛的功能, 從而使開發人員能夠友好地使用基於消息的會話模式異步連接服務。基於消息的通信是實現面向服務的體系結構的可靠和可擴展的方式。前段時間看eshop文檔,在微服務之間實現基於事件的通信章節。有提到MassTransit是實現event Bus技術之一,於是就翻譯了幾篇
下面利用MassTransit實現eshop中的一個簡單實例(前提你剛好了解eshop中的這個場景): Catelog微服務中產品價格更改,Basket微服務通知購車價格變動

如果沒看源碼,這個場景也很常見很好理解,我們在接下來創建兩個webapi站點和一個類庫來演示如何實現上述場景
在類庫中定義產品價格變動消息
public interface IProductPriceChanged { int ProductId { get; set; } decimal NewPrice { get; set; } decimal OldPrice { get; set; } }
MassTransit.Catalog站點
引入一下包:MassTransit、MassTransit.RabbitMQ、Autofac.Extensions.DependencyInjection。在Startup類ConfigureServices中添加:
builder.Register(c => { return Bus.Factory.CreateUsingRabbitMq(cfg => { cfg.Host(new Uri($"rabbitmq://{Configuration["HostName"]}"), h => { h.Username(Configuration["UserName"]); h.Password(Configuration["Password"]); }); }); }) .As<IBusControl>() .As<IBus>() .As<IPublishEndpoint>() .SingleInstance(); builder.Populate(services); container = builder.Build(); return new AutofacServiceProvider(container);
然后在Configure方法添加:
var bus = container.Resolve<IBusControl>(); var busHandle = TaskUtil.Await(() => bus.StartAsync()); lifetime.ApplicationStopping.Register(() => busHandle.Stop());
最后在Controller添加模擬修改價格的接口
private readonly IBus _bus; public ValuesController(IBus bus) { _bus = bus; } // GET api/values [HttpGet] public async Task<IEnumerable<string>> GetAsync() { await _bus.Publish<IProductPriceChanged>(new { ProductId=100, NewPrice=1999, OldPrice=2000, }); return new string[] { "價格更改" }; }
到此發布端已經完成了
MassTransit.Basket站點
同樣引入包,並在Configure方法中添加訂閱的代碼
var bus = Bus.Factory.CreateUsingRabbitMq(sbc => { var host = sbc.Host(new Uri($"rabbitmq://{Configuration["RabbitMQ:HostName"]}"), h => { h.Username(Configuration["RabbitMQ:UserName"]); h.Password(Configuration["RabbitMQ:Password"]); }); sbc.ReceiveEndpoint(host, "ProductPriceChangedQueue", e => { e.Consumer<ProductPriceChangedConsumer>(); }); }); // start/stop the bus with the web application applicationLifetime.ApplicationStarted.Register(bus.Start); applicationLifetime.ApplicationStopped.Register(bus.Stop);
消費端 你可以使用其他方式寄宿比如控制台等;
最后運行兩個站點看下輸出

