[java學習筆記]rabbitMq的使用


  消息中間件實現不同系統之間通信的一個系統,就rabbitMQ來講,消息的發出方將消息送入某個交換機,並且制定一個路由關鍵字,該交換機根據路由關鍵字將消息放入對應的隊列中,然后一直監聽着隊列的程序便可以接收道相應的消息,並且根據預定的程序執行相應的邏輯。

  下面通過一個例子來實現程序間的通信:

消息發出方:

package cn.ly;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 測試rabbitmq客戶端,制造者端測試
 *
 * 生產者發送信息的過程:
 * 1.創建連接
 * 2.設置虛擬機
 * 3.創建會話通道
 * 4.聲明隊列
 * 5.聲明交換機
 * 6.綁定交換機和隊列,創建路由關鍵字
 * 7.發送消息
 * 8.關閉資源
 * 注明:4-6這三步不是必須的,但是如果直接發送消息而沒有隊列的話程序會出錯,所以,在發送之前先聲明,
 * 同理,消費者端也是這樣,需要先聲明,沒有的話就會創建,有的話就不發生什么;
 *
 * 對於工作模式:
 * 1.work queues:不使用交換機,只有一個隊列,可以有多個消費端,隊列通知采用輪詢的方法給監聽的多個消費端
 * 發送消息;
 * 2.publish/subscribe:通過交換機進行消息轉發,有多個隊列,每個隊列均可有一個或者多個消費者進行監聽,
 * 每次生產者發送消息,則均由交換機轉發至各個隊列,由各個隊列自行通知監聽的消費者;
 * 3.Routing:模式同2,區別,為每個隊列配備一個,或多個路由關鍵字,發送消息時指定路由關鍵字,由交換機根據路由
 * 關鍵字匹配進行轉發;
 * 4.Topics:模式同3,區別在於,配備的路由關鍵字可以為通配符的形式,通配符有:#和*,區別:#匹配任意個單詞,而*
 * 只能匹配單個單詞,其中路由關鍵字指定規則:多個單詞使用.隔開;
 * 5.Header:模式同3,區別,匹配的是鍵值對;
 * 6.RPC:遠程異步調用,mq的一個應用,有客戶端和服務端,客戶端向mq發送一個調用服務端的信息,服務端獲得信息,調用
 * 相應服務,將返回結果作為消息發送到另一個隊列,客戶端監聽該隊列獲取返回信息;
 *
 * 對於消費者,每個消費者監聽隊列時指定的參數中,有隊列名,是否自動回復,以及一個的map屬性和一個回調方法;
 * 消費者,單獨的消費者無法實現同時監聽兩個隊列的操作;
 * 針對隊列:可以給它增加路由key,也可以給一系列的鍵值對,作為頭信息,這些東西都是對隊列的標識,當生產者發送消息時,
 * 使用這些標識來決定將消息發送到那個隊列,有路由key的話,優先匹配路由key;
 * 消費者的行為很單純,就是監聽一個隊列,然后發現消息就回收;
 */
public class ProducerTest {
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    private static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    private static final String ROUTINGKEY_SMS="inform.#.sms.#";
    // rabbitMq
    @Test
    public void run() {
        // 通過連接工廠創建新的連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 設置虛擬機
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        try {
            // 創建連接
            connection = connectionFactory.newConnection();
            // 創建會話通道
            channel = connection.createChannel();
            //聲明隊列,如果隊列在mq 中沒有則要創建
            //參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            /**
             * 參數明細
             * 1、queue 隊列名稱
             * 2、durable 是否持久化,如果持久化,mq重啟后隊列還在
             * 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建
             * 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)
             * 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            //聲明一個交換機
            //參數:String exchange, String type
            /**
             * 參數明細:
             * 1、交換機的名稱
             * 2、交換機的類型
             * fanout:對應的rabbitmq的工作模式是 publish/subscribe
             * direct:對應的Routing    工作模式
             * topic:對應的Topics工作模式
             * headers: 對應的headers工作模式
             */
            channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
            // 綁定交換機和隊列

            //參數:String queue, String exchange, String routingKey
            /**
             * 參數明細:
             * 1、queue 隊列名稱
             * 2、exchange 交換機名稱
             * 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串
             */
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);


            for(int i=0;i<5;i++){
                //發送消息的時候指定routingKey
                String message = "send email inform message to user";
                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes());
                System.out.println("send to mq "+message);
            }
            for(int i=0;i<5;i++){
                //發送消息的時候指定routingKey
                String message = "send sms inform message to user";
                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes());
                System.out.println("send to mq "+message);
            }
            for(int i=0;i<5;i++){
                //發送消息的時候指定routingKey
                String message = "send sms and email inform message to user";
                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,message.getBytes());
                System.out.println("send to mq "+message);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            try {
                channel.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}
View Code

做的事情主要有:創建鏈接,設置虛擬機,創建會話通道,聲明隊列,聲明交換機,通過關鍵字綁定交換機,發送消息,關閉鏈接;一個鏈接可以有多個會話通道;其中,聲明交換機,聲明隊列和綁定這三件事不是必須做的,只是,初次運行發送消息時如果沒有對應的交換機則會報錯,另外,如果在創建交換機時報錯,可以訪問localhost:15672中,查看是否有同名的交換機,有的話刪除即可;

消息接收方:

package cn.ly.cn.ly;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    //隊列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    private static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    private static final String ROUTINGKEY_SMS="inform.#.sms.#";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 通過連接工廠創建新的連接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 設置虛擬機
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        // 創建連接
        connection = connectionFactory.newConnection();
        // 創建會話通道
        channel = connection.createChannel();
        //聲明隊列,如果隊列在mq 中沒有則要創建
        //參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        /**
         * 參數明細
         * 1、queue 隊列名稱
         * 2、durable 是否持久化,如果持久化,mq重啟后隊列還在
         * 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用於臨時隊列的創建
         * 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)
         * 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間
         */
        channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
        //聲明一個交換機
        //參數:String exchange, String type
        /**
         * 參數明細:
         * 1、交換機的名稱
         * 2、交換機的類型
         * fanout:對應的rabbitmq的工作模式是 publish/subscribe
         * direct:對應的Routing    工作模式
         * topic:對應的Topics工作模式
         * headers: 對應的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
        // 綁定交換機和隊列

        //參數:String queue, String exchange, String routingKey
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、exchange 交換機名稱
         * 3、routingKey 路由key,作用是交換機根據路由key的值將消息轉發到指定的隊列中,在發布訂閱模式中調協為空字符串
         */
        channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);

        //實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 當接收到消息后此方法將被調用
             * @param consumerTag  消費者標簽,用來標識消費者的,在監聽隊列時設置channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 消息屬性
             * @param body 消息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用來標識消息的id,可用於確認消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //消息內容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽隊列
        //參數:String queue, boolean autoAck, Consumer callback
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、autoAck 自動回復,當消費者接收到消息后要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復
         * 3、callback,消費方法,當消費者接收到消息要執行的方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);

    }
}
View Code

消息接收方主要是創建鏈接,聲明回調方法,監聽隊列,同樣,創建隊列的操作不是必須的,但是,如果隊列不存在,監聽時會報錯,所以提前創建;

需要注意的一點:同一個隊列可以由多個進程同時監聽,但是,同一條消息只能被一個進程接收,即監聽同一隊列的多個程序在一次消息發送中,只會有一個接收消息並處理;


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM