RabbitMQ入門:Hello RabbitMQ 代碼實例


在之前的一篇博客RabbitMQ入門:認識並安裝RabbitMQ(以Windows系統為例)中,我們安裝了RabbitMQ並且對其也有的初步的認識,今天就來寫個入門小例子來加深概念理解並了解代碼怎么實現。

本篇博客圍繞下面幾個方面展開:

  1. 代碼前的理論熱身
  2. 代碼實例:Hello RabbitMQ
  3. 運行代碼並調試問題

Now, Let's begin !

一、代碼前的理論熱身

我們來看張圖:

Publisher(生產者)生成消息,然后publish(發布)消息到exchange(路由器,也有資料翻譯成交換機),然后根據路由規則將消息傳遞到Queue(隊列),最終交由Consumer(消費者)進行消費處理。

這里的生產者和消費者都是我們的應用,因此我們的代碼中要實現這兩個部分。

中間的節點就是RabbitMQ 提供的內容,需要再生產者和消費者里面調用其接口來定義和使用這些節點。

 

二、代碼實例:Hello RabbitMQ

  1. 首先來實現生產者,這里我沒有用Publisher做類名,而是用的Provider,沒有特別的用意,就是在起名字的時候不小心寫成了這樣,不需要在意這個細節,O(∩_∩)O。
    package com.sam.hello_rabbitmq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Provider {
    
        //定義隊列名
        static String QUEUE_NAME = "helloRabbit";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                //1.創建連接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
                
                //2.為通道聲明隊列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                
                //3.發布消息
                String msg = " hello rabbitmq, welcome to sam's blog.";
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                System.out.println("provider send a msg: " + msg);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                //4.關閉連接
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    
    }

    在第2步中,channel.queueDeclare 用來創建隊列,有5個參數:String queue, 隊列名; boolean durable, 該隊列是否需要持久化; boolean exclusive,該隊列是否為該通道獨占的(其他通道是否可以消費該隊列); boolean autoDelete,該隊列不再使用的時候,是否讓RabbitMQ服務器自動刪除掉; Map<String, Object> arguments 其他參數。第3步中,channel.basicPublish 發布消息(用在生產者),有4個參數:String exchange, 路由器(有的資料翻譯成交換機)的名字,即將消息發到哪個路由器; String routingKey, 路由鍵,即發布消息時,該消息的路由鍵是什么; BasicProperties props, 指定消息的基本屬性; byte[] body 消息體,也就是消息的內容,是字節數組。 可能你會疑惑,為什么沒有exchange呢?因為如果聲明了隊列,可以不聲明路由器。

  2. 接着來實現消費者,消費者實現和生產者過程差不多,但是在這里並沒有關閉連接和通道,是因為要消費者一直等待隨時可能發來的消息。代碼如下:
    package com.sam.hello_rabbitmq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class HelloConsumer {
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.創建連接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.為通道聲明隊列
                channel.queueDeclare(Provider.QUEUE_NAME, false, false, false, null);
                System.out.println(" **** keep alive ,waiting for messages, and then deal them");
                // 3.通過回調生成消費者
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                        
                        //獲取消息內容然后處理
                        String msg = new String(body, "UTF-8");
                        System.out.println("*********** HelloConsumer" + " get message :[" + msg +"]");
                    }
                };
                
                //4.消費消息
                channel.basicConsume(Provider.QUEUE_NAME, true, consumer);
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

    在第4步中,channel.basicConsume 用來接收消息,用在消費者,有3個參數:String queue, 隊列名字,即要從哪個隊列中接收消息; boolean autoAck, 是否自動確認,默認true; Consumer callback 消費者,即誰接收消息。

 

三、運行代碼並調試問題

代碼寫好了,接下來進行測試,

  1. 先來執行下Provider.java,發現報錯了:
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:124)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:120)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:142)
        at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:952)
        at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
        at com.sam.hello_rabbitmq.Provider.main(Provider.java:36)
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136)
        ... 3 more
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
        at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:509)
        at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:340)
        at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
        at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109)
        at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:643)
        at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581)
        at java.lang.Thread.run(Thread.java:745)
    Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
        at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:345)
        at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:286)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:600)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:534)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:527)
        at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.close(AutorecoveringChannel.java:68)
        at com.sam.hello_rabbitmq.Provider.main(Provider.java:60)
    關鍵堆棧信息是:inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true',說是helloRabbit這個隊列durable(是否需要持久化)
    參數已經設定成了true 但是代碼中指定的是false,沖突了,納尼?訪問RabbitMQ管理頁面:http://localhost:15672/#/queues 發現已經存在一個隊列helloRabbit,

    點helloRabbit的鏈接,發現隊列的durable屬性確實是true。哦,原來我之前在做別的練習的時候,創建過一個叫這個名字的隊列了,而且屬性值剛好為true.

    那么接下來刪掉這個既存的隊列

    再去執行Provider.java,后台打印了內容,並且隊列中有了一條ready的消息。

    問題解決!

  2. 執行HelloConsumer.java,預想的結果是在啟動后,控制台直接打印出log並且RabbitMQ管理頁面沒有ready的消息:

    結果符合預期。

到此,全部工作完美結束。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM