業務分析
一般而言,商品秒殺大概可以拆分成以下幾步:
- 用戶校驗
校驗是否多次搶單,保證每個商品每個用戶只能秒殺一次 - 下單
訂單信息進入消息隊列,等待消費 - 減少庫存
消費訂單消息,減少商品庫存,增加訂單記錄 - 付款
十五分鍾內完成支付,修改支付狀態
創建表
goods_info 商品庫存表
列 | 說明 |
---|---|
id | 主鍵(uuid) |
goods_name | 商品名稱 |
goods_stock | 商品庫存 |
package com.jason.seckill.order.entity;
/**
* 商品庫存
*/
public class GoodsInfo {
private String id;
private String goodsName;
private String goodsStock;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getGoodsName() {
return goodsName;
}
public void setGoodsName(String goodsName) {
this.goodsName = goodsName;
}
public String getGoodsStock() {
return goodsStock;
}
public void setGoodsStock(String goodsStock) {
this.goodsStock = goodsStock;
}
@Override
public String toString() {
return "GoodsInfo{" +
"id='" + id + '\'' +
", goodsName='" + goodsName + '\'' +
", goodsStock='" + goodsStock + '\'' +
'}';
}
}
order_info 訂單記錄表
列 | 說明 |
---|---|
id | 主鍵(uuid) |
user_id | 用戶id |
goods_id | 商品id |
pay_status | 支付狀態(0-超時未支付 1-已支付 2-待支付) |
package com.jason.seckill.order.entity;
/**
* 下單記錄
*/
public class OrderRecord {
private String id;
private String userId;
private String goodsId;
/**
* 0-超時未支付 1-已支付 2-待支付
*/
private Integer payStatus;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getGoodsId() {
return goodsId;
}
public void setGoodsId(String goodsId) {
this.goodsId = goodsId;
}
public Integer getPayStatus() {
return payStatus;
}
public void setPayStatus(Integer payStatus) {
this.payStatus = payStatus;
}
@Override
public String toString() {
return "OrderRecord{" +
"id='" + id + '\'' +
", userId='" + userId + '\'' +
", goodsId='" + goodsId + '\'' +
'}';
}
}
功能實現
1.用戶校驗
使用redis做用戶校驗,保證每個用戶每個商品只能搶一次,上代碼:
public boolean checkSeckillUser(OrderRequest order) {
String key = env.getProperty("seckill.redis.key.prefix") + order.getUserId() + order.getGoodsId();
return redisTemplate.opsForValue().setIfAbsent(key,"1");
}
userId+orderId的組合作為key,利用redis的setnx分布式鎖原理來實現。如果是限時秒殺,可以通過設置key的過期時間來實現。
2.下單
下單信息肯定是要先扔到消息隊列里的,這里采用RabbitMQ來做消息隊列,先來看一下消息隊列的模型圖:
rabbitmq的配置:
#rabbitmq配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#消費者數量
spring.rabbitmq.listener.simple.concurrency=5
#最大消費者數量
spring.rabbitmq.listener.simple.max-concurrency=10
#消費者每次從隊列獲取的消息數量。寫多了,如果長時間得不到消費,數據就一直得不到處理
spring.rabbitmq.listener.simple.prefetch=1
#消費接收確認機制-手動確認
spring.rabbitmq.listener.simple.acknowledge-mode=manual
mq.env=local
#訂單處理隊列
#交換機名稱
order.mq.exchange.name=${mq.env}:order:mq:exchange
#隊列名稱
order.mq.queue.name=${mq.env}:order:mq:queue
#routingkey
order.mq.routing.key=${mq.env}:order:mq:routing:key
rabbitmq配置類OrderRabbitmqConfig:
/**
* rabbitmq配置
*/
@Configuration
public class OrderRabbitmqConfig {
private static final Logger logger = LoggerFactory.getLogger(OrderRabbitmqConfig.class);
@Autowired
private Environment env;
/**
* channel鏈接工廠
*/
@Autowired
private CachingConnectionFactory connectionFactory;
/**
* 監聽器容器配置
*/
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
/**
* 聲明rabbittemplate
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(){
//消息發送成功確認,對應application.properties中的spring.rabbitmq.publisher-confirms=true
connectionFactory.setPublisherConfirms(true);
//消息發送失敗確認,對應application.properties中的spring.rabbitmq.publisher-returns=true
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//設置消息發送格式為json
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setMandatory(true);
//消息發送到exchange回調 需設置:spring.rabbitmq.publisher-confirms=true
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info("消息發送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
});
//消息從exchange發送到queue失敗回調 需設置:spring.rabbitmq.publisher-returns=true
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}
//---------------------------------------訂單隊列------------------------------------------------------
/**
* 聲明訂單隊列的交換機
* @return
*/
@Bean("orderTopicExchange")
public TopicExchange orderTopicExchange(){
//設置為持久化 不自動刪除
return new TopicExchange(env.getProperty("order.mq.exchange.name"),true,false);
}
/**
* 聲明訂單隊列
* @return
*/
@Bean("orderQueue")
public Queue orderQueue(){
return new Queue(env.getProperty("order.mq.queue.name"),true);
}
/**
* 將隊列綁定到交換機
* @return
*/
@Bean
public Binding simpleBinding(){
return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(env.getProperty("order.mq.routing.key"));
}
/**
* 注入訂單對列消費監聽器
*/
@Autowired
private OrderListener orderListener;
/**
* 聲明訂單隊列監聽器配置容器
* @return
*/
@Bean("orderListenerContainer")
public SimpleMessageListenerContainer orderListenerContainer(){
//創建監聽器容器工廠
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//將配置信息和鏈接信息賦給容器工廠
factoryConfigurer.configure(factory,connectionFactory);
//容器工廠創建監聽器容器
SimpleMessageListenerContainer container = factory.createListenerContainer();
//指定監聽器
container.setMessageListener(orderListener);
//指定監聽器監聽的隊列
container.setQueues(orderQueue());
return container;
}
}
配置類聲明了訂單隊列,交換機,通過指定的routingkey綁定了隊列與交換機。另外,rabbitTemplate用來發送消息,ListenerContainer指定監聽器(消費者)監聽的隊列。
客戶下單,生產消息,上代碼:
@Service
public class SeckillService {
private static final Logger logger = LoggerFactory.getLogger(SeckillService.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Environment env;
/**
* 生產消息
* @param order
*/
public void seckill(OrderRequest order){
//設置交換機
rabbitTemplate.setExchange(env.getProperty("order.mq.exchange.name"));
//設置routingkey
rabbitTemplate.setRoutingKey(env.getProperty("order.mq.routing.key"));
//創建消息體
Message msg = MessageBuilder.withBody(JSON.toJSONString(order).getBytes()).build();
//發送消息
rabbitTemplate.convertAndSend(msg);
}
}
很簡單,操作rabbitTemplate,指定交換機和routingkey,發送消息到綁定的隊列,等待消費處理。
3.減少庫存
消費者消費訂單消息,做業務處理。
看一下監聽器(消費者)OrderListener:
/**
* 消息監聽器(消費者)
*/
@Component
public class OrderListener implements ChannelAwareMessageListener {
private static final Logger logger = LoggerFactory.getLogger(OrderListener.class);
@Autowired
private OrderService orderService;
/**
* 處理接收到的消息
* @param message 消息體
* @param channel 通道,確認消費用
* @throws Exception
*/
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try{
//獲取交付tag
long tag = message.getMessageProperties().getDeliveryTag();
String str = new String(message.getBody(),"utf-8");
logger.info("接收到的消息:{}",str);
JSONObject obj = JSONObject.parseObject(str);
//下單,操作數據庫
orderService.order(obj.getString("userId"),obj.getString("goodsId"));
//確認消費
channel.basicAck(tag,true);
}catch(Exception e){
logger.error("消息監聽確認機制發生異常:",e.fillInStackTrace());
}
}
}
業務處理 OrderService:
@Service
public class OrderService {
@Resource
private SeckillMapper seckillMapper;
/**
* 下單,操作數據庫
* @param userId
* @param goodsId
*/
@Transactional()
public void order(String userId,String goodsId){
//該商品庫存-1(當庫存>0時)
int count = seckillMapper.reduceGoodsStockById(goodsId);
//更新成功,表明搶單成功,插入下單記錄,支付狀態設為2-待支付
if(count > 0){
OrderRecord orderRecord = new OrderRecord();
orderRecord.setId(CommonUtils.createUUID());
orderRecord.setGoodsId(goodsId);
orderRecord.setUserId(userId);
orderRecord.setPayStatus(2);
seckillMapper.insertOrderRecord(orderRecord);
}
}
}
Dao接口和Mybatis文件就不往出貼了,這里的邏輯是,update goods_info set goods_stock = goods_stock-1 where goods_stock > 0 and id=#{goodsId},這條update相當於將查詢庫存和減少庫存合並為一個原子操作,避免高並發問題,執行成功,插入訂單記錄,執行失敗,則庫存不夠搶單失敗。
4.支付
訂單處理完成后,如果庫存減少,也就是搶單成功,那么需要用戶在十五分鍾內完成支付,這塊就要用到死信隊列(延遲隊列)來處理了,先看模型圖:
DLX:dead-letter Exchange 死信交換機
DLK:dead-letter RoutingKey 死信路由
ttl:time-to-live 超時時間
死信隊列中,消息到期后,會通過DLX和DLK進入到pay-queue,進行消費。這是另一組消息隊列,和訂單消息隊列是分開的。這里注意他們的綁定關系,主交換機綁定死信隊列,死信交換機綁定的是主隊列(pay queue)。
接下來聲明圖中的一系列組件,首先application.properties中增加配置:
#支付處理隊列
#主交換機
pay.mq.exchange.name=${mq.env}:pay:mq:exchange
#死信交換機(DLX)
pay.dead-letter.mq.exchange.name=${mq.env}:pay:dead-letter:mq:exchange
#主隊列
pay.mq.queue.name=${mq.env}:pay:mq:queue
#死信隊列
pay.dead-letter.mq.queue.name=${mq.env}:pay:dead-letter:mq:queue
#主routingkey
pay.mq.routing.key=${mq.env}:pay:mq:routing:key
#死信routingkey(DLK)
pay.dead-letter.mq.routing.key=${mq.env}:pay:dead-letter:mq:routing:key
#支付超時時間(毫秒)(TTL),測試原因,這里模擬5秒,如果是生產環境,這里可以是15分鍾等
pay.mq.ttl=5000
配置類OrderRabbitmqConfig中增加支付隊列和死信隊列的聲明:
/**
* 死信隊列,十五分鍾超時
* @return
*/
@Bean
public Queue payDeadLetterQueue(){
Map args = new HashMap();
//聲明死信交換機
args.put("x-dead-letter-exchange",env.getProperty("pay.dead-letter.mq.exchange.name"));
//聲明死信routingkey
args.put("x-dead-letter-routing-key",env.getProperty("pay.dead-letter.mq.routing.key"));
//聲明死信隊列中的消息過期時間
args.put("x-message-ttl",env.getProperty("pay.mq.ttl",int.class));
//創建死信隊列
return new Queue(env.getProperty("pay.dead-letter.mq.queue.name"),true,false,false,args);
}
/**
* 支付隊列交換機(主交換機)
* @return
*/
@Bean
public TopicExchange payTopicExchange(){
return new TopicExchange(env.getProperty("pay.mq.exchange.name"),true,false);
}
/**
* 將主交換機綁定到死信隊列
* @return
*/
@Bean
public Binding payBinding(){
return BindingBuilder.bind(payDeadLetterQueue()).to(payTopicExchange()).with(env.getProperty("pay.mq.routing.key"));
}
/**
* 支付隊列(主隊列)
* @return
*/
@Bean
public Queue payQueue(){
return new Queue(env.getProperty("pay.mq.queue.name"),true);
}
/**
* 死信交換機
* @return
*/
@Bean
public TopicExchange payDeadLetterExchange(){
return new TopicExchange(env.getProperty("pay.dead-letter.mq.exchange.name"),true,false);
}
/**
* 將主隊列綁定到死信交換機
* @return
*/
@Bean
public Binding payDeadLetterBinding(){
return BindingBuilder.bind(payQueue()).to(payDeadLetterExchange()).with(env.getProperty("pay.dead-letter.mq.routing.key"));
}
/**
* 注入支付監聽器
*/
@Autowired
private PayListener payListener;
/**
* 支付隊列監聽器容器
* @return
*/
@Bean
public SimpleMessageListenerContainer payMessageListenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory,connectionFactory);
SimpleMessageListenerContainer listenerContainer = factory.createListenerContainer();
listenerContainer.setMessageListener(payListener);
listenerContainer.setQueues(payQueue());
return listenerContainer;
}
支付隊列和死信隊列的Queue、Exchange、routingkey都已就緒。
看生產者:
@Service
public class OrderService {
@Resource
private SeckillMapper seckillMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Environment env;
/**
* 下單,操作數據庫
* @param userId
* @param goodsId
*/
@Transactional()
public void order(String userId,String goodsId){
//該商品庫存-1(當庫存>0時)
int count = seckillMapper.reduceGoodsStockById(goodsId);
//更新成功,表明搶單成功,插入下單記錄,支付狀態設為2-待支付
if(count > 0){
OrderRecord orderRecord = new OrderRecord();
orderRecord.setId(CommonUtils.createUUID());
orderRecord.setGoodsId(goodsId);
orderRecord.setUserId(userId);
orderRecord.setPayStatus(2);
seckillMapper.insertOrderRecord(orderRecord);
//將該訂單添加到支付隊列
rabbitTemplate.setExchange(env.getProperty("pay.mq.exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("pay.mq.routing.key"));
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
String json = JSON.toJSONString(orderRecord);
Message msg = MessageBuilder.withBody(json.getBytes()).build();
rabbitTemplate.convertAndSend(msg);
}
}
}
在OrderService中,數據庫操作完成后,將訂單信息發送到死信隊列,死信隊列中的消息會在十五分鍾后進入到支付隊列,等待消費。
再看消費者:
@Component
public class PayListener implements ChannelAwareMessageListener {
private static final Logger logger = LoggerFactory.getLogger(PayListener.class);
@Autowired
private PayService payService;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Long tag = message.getMessageProperties().getDeliveryTag();
try {
String str = new String(message.getBody(), "utf-8");
logger.info("接收到的消息:{}",str);
JSONObject json = JSON.parseObject(str);
String orderId = json.getString("id");
//確認是否付款
payService.confirmPay(orderId);
//確認消費
channel.basicAck(tag, true);
}catch(Exception e){
logger.info("支付消息消費出錯:{}",e.getMessage());
logger.info("出錯的tag:{}",tag);
}
}
}
PayService:
@Service
public class PayService {
private static final Logger logger = LoggerFactory.getLogger(PayService.class);
@Resource
private SeckillMapper seckillMapper;
/**
* 確認是否支付
* @param orderId
*/
public void confirmPay(String orderId){
OrderRecord orderRecord = seckillMapper.selectNoPayOrderById(orderId);
//根據訂單號校驗該用戶是否已支付
if(checkPay(orderId)){
//已支付
orderRecord.setPayStatus(1);
seckillMapper.updatePayStatus(orderRecord);
logger.info("用戶{}已支付",orderId);
}else{
//未支付
orderRecord.setPayStatus(0);
seckillMapper.updatePayStatus(orderRecord);
//取消支付后,商品庫存+1
seckillMapper.returnStock(orderRecord.getGoodsId());
logger.info("用戶{}未支付",orderId);
}
}
/**
* 模擬判斷訂單支付成功或失敗,成功失敗隨機
* @param orderId
* @return
*/
public boolean checkPay(String orderId){
Random random = new Random();
int res = random.nextInt(2);
return res==0?false:true;
}
這里checkPay()方法模擬調用第三方支付接口來判斷用戶是否已支付。若支付成功,訂單改為已支付狀態,支付失敗,改為已取消狀態,庫存退回。
總結
整個demo,是兩組消息隊列撐起來的,一組訂單消息隊列,一組支付消息隊列,而每一組隊列都是由queue、exchange、routingkey、生產者以及消費者組成。交換機通過routingkey綁定隊列,rabbitTemplate通過指定交換機和routingkey將消息發送到指定隊列,消費者監聽該隊列進行消費。不同的是第二組支付隊列里嵌入了死信隊列來做一個十五分鍾的延遲支付。