建議先了解為什么項目要使用 MQ 消息隊列,MQ 消息隊列有什么優點,如果在業務邏輯上沒有此種需求,建議不要使用中間件。中間件對系統的性能做優化的同時,同時增加了系統的復雜性也維護難易度;其次,需要了解各種常見的 MQ 消息隊列有什么區別,以便在相同的成本下選擇一種最合適本系統的技術。
本文主要討論 RabbitMQ,從3月底接觸一個項目使用了 RabbitMQ,就開始着手學習,主要通過視頻和博客學習了一個月,基本明白了 RabbitMQ 的應用,其它的 MQ 隊列還不清楚,其底層技術還有待學習,以下是我目前的學習心得。
1.安裝 Erlang
RabbitMQ 是基於 Erlang 語言寫的,所以首先安裝 Erlang,本例是在 Windows 上安裝,也可以選擇在 Linux 上安裝,機器上沒有虛擬機,直接在 Windows 上操作,建議在 Linux 上安裝。官方下載 Erlang 軟件,我下載最新版本 21.3。安裝過程很簡單,直接 Next 到底。 Linux 安裝自行谷歌。如下圖:






2.安裝 RabbitMQ
在官方下載,選擇最新版本 3.7。安裝過程很簡單,直接 Next 到底。如下圖:



執行 rabbitmq-server start 命令,啟動服務。本地登陸並創建用戶,如下圖:


1、 超級管理員(administrator)
可登陸管理控制台,可查看所有的信息,並且可以對用戶,策略(policy)進行操作。
2、 監控者(monitoring)
可登陸管理控制台,同時可以查看rabbitmq節點的相關信息(進程數,內存使用情況,磁盤使用情況等)
3、 策略制定者(policymaker)
可登陸管理控制台, 同時可以對policy進行管理。但無法查看節點的相關信息(上圖紅框標識的部分)。
4、 普通管理者(management)
僅可登陸管理控制台,無法看到節點信息,也無法對策略進行管理。
5、 其他
無法登陸管理控制台,通常就是普通的生產者和消費者。
4.JAVA 操作RabbitMQ
參考 RabbitMQ 官網,一共分為6個模式

生產者:發送消息的一端






(1): 准備必要的 Pom 文件,導入相應的 jar 包,
1 <?xml version="1.0" encoding="UTF-8"?> 2 3 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 7 <groupId>com.edu</groupId> 8 <artifactId>rabbitmqdemo</artifactId> 9 <version>1.0</version> 10 11 <name>rabbitmqdemo</name> 12 <!-- FIXME change it to the project's website --> 13 <url>http://www.example.com</url> 14 15 <properties> 16 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 17 <maven.compiler.source>1.7</maven.compiler.source> 18 <maven.compiler.target>1.7</maven.compiler.target> 19 </properties> 20 21 <dependencies> 22 <!--測試包--> 23 <dependency> 24 <groupId>junit</groupId> 25 <artifactId>junit</artifactId> 26 <version>4.11</version> 27 <scope>test</scope> 28 </dependency> 29 <!--mq客戶端--> 30 <dependency> 31 <groupId>com.rabbitmq</groupId> 32 <artifactId>amqp-client</artifactId> 33 <version>4.5.0</version> 34 </dependency> 35 <!--日志--> 36 <dependency> 37 <groupId>org.slf4j</groupId> 38 <artifactId>slf4j-log4j12</artifactId> 39 <version>1.7.25</version> 40 </dependency> 41 <!--工具包--> 42 <dependency> 43 <groupId>org.apache.commons</groupId> 44 <artifactId>commons-lang3</artifactId> 45 <version>3.3.2</version> 46 </dependency> 47 <!--spring集成--> 48 <dependency> 49 <groupId>org.springframework.amqp</groupId> 50 <artifactId>spring-rabbit</artifactId> 51 <version>1.7.6.RELEASE</version> 52 </dependency> 53 <dependency> 54 <groupId>org.springframework</groupId> 55 <artifactId>spring-test</artifactId> 56 <version>4.3.7.RELEASE</version> 57 </dependency> 58 <dependency> 59 <groupId>junit</groupId> 60 <artifactId>junit</artifactId> 61 <version>RELEASE</version> 62 <scope>compile</scope> 63 </dependency> 64 </dependencies> 65 66 <build> 67 <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --> 68 <plugins> 69 <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle --> 70 <plugin> 71 <artifactId>maven-clean-plugin</artifactId> 72 <version>3.1.0</version> 73 </plugin> 74 <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging --> 75 <plugin> 76 <artifactId>maven-resources-plugin</artifactId> 77 <version>3.0.2</version> 78 </plugin> 79 <plugin> 80 <artifactId>maven-compiler-plugin</artifactId> 81 <version>3.8.0</version> 82 </plugin> 83 <plugin> 84 <artifactId>maven-surefire-plugin</artifactId> 85 <version>2.22.1</version> 86 </plugin> 87 <plugin> 88 <artifactId>maven-jar-plugin</artifactId> 89 <version>3.0.2</version> 90 </plugin> 91 <plugin> 92 <artifactId>maven-install-plugin</artifactId> 93 <version>2.5.2</version> 94 </plugin> 95 <plugin> 96 <artifactId>maven-deploy-plugin</artifactId> 97 <version>2.8.2</version> 98 </plugin> 99 <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle --> 100 <plugin> 101 <artifactId>maven-site-plugin</artifactId> 102 <version>3.7.1</version> 103 </plugin> 104 <plugin> 105 <artifactId>maven-project-info-reports-plugin</artifactId> 106 <version>3.0.0</version> 107 </plugin> 108 </plugins> 109 </pluginManagement> 110 </build> 111 </project>
(2): 建立日志配置文件,在 resources 下建立 log4j.properties,便於打印精確的日志信息
1 log4j.rootLogger=DEBUG,A1 2 log4j.logger.com.edu=DEBUG 3 log4j.logger.org.mybatis=DEBUG 4 log4j.appender.A1=org.apache.log4j.ConsoleAppender 5 log4j.appender.A1.layout=org.apache.log4j.PatternLayout 6 log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-%m%n
(3): 編寫一個工具類,主要用於連接 RabbitMQ
1 package com.edu.util; 2 3 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 /** 8 * @ClassName ConnectionUtil 9 * @Deccription 穿件連接的工具類 10 * @Author DZ 11 * @Date 2019/5/4 12:27 12 **/ 13 public class ConnectionUtil { 14 /** 15 * 創建連接工具 16 * 17 * @return 18 * @throws Exception 19 */ 20 public static Connection getConnection() throws Exception { 21 ConnectionFactory connectionFactory = new ConnectionFactory(); 22 connectionFactory.setHost("127.0.0.1");//MQ的服務器 23 connectionFactory.setPort(5672);//默認端口號 24 connectionFactory.setUsername("test"); 25 connectionFactory.setPassword("test"); 26 connectionFactory.setVirtualHost("/test"); 27 return connectionFactory.newConnection(); 28 } 29 }
項目總體圖如下:

4.1.Hello World模式
此模式非常簡單,一個生產者對應一個消費者

1 package com.edu.hello; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 /** 8 * @ClassName Sender 9 * @Deccription 創建發送者 10 * @Author DZ 11 * @Date 2019/5/4 12:45 12 **/ 13 public class Sender { 14 private final static String QUEUE = "testhello"; //隊列的名字 15 16 public static void main(String[] srgs) throws Exception { 17 //獲取連接 18 Connection connection = ConnectionUtil.getConnection(); 19 //創建連接 20 Channel channel = connection.createChannel(); 21 //聲明隊列 22 //參數1:隊列的名字 23 //參數2:是否持久化隊列,我們的隊列存在內存中,如果mq重啟則丟失。如果為ture,則保存在erlang的數據庫中,重啟,依舊保存 24 //參數3:是否排外,我們連接關閉后是否自動刪除隊列,是否私有當前隊列,如果私有,其他隊列不能訪問 25 //參數4:是否自動刪除 26 //參數5:我們傳入的其他參數 27 channel.queueDeclare(QUEUE, false, false, false, null); 28 //發送內容 29 channel.basicPublish("", QUEUE, null, "要發送的消息".getBytes()); 30 //關閉連接 31 channel.close(); 32 connection.close(); 33 } 34 }
定義一個消息接受者
1 package com.edu.hello; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 import com.rabbitmq.client.QueueingConsumer; 7 8 /** 9 * @ClassName Recver 10 * @Deccription 消息接受者 11 * @Author DZ 12 * @Date 2019/5/4 12:58 13 **/ 14 public class Recver { 15 private final static String QUEUE = "testhello";//消息隊列的名稱 16 17 public static void main(String[] args) throws Exception { 18 Connection connection = ConnectionUtil.getConnection(); 19 Channel channel = connection.createChannel(); 20 channel.queueDeclare(QUEUE, false, false, false, null); 21 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); 22 //接受消息,參數2表示自動確認消息 23 channel.basicConsume(QUEUE, true, queueingConsumer); 24 while (true) { 25 //獲取消息 26 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();//如果沒有消息就等待,有消息就獲取消息,並銷毀,是一次性的 27 String message = new String(delivery.getBody()); 28 System.out.println(message); 29 } 30 } 31 }
此種模式屬於“點對點”模式,一個生產者、一個隊列、一個消費者,可以運用在聊天室(實際上真實的聊天室比這復雜很多,雖然是“點對點”模式,但是並不是一個生產者,一個隊列,一個消費者)
4.2.work queues

定義消息制造者:
1 package com.edu.work; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 /** 8 * @ClassName Sender 9 * @Deccription 創建發送者 10 * @Author DZ 11 * @Date 2019/5/4 12:45 12 **/ 13 public class Sender { 14 private final static String QUEUE = "testhellowork"; //隊列的名字 15 16 public static void main(String[] srgs) throws Exception { 17 //獲取連接 18 Connection connection = ConnectionUtil.getConnection(); 19 //創建連接 20 Channel channel = connection.createChannel(); 21 //聲明隊列 22 //參數1:隊列的名字 23 //參數2:是否持久化隊列,我們的隊列存在內存中,如果mq重啟則丟失。如果為ture,則保存在erlang的數據庫中,重啟,依舊保存 24 //參數3:是否排外,我們連接關閉后是否自動刪除隊列,是否私有當前隊列,如果私有,其他隊列不能訪問 25 //參數4:是否自動刪除 26 //參數5:我們傳入的其他參數 27 channel.queueDeclare(QUEUE, false, false, false, null); 28 //發送內容 29 for (int i = 0; i < 100; i++) { 30 channel.basicPublish("", QUEUE, null, ("要發送的消息" + i).getBytes()); 31 } 32 //關閉連接 33 channel.close(); 34 connection.close(); 35 } 36 }
定義2個消息消費者
1 package com.edu.work; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 import java.util.Queue; 8 9 /** 10 * @ClassName Recver1 11 * @Deccription 消息接受者 12 * @Author DZ 13 * @Date 2019/5/4 12:58 14 **/ 15 public class Recver1 { 16 private final static String QUEUE = "testhellowork";//消息隊列的名稱 17 18 public static void main(String[] args) throws Exception { 19 Connection connection = ConnectionUtil.getConnection(); 20 final Channel channel = connection.createChannel(); 21 channel.queueDeclare(QUEUE, false, false, false, null); 22 //channel.basicQos(1);//告訴服務器,當前消息沒有確認之前,不要發送新消息,合理自動分配資源 23 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { 24 @Override 25 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 26 //收到消息時候調用 27 System.out.println("消費者1收到的消息:" + new String(body)); 28 /*super.handleDelivery(consumerTag, envelope, properties, body);*/ 29 //確認消息 30 //參數2:false為確認收到消息,ture為拒絕收到消息 31 channel.basicAck(envelope.getDeliveryTag(), false); 32 } 33 }; 34 //注冊消費者 35 // 參數2:手動確認,我們收到消息后,需要手動確認,告訴服務器,我們收到消息了 36 channel.basicConsume(QUEUE, false, defaultConsumer); 37 } 38 }
1 package com.edu.work; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 /** 9 * @ClassName Recver1 10 * @Deccription 消息接受者 11 * @Author DZ 12 * @Date 2019/5/4 12:58 13 **/ 14 public class Recver2 { 15 private final static String QUEUE = "testhellowork";//消息隊列的名稱 16 17 public static void main(String[] args) throws Exception { 18 Connection connection = ConnectionUtil.getConnection(); 19 final Channel channel = connection.createChannel(); 20 channel.queueDeclare(QUEUE, false, false, false, null); 21 //channel.basicQos(1); 22 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { 23 @Override 24 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 25 //收到消息時候調用 26 System.out.println("消費者2收到的消息:" + new String(body)); 27 /*super.handleDelivery(consumerTag, envelope, properties, body);*/ 28 //確認消息 29 //參數2:false為確認收到消息,ture為拒絕收到消息 30 channel.basicAck(envelope.getDeliveryTag(), false); 31 } 32 }; 33 //注冊消費者 34 // 參數2:手動確認,我們收到消息后,需要手動確認,告訴服務器,我們收到消息了 35 channel.basicConsume(QUEUE, false, defaultConsumer); 36 } 37 }
這種模式是最簡單的 work 模式,消息發送者,循環發送了100次消息,打印結果如下:



4.3.public模式

定義消息發布者
1 package com.edu.publish; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 /** 8 * @ClassName Sender 9 * @Deccription TODO 10 * @Author DZ 11 * @Date 2019/5/4 14:43 12 **/ 13 public class Sender { 14 private final static String EXCHANGE_NAME = "testexchange";//定義交換機名字 15 16 public static void main(String[] args) throws Exception { 17 Connection connection = ConnectionUtil.getConnection(); 18 Channel channel = connection.createChannel(); 19 //聲明交換機 20 //定義一個交換機,類型為fanout,也就是發布訂閱者模式 21 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 22 //發布訂閱模式,因為消息是先發布到交換機中,而交換機是沒有保存功能的,所以如果沒有消費者,消息會丟失 23 channel.basicPublish(EXCHANGE_NAME, "", null, "發布訂閱模式的消息".getBytes()); 24 channel.close(); 25 connection.close(); 26 } 27 }
定義2個消息消費者
1 package com.edu.publish; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 /** 9 * @ClassName Recver1 10 * @Deccription TODO 11 * @Author DZ 12 * @Date 2019/5/4 14:49 13 **/ 14 public class Recver1 { 15 //定義交換機 16 private final static String EXCHANGE_NAME = "testexchange"; 17 private final static String QUEUE = "testpubqueue1"; 18 19 public static void main(String[] args) throws Exception { 20 Connection connection = ConnectionUtil.getConnection(); 21 final Channel channel = connection.createChannel(); 22 channel.queueDeclare(QUEUE, false, false, false, null); 23 //綁定隊列到交換機 24 channel.queueBind(QUEUE, EXCHANGE_NAME, ""); 25 channel.basicQos(1); 26 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { 27 @Override 28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 29 /* super.handleDelivery(consumerTag, envelope, properties, body);*/ 30 System.out.println("消費者1:" + new String(body)); 31 channel.basicAck(envelope.getDeliveryTag(), false); 32 } 33 }; 34 channel.basicConsume(QUEUE, false, defaultConsumer); 35 } 36 }
1 package com.edu.publish; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 /** 9 * @ClassName Recver1 10 * @Deccription TODO 11 * @Author DZ 12 * @Date 2019/5/4 14:49 13 **/ 14 public class Recver2 { 15 //定義交換機 16 private final static String EXCHANGE_NAME = "testexchange"; 17 private final static String QUEUE = "testpubqueue2"; 18 19 public static void main(String[] args) throws Exception { 20 Connection connection = ConnectionUtil.getConnection(); 21 final Channel channel = connection.createChannel(); 22 channel.queueDeclare(QUEUE, false, false, false, null); 23 //綁定隊列到交換機 24 channel.queueBind(QUEUE, EXCHANGE_NAME, ""); 25 channel.basicQos(1); 26 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { 27 @Override 28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 29 /* super.handleDelivery(consumerTag, envelope, properties, body);*/ 30 System.out.println("消費者2:" + new String(body)); 31 channel.basicAck(envelope.getDeliveryTag(), false); 32 } 33 }; 34 channel.basicConsume(QUEUE, false, defaultConsumer); 35 } 36 }
消費者1 和消費者2 都監聽了被同一個交換器綁定的隊列,因此消息被同時消費到了。如果消息發送到沒有隊列綁定的交換器時,消息將丟失,因為交換器沒有存儲消息的能力,消息只能存儲在隊列中。
應用場景:比如一個商城系統需要在管理員上傳商品新的圖片時,前台系統必須更新圖片,日志系統必須記錄相應的日志,那么就可以將兩個隊列綁定到圖片上傳交換器上,一個用於前台系統更新圖片,另一個用於日志系統記錄日志。
4.4.routing

定義消息發布者
1 package com.edu.route; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 /** 8 * @ClassName Sender 9 * @Deccription TODO 10 * @Author DZ 11 * @Date 2019/5/4 15:05 12 **/ 13 public class Sender { 14 private final static String EXCANGE_NAME = "testroute"; 15 16 public static void main(String[] args) throws Exception { 17 Connection connection = ConnectionUtil.getConnection(); 18 Channel channel = connection.createChannel(); 19 //定義路由格式的交換機 20 channel.exchangeDeclare(EXCANGE_NAME, "direct"); 21 channel.basicPublish(EXCANGE_NAME, "key2", null, "路由模式的消息".getBytes()); 22 channel.close(); 23 connection.close(); 24 } 25 }
1 package com.edu.route; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 /** 9 * @ClassName Recver1 10 * @Deccription TODO 11 * @Author DZ 12 * @Date 2019/5/4 14:49 13 **/ 14 public class Recver1 { 15 //定義交換機 16 private final static String EXCHANGE_NAME = "testroute"; 17 private final static String QUEUE = "testroute1queue"; 18 19 public static void main(String[] args) throws Exception { 20 Connection connection = ConnectionUtil.getConnection(); 21 final Channel channel = connection.createChannel(); 22 channel.queueDeclare(QUEUE, false, false, false, null); 23 //綁定隊列到交換機 24 //參數3:綁定到交換機指定的路由的名字 25 channel.queueBind(QUEUE, EXCHANGE_NAME, "key1"); 26 //如果需要綁定多個路由,再綁定一次即可 27 channel.queueBind(QUEUE, EXCHANGE_NAME, "key2"); 28 channel.basicQos(1); 29 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { 30 @Override 31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 32 /* super.handleDelivery(consumerTag, envelope, properties, body);*/ 33 System.out.println("消費者1:" + new String(body)); 34 channel.basicAck(envelope.getDeliveryTag(), false); 35 } 36 }; 37 channel.basicConsume(QUEUE, false, defaultConsumer); 38 } 39 }
1 package com.edu.route; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 /** 9 * @ClassName Recver1 10 * @Deccription TODO 11 * @Author DZ 12 * @Date 2019/5/4 14:49 13 **/ 14 public class Recver2 { 15 //定義交換機 16 private final static String EXCHANGE_NAME = "testroute"; 17 private final static String QUEUE = "testroute2queue"; 18 19 public static void main(String[] args) throws Exception { 20 Connection connection = ConnectionUtil.getConnection(); 21 final Channel channel = connection.createChannel(); 22 channel.queueDeclare(QUEUE, false, false, false, null); 23 //綁定隊列到交換機 24 //參數3:綁定到交換機指定的路由的名字 25 channel.queueBind(QUEUE, EXCHANGE_NAME, "key1"); 26 //如果需要綁定多個路由,再綁定一次即可 27 channel.queueBind(QUEUE, EXCHANGE_NAME, "key3"); 28 channel.basicQos(1); 29 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { 30 @Override 31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 32 /* super.handleDelivery(consumerTag, envelope, properties, body);*/ 33 System.out.println("消費者2:" + new String(body)); 34 channel.basicAck(envelope.getDeliveryTag(), false); 35 } 36 }; 37 channel.basicConsume(QUEUE, false, defaultConsumer); 38 } 39 }
應用場景:利用消費者能夠有選擇性的接收消息的特性,比如我們商城系統的后台管理系統對於商品進行修改、刪除、新增操作都需要更新前台系統的界面展示,而查詢操作確不需要,那么這兩個隊列分開接收消息就比較好。
4.5.Topic

1 package com.edu.topic; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 /** 8 * @ClassName Sender 9 * @Deccription TODO 10 * @Author DZ 11 * @Date 2019/5/4 15:19 12 **/ 13 public class Sender { 14 private final static String EXCANGE_NAME = "testtopexchange"; 15 16 public static void main(String[] args) throws Exception { 17 Connection connection = ConnectionUtil.getConnection(); 18 Channel channel = connection.createChannel(); 19 channel.exchangeDeclare(EXCANGE_NAME, "topic"); 20 channel.basicPublish(EXCANGE_NAME, "abc.adb.1", null, "topic模式消息發送者:".getBytes()); 21 channel.close(); 22 connection.close(); 23 } 24 }
1 package com.edu.topic; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 /** 9 * @ClassName Recver1 10 * @Deccription TODO 11 * @Author DZ 12 * @Date 2019/5/4 14:49 13 **/ 14 public class Recver1 { 15 //定義交換機 16 private final static String EXCHANGE_NAME = "testtopexchange"; 17 private final static String QUEUE = "testtopic1queue"; 18 19 public static void main(String[] args) throws Exception { 20 Connection connection = ConnectionUtil.getConnection(); 21 final Channel channel = connection.createChannel(); 22 channel.queueDeclare(QUEUE, false, false, false, null); 23 //綁定隊列到交換機 24 //參數3:綁定到交換機指定的路由的名字 25 channel.queueBind(QUEUE, EXCHANGE_NAME, "key.*"); 26 //如果需要綁定多個路由,再綁定一次即可 27 channel.queueBind(QUEUE, EXCHANGE_NAME, "abc.*"); 28 channel.basicQos(1); 29 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { 30 @Override 31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 32 /* super.handleDelivery(consumerTag, envelope, properties, body);*/ 33 System.out.println("消費者1:" + new String(body)); 34 channel.basicAck(envelope.getDeliveryTag(), false); 35 } 36 }; 37 channel.basicConsume(QUEUE, false, defaultConsumer); 38 } 39 }
1 package com.edu.topic; 2 3 import com.edu.util.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 /** 9 * @ClassName Recver1 10 * @Deccription TODO 11 * @Author DZ 12 * @Date 2019/5/4 14:49 13 **/ 14 public class Recver2 { 15 //定義交換機 16 private final static String EXCHANGE_NAME = "testtopexchange"; 17 private final static String QUEUE = "testtopic2queue"; 18 19 public static void main(String[] args) throws Exception { 20 Connection connection = ConnectionUtil.getConnection(); 21 final Channel channel = connection.createChannel(); 22 channel.queueDeclare(QUEUE, false, false, false, null); 23 //綁定隊列到交換機 24 //參數3:綁定到交換機指定的路由的名字 25 channel.queueBind(QUEUE, EXCHANGE_NAME, "key.*"); 26 //如果需要綁定多個路由,再綁定一次即可 27 channel.queueBind(QUEUE, EXCHANGE_NAME, "abc.#"); 28 channel.basicQos(1); 29 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { 30 @Override 31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 32 /* super.handleDelivery(consumerTag, envelope, properties, body);*/ 33 System.out.println("消費者2:" + new String(body)); 34 channel.basicAck(envelope.getDeliveryTag(), false); 35 } 36 }; 37 channel.basicConsume(QUEUE, false, defaultConsumer); 38 } 39 }
第六種模式是將上述的模式集成其它的框架,進行遠程訪問,這里我們將集成 Spring 實現 RCP 遠程模式的使用
5.Spring 集成 RabbitMQ
5.1.自動集成 Spring
編寫spring的配置,此配置文件的目的是將 Spring 與 RabbitMQ 進行整合,實際上就是將 MQ 的相關信息(連接,隊列,交換機……)通過XML配置的方式實現
1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xmlns:rabbit="http://www.springframework.org/schema/rabbit" 4 xsi:schemaLocation="http://www.springframework.org/schema/rabbit 5 http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd 6 http://www.springframework.org/schema/beans 7 http://www.springframework.org/schema/beans/spring-beans-4.3.xsd"> 8 <!--定義連接工廠--> 9 <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="test" password="test" 10 virtual-host="/test"/> 11 <!-- 12 定義模板 13 第三個參數,決定消息發送到哪里,如果為exchange,則發送到交換機;如果為queue,則發送到隊列 14 --> 15 <rabbit:template id="template" connection-factory="connectionFactory" exchange="fanoutExchange"/> 16 <rabbit:admin connection-factory="connectionFactory"/> 17 <!--定義隊列--> 18 <rabbit:queue name="myQueue" auto-declare="true"/> 19 <!--定義交換機--> 20 <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true"> 21 <!--將消息綁定到交換機--> 22 <rabbit:bindings> 23 <rabbit:binding queue="myQueue"> 24 25 </rabbit:binding> 26 </rabbit:bindings> 27 </rabbit:fanout-exchange> 28 <!--定義監聽器,收到消息會執行--> 29 <rabbit:listener-container connection-factory="connectionFactory"> 30 <!-- 定義監聽的類和方法--> 31 <rabbit:listener ref="consumer" method="test" queue-names="myQueue"/> 32 </rabbit:listener-container> 33 <!--定義消費者--> 34 <bean id="consumer" class="com.edu.spring.MyConsumer"/> 35 36 </beans>
生產者:
1 package com.edu.spring; 2 3 import org.springframework.amqp.rabbit.core.RabbitTemplate; 4 import org.springframework.context.ApplicationContext; 5 import org.springframework.context.support.ClassPathXmlApplicationContext; 6 7 /** 8 * @ClassName SpringTest 9 * @Deccription TODO 10 * @Author DZ 11 * @Date 2019/5/4 18:40 12 **/ 13 public class SpringTest { 14 public static void main(String[] args) throws Exception { 15 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml"); 16 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); 17 rabbitTemplate.convertAndSend("Spring的消息"); 18 ((ClassPathXmlApplicationContext) applicationContext).destroy(); 19 } 20 }
消費者
1 package com.edu.spring; 2 3 /** 4 * @ClassName MyConsumer 5 * @Deccription TODO 6 * @Author DZ 7 * @Date 2019/5/4 18:35 8 **/ 9 public class MyConsumer { 10 /*用於接收消息*/ 11 public void test(String message) { 12 System.err.println(message); 13 } 14 }
集成Spring主要是在xml中實現了隊列和交換機的創建。

5.2.手動模式
手動模式,主要增加MQ的回調操作,MQ消息失敗或者成功就有相應的回調信息,增強系統的健壯性,一旦產生異常,很快就能定位到異常的位置,所以在實際開發中,一般都這種方式
創建xml配置文件
1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xmlns:rabbit="http://www.springframework.org/schema/rabbit" 4 xmlns:context="http://www.springframework.org/schema/context" 5 xsi:schemaLocation="http://www.springframework.org/schema/rabbit 6 http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd 7 http://www.springframework.org/schema/beans 8 http://www.springframework.org/schema/beans/spring-beans-4.3.xsd 9 http://www.springframework.org/schema/context 10 http://www.springframework.org/schema/context/spring-context-4.3.xsd"> 11 <context:component-scan base-package="com.edu.spring2"/> 12 <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> 13 14 <!-- 15 定義連接工廠 16 publisher-confirms為ture,確認失敗等回調才會執行 17 --> 18 <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="test" password="test" 19 virtual-host="/test" publisher-confirms="true"/> 20 21 <rabbit:admin connection-factory="connectionFactory"/> 22 <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" confirm-callback="confirmCallBackListener" 23 return-callback="returnCallBackListener" 24 mandatory="true"/> 25 <!--定義隊列--> 26 <rabbit:queue name="myQueue" auto-declare="true"/> 27 <!--定義交換機--> 28 <rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX"> 29 <!--將消息綁定到交換機--> 30 <rabbit:bindings> 31 <rabbit:binding queue="myQueue"> 32 33 </rabbit:binding> 34 </rabbit:bindings> 35 </rabbit:direct-exchange> 36 <!--定義監聽器,收到消息會執行--> 37 <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> 38 <!-- 定義監聽的類和方法--> 39 <rabbit:listener queues="myQueue" ref="receiveConfirmTestListener"/> 40 </rabbit:listener-container> 41 42 </beans>
創建回調監聽函數
1 package com.edu.spring2; 2 3 import org.springframework.amqp.rabbit.core.RabbitTemplate; 4 import org.springframework.amqp.rabbit.support.CorrelationData; 5 import org.springframework.stereotype.Component; 6 7 /** 8 * @ClassName ConfirmCallBackListener 9 * @Deccription TODO 10 * @Author DZ 11 * @Date 2019/5/4 22:26 12 **/ 13 @Component("confirmCallBackListener") 14 public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback { 15 16 @Override 17 public void confirm(CorrelationData correlationData, boolean ack, String cause) { 18 System.out.println("確認回調 ack==" + ack + "回調原因==" + cause); 19 } 20 }
1 package com.edu.spring2; 2 3 import com.rabbitmq.client.Channel; 4 import org.springframework.amqp.core.Message; 5 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; 6 import org.springframework.stereotype.Component; 7 8 /** 9 * @ClassName ReceiveConfirmTestListener 10 * @Deccription TODO 11 * @Author DZ 12 * @Date 2019/5/4 22:24 13 **/ 14 @Component("receiveConfirmTestListener") 15 public class ReceiveConfirmTestListener implements ChannelAwareMessageListener { 16 /** 17 * 收到消息時,執行的監聽 18 * 19 * @param message 20 * @param channel 21 * @throws Exception 22 */ 23 @Override 24 public void onMessage(Message message, Channel channel) throws Exception { 25 System.out.println(("消費者收到了消息" + message)); 26 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 27 } 28 }
1 package com.edu.spring2; 2 3 import org.springframework.amqp.core.Message; 4 import org.springframework.amqp.rabbit.core.RabbitTemplate; 5 import org.springframework.stereotype.Component; 6 7 /** 8 * @ClassName ReturnCallBackListener 9 * @Deccription TODO 10 * @Author DZ 11 * @Date 2019/5/4 22:28 12 **/ 13 @Component("returnCallBackListener") 14 public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback { 15 @Override 16 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { 17 System.out.println("失敗回調" + message); 18 } 19 }
回調函數的配置來自 XML

1 package com.edu.spring2; 2 3 import org.springframework.amqp.core.AmqpTemplate; 4 import org.springframework.beans.factory.annotation.Autowired; 5 import org.springframework.stereotype.Component; 6 7 /** 8 * @ClassName PublicUtil 9 * @Deccription TODO 10 * @Author DZ 11 * @Date 2019/5/4 22:30 12 **/ 13 @Component("publicUtil") 14 public class PublicUtil { 15 @Autowired 16 private AmqpTemplate amqpTemplate; 17 18 public void send(String excange, String routingkey, Object message) { 19 amqpTemplate.convertAndSend(excange, routingkey, message); 20 } 21 }
創建測試類
1 package com.edu.spring2; 2 3 import org.junit.Test; 4 import org.junit.runner.RunWith; 5 import org.springframework.beans.factory.annotation.Autowired; 6 import org.springframework.test.context.ContextConfiguration; 7 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 8 9 /** 10 * @ClassName TestMain 11 * @Deccription TODO 12 * @Author DZ 13 * @Date 2019/5/4 22:32 14 **/ 15 @RunWith(SpringJUnit4ClassRunner.class) 16 @ContextConfiguration(locations = {"classpath:applicationContext2.xml"}) 17 public class TestMain { 18 @Autowired 19 private PublicUtil publicUtil; 20 private static String exChange = "DIRECT_EX";//交換機 21 private static String queue = "myQueue"; 22 23 /** 24 * exChange和queue均正確 25 * confirm會執行,ack = ture 26 * 消息正常接收(接收消息確認方法正常執行) 27 */ 28 @Test 29 public void test1() throws Exception { 30 publicUtil.send(exChange, queue, "測試1,隊列和交換機均正確"); 31 } 32 /** 33 * exChange錯誤,queue正確 34 * confirm執行,ack=false 35 * 消息無法接收(接收消息確認方法不能執行) 36 */ 37 @Test 38 public void test2() throws Exception { 39 publicUtil.send(exChange + "1", queue, "測試2,隊列正確,交換機錯誤"); 40 } 41 /** 42 * exChange正常,queue錯誤 43 * return執行 44 * confirm執行,ack=ture 45 */ 46 @Test 47 public void test3() throws Exception { 48 publicUtil.send(exChange, queue + "1", "測試2,隊列錯誤,交換機正確"); 49 } 50 /** 51 * exChange錯誤,queue錯誤 52 * confirm執行,ack=false 53 */ 54 @Test 55 public void test4() throws Exception { 56 publicUtil.send(exChange + "1", queue + "1", "測試2,隊列錯誤,交換機錯誤"); 57 } 58 }
測試結果如下:
-
test1:exChange和queue均正確
- confirm會執行,ack=ture;能正常收到消息(接收消息的方法正常執行)
-
test2:exChange錯誤,queue正確

- test3:exChange正確,queue錯誤

- test4:exChange和queue均錯誤

上述結論及代碼如下圖:

根據上述的測試結果,我們可以根據回調函數的返回結果,查看MQ的錯誤出現在那里。根據上述結論,我們可以對3個回調函數做如下處理:
-
類 ReceiveConfirmTestListener 中的onMessage方法主要用於接收從 RabbitMQ 推送過來的消息,並對消息做相應的邏輯處理
-
類 ConfirmCallBackListener 中的 confirm 方法主要用於檢查交換機(exChange),當 ack=false,交換機可能錯誤
-
類 ReturnCallBackListener 中的 returnedMessage 方法用於檢查隊列(queue),當此方法執行時,隊列可能錯誤




實際上,在真實項目中,上面3個方法也是按照這3個邏輯進行設計的。當然這3個方法中還可以加入更多的日志消息,和邏輯處理業務。
6.參考
https://blog.csdn.net/liu911025/article/details/80460182
https://blog.csdn.net/lyhkmm/article/details/78775369
https://blog.csdn.net/vbirdbest/article/details/78670550
https://blog.csdn.net/vbirdbest/article/details/78670550
https://www.rabbitmq.com/getstarted.html