MassTransit 實現應用程序間交互


MassTransit 介紹 

先看下masstransit 官網介紹:MassTransit 是一個自由、開源、輕量級的消息總線, 用於使用. NET 框架創建分布式應用程序。MassTransit 在現有消息傳輸提供了一組廣泛的功能, 從而使開發人員能夠友好地使用基於消息的會話模式異步連接服務。基於消息的通信是實現面向服務的體系結構的可靠和可擴展的方式。前段時間看eshop文檔,在微服務之間實現基於事件的通信章節。有提到MassTransit是實現event Bus技術之一,於是就翻譯了幾篇

下面利用MassTransit實現eshop中的一個簡單實例(前提你剛好了解eshop中的這個場景): Catelog微服務中產品價格更改,Basket微服務通知購車價格變動

 

如果沒看源碼,這個場景也很常見很好理解,我們在接下來創建兩個webapi站點和一個類庫來演示如何實現上述場景

在類庫中定義產品價格變動消息

 public interface IProductPriceChanged
    {
        int ProductId { get; set; }

        decimal NewPrice { get; set; }

        decimal OldPrice { get; set; }

 }

MassTransit.Catalog站點 

引入一下包:MassTransit、MassTransit.RabbitMQ、Autofac.Extensions.DependencyInjection。在Startup類ConfigureServices中添加:

 builder.Register(c =>
            {
                return Bus.Factory.CreateUsingRabbitMq(cfg =>
                {
                    cfg.Host(new Uri($"rabbitmq://{Configuration["HostName"]}"), h =>
                    {
                        h.Username(Configuration["UserName"]);
                        h.Password(Configuration["Password"]);
                    });
                });
            })
            .As<IBusControl>()
            .As<IBus>()
            .As<IPublishEndpoint>()
            .SingleInstance();
            builder.Populate(services);
            container = builder.Build();
            return new AutofacServiceProvider(container);

然后在Configure方法添加:  

var bus = container.Resolve<IBusControl>();
var busHandle = TaskUtil.Await(() => bus.StartAsync());
 lifetime.ApplicationStopping.Register(() => busHandle.Stop());

最后在Controller添加模擬修改價格的接口

 private readonly IBus _bus;
        public ValuesController(IBus bus)
        {
            _bus = bus;
        }
        // GET api/values
        [HttpGet]
        public async Task<IEnumerable<string>> GetAsync()
        {
            await _bus.Publish<IProductPriceChanged>(new
            {
                ProductId=100,
                NewPrice=1999,
                OldPrice=2000,
            });
            return new string[] { "價格更改" };
        }

到此發布端已經完成了 

MassTransit.Basket站點

同樣引入包,並在Configure方法中添加訂閱的代碼   

var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
            {
                var host = sbc.Host(new Uri($"rabbitmq://{Configuration["RabbitMQ:HostName"]}"), h =>
                {
                    h.Username(Configuration["RabbitMQ:UserName"]);
                    h.Password(Configuration["RabbitMQ:Password"]);
                });
                sbc.ReceiveEndpoint(host, "ProductPriceChangedQueue", e =>
                {
                    e.Consumer<ProductPriceChangedConsumer>();
                });

            });
            // start/stop the bus with the web application
            applicationLifetime.ApplicationStarted.Register(bus.Start);
            applicationLifetime.ApplicationStopped.Register(bus.Stop);

消費端 你可以使用其他方式寄宿比如控制台等;

最后運行兩個站點看下輸出

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM