RabbitMQ六種隊列模式-發布訂閱模式


前言

RabbitMQ六種隊列模式-簡單隊列
RabbitMQ六種隊列模式-工作隊列
RabbitMQ六種隊列模式-發布訂閱 [本文]
RabbitMQ六種隊列模式-路由模式
RabbitMQ六種隊列模式-主題模式

上文的工作隊列模式是直接在生產者與消費者里聲明好一個隊列,這種情況下消息只會對應同類型的消費者。

顯然這種只處理同種類型的消息是有弊端的。

舉個用戶注冊的列子

門戶網站,用戶在注冊完后一般都會發送消息通知用戶注冊成功(失敗)。

如果在一個系統中,用戶注冊信息有郵箱、手機號,那么在注冊完后會向郵箱和手機號都發送注冊完成信息(假設都發送)。

利用 MQ 實現業務異步處理,如果是用工作隊列的話,就會聲明一個注冊信息隊列。注冊完成之后生產者會向隊列提交一條注冊數據,消費者取出數據同時向郵箱以及手機號發送兩條消息。但是實際上郵箱和手機號信息發送實際上是不同的業務邏輯,不應該放在一塊處理。

這個時候就可以利用發布/訂閱模式將消息發送到轉換機(EXCHANGE),聲明兩個不同的隊列(郵箱、手機),並綁定到交換機。這樣生產者只需要發布一次消息,兩個隊列都會接收到消息發給對應的消費者,大致如下圖所示。

在應用中,只需要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每台子網內的主機都獲得了一份復制的消息。

文章目錄

1. 什么是發布訂閱模式2. 代碼部分2.1 生產者2.2 郵件消費者2.3 短信消費者2.4 運行截圖3. 總結

1. 什么是發布訂閱模式

簡單解釋就是,可以將消息發送給不同類型的消費者。做到發布一次,消費多個。下圖取自於官方網站(RabbitMQ)的發布/訂閱模式的圖例:

P 表示為生產者、 X 表示交換機、C1C2 表示為消費者,紅色表示隊列。

下面代碼部分會展示郵件、短信的例子,通過綁定到一個交換機,但是

2. 代碼部分

2.1 生產者

public class ProducerFanout {

    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        /** 1.創建新的連接 */
        Connection connection = MQConnectionUtils.newConnection();
        /** 2.創建通道 */
        Channel channel = connection.createChannel();
        /** 3.綁定的交換機 參數1交互機名稱 參數2 exchange類型 */
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        /** 4.發送消息 */
        for (int i = 0; i < 10; i++)
        {
            String message = "用戶注冊消息:" + i;
            System.out.println("[send]:" + message);
            //發送消息
            channel.basicPublish(EXCHANGE_NAME, ""null, message.getBytes("utf-8"));
            try {
                Thread.sleep(5 * i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        /** 5.關閉通道、連接 */
        channel.close();
        connection.close();
        /** 注意:如果消費沒有綁定交換機和隊列,則消息會丟失 */
    }

}

代碼補充,channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8")); 其中第二個參數為空類似於表示全局廣播,只要綁定到該隊列上的消費者理論上是都可以收到的。

2.2 郵件消費者

public class ConsumerEmailFanout {

    private static final String QUEUE_NAME = "consumerFanout_email";
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("郵件消費者啟動");
        /* 1.創建新的連接 */
        Connection connection = MQConnectionUtils.newConnection();
        /* 2.創建通道 */
        Channel channel = connection.createChannel();
        /* 3.消費者關聯隊列 */
        channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
        /* 4.消費者綁定交換機 參數1 隊列 參數2交換機 參數3 routingKey */
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException 
{
                String msg = new String(body, "UTF-8");
                System.out.println("消費者獲取生產者消息:" + msg);
            }
        };
        /* 5.消費者監聽隊列消息 */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

代碼補充, channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 中第三個參數置為空時,可以接收到生產者所有的消息(生產者 routingKey 參數為空時)。

2.3 短信消費者

public class ConsumerSMSFanout {

    private static final String QUEUE_NAME = "ConsumerFanout_sms";
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("短信消費者啟動");
        /* 1.創建新的連接 */
        Connection connection = MQConnectionUtils.newConnection();
        /* 2.創建通道 */
        Channel channel = connection.createChannel();
        /* 3.消費者關聯隊列 */
        channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
        /* 4.消費者綁定交換機 參數1 隊列 參數2交換機 參數3 routingKey */
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException 
{
                String msg = new String(body, "UTF-8");
                System.out.println("消費者獲取生產者消息:" + msg);
            }
        };
        /* 5.消費者監聽隊列消息 */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

代碼補充, channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 中第三個參數置為空時,可以接收到生產者所有的消息(生產者 routingKey 參數為空時)

2.4 運行截圖

先運行兩個消費者,再運行生產者。如果沒有提前將隊列綁定到交換機,那么直接運行生產者的話,消息是不會發到任何隊列里的。

生產者


短信消費者

郵件消費者

3. 總結

首先相對於工作模式,發布訂閱模式引入了交換機的概念,相對其類型上更加靈活廣泛一些。通過上文我們可以總結如下:

1.生產者不是直接操作隊列,而是將數據發送給交換機,由交換機將數據發送給與之綁定的隊列。從不加特定參數的運行結果中可以看到,兩種類型的消費者(email,sms)都收到相同數量的消息。

  1. 必須聲明交換機,並且設置模式:channel.exchangeDeclare(EXCHANGE_NAME, "fanout"),其中 fanout 指分發模式(將每一條消息都發送到與交換機綁定的隊列)。

  2. 隊列必須綁定交換機:channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

生產者發送消息到交換機,多個消費者聲明多個隊列,與交換機進行綁定,隊列中的消息可以被所有消費者消費,類似於QQ群消息

案例代碼:https://www.lanzous.com/i5ydu6d

我創建了一個java相關的公眾號,用來記錄自己的學習之路,感興趣的小伙伴可以關注一下微信公眾號哈:niceyoo


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM