沒用過消息隊列?一文帶你體驗RabbitMQ收發消息


人生終將是場單人旅途,孤獨之前是迷茫,孤獨過后是成長。

楔子

先給大家說聲抱歉,最近一周都沒有發文,有一些比較要緊重要的事需要處理。

今天正好得空,本來說准備寫SpringIOC相關的東西,但是發現想要梳理一遍還是需要很多時間,所以我打算慢慢寫,先把MQ給寫了,再慢慢寫其他相關的,畢竟偏理論的東西一遍要比較難寫,像MQ這種偏實戰的大家可以clone代碼去玩一玩,還是比較方便的。

同時MQ也是Java進階不必可少的技術棧之一,所以Java開發從業者對它是必須要了解的。

現在市面上有三種消息隊列比較火分別是:RabbitMQRocketMQKafka

今天要講的消息隊列中我會以RabbitMQ作為案例來入門,因為SpringBoot的amqp中默認只集成了RabbitMQ,用它來講會方便許多,且RabbitMQ的性能和穩定性都很不錯,是一款經過時間考驗的開源組件。

祝有好收獲。

本文代碼: 碼雲地址GitHub地址

1. 🔍消息隊列?

消息隊列(MQ)全稱為Message Queue,是一種應用程序對應用程序的通信方法。

翻譯一下就是:在應用之間放一個消息組件,然后應用雙方通過這個消息組件進行通信。

好端端的為啥要在中間放個組件呢?

小系統其實是用不到消息隊列的,一般分布式系統才會引入消息隊列,因為分布式系統需要抗住高並發,需要多系統解耦,更需要對用戶比較友好的響應速度,而消息隊列的特性可以天然解耦,方便異步更能起到一個頂住高並發的削峰作用,完美解決上面的三個問題。


然萬物抱陽負陰,系統之間突然加了個中間件,提高系統復雜度的同時也增加了很多問題:

  • 消息丟失怎么辦?
  • 消息重復消費怎么辦?
  • 某些任務需要消息的順序消息,順序消費怎么保證?
  • 消息隊列組件的可用性如何保證?

這些都是使用消息隊列過程中需要思考需要考慮的地方,消息隊列能給你帶來很大的便利,也能給你帶來一些對應的麻煩。

上面說了消息隊列帶來的好處以及問題,而這些不在我們今天這篇的討論范圍之內,我打算之后再寫這些,我們今天要做的是搭建出一個消息隊列環境,讓大家感受一下基礎的發消息與消費消息,更高級的問題會放在以后討論。

2. 📖RabbitMQ一覽

RabbitMQ是一個消息組件,是一個erlang開發的AMQP(Advanced Message Queue)的開源實現。

AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標准高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。

RabbitMQ采用了AMQP協議,至於這協議怎么怎么樣,我們關心的是RabbitMQ結構如何且怎么用。

還是那句話,學東西需要先觀其大貌,我們要用RabbitMQ首先要知道它整體是怎么樣,這樣才有利於我們接下來的學習。

我們先來看看我剛畫的架構圖,因為RabbitMQ實現了AMQP協議,所以這些概念也是AMQP中共有的。

rabbit架構圖

  • Broker: 中間件本身。接收和分發消息的應用,這里指的就是RabbitMQ Server。

  • Virtual host: 虛擬主機。出於多租戶和安全因素設計的,把AMQP的基本組件划分到一個虛擬的分組中,類似於網絡中的namespace概念。當多個不同的用戶使用同一個RabbitMQ server提供的服務時,可以划分出多個vhost,每個用戶在自己的vhost創建exchange/queue等。

  • Connection: 連接。publisher/consumer和broker之間的TCP連接。斷開連接的操作只會在client端進行,Broker不會斷開連接,除非出現網絡故障或broker服務出現問題。

  • Channel: 渠道。如果每一次訪問RabbitMQ都建立一個Connection,在消息量大的時候建立TCP Connection的開銷會比較大且效率也較低。Channel是在connection內部建立的邏輯連接,如果應用程序支持多線程,通常每個thread創建單獨的channel進行通訊,AMQP method包含了channel id幫助客戶端和message broker識別channel,所以channel之間是完全隔離的。Channel作為輕量級的Connection極大減少了操作系統建立TCP connection的開銷。

  • Exchange: 路由。根據分發規則,匹配查詢表中的routing key,分發消息到queue中去。

  • Queue: 消息的隊列。消息最終被送到這里等待消費,一個message可以被同時拷貝到多個queue中。

  • Binding: 綁定。exchange和queue之間的虛擬連接,binding中可以包含routing key。Binding信息被保存到exchange中的查詢表中,用於message的分發依據。


看完了這些概念,我再給大家梳理一遍其流程:

當我們的生產者端往Broker(RabbitMQ)中發送了一條消息,Broker會根據其消息的標識送往不同的Virtual host,然后Exchange會根據消息的路由key和交換器類型將消息分發到自己所屬的Queue中去。

然后消費者端會通過Connection中的Channel獲取剛剛推送的消息,拉取消息進行消費。

Tip:某個Exchange有哪些屬於自己的Queue,是由Binding綁定關系決定的。

3. 💡RabbitMQ環境

上面講了RabbitMQ大概的結構圖和一個消息的運行流程,講完了理論,這里我們就准備實操一下吧,先進行RabbitMQ安裝。

官網下載地址:http://www.rabbitmq.com/download.html

由於我還沒有屬於自己MAC電腦,所以這里的演示就按照Windows的來了,不過大家都是程序員,安裝個東西總歸是難不倒大家的吧😂

Windows下載地址:https://www.rabbitmq.com/install-windows.html

進去之后可以直接找到Direct Downloads,下載相關EXE程序進行安裝就可以了。

由於RabbitMQ是由erlang語言編寫的,所以安裝之前我們還需要安裝erlang環境,你下載RabbitMQ之后直接點擊安裝,如果沒有相關環境,安裝程序會提示你,然后會讓你的瀏覽器打開erlang的下載頁面,在這個頁面上根據自己的系統類型點擊下載安裝即可,安裝完畢后再去安裝RabbitMQ

這兩者的安裝都只需要一直NEXT下一步就可以了。

安裝完成之后可以按一下Windows鍵看到效果如下:

rabbitmq安裝效果

Tip:其中Rabbit-Command后面會用到,是RabbitMQ的命令行操作台。


安裝完RabbitMQ我們需要對我們的開發環境也導入RabbitMQ相關的JAR包。

為了方便起見,我們可以直接使用Spring-boot-start的方式導入,這里面也會包含所有我們需要用到的RabbitMQ相關的JAR包。

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
</dependencies>

直接引入spring-boot-starter-amqp即可。

4. ✍Hello World

搭建好環境之后,我們就可以上手了。

考慮到這是一個入門文章,讀者很多可能沒有接觸過RabbitMQ,直接使用自動配置的方式可能會令大家很迷惑,因為自動配置會屏蔽很多細節,導致大家只看到了被封裝后的樣子,不利於大家理解。

所以在本節Hello World這里,我會直接使用最原始的連接方式就行演示,讓大家看到最原始的連接的樣子。

Tip:這種方式演示的代碼我都在放在prototype包下面。

4.1 生產者

先來看看生產者代碼,也就是我們push消息的代碼:

    public static final String QUEUE_NAME = "erduo";

    // 創建連接工廠
    ConnectionFactory connectionFactory = new ConnectionFactory();

    // 連接到本地server
    connectionFactory.setHost("127.0.0.1");

    // 通過連接工廠創建連接
    Connection connection = connectionFactory.newConnection();

    // 通過連接創建通道
    Channel channel = connection.createChannel();

    // 創建一個名為耳朵的隊列,該隊列非持久(RabbitMQ重啟后會消失)、非獨占(非僅用於此鏈接)、非自動刪除(服務器將不再使用的隊列刪除)
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    String msg = "hello, 我是耳朵。" + LocalDateTime.now().toString();
    // 發布消息
    // 四個參數為:指定路由器,指定key,指定參數,和二進制數據內容
    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));

    System.out.println("生產者發送消息結束,發送內容為:" + msg);
    channel.close();
    connection.close();

代碼我都給了注釋,但是我還是要給大家講解一遍,梳理一下。

先通過RabbitMQ中的ConnectionFactory配置一下將要連接的server-host,然后創建一個新連接,再通過此連接創建通道(Channel),通過這個通道創建隊列和發送消息。

這里看上去還是很好理解的,我需要把創建隊列和發送消息這里再拎出來說一下。

創建隊列

    AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;

創建隊列的方法里面有五個參數,第一個是參數是隊列的名稱,往后的三個參數代表不同的配置,最后一個參數是額外參數。

  • durable:代表是否將此隊列持久化。

  • exclusive:代表是否獨占,如果設置為獨占隊列則此隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。

  • autoDelete:代表斷開連接后是否自動刪除此隊列。

  • arguments:代表其他額外參數。

這些參數中durable經常會用到,它代表了我們可以對隊列做持久化,以保證RabbitMQ宕機恢復后此隊列也可以自行恢復。

發送消息

    void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;

發送消息的方法里是四個參數,第一個是必須的指定exchange,上面的示例代碼中我們傳入了一個空字符串,這代表我們交由默認的匿名exchange去幫我們路由消息。

第二個參數是路由key,exchange會根據此key對消息進行路由轉發,第三個參數是額外參數,講消息持久化時會用到一下,最后一個參數就是我們要發送的數據了,需要將我們的數據轉成字節數組的方式傳入。

測試

講完了這些API之后,我們可以測試一下我們的代碼了,run一下之后,會在控制台打出如下:

生產者測試結果01

這樣之后我們就把消息發送到了RabbitMQ中去,此時可以打開RabbitMQ控制台(前文安裝時提到過)去使用命令rabbitmqctl.bat list_queues去查看消息隊列現在的情況:

查看隊列狀態

可以看到有一條message在里面,這就代表我們的消息已經發送成功了,接下來我們可以編寫一個消費者對里面的message進行消費了。

4.2 消費者

消費者代碼和生產者的差不多,都需要建立連接建立通道:

    // 創建連接工廠
    ConnectionFactory connectionFactory = new ConnectionFactory();

    // 連接到本地server
    connectionFactory.setHost("127.0.0.1");

    // 通過連接工廠創建連接
    Connection connection = connectionFactory.newConnection();

    // 通過連接創建通道
    Channel channel = connection.createChannel();

    // 創建消費者,阻塞接收消息
    com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("-------------------------------------------");
            System.out.println("consumerTag : " + consumerTag);
            System.out.println("exchangeName : " + envelope.getExchange());
            System.out.println("routingKey : " + envelope.getRoutingKey());
            String msg = new String(body, StandardCharsets.UTF_8);
            System.out.println("消息內容 : " + msg);
        }
    };

    // 啟動消費者消費指定隊列
    channel.basicConsume(Producer.QUEUE_NAME, consumer);
//        channel.close();
//        connection.close();

建立完通道之后,我們需要創建一個消費者對象,然后用這個消費者對象去消費指定隊列中的消息。

這個示例中我們就是新建了一個consumer,然后用它去消費隊列-erduo中的消息。

最后兩句代碼我給注釋掉了,因為一旦把連接也關閉了,那我們的消費者就不能保持消費狀態了,所以要開着連接,監聽此隊列。

ok,運行這段程序,然后我們的消費者會去隊列-erduo拿到里面的消息,效果如下:

消費者test01

  • consumerTag:是這個消息的標識。

  • exchangeName:是這個消息所發送exchange的名字,我們先前傳入的是空字符串,所以這里也是空字符串。

  • exchangeName:是這個消息所發送路由key。

這樣我們的程序就處在一個監聽的狀態下,你再次調用生產者發送消息消費者就會實時的在控制上打印消息內容。

5. 📌消息接收確認(ACK)

上面我們演示了生產者和消費者,我們生產者發送一條消息,消費者消費一條信息,這個時候我們的RabbitMQ應該有多少消息?

理論上來說發送一條,消費一條,現在里面應該是0才對,但是現在的情況並不是:

查看隊列狀態

消息隊列里面還是有1條信息,我們重啟一下消費者,又打印了一遍我們消費過的那條消息,通過消息上面的時間我們可以看出來還是當時我們發送的那條信息,也就是說我們消費者消費過了之后這條信息並沒有被刪除。

消費者test01

這種狀況出現的原因是因為RabbitMQ消息接收確認機制,也就是說一條信息被消費者接收到了之后,需要進行一次確認操作,這條消息才會被刪除。

RabbitMQ中默認消費確認是手動的,也可以將其設置為自動刪除,自動刪除模式消費者接收到消息之后就會自動刪除這條消息,如果消息處理過程中發生了異常,這條消息就等於沒被處理完但是也被刪除掉了,所以這里我們會一直使用手動確認模式。

消息接受確認(ACK)的代碼很簡單,只要在原來消費者的代碼里加上一句就可以了:

    com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("-------------------------------------------");
            System.out.println("consumerTag : " + consumerTag);
            System.out.println("exchangeName : " + envelope.getExchange());
            System.out.println("routingKey : " + envelope.getRoutingKey());
            String msg = new String(body, StandardCharsets.UTF_8);
            System.out.println("消息內容 : " + msg);

            // 消息確認
            channel.basicAck(envelope.getDeliveryTag(), false);
            System.out.println("消息已確認");
        }
    };

我們將代碼改成如此之后,可以再run一次消費者,可以看看效果:

消息確認

再來看看RabbitMQ中的隊列情況:

消息隊列狀態

從圖中我們可以看出消息消費后已經成功被刪除了,其實大膽猜一猜,自動刪除應該是在我們的代碼還沒執行之前就幫我們返回了確認,所以這就導致了消息丟失的可能性。

我們采用手動確認的方式之后,可以先將邏輯處理完畢之后(可能出現異常的地方可以try-catch起來),把手動確認的代碼放到最后一行,這樣如果出現異常情況導致這條消息沒有被確認,那么這條消息會在之后被重新消費一遍。

后記

今天的內容就到這里,下一篇將會我們將會撇棄傳統的手動建立連接的方式進行發消息收消息,而轉用Spring幫我們定義好的注解和Spring提供的RabbitTemplate,更方便的收發消息。

消息隊列呢,其實用法都是一樣的,只是各個開源消息隊列的側重點稍有不同,我們應該根據我們自己的項目需求來決定我們應該選取什么樣的消息隊列來為我們的項目服務,這個項目選型的工作一般都是開發組長幫你們做了,一般是輪不到我們來做的,但是面試的時候可能會考察相關知識,所以這幾種消息隊列我們都應該有所涉獵。

好了,以上就是本期的全部內容,感謝你能看到這里,歡迎對本文點贊收藏與評論,👍你們的每個點贊都是我創作的最大動力。

我是耳朵,一個一直想做知識輸出的偽文藝程序員,我們下期見。

本文代碼:碼雲地址GitHub地址


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM