RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ


轉載請注明出處

0.目錄

RabbitMQ-從基礎到實戰(2)— 防止消息丟失

RabbitMQ-從基礎到實戰(3)— 消息的交換(上)

RabbitMQ-從基礎到實戰(4)— 消息的交換(中)

RabbitMQ-從基礎到實戰(5)— 消息的交換(下)

RabbitMQ-從基礎到實戰(6)— 與Spring集成

1.簡介

本篇博文介紹了在windows平台下安裝RabbitMQ Server端,並用JAVA代碼實現收發消息

2.安裝RabbitMQ

  1. RabbitMQ是用Erlang開發的,所以需要先安裝Erlang環境,在這里下載對應系統的Erlang安裝包進行安裝
  2. 點擊這里下載對應平台的RabbitMQ安裝包進行安裝

Windows平台安裝完成后如圖

 

3.啟用RabbitMQ Web控制台

RabbitMQ提供一個控制台,用於管理和監控RabbitMQ,默認是不啟動的,需要運行以下命令進行啟動

  1. 點擊上圖的Rabbit Command Prompt,打開rabbitMQ控制台
  2. 官方介紹管理控制台的頁面,可以看到,輸入以下命令啟動后台控制插件

    rabbitmq-plugins enable rabbitmq_management

  3. 登錄后台頁面:http://localhost:15672/   密碼和用戶名都是 guest ,界面如下

 

目前可以先不用理會此界面,后面使用到時會詳細介紹,也可以到這里查看官方文檔。

4.編寫MessageSender

Spring對RabbitMQ已經進行了封裝,正常使用中,會使用Spring集成,第一個項目中,我們先不考慮那么多

在IDE中新建一個Maven項目,並在pom.xml中貼入如下依賴,RabbitMQ的最新版本依賴可以在這里找到

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>

等待Maven下載完成后,就可以在Maven Dependencies中看到RabbitMQ的JAR

在這里,我們發現,RabbitMQ的日志依賴了slf4j-api這個包,slf4j-api並不是一個日志實現,這樣子是打不出日志的,所以,我們給pom加上一個日志實現,這里用了logback

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.1</version>
</dependency>

之后maven依賴如下,可以放心寫代碼了

 

新建一個MessageSender類,代碼如下

 1 import java.io.IOException;
 2 import java.util.concurrent.TimeoutException;
 3 
 4 import org.slf4j.Logger;
 5 import org.slf4j.LoggerFactory;
 6 
 7 import com.rabbitmq.client.Channel;
 8 import com.rabbitmq.client.Connection;
 9 import com.rabbitmq.client.ConnectionFactory;
10 
11 public class MessageSender {
12     
13     private Logger logger = LoggerFactory.getLogger(MessageSender.class);
14 
15     //聲明一個隊列名字
16     private final static String QUEUE_NAME = "hello";
17     
18     public boolean sendMessage(String message){
19         //new一個RabbitMQ的連接工廠
20         ConnectionFactory factory = new ConnectionFactory();
21         //設置需要連接的RabbitMQ地址,這里指向本機
22         factory.setHost("127.0.0.1");
23         Connection connection = null;
24         Channel channel = null;
25         try {
26             //嘗試獲取一個連接
27             connection = factory.newConnection();
28             //嘗試創建一個channel
29             channel = connection.createChannel();
30             //這里的參數在后面詳解
31             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
32             //注意這里調用了getBytes(),發送的其實是byte數組,接收方收到消息后,需要重新組裝成String
33             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
34             logger.info("Sent '" + message + "'");
35             //關閉channel和連接
36             channel.close();
37             connection.close();
38         } catch (IOException | TimeoutException e) {
39             //失敗后記錄日志,返回false,代表發送失敗
40             logger.error("send message faild!",e);
41             return false;
42         }
43         return true;
44     }
45 }

然后在App類的main方法中調用sendMessage

1 public class App {
2     public static void main( String[] args ){
3         MessageSender sender = new MessageSender();
4         sender.sendMessage("hello RabbitMQ!");
5     }
6 }

打印日志如下

打開RabbitMQ的控制台,可以看到消息已經進到了RabbitMQ中

點進去,用控制台自帶的getMessage功能,可以看到消息已經成功由RabbitMQ管理了

至此,MessageSender已經寫好了,在該類的31和33行,我們分別調用了隊列聲明和消息發送

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

queueDeclare,有很多參數,我們可以看一下他的源碼,注釋上有詳細的解釋,我簡單翻譯了一下

 1 /**
 2      * Declare a queue 聲明一個隊列
 3      * @see com.rabbitmq.client.AMQP.Queue.Declare
 4      * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
 5      * @param queue the name of the queue隊列的名字
 6      * @param durable true if we are declaring a durable queue (the queue will survive a server restart)是否持久化,為true則在rabbitMQ重啟后生存
 7      * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)是否是排他性隊列(別人看不到),只對當前連接有效,當前連接斷開后,隊列刪除(設置了持久化也刪除)
 8      * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)自動刪除,在最后一個連接斷開后刪除隊列
 9      * @param arguments other properties (construction arguments) for the queue 其他參數
10      * @return a declaration-confirm method to indicate the queue was successfully declared
11      * @throws java.io.IOException if an error is encountered
12      */
13     Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
14                                  Map<String, Object> arguments) throws IOException;

前面4個都非常好理解,最后一個“其他參數”,到底是什么其他參數,這個東西真的很難找,用到再解釋吧,官方文檔如下

  • TTL Time To Live  存活時間

basicPublish的翻譯如下

 1  /**
 2      * Publish a message.發送一條消息
 3      *
 4      * Publishing to a non-existent exchange will result in a channel-level
 5      * protocol exception, which closes the channel.
 6      *
 7      * Invocations of <code>Channel#basicPublish</code> will eventually block if a
 8      * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
 9      *
10      * @see com.rabbitmq.client.AMQP.Basic.Publish
11      * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
12      * @param exchange the exchange to publish the message to 交換模式,會在后面講,官方文檔在這里 13      * @param routingKey the routing key 控制消息發送到哪個隊列
14      * @param props other properties for the message - routing headers etc 其他參數
15      * @param body the message body 消息,byte數組
16      * @throws java.io.IOException if an error is encountered
17      */
18     void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

這里又有個其他參數,它的類型是這樣的,設置消息的一些詳細屬性

 

5.編寫MessageConsumer

為了和Sender區分開,新建一個Maven項目MessageConsumer

 1 package com.liyang.ticktock.rabbitmq;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import org.slf4j.Logger;
 7 import org.slf4j.LoggerFactory;
 8 
 9 import com.rabbitmq.client.AMQP;
10 import com.rabbitmq.client.Channel;
11 import com.rabbitmq.client.Connection;
12 import com.rabbitmq.client.ConnectionFactory;
13 import com.rabbitmq.client.Consumer;
14 import com.rabbitmq.client.DefaultConsumer;
15 import com.rabbitmq.client.Envelope;
16 
17 public class MessageConsumer {
18     
19     private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
20     
21     public boolean consume(String queueName){
22         //連接RabbitMQ
23         ConnectionFactory factory = new ConnectionFactory();
24         factory.setHost("127.0.0.1");
25         Connection connection = null;
26         Channel channel = null;
27         try {
28             connection = factory.newConnection();
29             channel = connection.createChannel();
30             //這里聲明queue是為了取消息的時候,queue肯定會存在
31             //注意,queueDeclare是冪等的,也就是說,消費者和生產者,不論誰先聲明,都只會有一個queue
32             channel.queueDeclare(queueName, false, false, false, null);
33             
34             //這里重寫了DefaultConsumer的handleDelivery方法,因為發送的時候對消息進行了getByte(),在這里要重新組裝成String
35             Consumer consumer = new DefaultConsumer(channel){
36                 @Override
37                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
38                           throws IOException {
39                         String message = new String(body, "UTF-8");
40                         logger.info("Received '" + message + "'");
41                 }
42             };
43             //上面是聲明消費者,這里用聲明的消費者消費掉隊列中的消息
44             channel.basicConsume(queueName, true, consumer);
45             
46             //這里不能關閉連接,調用了消費方法后,消費者會一直連接着rabbitMQ等待消費
47            
48         } catch (IOException | TimeoutException e) {
49             //失敗后記錄日志,返回false,代表消費失敗
50             logger.error("send message faild!",e);
51             return false;
52         }
53         
54         
55         return true;
56     }
57 }

然后在App的main方法中調用Cunsumer進行消費

 1 public class App 
 2 {
 3     //這個隊列名字要和生產者中的名字一樣,否則找不到隊列
 4     private final static String QUEUE_NAME = "hello";
 5     
 6     public static void main( String[] args )
 7     {
 8         MessageConsumer consumer = new MessageConsumer();
 9         consumer.consume(QUEUE_NAME);
10     }
11 }

結果如下,消費者一直在等待消息,每次有消息進來,就會立刻消費掉

6.多個消費者同時消費一個隊列

改造一下Consumer

在App中new多個消費者

改造Sender,使它不停的往RabbitMQ中發送消息

啟動Sender

啟動Consumer,發現消息很平均的發給四個客戶端,一人一個,誰也不插隊

如果我們把速度加快呢?把Sender的休息時間去掉,發現消費開始變得沒有規律了,其實呢,它還是有規律的,這個是RabbitMQ的特性,稱作“Round-robin dispatching”,消息會平均的發送給每一個消費者,可以看第一第二行,消息分別是56981和56985,相應的82、82、84都被分給了其他線程,只是在當前線程的時間片內,可以處理這么多任務,所以就一次打印出來了

 

7.結束語

這一章介紹了從安裝到用JAVA語言編寫生產者與消費者,在這里只是簡單的消費消息並打印日志,如果一個消息需要處理的時間很長,而處理的過程中,這個消費者掛掉了,那消息會不會丟失呢?答案是肯定的,而且已經分配給這個消費者,但還沒來得及處理的消息也會一並丟失掉,這個問題,RabbitMQ早就考慮到了,並且提供了解決方案,下一篇博文將進行詳細介紹


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM