一、RabbitMQ介紹
MQ全稱為Message Queue,即消息隊列, RabbitMQ是由erlang語言開發,基於AMQP(Advanced Message
Queue 高級消息隊列協議)協議實現的消息隊列,它是一種應用程序之間的通信方法,消息隊列在分布式系統開
發中應用非常廣泛。RabbitMQ官方地址:http://www.rabbitmq.com/
開發中消息隊列通常有如下應用場景:
1、任務異步處理。
將不需要同步處理的並且耗時長的操作由消息隊列通知消息接收方進行異步處理。提高了應用程序的響應時間。
2、應用程序解耦合
MQ相當於一個中介,生產方通過MQ與消費方交互,它將應用程序進行解耦合。
市場上還有哪些消息隊列?
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。
為什么使用RabbitMQ呢?
1、使得簡單,功能強大。
2、基於AMQP協議。
3、社區活躍,文檔完善。
4、高並發性能好,這主要得益於Erlang語言。
5、Spring Boot默認已集成RabbitMQ
二、使用MQ的好處
2.1實現異步處理
同步的通信:發出一個調用請求之后,在沒有得到結果之前,就不返回。由調用者主動等待這個調用的結果。
異步通信:調用在發出之后,這個調用就直接返回了,所以沒有返回結果。也就是說,當一個異步過程調用發出后,調用者不會馬上得到結果。而是在調用發出后,
被調用者通過狀態、通知來通知調用者,或通過回調函數處理這個調用。
2.2實現解耦
// 偽代碼 public void returnGoods(){
stockService.updateInventory ();
payService.refund(); noticeService.notice();
2.3實現流量削鋒
三、 RabbitMQ 中的概念模型
MQ的本質:消息隊列,又叫做消息中間件。是指用高效可靠的消息傳遞機制進行與平台無關的數據交流,並基於數據通信來進行分布式系統的集成。
通過提供消息傳遞和消息隊列模型, 可以在分布式環境下擴展進程的通信
MQ的特點:
1、 是一個獨立運行的服務。生產者發送消息,消費者接收消費,需要先跟服務器建立連接。
消息模型:
所有 MQ 產品從模型抽象上來說都是一樣的過程:消費者(consumer)訂閱某個隊列。生產者(producer)創建消息,然后發布到隊列(queue)中,
最后將消息發送到監聽的消費者。

RabbitMQ的基本概念
下圖是RabbitMQ的基本結構:

組成部分說明如下:
- Broker :消息隊列服務,此進程包括兩個部分:Exchange和Queue。
- Exchange :消息隊列交換機,按一定的規則將消息路由轉發到某個隊列,對消息進行過慮。隊列使用綁定鍵(Binding Key)跟交換機建立綁定關系。
- Queue :消息隊列,存儲消息的隊列,消息到達隊列並轉發給指定的消費方,它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
- Producer :消息生產者,即生產方客戶端,生產方客戶端將消息發送到MQ。
- Consumer :消息消費者,即消費方客戶端,接收MQ轉發的消息。
- Message:消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。
- Binding:綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。
- Connection:無論是生產者發送消息,還是消費者接收消息,都必須跟Broker之間建立一個連接,這個是TCP長連接
- Channel:信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對於操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。
- Virtual Host:虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。
Virtual Host理解如下圖:

相關名詞:
包括:ConnectionFactory(連接管理器)、Channel(信道)、Exchange(交換器)、Queue(隊列)、RoutingKey(路由鍵)、BindingKey(綁定鍵)。
ConnectionFactory(連接管理器):應用程序與Rabbit之間建立連接的管理器,程序代碼中使用;
Channel(信道):消息推送使用的通道;
Exchange(交換器):用於接受、分配消息;
Queue(隊列):用於存儲生產者的消息;
RoutingKey(路由鍵):用於把生成者的數據分配到交換器上;
BindingKey(綁定鍵):用於把交換器的消息綁定到隊列上;
看到上面的解釋,最難理解的路由鍵和綁定鍵了,那么他們具體怎么發揮作用的,請看下圖:

消息發布接收流程:
-----發送消息-----
1、生產者和Broker建立TCP連接。
2、生產者和Broker建立通道。
3、生產者通過通道消息發送給Broker,由Exchange將消息進行轉發。
4、Exchange將消息轉發到指定的Queue(隊列)
----接收消息-----
1、消費者和Broker建立TCP連接
2、消費者和Broker建立通道
3、消費者監聽指定的Queue(隊列)
4、當有消息到達Queue時Broker默認將消息推送給消費者。
5、消費者接收到消息。
四、 下載安裝
RabbitMQ由Erlang語言開發,Erlang語言用於並發及分布式系統的開發,在電信領域應用廣泛,OTP(Open
Telecom Platform)作為Erlang語言的一部分,包含了很多基於Erlang開發的中間件及工具庫,安裝RabbitMQ需
要安裝Erlang/OTP,並保持版本匹配,如下圖:
RabbitMQ的下載地址:http://www.rabbitmq.com/download.html

1)下載erlang
地址如下:
http://erlang.org/download/otp_win64_20.3.exe,以管理員方式運行此文件,安裝。
erlang安裝完成需要配置erlang環境變量: ERLANG_HOME=D:\Program Files\erl9.3 在path中添
加%ERLANG_HOME%\bin;
2)安裝RabbitMQhttps://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.3,以管理員方式運行此文件,安裝
3)啟動
- 安裝成功后會自動創建RabbitMQ服務並且啟動。
從開始菜單啟動RabbitMQ,完成在開始菜單找到RabbitMQ的菜單:

RabbitMQ Service-install :安裝服務
RabbitMQ Service-remove 刪除服務
RabbitMQ Service-start 啟動
RabbitMQ Service-stop 啟動
2.如果沒有開始菜單則進入安裝目錄下sbin目錄手動啟動:

1)安裝並運行服務
rabbitmq-service.bat install 安裝服務 rabbitmq-service.bat stop 停止服務 rabbitmq-service.bat start 啟動服務
2)安裝管理插件
安裝rabbitMQ的管理插件,方便在瀏覽器端管理RabbitMQ
管理員身份運行 rabbitmq-plugins.bat enable rabbitmq_management
啟動成功 登錄RabbitMQ
進入瀏覽器,輸入:http://localhost:15672

初始賬號和密碼:guest/guest

3) 注意事項:
1、安裝erlang和rabbitMQ以管理員身份運行。
2、當卸載重新安裝時會出現RabbitMQ服務注冊失敗,此時需要進入注冊表清理erlang
搜索RabbitMQ、ErlSrv,將對應的項全部刪除。
五、java操作隊列
1、消息隊列RabbitMQ的五種形式隊列
1).點對點(簡單)的隊列
2).工作(公平性)隊列模式
3.發布訂閱模式
4.路由模式Routing
5.通配符模式Topics
2、簡單隊列
1)功能:一個生產者P發送消息到隊列Q,一個消費者C接收

P表示為生產者 、C表示為消費者 紅色表示隊列。
點對點模式分析:

Maven依賴:
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
</dependencies>
封裝Connection:
/** * 封裝Connection */ public class MQConnectionUtils { public static Connection getConnection(){ //創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務器地址 factory.setHost("localhost"); //設置端口號 factory.setPort(5672); //設置用戶名 factory.setUsername("guest"); //設置密碼 factory.setPassword("guest"); //設置vhost factory.setVirtualHost("/admin_yehui"); try { //創建連接 return factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return null; } }
參數詳解:

生產者:
public class Producer { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { //得到連接 Connection connection = MQConnectionUtils.getConnection(); //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務 Channel channel = connection.createChannel(); //聲明隊列 如果Rabbit中沒有此隊列將自動創建 /** * 參數1:隊列的名稱 * 參數2:是否持久化 * 參數3:是否獨占此鏈接,是否排他性隊列。排他性隊列只能在聲明它的 Connection中使用(可以在同一個 Connection 的不同的 channel 中使用),
連接斷開時自動刪除。
* 參數4:隊列不在使用時是否自動刪除
* 參數5:隊列參數
*
*/
channel.queueDeclare(QUEUE_NAME, false,false, false, null);
String msg = "test_yehui_rabbitmq";
/**
* 發送消息
* 參數1: Exchange的名稱,如果沒有指定,則使用Default Exchange
* 參數2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列
* 參數3:消息包含的屬性
* 參數4:消息體
* 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯
* 示綁定或解除綁定認的交換機,routingKey等於隊列名稱
*/
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("消息發送體:"+msg);
channel.close();
connection.close();
}
}
消費者:
public class Consumer01 {
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException {
//得到連接
Connection connection = MQConnectionUtils.getConnection();
//創建一個通道
Channel channel = connection.createChannel();
//定義消費方法
DefaultConsumer consumer = new DefaultConsumer (channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
//得到交換機
String exchange = envelope.getExchange();
//路由key
String routingKey = envelope.getRoutingKey();
//消息id
long deliveryTag = envelope.getDeliveryTag();
//消息內容
String message = new String(body, "utf-8");
System.out.println("消費者消費:"+message);
}
};
//監聽隊列
/**
* 參數1:隊列名稱
* 參數2: 設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置
* 為false則需要手動回復
* 參數3:消費消息的方法,消費者接收到消息后調用此方法
*/
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
3、消息隊列RabbitMQ應答模式
為了確保消息不會丟失,RabbitMQ支持消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收並且處理完畢了。RabbitMQ就可以刪除它了。 如果一個消費者掛掉卻沒有發送應答,RabbitMQ會理解為這個消息沒有處理完全,然后交給另一個消費者去重新處理。這樣,你就可以確認即使消費者偶爾掛掉也不會丟失任何消息了。 沒有任何消息超時限制;只有當消費者掛掉時,RabbitMQ才會重新投遞。即使處理一條消息會花費很長的時間。 消息應答是默認打開的。
我們通過顯示的設置autoAsk=true關閉這種機制。現即自動應答開,一旦我們完成任務,消費者會自動發送應答。通知RabbitMQ消息已被處理,可以從內存刪除。
如果消費者因宕機或鏈接失敗等原因沒有發送ACK(不同於ActiveMQ,在RabbitMQ里,消息沒有過期的概念),則RabbitMQ會將消息重新發送給其他監聽在隊列的下一個消費者。
4、工作隊列
work queues與簡單隊列相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息。
應用場景:對於 任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。

P表示為生產者 、C表示為消費者 紅色表示隊列。
工作隊列分析

均攤消費
發布訂閱模式:
1、每個消費者監聽自己的隊列。
2、生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收
到消息
測試:
1、使用簡單隊列,啟動多個消費者。
2、生產者發送多個消息。
結果:
1、一條消息只會被一個消費者接收;
2、rabbit采用輪詢的方式將消息是平均發送給消費者的;
3、消費者在處理完某條消息后,才會收到下一條消息。
RabbitMQ的公平轉發
目前消息轉發機制是平均分配,這樣就會出現倆個消費者,奇數的任務很耗時,偶數的任何工作量很小,造成的原因就是近當消息到達隊列進行轉發消息。
並不在乎有多少任務消費者並未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。 為了解決這樣的問題,我們可以使用basicQos方法,傳遞參數為prefetchCount= 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條消息。 換句話說,只有在消費者空閑的時候會發送下一條信息。調度分發消息的方式,也就是告訴RabbitMQ每次只給消費者處理一條消息,
也就是等待消費者處理完畢並自己對剛剛處理的消息進行確認之后,才發送下一條消息,防止消費者太過於忙碌,也防止它太過去清閑。 通過 設置channel.basicQos(1);
生產者
public class Producer { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { //得到連接
Connection connection = MQConnectionUtils.getConnection(); //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
Channel channel = connection.createChannel(); //聲明隊列 如果Rabbit中沒有此隊列將自動創建
/** * 參數1:隊列的名稱 * 參數2:是否持久化 * 參數3:是否獨占此鏈接 * 參數4:隊列不在使用時是否自動刪除 * 參數5:隊列參數 * */ channel.queueDeclare(QUEUE_NAME, false,false, false, null); /** * 發送消息 * 參數1: Exchange的名稱,如果沒有指定,則使用Default Exchange * 參數2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列 * 參數3:消息包含的屬性 * 參數4:消息體 * 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯 * 示綁定或解除綁定認的交換機,routingKey等於隊列名稱 */ channel.basicQos(1);// 保證一次只分發一次 限制發送給同一個消費者 不得超過一條消息
for(int i=0;i<10;i++){ String msg = "test_yehui_rabbitmq"+i; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); } System.out.println("消息發送完畢"); channel.close(); connection.close(); } }
消費者1:
public class Consumer01 { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException { //得到連接
Connection connection = MQConnectionUtils.getConnection(); //創建一個通道
Channel channel = connection.createChannel(); //聲明隊列 channel.basicQos(1);// 保證一次只分發一次 限制發送給同一個消費者 不得超過一條消息 //定義消費方法
DefaultConsumer consumer = new DefaultConsumer (channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { //得到交換機
String exchange = envelope.getExchange(); //路由key
String routingKey = envelope.getRoutingKey(); //消息id
long deliveryTag = envelope.getDeliveryTag(); //消息內容
String message = new String(body, "utf-8"); System.out.println("消費者消費:"+message); try { //睡眠1s
Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手動回執消息
channel.basicAck(envelope.getDeliveryTag(), false); } } }; //監聽隊列
/** * 參數1:隊列名稱 * 參數2: 設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置 * 為false則需要手動回復 * 參數3:消費消息的方法,消費者接收到消息后調用此方法 */ channel.basicConsume(QUEUE_NAME,false,consumer); } }
消費者2
public class Consumer02 { //隊列名稱
private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException { //得到連接
Connection connection = MQConnectionUtils.getConnection(); //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
Channel channel = connection.createChannel(); //聲明隊列 如果Rabbit中沒有此隊列將自動創建
channel.basicQos(1);// 保證一次只分發一次 限制發送給同一個消費者 不得超過一條消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { //得到交換機
String exchange = envelope.getExchange(); //路由key
String routingKey = envelope.getRoutingKey(); //消息id
long deliveryTag = envelope.getDeliveryTag(); //消息內容
String message = new String(body, "utf-8"); System.out.println("消費者消費:"+message); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手動回執消息
channel.basicAck(envelope.getDeliveryTag(), false); } } }; //監聽隊列
/** * 參數1:隊列名稱 * 參數2: 設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置 * 為false則需要手動回復 * 參數3:消費消息的方法,消費者接收到消息后調用此方法 */ channel.basicConsume(QUEUE_NAME,false,consumer); } }
結果;
消費者1比消費者2消費得少
5、RabbitMQ交換機的作用
生產者發送消息不會向傳統方式直接將消息投遞到隊列中,而是先將消息投遞到交換機中,在由交換機轉發到具體的隊列,隊列在將消息以推送或者拉取方式給消費者進行消費,
這和我們之前學習Nginx有點類似。 交換機的作用根據具體的路由策略分發到不同的隊列中,交換機有四種類型。
Direct exchange(直連交換機)是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的,隊列與直連類型的交換機綁定,需指定一個精確的綁定鍵,
Fanout exchange(廣播交換機)主題類型的交換機與隊列綁定時,不需要指定綁定鍵。因此生產者發送消息到廣播類型的交換機上,也不需要攜帶路由鍵。消息達到交換機時,所有與之綁定了的隊列,都會收到相同的消息的副本。

Topic exchange(主題交換機)隊列通過路由鍵綁定到交換機上,然后,交換機根據消息里的路由值,將消息路由給一個或多個綁定隊列,

p是生產者 X是交換機 C1 、C2 是消費者
6、發布/訂閱模式Publish/Subscribe
基本概念:
這個可能是消息隊列中最重要的隊列了,其他的都是在它的基礎上進行了擴展。
功能實現:一個生產者發送消息,多個消費者獲取消息(同樣的消息),包括一個生產者,一個交換機,多個隊列,多個消費者。
思路解讀(重點理解): 
(1)一個生產者,多個消費者
(2)每一個消費者都有自己的一個隊列
(3)生產者沒有直接發消息到隊列中,而是發送到交換機
(4)每個消費者的隊列都綁定到交換機上
(5)消息通過交換機到達每個消費者的隊列 該模式就是Fanout Exchange(廣播交換機)將消息路由給綁定到它身上的所有隊列 以用戶發郵件案例講解
注意:交換機沒有存儲消息功能,如果消息發送到沒有綁定消費隊列的交換機,消息則丟失。
工作原理圖:

生產者:
public class ProducerFanout { //交換機
private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //得到連接
Connection connection = MQConnectionUtils.getConnection(); //創建一個通道
Channel channel = connection.createChannel();
//聲明交換機
/** *
參數1:交換機名稱
* 參數2:交換機類型 */
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//發送消息 /** * 參數1:交換機名稱 * 參數2:路由key * 參數3:消息屬性參數 * 參數4:消息實體 */
channel.basicPublish(EXCHANGE_NAME,"",null,"fanout_exchange_msg".getBytes());
channel.close();
connection.close();
}
}
郵件消費者
public class ConsumerEmailFanout { //郵件隊列
private static final String EMAIL_QUEUE = "email_queue"; //交換機
private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException { System.out.println("郵件消費者"); //得到連接
Connection connection = MQConnectionUtils.getConnection(); //創建一個通道
Channel channel = connection.createChannel();
//聲明一個隊列
channel.queueDeclare(EMAIL_QUEUE,false,false,false,null);
//綁定交換機 // 4.消費者綁定交換機 參數1 隊列 名稱 參數2交換機名稱 參數3 routingKey
channel.queueBind(EMAIL_QUEUE,EXCHANGE_NAME,"");
DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag,
Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消費者獲取生產者消息:" + msg); } }; //消費者監聽隊列消息
channel.basicConsume(EMAIL_QUEUE,true,consumer); } }
短信消費者
public class ConsumerSMSFanout { //短信隊列
private static final String SMS_QUEUE = "sms_queue"; //交換機
private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException { System.out.println("短信消費者"); //得到連接
Connection connection = MQConnectionUtils.getConnection(); //創建一個通道
Channel channel = connection.createChannel(); //聲明一個隊列
channel.queueDeclare(SMS_QUEUE,false,false,false,null); //綁定交換機 // 4.消費者綁定交換機 參數1 隊列 名稱 參數2交換機名稱 參數3 routingKey
channel.queueBind(SMS_QUEUE,EXCHANGE_NAME,""); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消費者獲取生產者消息:" + msg); } }; //消費者監聽隊列消息
channel.basicConsume(SMS_QUEUE,true,consumer); } }
7、路由模式RoutingKey



路由模式:
1、每個消費者監聽自己的隊列,並且設置routingkey。
2、生產者發送消息到交換機並指定一個路由key,消費者隊列綁定到交換機時要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)
生產者:
/** * 生產者 */
public class DirctProduct { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //聲明交換機 //3.綁定的交換機 參數1交互機名稱 參數2 exchange類型
channel.exchangeDeclare("dirctExchange","direct"); //聲明隊列
channel.queueDeclare("emailDirectQueue",true,false,false,null); channel.queueDeclare("smsDirectQueue",true,false,false,null); //綁定交換機 //交換機和隊列進行綁定
/** * 參數1:隊列的名稱 * 參數2:交換機的名稱 * 參數3:路由key */ channel.queueBind("emailDirectQueue","dirctExchange","emailRoutKey"); channel.queueBind("smsDirectQueue","dirctExchange","smsRoutKey"); //發送消息
channel.basicPublish("dirctExchange","emailRoutKey",null,"Email郵件發送".getBytes()); channel.basicPublish("dirctExchange","smsRoutKey",null,"Sms發送發送".getBytes()); channel.close(); connection.close(); } }
郵件消費者
/** * 郵件消費者 */
public class EamilConsomer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { String msg = new String(body); System.out.println("消息:" + msg); } }; channel.basicConsume("emailDirectQueue",true,consumer); } }
短信消費者
/** * 短信消費者 */
public class SmsConsumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException { String msg = new String(body); System.out.println("消息:" + msg); } }; channel.basicConsume("smsDirectQueue",true,consumer); } }
8、通配符模式Topics


路由模式:
1、每個消費者監聽自己的隊列,並且設置帶統配符的routingkey。
2、生產者P發送消息到交換機X,type=topic,交換機根據綁定隊列的routing key的值進行通配符匹配,由交換機根據routingkey來轉發消息到指定的隊列。
符號#:匹配一個或者多個詞lazy.# 可以匹配lazy.irs或者lazy.irs.cor
符號*:只能匹配一個詞lazy.* 可以匹配lazy.irs或者lazy.cor
生產者:
public class ProducerTopic { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //聲明交換機
channel.exchangeDeclare("topicExchange","topic"); //聲明隊列
channel.queueDeclare("emailQueueTopic",true,false,false,null); channel.queueDeclare("smsQueueTopic",true,false,false,null); //綁定
channel.queueBind("emailQueueTopic","topicExchange","email.*"); channel.queueBind("smsQueueTopic","topicExchange","sms.*"); //發送消息
channel.basicPublish("topicExchange","email.log",null,"email郵件".getBytes()); channel.basicPublish("topicExchange","sms.log",null,"sms郵件".getBytes()); channel.close(); connection.close(); } }
郵件消費者:
public class EmailTopic { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException { System.out.println("郵件隊列:"+new String(body)); } }; channel.basicConsume("emailQueueTopic",true,consumer); } }
短信消費者:
public class SmsTopic { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException { System.out.println("短信隊列:"+new String(body)); } }; channel.basicConsume("smsQueueTopic",true,consumer); } }
9、SpringBoot整合RabbitMQ
生產者:
maven依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-parent</artifactId>
<version>2.0.6.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- fastjson 依賴 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.29</version>
</dependency>
<!-- 添加springboot對amqp的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
yml文件
spring: rabbitmq: ####連接地址 host: 127.0.0.1 ####端口號 port: 5672 ####賬號 username: guest ####密碼 password: guest ### 地址 virtual-host: /admin_yehui
定義RabbitConfig類,配置Exchange、Queue、及綁定交換機。
案例是用的是fanout交換機類型
@Configuration public class RabbitMQConfig { // 郵件隊列
public static String FANOUT_EMAIL_QUEUE = "fanout_eamil_queue"; // 短信隊列
public static String FANOUT_SMS_QUEUE = "fanout_sms_queue"; //交換機
public static String EXCHANGE_NAME = "fanoutExchange"; //定義郵件隊列
@Bean("fanoutEamilQueue") public Queue fanoutEamilQueue(){ return new Queue(FANOUT_EMAIL_QUEUE); } //定義短信隊列
@Bean("fanoutSmsQueue") public Queue fanoutSmsQueue(){ return new Queue(FANOUT_SMS_QUEUE); } //定義交換機
@Bean("fanoutExchange") public FanoutExchange fanoutExchange(){ return new FanoutExchange(EXCHANGE_NAME); } //將郵件隊列綁定交換機
@Bean("bindingEmailExchange") public Binding bindingEmailExchange(@Qualifier("fanoutEamilQueue")Queue queue, @Qualifier("fanoutExchange")FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } //將郵件隊列綁定交換機
@Bean("bindingSmsExchange") public Binding bindingSmsExchange(@Qualifier("fanoutSmsQueue")Queue queue, @Qualifier("fanoutExchange")FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } }
生產者投遞消息
/** * 發送消息類 */ @Component public class FanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send(String msg){ rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EMAIL_QUEUE,msg); } }
控制層調用代碼
@RestController public class RabbitController { @Autowired private FanoutProducer fanoutProducer; @RequestMapping("/index") public String index(){ fanoutProducer.send("郵件消息"); fanoutProducer.send("短信消息"); return "index"; } }
消費者
maven依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-parent</artifactId>
<version>2.0.6.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- fastjson 依賴 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<!-- 添加springboot對amqp的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
application.yml文件
spring: rabbitmq: ####連接地址 host: 127.0.0.1 ####端口號 port: 5672 ####賬號 username: guest ####密碼 password: guest ### 地址 virtual-host: /admin_yehui server: port: 8081
監聽消息
/** * 監聽隊列 */ @Component public class ReceiveHandler { /** * 郵箱 * @param msg */ @RabbitListener(queues={"fanout_eamil_queue"}) public void receive_email(String msg){ System.out.println("郵箱消費者獲取生產者消息msg:" + msg); } /** * 短信 * @param msg */ @RabbitListener(queues={"fanout_sms_queue"}) public void receive_sms(String msg){ System.out.println("短信消費者獲取生產者消息msg:" + msg); } }
10、消息確認機制
問題產生背景: 生產者發送消息出去之后,不知道到底有沒有發送到RabbitMQ服務器, 默認是不知道的。而且有的時候我們在發送消息之后,
后面的邏輯出問題了,我們不想要發送之前的消息了,需要撤回該怎么做。
解決方案: 1.AMQP 事務機制
2.Confirm 模式
1.事務模式:
RabbitMQ中與事務機制有關的方法有三個:txSelect(), txCommit()以及txRollback()
txSelect用於將當前channel設置成transaction模式
txCommit用於提交事務
txRollback用於回滾事務

在事務模式里面,只有收到了服務端的 Commit-OK 的指令,才能提交成功。所以
可以解決生產者和服務端確認的問題。但是事務模式有一個缺點,它是阻塞的,一條消
息沒有發送完畢,不能發送下一條消息,它會榨干 RabbitMQ 服務器的性能。所以不建
議大家在生產環境使用。
rabbitMq api實現

springboot 整合RabbitMq 實現
/** * 發送消息類 */ @Component public class FanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; public String send(String msg){ for (int i = 0; i < 10; i++) { //在springboot設置事務
rabbitTemplate.setChannelTransacted(true); rabbitTemplate.convertAndSend("faoutEmailQueue1",i); } return "success"; } } @Component public class ReceiveHandler { /** * 郵箱 * @param
*/ @RabbitListener(queuesToDeclare = @Queue(value ="faoutEmailQueue1")) public void receive_email(String i) throws IOException { System.out.println(i); } }
使用抓包工具查看沒有使用事務時:

使用抓包工具查看使用事務時:

2.生產者確認Confirm 模式
RabbitTemplate模板類
@Bean("rabbitTemplate") public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //spring.rabbitmq.publisher-confirms=true 開啟確認模式, // 可以在application.properties,也可以根據下面的代碼配置
connectionFactory.setPublisherConfirms(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) {// 如果發送交換機成功,但是沒有匹配路由到指定的隊列, 這個時候ack返回是true(這是一個坑)
System.out.println("生產者ACK成功:" + correlationData.getId()); } else { // 失敗 啟動重試 // 強制轉換
CorrelationDataDo errorCorrelationData = (CorrelationDataDo) correlationData; System.out.println(errorCorrelationData);// 注意: 不能調用rabbitTemplate發送,會導致線程死鎖 //rabbitTemplate.convertAndSend(); // 解決辦法 errorCorrelationData放入緩存. 讓定時任務輪詢發送.
Map errorMap = new HashMap(); errorMap.put("status", "-2");// ack失敗
errorMap.put("errorMsg", cause); errorMap.put("errorCorrelationData", errorCorrelationData); redisTemplate.boundHashOps("orderMessageStatus").put(errorCorrelationData.getId(), errorMap); } } }); return rabbitTemplate; }
注意事項: 但是在主線程發送消息的過程中,rabbitMQ服務器關閉,這時候主程序和 ConfirmCallback 線程都
會等待Connection恢復,然后重新啟動 rabbitmq ,當應用程序重新建立 connection 之后,兩個線程都會死鎖
擴展CorrelationDataDo類
@Data public class CorrelationDataDo extends CorrelationData { // 消息
private Object message; // 交換機
private String exchange; // 路由鍵
private String routingKey; // 重試次數
private int retryCount = 0; }
/** * 發送消息類 */ @Component public class FanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; public String send(String msg){ for (int i = 0; i < 10; i++) { Map<String,Object> map = new HashMap<String,Object>(); map.put("id",i); String uuid = UUID.randomUUID().toString(); Message message = MessageBuilder.withBody((JSONObject.toJSONString(map)).getBytes())//設置發送消息
.setCorrelationId(uuid)//設置全局ID
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)//消息持久化
.setContentType("application/json")//設置格式application/json
.build(); //CorrelationData 用於消息確認 CorrelationDataDo繼承了CorrelationData用於擴展屬性
CorrelationDataDo correlationData = new CorrelationDataDo(); correlationData.setId(uuid); correlationData.setMessage(message); correlationData.setExchange("faoutEmailQueue1"); rabbitTemplate.convertAndSend("faoutEmailQueue1",message,correlationData); } return "success"; } } /** * 接收消息 */ @Component public class ReceiveHandler { /** * 郵箱 * @param
*/ @RabbitListener(queuesToDeclare = @Queue(value ="faoutEmailQueue1")) public void receive_email(Message message) { // 創建json消息轉換器
Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter(); //獲取消息
Map<String,Object> map = (Map<String, Object>) jsonMessageConverter.fromMessage(message); System.out.println(map); } }
3.消費者消息確認
3.1消費json格式數據
// 創建json消息轉換器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); // 獲取消息 Map map = (Map) jackson2JsonMessageConverter.fromMessage(message); //1.獲取消息
Long seckillId = ((Integer) map.get("seckillId")).longValue(); //2.扣減庫存
int result = 0; result = seckillGoodsMapper.updateStockCountByPrimaryKey(seckillId); if (result > 0) { System.out.println("修改庫存成功"); // 清空redis緩存數據
redisTemplate.boundHashOps("orderMessage").delete(message.getMessageProperties().getC orrelationId()); } else { System.out.println("修改庫存失敗,人工處理"); }
3.2消息確認方式
基於配置:
在消費者application.properties文件中設置消息確認方式
# 表示消息確認方式,none manual 手動ack 和auto 自動ack ;默認auto spring.rabbitmq.listener.simple.acknowledge-mode=auto
none: autoAck=true 自動ack,不管監聽是否發生錯誤都返回ack
manual: 手動ack, 用戶手動設置ack或者nack
auto: 根據監聽容器反會ack或者nack,如果容器拋出異常則自動啟動重試機制.
auto模式:
# 表示消息確認方式,none manual 手動ack 和auto 自動ack ;默認auto spring.rabbitmq.listener.simple.acknowledge-mode=auto
manual 模式
# 表示消息確認方式,none manual 手動ack 和auto 自動ack ;默認auto spring.rabbitmq.listener.simple.acknowledge-mode=manual
配置在config類里面
@Bean(name = "mqConsumerlistenerContainer") public SimpleRabbitListenerContainerFactory mqConsumerlistenerContainer(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); //簽收模式 AcknowledgeMode.MANUAL手動 AcknowledgeMode.AUTO自動,AcknowledgeMode.NONE // none: autoAck=true 自動ack,不管監聽是否發生錯誤都返回ack
factory.setAcknowledgeMode(AcknowledgeMode.NONE); return factory; }
消費者代碼
@Component public class SeckillOrder_Consumer { @Autowired TbSeckillGoodsMapper seckillGoodsMapper; @Autowired RedisTemplate redisTemplate; //containerFactory 這個就是不配置在配置文件里面,而是寫在config配置類里面的引入的config類
@RabbitListener(queues = {"seckillOrder_queue"},containerFactory = "mqConsumerlistenerContainer") public void receive(Message message, Channel channel) throws IOException { // 創建json消息轉換器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); Map map = (Map) jackson2JsonMessageConverter.fromMessage(message); // 解決冪等性問題
Object orderMessage = redisTemplate.boundHashOps("orderMessage").get(message.getMessageProperties().getCorrelationId()); if (orderMessage == null) { System.out.println("已經消費了, 不在重復消費"); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); return; } //1.獲取消息
Long seckillId = ((Integer) map.get("seckillId")).longValue(); //2.扣減庫存
int result = 0; try { result = seckillGoodsMapper.updateStockCountByPrimaryKey(seckillId); if (result > 0) { System.out.println("修改庫存成功"); //清空redis緩存數據
redisTemplate.boundHashOps("orderMessage").delete(message.getMessageProperties().getCorrelationId()); // 手動確認ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } else { System.out.println("修改庫存失敗,人工處理"); // 將錯誤放入緩存
Map errorMap = new HashMap(); errorMap.put("status", "-4");//SQL執行異常
errorMap.put("errorMsg", "SQL執行的結果:" + result); redisTemplate.boundHashOps("orderMessageStatus").put(message.getMessageProperties().getCorrelationId(), errorMap); // 手動確認nack
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } catch (Exception e) { System.out.println("修改庫存失敗,人工處理"); // 手動確認nack 區分問題原因: 數據庫網絡連接(重回隊列) 其他原因(不重回隊列) // 將錯誤放入緩存
Map errorMap = new HashMap(); if (e instanceof MyBatisSystemException) { MyBatisSystemException myBatisSystemException = (MyBatisSystemException) e; // 獲取根本原因
Throwable rootCause = myBatisSystemException.getRootCause(); if (rootCause instanceof ConnectException) { // 重試-重回隊列 設置重試3次
errorMap = (Map) redisTemplate.boundHashOps("orderMessageStatus").get(message.getMessageProperties().getCorrelationId()); if (errorMap == null) { errorMap = new HashMap(); errorMap.put("retryCount", 0);//初始重試次數
} int retryCount = (int) errorMap.get("retryCount"); if (++retryCount <= 3) { // 重試
errorMap.put("status", "-5");//數據庫網絡原因
errorMap.put("errorMsg", e.getMessage()); errorMap.put("retryCount", retryCount); redisTemplate.boundHashOps("orderMessageStatus").put(message.getMessageProperties().getCorrelationId(), errorMap); // 手動確認nack
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); System.out.println("網絡原因.開啟重試:" + retryCount); } else { errorMap.put("status", "-5");//數據庫網絡原因
errorMap.put("errorMsg", e.getMessage()); errorMap.put("retryCount", retryCount); redisTemplate.boundHashOps("orderMessageStatus").put(message.getMessageProperties().getCorrelationId(), errorMap); // 手動確認nack
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); System.out.println("重試次數已用完. 不在重試..."); } } else { errorMap.put("status", "-6");//其他原因
errorMap.put("errorMsg", e.getMessage()); redisTemplate.boundHashOps("orderMessageStatus").put(message.getMessageProperties().getCorrelationId(), errorMap); // 手動確認nack
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); System.out.println("其他原因,不需要重回隊列"); } } else { errorMap.put("status", "-6");//其他原因
errorMap.put("errorMsg", e.getMessage()); redisTemplate.boundHashOps("orderMessageStatus").put(message.getMessageProperties().getCorrelationId(), errorMap); // 手動確認nack
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); System.out.println("其他原因,不需要重回隊列"); } } } }
config類相關的解釋請參考:https://www.jianshu.com/p/090ed51006d5,https://blog.csdn.net/qq_42606051/article/details/82869148
11、限流
基於配置文件
#限流2條 spring.rabbitmq.listener.simple.prefetch=2
/** * 發送消息類 */ @Component public class FanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; public String send(String msg){ for (int i = 0; i < 10; i++) { Map<String,Object> map = new HashMap<String,Object>(); map.put("id",i); String uuid = UUID.randomUUID().toString(); Message message = MessageBuilder.withBody((JSONObject.toJSONString(map)).getBytes())//設置發送消息
.setCorrelationId(uuid)//設置全局ID
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)//消息持久化
.setContentType("application/json")//設置格式application/json
.build(); //CorrelationData 用於消息確認 CorrelationDataDo繼承了CorrelationData用於擴展屬性
CorrelationDataDo correlationData = new CorrelationDataDo(); correlationData.setId(uuid); correlationData.setMessage(message); correlationData.setExchange("faoutEmailQueue1"); rabbitTemplate.convertAndSend("faoutEmailQueue1",message,correlationData); } return "success"; } } /** * 接收消息 */ @Component public class ReceiveHandler { /** * 郵箱 * @param
*/ @RabbitListener(queuesToDeclare = @Queue(value ="faoutEmailQueue1")) public void receive_email(Message message) { // 創建json消息轉換器
Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter(); //獲取消息
Map<String,Object> map = (Map<String, Object>) jsonMessageConverter.fromMessage(message); System.out.println(map); } }
在消費者端debug運行結果:
運行生產者可以看到Ready=3 Unacked=2, Total=5, Total代表隊列中的消息總條數,Ready代表消費者還可以讀到的條數,Unacked:代表還有多少條沒有被應答
基於config配置類
@Bean(name = "mqConsumerlistenerContainer") public SimpleRabbitListenerContainerFactory mqConsumerlistenerContainer(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); //限流配置
factory.setPrefetchCount(2); return factory; }
消費者端代碼
/** * 接收消息 */ @Component public class ReceiveHandler { /** * 郵箱 * @param
*/ @RabbitListener(queuesToDeclare = @Queue(value ="faoutEmailQueue1"), containerFactory = "mqConsumerlistenerContainer") public void receive_email(Message message) { // 創建json消息轉換器
Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter(); //獲取消息
Map<String,Object> map = (Map<String, Object>) jsonMessageConverter.fromMessage(message); System.out.println(map); } }
參考博客代碼:https://blog.csdn.net/linsongbin1/article/details/100658415,https://blog.csdn.net/vbirdbest/article/details/78699913
11、RabbitMQ消息重試機制
消費者在消費消息的時候,如果消費者業務邏輯出現程序異常,這時候應該如何處理?
答案:使用消息重試機制。(演示重試機制)
如何合適選擇重試機制:
情況1: 消費者獲取到消息后,調用第三方接口,但接口暫時無法訪問,是否需要重試? (需要重試機制)
情況2: 消費者獲取到消息后,拋出數據轉換異常,是否需要重試?(不需要重試機制)需要發布進行解決。
如何實現重試機制 總結:
對於情況2,如果消費者代碼拋出異常是需要發布新版本才能解決的問題,那么不需要重試,重試也無濟於事。應該采用日志記錄+定時任務job健康檢查+人工進行補償
重試機制案例:
生產者代碼就按照上面的案例就可以了,
消費者:
yml文件
spring: rabbitmq: ####連接地址 host: 127.0.0.1 ####端口號 port: 5672 ####賬號 username: guest ####密碼 password: guest ### 地址 virtual-host: /admin_yehui listener: simple: retry: ####開啟消費者重試 enabled: true ####最大重試次數 max-attempts: 5 ####重試間隔次數 initial-interval: 3000 server: port: 8081
/** * 監聽隊列 */ @Component public class ReceiveHandler { /** * 郵箱 * @param msg */ //rabbitmq 默認情況下 如果消費者程序出現異常的情況下,會自動實現補償機制
//重試機制都是間隔性的 // 補償(重試機制) 隊列服務器 發送補償請求 // 如果消費端 程序業務邏輯出現異常消息會消費成功嗎? 是不能消費者成功的 //@RabbitListener 底層 使用Aop進行攔截,如果程序沒有拋出異常,自動提交事務 // 如果Aop使用異常通知攔截 獲取異常信息的話,自動實現補償機制 ,該消息會緩存到rabbitmq服務器端進行存放,一直重試到不拋異常為准。 // 修改重試機制策略 一般默認情況下 間隔5秒重試一次 @RabbitListener(queues={"fanout_eamil_queue"}) public void receive_email(String msg){ System.out.println("出現異常"); int i = 1/0; System.out.println("郵箱消費者獲取生產者消息msg:" + msg); } /** * 短信 * @param msg */ @RabbitListener(queues={"fanout_sms_queue"}) public void receive_sms(String msg){ System.out.println("短信消費者獲取生產者消息msg:" + msg); } }
調用第三方接口重試機制分析圖:

重試機制調用第三方接口 @RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("郵件消費者獲取生產者消息msg:" + msg + ",消息id:" + messageId); // 重試機制都是間隔性
JSONObject jsonObject = JSONObject.parseObject(msg); String email = jsonObject.getString("email"); String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email; System.out.println("郵件消費者開始調用第三方郵件服務器,emailUrl:" + emailUrl); JSONObject result = HttpClientUtils.httpGet(emailUrl); // 如果調用第三方郵件接口無法訪問,如何實現自動重試.
if (result == null) { throw new Exception("調用第三方郵件服務器接口失敗!"); } System.out.println("郵件消費者結束調用第三方郵件服務器成功,result:" + result + "程序執行結束"); // 手動ack
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手動簽收
channel.basicAck(deliveryTag, false); } // 默認是自動應答模式
}
12、消費者如果保證消息冪等性,不被重復消費
產生原因:網絡延遲傳輸中,消費出現異常或者是消費延遲消費,會造成MQ進行重試補償,
在重試過程中,可能會造成重復消費。 消費者如何保證消息冪等性,
不被重復消費 解決辦法:
①使用全局MessageID判斷消費方使用同一個,解決冪等性。
②或者使用業務邏輯保證唯一(比如訂單號碼)
基於全局消息id區分消息,解決冪等性(重復消費)
生產者:
@RequestMapping("/send") public String send(){ String msg = "my_fanout_msg:" + System.currentTimeMillis(); //設置全局ID
Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON) .setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build(); System.out.println(msg + ":" + msg); fanoutProducer.send(message); return null; }
消費者
/** * 郵箱 使用全局ID * @param msg */ @RabbitListener(queues={"fanout_eamil_queue"}) public void receive_email(Message message){ System.out.println("出現異常"); String messageId = message.getMessageProperties().getMessageId(); int i = 1/0; System.out.println("郵箱消費者獲取生產者消息msg:" + messageId); }
yml文件
spring: rabbitmq: ####連接地址 host: 127.0.0.1 ####端口號 port: 5672 ####賬號 username: guest ####密碼 password: guest ### 地址 virtual-host: /admin_yehui listener: simple: retry: ####開啟消費者重試 enabled: true ####最大重試次數 max-attempts: 5 ####重試間隔次數 initial-interval: 3000 server: port: 8081
啟動測試,重試的時候沒有發生變化
13、SpringBoot整合RabbitMQ簽收模式
//郵件隊列
@Component public class FanoutEamilConsumer { @RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { System.out .println(Thread.currentThread().getName() + ",郵件消費者獲取生產者消息msg:" + new String(message.getBody(), "UTF-8") + ",messageId:" + message.getMessageProperties().getMessageId()); // 手動ack
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手動簽收
channel.basicAck(deliveryTag, false); } }
開啟手動應答
pring: rabbitmq: ####連接地址 host: 127.0.0.1 ####端口號 port: 5672 ####賬號 username: guest ####密碼 password: guest ### 地址 virtual-host: /admin_host listener: simple: retry: ####開啟消費者異常重試 enabled: true ####最大重試次數 max-attempts: 5 ####重試間隔次數 initial-interval: 2000 ####開啟手動ack acknowledge-mode: manual
14屬性設置
1.TTL(Time To Live)
1.1消息的過期時間
/** * 設置過期時間 * @return
*/ @Bean("ttlQueue") public Queue ttlQueue(){ Map<String,Object> map = new HashMap<>(); map.put("x-message-ttl", 11000);//隊列中的消息未被消費11s后過期
return new Queue("ttlQueue",true,false,false,map); }
2)設置單條消息的過期時間
在發送消息的時候指定消息屬性。
MessageProperties messageProperties = new MessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); messageProperties.setExpiration("4000"); // 消息的過期屬性,單位ms
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); Message message = new Message("這條消息4秒后過期".getBytes(), messageProperties); rabbitTemplate.send(RabbitMQConfig.FANOUT_EMAIL_QUEUE,message); // 隨隊列的過期屬性過期,單位ms
rabbitTemplate.convertSendAndReceive(RabbitMQConfig.FANOUT_EMAIL_QUEUE, "消息發送");
2.死信隊列
2.1概述
死信隊列 聽上去像 消息“死”了 其實也有點這個意思,死信隊列 是 當消息在一個隊列 因為下列原因:
消息被拒絕(basic.reject/ basic.nack)並且不再重新投遞 requeue=false消息超期 (rabbitmq Time-To-Live -> messageProperties.setExpiration())
隊列超載變成了 “死信” 后 被重新投遞(publish)到另一個Exchange 該Exchange 就是DLX 然后該Exchange 根據綁定規則 轉發到對應的 隊列上
監聽該隊列 就可以重新消費,說白了就是 沒有被消費的消息 換個地方重新被消費
生產者 --> 消息 --> 交換機 --> 隊列 --> 變成死信 --> DLX交換機 -->隊列 --> 消費者
2.2.應用場景
在定義業務隊列的時候,可以考慮指定一個死信交換機,並綁定一個死信隊列,當消息變成死信時,該消息就會被發送到該死信隊列上
定義業務(普通)隊列的時候指定參數
x-dead-letter-exchange: 用來設置死信后發送的交換機
x-dead-letter-routing-key:用來設置死信的routingKey
死信隊列配置
生產者
@Component public class FanoutConfig { /** * 定義死信隊列相關信息 */
public final static String deadQueueName = "dead_queue"; public final static String deadRoutingKey = "dead_routing_key"; public final static String deadExchangeName = "dead_exchange"; /** * 死信隊列 交換機標識符 */
public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange"; /** * 死信隊列交換機綁定鍵標識符 */
public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key"; // 郵件隊列
private String FANOUT_EMAIL_QUEUE = "fanout_email_queue"; // 短信隊列
private String FANOUT_SMS_QUEUE = "fanout_sms_queue"; // fanout 交換機
private String EXCHANGE_NAME = "fanoutExchange"; // 1.定義郵件隊列
@Bean public Queue fanOutEamilQueue() { // 將普通隊列綁定到死信隊列交換機上
Map<String, Object> args = new HashMap<>(2); args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName); args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey); Queue queue = new Queue(FANOUT_EMAIL_QUEUE, true, false, false, args); return queue; } // 2.定義短信隊列
@Bean public Queue fanOutSmsQueue() { return new Queue(FANOUT_SMS_QUEUE); } // 2.定義交換機
@Bean FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_NAME); } // 3.隊列與交換機綁定郵件隊列
@Bean Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange); } // 4.隊列與交換機綁定短信隊列
@Bean Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange); } /** * 配置死信隊列 * * @return
*/ @Bean public Queue deadQueue() { Queue queue = new Queue(deadQueueName, true); return queue; } @Bean public DirectExchange deadExchange() { return new DirectExchange(deadExchangeName); } @Bean public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) { return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey); } }
@RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("郵件消費者獲取生產者消息msg:" + msg + ",消息id:" + messageId); JSONObject jsonObject = JSONObject.parseObject(msg); Integer timestamp = jsonObject.getInteger("timestamp"); try { int result = 1 / timestamp; System.out.println("result:" + result); // 通知mq服務器刪除該消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); // // 丟棄該消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } @Component public class DeadConsumer { @RabbitListener(queues = "dead_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("死信郵件消費者獲取生產者消息msg:" + msg + ",消息id:" + messageId); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
消息留轉圖

六、消息可靠性
在 RabbitMQ 里面提供了很多保證消息可靠投遞的機制,這個也是 RabbitMQ 的一個特性。
在理解消息可靠性投遞的時候,必須明確一個問題,因為效率與可靠性是無法兼得的,如果要保證每一個環節都成功,勢必會對消息的收發效率造成影響。所以如果是一
6.1RabbitMq的工作模型
使用 RabbitMQ 收發消息的時候,有幾個主要環節:
6.2消息發送到 RabbitMQ 服務器
Transaction(事務)模式
public class TransactionProducer { private final static String QUEUE_NAME = "ORIGIN_QUEUE"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri(ResourceUtil.getKey("rabbitmq.uri")); // 建立連接
Connection conn = factory.newConnection(); // 創建消息通道
Channel channel = conn.createChannel(); String msg = "Hello world, Rabbit MQ"; // 聲明隊列(默認交換機AMQP default,Direct) // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null); try { channel.txSelect(); // 發送消息 // String exchange, String routingKey, BasicProperties props, byte[] body
channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes()); // int i =1/0;
channel.txCommit(); System.out.println("消息發送成功"); } catch (Exception e) { channel.txRollback(); System.out.println("消息已經回滾"); } channel.close(); conn.close(); } }
rabbitTemplate.setChannelTransacted(true);
Confirm(確認)模式
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { System.out.println("發送消息失敗:" + cause); throw new RuntimeException("發送異常:" + cause); } } });
6.3消息在隊列中的存儲
@Bean("durableQueue") public Queue queue(){ return new Queue("durableQueue",true,false,false,new HashMap<>()); }
2.持久化交換機
@Bean("fanoutDurableExchange") public FanoutExchange fanoutDurableExchange(){ // exchangeName, durable, exclusive, autoDelete,
return new FanoutExchange("fanoutDurableExchange",true,false,new HashMap<>()); }
3.消息持久化
MessageProperties messageProperties = new MessageProperties(); //消息持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); messageProperties.setExpiration("4000"); // 消息的過期屬性,單位ms
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 隨隊列的過期屬性過期,單位ms
rabbitTemplate.convertAndSend("faoutEmailQueue","324243232",messageProperties);
6.4消息投遞到消費者
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
public class SecondConsumer { @RabbitHandler public void process(String msgContent, Channel channel, Message message) throws IOException { System.out.println("Second Queue received msg : " + msgContent); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
如果消息無法處理或者消費失敗,也有兩種拒絕的方式,Basic.Reject()拒絕單條,Basic.Nack()批量拒絕。如果 requeue 參數設置為 true,可以把這條消息重新存入隊列,
6.5消費者回調
6.6補償機制
6.7消息冪等性
# 表示消息確認方式,none manual 手動ack 和auto 自動ack ;默認auto
spring.rabbitmq.listener.simple.acknowledge-mode=auto

