前情提要:rabbitmq 管理界面查看姿勢
一、快速搭建/基本信息發送和消費
1、引入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2、application.yml
spring: rabbitmq: host: ipXXX port: 5672 username: 賬戶XXX password: 密碼XXX virtual-host: /wen # 交換器名稱
以 direct模式為例
1、配置文件 import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @Author : JCccc * @CreateTime : 2019/9/3 * @Description : **/ @Configuration public class RabbitConfig { //隊列 起名:TestDirectQueue @Bean public Queue emailQueue() { // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效 // exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高於durable // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。 // return new Queue("TestDirectQueue",true,true,false); //一般設置一下隊列的持久化就好,其余兩個就是默認false return new Queue("email.fanout.queue", true); } @Bean public Queue smsQueue() { // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效 // exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高於durable // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。 // return new Queue("TestDirectQueue",true,true,false); //一般設置一下隊列的持久化就好,其余兩個就是默認false return new Queue("sms.fanout.queue", true); } @Bean public Queue weixinQueue() { // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效 // exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高於durable // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。 // return new Queue("TestDirectQueue",true,true,false); //一般設置一下隊列的持久化就好,其余兩個就是默認false return new Queue("weixin.fanout.queue", true); } @Bean public Queue TTLQueue() { Map<String, Object> map = new HashMap<>(16); map.put("x-message-ttl", 30000); // 隊列中的消息未被消費則30秒后過期 return new Queue("TTL_QUEUE", true, false, false, map); } @Bean public DirectExchange TTLExchange() { return new DirectExchange("TTL_EXCHANGE", true, false); } //Direct交換機 起名:TestDirectExchange @Bean public DirectExchange fanoutOrderExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("fanout_exchange", true, false); } //綁定 將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting @Bean public Binding bindingDirect() { return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL"); } @Bean public Binding bindingDirect1() { return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with(""); } @Bean public Binding bindingDirect2() { return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with(""); } @Bean public Binding bindingDirect3() { return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with(""); } } 2、生產者 package com.pit.barberShop.common.MQ.Rabbit.fanout; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author :wenye * @date :Created in 2021/6/15 21:41 * @description:廣播模式 * @version: $ */ @RestController @RequestMapping("/rabbitmq") public class ProducerFanout { @Autowired private RabbitTemplate rabbitTemplate; // 1: 定義交換機 private String exchangeName = "fanout_exchange"; // 2: 路由key private String routeKey = ""; @RequestMapping("/fanout") public void markerFanout() { String message ="shua"; // 發送消息 rabbitTemplate.convertAndSend(exchangeName, routeKey, message); } @RequestMapping("/ttl") public String testTTL() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("20000"); // 設置過期時間,單位:毫秒 byte[] msgBytes = "測試消息自動過期".getBytes(); Message message = new Message(msgBytes, messageProperties); rabbitTemplate.convertAndSend("TTL_EXCHANGE", "TTL", message); return "ok"; } } 3、消費者 package com.pit.barberShop.common.MQ.Rabbit.fanout; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /** * @author :wenye * @date :Created in 2021/6/15 22:07 * @description:fanout消費者 * @version: $ */ @Component public class ConsumerFanout { @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是隊列名字,這個名字你可以自定隨便定義。 value = @Queue(value = "sms.fanout.queue",autoDelete = "false"), // order.fanout 交換機的名字 必須和生產者保持一致 exchange = @Exchange(value = "fanout_exchange", // 這里是確定的rabbitmq模式是:fanout 是以廣播模式 、 發布訂閱模式 type = ExchangeTypes.DIRECT) )) public void messagerevice(String message){ // 此處省略發郵件的邏輯 System.out.println("sms-two111------------->" + message); } @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是隊列名字,這個名字你可以自定隨便定義。 value = @Queue(value = "weixin.fanout.queue",autoDelete = "false"), // order.fanout 交換機的名字 必須和生產者保持一致 exchange = @Exchange(value = "fanout_exchange", // 這里是確定的rabbitmq模式是:fanout 是以廣播模式 、 發布訂閱模式 type = ExchangeTypes.DIRECT) )) public void messageWXrevice(String message){ // 此處省略發郵件的邏輯 System.out.println("weixin----two---------->" + message); } }
二、過期時間
1、生產者發送消息時設置過期時間 public String testTTL() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("20000"); // 設置過期時間,單位:毫秒 byte[] msgBytes = "測試消息自動過期".getBytes(); Message message = new Message(msgBytes, messageProperties); rabbitTemplate.convertAndSend("TTL_EXCHANGE", "", message); return "ok"; } 2、隊列中的所有消息設置過期時間 配置中添加 @Bean public Queue TTLQueue() { Map<String, Object> map = new HashMap<>(); map.put("x-message-ttl", 30000); // 隊列中的消息未被消費則30秒后過期 return new Queue("TTL_QUEUE", true, false, false, map); } @Bean public Queue TTLQueue() { Map<String, Object> map = new HashMap<>(); map.put("x-message-ttl", 30000); // 隊列中的消息未被消費則30秒后過期 return new Queue("TTL_QUEUE", true, false, false, map); } @Bean public DirectExchange TTLExchange() { return new DirectExchange("TTL_EXCHANGE", true, false); } @Bean public Binding bindingDirect() { return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL"); }
三、消息確認機制配置
有兩種機制能保證消息一致性 1、消息確認機制 2、事務機制
事務機制、
參考:
[
https://spring.hhui.top/sprin...](http://)
@Service public class TransactionPublisher implements RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate transactionRabbitTemplate; @PostConstruct public void init() { // 將信道設置為事務模式 transactionRabbitTemplate.setChannelTransacted(true); transactionRabbitTemplate.setReturnCallback(this); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("事務 " + message + " 發送失敗"); } /** * 一般的用法,推送消息 * 調用這個方法發送消息是:配置要開啟信道確認和ReturnCallback 消費者異常 *transactionRabbitTemplate.setChannelTransacted(true); * transactionRabbitTemplate.setReturnCallback(this); * @param ans * @return */ @Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager") public String publish(String ans) { String msg = "transaction msg = " + ans; System.out.println("publish: " + msg); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); transactionRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, correlationData); return msg; } }
消息確認機制
參考:https://blog.csdn.net/qq33098...
默認是自動應答
spring: rabbitmq: # 開啟發送確認 publisher-confirms: true # 開啟發送失敗退回 publisher-returns: true
目前回調存在ConfirmCallback和ReturnCallback兩者。他們的區別在於
如果消息沒有到exchange,則ConfirmCallback回調,ack=false,
如果消息到達exchange,則ConfirmCallback回調,ack=true
exchange到queue成功,則不回調ReturnCallback
rabbitMQ 消息生產者發送消息的流程
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @Slf4j @Component public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { /** * correlationData:對象內部只有一個 id 屬性,用來表示當前消息的唯一性。 * ack:消息投遞到broker 的狀態,true表示成功。 * cause:表示投遞失敗的原因。 **/ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause){ if (!ack) { log.error("消息發送異常!"); } else { log.info("發送者爸爸已經收到確認,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause); } } } import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @Slf4j @Component public class ReturnCallbackService implements RabbitTemplate.ReturnCallback { //重寫 returnedMessage() 方法,方法有五個參數message(消息體)、replyCode(響應code)、replyText(響應內容)、exchange(交換機)、routingKey路由(隊列) @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey); } } 配置文件 1、防止重復簽發ack需要在配置類中重寫 @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); //此處也設置為手動ack factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } 2、重新創建設置交換器和隊列屬性 @Bean public Queue chongfuQueue() { // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效 // exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高於durable // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。 // return new Queue("TestDirectQueue",true,true,false); //一般設置一下隊列的持久化就好,其余兩個就是默認false return new Queue("chongfu.fanout.queue", true); } //Direct交換機 起名:TestDirectExchange @Bean public DirectExchange chongfuExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("chongfu_exchange", true, false); } @Bean public Binding bindingDirect4() { return BindingBuilder.bind(chongfuQueue()).to(chongfuExchange()).with(""); } 生產者 public void markerchongfu() { /** * 確保消息發送失敗后可以重新返回到隊列中 * 注意:yml需要配置 publisher-returns: true */ rabbitTemplate.setMandatory(true); /** * 消費者確認收到消息后,手動ack回執回調處理 */ rabbitTemplate.setConfirmCallback(confirmCallbackService); /** * 消息投遞到隊列失敗回調處理 */ rabbitTemplate.setReturnCallback(returnCallbackService); /** * 發送消息 */ String s = UUID.randomUUID().toString(); rabbitTemplate.convertAndSend("chongfu_exchange", routeKey, "帥哥", message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; }, new CorrelationData(s)); } 消費者 @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是隊列名字,這個名字你可以自定隨便定義。 value = @Queue(value = "chongfu.fanout.queue",autoDelete = "false"), // order.fanout 交換機的名字 必須和生產者保持一致 exchange = @Exchange(value = "chongfu_exchange", // 這里是確定的rabbitmq模式是:fanout 是以廣播模式 、 發布訂閱模式 type = ExchangeTypes.DIRECT) )) public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("小富收到消息:{}", msg); // log.info("序號:{}", message.getMessageProperties().getDeliveryTag()); // System.out.println(msg); //TODO 具體業務 // 收到消息 basicAck() channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { log.error("消息已重復處理失敗,拒絕再次接收..."); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息 } else { log.error("消息即將再次返回隊列處理..."); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }
消費消息有三種回執方法
1、basicAck
basicAck:表示成功確認,使用此回執方法后,消息會被rabbitmq broker 刪除。
void basicAck(long deliveryTag, boolean multiple)
- deliveryTag:表示消息投遞序號,每次消費消息或者消息重新投遞后,deliveryTag都會增加。手動消息確認模式下,我們可以對指定deliveryTag的消息進行ack、nack、reject等操作。
- multiple:是否批量確認,值為 true 則會一次性 ack所有小於當前消息 deliveryTag 的消息。
舉個栗子: 假設我先發送三條消息deliveryTag分別是5、6、7,可它們都沒有被確認,當我發第四條消息此時deliveryTag為8,multiple設置為 true,會將5、6、7、8的消息全部進行確認。
2、basicNack
basicNack :表示失敗確認,一般在消費消息業務異常時用到此方法,可以將消息重新投遞入隊列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
- deliveryTag:表示消息投遞序號。
- multiple:是否批量確認。
- requeue:值為 true 消息將重新入隊列。
3、basicReject
basicReject:拒絕消息,與basicNack區別在於不能進行批量操作,其他用法很相似。
void basicReject(long deliveryTag, boolean requeue)
- deliveryTag:表示消息投遞序號。
- requeue:值為 true 消息將重新入隊列。
四、死信隊列
死信隊列其實和普通的隊列沒啥大的區別,都需要創建自己的Queue、Exchange,然后通過RoutingKey綁定到Exchange上去,只不過死信隊列的RoutingKey和Exchange要作為參數,綁定到正常的隊列上去,一種應用場景是正常隊列里面的消息被basicNack或者reject時,消息就會被路由到正常隊列綁定的死信隊列中,還有一種還有常用的場景就是開啟了自動簽收,然后消費者消費消息時出現異常,超過了重試次數,那么這條消息也會進入死信隊列,如果配置了話,
例子
//模擬異常用的交換器 ,topic交換器會通配符匹配,當然字符串一模一樣也會匹配 @Bean TopicExchange emailExchange() { return new TopicExchange("demoTopicExchange"); } //死信隊列 @Bean public Queue deadLetterQueue() { return new Queue("demo.dead.letter"); } //死信交換器 @Bean TopicExchange deadLetterExchange() { return new TopicExchange("demoDeadLetterTopicExchange"); } //綁定死信隊列 @Bean Binding bindingDeadLetterQueue() { return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("demo.dead.letter"); } 生產者 @RequestMapping("/sixin") public void sendEmailMessage() { CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("demoTopicExchange","demo.email","11",correlationData); log.info("---發送 email 消息---{}---messageId---{}","111",correlationData.getId()); } 消費者 /** * 郵件消費者 * @param message * @param channel * @throws IOException */ @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是隊列名字,這個名字你可以自定隨便定義。 value = @Queue(value = "demo.email",autoDelete = "false", arguments = { @Argument(name = "x-dead-letter-exchange", value = "demoDeadLetterTopicExchange"), @Argument(name = "x-dead-letter-routing-key",value = "demo.dead.letter"), @Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long") }), key = "demo.email", // order.fanout 交換機的名字 必須和生產者保持一致 exchange = @Exchange(value = "demoTopicExchange", // 這里是確定的rabbitmq模式是:fanout 是以廣播模式 、 發布訂閱模式 type = ExchangeTypes.TOPIC) )) public void handleEmailMessage(Message message, Channel channel,String msg) throws IOException { try { log.info("---接受到消息---{}",msg); //主動異常 int m=1/0; //手動簽收 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { //異常,ture 重新入隊,或者false,進入死信隊列 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); } } /** * 死信消費者,自動簽收開啟狀態下,超過重試次數,或者手動簽收,reject或者Nack * @param message */ @RabbitListener(queues = "demo.dead.letter") public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException { //可以考慮數據庫記錄,每次進來查數量,達到一定的數量,進行預警,人工介入處理 log.info("接收到死信消息:---{}---消息ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation")); //回復ack channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }
同樣也可使用java類配置
@Bean public Queue emailQueue() { Map<String, Object> arguments = new HashMap<>(2); // 綁定死信交換機 arguments.put("x-dead-letter-exchange", "demoDeadLetterTopicExchange"); // 綁定死信的路由key arguments.put("x-dead-letter-routing-key", "demo.dead.letter"); arguments.put("x-message-ttl", 3000); return new Queue(emailQueue,true,false,false,arguments); } @Bean TopicExchange emailExchange() { return new TopicExchange(topicExchange); } @Bean Binding bindingEmailQueue() { return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#"); }
五、持久化機制和內存磁盤監控
1、持久化
RabbitMQ的持久化隊列分為:
1:隊列持久化
2:消息持久化
3:交換機持久化
不論是持久化的消息還是非持久化的消息都可以寫入到磁盤中,只不過非持久的是等內存不足的情況下才會被寫入到磁盤中。
2、內存磁盤監控
六、分布式事務
1、如何確保生產者消息能夠被消費
基於MQ的分布式事務消息的可靠生產問題-定時重發
![]()
使用RabbitMQ的確認機制在消息進入交換器后確認消息時修改數據中的消息狀態,定時重發:及由於某些原因消息沒成功發送到交換機,在DB存儲消息信息,一定時間內再進行發送,如果多次還未成功說明此消息存在問題(消息表多加個字段記錄次數就是)。
2、如何保證消費者一定消費了消息
問題:如何保證出現異常觸發重試機制而不會引發死循環
- 控制重發次數+死信隊列 重試次數只針對自動應答模式,與手動沖突
- try————catch + 手動ACk
- try__catch+ 手動ACk+死信隊列
七、配置詳解
rabbitmq: addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client連接到的server的地址,多個以逗號分隔(優先取addresses,然后再取host) # port: ##集群配置 addresses之間用逗號隔開 # addresses: ip:port,ip:port password: admin username: 123456 virtual-host: / # 連接到rabbitMQ的vhost requested-heartbeat: #指定心跳超時,單位秒,0為不指定;默認60s publisher-confirms: #是否啟用 發布確認 publisher-reurns: # 是否啟用發布返回 connection-timeout: #連接超時,單位毫秒,0表示無窮大,不超時 cache: channel.size: # 緩存中保持的channel數量 channel.checkout-timeout: # 當緩存數量被設置時,從緩存中獲取一個channel的超時時間,單位毫秒;如果為0,則總是創建一個新channel connection.size: # 緩存的連接數,只有是CONNECTION模式時生效 connection.mode: # 連接工廠緩存模式:CHANNEL 和 CONNECTION listener: simple.auto-startup: # 是否啟動時自動啟動容器 simple.acknowledge-mode: # 表示消息確認方式,其有三種配置方式,分別是none、manual和auto;默認auto simple.concurrency: # 最小的消費者數量 simple.max-concurrency: # 最大的消費者數量 simple.prefetch: # 指定一個請求能處理多少個消息,如果有事務的話,必須大於等於transaction數量. simple.transaction-size: # 指定一個事務處理的消息數量,最好是小於等於prefetch的數量. simple.default-requeue-rejected: # 決定被拒絕的消息是否重新入隊;默認是true(與參數acknowledge-mode有關系) simple.idle-event-interval: # 多少長時間發布空閑容器時間,單位毫秒 simple.retry.enabled: # 監聽重試是否可用 simple.retry.max-attempts: # 最大重試次數 simple.retry.initial-interval: # 第一次和第二次嘗試發布或傳遞消息之間的間隔 simple.retry.multiplier: # 應用於上一重試間隔的乘數 simple.retry.max-interval: # 最大重試時間間隔 simple.retry.stateless: # 重試是有狀態or無狀態 template: mandatory: # 啟用強制信息;默認false receive-timeout: # receive() 操作的超時時間 reply-timeout: # sendAndReceive() 操作的超時時間 retry.enabled: # 發送重試是否可用 retry.max-attempts: # 最大重試次數 retry.initial-interval: # 第一次和第二次嘗試發布或傳遞消息之間的間隔 retry.multiplier: # 應用於上一重試間隔的乘數 retry.max-interval: #最大重試時間間隔