基於redis的分布式鎖二種應用場景


“分布式鎖”是用來解決分布式應用中“並發沖突”的一種常用手段,實現方式一般有基於zookeeper及基於redis二種。具體到業務場景中,我們要考慮二種情況:

一、搶不到鎖的請求,允許丟棄(即:忽略)

比如:一些不是很重要的場景,比如“監控數據持續上報”,某一篇文章的“已讀/未讀”標識位更新,對於同一個id,如果並發的請求同時到達,只要有一個請求處理成功,就算成功。

用活動圖表示如下:

點擊查看原圖

 

二、並發請求,不論哪一條都必須要處理的場景(即:不允許丟數據)

比如:一個訂單,客戶正在前台修改地址,管理員在后台同時修改備注。地址和備注字段的修改,都必須正確更新,這二個請求同時到達的話,如果不借助db的事務,很容易造成行鎖競爭,但用事務的話,db的性能顯然比不上redis輕量。

解決思路:A,B二個請求,誰先搶到分布式鎖(假設A先搶到鎖),誰先處理,搶不到的那個(即:B),在一旁不停等待重試,重試期間一旦發現獲取鎖成功,即表示A已經處理完,把鎖釋放了。這時B就可以繼續處理了。

但有二點要注意:

a、需要設置等待重試的最長時間,否則如果A處理過程中有bug,一直卡死,或者未能正確釋放鎖,B就一直會等待重試,但是又永遠拿不到鎖。

b、等待最長時間,必須小於鎖的過期時間。否則,假設鎖2秒過期自動釋放,但是A還沒處理完(即:A的處理時間大於2秒),這時鎖會因為redis key過期“提前”誤釋放,B重試時拿到鎖,造成A,B同時處理。(注:可能有同學會說,不設置鎖的過期時間,不就完了么?理論上講,確實可以這么做,但是如果業務代碼有bug,導致處理完后沒有unlock,或者根本忘記了unlock,分布式鎖就會一直無法釋放。所以綜合考慮,給分布式鎖加一個“保底”的過期時間,讓其始終有機會自動釋放,更為靠譜)

用活動圖表示如下:

點擊查看原圖

寫了一個簡單的工具類:

package com.cnblogs.yjmyzz.redisdistributionlock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.StringUtils;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * 利用redis獲取分布式鎖
 *
 * @author 菩提樹下的楊過
 * @blog http://yjmyzz.cnblogs.com/
 */
public class RedisLock {

    private StringRedisTemplate redisTemplate;

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * simple lock嘗試獲取鍋的次數
     */
    private int retryCount = 3;

    /**
     * 每次嘗試獲取鎖的重試間隔毫秒數
     */
    private int waitIntervalInMS = 100;


    public RedisLock(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 利用redis獲取分布式鎖(未獲取鎖的請求,允許丟棄!)
     *
     * @param redisKey       鎖的key值
     * @param expireInSecond 鎖的自動釋放時間(秒)
     * @return
     * @throws DistributionLockException
     */
    public String simpleLock(final String redisKey, final int expireInSecond) throws DistributionLockException {
        String lockValue = UUID.randomUUID().toString();
        boolean flag = false;
        if (StringUtils.isEmpty(redisKey)) {
            throw new DistributionLockException("key is empty!");
        }
        if (expireInSecond <= 0) {
            throw new DistributionLockException("expireInSecond must be bigger than 0");
        }
        try {
            for (int i = 0; i < retryCount; i++) {
                boolean success = redisTemplate.opsForValue().setIfAbsent(redisKey, lockValue, expireInSecond, TimeUnit.SECONDS);
                if (success) {
                    flag = true;
                    break;
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(waitIntervalInMS);
                } catch (Exception ignore) {
                    logger.warn("redis lock fail: " + ignore.getMessage());

                }
            }
            if (!flag) {
                throw new DistributionLockException(Thread.currentThread().getName() + " cannot acquire lock now ...");
            }
            return lockValue;
        } catch (DistributionLockException be) {
            throw be;
        } catch (Exception e) {
            logger.warn("get redis lock error, exception: " + e.getMessage());
            throw e;
        }
    }

    /**
     * 利用redis獲取分布式鎖(未獲取鎖的請求,將在timeoutSecond時間范圍內,一直等待重試)
     *
     * @param redisKey       鎖的key值
     * @param expireInSecond 鎖的自動釋放時間(秒)
     * @param timeoutSecond  未獲取到鎖的請求,嘗試重試的最久等待時間(秒)
     * @return
     * @throws DistributionLockException
     */
    public String lock(final String redisKey, final int expireInSecond, final int timeoutSecond) throws DistributionLockException {
        String lockValue = UUID.randomUUID().toString();
        boolean flag = false;
        if (StringUtils.isEmpty(redisKey)) {
            throw new DistributionLockException("key is empty!");
        }
        if (expireInSecond <= 0) {
            throw new DistributionLockException("expireInSecond must be greater than 0");
        }
        if (timeoutSecond <= 0) {
            throw new DistributionLockException("timeoutSecond must be greater than 0");
        }
        if (timeoutSecond >= expireInSecond) {
            throw new DistributionLockException("timeoutSecond must be less than expireInSecond");
        }
        try {
            long timeoutAt = System.currentTimeMillis() + timeoutSecond * 1000;
            while (true) {
                boolean success = redisTemplate.opsForValue().setIfAbsent(redisKey, lockValue, expireInSecond, TimeUnit.SECONDS);
                if (success) {
                    flag = true;
                    break;
                }
                if (System.currentTimeMillis() >= timeoutAt) {
                    break;
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(waitIntervalInMS);
                } catch (Exception ignore) {
                    logger.warn("redis lock fail: " + ignore.getMessage());
                }
            }
            if (!flag) {
                throw new DistributionLockException(Thread.currentThread().getName() + " cannot acquire lock now ...");
            }
            return lockValue;
        } catch (DistributionLockException be) {
            throw be;
        } catch (Exception e) {
            logger.warn("get redis lock error, exception: " + e.getMessage());
            throw e;
        }
    }


    /**
     * 鎖釋放
     *
     * @param redisKey
     * @param lockValue
     */
    public void unlock(final String redisKey, final String lockValue) {
        if (StringUtils.isEmpty(redisKey)) {
            return;
        }
        if (StringUtils.isEmpty(lockValue)) {
            return;
        }
        try {
            String currLockVal = redisTemplate.opsForValue().get(redisKey);
            if (currLockVal != null && currLockVal.equals(lockValue)) {
                boolean result = redisTemplate.delete(redisKey);
                if (!result) {
                    logger.warn(Thread.currentThread().getName() + " unlock redis lock fail");
                } else {
                    logger.info(Thread.currentThread().getName() + " unlock redis lock:" + redisKey + " successfully!");
                }
            }
        } catch (Exception je) {
            logger.warn(Thread.currentThread().getName() + " unlock redis lock error:" + je.getMessage());
        }
    }
}

  

然后寫個spring-boot來測試一下:

package com.cnblogs.yjmyzz.redisdistributionlock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
public class RedisDistributionLockApplication {

    private static Logger logger = LoggerFactory.getLogger(RedisDistributionLockApplication.class);

    public static void main(String[] args) throws InterruptedException {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(RedisDistributionLockApplication.class, args);

        //初始化
        StringRedisTemplate redisTemplate = applicationContext.getBean(StringRedisTemplate.class);
        RedisLock redisLock = new RedisLock(redisTemplate);
        String lockKey = "lock:test";


        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch threadsLatch = new CountDownLatch(2);

        final int lockExpireSecond = 5;
        final int timeoutSecond = 3;

        Runnable lockRunnable = () -> {
            String lockValue = "";
            try {
                //等待發令槍響,防止線程搶跑
                start.await();

                //允許丟數據的簡單鎖示例
                lockValue = redisLock.simpleLock(lockKey, lockExpireSecond);


                //不允許丟數據的分布式鎖示例
                //lockValue = redisLock.lock(lockKey, lockExpireSecond, timeoutSecond);

                //停一會兒,故意讓后面的線程搶不到鎖
                TimeUnit.SECONDS.sleep(2);
                logger.info(String.format("%s get lock successfully, value:%s", Thread.currentThread().getName(), lockValue));

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                redisLock.unlock(lockKey, lockValue);
                //執行完后,計數減1
                threadsLatch.countDown();
            }

        };

        Thread t1 = new Thread(lockRunnable, "T1");
        Thread t2 = new Thread(lockRunnable, "T2");

        t1.start();
        t2.start();

        //預備:開始!
        start.countDown();

        //等待所有線程跑完
        threadsLatch.await();

        logger.info("======>done!!!");

    }

}

 用2個線程模擬並發場景,跑起來后,輸出如下:

點擊查看原圖

可以看到T2線程沒搶到鎖,直接拋出了預期的異常。

把44行的注釋打開,即:換成不允許丟數據的模式,再跑一下:

點擊查看原圖

可以看到,T1先搶到鎖,然后經過2秒的處理后,鎖釋放,這時T2重試拿到了鎖,繼續處理,最終釋放。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM