RabbitMQ (七) 訂閱者模式之主題模式 ( topic )


主題模式和路由模式很像

路由模式是精確匹配

主題模式是模糊匹配

 

 

依然先通過管理后台添加一個交換機.

生產者

    public class Producer
    {

        private const string ExchangeName = "test_exchange_topic";

        public static void Send()
        {
            //獲取一個連接
            IConnection connection = ConnectionHelper.GetConnection();

            //從連接中獲取一個通道
            IModel channel = connection.CreateModel();

            //聲明交換機
            //channel.ExchangeDeclare(ExchangeName, "topic", false, false, null);

            //每次只向消費者發送一條消息,消費者使用后,手動確認后,才會發送另外一條
            channel.BasicQos(0, 1, false);
            
            string msg = "hello world ";

            //發送消息,只發送到路由鍵為"product.delete" 或者 "product.#"的隊列.
            //# 匹配一個或多個
            //* 匹配一個
            //上限為 255 個字節
            channel.BasicPublish(ExchangeName, "product.delete", null, Encoding.Default.GetBytes(msg));
            Console.WriteLine($"send {msg}");

            channel.Close();
            connection.Close();
        }
    }

 

消費者1

    public class Consumer1
    {
        private const string QueueName = "test_exchange1_queue";
        private const string ExchangeName = "test_exchange_topic";

        public static void Receive()
        {
            //獲取連接
            RabbitMQ.Client.IConnection connection = ConnectionHelper.GetConnection();

            //創建通道
            RabbitMQ.Client.IModel channel = connection.CreateModel();

            //聲明隊列
            channel.QueueDeclare(QueueName, false, false, false, null);

            //將隊列綁定到交換機上
            channel.QueueBind(QueueName, ExchangeName, "product.add", null);
            channel.QueueBind(QueueName, ExchangeName, "product.update", null);
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //注冊事件
            consumer.Received += (s, e) =>
            {
                byte[] bytes = e.Body;
                string str = Encoding.Default.GetString(bytes);
                Console.WriteLine("consumer1 : " + str);
                channel.BasicAck(e.DeliveryTag, false);//手動應答
            };

            //監聽隊列
            //bool autoAck = true;//自動確認,一旦mq將消息分發給了消費者,就會從內存中刪除該消息
            bool autoAck = false;//手動應答.
            channel.BasicConsume(QueueName, autoAck, "", false, false, null, consumer);
        }
    }

 

消費者2

    public class Consumer2
    {
        private const string QueueName = "test_exchange2_queue";
        private const string ExchangeName = "test_exchange_topic";
        public static void Receive()
        {
            //獲取連接
            RabbitMQ.Client.IConnection connection = ConnectionHelper.GetConnection();

            //創建通道
            RabbitMQ.Client.IModel channel = connection.CreateModel();

            //聲明隊列
            channel.QueueDeclare(QueueName, false, false, false, null);
            
            //將隊列綁定到交換機上
            channel.QueueBind(QueueName, ExchangeName, "product.#", null);         //添加消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //注冊事件
            consumer.Received += (s, e) =>
            {
                byte[] bytes = e.Body;
                string str = Encoding.Default.GetString(bytes);
                Console.WriteLine("         consumer2 : " + str);
                channel.BasicAck(e.DeliveryTag, false);//手動應答
            };

            //監聽隊列
            //bool autoAck = true;//自動確認,一旦mq將消息分發給了消費者,就會從內存中刪除該消息
            bool autoAck = false;//手動應答.
            channel.BasicConsume(QueueName, autoAck, "", false, false, null, consumer);
        }
    }

 

運行結果:

 

 

由於消費者1的路由鍵只有 "product.add" 和 "product.update" ,不包含"product.delete",

而消費者2的路由鍵是"product.#",可以模糊匹配上"product.delete",

所以交換機只會把消息轉發到消費者2聲明的隊列中.

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM