RabbitMQ六種隊列模式-簡單隊列模式


前言

RabbitMQ六種隊列模式-簡單隊列 [本文]
RabbitMQ六種隊列模式-工作隊列
RabbitMQ六種隊列模式-發布訂閱
RabbitMQ六種隊列模式-路由模式
RabbitMQ六種隊列模式-主題模式

在官網的教程中,描述了如上六類工作隊列模式:

  1. 簡單隊列模式:最簡單的工作隊列,其中一個消息生產者,一個消息消費者,一個隊列。也稱為點對點模式
  2. 工作模式:一個消息生產者,一個交換器,一個消息隊列,多個消費者。同樣也稱為點對點模式
  3. 發布/訂閱模式:無選擇接收消息,一個消息生產者,一個交換器,多個消息隊列,多個消費者。稱為發布/訂閱模式
  4. 路由模式:在發布/訂閱模式的基礎上,有選擇的接收消息,也就是通過 routing 路由進行匹配條件是否滿足接收消息。
  5. 主題模式:同樣是在發布/訂閱模式的基礎上,根據主題匹配進行篩選是否接收消息,比第四類更靈活。
  6. RPC模式:與上面其他5種所不同之處,類模式是擁有請求/回復的。也就是有響應的,上面5種都沒有。

接下來幾篇文章一起來看看這幾種隊列模式吧,本篇為簡單隊列模式。

文章目錄

1 實現功能2 構建項目2.1 導入依賴2.2 封裝Connection3 生產者4 消費者5 測試環節5.1 啟動RabbitMQ5.2 創建vhost5.3 依次運行Producer\Customer6 簡單隊列總結

1 實現功能

功能描述:一個生產者 P 發送消息到隊列 Q,一個消費者 C 接收

P 表示為生產者 、C 表示為消費者,紅色表示隊列。

2 構建項目

創建一個簡單的maven項目

  • rabbitmq 父工程
    -- common 存放公用工具類
    -- customer 消費者
    -- producer生產者

2.1 導入依賴

在 rabbitmq 父工程 pom.xml 導入 maven 依賴

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.6.5</version>
    </dependency>
</dependencies>

2.2 封裝Connection

common模塊中封裝 rabbitmq 連接類

public class MQConnectionUtils {

    public static Connection newConnection() throws IOException, TimeoutException {
        /** 1.定義連接工廠 */
        ConnectionFactory factory = new ConnectionFactory();
         /** 2.設置服務器地址 */
        factory.setHost("127.0.0.1");
         /** 3.設置協議端口號 */
        factory.setPort(5672);
         /** 4.設置vhost */
        factory.setVirtualHost("test001_host");
         /** 5.設置用戶名稱 */
        factory.setUsername("guest");
         /** 6.設置用戶密碼 */
        factory.setPassword("guest");
         /** 7.創建新的連接 */
        Connection newConnection = factory.newConnection();
        return newConnection;
    }

}

3 生產者

生產者負責創建消息隊列並發送消息入列,簡單分為5步:

  1. 獲取連接
  2. 創建通道
  3. 創建隊列聲明
  4. 發送消息
  5. 關閉隊列
public class Producer {

    /** 隊列名稱 */
    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        /** 1.獲取連接 */
        Connection newConnection = MQConnectionUtils.newConnection();
         /** 2.創建通道 */
        Channel channel = newConnection.createChannel();
         /** 3.創建隊列聲明 */
        channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
        String msg = "我是生產者生成的消息";
        System.out.println("生產者發送消息:" + msg);
         /** 4.發送消息 */
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        channel.close();
        newConnection.close();
    }
}

4 消費者

消費者實現和生產者實現過程差不多,但是沒有關閉連接和通道,是因為要消費者一直等待隨時可能發來的消息,大致分為如下3步:

  1. 獲取連接
  2. 獲取通道
  3. 監聽隊列
public class Customer {

    /** 隊列名稱 */
    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("002");
        /** 1.獲取連接 */
        Connection newConnection = MQConnectionUtils.newConnection();
         /** 2.獲取通道 */
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException 
{
                String msgString = new String(body, "UTF-8");
                System.out.println("消費者獲取消息:" + msgString);
            }
        };
         /** 3.監聽隊列 */
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }

}

5 測試環節

至此,整個項目代碼寫完了,接下來就是測試環節

5.1 啟動RabbitMQ

5.2 創建vhost

5.3 依次運行Producer\Customer

Producer生產者啟動

Rabbit管理平台,三條消息

Customer消費者啟動

6 簡單隊列總結

簡單隊列也稱為點對點,即一個生產者對應一個消費者,生產者發送消息到隊列,消費者在隊列中取出消息消費。

生產者大致步驟:

  1. 獲取連接
  2. 創建通道
  3. 創建隊列聲明
  4. 發送消息
  5. 關閉隊列

消費者大致步驟:

  1. 獲取連接
  2. 獲取通道
  3. 監聽隊列
  4. 消費消息

簡單隊列雖然簡單,但是有一些不足,比如這種點對點無疑在復雜情況下會產生大量冗余代碼,繼續看下一篇工作隊列吧。

案例代碼:https://www.lanzous.com/i5ydu6d

我創建了一個java相關的公眾號,用來記錄自己的學習之路,感興趣的小伙伴可以關注一下微信公眾號哈:niceyoo


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM