Java連接RabbitMQ


首先,我們需要了解,RabbitMQ的作用是什么?

  • 解耦:例如短信,郵件,訂單系統等操作使用rabbitmq作為中間件更為合適,意思就是當用戶下了訂單時,會存放至mq,再由別的系統例如庫存過來調用,這種架構的話,即使庫存系統掛掉了,也不會影響我們訂單系統的使用
  • 異步:假如有一個用戶注冊功能,注冊的時候要發送郵件和短信,此時我們就可以將注冊信息寫入mq,然后郵件和短信就可以並發去處理
  • 削鋒:用戶的大量請求,例如秒殺或團搶活動,就可以使用rabbitmq作為存儲,數據庫再慢慢的去處理

缺點:系統的可用性降低,系統引入的外部依賴越多,系統能夠越容易掛掉,就假如我們引入了MQ后,MQ一掛掉,我們的整個系統也就會無法使用了


擴展:rabbitMQ為什么是通過通道處理消息而不是鏈接呢?

主要原因是因為鏈接是基於TCP協議的,通過三次握手來實現的話是很慢的,而且一開一關開銷很大,所以我們就是在TCP協議的基礎上使用通道channel來實現長鏈接,這樣我們來處理消息的效率才會更高,並且通道可以是多個


接下來,我們就來使用Java來連接一下我們的RabbitMQ

1.創建一個Maven項目

首先,我們需要先創建一個Maven項目,可參考我之前編寫的Maven項目的創建

2.導入RabbitMQ依賴包

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

3.通過JAVA實現消息發送

注:這里記得給先我們用戶授權virtualhost,由於我使用的virtualhost為'/',用戶為admin,所以使用以下命令進行授權

rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

創建RabbitMQ連接信息的工具類

public class MqConnectionUtils {
    public static Connection getConnection() throws IOException, TimeoutException {
        //定義一個鏈接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //設置服務地址
        connectionFactory .setHost("192.168.36.199");
        //設定端口,注意,這里RabbitMQ的端口,不是管理頁面的端口
        connectionFactory .setPort(5672);
        //設定用戶名
        connectionFactory .setUsername("admin");
        //設定密碼
        connectionFactory .setPassword("123");
        //設定虛擬訪問節點
        connectionFactory .setVirtualHost("/");
        return connectionFactory.newConnection();
    }
}

創建生產者

注:這里導入的是rabbitmq的包

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 生產者
 */
public class Send {
    //設定隊列名稱(已存在的隊列)
    private static final String QUEUE_NAME="queue1";

    public static void main(String[] args) throws IOException, TimeoutException {
        //從MQ工具類獲取連接信息
        Connection connection = MqConnectionUtils.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();
      //准備發送的消息內容
        String msg = "world";
     //發送消息給隊列
      /**
       * 參數1:交換機,不定義也會有默認的,因為我們的消息是通過交換機來進行投遞給隊列的,所以交換機不可能沒有
       * 參數2:簡單模式:隊列名稱
       * 參數3:消息的狀態控制
       * 參數4:消息內容
       */
        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        System.out.print("發送成功");
        //關閉通道
        channel.close();
        connection.close();
    }
}

此時我們的queue1隊列中就可查看到該消息

4.隊列的創建

創建一個queue4隊列並向其發送消息

/**
 * 生產者
 */
public class Send {
    //設定隊列名稱
    private static final String QUEUE_NAME="queue4";

    public static void main(String[] args) throws IOException, TimeoutException {
        //從MQ工具類獲取連接信息
        Connection connection = MqConnectionUtils.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();        
/** * 創建隊列 * 參數1:隊列的名稱 * 參數2:是否要持久化,持久化和非持久化都會存盤,但是非持久化重啟服務器會丟失,所以只是單純的重啟服務,隊列是不會消失的 * 參數3:排他性,是否是獨占獨立 * 參數4:是否自動刪除,隨着最后一個消費者消費完畢后是否刪除隊列 * 參數5:攜帶一些附加參數 */ channel.queueDeclare(QUEUE_NAME,true,false,false,null);
     //發送消息給隊列 /** * 參數1:交換機,不定義也會有默認的,因為我們的消息是通過交換機來進行投遞給隊列的,所以交換機不可能沒有 * 參數2:簡單模式:隊列名稱 * 參數3:消息的狀態控制 * 參數4:消息內容 */ for(int i = 1;i <= 5; i++){ //准備發送的消息內容 String msg = "Hello" + i; //發送消息給隊列queue channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); } System.out.print("發送成功"); //關閉通道 channel.close(); connection.close(); } }

此時再登錄我們的管理頁面即可看到我們新創建的隊列

5.交換機的創建與隊列綁定

交換機的創建

//true代表持久性,交換機不會隨着服務器重啟造成丟失
channel.exchangeDeclare("交換機名稱","交換機類型",true);

交換機與隊列的綁定

//routekey根據交換機模式判斷是否填寫
channel.queueBind(“隊列名稱”,”交換機名稱”,”routekey”)

6.實現fanout發布於訂閱模式

如果不知道fanout是什么,可以參考RabbitMQ通過管理頁面實現發布與訂閱

/**
 * 生產者
 */
public class Send {public static void main(String[] args) throws IOException, TimeoutException {
        //從MQ工具類獲取連接信息
        Connection connection = MqConnectionUtils.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();

        //准備發送的消息內容
        String msg = "Hello World";

        //准備交換機(已創建的交換機)
        String exchangeName = "fanout-exchange";

        //准備路由
        String routekey = "";

        //發送消息給交換機
        /**
         * 參數1:交換機,不定義也會有默認的,因為我們的消息是通過交換機來進行投遞給隊列的,所以交換機不可能沒有
         * 參數2:routekey
         * 參數3:消息的狀態控制
         * 參數4:消息內容
         */
        //該模式因為是由交換機發給該交換機綁定的所有隊列,所以可以不標明隊列名稱
        channel.basicPublish(exchangeName,routekey,null,msg.getBytes());

        System.out.print("發送成功");
        //關閉通道
        channel.close();
        connection.close();
    }
}

7.實現direct發布於訂閱模式

/**
 * 生產者
 */
public class Send {public static void main(String[] args) throws IOException, TimeoutException {
        //從MQ工具類獲取連接信息
        Connection connection = MqConnectionUtils.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();

        //准備發送的消息內容
        String msg = "Hello World";

        //准備交換機(已創建的交換機)
        String exchangeName = "direct-exchange";

        //准備路由
        String routekey = "email";

        //發送消息給交換機
        /**
         * 參數1:交換機,不定義也會有默認的,因為我們的消息是通過交換機來進行投遞給隊列的,所以交換機不可能沒有
         * 參數2:routekey
         * 參數3:消息的狀態控制
         * 參數4:消息內容
         */
        //該模式因為是由交換機發給該交換機綁定的所有隊列,所以可以不標明隊列名稱
        channel.basicPublish(exchangeName,routekey,null,msg.getBytes());

        System.out.print("發送成功");
        //關閉通道
        channel.close();
        connection.close();
    }
}

8.實現topic發布於訂閱模式

其實topic模式和direct模式幾乎一致,只需要修改交換器名稱和routekey即可

/**
 * 生產者
 */
public class Send {

    public static void main(String[] args) throws IOException, TimeoutException {
        //從MQ工具類獲取連接信息
        Connection connection = MqConnectionUtils.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();

        //准備發送的消息內容
        String msg = "Hello World";

        //准備交換機(已創建的交換機)
        String exchangeName = "topic-exchange";

        //准備路由
        String routekey = "kuang.chen.com";

        //發送消息給交換機
        /**
         * 參數1:交換機,不定義也會有默認的,因為我們的消息是通過交換機來進行投遞給隊列的,所以交換機不可能沒有
         * 參數2:routekey
         * 參數3:消息的狀態控制
         * 參數4:消息內容
         */
        //該模式因為是由交換機發給該交換機綁定的所有隊列,所以可以不標明隊列名稱
        channel.basicPublish(exchangeName,routekey,null,msg.getBytes());

        System.out.print("發送成功");
        //關閉通道
        channel.close();
        connection.close();
    }
}

9.消費者的實現

需要先了解,在消費者中,RabbitMQ為其提供了消息確認機制,分為消息自動確認模式和消息手動確認模式,當消息確認后,我們隊列中的消息將會移除

那這兩種模式要如何選擇呢?

  • 如果消息不太重要,丟失也沒有影響,那么自動ACK會比較方便。好處就是可以提高吞吐量,缺點就是會丟失消息
  • 如果消息非常重要,不容丟失,則建議手動ACK,正常情況都是更建議使用手動ACK。雖然可以解決消息不會丟失的問題,但是可能會造成消費者過載

消息自動確認模式的實現

注:自動確認模式,消費者不會判斷消費者是否成功接收到消息,也就是當我們程序代碼有問題,我們的消息都會被自動確認,消息被自動確認了,我們隊列就會移除該消息,這就會造成我們的消息丟失

/**
 * 消費者
 */
public class Recv {
    //設定隊列名稱(已存在的隊列)
    private static final String QUEUE_NAME = "queue1";
    public static void main(String[] args) throws IOException, TimeoutException {
        //從mq工具類獲取連接信息
        Connection connection = MqConnectionUtils.getConnection();
        //獲取一個通道
        Channel channel = connection.createChannel();
        //監聽該隊列,true代表自動確認
        channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException{
                System.out.println("接收到的消息:"+ new String(body,"UTF-8"));
            }
        });
    }
}

實現效果,消費者會將我們隊列中的消息全部接收然后確認,並移除隊列

此時我們再來看一下Connections和Channels

Connections

Channels

可以看到在我們啟動消費者監聽后,通道是一直存在的

消息手動確認模式的實現

/**
 * 消費者
 */
public class Recv {
    //設定隊列名稱(已存在的隊列)
    private static final String QUEUE_NAME = "queue1";
    public static void main(String[] args) throws IOException, TimeoutException {
        //從mq工具類獲取連接信息
        Connection connection = MqConnectionUtils.getConnection();
        //獲取一個通道
        Channel channel = connection.createChannel();
        //監聽該隊列,false代表手動確認
        channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException{
                System.out.println("接收到的消息:"+ new String(body,"UTF-8"));
            }
        });
    }
}

手動確認模式下,當我們消費者成功接收到消息后,在隊列中消息會進入Unacked項,也就是待確認模式

所以我們還需要加上下列代碼,來實現消息者在成功接收到消息后,手動確認

#添加紅色字段

/**
 * 消費者
 */
public class Recv {
    //設定隊列名稱(已存在的隊列)
    private static final String QUEUE_NAME = "queue1";
public static void main(String[] args) throws IOException, TimeoutException { //從mq工具類獲取連接信息 Connection connection = MqConnectionUtils.getConnection(); //獲取一個通道 Channel channel = connection.createChannel(); //監聽該隊列 channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException{ System.out.println("接收到的消息:"+ new String(body,"UTF-8")); //獲取消息的編號,我們需要根據消息的編號來確認消息 long tag = envelope.getDeliveryTag(); //獲取當前內部類中的通道 Channel c = this.getChannel(); //手動確認消息,確認以后,則表示消息已經成功處理,消息就會從隊列中移除 c.basicAck(tag,true); } }); } }

此時,我們的消息才會成功被確認,並移除隊列。


這里我們對消息確認機制中的消費者接收消息確認進行了初步的介紹,具體的消息確認機制可參考RabbitMQ消息確認機制進行了解

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM