RabbitMQ-5 Queue隊列和Message詳細使用


1、導讀
本章節主要介紹Queue隊列和消息Message的詳細介紹。包含構建過程中各參數的詳細解釋,話不多說我們開始吧

2、Queue隊列
在第一章 初識RabbitMQ 中我們簡單介紹了Queue,Queue的主要是作用於存儲消息;在之前的樣例中大家應該能看到如何構建一個Queue,即queueDeclare方法

queueDeclare
queueDeclare 有兩個重載方法

Queue.DeclareOk queueDec1are() throws IOException;

第一種不帶任何參數的queueDeclare 方法默認創建一個由RabbitMQ 命名的(類似這種amq.gen-LhQzlgv3GhDOv8PIDabOXA 名稱,這種隊列也稱之為匿名隊列〉、排他的、自動刪除的、非持久化的隊列。

Queue.DeclareOk queueDeclare (String queue , boolean durable , boolean exclusive,
boolean autoDelete, Map<String,Object> arguments) throws IOException;

第二種方法參數詳解:

queue :隊列的名稱

durable :設置是否持久化。為true 則設置隊列為持久化。持久化的隊列會存盤,在服務器重啟的時候可以保證不丟失相關信息

exclusive :設置是否排他(獨占隊列)。為true 則設置隊列為排他的。如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:排他隊列是基於連接( Connection) 可見的,同一個連接的不同信道(Channel)是可以同時訪問同一連接創建的排他隊列; "首次"是指如果一個連接己經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同:即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除,這種隊列適用於一個客戶端同時發送和讀取消息的應用場景

autoDelete :設置是否自動刪除,當最后一個監聽被移除后,自動刪除隊列;也就是說至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除

arguments :設置隊列的一些其他參數;

 

queueBind

在之前的代碼樣例種我們能看到queueBind這個方法,它的主要作用是將交換機和隊列進行綁定;我們主要講解其兩個構造方法的參數詳解

Queue.BindOk queueBind(String queue , String exchange , String routingKey)
Queue.BindOk queueBind(String queue , String exchange , String routingKey , Map<String, Object> arguments)

方法中涉及的參數詳解:

  • queue :聲明的隊列名稱
  • exchange : 聲明的交換器名稱
  • routingKey :用來綁定隊列和交換器的路由鍵
  • argument :定義綁定的一些參數

queueUnbind

同理既有綁定操作也必然有解綁操作,方法如下,參數和綁定都是一樣的裂解,不再碼字了~

Queue.BindOk queueUnBind(String queue , String exchange , String routingKey)
Queue.BindOk queueUnBind(String queue , String exchange , String routingKey , Map<String, Object> arguments)

 

3、Massage 消息

它是服務器和應用程序間傳輸的數據,我們在前面的樣例代碼種可以看到發送消息的樣例;Channel.basicPublish 方法,它有幾個重載方法

(1) void basicPublish (String exchange , String routingKey, BasicProperties props ,byte[) body) throws IOException ;

(2) void basicPublish (String exchange , String routingKey, boolean mandatory, BasicProperties props ,byte[) body) throws IOException ;

(3) void basicPublish (String exchange , String routingKey, boolean mandatory, boolean immediate, BasicProperties props ,byte[) body) throws IOException ;

具體參數詳細介紹如下:

exchange :交換器的名稱,指明消息需要發送到哪個交換器中,如果設置為空,則默認發送到RabbitMQ 默認的交換器中
routingKey : 路由鍵
mandatory :當mandatory 參數設為true 時,交換器無法根據自身的類型和路由鍵找到一個符合條件的隊列,那么RabbitMQ 會調用Basic.Return 命令將消息返回給生產者。當mandatory參數設置為false 時,出現上述情形,則消息直接被丟棄(再往后的教程中再詳細講解)
immediate :當imrnediate參數設為true時,如果交換器在將消息路由到隊列時發現隊列上並不存在任何消費者,那么這條消息將不會存入隊列中。當與路由鍵匹配的所有隊列都沒有消費者時,該消息會通過Basic.Return返回至生產者
props :消息的基本屬性集,分別有
1、contentType (消息類型)
2、contentEncoding (消息編碼)
3、headers( Map<String ,Object>) (自定義的一些消息數據)
4、deliveryMode (消息是否持久化 = 2)
5、priority (消息優先級)
6、correlationld (可以理解為消息的唯一ID)
7、replyTo (消息重回隊列)
8、expiration (過期時間)
9、messageld
10、timestamp
11、type
12、userld
13、appld
14、clusterld
body :消息體( payload ) 需要發送的消息

接下來我們還是用一段代碼來加深理解

public class MessageProducer {

    public static void main(String[] args) throws Exception {
        //1 創建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.28");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("toher");
        connectionFactory.setPassword("toher888");
        //2 創建Connection
        Connection connection = connectionFactory.newConnection();
        //3 創建Channel
        Channel channel = connection.createChannel();
        //4 聲明
        String routingKey = "test.direct";
        //5 發送
        Map<String, Object> headers = new HashMap<>();
        headers.put("param1", "111");
        headers.put("param1", "222");

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(2) //設置持久化
                .contentEncoding("UTF-8")
                .expiration("10000") //設置10秒過期
                .headers(headers) //傳遞自定義headers
                .build();
        String msg = "Test Message";
        //exchangeName 為空 默認走RabbitMQ默認交換機
        channel.basicPublish("", routingKey , properties , msg.getBytes());
        channel.close();
        connection.close();
    }
    
}
View Code

————————————————
版權聲明:本文為CSDN博主「傲泣龍騰」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/lhmyy521125/java/article/details/87875696


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM