RabbitMQ消息發送與接收-詳細用法


RabbitMQ消息發送與接收

 

1.簡介

  所有MQ產品從模型抽象上來說都是一樣的過程。消費者訂閱某個隊列。生產者創建消息,然后發布到隊列,最后將消息發送到監聽的消費者。

   AMQP(Advanced message queuing protocol)是一個提供統一消息服務的應用層標准協議,基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端、中間件等不同產品,不同開發語言等條件的限制。

  ActiveMQ是基於JMS(Java Message Service)協議的消息中間件。區別如下:

 

 Rabbit模型如下:

 1.Message。消息,是不具體的。由消息頭和消息體組成。消息體是不透明的,而消息頭是一系列可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(優先級)、delivery-mode(是否持久存儲)等

2.Publisher。消息的生產者,也是一個向交換機發布消息的客戶端應用程序。

3.Exchanger。交換機,用來接收生產者發布的消息並將這些消息路由給服務器中的隊列。

4.Binging。綁定,用於消息隊列和交換器之間的管理。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則。所以可以將交換器理解成一個由綁定構成的路由表。

5.Queue。消息隊列,用來保存消息知道發送給消費者。一個消息可投入一個或對個隊列。

6.Connection。網絡連接,比如一個TCP連接。

7.Channel。信道,多路復用連接中的一條獨立的雙向數據流通道,可讀可寫。一個Connection包括多個channel。因為對於操作系統來說建立和銷毀TCP是非常昂貴的開銷,所以引入信道的概念,以復用一條TCP連接。

8.Consumer。消費者,從消息隊列取得消息的客戶端應用程序。

9.VirtualHost。虛擬主機。表示一批交換機、消息隊列和相關對象。vhost本質上是一個mini版的RabbitMQ服務器,擁有自己的隊列、綁定、交換器和權限控制;vhost通過在各個實例間提供邏輯上分離,允許你為不同應用程序安全保密地運行數據;vhost是AMQP概念的基礎,必須在連接時進行指定,RabbitMQ包含了默認vhost:“/”。

10.Borker。表示消息隊列服務器實體。表示啟動一個rabbitmq所包含的進程。

2.使用

1.簡單隊列模式

不涉及交換機的模型如下:

pom文件引入如下依賴:

復制代碼
        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.4.3</version>
        </dependency>
復制代碼

1.消息生產者 

復制代碼
package rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    public static ConnectionFactory getConnectionFactory() {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    public static void main(String[] args) throws IOException, TimeoutException  {
        ConnectionFactory connectionFactory = getConnectionFactory();
        Connection newConnection = null;
        Channel createChannel = null;
        try {
            newConnection = connectionFactory.newConnection();
            createChannel = newConnection.createChannel();
            /**
             * 聲明一個隊列。
             * 參數一:隊列名稱
             * 參數二:是否持久化
             * 參數三:是否排外  如果排外則這個隊列只允許有一個消費者
             * 參數四:是否自動刪除隊列,如果為true表示沒有消息也沒有消費者連接自動刪除隊列
             * 參數五:隊列的附加屬性
             * 注意:
             * 1.聲明隊列時,如果已經存在則放棄聲明,如果不存在則會聲明一個新隊列;
             * 2.隊列名可以任意取值,但需要與消息接收者一致。
             * 3.下面的代碼可有可無,一定在發送消息前確認隊列名稱已經存在RabbitMQ中,否則消息會發送失敗。
             */
            createChannel.queueDeclare("myQueue", true, false, false,null);
            
            String message = "測試消息";
            /**
             * 發送消息到MQ
             * 參數一:交換機名稱,為""表示不用交換機
             * 參數二:為隊列名稱或者routingKey.當指定了交換機就是routingKey
             * 參數三:消息的屬性信息
             * 參數四:消息內容的字節數組
             */
            createChannel.basicPublish("", "myQueue", null, message.getBytes());
            
            System.out.println("消息發送成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (createChannel != null) {
                createChannel.close();
            }
            if (newConnection != null) {
                newConnection.close();
            }
        }
        
    }
}
復制代碼

  注意:5672是rabbitmq暴露的端口,15672是management插件的端口。

發送成功之后可以從15672端口查看,也可以從15672進行消費,如下:

 2.消息接收

復制代碼
package rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer {

    public static ConnectionFactory getConnectionFactory() {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    public static void main(String[] args) throws IOException, TimeoutException  {
        ConnectionFactory connectionFactory = getConnectionFactory();
        Connection newConnection = null;
        Channel createChannel = null;
        try {
            newConnection = connectionFactory.newConnection();
            createChannel = newConnection.createChannel();
            /**
             * 聲明一個隊列。
             * 參數一:隊列名稱
             * 參數二:是否持久化
             * 參數三:是否排外  如果排外則這個隊列只允許有一個消費者
             * 參數四:是否自動刪除隊列,如果為true表示沒有消息也沒有消費者連接自動刪除隊列
             * 參數五:隊列的附加屬性
             * 注意:
             * 1.聲明隊列時,如果已經存在則放棄聲明,如果不存在則會聲明一個新隊列;
             * 2.隊列名可以任意取值,但需要與消息接收者一致。
             * 3.下面的代碼可有可無,一定在發送消息前確認隊列名稱已經存在RabbitMQ中,否則消息會發送失敗。
             */
            createChannel.queueDeclare("myQueue", true, false, false,null);
            /**
             * 接收消息。會持續堅挺,不能關閉channel和Connection
             * 參數一:隊列名稱
             * 參數二:消息是否自動確認,true表示自動確認接收完消息以后會自動將消息從隊列移除。否則需要手動ack消息
             * 參數三:消息接收者的標簽,用於多個消費者同時監聽一個隊列時用於確認不同消費者。
             * 參數四:消息接收者
             */
            createChannel.basicConsume("myQueue", true, "", new DefaultConsumer(createChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    String string = new String(body, "UTF-8");
                    System.out.println("接收到d消息: -》 " + string);
                }
            });
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
        
    }
}
復制代碼

  注意:消息的確認模式可以為自動也可以為手動,自動確認讀取完會自動從隊列刪除;手動需要自己ack,如果設為手動也沒ack可能會造成消息重復消費。

  如果是多個消費者,會從隊列以輪詢的方式處理消息,這種稱為工作隊列模式。

補充:這種實際也是用了rabbitmq的一個默認交換機,routing_key為隊列名稱。也可以理解為是Rabbitmq類型為System的交換機。

測試:修改消費者代碼

復制代碼
            createChannel.basicConsume("myQueue", true, "", new DefaultConsumer(createChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    String string = new String(body, "UTF-8");
                    System.out.println(envelope);
                    System.out.println(properties);
                    System.out.println("接收到d消息: -》 " + string);
                }
            });
復制代碼

結果:(可以看出是有路由key的,值為隊列名稱)

Envelope(deliveryTag=1, redeliver=false, exchange=, routingKey=myQueue)
#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
接收到d消息: -》 測試消息

2.涉及交換機的發送和接收

  Exchange類型根據分發策略分為四種。direct、fanout、topic、headers。headers匹配AMQP消息的header而不是路由鍵,此外headers交換機和direct交換機完全一致,目前幾乎不用。Exchange只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,消息會丟失。所以只能收到監聽之后生產者發送的消息。

抽取工具類:

復制代碼
package rabbitmq;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtils {
    
    public static Connection getConnection() throws Exception {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory.newConnection();
    }

}
復制代碼

1.Direct類型交換-單播模式,也成為路由模式(Routing模式)

  精准綁定,消息中的路由鍵(RoutingKey)和Binding的bindingKey一致。

生產者:

復制代碼
package rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class DirectProducer {

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtils.getConnection();
        //8、創建頻道-channel = connection.createChannel()
        Channel channel = connection.createChannel();

        //聲明交換機- channel.exchangeDeclare(交換機名字,交換機類型)
        channel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT);
        //連續發3條消息
        for (int i = 0; i < 3; i++) {
            String routingKey = "";
            //發送消息的時候根據相關邏輯指定相應的routing key。
            switch (i){
                case 0:  //假設i=0,為error消息
                    routingKey = "log.error";
                    break;
                case 1: //假設i=1,為info消息
                    routingKey = "log.info";
                    break;
                case 2: //假設i=2,為warning消息
                    routingKey = "log.warning";
                    break;
            }
            //10、創建消息-String m = xxx
            String message = "hello,message!" + i;
            //11、消息發送-channel.basicPublish(交換機[默認Default Exchage],路由key[簡單模式可以傳遞隊列名稱],消息其它屬性,消息內容)
            channel.basicPublish("routing_exchange",routingKey,null,message.getBytes("utf-8"));
        }
        //12、關閉資源-channel.close();connection.close()
        channel.close();
        connection.close();
    }
}
復制代碼

消費者一:

復制代碼
package rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class DirectConsumerOne {

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtils.getConnection();
        //8、創建頻道-channel = connection.createChannel()
        Channel channel = connection.createChannel();
        //9、聲明隊列-channel.queueDeclare(名稱,是否持久化,是否獨占本連接,是否自動刪除,附加參數)
        channel.queueDeclare("routing_queue1",true,false,false,null);

        //隊列綁定交換機-channel.queueBind(隊列名, 交換機名, 路由key[廣播消息設置為空串])
        channel.queueBind("routing_queue1", "routing_exchange", "log.error");
        //創建消費者
        Consumer callback = new DefaultConsumer(channel){
            /**
             * @param consumerTag 消費者標簽,在channel.basicConsume時候可以指定
             * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送)
             * @param properties  屬性信息(生產者的發送時指定)
             * @param body 消息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由的key
                String routingKey = envelope.getRoutingKey();
                //獲取交換機信息
                String exchange = envelope.getExchange();
                //獲取消息ID
                long deliveryTag = envelope.getDeliveryTag();
                //獲取消息信息
                String message = new String(body,"utf-8");
                System.out.println(
                        "routingKey:" + routingKey +
                        ",exchange:" + exchange +
                        ",deliveryTag:" + deliveryTag +
                        ",message:" + message);
            }
        };
        /**
         * 消息消費
         * 參數1:隊列名稱
         * 參數2:是否自動應答,true為自動應答[mq接收到回復會刪除消息],設置為false則需要手動應答
         * 參數3:消息接收到后回調
         */
        channel.basicConsume("routing_queue1",true,callback);

        //注意,此處不建議關閉資源,讓程序一直處於讀取消息
    }
}
復制代碼

消費者二:

復制代碼
package rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class DirectConsumerTwo {

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtils.getConnection();
        //8、創建頻道-channel = connection.createChannel()
        Channel channel = connection.createChannel();
        //9、聲明隊列-channel.queueDeclare(名稱,是否持久化,是否獨占本連接,是否自動刪除,附加參數)
        channel.queueDeclare("routing_queue2",true,false,false,null);

        //隊列綁定交換機-channel.queueBind(隊列名, 交換機名, 路由key[廣播消息設置為空串])
        channel.queueBind("routing_queue2", "routing_exchange", "log.error");
        //隊列綁定交換機-channel.queueBind(隊列名, 交換機名, 路由key[廣播消息設置為空串])
        channel.queueBind("routing_queue2", "routing_exchange", "log.info");
        //隊列綁定交換機-channel.queueBind(隊列名, 交換機名, 路由key[廣播消息設置為空串])
        channel.queueBind("routing_queue2", "routing_exchange", "log.warning");
        //創建消費者
        Consumer callback = new DefaultConsumer(channel){
            /**
             * @param consumerTag 消費者標簽,在channel.basicConsume時候可以指定
             * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送)
             * @param properties  屬性信息(生產者的發送時指定)
             * @param body 消息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由的key
                String routingKey = envelope.getRoutingKey();
                //獲取交換機信息
                String exchange = envelope.getExchange();
                //獲取消息ID
                long deliveryTag = envelope.getDeliveryTag();
                //獲取消息信息
                String message = new String(body,"utf-8");
                System.out.println(
                        "routingKey:" + routingKey +
                        ",exchange:" + exchange +
                        ",deliveryTag:" + deliveryTag +
                        ",message:" + message);
            }
        };
        /**
         * 消息消費
         * 參數1:隊列名稱
         * 參數2:是否自動應答,true為自動應答[mq接收到回復會刪除消息],設置為false則需要手動應答
         * 參數3:消息接收到后回調
         */
        channel.basicConsume("routing_queue2",true,callback);

        //注意,此處不建議關閉資源,讓程序一直處於讀取消息
    }
}
復制代碼

結果:

(1)消費者一

(2)消費者二

 

2.fanout多播模式,也稱為Publish/Scribe模式

  每個發到fanout類型交換器的消息會被分發到所有的隊列中。fanout不處理路由鍵,發消息最快。

兩個消費者:

消費者1:

復制代碼
package rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class FanoutConsumerOne {

     public static void main(String[] args) throws Exception{
            Connection connection = ConnectionUtils.getConnection();
            //8、創建頻道-channel = connection.createChannel()
            Channel channel = connection.createChannel();
            //9、聲明隊列-channel.queueDeclare(名稱,是否持久化,是否獨占本連接,是否自動刪除,附加參數)
            channel.queueDeclare("fanout_queue1",true,false,false,null);
            //隊列綁定交換機-channel.queueBind(隊列名, 交換機名, 路由key[廣播消息設置為空串])
            channel.queueBind("fanout_queue1", "fanout_exchange", "");

            //創建消費者
            Consumer callback = new DefaultConsumer(channel){
                /**
                 * @param consumerTag 消費者標簽,在channel.basicConsume時候可以指定
                 * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送)
                 * @param properties  屬性信息(生產者的發送時指定)
                 * @param body 消息內容
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //路由的key
                    String routingKey = envelope.getRoutingKey();
                    //獲取交換機信息
                    String exchange = envelope.getExchange();
                    //獲取消息ID
                    long deliveryTag = envelope.getDeliveryTag();
                    //獲取消息信息
                    String message = new String(body,"utf-8");
                    System.out.println(
                            "routingKey:" + routingKey +
                            ",exchange:" + exchange +
                            ",deliveryTag:" + deliveryTag +
                            ",message:" + message);
                }
            };
            /**
             * 消息消費
             * 參數1:隊列名稱
             * 參數2:是否自動應答,true為自動應答[mq接收到回復會刪除消息],設置為false則需要手動應答
             * 參數3:消息接收到后回調
             */
            channel.basicConsume("fanout_queue1",true,callback);

            //注意,此處不建議關閉資源,讓程序一直處於讀取消息
        }
}
復制代碼

消費者二:

復制代碼
package rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class FanoutConsumerTwo {

     public static void main(String[] args) throws Exception{
            Connection connection = ConnectionUtils.getConnection();
            //8、創建頻道-channel = connection.createChannel()
            Channel channel = connection.createChannel();
            //9、聲明隊列-channel.queueDeclare(名稱,是否持久化,是否獨占本連接,是否自動刪除,附加參數)
            channel.queueDeclare("fanout_queue2",true,false,false,null);
            //隊列綁定交換機-channel.queueBind(隊列名, 交換機名, 路由key[廣播消息設置為空串])
            channel.queueBind("fanout_queue2", "fanout_exchange", "");

            //創建消費者
            Consumer callback = new DefaultConsumer(channel){
                /**
                 * @param consumerTag 消費者標簽,在channel.basicConsume時候可以指定
                 * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送)
                 * @param properties  屬性信息(生產者的發送時指定)
                 * @param body 消息內容
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //路由的key
                    String routingKey = envelope.getRoutingKey();
                    //獲取交換機信息
                    String exchange = envelope.getExchange();
                    //獲取消息ID
                    long deliveryTag = envelope.getDeliveryTag();
                    //獲取消息信息
                    String message = new String(body,"utf-8");
                    System.out.println(
                            "routingKey:" + routingKey +
                            ",exchange:" + exchange +
                            ",deliveryTag:" + deliveryTag +
                            ",message:" + message);
                }
            };
            /**
             * 消息消費
             * 參數1:隊列名稱
             * 參數2:是否自動應答,true為自動應答[mq接收到回復會刪除消息],設置為false則需要手動應答
             * 參數3:消息接收到后回調
             */
            channel.basicConsume("fanout_queue2",true,callback);

            //注意,此處不建議關閉資源,讓程序一直處於讀取消息
        }
}
復制代碼

生產者:

復制代碼
package rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class FanoutProducer {

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtils.getConnection();
        //8、創建頻道-channel = connection.createChannel()
        Channel channel = connection.createChannel();
        //聲明交換機- channel.exchangeDeclare(交換機名字,交換機類型)
        channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);
        //連續發10條消息
        for (int i = 0; i < 10; i++) {
            //10、創建消息-String m = xxx
            String message = "hello, message!" + i;
            //11、消息發送-channel.basicPublish(交換機[默認Default Exchage],路由key[簡單模式可以傳遞隊列名稱],消息其它屬性,消息內容)
            channel.basicPublish("fanout_exchange","",null,message.getBytes("utf-8"));
            System.out.println("發送消息成功: " + message);
        }
        //12、關閉資源-channel.close();connection.close()
        channel.close();
        connection.close();
    }
}
復制代碼

啟動兩個生產者,后啟動消費者后消費消息。

 

3.Topic類型

  處理routingKey和bindingKey,支持通配符。# 匹配0或多個單詞,* 匹配單個單詞。 Topic主題模式可以實現 Publish/Subscribe發布訂閱模式 和 Routing路由模式 的雙重功能

生產者:

復制代碼
package rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class TopicProducer {

      public static void main(String[] args) throws Exception{
            Connection connection = ConnectionUtils.getConnection();
            //8、創建頻道-channel = connection.createChannel()
            Channel channel = connection.createChannel();

            //聲明交換機- channel.exchangeDeclare(交換機名字,交換機類型)
            channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC);
            //連續發3條消息
            for (int i = 0; i < 5; i++) {
                String routingKey = "";
                //發送消息的時候根據相關邏輯指定相應的routing key。
                switch (i){
                    case 0:  //假設i=0,為error消息
                        routingKey = "log.error";
                        break;
                    case 1: //假設i=1,為info消息
                        routingKey = "log.info";
                        break;
                    case 2: //假設i=2,為warning消息
                        routingKey = "log.warning";
                        break;
                    case 3: //假設i=3,為log.info.add消息
                        routingKey = "log.info.add";
                        break;
                    case 4: //假設i=4,為log.info.update消息
                        routingKey = "log.info.update";
                        break;
                }
                //10、創建消息-String m = xxx
                String message = "hello,message!" + i;
                //11、消息發送-channel.basicPublish(交換機[默認Default Exchage],路由key[簡單模式可以傳遞隊列名稱],消息其它屬性,消息內容)
                channel.basicPublish("topic_exchange",routingKey,null,message.getBytes("utf-8"));
            }
            //12、關閉資源-channel.close();connection.close()
            channel.close();
            connection.close();
        }
}
復制代碼

消費者一:

復制代碼
package rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class TopicConsumerOne {

    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtils.getConnection();
        //8、創建頻道-channel = connection.createChannel()
        Channel channel = connection.createChannel();
        //9、聲明隊列-channel.queueDeclare(名稱,是否持久化,是否獨占本連接,是否自動刪除,附加參數)
        channel.queueDeclare("topic_queue1",true,false,false,null);
        //隊列綁定交換機與路由key
        channel.queueBind("topic_queue1", "topic_exchange", "log.*");
        //創建消費者
        Consumer callback = new DefaultConsumer(channel){
            /**
             * @param consumerTag 消費者標簽,在channel.basicConsume時候可以指定
             * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送)
             * @param properties  屬性信息(生產者的發送時指定)
             * @param body 消息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由的key
                String routingKey = envelope.getRoutingKey();
                //獲取交換機信息
                String exchange = envelope.getExchange();
                //獲取消息ID
                long deliveryTag = envelope.getDeliveryTag();
                //獲取消息信息
                String message = new String(body,"utf-8");
                System.out.println(
                        "routingKey:" + routingKey +
                        ",exchange:" + exchange +
                        ",deliveryTag:" + deliveryTag +
                        ",message:" + message);
            }
        };
        /**
         * 消息消費
         * 參數1:隊列名稱
         * 參數2:是否自動應答,true為自動應答[mq接收到回復會刪除消息],設置為false則需要手動應答
         * 參數3:消息接收到后回調
         */
        channel.basicConsume("topic_queue1",true,callback);

        //注意,此處不建議關閉資源,讓程序一直處於讀取消息
    }
}
復制代碼

消費者二:

復制代碼
package rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class TopicConsumerTwo {

     public static void main(String[] args) throws Exception{
            Connection connection = ConnectionUtils.getConnection();
            //8、創建頻道-channel = connection.createChannel()
            Channel channel = connection.createChannel();
            //9、聲明隊列-channel.queueDeclare(名稱,是否持久化,是否獨占本連接,是否自動刪除,附加參數)
            channel.queueDeclare("topic_queue2",true,false,false,null);

            //隊列綁定路由key
            channel.queueBind("topic_queue2", "topic_exchange", "log.#");
            //創建消費者
            Consumer callback = new DefaultConsumer(channel){
                /**
                 * @param consumerTag 消費者標簽,在channel.basicConsume時候可以指定
                 * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送)
                 * @param properties  屬性信息(生產者的發送時指定)
                 * @param body 消息內容
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //路由的key
                    String routingKey = envelope.getRoutingKey();
                    //獲取交換機信息
                    String exchange = envelope.getExchange();
                    //獲取消息ID
                    long deliveryTag = envelope.getDeliveryTag();
                    //獲取消息信息
                    String message = new String(body,"utf-8");
                    System.out.println(
                            "routingKey:" + routingKey +
                            ",exchange:" + exchange +
                            ",deliveryTag:" + deliveryTag +
                            ",message:" + message);
                }
            };
            
            /**
             * 消息消費
             * 參數1:隊列名稱
             * 參數2:是否自動應答,true為自動應答[mq接收到回復會刪除消息],設置為false則需要手動應答
             * 參數3:消息接收到后回調
             */
            channel.basicConsume("topic_queue2",true,callback);

            //注意,此處不建議關閉資源,讓程序一直處於讀取消息
        }
}
復制代碼

啟動兩個消費者后啟動生產者,最終入下:

(1)消費者一

 (2)消費者二

 

 總結:

1、簡單模式

一個生產者、一個消費者,不需要設置交換機(使用默認的交換機,一個direct類型的交換機,routing_key為queue名稱)

2、工作隊列模式 Work Queue

一個生產者、多個消費者(競爭關系),不需要設置交換機(使用默認的交換機,一個direct類型的交換機,routing_key為queue名稱)

3、發布訂閱模式 Publish/subscribe

需要設置類型為fanout的交換機,並且交換機和隊列進行綁定,當發送消息到交換機后,交換機會將消息發送到綁定的隊列。多播模式,不進行RoutingKey的判斷。

4、路由模式 Routing

需要設置類型為direct的交換機,交換機和隊列進行綁定,並且指定routing key,當發送消息到交換機后,交換機會根據routing key將消息發送到對應的隊列

5、通配符模式 Topic

需要設置類型為topic的交換機,交換機和隊列進行綁定,並且指定通配符方式的routing key,當發送消息到交換機后,交換機會根據routing key將消息發送到對應的隊列

 

  補充一下,無論是fanout多播模式還是direct路由模式還是topic通配符模式,Exchanger收到消息是會發送到后面的queue列中。如果一個應用以多實例部署,多個實例監聽一個Exchanger下面相同的隊列,不會造成一個消息被相同的應用多實例重復消費,因為queue本質是不可重復消費。

  開發中可以一個應用一個交換機,不同的消息類型放到不同的隊列中。如果涉及死信隊列,可以對每個應用再建立一個死信交換機,隊列名稱相同,便於處理死信消息。

 

補充:消息的屬性可以通過BasicProperties進行設置

BasicProperties源碼如下:

復制代碼
    public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties {
        private String contentType;
        private String contentEncoding;
        private Map<String,Object> headers;
        private Integer deliveryMode;
        private Integer priority;
        private String correlationId;
        private String replyTo;
        private String expiration;
        private String messageId;
        private Date timestamp;
        private String type;
        private String userId;
        private String appId;
        private String clusterId;
復制代碼

測試:

(1)生產者發送消息時生成一些屬性

復制代碼
package rabbitmq;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Producer {

    public static ConnectionFactory getConnectionFactory() {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    public static void main(String[] args) throws IOException, TimeoutException  {
        ConnectionFactory connectionFactory = getConnectionFactory();
        Connection newConnection = null;
        Channel createChannel = null;
        try {
            newConnection = connectionFactory.newConnection();
            createChannel = newConnection.createChannel();
            
            /**
             * 聲明一個隊列。
             * 參數一:隊列名稱
             * 參數二:是否持久化
             * 參數三:是否排外  如果排外則這個隊列只允許有一個消費者
             * 參數四:是否自動刪除隊列,如果為true表示沒有消息也沒有消費者連接自動刪除隊列
             * 參數五:隊列的附加屬性
             * 注意:
             * 1.聲明隊列時,如果已經存在則放棄聲明,如果不存在則會聲明一個新隊列;
             * 2.隊列名可以任意取值,但需要與消息接收者一致。
             * 3.下面的代碼可有可無,一定在發送消息前確認隊列名稱已經存在RabbitMQ中,否則消息會發送失敗。
             */
            createChannel.queueDeclare("myQueue", true, false, false,null);
            
            String message = "測試消息";
            // 設置消息屬性以及headers
            Map<String, Object> headers = new HashMap<>();
            headers.put("creator", "張三");
            BasicProperties build = new BasicProperties().builder().appId("test001").messageId("001").headers(headers).build();
            /**
             * 發送消息到MQ
             * 參數一:交換機名稱,為""表示不用交換機
             * 參數二:為隊列名稱或者routingKey.當指定了交換機就是routingKey
             * 參數三:消息的屬性信息
             * 參數四:消息內容的字節數組
             */
            createChannel.basicPublish("", "myQueue", build, message.getBytes());
            
            System.out.println("消息發送成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (createChannel != null) {
                createChannel.close();
            }
            if (newConnection != null) {
                newConnection.close();
            }
        }
        
    }
}
復制代碼

(2)消息接收者

復制代碼
package rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer {

    public static ConnectionFactory getConnectionFactory() {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    public static void main(String[] args) throws IOException, TimeoutException  {
        ConnectionFactory connectionFactory = getConnectionFactory();
        Connection newConnection = null;
        Channel createChannel = null;
        try {
            newConnection = connectionFactory.newConnection();
            createChannel = newConnection.createChannel();
            /**
             * 聲明一個隊列。
             * 參數一:隊列名稱
             * 參數二:是否持久化
             * 參數三:是否排外  如果排外則這個隊列只允許有一個消費者
             * 參數四:是否自動刪除隊列,如果為true表示沒有消息也沒有消費者連接自動刪除隊列
             * 參數五:隊列的附加屬性
             * 注意:
             * 1.聲明隊列時,如果已經存在則放棄聲明,如果不存在則會聲明一個新隊列;
             * 2.隊列名可以任意取值,但需要與消息接收者一致。
             * 3.下面的代碼可有可無,一定在發送消息前確認隊列名稱已經存在RabbitMQ中,否則消息會發送失敗。
             */
            createChannel.queueDeclare("myQueue", true, false, false,null);
            /**
             * 接收消息。會持續堅挺,不能關閉channel和Connection
             * 參數一:隊列名稱
             * 參數二:消息是否自動確認,true表示自動確認接收完消息以后會自動將消息從隊列移除。否則需要手動ack消息
             * 參數三:消息接收者的標簽,用於多個消費者同時監聽一個隊列時用於確認不同消費者。
             * 參數四:消息接收者
             */
            createChannel.basicConsume("myQueue", true, "", new DefaultConsumer(createChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    
                    System.out.println(properties);
                    
                    System.out.println(envelope);
                    
                    String string = new String(body, "UTF-8");
                    System.out.println("接收到d消息: -》 " + string);
                }
            });
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
        
    }
}
復制代碼

結果:

#contentHeader<basic>(content-type=null, content-encoding=null, headers={creator=張三}, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=001, timestamp=null, type=null, user-id=null, app-id=test001, cluster-id=null)
Envelope(deliveryTag=1, redeliver=false, exchange=, routingKey=myQueue)
接收到d消息: -》 測試消息

 

補充: RabbitMQheaders消息類型的交換機使用方法如下:

x-match 為all是匹配所有的請求頭和值,必須所有相等才會發送;any是滿足任意一個即可。

復制代碼
package rabbitmq;

import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;

import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class HeadersProducer {
    
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = ConnectionUtils.getConnection();
            channel = connection.createChannel();

            
            //聲明交換機
            channel.exchangeDeclare("header_exchange", BuiltinExchangeType.HEADERS);
            // 聲明queue
            channel.queueDeclare("header_queue", true, false, false, null);
            // 聲明bind-x-match any  匹配任意一個頭,x-match all 匹配所有key
            Map<String, Object> bindingArgs = new HashMap<String, Object>();
            bindingArgs.put("x-match", "all"); //any or all
            bindingArgs.put("headName1", "val1");
            bindingArgs.put("headName2", "val2");
            channel.queueBind("header_queue", "header_exchange", "", bindingArgs);
            
            //設置消息頭鍵值對信息
            Map<String, Object> headers = new Hashtable<String, Object>();
            headers.put("headName1", "val1");
            headers.put("headName2", "val2");
            Builder builder = new Builder();
            builder.headers(headers);
            String message = "這是headers測試消息234";
            channel.basicPublish("header_exchange", "", builder.build(), message.getBytes());
            System.out.println("發送消息: " + message);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (channel != null) {
                    // 回滾。如果未異常會提交事務,此時回滾無影響
                    channel.txRollback();
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception ignore) {
                // ignore
            }
        }
    }
}
復制代碼

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM