【連載】redis庫存操作,分布式鎖的四種實現方式[三]--基於Redis watch機制實現分布式鎖


一、redis的事務介紹

1、 Redis保證一個事務中的所有命令要么都執行,要么都不執行。如果在發送EXEC命令前客戶端斷線了,則Redis會清空事務隊列,事務中的所有命令都不會執行。而一旦客戶端發送了EXEC命令,所有的命令就都會被執行,即使此后客戶端斷線也沒關系,因為Redis中已經記錄了所有要執行的命令。

2、 除此之外,Redis的事務還能保證一個事務內的命令依次執行而不被其他命令插入。試想客戶端A需要執行幾條命令,同時客戶端B發送了一條命令,如果不使用事務,則客戶端B的命令可能會插入到客戶端A的幾條命令中執行。如果不希望發生這種情況,也可以使用事務。

3、 若一個事務中有多條命令,若有一條命令錯誤,事務中的所有命令都不會執行。若在執行階段有命令執行錯誤,其他的命令也會正確的執行,需要注意。

4、與mysql的事務不同,redis的事務執行中時不會回滾的,哪怕出現錯誤,之前已經執行的命令結果也不會回滾。

二、Redis watch介紹

1、 WATCH命令可以監控一個或多個鍵,一旦其中有一個鍵被修改(或刪除),之后的事務就不會執行。監控一直持續到EXEC命令(事務中的命令是在EXEC之后才執行的,所以在MULTI命令后可以修改WATCH監控的鍵值)

2、watch一般配合事務使用

例:啟動一個線程,連接redis,監控key watchKeyTest,sleep10s模擬業務邏輯處理,此時再啟動另一個進程去修改該key的值,那么當前線程就會返回null

/**
 * @author LiJunJun
 * @date 2018/12/10
 */
public class Test {

    private static Jedis jedis;

    static {
        jedis = new Jedis("192.168.10.109", 6379);
        jedis.auth("aaa@leadeon.cn");
        jedis.sadd("watchKeyTest", "290");
    }

    public static void main(String[] args) {

        jedis.watch("watchKeyTest");

        System.out.println("開始監控key: watchKeyTest");

        Transaction transaction = jedis.multi();

        try {
            // sleep 10秒,模擬業務邏輯處理
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("開始獲取key: watchKeyTest");
        transaction.sismember("watchKeyTest", "290");

        List<Object> result = transaction.exec();
        System.out.println("執行結果:" + result);
        jedis.disconnect();
    }
}

啟動另一個進程,修改同一個key

public class Test2 {

    public static void main(String[] args) {

        Jedis jedis = new Jedis("192.168.10.109", 6379);
        jedis.auth("common@leadeon.cn");
        long result = jedis.sadd("watchKeyTest", "358");
        System.out.println(result);
        jedis.disconnect();
    }
}

此時,進程1就會返回null

若在進程1執行期間,該key沒有被其他進程修改,則返回正確的值。

三、實現思路

基於以上介紹的redis的事務以及watch機制,我們可以做分布式鎖處理,即在分布式系統中,高並發情況下,一個線程watch相應的key后,其他進程若修改了key,則該進程所在的事務就不執行,返回null,我們可以增加重試機制,來做庫存操作

四、業務代碼實現

采用watch機制,做樂觀鎖處理,重試三次,三次返回均未成功,則接口返回失敗
    /**
     * 減庫存(基於redis watch機制實現)
     *
     * @param trace 請求流水
     * @param stockManageReq(stockId、decrNum)
     * @return -1為失敗,大於-1的正整數為減后的庫存量,-2為庫存不足無法減庫存
     */
    @Override
    @ApiOperation(value = "減庫存", notes = "減庫存")
    @RequestMapping(value = "/decrByStock", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public int decrByStock(@RequestHeader(name = "Trace") String trace, @RequestBody StockManageReq stockManageReq) {

        long startTime = System.currentTimeMillis();

        LOGGER.reqPrint(Log.CACHE_SIGN, Log.CACHE_REQUEST, trace, "decrByStock", JSON.toJSONString(stockManageReq));

        int res = 0;
        String stockId = stockManageReq.getStockId();
        Integer decrNum = stockManageReq.getDecrNum();

        boolean decrByStock = false;

        try {
            if (null != stockId && null != decrNum) {

                stockId = PREFIX + stockId;

                // 采用watch機制,做樂觀鎖處理,重試三次,三次返回均未成功,則接口返回失敗
                for (int i = 0; i < TRY_COUNT; i++) {
                    Integer decrByStockRes = decrByStock(stockId, decrNum, trace);

                    // 更新庫存時key對應的value發生變更,重試
                    if (decrByStockRes != -1) {
                        res = decrByStockRes;
                        decrByStock = true;
                        break;
                    }
                }

                if (!decrByStock) {
                    res = -2;
                    LOGGER.info("本次請求減庫存失敗!decrByStockFailure=1");
                }
            }
        } catch (Exception e) {
            LOGGER.error(trace, "decr sku stock failure.", e);
            res = -1;
        } finally {
            LOGGER.respPrint(Log.CACHE_SIGN, Log.CACHE_RESPONSE, trace, "decrByStock", System.currentTimeMillis() - startTime, String.valueOf(res));
        }
        return res;
    }

    /**
     * 減庫存邏輯
     *
     * @param stockId 庫存id
     * @param decrNum 減少的量
     * @return 減庫存結果(-1:表示更新庫存時key對應的value發生變更,即提示調用方重試;-2: 庫存不夠減,售罄;其它值表示減庫存后的值)
     */
    private Integer decrByStock(String stockId, int decrNum, String trace) {

        Response<Long> v = null;
        List<Object> result = null;

        try (Jedis jedis = jedisPool.getWriteResource()) {

            if (!jedis.select(0).equals("OK")) {
                LOGGER.error(trace, "減庫存,本次請求未獲取到jedis連接!");
                return -1;
            }

            jedis.watch(stockId);

            // redis 減庫存邏輯
            String vStock = jedis.get(stockId);

            long realV = 0L;

            if (StringUtils.isNotEmpty(vStock)) {
                realV = Long.parseLong(vStock);
            }
            //庫存數  大於等於 要減的數目,則執行減庫存
            if (realV < decrNum) {

                return -2;
            }

            Transaction transaction = jedis.multi();

            v = transaction.decrBy(stockId, decrNum);

            result = transaction.exec();
        }

        return (result == null || result.isEmpty()) ? -1 : v.get().intValue();
    }

五、ab壓測及分析
同樣的,我們以5000的請求量100的並發量來壓、tps在640左右,比zk做分布式鎖來看,提升了20倍的性能,比redisson分布式鎖提升了2倍,性能提升不大

同時我們發現,5000個請求,有4561個請求失敗,我們看下日志統計,有多少請求沒有成功執行事務

也是4561,說明有4561個事務沒有成功執行,並不是運行錯誤。

六、總結

watch可以用來控制對redis的操作同步執行,但失敗的幾率較大,用該機制做搶購的業務還行,但對redis操作結果依賴較強的業務來說,不太適用,下一篇我們講下終極解決方案,適用redis的lua腳本編程,變相的實現分布式鎖。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM