關於Redisson MultiLock 的改良


1. 背景:什么時候需要聯鎖(MultiLock)?

     當我們需要對多個實例進行鎖定,禁止別人同時修改任意一個鎖定的實例,我們就需要一個聯鎖(MultiLock);
     比如業務上,我需要同時操作1000條單據,處理過程是原子的,無法拆分;那么我們就必須使用上聯鎖(當然樂觀鎖也可以是一種選擇);

2. 我想要什么:優化Redisson MultiLock在非常大的鎖數量時的性能問題

     在Redission官方提供的聯鎖(MultiLock)示例中,在如下示例中,即使使用同一個redissonInstrance實例,IO消耗還是令人沮喪。


RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同時加鎖:lock1 lock2 lock3
// 所有的鎖都上鎖成功才算成功。
lock.lock();
...
lock.unlock();


MultiLock性能測試


     上圖是我在本地同一機器上測試的性能趨勢,可以看到上鎖時間,隨着鎖的數量,線性上升(由於圖標X軸是非線性的,所以折線無法體現);平均下來每增加一個鎖,就要增加1~2毫秒。如果有成千上萬個鎖對象,那么時間消耗會是非常難以忽視的;

3. 解決辦法:使用Redis中的Hash數據結構來實現一個MultiLock

     基本思路是,把所有需要鎖定的對象集合,放到一個公共緩存空間中,每次執行鎖定任務時,檢查是否有鎖對象已經在緩存空間中了,如果任一鎖對象已經存在,那么lock fail,如果不存在,那么將所有鎖對象存到緩存空間中。
     當任務執行完畢后,將之前鎖定的鎖對象集合,從緩存空間中移除;
     如果只是這樣,那么redis的Set數據結構已經夠用了,但是因為在Set中的數據沒法做獨立的過期時間設置,如果一個鎖對象沒有清除成功,那么其他任何線程都無法在對這個鎖對象進行操作;
     所以這里使用Hash的key來存儲鎖對象集合,value是每個鎖對象的過期時間
     那么當任務執行時,需要比對是否有鎖對象已經在緩存空間的同時,如果存在,還要校驗緩存空間的鎖對象是否已經過期


/**
     * 同步執行任務
     *
     * @param leaseTime              鎖占用釋放時間,超時會自動釋放鎖對象, 如果鎖對象太多,leaseTime不能小於上鎖的耗時
     * @param timeUnit               時間單位
     * @param runnable               待執行的任務,不返回任何值
     * @param lockName             鎖名稱,避免不同地方使用同一個鎖名稱
     * @param multiLockValue         鎖名稱集合
     * @param lockAcquireFailMessage 鎖獲取失敗的時候,log.error 的錯誤信息
     */
    public <T> void runWithMultiLock(int leaseTime, TimeUnit timeUnit, Runnable runnable, String lockName, List<T> multiLockValue, String lockAcquireFailMessage) {

        Long start = System.currentTimeMillis();

        Assert.notEmpty(multiLockValue, "multiLockValue can not be empty!");

        String[] multiLockValueStrArr = new String[multiLockValue.size()];
        int i = 0;
        for (T t : multiLockValue) {
            if (t == null) {
                throw new IllegalArgumentException("lock value can not be null!");
            }
            multiLockValueStrArr[i] = String.valueOf(t);
            ++i;
        }
        long afterConvert = System.currentTimeMillis();

        String realLockName = MULTI_LOCK_MAP_PREFIX + lockName;

        boolean lockSuccess = tryLockWithMultiLock(realLockName, multiLockValueStrArr, leaseTime, timeUnit);

        Long afterLock = System.currentTimeMillis();

        if (lockSuccess) {
            try {
                runnable.run();
            } finally {
                releaseMultiLock(realLockName, multiLockValueStrArr);
            }

            Long finished = System.currentTimeMillis();
            log.info("afterConvert:{}, lockTime:{}, releaseTime:{}", afterConvert - start, afterLock - afterConvert, finished - afterLock );

        } else {
            log.error("DistributionSyncJob execute error! lock require fail, \r\n errorMsg:{}", lockAcquireFailMessage);
            throw new LockFailException();
        }
    }






    /**
     * 釋放鎖
     * @param lockName 鎖名
     * @param multiLockValue 多個key值
     */
    public void releaseMultiLock(String lockName, String[] multiLockValue) {
        RMap<String, Long> originalLocksAndExpires = redissonClient.getMap(lockName);
        originalLocksAndExpires.fastRemove(multiLockValue);
    }


    /**
     * 上鎖方法
     * 如果multiLockValue size 太大,那么可能執行上鎖的時間太長
     * @param lockName 鎖名
     * @param multiLockValue 多個key值
     * @param leaseTime 釋放時間限制
     * @param timeUnit 時間單位
     * @return
     */
    public boolean tryLockWithMultiLock(String lockName, String[] multiLockValue, int leaseTime, TimeUnit timeUnit) {

        return this.execute(() -> {
            //原始的lock集合(RMap是沒有本次緩存的,所以基於RMap的每次操作都是一次IO)
            RMap<String, Long> originalLocksAndExpires = redissonClient.getMap(lockName);
            //需要新增的lock集合
            HashMap<String, Long> addOnLockAndExpires = new HashMap<>(multiLockValue.length);
            //新增lock集合的stl時間
            long newExpireTime = System.currentTimeMillis() + timeUnit.toMillis(leaseTime);
            //在這里一次獲取,緩存本地,而不是在for循環內,循環獲取(循環IO)
            Map<String, Long> repeatLockAndExpires = originalLocksAndExpires.getAll(Arrays.stream(multiLockValue).collect(Collectors.toSet()));

            for (String lockValue : multiLockValue) {
                if (repeatLockAndExpires != null && ! repeatLockAndExpires.isEmpty()) {
                    Long expireTime = repeatLockAndExpires.get(lockValue);
                    if (expireTime != null
                            && expireTime > System.currentTimeMillis()) {
                        //如果任何一個lockValue已經存在,且過期時間大於當前時間,那么獲鎖失敗
                        return false;
                    }
                }

                addOnLockAndExpires.put(lockValue, newExpireTime);
            }
            //新鎖加入RMap中
            originalLocksAndExpires.putAll(addOnLockAndExpires);
            //重新設置map的整體過期時間
            originalLocksAndExpires.expire(defaultMultiLockMapExpireTime, defaultTimeUnit);

            if (newExpireTime <= System.currentTimeMillis()) {
                //整體的multiLock上鎖時間,超過了multiLock的leaseTime,這意味着,上鎖完成后,就已經部分鎖失效了
                //所以不能算作上鎖成功
                return false;
            }
            return true;

        }, lockName + "_OUT_LOCK");
    }

    改造后的性能趨勢如圖

改造后的性能趨勢


4. 缺點 及 待優化:

  1. 還不支持象java原生鎖一樣能夠在獲取不到鎖的時候阻塞住,直到獲取鎖;當然可以優化一下代碼,做成循環嘗試獲取鎖對象的方式,但是在大數據量的情況下並不划算;
  2. 目前還是只支持單節點,如果redis節點掛掉,那么就無法正常工作
  3. 如果沒有把鎖釋放,程序崩潰了,那么可能這個鎖對象會長時間在緩存空間中,雖然有過期時間,對別的線程影響不是特別大,但是還是會占用空間,成為廢數據。以后可以加一個定時清除過期鎖對象的定時任務。
  4. 還不支持鎖重入


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM