基於Masstransit實現Eventbus的功能


Masstransit 是一個非常優秀的基於消息進行通信的分布式應用程序框架,詳情參考官網

在介紹AA.ServiceBus開源地址:https://github.com/ChengLab/AAFrameWork  之前,先介紹下幾個概念.

分布式

分布式系統如何定義?這里引用一下Distributed Systems Concepts and Design(Third Edition)中的一句話:"A distributed system is one in which components located at networked computers communicate and coordinate their actions only by passing messages"(分布式系統是指位於聯網計算機上的組件僅通過傳遞消息來通信和協調其操作的系統)。從這句話里面我們可以看到幾個重點:
1、組件分布在網絡計算機上
2、組件之間僅僅通過消息傳遞來通信並協調行動
嚴格講,同一個分布式系統中的計算機在空間部署上是可以隨意分布的,這些計算機可能被放在不同的機櫃上,也可能在不同的機房中,甚至分布在不同的城市。

 中間件

中間件是介於操作系統和在其上運行的應用程序之間的軟件。中間件實質上充當隱藏轉換層,實現了分布式應用程序的通信和數據管理。它有時被稱為管道,因為它將兩個應用程序連接在一起,使數據和數據庫可在“管道”間輕松傳遞。參考Azure

常見的中間件比如:遠程過程調用中間件,消息中間件,數據庫訪問中間。 

消息中間件

Message-oriented middleware (MOM) is software or hardware infrastructure supporting sending and receiving messages between distributed systems. 

面向消息的中間件(MOM)是支持在分布式系統之間發送和接收消息的軟件或硬件基礎設施 

 

AA.ServiceBus 介紹

AA.ServiceBus 是基於MassTransit的消息中間件,提供點對點和發布訂閱的通信方式。這兩個之間的區別:

  •   端點對端點通信 該消息僅處理一次 並且被一個消費者處理。

 例如命名模式 命令告訴服務做某事,推薦動詞-名詞順序的命名風格:如提交訂單命令(SubmitOrder)

  •  發布訂閱通信 可以被多個訂閱者進行消費處理。

例如事件驅動模式 事件意味着某事已經發生了,推薦以名詞-動詞(過去時態)順序的命名風格,表明發生了某事。示例訂單提交過了事件 OrderSubmitted

 目前實現消息中間件有多種方式,參考微服務.NET:容器化應用架構指南 如圖

AA.ServiceBus 快速開始 

   實例我們創建兩個控制台程序生產者、消費者分別命名ServiceBus.ProducersServiceBus.Consumers,然后在創建一個消息契約類庫命名為ServiceBus.MsgContract ,分別被生產者和消費者引用。     

1.在消息契約類庫中創建兩個消息 分別是 提交訂單 SubmitOrder 和 訂單已提交OrderSubmitted代碼如下

 public interface OrderSubmitted
    {
        long Id { get; set; }
        decimal OrderPrice { get; set; }
    }

   public interface SubmitOrder
    {
        long Id { get; set; }
        Decimal OrderPrice { get; set; }
    }

2.在生產者控制台項目中安裝Install-Package AA.ServiceBus -Version 1.0.0,生產者主要對消息的構造然后進行發送或發布;

public class Producer
    {
        public static void TestProducer()
        {
            //rabbitmq 配置
            string rabbitMqUri = "rabbitmq://localhost:5672";
            string rabbitMqUserName = "your";
            string rabbitMqPassword = "your";


            PulishEvent(rabbitMqUri, rabbitMqUserName, rabbitMqPassword);
            SendCommand(rabbitMqUri, rabbitMqUserName, rabbitMqPassword);
        }

        /// <summary>
        /// 發布事件
        /// </summary>
        /// <param name="rabbitMqUri"></param>
        /// <param name="rabbitMqUserName"></param>
        /// <param name="rabbitMqPassword"></param>
        private static void PulishEvent(string rabbitMqUri, string rabbitMqUserName, string rabbitMqPassword)
        {
            IBusControl busControl = ServiceBusManager.Instance.UseRabbitMq(rabbitMqUri, rabbitMqUserName, rabbitMqPassword)
                         .BuildEventProducer();

            TaskUtil.Await(busControl.Publish<OrderSubmitted>(new
            {
                Id = 1010,
                OrderPrice = 1024
            }));
        }
        /// <summary>
        /// 發送命令
        /// </summary>
        /// <param name="rabbitMqUri"></param>
        /// <param name="rabbitMqUserName"></param>
        /// <param name="rabbitMqPassword"></param>
        private static void SendCommand(string rabbitMqUri, string rabbitMqUserName, string rabbitMqPassword)
        {
            string queueName = "submitorder.queue";

            ISendEndpoint busControl = ServiceBusManager.Instance.UseRabbitMq(rabbitMqUri, rabbitMqUserName, rabbitMqPassword)
                         .BuildCommandProducer(queueName);

            TaskUtil.Await(busControl.Send<SubmitOrder>(new
            {
                Id = 1010,
                OrderPrice=1024
            }));
        }
    }

     

 3.在消費者控制台項目中安裝Install-Package AA.ServiceBus -Version 1.0.0,生產者需要創建對應的消費者進行處理消息,只需要繼承IConsumer接口即可

 

public class Consumer
    {
        public static void TestConsumer()
        {
            //rabbitmq 配置
            string rabbitMqUri = "rabbitmq://localhost:5672";
            string rabbitMqUserName = "your";
            string rabbitMqPassword = "your";
            string queueName = "submitorder.queue";

            var busControl = ServiceBusManager.Instance.UseRabbitMq(rabbitMqUri, rabbitMqUserName, rabbitMqPassword)
             .RegisterConsumer<SubmitOrderCommandConsumer>(queueName)//注冊提交訂單命令消費者
             .RegisterConsumer<OrderSubmittedEventConsumer>(null)   //注冊訂單已創建事件消費者
             .Build();
            busControl.Start();
        }
    }
    /// <summary>
    ///訂單已經提交了  事件消費者
    /// </summary>
    public class OrderSubmittedEventConsumer : IConsumer<OrderSubmitted>
    {
        public async Task Consume(ConsumeContext<OrderSubmitted> context)
        {
            var @event = context.Message;
            Console.WriteLine($"接收到訂單創建了事件消息單價:{@event.OrderPrice}");
            //do somethings...
        }
    }

    /// <summary>
    /// 提交訂單 命令消費者
    /// </summary>
    public class SubmitOrderCommandConsumer : IConsumer<SubmitOrder>
    {
        public async Task Consume(ConsumeContext<SubmitOrder> context)
        {
            var command = context.Message;
            Console.WriteLine($"接收到了創建訂單命令消息單價:{command.OrderPrice}");
            //do somethings...
        }
    }

運行消費者和生產者控制台 輸出如下:

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM