“分布式鎖”是用來解決分布式應用中“並發沖突”的一種常用手段,實現方式一般有基於zookeeper及基於redis二種。具體到業務場景中,我們要考慮二種情況:
一、搶不到鎖的請求,允許丟棄(即:忽略)
比如:一些不是很重要的場景,比如“監控數據持續上報”,某一篇文章的“已讀/未讀”標識位更新,對於同一個id,如果並發的請求同時到達,只要有一個請求處理成功,就算成功。
用活動圖表示如下:
二、並發請求,不論哪一條都必須要處理的場景(即:不允許丟數據)
比如:一個訂單,客戶正在前台修改地址,管理員在后台同時修改備注。地址和備注字段的修改,都必須正確更新,這二個請求同時到達的話,如果不借助db的事務,很容易造成行鎖競爭,但用事務的話,db的性能顯然比不上redis輕量。
解決思路:A,B二個請求,誰先搶到分布式鎖(假設A先搶到鎖),誰先處理,搶不到的那個(即:B),在一旁不停等待重試,重試期間一旦發現獲取鎖成功,即表示A已經處理完,把鎖釋放了。這時B就可以繼續處理了。
但有二點要注意:
a、需要設置等待重試的最長時間,否則如果A處理過程中有bug,一直卡死,或者未能正確釋放鎖,B就一直會等待重試,但是又永遠拿不到鎖。
b、等待最長時間,必須小於鎖的過期時間。否則,假設鎖2秒過期自動釋放,但是A還沒處理完(即:A的處理時間大於2秒),這時鎖會因為redis key過期“提前”誤釋放,B重試時拿到鎖,造成A,B同時處理。(注:可能有同學會說,不設置鎖的過期時間,不就完了么?理論上講,確實可以這么做,但是如果業務代碼有bug,導致處理完后沒有unlock,或者根本忘記了unlock,分布式鎖就會一直無法釋放。所以綜合考慮,給分布式鎖加一個“保底”的過期時間,讓其始終有機會自動釋放,更為靠譜)
用活動圖表示如下:
寫了一個簡單的工具類:
package com.cnblogs.yjmyzz.redisdistributionlock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.util.StringUtils; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * 利用redis獲取分布式鎖 * * @author 菩提樹下的楊過 * @blog http://yjmyzz.cnblogs.com/ */ public class RedisLock { private StringRedisTemplate redisTemplate; private Logger logger = LoggerFactory.getLogger(this.getClass()); /** * simple lock嘗試獲取鍋的次數 */ private int retryCount = 3; /** * 每次嘗試獲取鎖的重試間隔毫秒數 */ private int waitIntervalInMS = 100; public RedisLock(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } /** * 利用redis獲取分布式鎖(未獲取鎖的請求,允許丟棄!) * * @param redisKey 鎖的key值 * @param expireInSecond 鎖的自動釋放時間(秒) * @return * @throws DistributionLockException */ public String simpleLock(final String redisKey, final int expireInSecond) throws DistributionLockException { String lockValue = UUID.randomUUID().toString(); boolean flag = false; if (StringUtils.isEmpty(redisKey)) { throw new DistributionLockException("key is empty!"); } if (expireInSecond <= 0) { throw new DistributionLockException("expireInSecond must be bigger than 0"); } try { for (int i = 0; i < retryCount; i++) { boolean success = redisTemplate.opsForValue().setIfAbsent(redisKey, lockValue, expireInSecond, TimeUnit.SECONDS); if (success) { flag = true; break; } try { TimeUnit.MILLISECONDS.sleep(waitIntervalInMS); } catch (Exception ignore) { logger.warn("redis lock fail: " + ignore.getMessage()); } } if (!flag) { throw new DistributionLockException(Thread.currentThread().getName() + " cannot acquire lock now ..."); } return lockValue; } catch (DistributionLockException be) { throw be; } catch (Exception e) { logger.warn("get redis lock error, exception: " + e.getMessage()); throw e; } } /** * 利用redis獲取分布式鎖(未獲取鎖的請求,將在timeoutSecond時間范圍內,一直等待重試) * * @param redisKey 鎖的key值 * @param expireInSecond 鎖的自動釋放時間(秒) * @param timeoutSecond 未獲取到鎖的請求,嘗試重試的最久等待時間(秒) * @return * @throws DistributionLockException */ public String lock(final String redisKey, final int expireInSecond, final int timeoutSecond) throws DistributionLockException { String lockValue = UUID.randomUUID().toString(); boolean flag = false; if (StringUtils.isEmpty(redisKey)) { throw new DistributionLockException("key is empty!"); } if (expireInSecond <= 0) { throw new DistributionLockException("expireInSecond must be greater than 0"); } if (timeoutSecond <= 0) { throw new DistributionLockException("timeoutSecond must be greater than 0"); } if (timeoutSecond >= expireInSecond) { throw new DistributionLockException("timeoutSecond must be less than expireInSecond"); } try { long timeoutAt = System.currentTimeMillis() + timeoutSecond * 1000; while (true) { boolean success = redisTemplate.opsForValue().setIfAbsent(redisKey, lockValue, expireInSecond, TimeUnit.SECONDS); if (success) { flag = true; break; } if (System.currentTimeMillis() >= timeoutAt) { break; } try { TimeUnit.MILLISECONDS.sleep(waitIntervalInMS); } catch (Exception ignore) { logger.warn("redis lock fail: " + ignore.getMessage()); } } if (!flag) { throw new DistributionLockException(Thread.currentThread().getName() + " cannot acquire lock now ..."); } return lockValue; } catch (DistributionLockException be) { throw be; } catch (Exception e) { logger.warn("get redis lock error, exception: " + e.getMessage()); throw e; } } /** * 鎖釋放 * * @param redisKey * @param lockValue */ public void unlock(final String redisKey, final String lockValue) { if (StringUtils.isEmpty(redisKey)) { return; } if (StringUtils.isEmpty(lockValue)) { return; } try { String currLockVal = redisTemplate.opsForValue().get(redisKey); if (currLockVal != null && currLockVal.equals(lockValue)) { boolean result = redisTemplate.delete(redisKey); if (!result) { logger.warn(Thread.currentThread().getName() + " unlock redis lock fail"); } else { logger.info(Thread.currentThread().getName() + " unlock redis lock:" + redisKey + " successfully!"); } } } catch (Exception je) { logger.warn(Thread.currentThread().getName() + " unlock redis lock error:" + je.getMessage()); } } }
然后寫個spring-boot來測試一下:
package com.cnblogs.yjmyzz.redisdistributionlock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @SpringBootApplication public class RedisDistributionLockApplication { private static Logger logger = LoggerFactory.getLogger(RedisDistributionLockApplication.class); public static void main(String[] args) throws InterruptedException { ConfigurableApplicationContext applicationContext = SpringApplication.run(RedisDistributionLockApplication.class, args); //初始化 StringRedisTemplate redisTemplate = applicationContext.getBean(StringRedisTemplate.class); RedisLock redisLock = new RedisLock(redisTemplate); String lockKey = "lock:test"; CountDownLatch start = new CountDownLatch(1); CountDownLatch threadsLatch = new CountDownLatch(2); final int lockExpireSecond = 5; final int timeoutSecond = 3; Runnable lockRunnable = () -> { String lockValue = ""; try { //等待發令槍響,防止線程搶跑 start.await(); //允許丟數據的簡單鎖示例 lockValue = redisLock.simpleLock(lockKey, lockExpireSecond); //不允許丟數據的分布式鎖示例 //lockValue = redisLock.lock(lockKey, lockExpireSecond, timeoutSecond); //停一會兒,故意讓后面的線程搶不到鎖 TimeUnit.SECONDS.sleep(2); logger.info(String.format("%s get lock successfully, value:%s", Thread.currentThread().getName(), lockValue)); } catch (Exception e) { e.printStackTrace(); } finally { redisLock.unlock(lockKey, lockValue); //執行完后,計數減1 threadsLatch.countDown(); } }; Thread t1 = new Thread(lockRunnable, "T1"); Thread t2 = new Thread(lockRunnable, "T2"); t1.start(); t2.start(); //預備:開始! start.countDown(); //等待所有線程跑完 threadsLatch.await(); logger.info("======>done!!!"); } }
用2個線程模擬並發場景,跑起來后,輸出如下:
可以看到T2線程沒搶到鎖,直接拋出了預期的異常。
把44行的注釋打開,即:換成不允許丟數據的模式,再跑一下:
可以看到,T1先搶到鎖,然后經過2秒的處理后,鎖釋放,這時T2重試拿到了鎖,繼續處理,最終釋放。