RabbitMQ的主題(Topic)模式(五)


目錄

本系列向大家介紹RabbitMQ的簡單用法;

1. RabbitMQ的簡單實踐

2. RabbitMQ的輪詢模式和公平分發

3. RabbitMQ的發布訂閱模式(Publish/Subscribe)

4. RabbitMQ路由模式(Routing)

5. RabbitMQ的主題(Topic)模式

一、定義

我們先來看一張圖:

這張圖有幾個不同的routingkey,每個routingkey都是由兩段組成的;
下面我們來編寫代碼來實現相關功能;

二、生產者生產消息

生產者發送四條消息,routingkey不相同,代碼如下:

    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("msg send start!");
            TopicPublisher();
            Console.WriteLine("msg send end!");
        }
        static string EXCHANGE_NAME = "routing_exchange_topic";
        static void TopicPublisher()
        {
            var conn = RabbitMQHelper.GetConnection();
            var channel = conn.CreateModel();
            channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Topic);
            var routingKeyList = new List<string>() { "usa.news", "usa.weather", "europe.news", "europe.weather" };
            for (var i = 0; i < routingKeyList.Count; i++)
            {
                var msg = $"hello topic {routingKeyList[i]}!";

                channel.BasicPublish(
                    exchange: EXCHANGE_NAME,
                    routingKey: routingKeyList[i],
                    basicProperties: null,
                    body: Encoding.UTF8.GetBytes(msg));
                Console.WriteLine($"消息發送:" + msg);
            }
            Console.WriteLine($"消息發送完成!");
            channel.Close();
            conn.Close();
        }

    }

結果運行如圖:

三、消費者

3.1 消費者1,綁定的routingkey為europe.#,代碼如下:

        static string EXCHANGE_NAME = "routing_exchange_topic";
        static string QUEUE_NAME = "routing_queue_topic1";
        static void TopicConsumer1()
        {
            var conn = RabbitMQHelper.GetConnection();
            var channel = conn.CreateModel();
            channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Topic);
            channel.QueueDeclare(queue: QUEUE_NAME);
            // 綁定routingKey
            string routingKey = "europe.#";
            channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKey);
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body.ToArray());
                Console.WriteLine($"Topic Consumer1 收到消息: {message},時間:{DateTime.Now}");
            };
            channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
            Console.ReadKey();
            channel.Close();
            conn.Close();
        }
    }

3.2 消費者2,綁定的routingkey為#.weather,代碼如下:

        static string EXCHANGE_NAME = "routing_exchange_topic";
        static string QUEUE_NAME = "routing_queue_topic2";
        static void TopicConsumer2()
        {
            var conn = RabbitMQHelper.GetConnection();
            var channel = conn.CreateModel();
            channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Topic);
            channel.QueueDeclare(queue: QUEUE_NAME);
            string routingKey = "#.weather";
            // 可以綁定多個routingKey         
            channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, routingKey: routingKey);
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body.ToArray());
                Console.WriteLine($"Topic Consumer2 收到消息: {message},時間:{DateTime.Now}");
            };
            channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
            Console.ReadKey();
            channel.Close();
            conn.Close();
        }

3.3 查看運行結果

(1)隊列綁定到交換機的結果如圖:

(2)消費者1的接收到的消息:

(3)消費者2的接收到的消息:

通過結果我們可以看到,消費者1和消費者2分別接收到了不同key的消息;

四、小結

1、topic模式,我們可以通過模糊匹配的,來接收自己想要的消息;使用Topic模式生產者在聲明隊列時需要制定消息到達隊列方式為topic;
2、Consumer消費者,匹配消息時:#匹配一個或者多個關鍵字,*匹配一個關鍵字
3、參考文檔:https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM