MQ全稱為Message Queue,消息隊列是應用程序和應用程序之間的通信方法。
為什么使用MQ
在項目中,可將一些無需即時返回且耗時的操作提取出來,進行異步處理,而這種異步處理的方式大大的節省了服務器的請求響應時間,從而提高了系統的吞吐量。
開發中消息隊列通常有如下應用場景:
1、任務異步處理
將不需要同步處理的並且耗時長的操作由消息隊列通知消息接收方進行異步處理。提高了應用程序的響應時間。
2、應用程序解耦合
MQ相當於一個中介,生產方通過MQ與消費方交互,它將應用程序進行解耦合。
MQ是消息通信的模型;實現MQ的大致有兩種主流方式:AMQP、JMS。
AMQP 與 JMS 區別
-
JMS是定義了統一的接口,來對消息操作進行統一;AMQP是通過規定協議來統一數據交互的格式
-
JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,因此是跨語言的。
-
JMS規定了兩種消息模式;而AMQP的消息模式更加豐富
市場上常見的消息隊列有如下:
目前市面上成熟主流的MQ有Kafka 、RocketMQ、RabbitMQ
RabbitMQ:
使用Erlang編寫的一個開源的消息隊列,本身支持很多的協議:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它變的非常重量級,更適合於企業級的開發。同時實現了Broker架構,核心思想是生產者不會將消息直接發送給隊列,消息在發送給客戶端時先在中心隊列排隊。對路由(Routing),負載均衡(Load balance)、數據持久化都有很好的支持。多用於進行企業級的ESB整合。
RabbitMQ提供了6種模式:簡單模式,work模式,Publish/Subscribe發布與訂閱模式,Routing路由模式,Topics主題模式,RPC遠程調用模式(遠程調用,不太算MQ)
角色說明:
1、 超級管理員(administrator)
可登陸管理控制台,可查看所有的信息,並且可以對用戶,策略(policy)進行操作。
2、 監控者(monitoring)
可登陸管理控制台,同時可以查看rabbitmq節點的相關信息(進程數,內存使用情況,磁盤使用情況等)
3、 策略制定者(policymaker)
可登陸管理控制台, 同時可以對policy進行管理。但無法查看節點的相關信息(上圖紅框標識的部分)。
4、 普通管理者(management)
僅可登陸管理控制台,無法看到節點信息,也無法對策略進行管理。
5、 其他
無法登陸管理控制台,通常就是普通的生產者和消費者。
設置Virtual Hosts權限:
user:用戶名
configure :一個正則表達式,用戶對符合該正則表達式的所有資源擁有 configure 操作的權限
write:一個正則表達式,用戶對符合該正則表達式的所有資源擁有 write 操作的權限
read:一個正則表達式,用戶對符合該正則表達式的所有資源擁有 read 操作的權限
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>
//創建鏈接工廠對象 //設置RabbitMQ服務主機地址,默認localhost //設置RabbitMQ服務端口,默認5672 //設置虛擬主機名字,默認/ //設置用戶連接名,默認guest //設置鏈接密碼,默認guest //創建鏈接 //創建頻道 //聲明隊列 //創建消息 //消息發送 //關閉資源
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //創建鏈接工廠對象 ConnectionFactory connectionFactory = new ConnectionFactory(); //設置RabbitMQ服務主機地址,默認localhost connectionFactory.setHost("192.168.211.132"); //設置RabbitMQ服務端口,默認5672 connectionFactory.setPort(5672); //設置虛擬主機名字(在rabbitmq服務器中,消息隊列是放在虛擬主機中的,這是為了更好分類管理各種消息隊列,一般會加/) //虛擬主機名字得先在服務器中手動添加一個,否則會找不到虛擬主機 connectionFactory.setVirtualHost("/qianyi"); //設置用戶連接名,默認guest(你想用哪個rabbitmq服務器中的賬戶就填哪個的賬號密碼) connectionFactory.setUsername("guest"); //設置鏈接密碼,默認guest connectionFactory.setPassword("guest"); //創建鏈接(在本身和rabbitmq服務器之間建立連接,類似跟redis、mysql之間建立連接) Connection connection = connectionFactory.newConnection(); //創建頻道(在本身和指定的rabbitmq服務器中的指定虛擬主機之間建立穩定、快速的通道) Channel channel = connection.createChannel(); //聲明隊列(說明要在rabbitmq服務器中指定的虛擬主機中的哪條消息隊列) /** * 聲明隊列 * 參數1:隊列名稱 * 參數2:是否定義持久化隊列 * 參數3:是否獨占本次連接(其它連接是否能連接到本條隊列) * 參數4:是否在不使用的時候自動刪除隊列 * 參數5:隊列其它參數 * **/ channel.queueDeclare("qianyi1",true,false,false,null); //創建消息 String message = "hello!qianyi!"; //消息發送 /** * 消息發送 * 參數1:交換機名稱,如果沒有指定則使用默認Default Exchage(不寫就填空串) * 參數2:路由key,簡單模式可以傳遞隊列名稱 * 參數3:消息其它屬性(沒有填null) * 參數4:消息內容(消息內容是字符串,需要轉換成字節數組才能傳輸) */ channel.basicPublish("", "qianyi1", null, message.getBytes()); //關閉資源(連接和頻道的) channel.close(); connection.close(); }
}
//創建鏈接工廠對象 //設置RabbitMQ服務主機地址,默認localhost //設置RabbitMQ服務端口,默認5672 //設置虛擬主機名字,默認/ //設置用戶連接名,默認guest //設置鏈接密碼,默認guest //創建鏈接 //創建頻道 //創建隊列 //創建消費者,並設置消息處理 //消息監聽 //關閉資源(不建議關閉,建議一直監聽消息)
public static void main(String[] args) throws IOException, TimeoutException { //創建鏈接工廠對象 ConnectionFactory connectionFactory = new ConnectionFactory(); //設置RabbitMQ服務主機地址,默認localhost connectionFactory.setHost("192.168.211.132"); //設置RabbitMQ服務端口,默認5672 connectionFactory.setPort(5672); //設置虛擬主機名字(rabbitmq服務器上創建的虛擬主機) connectionFactory.setVirtualHost("qianyi"); //設置用戶連接名,默認guest(指定用哪個用戶登錄,guest是默認超級管理員) connectionFactory.setUsername("guest"); //設置鏈接密碼,默認guest connectionFactory.setPassword("guest"); //創建鏈接(連接到rabbitmq服務器) Connection connection = connectionFactory.newConnection(); //創建頻道(創建連接rabbitmq服務器之間的穩定、高效的頻道,持久通訊) Channel channel = connection.createChannel(); //申明隊列,指定到哪個隊列獲取消息 /** * 聲明隊列 * 參數1:隊列名稱 * 參數2:是否定義持久化隊列 * 參數3:是否獨占本次連接(其它連接是否能連接到本條隊列) * 參數4:是否在不使用的時候自動刪除隊列 * 參數5:隊列其它參數 * **/ channel.queueDeclare("qianyi1",true,false,false,null); //創建消費者,並設置消息處理(DefaultConsumer:消息消費者,參數傳入創建的頻道)然后再重寫handleDelivery方法,可以用lambab表達式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 消息者標簽,在channel.basicConsume時候可以指定 * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送) * @param properties 屬性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //獲取交換機信息 String exchange = envelope.getExchange(); //獲取消息ID long deliveryTag = envelope.getDeliveryTag(); //獲取消息信息 String message = new String(body, "UTF-8"); //輸出獲得的消息內容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //消息監聽 /** * 消息監聽 * 要監聽哪個隊列?當消費者收到消息之后是否自動告訴rebbitmq服務器已經收到?收到消息之后,如何處理呢? * 參數1:隊列名稱 * 參數2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動確認 * 參數3:消息接收到后回調(傳入上面創建的消費者對象,這個消費者對象中對做了對收到的消息處理) */ channel.basicConsume("qianyi1",true,defaultConsumer); //關閉資源(不建議關閉,建議一直監聽消息) //channel.close(); //connection.close(); }
}
P:生產者,也就是要發送消息的程序
C:消費者:消息的接受者,會一直等待消息到來。
queue:消息隊列,圖中紅色部分。類似一個郵箱,可以緩存消息;生產者向其中投遞消息,消費者從其中取出消息。
在rabbitMQ中消費者是一定要到某個消息隊列中去獲取消息的
模式說明
Work Queues
與入門程序的簡單模式
相比,多了一個或一些消費端,多個消費端共同消費同一個隊列中的消息。
應用場景
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //QueueUtil是工具類,將之前的一直重復代碼截取到這個工具類中 Connection connection = QueueUtil.queueUtil(); //創建頻道(在本身和指定的rabbitmq服務器中的指定虛擬主機之間建立穩定、快速的通道) Channel channel = connection.createChannel(); //聲明隊列(說明要在rabbitmq服務器中指定的虛擬主機中的哪條消息隊列) /** * 聲明隊列 * 參數1:隊列名稱 * 參數2:是否定義持久化隊列 * 參數3:是否獨占本次連接(其它連接是否能連接到本條隊列) * 參數4:是否在不使用的時候自動刪除隊列 * 參數5:隊列其它參數 * **/ channel.queueDeclare("qianyi2",true,false,false,null); //創建消息(因為是發給多個消費者,所以進行for循環) for (int i = 0; i <= 3; i++) { String message = "hello!qianyi!"+i; //消息發送 /** * 消息發送 * 參數1:交換機名稱,如果沒有指定則使用默認Default Exchage(不寫就填空串) * 參數2:路由key,簡單模式可以傳遞隊列名稱 * 參數3:消息其它屬性(沒有填null) * 參數4:消息內容(消息內容是字符串,需要轉換成字節數組才能傳輸) */ channel.basicPublish("", "qianyi2", null, message.getBytes()); } //關閉資源(連接和頻道的) channel.close(); connection.close(); } }
com.xxx.work.ConsumeOne
public class ConsumeOne { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = QueueUtil.queueUtil(); //創建頻道(創建連接rabbitmq服務器之間的穩定、高效的頻道,持久通訊) Channel channel = connection.createChannel(); //申明隊列,指定到哪個隊列獲取消息 /** * 聲明隊列 * 參數1:隊列名稱 * 參數2:是否定義持久化隊列 * 參數3:是否獨占本次連接(其它連接是否能連接到本條隊列) * 參數4:是否在不使用的時候自動刪除隊列 * 參數5:隊列其它參數 * **/ channel.queueDeclare("qianyi2",true,false,false,null); //創建消費者,並設置消息處理(DefaultConsumer:消息消費者,參數傳入創建的頻道)然后再重寫handleDelivery方法,可以用lambab表達式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 消息者標簽,在channel.basicConsume時候可以指定 * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送) * @param properties 屬性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //獲取交換機信息 String exchange = envelope.getExchange(); //獲取消息ID long deliveryTag = envelope.getDeliveryTag(); //獲取消息信息 String message = new String(body, "UTF-8"); //輸出獲得的消息內容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //消息監聽 /** * 消息監聽 * 要監聽哪個隊列?當消費者收到消息之后是否自動告訴rebbitmq服務器已經收到?收到消息之后,如何處理呢? * 參數1:隊列名稱 * 參數2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動確認 * 參數3:消息接收到后回調(傳入上面創建的消費者對象,這個消費者對象中對做了對收到的消息處理) */ channel.basicConsume("qianyi2",true,defaultConsumer); //關閉資源(不建議關閉,建議一直監聽消息) //channel.close(); //connection.close(); } }
P:生產者,也就是要發送消息的程序
C:消費者:消息的接受者,會一直等待消息到來。
Queue:消息隊列,圖中紅色部分
而在訂閱模型中,多了一個exchange(交換機)角色,而且過程略有變化:
P:生產者,也就是要發送消息的程序,但是不再發送到隊列中,而是發給X(交換機)
C:消費者,消息的接受者,會一直等待消息到來。
Queue:消息隊列,接收消息、緩存消息。
Exchange:交換機,圖中的X。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決於Exchange的類型。Exchange有常見以下3種類型:
Fanout:廣播,將消息交給所有綁定到交換機的隊列
Direct:定向,把消息交給符合指定routing key 的隊列
Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
1.每個消費者監聽自己的隊列。
2.生產者將消息發給broker(代理人),由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收到消息
(1)生產者
生產者需要注意如下3點:
1.聲明交換機
2.聲明隊列
3.隊列需要綁定指定的交換機
生產者:申明一個交換機,然后綁定這個交換機所有的(根據需求)隊列,發送消息即可
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //QueueUtil是工具類,將之前的一直重復代碼截取到這個工具類中 Connection connection = QueueUtil.queueUtil(); //創建頻道(在本身和指定的rabbitmq服務器中的指定虛擬主機之間建立穩定、快速的通道) Channel channel = connection.createChannel(); /** * 聲明交換機 * 參數1:交換機名稱 * 參數2:交換機類型,fanout、topic、direct、headers(以下用fanout類型,廣播模式,每個與交換機綁定的隊列都會接收到信息) */ channel.exchangeDeclare("QY", BuiltinExchangeType.FANOUT); //聲明隊列(說明要在rabbitmq服務器中指定的虛擬主機中的哪條消息隊列) /** * 聲明隊列 * 參數1:隊列名稱 * 參數2:是否定義持久化隊列 * 參數3:是否獨占本次連接(其它連接是否能連接到本條隊列) * 參數4:是否在不使用的時候自動刪除隊列 * 參數5:隊列其它參數 * **/ channel.queueDeclare("qianyi3",true,false,false,null); channel.queueDeclare("qianyi4",true,false,false,null); //隊列綁定交換機 //參數1:需要綁定的隊列 //參數2:需要綁定的交換機 channel.queueBind("qianyi3","QY",""); channel.queueBind("qianyi4","QY",""); //創建消息(中文會亂碼) String message = "發布訂閱模式:歡迎光臨紅浪漫!"; //消息發送 /** * 消息發送 * 參數1:交換機名稱,如果沒有指定則使用默認Default Exchage(不寫就填空串) * 參數2:路由key,簡單模式可以傳遞隊列名稱,發布訂閱模式不傳遞隊列名稱 * 參數3:消息其它屬性(沒有填null) * 參數4:消息內容(消息內容是字符串,需要轉換成字節數組才能傳輸) */ channel.basicPublish("QY", "", null, message.getBytes()); //關閉資源(連接和頻道的) channel.close(); connection.close(); } }
消費者:消費者可以是多個,只要監聽的隊列跟交換機綁定了,那么生產者發送的內容這個消費者都能收到
public class ConsumeOne { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = QueueUtil.queueUtil(); //創建頻道(創建連接rabbitmq服務器之間的穩定、高效的頻道,持久通訊) Channel channel = connection.createChannel(); //申明隊列,指定到哪個隊列獲取消息 /** * 聲明隊列 * 參數1:隊列名稱 * 參數2:是否定義持久化隊列 * 參數3:是否獨占本次連接(其它連接是否能連接到本條隊列) * 參數4:是否在不使用的時候自動刪除隊列 * 參數5:隊列其它參數 * **/ channel.queueDeclare("qianyi3",true,false,false,null); //創建消費者,並設置消息處理(DefaultConsumer:消息消費者,參數傳入創建的頻道)然后再重寫handleDelivery方法,可以用lambab表達式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 消息者標簽,在channel.basicConsume時候可以指定 * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送) * @param properties 屬性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //獲取交換機信息 String exchange = envelope.getExchange(); //獲取消息ID long deliveryTag = envelope.getDeliveryTag(); //獲取消息信息 String message = new String(body, "UTF-8"); //輸出獲得的消息內容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //消息監聽 /** * 消息監聽 * 要監聽哪個隊列?當消費者收到消息之后是否自動告訴rebbitmq服務器已經收到?收到消息之后,如何處理呢? * 參數1:隊列名稱 * 參數2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動確認 * 參數3:消息接收到后回調(傳入上面創建的消費者對象,這個消費者對象中對做了對收到的消息處理) */ channel.basicConsume("qianyi3",true,defaultConsumer); //關閉資源(不建議關閉,建議一直監聽消息) //channel.close(); //connection.close(); } }
總結:生產者發送的消息先發給申明的交換機,交換機又綁定了一個或者多個隊列,那么在這個模式下,消費者只需要監視跟交換機綁定的隊列,就可以獲取到生產者發送的消息。
如果兩個或者多個消費者監視同一個隊列,那么又會出現這種情況:即生產者發送一條消息,能同時接收到信息的只有一個消費者,無法做到上面的效果,多個消費者同時收到消息
所以如果需要同時接收消息的話,必須一個消費者監聽一條隊列,而該隊列必須跟交換機有綁定的關系
發布訂閱模式與work隊列模式的區別:
1、work隊列模式不用定義交換機,而發布/訂閱模式需要定義交換機。
2、發布/訂閱模式的生產方是面向交換機發送消息,work隊列模式的生產方是面向隊列發送消息(底層使用默認交換機)。
3、發布/訂閱模式需要設置隊列和交換機的綁定,work隊列模式不需要設置,實際上work隊列模式會將隊列綁 定到默認的交換機
路由模式特點:
1.隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
2.消息的發送方在 向 Exchange發送消息時,也必須指定消息的 RoutingKey。
3.Exchange不再把消息交給每一個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息
P:生產者,向Exchange發送消息,交換機綁定隊列的時候,會指定一個routing key,給交換機發送消息的時候,也要帶着指定的routing key,並且有幾個routing key就發送幾次(一次只能指定一個routing key)
X:Exchange(交換機),接收生產者的消息,然后把消息遞交給 與routing key完全匹配的隊列
C1:消費者,其所在隊列指定了需要routing key 為 error 的消息
C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息
創建消息生產者,代碼如下:
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //QueueUtil是工具類,將之前的一直重復代碼截取到這個工具類中 Connection connection = QueueUtil.queueUtil(); //創建頻道(在本身和指定的rabbitmq服務器中的指定虛擬主機之間建立穩定、快速的通道) Channel channel = connection.createChannel(); /** * 聲明交換機 * 參數1:交換機名稱 * 參數2:交換機類型,fanout、topic、direct、headers(以下用DIRECT類型,路由模式,交換機發送的消息會根routing key發送給匹配的隊列) */ channel.exchangeDeclare("QY1", BuiltinExchangeType.DIRECT); //聲明隊列(說明要在rabbitmq服務器中指定的虛擬主機中的哪條消息隊列) /** * 聲明隊列 * 參數1:隊列名稱 * 參數2:是否定義持久化隊列 * 參數3:是否獨占本次連接(其它連接是否能連接到本條隊列) * 參數4:是否在不使用的時候自動刪除隊列 * 參數5:隊列其它參數 * **/ channel.queueDeclare("qianyi5",true,false,false,null); channel.queueDeclare("qianyi6",true,false,false,null); //隊列綁定交換機 //參數1:需要綁定的隊列 //參數2:需要綁定的交換機 //參數3:需要綁定的routing key(路由key)在交換機給隊列發送消息的時候,會根據它發送 channel.queueBind("qianyi5","QY1","rouingkey1"); channel.queueBind("qianyi6","QY1","rouingkey2"); //創建消息(中文會亂碼) String message1 = "發布訂閱模式:歡迎光臨紅浪漫!111"; String message2 = "發布訂閱模式:歡迎光臨紅浪漫!222"; //消息發送 /** * 消息發送 * 參數1:交換機名稱,如果沒有指定則使用默認Default Exchage(不寫就填空串) * 參數2:路由key,簡單模式可以傳遞隊列名稱,廣播模式不傳遞隊列名稱 * 參數3:消息其它屬性(沒有填null) * 參數4:消息內容(消息內容是字符串,需要轉換成字節數組才能傳輸) */ channel.basicPublish("QY1", "rouingkey1", null, message1.getBytes()); channel.basicPublish("QY1", "rouingkey2", null, message2.getBytes()); //關閉資源(連接和頻道的) channel.close(); connection.close(); } }
消費者:
public class ConsumeOne { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = QueueUtil.queueUtil(); //創建頻道(創建連接rabbitmq服務器之間的穩定、高效的頻道,持久通訊) Channel channel = connection.createChannel(); //申明隊列,指定到哪個隊列獲取消息 /** * 聲明隊列 * 參數1:隊列名稱 * 參數2:是否定義持久化隊列 * 參數3:是否獨占本次連接(其它連接是否能連接到本條隊列) * 參數4:是否在不使用的時候自動刪除隊列 * 參數5:隊列其它參數 * **/ channel.queueDeclare("qianyi6",true,false,false,null); //創建消費者,並設置消息處理(DefaultConsumer:消息消費者,參數傳入創建的頻道)然后再重寫handleDelivery方法,可以用lambab表達式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 消息者標簽,在channel.basicConsume時候可以指定 * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送) * @param properties 屬性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //獲取交換機信息 String exchange = envelope.getExchange(); //獲取消息ID long deliveryTag = envelope.getDeliveryTag(); //獲取消息信息 String message = new String(body, "UTF-8"); //輸出獲得的消息內容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //消息監聽 /** * 消息監聽 * 要監聽哪個隊列?當消費者收到消息之后是否自動告訴rebbitmq服務器已經收到?收到消息之后,如何處理呢? * 參數1:隊列名稱 * 參數2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動確認 * 參數3:消息接收到后回調(傳入上面創建的消費者對象,這個消費者對象中對做了對收到的消息處理) */ channel.basicConsume("qianyi6",true,defaultConsumer); //關閉資源(不建議關閉,建議一直監聽消息) //channel.close(); //connection.close(); } }
總結:當在生產者中綁定交換機的時候,每綁定一個隊列,都會給該隊列指定一個routing key,然后在生產者向交換機發送消息的時候,指定某個已經綁定的routing key,就會將該條消息發送到對應的隊列,比如A隊列綁定了routingkey1,生產者發送消息給隊列發送消息的時候,就會去隊列中找routingkey1的隊列,發送過去,消費者只需要根據監聽的隊列就可以獲得該條消息。
可以做到這樣:申明兩個隊列,分別指定routingkey,發送消息的時候也發送兩條,一條消息指定其中一個routingkey,也就將那條消息發送給了一個隊列,另一條消息指定另一個routingkey,消息也就發送到了另一個隊列,兩個或者多個消費者只需要綁定不同的隊列就可以獲得兩個不同的消息。
Routingkey
一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: qianyi.insert
通配符規則:
#
:匹配一個或多個詞
*
:匹配不多不少恰好1個詞
圖解:
-
紅色Queue:綁定的是
usa.#
,因此凡是以usa.
開頭的routing key
都會被匹配到 -
黃色Queue:綁定的是
#.news
,因此凡是以.news
結尾的routing key
都會被匹配
使用topic類型的Exchange,發送消息的routing key有3種: item.insert
、item.update
、item.delete
:
創建TopicProducer實現消息生產,代碼如下:
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //QueueUtil是工具類,將之前的一直重復代碼截取到這個工具類中 Connection connection = QueueUtil.queueUtil(); //創建頻道(在本身和指定的rabbitmq服務器中的指定虛擬主機之間建立穩定、快速的通道) Channel channel = connection.createChannel(); /** * 聲明交換機 * 參數1:交換機名稱 * 參數2:交換機類型,fanout、topic、direct、headers(以下用DIRECT類型,路由模式,交換機發送的消息會根routing key發送給匹配的隊列) */ channel.exchangeDeclare("QY2", BuiltinExchangeType.TOPIC); //聲明隊列(說明要在rabbitmq服務器中指定的虛擬主機中的哪條消息隊列) /** * 聲明隊列 * 參數1:隊列名稱 * 參數2:是否定義持久化隊列 * 參數3:是否獨占本次連接(其它連接是否能連接到本條隊列) * 參數4:是否在不使用的時候自動刪除隊列 * 參數5:隊列其它參數 * **/ channel.queueDeclare("qianyi7",true,false,false,null); channel.queueDeclare("qianyi8",true,false,false,null); channel.queueDeclare("qianyi9",true,false,false,null); //隊列綁定交換機 //參數1:需要綁定的隊列 //參數2:需要綁定的交換機 //參數3:需要綁定的routing key(路由key)在交換機給隊列發送消息的時候,會根據它發送 // (*表示后面一個單詞無論是什么,只要交換機發送消息的routingkey的值是item開頭的,它都能接收到) //下面操作給qianyi7隊列兩個routing key,只要發送的消息指定其中任何一個routingkey,qianyi7都會接收到消息 //給qianyi8隊列用了通配符,只要發送的消息指定的routing key是以item開頭的,它都能收到 channel.queueBind("qianyi7","QY2","item.inset"); channel.queueBind("qianyi7","QY2","item.update"); channel.queueBind("qianyi8","QY2","item.*"); //創建消息(中文會亂碼) String message1 = "發布訂閱模式:歡迎光臨紅浪漫!111"; String message2 = "發布訂閱模式:歡迎光臨紅浪漫!222"; String message3 = "發布訂閱模式:歡迎光臨紅浪漫!333"; //消息發送 /** * 消息發送 * 參數1:交換機名稱,如果沒有指定則使用默認Default Exchage(不寫就填空串) * 參數2:路由key,簡單模式可以傳遞隊列名稱,廣播模式不傳遞隊列名稱 * 參數3:消息其它屬性(沒有填null) * 參數4:消息內容(消息內容是字符串,需要轉換成字節數組才能傳輸) */ channel.basicPublish("QY2", "item.inset", null, message1.getBytes()); channel.basicPublish("QY2", "item.update", null, message2.getBytes()); //這里routing key值寫item.aaa為了驗證只要是以item開頭的隊列,都可以接收到這條消息 channel.basicPublish("QY2", "item.aaa", null, message3.getBytes()); //關閉資源(連接和頻道的) channel.close(); connection.close(); } }
消費者1:
public class ConsumeOne { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = QueueUtil.queueUtil(); //創建頻道(創建連接rabbitmq服務器之間的穩定、高效的頻道,持久通訊) Channel channel = connection.createChannel(); //申明隊列,指定到哪個隊列獲取消息 /** * 聲明隊列 * 參數1:隊列名稱 * 參數2:是否定義持久化隊列 * 參數3:是否獨占本次連接(其它連接是否能連接到本條隊列) * 參數4:是否在不使用的時候自動刪除隊列 * 參數5:隊列其它參數 * **/ channel.queueDeclare("qianyi7",true,false,false,null); //創建消費者,並設置消息處理(DefaultConsumer:消息消費者,參數傳入創建的頻道)然后再重寫handleDelivery方法,可以用lambab表達式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 消息者標簽,在channel.basicConsume時候可以指定 * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送) * @param properties 屬性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //獲取交換機信息 String exchange = envelope.getExchange(); //獲取消息ID long deliveryTag = envelope.getDeliveryTag(); //獲取消息信息 String message = new String(body, "UTF-8"); //輸出獲得的消息內容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //消息監聽 /** * 消息監聽 * 要監聽哪個隊列?當消費者收到消息之后是否自動告訴rebbitmq服務器已經收到?收到消息之后,如何處理呢? * 參數1:隊列名稱 * 參數2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動確認 * 參數3:消息接收到后回調(傳入上面創建的消費者對象,這個消費者對象中對做了對收到的消息處理) */ channel.basicConsume("qianyi7",true,defaultConsumer); //關閉資源(不建議關閉,建議一直監聽消息) //channel.close(); //connection.close(); } }
消費者2:
public class ConsumeTwo { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = QueueUtil.queueUtil(); //創建頻道(創建連接rabbitmq服務器之間的穩定、高效的頻道,持久通訊) Channel channel = connection.createChannel(); //申明隊列,指定到哪個隊列獲取消息 /** * 聲明隊列 * 參數1:隊列名稱 * 參數2:是否定義持久化隊列 * 參數3:是否獨占本次連接(其它連接是否能連接到本條隊列) * 參數4:是否在不使用的時候自動刪除隊列 * 參數5:隊列其它參數 * **/ channel.queueDeclare("qianyi8",true,false,false,null); //創建消費者,並設置消息處理(DefaultConsumer:消息消費者,參數傳入創建的頻道)然后再重寫handleDelivery方法,可以用lambab表達式 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 消息者標簽,在channel.basicConsume時候可以指定 * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送) * @param properties 屬性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //獲取交換機信息 String exchange = envelope.getExchange(); //獲取消息ID long deliveryTag = envelope.getDeliveryTag(); //獲取消息信息 String message = new String(body, "UTF-8"); //輸出獲得的消息內容 System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; //消息監聽 /** * 消息監聽 * 要監聽哪個隊列?當消費者收到消息之后是否自動告訴rebbitmq服務器已經收到?收到消息之后,如何處理呢? * 參數1:隊列名稱 * 參數2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動確認 * 參數3:消息接收到后回調(傳入上面創建的消費者對象,這個消費者對象中對做了對收到的消息處理) */ channel.basicConsume("qianyi8",true,defaultConsumer); //關閉資源(不建議關閉,建議一直監聽消息) //channel.close(); //connection.close(); } }
總結:以上代碼驗證了:1、可以給一個隊列指定多個routing key,只要消息發送給多個routing key中的任何一個,該隊列都會收到消息。
2、可以給routing key用通配符(*或者#)使用item.*,那么只要發送消息的時候,指定的routing key是以item開頭都可以被該隊列收到。
所以,以上代碼,qianyi7隊列收到了兩條消息,一條是item.inset路由key接收的,一條是item.update路由key接收的,而qiani8則收到了3條信息,因為每條信息的路由key都符合item.*的規則。
模式總結
RabbitMQ工作模式: 1、簡單模式 HelloWorld 一個生產者、一個消費者,不需要設置交換機(使用默認的交換機)
2、工作隊列模式 Work Queue 一個生產者、多個消費者(競爭關系),不需要設置交換機(使用默認的交換機)
3、發布訂閱模式 Publish/subscribe 需要設置類型為fanout的交換機,並且交換機和隊列進行綁定,當發送消息到交換機后,交換機會將消息發送到綁定的隊列
5、通配符模式 Topic 需要設置類型為topic的交換機,交換機和隊列進行綁定,並且指定通配符方式的routing key,當發送消息到交換機后,交換機會根據routing key將消息發送到對應的隊列
在Spring項目中,可以使用Spring-Rabbit去操作RabbitMQ https://github.com/spring-projects/spring-amqp
尤其是在spring boot項目中只需要引入對應的amqp啟動器依賴即可,方便的使用RabbitTemplate發送消息,使用注解接收消息。
一般在開發過程中:
生產者工程:
-
application.yml文件配置RabbitMQ相關信息;
-
在生產者工程中編寫配置類,用於創建交換機和隊列,並進行綁定
-
注入RabbitTemplate對象,通過RabbitTemplate對象發送消息到交換機
消費者工程:
-
application.yml文件配置RabbitMQ相關信息
-
創建消息處理類,用於接收隊列中的消息並進行處理
創建生產者工程,添加依賴:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <!--父工程--> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <groupId>com.xxx</groupId> <artifactId>springboot-rabbitmq-producer</artifactId> <version>1.0-SNAPSHOT</version> <!--依賴--> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies> </project>
(1)application.yml配置文件
創建application.yml,內容如下:
spring: rabbitmq: host: localhost port: 5672 virtual-host: /aaa #虛擬主機名 username: guest password: guest
綁定交換機和隊列
創建RabbitMQ隊列與交換機綁定的配置類RabbitMQConfig,代碼如下:
@Configuration public class RabbitMQConfig { /*** * 聲明交換機 */ @Bean(name = "itemTopicExchange") public Exchange topicExchange(){ return ExchangeBuilder.topicExchange("item_topic_exchange").durable(true).build(); } /*** * 聲明隊列 */ @Bean(name = "itemQueue") public Queue itemQueue(){ return QueueBuilder.durable("item_queue").build(); } /*** * 隊列綁定到交換機上 */ @Bean public Binding itemQueueExchange(@Qualifier("itemQueue")Queue queue, @Qualifier("itemTopicExchange")Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs(); } }
搭建消費者工程
5.3.1. 創建工程
創建消費者工程,添加依賴:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <!--父工程--> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <groupId>com.xxx</groupId> <artifactId>springboot-rabbitmq-consumer</artifactId> <version>1.0-SNAPSHOT</version> <!--依賴--> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> </project>
啟動類:略
配置文件與上相同,略
編寫消息監聽器com.itheima.listener.MessageListener,代碼如下:
@Component public class MessageListener { /** * 監聽某個隊列的消息 * @param message 接收到的消息 */ @RabbitListener(queues = "item_queue") public void myListener1(String message){ System.out.println("消費者接收到的消息為:" + message); } }
+ confirm模式
生產者發送消息到交換機的時機
+ return模式
交換機轉發消息給queue的時機
1.生產者發送消息到交換機
2.交換機根據routingkey 轉發消息給隊列
3.消費者監控隊列,獲取隊列中信息
4.消費成功刪除隊列中的消息
實現:
先創建工程,添加依賴:(web依賴用於測試,test依賴無所謂,以及amqp的依賴,amqp是一種協議,里面集成了rabbitMQ的相關需要依賴)
<parent> <artifactId>spring-boot-starter-parent</artifactId> <groupId>org.springframework.boot</groupId> <version>2.1.3.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
2、創建啟動類,在啟動類里面創建隊列、交換機、綁定的對象,並指定各自的名字(對象名隨意,可指定可不指定,下面代碼指定了,在bean注解后面)
@SpringBootApplication public class RabbitMQDemo01Application { public static void main(String[] args) { SpringApplication.run(RabbitMQDemo01Application.class,args); } //創建隊列 @Bean(name = "queue_demo01") public Queue createQueue(){ //Q_demo01指定的是隊列名 return new Queue("Q_demo01"); } //創建交換機,直接使用directExchange(路由模式)它的父類也實現了Exchange接口 @Bean(name = "exchange_demo01") public DirectExchange createDirectExchange(){ //E_demo01指定的是交換機名 return new DirectExchange("E_demo01"); } //創建綁定對象,將交換機和隊列綁定 @Bean public Binding createBinDing(){ //將上面創建的隊列和交換機進行綁定,然后設置這個隊列接收哪些匹配的路由key:demo01 return BindingBuilder.bind(createQueue()).to(createDirectExchange()).with("demo01.01"); } }
3、創建配置文件,主要配置該微服務的端口號、rabbitMQ服務器的IP地址和端口號,以及rabbitMQ的用戶名和密碼、是否啟動confirm模式
spring: rabbitmq: host: 192.168.211.132 port: 5672 username: guest password: guest publisher-confirms: true #配置confirms模式,默認關閉 server: port: 8881
4、創建controller模擬接收到前端信息后,給rabbitMQ服務器發送消息,這個消息最終需要另一個微服務(消費者)接收,因為有confirm模式,所以在發消息之前需要先設置回調函數,當rabbitMQ中的交換機收到消息就會調用的函數(方法)
@RestController @RequestMapping("/demo01") public class Demo01Controller { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ConfirmCallback confirmCallback; @RequestMapping("/test01") public String demo01(){ System.out.println("接收請求"); System.out.println("處理請求中……"); //設置回調函數(當發送消息后,接收方會返回發送發調用方法,這個方法調用就能知道消息發送結果) rabbitTemplate.setConfirmCallback(confirmCallback); //發送消息 rabbitTemplate.convertAndSend("E_demo01","demo01.01","demo01……"); return "ok"; } }
5、編寫這個回調函數的具體方法,這個回調函數實現rabbitTemplate.confarmCallback接口,重寫方法即可,具體實現看注釋(我這里因為沒有下載源碼,所以自動生成的代碼,參數就變成了b、s這些不好讀,下載源碼即可)
//需要交給spring核心容器管理 //回調函數要實現rabbitTemplate中的confirmCallback接口 @Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { /** * * @param correlationData 消息信息 * @param b 確認標識:true,MQ服務器exchange表示已經確認收到消息 false 表示沒有收到消息 * @param s 如果沒有收到消息,則指定為MQ服務器exchange消息沒有收到的原因,如果已經收到則指定為null */ if (b){ System.out.println("消息收到,內容為:"+correlationData); }else { System.out.println("消息未收到,原因為:"+s); } } }
然后就可以發送請求到編寫的controller類了,controller類接收到請求后會發消息到交換機上,交換機如果接收到消息,就會調用回調函數,如果發送消息時,故意填寫一個錯誤的交換機,並且這個錯誤的交換機是不存在的話,那么當消息一發送,沒有找到對應的交換機,在調用回調函數的時候就會進入沒有接收消息的判斷中,這樣就可以確定消息到底有沒有發送成功。
returncallback代碼實現
如上,已經實現了消息發送到交換機上的內容,但是如果是,交換機發送成功,但是在路由轉發到隊列的時候,發送錯誤,此時就需要用到returncallback模式了。接下來我們實現下。
實現步驟如下:
1.開啟returncallback模式
2.設置回調函數
3.發送消息
配置yml開啟returncallback:在配置文件中開啟:
spring: rabbitmq: host: 192.168.211.132 port: 5672 username: guest password: guest publisher-confirms: true #配置confirms模式,默認關閉 publisher-returns: true #配置returns模式,默認關閉 server: port: 8881
編寫returns回調函數:這個回調函數跟confirm的回調函數一脈相承,都差不多
@Component public class ReturnsCallback implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { /** * * @param message 消息信息,因為message傳遞過來是字節,所以需要轉換成字符串 * @param i 退回的狀態碼 * @param s 退回的信息 * @param s1 交換機 * @param s2 路由key */ System.out.println("退回的消息是:"+new String(message.getBody())); System.out.println("退回的狀態碼是:"+i); System.out.println("退回的信息是:"+s); System.out.println("退回的交換機是:"+s1); System.out.println("退回的路由key是:"+s2); } }
還需要在controller里面加一行代碼,就是發送消息之前設置returns的回調函數:
@RestController @RequestMapping("/demo01") public class Demo01Controller { @Autowired private RabbitTemplate rabbitTemplate; //需要注入剛剛創建的confirm回調函數 @Autowired private ConfirmCallback confirmCallback; //需要注入剛剛創建的returns回調函數 @Autowired private ReturnsCallback returnsCallback; @RequestMapping("/test01") public String demo01(){ System.out.println("接收請求"); System.out.println("處理請求中……"); //設置confirm回調函數(當發送消息后,接收方會返回發送發調用方法,這個方法調用就能知道消息發送結果) rabbitTemplate.setConfirmCallback(confirmCallback); //設置returns的回調函數(當交換機收到消息發送給隊列后,,隊列就會調用這個回調函) rabbitTemplate.setReturnCallback(returnsCallback); //發送消息 rabbitTemplate.convertAndSend("E_demo01","demo01.01","demo01……"); return "ok"; } }
總結起來也就是新增加了三個步驟,第一個在配置文件中開啟returnsCallback,第二個寫一個returnsCallback的回調函數,第三個在發送消息之前指定好回調函數,這樣就完成了從生產者發消息到交換機,交換機會調用回調函數,從交換機發消息到隊列,隊列會調用回調函數,確保了這三個點發送消息不會有問題,唯一的小區別就是,交換機接收到消息,判定是否接收到的條件可以是錯誤內容,如果錯誤內容為null則說明交換機接收到信息且沒有異常,隊列是否接收到消息的判斷條件可以是狀態碼,如下圖,當發送消息時指定不存在的routting key(路由key),那么打印的消息就會如下:
+ returncallback模式,需要手動設置開啟
+ 該模式 指定 在路由的時候發送錯誤的時候調用回調函數,不影響消息發送到交換機
兩種模式的總結
但是一般情況下我們使用confirm即可,因為路由key 由開發人員指定,一般不會出現錯誤,並且從交換機到隊列,都是在rabbitMQ服務器中進行的,除非服務器掛掉,否則不會出問題。如果要保證消息在交換機和routingkey的時候那么需要結合兩者的方式來進行設置。
上邊我們學習了發送方的可靠性投遞,但是在消費方也有可能出現問題,比如沒有接受消息,比如接受到消息之后,在代碼執行過程中出現了異常,這種情況下我們需要額外的處理,那么就需要手動進行確認簽收消息。rabbtimq給我們提供了一個機制:ACK機制。
ACK機制:有三種方式
-
-
手動確認 acknowledge="manual"
-
根據異常情況來確認 acknowledge="auto"
其中自動確認是指:
當消息一旦被Consumer接收到,則自動確認收到,並將相應 message 從 RabbitMQ 的消息緩存中移除。但是在實際業務處理中,很可能消息接收到,業務處理出現異常,那么該消息就會丟失。
其中手動確認方式是指:
則需要在業務處理成功后,調用channel.basicAck(),手動簽收,如果出現異常,則調用channel.basicNack()等方法,讓其按照業務功能進行處理,比如:重新發送,比如拒絕簽收進入死信隊列等等。
spring: rabbitmq: host: 192.168.211.132 port: 5672 username: guest password: guest publisher-confirms: true #配置confirms模式,默認關閉 publisher-returns: true #配置returns模式,默認關閉 listener: simple: acknowledge-mode: manual #設置監聽端消息ACK確認模式為手動模式,默認自動確認接收消息,無論是否出異常 server: port: 8881
2.創建消息監聽器監聽消息:監聽隊列,接收消息,然后用cry/catch來判定是否接收消息,如果沒有異常,則接收消息,並打印,如果有異常,則可以選擇將消息返回給隊列或者丟棄消息
@Component //指定需要監聽的隊列 @RabbitListener(queues = "Q_demo01") public class ListenerRabbitMQ { /** * channel 頻道對象 他提供了ack/nack方法(簽收和拒絕簽收) * Message 消息本生的封裝的對象 * String msg 消息的本身() */ @RabbitHandler public void msg(Message message, Channel channel, String msg){ //接收消息 System.out.println("消費者接收到的消息:"+msg); try { //處理本地業務 System.out.println("處理本地業務開始======start======"); Thread.sleep(2000); //模擬接收消息出錯 //int i = 1 / 0; //簽收消息 // 參數1 指定的是消息的序號(快遞號) // 參數2 指定是否需要批量的簽收 如果是true,那就批量 如果是false 那就不批量 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { e.printStackTrace(); try { //如果出現異常,則拒絕消息 可以重回隊列 也可以丟棄 可以根據業務場景來 //方式一:可以批量處理y用:basicNack,傳三個參數 //參數3 標識是否重回隊列 true 是重回 false 就是不重回:丟棄消息,如果重回隊列的話,異常沒有解決,就會進入死循環 //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); //方式二:不批量處理:basicReject,傳兩個參數,第二個參數是否批量 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException ioException) { ioException.printStackTrace(); } } } }
按以上代碼,如果消費端沒有出現異常,則會正常接收消息,如果出現了異常,說明這個消費端的實施業務邏輯失敗,則必須告訴交換機,任務失敗,交易取消,可以選擇將消息返回給交換機,或者丟棄這個消息,返回給交換機,那么消息還會存放在交換機,但是交換機又會重新將返回的消息發送給消費端,消費的又出現異常,再返回給交換機,形成死循環。
以下為各種情況代碼結果演示:
1、配置文件開啟ACK手動確認模式,但是在消費端沒有寫代碼確認接收,也沒有拒絕接收,消費端代碼如下:
@Component @RabbitListener(queues = "queue_demo01") public class MyRabbitListener { /*@RabbitHandler public void msg(String message) { System.out.println("消費Duang接收消息:" + message); }*/ @RabbitHandler public void msg(Message message, Channel channel ,String msg) { System.out.println("消費Duang接收消息:" + msg); } }
那么執行的結果就會是這樣:
說明一直沒有被簽收,消息一直會在rabbitMQ服務器
2、配置文件開啟ACK手動確認模式,消費端出現異常,消息接收被拒絕后執行丟棄消息操作,消費端代碼如下:
@Component //指定需要監聽的隊列 @RabbitListener(queues = "Q_demo01") public class ListenerRabbitMQ { /** * channel 頻道對象 他提供了ack/nack方法(簽收和拒絕簽收) * Message 消息本生的封裝的對象 * String msg 消息的本身() */ @RabbitHandler public void msg(Message message, Channel channel, String msg){ //接收消息 System.out.println("消費者接收到的消息:"+msg); try { //處理本地業務 System.out.println("處理本地業務開始======start======"); Thread.sleep(2000); //模擬接收消息出錯 int i = 1 / 0; //簽收消息 // 參數1 指定的是消息的序號(快遞號) // 參數2 指定是否需要批量的簽收 如果是true,那就批量 如果是false 那就不批量 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { e.printStackTrace(); try { //如果出現異常,則拒絕消息 可以重回隊列 也可以丟棄 可以根據業務場景來 //方式一:可以批量處理y用:basicNack,傳三個參數 //參數3 標識是否重回隊列 true 是重回 false 就是不重回:丟棄消息,如果重回隊列的話,異常沒有解決,就會進入死循環 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); //方式二:不批量處理:basicReject,傳兩個參數,第二個參數是否批量 //channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException ioException) { ioException.printStackTrace(); } } } }
消息丟棄后,則不再出現:
3、配置文件開啟ACK手動確認模式,消費端出現異常,拒絕接收消息,然后將消息返回給隊列,代碼如下(第三個參數設置為重回隊列進行再次投遞):
@Component //指定需要監聽的隊列 @RabbitListener(queues = "Q_demo01") public class ListenerRabbitMQ { /** * channel 頻道對象 他提供了ack/nack方法(簽收和拒絕簽收) * Message 消息本生的封裝的對象 * String msg 消息的本身() */ @RabbitHandler public void msg(Message message, Channel channel, String msg){ //接收消息 System.out.println("消費者接收到的消息:"+msg); try { //處理本地業務 System.out.println("處理本地業務開始======start======"); Thread.sleep(2000); //模擬接收消息出錯 int i = 1 / 0; //簽收消息 // 參數1 指定的是消息的序號(快遞號) // 參數2 指定是否需要批量的簽收 如果是true,那就批量 如果是false 那就不批量 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { e.printStackTrace(); try { //如果出現異常,則拒絕消息 可以重回隊列 也可以丟棄 可以根據業務場景來 //方式一:可以批量處理y用:basicNack,傳三個參數 //參數3 標識是否重回隊列 true 是重回 false 就是不重回:丟棄消息,如果重回隊列的話,異常沒有解決,就會進入死循環 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); //方式二:不批量處理:basicReject,傳兩個參數,第二個參數是否批量 //channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException ioException) { ioException.printStackTrace(); } } } }
消息返回隊列后,會再次給消費端投遞該消息,異常不消失,死循環不停。
總結:
第一種:簽收
channel.basicAck()
第二種:拒絕簽收 批量處理
channel.basicNack()
第三種:拒絕簽收 不批量處理
channel.basicReject()
-
-
如果在消費端沒有出現異常,則調用channel.basicAck(deliveryTag,false);方法確認簽收消息
-
如何保證消息的高可靠性傳輸?
1.持久化(如果使用spring boot,則持久化的默認設置就是true,不需要額外進行設置)
• exchange要持久化
• queue要持久化
• message要持久化
2.生產方確認Confirm、Return
3.消費方確認Ack
如果並發量大的情況下,生產方不停的發送消息,可能處理不了那么多消息,此時消息在隊列中堆積很多,當消費端啟動,瞬間就會涌入很多消息,消費端有可能瞬間垮掉,這時我們可以在消費端進行限流操作,每秒鍾放行多少個消息。這樣就可以進行並發量的控制,減輕系統的負載,提供系統的可用性,這種效果往往可以在秒殺和搶購中進行使用。在rabbitmq中也有限流的一些配置。
spring: rabbitmq: host: 192.168.211.132 port: 5672 username: guest password: guest publisher-confirms: true #配置confirms模式,默認關閉 publisher-returns: true #配置returns模式,默認關閉 listener: simple: acknowledge-mode: manual #設置監聽端消息ACK確認模式為手動模式 prefetch: 1 #設置每一個消費端,可以同時處理的未確認的消息最大數量 server: port: 8881
這個限流默認是250個。
TTL
RabbitMQ設置過期時間有兩種:
-
針對某一個隊列設置過期時間 ;隊列中的所有消息在過期時間到之后,如果沒有被消費則被全部清除
-
針對某一個特定的消息設置過期時間;隊列中的消息設置過期時間之后,如果這個消息沒有被消息則被清除。
需要注意一點的是:
針對某一個特定的消息設置過期時間時,一定是消息在隊列中在隊頭的時候進行計算,如果某一個消息A 設置過期時間5秒,消息B在隊頭,消息B沒有設置過期時間,B此時過了已經5秒鍾了還沒被消費。注意,此時A消息並不會被刪除,因為它並沒有再隊頭。
一般在工作當中,單獨使用TTL的情況較少。后面會講到延時隊列。在這里有用處。
設置過期隊列,只需要在創建隊列的時候指定一下就可以了:
@SpringBootApplication public class RabbitMQDemo01Application { public static void main(String[] args) { SpringApplication.run(RabbitMQDemo01Application.class,args); } /*創建定時過期隊列,使用構建者模式,durable("Q_demo01"):設置隊列名 withArgument("x-message",100)第一個參數后面講,第二個是過期時間,單位毫秒*/ @Bean(name = "queue_demo01") public Queue createQueue(){ //Q_demo01指定的是隊列名 return QueueBuilder.durable("Q_demo01").withArgument("x-message",100).build(); } //創建交換機,直接使用directExchange(路由模式)它的父類也實現了Exchange接口 @Bean(name = "exchange_demo01") public DirectExchange createDirectExchange(){ //E_demo01指定的是交換機名 return new DirectExchange("E_demo01"); } //創建綁定對象,將交換機和隊列綁定 @Bean public Binding createBinDing(){ //將上面創建的隊列和交換機進行綁定,然后設置這個隊列接收哪些匹配的路由key:demo01 return BindingBuilder.bind(createQueue()).to(createDirectExchange()).with("demo01.01"); } }
死信隊列的介紹
如下圖的過程:
成為死信的三種條件:
-
隊列中消息的長度(數量)到達限制;
-
消費者拒接消費消息,basicNack/basicReject,並且不把消息重新放入原目標隊列,requeue=false;(丟棄)
-
原隊列存在消息過期設置,消息到達超時時間未被消費;(ddl設置的過期的時間到了)
死信的處理過程
DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。
當這個隊列中有死信時,RabbitMQ就會自動的將這個消息重新發布到設置的Exchange上去,進而被路由到另一個隊列。
可以監聽這個隊列中的消息做相應的處理。(例如客戶下訂單,進入支付頁面,這個時候商品庫存已經在數據庫中進行減數操作,如果客戶突然不執行支付操作,那么就可以設置定時消息,如果超過時間,客戶沒有進行支付,則將這個死信消息放入死信交換機,發送給與私信交換機綁定的隊列中,用另一個消費端接收這個死信消息,這個消費端就是執行將庫存數量重新加回來的操作)
死信隊列的設置
剛才說到死信隊列也是一個正常的exchange.只需要設置一些參數即可。
給隊列設置參數: x-dead-letter-exchange 和 x-dead-letter-routing-key。
如上圖所示
1.創建queue1 正常隊列 用於接收死信隊列過期之后轉發過來的消息
2.創建queue2 可以針對他進行參數設置 死信隊列
3.創建交換機 死信交換機
4.綁定正常隊列到交換機