原文:https://blog.csdn.net/linpeng_1/article/details/80505828
AmqpTemplate,RabbitTemplate
Spring AMQP提供了一個發送和接收消息的操作模板類AmqpTemplate。 AmqpTemplate它定義包含了發送和接收消息等的一些基本的操作功能。RabbitTemplate是AmqpTemplate的一個實現。
RabbitTemplate支持消息的確認與返回,為了返回消息,RabbitTemplate 需要設置mandatory 屬性為true,並且CachingConnectionFactory 的publisherReturns屬性也需要設置為true。返回的消息會根據它注冊的RabbitTemplate.ReturnCallback setReturnCallback 回調發送到給客戶端,
一個RabbitTemplate僅能支持一個ReturnCallback 。
為了確認Confirms消息, CachingConnectionFactory 的publisherConfirms 屬性也需要設置為true,確認的消息會根據它注冊的RabbitTemplate.ConfirmCallback setConfirmCallback回調發送到給客戶端。一個RabbitTemplate也僅能支持一個ConfirmCallback.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
SpringBoot集成RabbitMQ
server.port=8083
#服務器配置
spring.application.name=rabbitmq-hello-sending
#rabbitmq連接參數
spring.rabbitmq.addresses:ip1:port1,ip2:port2,ip3:port3 #集群或單機 都可配置
spring.rabbitmq.username=linpeng
spring.rabbitmq.password=123456
# rabbitmq服務器的虛擬主機名,可以在后台管理系統上查看和新建
spring.rabbitmq.virtual-host=/test
# 連接超時
spring.rabbitmq.connection-timeout=5s
# 發送方
# 開啟發送確認(未到達MQ服務器)
spring.rabbitmq.publisher-confirms=true
# 開啟發送失敗退回(未找到對應queue)
spring.rabbitmq.publisher-returns=true
# 消費方 開啟手動ACK(坑:當序列化為JSON時,此配置會失效,見下文)
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 消費方
spring.rabbitmq.listener.concurrency=2 //最小消息監聽線程數
spring.rabbitmq.listener.max-concurrency=2 //最大消息監聽線程數
#消費者每次從隊列獲取的消息數量 (默認一次250個)
#通過查看后台管理器中queue的unacked數量
spring.rabbitmq.listener.simple.prefetch= 5
#消費者自動啟動
spring.rabbitmq.listener.simple.auto-startup=true
#消費失敗,自動重新入隊
spring.rabbitmq.listener.simple.default-requeue-rejected= true
#啟用發送重試
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000
spring.rabbitmq.template.retry.multiplier=1.0
RabbitTemplate
默認一個RabbitTemplate在RabbitMQ中相當於一個connection,每發送一次消息相當於channel,MQ接收消息后釋放channel。每個connection最多支持2048個channel,加入從一個connection同時超過2048個線程並發發送,channel超過2048,會報錯org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later。
測試啟動publisher-confirms后,400個線程通過一個RabbitTemplate並發發送10000消息,同時就可能產生1000左右的channel。因為channel等在confirm。10000消息全部發送在幾秒內完成,10000消息全部confirm回調完成用時22秒。
后台管理頁面查看connection+channel
此connection中有10個線程並發發送消息,監控到10個channel生成,MQ完成接收后釋放channel。如果是publisher-confirms模式,channel會保持到confirm回調完成再釋放,影響並發性能。每個connection最多支持2048個channel。
測試啟動publisher-confirms后,500個線程並發發送,部分消息報AmqpResourceNotAvailableException。400個線程通過一個RabbitTemplate並發發送10000消息,最高同時就可能產生1000多的channel。因為channel在等待執行confirm回調。10000消息全部發送在幾秒內完成,10000消息全部confirm回調完成用時22秒,此時所有channel全部釋放。
綁定隊列
若在rabbitmq的管理頁面手動創建隊列和交換機,則可以不再代碼中聲明
package com.example.demo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public Queue QueueA() {
return new Queue("hello");
}
@Bean
public Queue QueueB() {
return new Queue("helloObj");
}
/**
* Fanout 就是我們熟悉的廣播模式或者訂閱模式,給Fanout交換機發送消息,綁定了這個交換機的所有隊列都收到這個消息。
* @return
*/
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("ABExchange");
}
@Bean
DirectExchange Exchange() {
return new DirectExchange("DExchange");
}
@Bean
Binding bindingExchangeA(Queue QueueA, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(QueueA).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue QueueB, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(QueueB).to(fanoutExchange);
}
@Bean
Binding bindingExchange() {
return BindingBuilder.bind(QueueA()).to(Exchange()).with("TEST");//routingKey
}
}
消息發送者
ConfirmCallback :ACK=true僅僅標示消息已被Broker接收到,並不表示已成功投放至消息隊列中, ACK=false標示消息由於Broker處理錯誤,消息並未處理成功。如未找到對應交換機返回ACK=false。
ReturnCallback:當消息發送出去找不到對應路由隊列時,將會把消息退回 。如果有任何一個路由隊列接收投遞消息成功,則不會退回消息。MQ成功接收,但是未找到對應隊列觸發
通過以上異步確認機制,增加降級、補償處理。比如發送時保存信息和消息ID,ConfirmCallback 通過ID找到對應信息重發,注意要保證冪等性。
package com.example.demo;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.util.Date;
//RabbitTemplate.ConfirmCallback
@Service
public class HelloSender implements RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send() {
String context = "你好現在是 " + new Date() +"";
System.out.println("HelloSender發送內容 : " + context);
//消息序列化設置
//rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//自身實現ReturnCallback接口 設置異步回調對象為this
this.rabbitTemplate.setReturnCallback(this);
//若是當前類實現RabbitTemplate.ConfirmCallback接口,則可以設置為this
//發送前給RabbitTemplate設置一個異步回調對象 RabbitTemplate.ConfirmCallback接口的匿名類
this.rabbitTemplate.setConfirmCallback((correlationData, confirm, cause) -> {
//若發送時沒有CorrelationData,則這里correlationData==null
if (!confirm) {
System.out.println("HelloSender消息發送失敗" + cause + correlationData.getId() );
//correlationData.getReturnedMessage(); Message
//correlationData.toString();
} else {
System.out.println("HelloSender 消息發送成功 ");
}
});
//this.rabbitTemplate.setConfirmCallback(this);
//rabbitTemplate.convertAndSend("hello", context);
//這里指定路由鍵,注意不是隊列名
//發送時 可以指定消息ID,方便在ConfirmCallback時候二次處理消息
rabbitTemplate.convertAndSend("DExchange","QueueRoutingKey", context, new CorrelationData("自定義消息ID"));
}
public void sendObj() {
MessageObj obj = new MessageObj();
obj.setACK(false);
obj.setId(123);
obj.setName("zhangsan");
obj.setValue("data");
System.out.println("發送 : " + obj);
this.rabbitTemplate.convertAndSend("helloObj", obj);
}
@Override
public void returnedMessage(Message message, int i, String cause, String exchange, String queue) {
//沒有找到queue
//Message中的成員,Body為消息內容
//(Body:'hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
}
// @Override
// public void confirm(CorrelationData correlationData, boolean confirm, String cause) {
// System.out.println("sender success");
// }
}
測試發送:
使用Spring默認的rabbitTemplate發送消息,CorrelationData可以重復。
交換機+路由鍵+消息Object+CorrelationData
rabbitTemplate.convertAndSend("TEST.EX","TEST","String:message",new CorrelationData("111"));
在rabbitmq控制台上getmessage查看 ,rabbitTemplate默認發送deliverymode=2消息,已經設置了消息持久化。
測試速度:
測試100個線程同時並發向同一隊列發送簡單消息(15左右長度的字符串)。從發送到100個消息全部完成ConfirmCallback,用時為600ms左右。此過程不計入消費速度。
400個線程通過一個RabbitTemplate並發發送10000消息,同時就可能產生1000左右的channel。因為channel等在confirm。10000消息全部發送在幾秒內完成,10000消息全部confirm回調完成用時22秒。
測試ConfirmCallback回調:
public void confirm(CorrelationData correlationData, boolean confirm, String cause) ;
confirm==true僅僅標示消息已被Broker接收到,並不表示已成功投放至消息隊列中, confirm==false標示消息由於Broker處理錯誤,消息並未處理成功。如未找到對應交換機返回confirm==false。
在此方法中針對confirm==false的消息實現降級/補償處理:重發、本地緩存、計入數據庫/Redis等、更新狀態.....
測試環境:實例化一個ConfirmCallback接口對象,作為rabbitTemplate共用回調處理對象。
回調測試結果:
1 先發送到MQ的消息,先完成confirm回調。
2 ConfirmCallback默認是由同一個線程執行回調,打印線程名可以看到線程名為【AMQP Connection rabbitmqIp:port】
3 若發送時沒有攜帶CorrelationData,回調時這里correlationData==null
4.設置消息確認會影響並發性能,每個線程發送生成一個channel,channel會保持到confirm回調完成再釋放。因為每個connection最多支持2048個channel,當channel達到2048時,會報錯org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later。
測試ReturnCallback 回調:
public void returnedMessage(Message message, int i, String cause, String exchange, String queue) ;
MQ成功接收消息,但是未找到對應路由鍵的隊列后回調。實現降級/補償處理。
測試環境:實例化一個ReturnCallback接口對象,作為rabbitTemplate共用回調處理對象。
回調測試結果:
默認是由同一個線程執行回調,打印線程名可以看到線程名為【AMQP Connection rabbitmqIp:port】
message=返回的Message對象中的成員,Body為發送時的消息內容 ,receivedDeliveryMode=PERSISTENT=2 為持久化消息。spring_returned_message_correlation=發送時的CorrelationData
(Body:'String:message' MessageProperties [headers={spring_returned_message_correlation=111}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
cause=NO_ROUTE
exchange、queue 為發送時的配置
消息消費者
設置QOS,避免觸發流控機制
#消費者每次從隊列獲取的消息數量 (默認一次250個)
spring.rabbitmq.listener.simple.prefetch= 5
當QUEUE達到5條Unacked消息時,不會再推送消息給Consumer。查看后台管理器中queue的unacked數量
package com.example.demo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
import java.util.Map;
@Component
public class HelloReceiver {
@RabbitListener(queues = "hello") //這里是隊列名,不是路由鍵
public void process(String msg,Channel channel, Message message) throws IOException {
System.out.println("HelloReceiver收到 : " + msg +"收到時間"+new Date());
try {
//告訴MQ服務器收到這條消息 已經被我消費了 可以在隊列刪掉 這樣以后就不會再發了 否則消息服務器以為這條消息沒處理掉 后續還會在發
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("receiver success");
} catch (IOException e) {
e.printStackTrace();
//丟棄這條消息
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
System.out.println("receiver fail");
}
}
}
msg是消息內容,相當於Message對象中的body。
Message對象的成員:
可以看到有消息信息BODY,發送方生成的消息CorrelationData,還有執行的Method對象(@RabbitListener標注的方法),目標BEAN
備注:我們用注解的方式來接受消息 就不要用 自己創建對象實現ChannelAwareMessageListener的方式來接受消息 這種方式還要去全局里面配置麻煩,直接用@RabbitListener(queues = "hello")最簡單
消息確認 因為我在屬性配置文件里面開啟了ACK確認 所以如果代碼沒有執行ACK確認 你在RabbitMQ的后台會看到消息會一直留在隊列里面未消費掉 只要程序一啟動開始接受該隊列消息的時候 又會收到
方法參數詳解:https://www.cnblogs.com/piaolingzxh/p/5448927.html
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
deliveryTag:該消息的index,由發送方生成
multiple:是否批量.true:將一次性ack所有小於deliveryTag的消息。
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
deliveryTag:該消息的index
multiple:是否批量.true:將一次性拒絕所有小於deliveryTag的消息。
requeue:被拒絕的是否重新入隊列,true 放在隊首,false 消息進入綁定的DLX。一定注意:若此消息一直Nack重入隊會導致的死循環
channel.basicNack 與 channel.basicReject 的區別在於basicNack可以拒絕多條消息,而basicReject一次只能拒絕一條消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
deliveryTag:該消息的index
requeue:被拒絕的是否重新入隊列。false 消息進入綁定的DLX
ShutdownSignalException
1 隊列名找不到
2 代碼中有ack,但是沒有配置手動ACK
消費超時
消費超時,queue中unacked的消息會退回到queue中,且消費者ACK時會失敗。
使用@Payload和@Headers注解
@Component
public class MessageHandler {
//獲取消息的頭屬性和body屬性
@RabbitListener(queues = "zhihao.miao.order")
public void handleMessage(@Payload String body, @Headers Map<String,Object> headers){
System.out.println("====消費消息===handleMessage");
System.out.println(headers);
System.out.println(body);
}
}
@RabbitListener 和 @RabbitHandler 搭配使用
- @RabbitListener 可以標注在類上面,需配合 @RabbitHandler 注解一起使用
- @RabbitListener 標注在類上面表示當有收到消息的時候,就交給 @RabbitHandler 的方法處理,具體使用哪個方法處理,根據 MessageConverter 轉換后的參數類型
@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {
@RabbitHandler
public void processMessage1(String message) {
System.out.println(message);
}
@RabbitHandler
public void processMessage2(byte[] message) {
System.out.println(new String(message));
}
}
序列化
當中默認的序列化類為SimpleMessageConverter。
僅僅有調用了convertAndSend方法才會使用對應的MessageConvert進行消息的序列化與反序列化。
SimpleMessageConverter對於要發送的消息體body為字節數組時。不進行處理。
對於假設是String。則將String轉成字節數組。
對於假設是Java對象,則使用jdk序列化Serializable將消息轉成字節數組。轉出來的結果較大,含class類名。類對應方法等信息。因此性能較差。
當使用RabbitMq作為中間件時,數據量比較大,此時就要考慮使用類似Jackson2JsonMessageConverter。hessian等序列化形式。以此提高性能。
使用 JSON 序列化與反序列化
https://www.jianshu.com/p/911d987b5f11
發送
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
User user = new User("linyuan");
rabbitTemplate.convertAndSend("topic.exchange","queue1",user);
接收
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//json序列化時,若想手動ACK,則必須配置
return factory;
}
@Component
@RabbitListener(queues = "queue1")
public class Receiver {
@RabbitHandler
public void processMessage1(@Payload User user) {
System.out.println(user.getName());
}
}
消費者+json反序列化 造成手動ACK配置失效
解決方案: https://blog.csdn.net/m912595719/article/details/83787486
這是springboot集成RabbitMQ的一個大坑。當消費者配置JSON反序列化時,配置文件中的手動ACK會失效,消費者會變成自動ACK模式。spring.rabbitmq.listener.direct.acknowledge-mode=manual,spring.rabbitmq.listener.simple.acknowledge-mode=manual 配置失效。
解決方法是消費者配置RabbitListenerContainerFactory這個Bean時(見上),設置factory.setAcknowledgeMode(AcknowledgeMode.MANUAL)。把消費者強制轉換為手動ACK。
如果配置失效切換為自動ACK,但是代碼中又使用channel.basicAck手動ACK。這樣會造成雙ACK的ERROR,接着信道會重啟重連。如下:
o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
unknown delivery tag 1表示當前Channel中找不到delivery-tag=1的消息,其實是這個消息已經自動ACK了,basicAck時就會出錯。測試顯示,消息並不會丟失而是在出現ERROR異常后走向Nack后重新入隊,再多次重復消費后最終ACK成功,嚴重降低消費者的執行效率。
Delivery Tags投遞的標識
當一個消費者向RabbitMQ注冊后,RabbitMQ會用 basic.deliver 方法向消費者推送消息,這個方法攜帶了一個 delivery tag, 它在一個channel中唯一代表了一次投遞。delivery tag的唯一標識范圍限於channel. delivery tag是單調遞增的正整數,客戶端獲取投遞的方法用用dellivery tag作為一個參數。
TestController測試
@Autowired
private HelloSender helloSender;
/**
* 單生產者-單個消費者
*/
@RequestMapping("/test")
public void hello() throws Exception {
helloSender.send();
}
發送消息
ACK場景測試
我們把HelloReceiver的ACK確認代碼注釋掉 ,那消息就算程序收到了, 但是未確認ACK導致消息服務器以為他是未成功消費的,若此時消費者斷開則消息返回隊列,后續還會再發。