淘寶的七天自動收貨,在我們簽收商品后,物流系統會在七天之后延時發送一個消息給支付系統,通知系統將款打給商家,這個過程會持續七天-------------因為使用了消息中間件的延遲推送功能。
在比如說12306購票支付確認頁面,我們在選好票點擊確定跳轉的頁面中往往都會有倒計時,代表着30分鍾內訂單不確認的話,將會自動取消訂單-------------其實在下訂單的那一刻開始購票業務系統就會發送一個延時消息給訂單系統,延時30分鍾,就告訴訂單系統未完成,如果我們在30分鍾內完成了訂單的話,則可以通過邏輯代碼判斷來忽略收到的消息。
————————————————————在上面的兩種場景中,我們可以使用兩種傳統方案降低系統的整體性能和吞吐量————————————————————————
1.使用redis給訂單設置過期時間,最后通過判斷redis中是否還有該訂單來解決訂單是否完成。但是這種消息的延遲推送性能較低,因為redis都是儲存在內存中的,如果我們遇到惡意刷單或者惡意下單的將會給內存帶來巨大的壓力。
2.如果使用傳統的數據庫的話,輪詢來判斷數據庫表中的訂單的狀態,這樣的性能極低。
3.如果使用JVM原生的DelayQueue,也是大量的占用內存,並且沒有持久化策略,如果遇到系統宕機或者重啟都會丟失訂單信息。
消息延遲推送的實現
在RabbitMQ 3.6.x之前一般采用的是死信隊列+TTL過期時間來實現
在RabbitMQ 3.6.x開始,我們可以使用延遲隊列的插件
首先我們創建交換機和消息隊列,application.properties中配置
1 import org.springframework.amqp.core.*; 2 import org.springframework.context.annotation.Bean; 3 import org.springframework.context.annotation.Configuration; 4 5 import java.util.HashMap; 6 import java.util.Map; 7 8 @Configuration 9 public class MQConfig { 10 11 public static final String LAZY_EXCHANGE = "Ex.LazyExchange"; 12 public static final String LAZY_QUEUE = "MQ.LazyQueue"; 13 public static final String LAZY_KEY = "lazy.#"; 14 15 @Bean 16 public TopicExchange lazyExchange(){ 17 //Map<String, Object> pros = new HashMap<>(); 18 //設置交換機支持延遲消息推送 19 //pros.put("x-delayed-message", "topic"); 20 TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros); 21 exchange.setDelayed(true); 22 return exchange; 23 } 24 25 @Bean 26 public Queue lazyQueue(){ 27 return new Queue(LAZY_QUEUE, true); 28 } 29 30 @Bean 31 public Binding lazyBinding(){ 32 return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY); 33 } 34 }
我們在Exchange的聲明中可以設置exchange.setDelayed(true)來開啟延時隊列,當然也可以使用以下內容傳入交換機的方法中,因為第一種方式的底層就是通過這種方式實現的。
1 //Map<String, Object> pros = new HashMap<>(); 2 //設置交換機支持延遲消息推送 3 //pros.put("x-delayed-message", "topic"); 4 TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
發送消息我們需要指定的延遲推送的時間,我們在這里發送消息的方法中傳入參數 new MessagePostProcessor() 是為了獲得Message()對象,因為需要借助Message對象的api來設置時間。
1 import com.anqi.mq.config.MQConfig; 2 import org.springframework.amqp.AmqpException; 3 import org.springframework.amqp.core.Message; 4 import org.springframework.amqp.core.MessageDeliveryMode; 5 import org.springframework.amqp.core.MessagePostProcessor; 6 import org.springframework.amqp.rabbit.connection.CorrelationData; 7 import org.springframework.amqp.rabbit.core.RabbitTemplate; 8 import org.springframework.beans.factory.annotation.Autowired; 9 import org.springframework.stereotype.Component; 10 11 import java.util.Date; 12 13 @Component 14 public class MQSender { 15 16 @Autowired 17 private RabbitTemplate rabbitTemplate; 18 19 //confirmCallback returnCallback 代碼省略,請參照上一篇 20 21 public void sendLazy(Object message){ 22 rabbitTemplate.setMandatory(true); 23 rabbitTemplate.setConfirmCallback(confirmCallback); 24 rabbitTemplate.setReturnCallback(returnCallback); 25 //id + 時間戳 全局唯一 26 CorrelationData correlationData = new CorrelationData("12345678909"+new Date()); 27 28 //發送消息時指定 header 延遲時間 29 rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message, 30 new MessagePostProcessor() { 31 @Override 32 public Message postProcessMessage(Message message) throws AmqpException { 33 //設置消息持久化 34 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); 35 //message.getMessageProperties().setHeader("x-delay", "6000"); 36 message.getMessageProperties().setDelay(6000); 37 return message; 38 } 39 }, correlationData); 40 } 41 }
我們可以觀察setDelay(Integer i)底層代碼,也是在header中設置x-delay。等同於我們手動設置
1 message.getMessageProperties().setHeader("x-delay", "6000"); 2 /** 3 * Set the x-delay header. 4 * @param delay the delay. 5 * @since 1.6 6 */ 7 public void setDelay(Integer delay) { 8 if (delay == null || delay < 0) { 9 this.headers.remove(X_DELAY); 10 } 11 else { 12 this.headers.put(X_DELAY, delay); 13 } 14 }
——————————————————————————消化端進行消費————————————————————
1 import com.rabbitmq.client.Channel; 2 import org.springframework.amqp.rabbit.annotation.*; 3 import org.springframework.amqp.support.AmqpHeaders; 4 import org.springframework.stereotype.Component; 5 6 import java.io.IOException; 7 import java.util.Map; 8 9 @Component 10 public class MQReceiver { 11 12 @RabbitListener(queues = "MQ.LazyQueue") 13 @RabbitHandler 14 public void onLazyMessage(Message msg, Channel channel) throws IOException{ 15 long deliveryTag = msg.getMessageProperties().getDeliveryTag(); 16 channel.basicAck(deliveryTag, true); 17 System.out.println("lazy receive " + new String(msg.getBody())); 18 19 } 20 ``` 21 22 ## 測試結果[#](https://www.cnblogs.com/haixiang/p/10966985.html#3724420099) 23 24 ```java 25 import org.junit.Test; 26 import org.junit.runner.RunWith; 27 import org.springframework.beans.factory.annotation.Autowired; 28 import org.springframework.boot.test.context.SpringBootTest; 29 import org.springframework.test.context.junit4.SpringRunner; 30 31 @SpringBootTest 32 @RunWith(SpringRunner.class) 33 public class MQSenderTest { 34 35 @Autowired 36 private MQSender mqSender; 37 38 @Test 39 public void sendLazy() throws Exception { 40 String msg = "hello spring boot"; 41 42 mqSender.sendLazy(msg + ":"); 43 } 44 }
我們在6秒收到了消息 lazy receive hello spring boot