使用RabbitMQ最終一致性庫存解鎖


一、基本介紹

①延時隊列(實現定時任務)

場景:比如未付款訂單,超過一定時間后,系統自動取消訂單並釋放占有物品。

常用解決方案: spring的 schedule定時任務輪詢數據庫:
缺點:消耗系統內存、增加了數據庫的壓力、存在較大的時間誤差
解決: rabbitmqExchange的消息TTL和死信結合

②消息的TL(Time To Live)消息的TTL就是消息的存活時間。

 RabbitMQ可以對隊列和消息分別設置TTL
- 對隊列設置就是隊列沒有消費者連着的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。
- 如果隊列設置了,消息也設置了,那么會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。這里單講單個消息的TTL,因為它才是實現延遲任務的關鍵。可以通過設置消息的 expiration-message-字段或者x--ttl屬性來設置時間,兩者是一樣的效果。

③ Dead Letter Exchanges (DLX)

一個消息在滿足如下條件下,會進死信路由,記住這里是路由而不是隊列,一個路由可以對應很多隊列。(什么是死信)
- 一個消息被Consumer拒收了,並且 reject方法的參數里是。也就是說不會被再次放在隊列里,被其他消費者使用。(basic.reject/basic.nack) requeue=false上面的消息的TTL到了,消息過期了。-
- 一隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上
 Dead Letter Exchangeexch其實就是一種普通的,和創建其他exchange沒有兩樣。只是在某一個設置 Dead Letter Exchange的隊列中有消息過期了,會自動觸發消息的轉發,發送到 Dead Letter Exchange中去。
·我們既可以控制消息在一段時間后變成死信,又可以控制變成死信的消息被路由到某一個指定的交換機,結合二者,其實就可以實現一個延時隊列

二、推薦:給隊列設置延時時間

①:因為RabbitMQ采用惰性檢查機制

RabbitMq采用惰性檢查機制,也就是懶檢查機制:比如消息隊列中存放了多條消息,第一條是5分鍾過期,第二條是1分鍾過期,第三條是1秒鍾過期,按照正常的過期邏輯,應該是1秒過期的先排出這個隊列,進入死信隊列中,但是實際RabbitMQ是先拿第一條消息,也就是5分鍾過期的,一看5分鍾還沒到過期時間,然后等待5分鍾會將第一條消息拿出來,放入死信隊列,這里就會出現問題,第二條設置1分鍾的和第三條設置1秒鍾的消息必須要等待第一條5分鍾過期后才能過期,等待第一條消息過期5分鍾了,拿第二條、三條的時候都不需要判斷就已經過期了,直接就放入死信隊列中,所以第二條、三條需要等待第一條消息過了5分鍾才能過期,這樣的延時根本就沒產生對應的效果。

②:理論結構圖

③:項目結構圖

 ④:代碼實現

4.1:基礎設置

4.1.1:創建信道、隊列、路由(結構圖)

4.1.2:MyRabbit配置

@Configuration
public class MyRabbitConfig {
    /**
     * 使用JSON序列化機制,進行消息轉移
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public Exchange stockEventExchange() {
        TopicExchange topicExchange = new TopicExchange("stock-event-exchange", true, false);
        return topicExchange;
    }

    @Bean
    public Queue stockReleaseStockQueue() {
        return new Queue("stock.release.stock.queue", true, false, false);
    }

@Bean
public Queue stockDelayQueue() { HashMap<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "stock-event-exchange"); args.put("x-dead-letter-routing-key", "stock.release"); args.put("x-message-ttl", 120000); //延時2min return new Queue("stock.delay.queue", true, false, false,args); } @Bean public Binding stockReleaseBinding() { return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.release.#", null); } @Bean public Binding stockLockedBinding() { return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.locked", null); } }

 4.1.3:設置監聽器

@Service
@RabbitListener(queues = "stock.release.stock.queue")
public class StockRelaeaseListener {
    @Autowired
    WareSkuService wareskuService;

    @RabbitHandler
    public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {
        System.out.println("收到解鎖信息");
        try {
            wareskuService.unlockStock(to); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }

    @RabbitHandler
    public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {

        System.out.println("訂單關閉准備解鎖庫存");
        try {
            wareskuService.unlockStock(orderTo); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }

    }
}

4.2、wareskuService類:

4.2.1:orderLockStock方法:

    /**
     * 為某個訂單鎖定庫存
     *
     * @param vo
     * @return
     * @Transactional(rollbackFor = NoStockException.class)運行出現異常時回滾
     */
    @Transactional
    @Override
    public Boolean orderLockStock(WareSkuLockVo vo) {
        /**
         * 保存庫存工作單的詳情,為了追溯哪個倉庫鎖了多少
         */
        WareOrderTaskEntity taskEntity = new WareOrderTaskEntity();
        taskEntity.setOrderSn(vo.getOrderSn());
        orderTaskService.save(taskEntity);

        //1、按照下單的收獲地址,找到就近倉庫進行鎖定庫存
        List<OrderItemVo> locks = vo.getLocks();

        List<SkuWareHasStock> collect = locks.stream().map(item -> {
            SkuWareHasStock stock = new SkuWareHasStock();
            Long skuId = item.getSkuId();
            stock.setSkuId(skuId);
            stock.setNum(item.getCount());
            //查詢這個商品在哪里有庫存
            List<Long> wareIds = wareSkuDao.listWareIdHasSkuStock(skuId);
            stock.setWareId(wareIds);
            return stock;
        }).collect(Collectors.toList());

        for (SkuWareHasStock hasStock : collect) {
            Boolean skuStocked = false;
            Long skuId = hasStock.getSkuId();
            List<Long> wareIds = hasStock.getWareId();
            if (wareIds == null || wareIds.size() == 0) {
                //沒有庫存拋出異常
                throw new NoStockException(skuId);
            }
            //如果每一個商品都鎖成功,將當前商品鎖定了幾件發送給MQ
            //如果鎖定失敗,前面保存的工作單信息就回滾了。
            for (Long wareId : wareIds) {
                //成功就返回1,否則就是0
                Long count = wareSkuDao.lockSkuStock(skuId, wareId, hasStock.getNum());
                if (count == 1) {
                    //TODO:表明鎖住了,發消息告訴MQ庫存鎖定成功
                    //在數據表wms_ware_order_task_detail中存入庫存單(*倉庫/*商品/*數量/被鎖*件)做記號
                    WareOrderTaskDetailEntity entity = new WareOrderTaskDetailEntity(null, skuId, "", hasStock.getNum(), taskEntity.getId(), wareId, 1);
                    orderTaskDetailService.save(entity);
                    StockLockedTo lockedTo = new StockLockedTo();
                    lockedTo.setId(taskEntity.getId());
                    StockDetailTo stockDetailTo = new StockDetailTo();
                    //拷貝屬性和數值
                    BeanUtils.copyProperties(entity, stockDetailTo);
                    //防止wms_ware_order_task_detail表內數據因為回滾丟失,所以new一個StockLockedTo類記錄失敗提交的數據
                    lockedTo.setDetail(stockDetailTo);
                    //將庫存工作單的詳情放入exchange中
                    rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", lockedTo);
                    skuStocked = true;
                    break;
                } else {
                    //鎖失敗了,重試下一個倉庫
                }
            }
            if (skuStocked == false) {
                //當前商品所有倉庫都沒鎖住庫存數量
                throw new NoStockException(skuId);
            }
        }
        //肯定全部都是鎖定
        return true;
    }

4.2.1、unlockStock方法:

    @Override
    public void unlockStock(StockLockedTo to) {
        StockDetailTo detail = to.getDetail();
        Long detailId = detail.getId();
        /**
         * 1、查詢數據庫wms_ware_order_task_detail表關於這個訂單的鎖定庫存信息
         * ①表里有關於鎖庫存的信息,隊列設置延時時間,檢查訂單的狀態,確認是否需要進行解鎖
         * 1.1:解鎖前查看訂單情況:則需要解鎖庫存
         * 1.1.1查看訂單狀態,查看訂單狀態,若訂單已取消則必須解鎖庫存
         * 1.1.2查看訂單狀態,訂單未取消則不能解鎖庫存
         *
         * 1.2:如果沒有訂單情況:則必須解鎖庫存
         *
         * ②沒有則代表整個庫存鎖定失敗,事務回滾了,這種情況無需解鎖
         *
         * 只要解鎖庫存失敗,利用手動模式
         */
        WareOrderTaskDetailEntity byId = orderTaskDetailService.getById(detailId);
        if (byId != null) {
            //解鎖
            Long id = to.getId();
            WareOrderTaskEntity taskEntity = orderTaskService.getById(id);
            String orderSn = taskEntity.getOrderSn();//根據訂單號查詢訂單狀態
            //查找訂單是否創建成功,此處遠程調用會因為攔截器需要先登錄,因此按4.3進行修改
            R r = orderFeignService.getOrderStatus(orderSn);
            if (r.getCode() == 0) {
                //訂單數據返回成功
                OrderVo data = r.getData(new TypeReference<OrderVo>() {
                });
                //只有訂單狀態是取消狀態/或者訂單不存在才可以解鎖.4為狀態碼代表訂單是取消狀態
                System.out.println("Data1:" + data);
                if (data == null || data.getStatus() == 4) {
                    //當前庫存單詳情狀態1已鎖定但是未解鎖才可以解鎖
                    unLockStock(detail.getSkuId(), detail.getWareId(), detail.getSkuNum(), detailId);
                 /*   if (byId.getLockStatus() == 1) {
                        System.out.println("Data2:" + data);

                    }*/
                }
            } else {
                //消息拒絕以后重新放入隊列里,讓其他人繼續消費解鎖
                throw new RuntimeException("遠程服務失敗");
            }
        }
    }

 4.2.2:unLockStock方法:

    //庫存解鎖方法
    private void unLockStock(Long skuId, Long wareId, Integer num, Long taskDetailId) {
        wareSkuDao.unlockStock(skuId, wareId, num);
    }

//wareSkuDao.xml中更新代碼:

/**<update id="unlockStock">
    UPDATE wms_ware_sku SET stock_locked = stock_locked- #{num} WHERE sku_id= #{skuId} AND ware_id=#{wareId}
</update>**/

 4.2.3:unlockStock(OrderTo orderTo)方法:

 /*
    *防止訂單服務卡頓,導致訂單消息一直更改不了,庫存優先到期,查訂單狀態新建狀態,什么都做不了就走了
    *導致卡頓的訂單,永遠不能解鎖
    */
    @Transactional
    @Override
    public void unlockStock(OrderTo orderTo) {
        String orderSn = orderTo.getOrderSn();
        //進行到這一步再查一下最新的狀態
       WareOrderTaskEntity task = orderTaskService.getOrderTaskByOrderSn(orderSn);
       //獲取was_ware_order_task中的id,從而以其獲取was_ware_order_task_detail狀態為1(未解鎖)的庫存
        Long id = task.getId();
        List<WareOrderTaskDetailEntity> entities = orderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", id).eq("lock_status", 1));
        //Long skuId, Long wareId, Integer num, Long taskDetailId
        for (WareOrderTaskDetailEntity entity:entities){
            unLockStock(entity.getSkuId(),entity.getWareId(),entity.getSkuNum(),entity.getId());
        }
    }

4.3:攔截器修改

@Component
public class LoginUserInterceptor implements HandlerInterceptor {
    public static ThreadLocal<MemberResVo> loginUser = new ThreadLocal<>();
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {

        //此地址下不進行攔截
        String uri = request.getRequestURI(); boolean match = new AntPathMatcher().match("/order/order/status/**", uri); boolean match1 = new AntPathMatcher().match("/payed/notify", uri); if (match || match1){ return true; }
//獲取登錄用戶的鍵 MemberResVo attribute = (MemberResVo) request.getSession().getAttribute(AuthServerConstant.LONG_USER); if (attribute!=null){ loginUser.set(attribute); return true; }else { request.getSession().setAttribute("msg","請先進行登錄!"); response.sendRedirect("http://auth.gulimall.com/login.html"); return false; } } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM