RabbitMQ入門:路由(Routing)


在上一篇博客《RabbitMQ入門:發布/訂閱(Publish/Subscribe)》中,我們認識了fanout類型的exchange,它是一種通過廣播方式發送消息的路由器,所有和exchange建立的綁定關系的隊列都會接收到消息。但是有一些場景只需要訂閱到一部分消息,這個時候就不能使用fanout 類型的exchange了,這個就引出來今天的“豬腳”--Direct Exchange,通過Routing Key來決定需要將消息發送到哪個或者哪些隊列中。

接下來請收看詳細內容:

  1. Direct Exchange(直接路由器)
  2. 多重綁定
  3. 代碼實例

一、Direct Exchange(直接路由器)

在上文中介紹exchange的時候,對direct exchange進行了簡單介紹,它是一種完全按照routing key(路由關鍵字)進行投遞的:當消息中的routing key和隊列中的binding key完全匹配時,才進行會將消息投遞到該隊列中。這里提到了一個routing key和binding key(綁定關鍵字),是什么東東?

  1. routing key:

     在發送消息的時候,basicPublish的第二個參數就是routing key,由於上次是fanout 類型的exchange 進行廣播方式投遞,這個字段不會影響投遞結果,因此我們這里就傳入了“”,但是在direct 類型的exchange中我們就不能傳入""了,需要指定具體的關鍵字。

  2. binding key:

    我們在前文中建立綁定關系的時候,queueBind的第三個參數就是綁定關鍵字

我們聲明direact exchange的時候使用:


二、多重綁定

多個隊列相同的綁定鍵綁定到同一個路由器的情況,我們稱之為多重綁定

工作模型為(P代表生產者,X代表路由器,紅色的Q代表隊列,C代表消費者):

 

三、代碼實例

 預備知識了解完了,現在來寫個程序感受下。

  1. 生產者
    public class LogDirectSender {
        // exchange名字
        public static String EXCHANGE_NAME = "directExchange";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.創建連接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.為通道聲明direct類型的exchange
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
                
                // 3.發送消息到指定的exchange,隊列指定為空,由exchange根據情況判斷需要發送到哪些隊列
                String routingKey = "debug";
                String msg = " hello rabbitmq, I am " + routingKey;
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
                System.out.println("product send a msg: " + msg);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 4.關閉連接
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    }

    和上次博客中生產者的區別就是黑字粗體部分:1.路由器類型改為direct 2.消息發布的時候指定了routing key

  2. 消費者
    public class LogDirectReciver {
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.創建連接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.為通道聲明direct類型的exchange
                channel.exchangeDeclare(LogDirectSender.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
                // 3.創建隨機名字的隊列
                String queueName = channel.queueDeclare().getQueue();
    
                // 4.建立exchange和隊列的綁定關系
                 String[] bindingKeys = { "error", "info", "debug" };
    //            String[] bindingKeys = { "error" };
                for (int i = 0; i < bindingKeys.length; i++) {
                    channel.queueBind(queueName, LogDirectSender.EXCHANGE_NAME, bindingKeys[i]);
                    System.out.println(" **** LogDirectReciver keep alive ,waiting for " + bindingKeys[i]);
                }
    
                // 5.通過回調生成消費者並進行監聽
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                        // 獲取消息內容然后處理
                        String msg = new String(body, "UTF-8");
                        System.out.println("*********** LogDirectReciver" + " get message :[" + msg + "]");
                    }
                };
                // 6.消費消息
                channel.basicConsume(queueName, true, consumer);
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    
    }

    和上次博客中消費者的區別就是黑字粗體部分:1.路由器類型改為direct 2.建立綁定關系的時候指定了binding key

  3. 執行消費者,控制台log打印如下:
     **** LogDirectReciver keep alive ,waiting for error
     **** LogDirectReciver keep alive ,waiting for info
     **** LogDirectReciver keep alive ,waiting for debug

    這個消費者我們視為消費者1,它會接收error,info,debug三個關鍵字的消息。

  4. 將String[] bindingKeys = { "error", "info", "debug" };改為String[] bindingKeys = { "error" };,然后再運行一次消費者。控制台log打印如下:
     **** LogDirectReciver keep alive ,waiting for error

    這個消費者我們視為消費者2,它只會接收error 關鍵字的消息。

  5. 執行生產者,然后將String routingKey = "debug";的值分別改為“info"和"error",然后分別執行,這樣一共執行了三次生產者
    第一次執行:
    product send a msg:  hello rabbitmq, I am debug
    
    第二次執行:
    product send a msg:  hello rabbitmq, I am info
    
    第三次執行:
    product send a msg:  hello rabbitmq, I am error
  6. 再次查看兩個消費者的控制台log:
    消費者1:
     **** LogDirectReciver keep alive ,waiting for error
     **** LogDirectReciver keep alive ,waiting for info
     **** LogDirectReciver keep alive ,waiting for debug
    *********** LogDirectReciver get message :[ hello rabbitmq, I am debug]
    *********** LogDirectReciver get message :[ hello rabbitmq, I am info]
    *********** LogDirectReciver get message :[ hello rabbitmq, I am error]
    
    消費者2:
     **** LogDirectReciver keep alive ,waiting for error
    *********** LogDirectReciver get message :[ hello rabbitmq, I am error]

     

  7. 查看RabbitMQ管理頁面

    exchanges標簽頁里面多了個direct類型的路由器。進入詳細頁面:

    有4個綁定關系,其中三個的隊列是同一個。切換到Queues標簽頁:

    有兩個臨時隊列。

  8. 如果關掉消費者1和消費者2,會發現隊列自動刪除了,綁定關系也不存在了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM