RabbitMQ入門:發布/訂閱(Publish/Subscribe)


在前面的兩篇博客中

遇到的實例都是一個消息只發送給一個消費者(工作者),他們的消息模型分別為(P代表生產者,C代表消費者,紅色代表隊列):

這次我們來看下將一個消息發送給多個消費者(工作者),這種模式一般被稱為“發布/訂閱”模式。其工作模型為(P代表生產者,X代表Exchange(路由器/交換機),C代表消費者,紅色代表隊列):

我們發現,工作模型中首次出現路由器,並且每個消費者有單獨的隊列。生產者生成消息后將其發送給路由器,然后路由器轉送到隊列,消費者各自到自己的隊列里面獲取消息進行消費。在實際的應用場景中,生產者一般不會直接將消息發送給隊列,而是發送給路由器進行中轉,Exchange必須清楚的知道怎么處理收到的消息:是將消息發送到一個特定隊列還是多有隊列,或者直接廢棄消息。這種才符合RabbitMQ消息模型的核心思想

接下來我們詳細展開今天的話題:

一、Exchange

Exchange在我們的工作模型中首次出現,因此需要詳細介紹下。

Exchange分為4種類型:

Direct:完全根據key進行投遞的,例如,綁定時設置了routing key為”abc”,那么客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。
Topic:對key進行模式匹配后進行投遞,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。
Fanout:不需要key,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。
Headers:我們可以不考慮它。

今天我們的實例采用fanout類型的exchange。

盡管首次出現,但是其實我們前面的案例中也有用到exchange,只是我們沒有給他名字,用的是RabbitMQ默認的,比如下面這段代碼,我們將路由器名這個參數傳入了“”,如果我們需要自己聲明exchange的話,這個就不能傳入“”了,而是傳入自己定義好的值。

二、臨時隊列

前面兩篇博客中,我們都在使用隊列的時候給出了定義好的名字,這在生產者和消費者共用相同隊列的時候很有必要,但是我們有了exchange,生產者不需要知道有哪些隊列,因此隊列名字可以不用指定了,而是通過RabbitMQ 接口自己去生成臨時隊列,隊列名字也由RabbitMQ自動生成。通過

可以聲明一個非持久的、通道獨占的、自動刪除的隊列,getQueue()方法可以獲取隨機隊列名字。這個名字用來在隊列和exchange之間建立binding關系的時候使用:

 

三、代碼實現

基於上面exchange和臨時隊列的知識鋪墊,可以展開今天的代碼實現了。

  1.  生產者
    public class Product {
        //exchange名字
        public static String EXCHANGE_NAME = "exchange";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.創建連接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.為通道聲明exchange和exchange的類型
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
                
                String msg = " hello rabbitmq, this is publish/subscribe mode";
                // 3.發送消息到指定的exchange,隊列指定為空,由exchange根據情況判斷需要發送到哪些隊列
                channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
                System.out.println("product send a msg: " + msg);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 4.關閉連接
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

     

  2. 消費者1
    public class Consumer1 {
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.創建連接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.為通道聲明exchange以及exchange類型
                channel.exchangeDeclare(Product.EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    
                // 3.創建隨機名字的隊列
                String queueName = channel.queueDeclare().getQueue();
    
                // 4.建立exchange和隊列的綁定關系
                channel.queueBind(queueName, Product.EXCHANGE_NAME, "");
                System.out.println(" **** Consumer1 keep alive ,waiting for messages, and then deal them");
                // 5.通過回調生成消費者並進行監聽
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                        // 獲取消息內容然后處理
                        String msg = new String(body, "UTF-8");
                        System.out.println("*********** Consumer1" + " get message :[" + msg + "]");
                    }
                };
                // 6.消費消息
                channel.basicConsume(queueName, true, consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

     

  3. 消費者2,核心代碼同消費者1一樣,只是在日志打印上將"Consumer1"改為"Consumer2"而已。這里不再列出具體代碼。
  4. 先運行消費者1和2,然后運行生產者,觀察控制台log打印情況:
    生產者:
    product send a msg:  hello rabbitmq, this is publish/subscribe mode
    
    消費者1**** Consumer1 keep alive ,waiting for messages, and then deal them
    *********** Consumer1 get message :[ hello rabbitmq, this is publish/subscribe mode]
    
    消費者2: **** Consumer2 keep alive ,waiting for messages, and then deal them
    *********** Consumer2 get message :[ hello rabbitmq, this is publish/subscribe mode]

    可以看到,當生產者發出消息后,兩個消費者最終都收到了消息。

  5. 我們去查看RabbitMQ管理頁面:

    在Exchanges 標簽頁里面多了一個名為“exchange”的路由器,他的類型是fanout。點exchange 的link進入詳細頁面:

    發現在binding項目中有了兩條綁定關系,隊列的名字也可以看到。將頁面切換到Queues標簽頁:

    出現了兩個新的隊列,隊列名字和綁定關系中的一樣,並且隊列都是自動刪除的、通道獨占的。

  6. 然后將消費者1和消費者2都停掉,重新查看管理頁面,我們發現exchange還在,binding關系不存在了,臨時隊列也自動刪除了

     


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM