消息隊列如何實現流量削峰?
要對流量進行削峰,最容易想到的解決方案就是用消息隊列來緩沖瞬時流量,把同步的直接調用轉換成異步的間接推送,中間通過一個隊列在一端承接瞬時的流量洪峰,在另一端平滑地將消息推送出去。
這里就不講springboot
和rabbitmq
如何集成了,參考文章https://www.cnblogs.com/fantongxue/p/12493497.html
一,准備工作:
數據庫有一張商品表,庫存量是100。現在有1000個消費者准備開搶這100個庫存。
t_product
表維護商品編號與商品庫存剩余數量。編號No123321的這種商品的庫存量有100個。
t_product_record
維護搶到商品的用戶ID。理論上t_product
表開搶后的 記錄數量應該是100條(共有100個人搶到了商品)。
我們使用壓力測試工具jweter
對其進行並發性測試。
二,springboot開始集成rabbitmq
1,加入amqp的依賴
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
2,配置application.yml配置文件
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC
username: root
password: 1234
rabbitmq:
host: 101.201.101.206
username: guest
password: guest
publisher-confirms: true # 開啟Rabbitmq發送消息確認機制,發送消息到隊列並觸發回調方法
publisher-returns: true
listener:
simple:
concurrency: 10 #消費者數量
max-concurrency: 10 #最大消費者數量
prefetch: 1 #限流(消費者每次從隊列獲取的消息數量)
auto-startup: true #啟動時自動啟動容器
acknowledge-mode: manual #開啟ACK手動確認模式
3,RabbitConfig配置類
1, 定義消息轉換實例 ,轉化成 JSON
傳輸
2 , 配置啟用rabbitmq
事務
package com.aaa.springredis.controller;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class RabbitConfig {
/**
* 定義消息轉換實例 ,轉化成 JSON傳輸
*
* @return Jackson2JsonMessageConverter
*/
@Bean
public MessageConverter integrationEventMessageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 配置啟用rabbitmq事務
*
* @param connectionFactory connectionFactory
* @return RabbitTransactionManager
*/
@Bean
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
}
4,初始化rabbitmq的回調函數
說明:被@PostConstruct
修飾的方法會在服務器加載Servlet
的時候運行,並且只會被服務器執行一次。如果想在生成對象時完成某些初始化操作,而偏偏這些初始化操作又依賴於依賴注入,那么久無法在構造函數中實現。為此,可以使用@PostConstruct
注解一個方法來完成初始化,@PostConstruct
注解的方法將會在依賴注入完成后被自動調用。
回調函數的使用前提是配置文件中開啟了rabitmq
消息確認機制
Constructor >> @Autowired >> @PostConstruct
@Autowired
RabbitTemplate rabbitTemplate;
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitController.class);
@PostConstruct
private void init(){
/**
* 消息發送到交換器Exchange后觸發回調。
* 使用該功能需要開啟確認,spring-boot中配置如下:
* spring.rabbitmq.publisher-confirms = true
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b) {
System.out.println("消息已確認 cause:{"+s+"} - {"+correlationData+"}");
} else {
System.out.println("消息未確認 cause:{"+s+"} - {"+correlationData+"}");
}
}
});
/**
* 通過實現ReturnCallback接口,
* 如果消息從交換器發送到對應隊列失敗時觸發
* 比如根據發送消息時指定的routingKey找不到隊列時會觸發
* 使用該功能需要開啟確認,spring-boot中配置如下:
* spring.rabbitmq.publisher-returns = true
*/
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
LOGGER.error("消息被退回:{}", message);
LOGGER.error("消息使用的交換機:{}", exchange);
LOGGER.error("消息使用的路由鍵:{}", routingKey);
LOGGER.error("描述:{}", replyText);
}
});
}
三,開始測試
1,寫搶單測試類
寫搶單測試類,我們使用jweter
壓力測試工具開啟1000個線程進行測試(開啟多線程並發測試),所以為了區別每一個模擬的用戶,使用userId
累加的方式進行區分。
private int userId=0;
//開始搶單
@RequestMapping("/begin")
@ResponseBody
public void begin(){
userId++;
this.send(userId);
}
而上面的send
方法就是把接收到的用戶請求發送到rabbitmq
消息中間件中。
@RequestMapping("/send")
@ResponseBody
public String send(Integer messge){
//第一個參數:交換機名字 第二個參數:Routing Key的值 第三個參數:傳遞的消息對象
rabbitTemplate.convertAndSend("test.direct","test",messge);
return "發送消息成功";
}
2,配置rabbitmq監聽方法
rabbitmq
監聽上篇文章也說過了,作用就是監聽指定隊列中收到來自交換機的消息,來一條收一條,收完為止!
通過 ACK
確認是否被正確接收,每個Message
都要被確認,可以手動去 ACK
或自動 ACK
,如果信息消費失敗的話會拒絕當前消息,並把消息返回原隊列。
從隊列中收到用戶的userId
,然后進行購買商品模擬操作(減少一個庫存,新增一條購買記錄)
@Autowired
RabbitController controller;
/**
* @RabbitListener 可以標注在類上面,需配合 @RabbitHandler 注解一起使用
* @RabbitListener 標注在類上面表示當有收到消息的時候,就交給 @RabbitHandler 的方法處理,具體使用哪個方法處理,
* 根據 MessageConverter 轉換后的參數類型
*
* 使用 @Payload 和 @Headers 注解可以消息中的 body 與 headers 信息
*
* 通過 ACK 確認是否被正確接收,每個 Message 都要被確認(acknowledged),可以手動去 ACK 或自動 ACK
*/
@RabbitListener(queues = "test") //指定監聽的隊列名
public void receiver(@Payload Integer userId, @Headers Channel channel, Message message) throws IOException {
LOGGER.info("用戶{}開始搶單", userId);
try {
//處理消息
controller.robbingProduct(userId);
// 確認消息已經消費成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
LOGGER.error("消費處理異常:{} - {}", userId, e);
// 拒絕當前消息,並把消息返回原隊列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
購買商品的方法
public void robbingProduct(Integer userId){
Product product = testDao.selectProductByNo("No123321");
if (product != null && product.getTotal() > 0) {
//更新庫存表,庫存量減少1。返回1說明更新成功。返回0說明庫存已經為0
int i = testDao.updateProduct("No123321");
if(i>0){
//插入記錄
testDao.insertProductRecord(new ProductRecord("No123321", userId));
//發送短信
LOGGER.info("用戶{}搶單成功", userId);
}else {
LOGGER.error("用戶{}搶單失敗", userId);
}
} else {
LOGGER.error("用戶{}搶單失敗", userId);
}
}
3,jweter工具測試並發
jweter
壓力測試工具如何使用百度吧,這里忽略!
控制台打印
而數據庫中的庫存變成了0
購買記錄中存放了搶單成功的用戶id
(100條記錄)
當然,剩下的900個用戶都搶單失敗了!
rabbitmq
隊列是先進先出的順序,先來后到,1000個請求你也得給我排隊,前100個請求搶單成功之后就注定了后900個請求是搶單失敗的!
使用RabbitMQ
的最主要變化就是:以前搶單操作請求直接由我們搶單應用程序執行,現在請求被轉移到了RabbitMQ
服務器中。RabbitMQ
服務器把接收到的搶單請求進行排隊,最后由RabbitMQ
服務器把搶單請求轉發到我們的搶單應用程序,這樣的好處就是避免我們的搶單應用程序短時間直接處理大量請求。RabbitMQ
服務器主要作用是減緩搶單應用程序的並發壓力,相當於在我們的搶單程序之前加了一道請求緩沖區。
實戰結束!
工程地址:https://github.com/fantongxue666/rabbitmq-seckill