“分布式鎖”是用來解決分布式應用中“並發沖突”的一種常用手段,實現方式一般有基於zookeeper及基於redis二種。具體到業務場景中,我們要考慮二種情況:
一、搶不到鎖的請求,允許丟棄(即:忽略)
比如:一些不是很重要的場景,比如“監控數據持續上報”,某一篇文章的“已讀/未讀”標識位更新,對於同一個id,如果並發的請求同時到達,只要有一個請求處理成功,就算成功。
用活動圖表示如下:

二、並發請求,不論哪一條都必須要處理的場景(即:不允許丟數據)
比如:一個訂單,客戶正在前台修改地址,管理員在后台同時修改備注。地址和備注字段的修改,都必須正確更新,這二個請求同時到達的話,如果不借助db的事務,很容易造成行鎖競爭,但用事務的話,db的性能顯然比不上redis輕量。
解決思路:A,B二個請求,誰先搶到分布式鎖(假設A先搶到鎖),誰先處理,搶不到的那個(即:B),在一旁不停等待重試,重試期間一旦發現獲取鎖成功,即表示A已經處理完,把鎖釋放了。這時B就可以繼續處理了。
但有二點要注意:
a、需要設置等待重試的最長時間,否則如果A處理過程中有bug,一直卡死,或者未能正確釋放鎖,B就一直會等待重試,但是又永遠拿不到鎖。
b、等待最長時間,必須小於鎖的過期時間。否則,假設鎖2秒過期自動釋放,但是A還沒處理完(即:A的處理時間大於2秒),這時鎖會因為redis key過期“提前”誤釋放,B重試時拿到鎖,造成A,B同時處理。(注:可能有同學會說,不設置鎖的過期時間,不就完了么?理論上講,確實可以這么做,但是如果業務代碼有bug,導致處理完后沒有unlock,或者根本忘記了unlock,分布式鎖就會一直無法釋放。所以綜合考慮,給分布式鎖加一個“保底”的過期時間,讓其始終有機會自動釋放,更為靠譜)
用活動圖表示如下:

寫了一個簡單的工具類:
package com.cnblogs.yjmyzz.redisdistributionlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.StringUtils;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* 利用redis獲取分布式鎖
*
* @author 菩提樹下的楊過
* @blog http://yjmyzz.cnblogs.com/
*/
public class RedisLock {
private StringRedisTemplate redisTemplate;
private Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* simple lock嘗試獲取鍋的次數
*/
private int retryCount = 3;
/**
* 每次嘗試獲取鎖的重試間隔毫秒數
*/
private int waitIntervalInMS = 100;
public RedisLock(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
/**
* 利用redis獲取分布式鎖(未獲取鎖的請求,允許丟棄!)
*
* @param redisKey 鎖的key值
* @param expireInSecond 鎖的自動釋放時間(秒)
* @return
* @throws DistributionLockException
*/
public String simpleLock(final String redisKey, final int expireInSecond) throws DistributionLockException {
String lockValue = UUID.randomUUID().toString();
boolean flag = false;
if (StringUtils.isEmpty(redisKey)) {
throw new DistributionLockException("key is empty!");
}
if (expireInSecond <= 0) {
throw new DistributionLockException("expireInSecond must be bigger than 0");
}
try {
for (int i = 0; i < retryCount; i++) {
boolean success = redisTemplate.opsForValue().setIfAbsent(redisKey, lockValue, expireInSecond, TimeUnit.SECONDS);
if (success) {
flag = true;
break;
}
try {
TimeUnit.MILLISECONDS.sleep(waitIntervalInMS);
} catch (Exception ignore) {
logger.warn("redis lock fail: " + ignore.getMessage());
}
}
if (!flag) {
throw new DistributionLockException(Thread.currentThread().getName() + " cannot acquire lock now ...");
}
return lockValue;
} catch (DistributionLockException be) {
throw be;
} catch (Exception e) {
logger.warn("get redis lock error, exception: " + e.getMessage());
throw e;
}
}
/**
* 利用redis獲取分布式鎖(未獲取鎖的請求,將在timeoutSecond時間范圍內,一直等待重試)
*
* @param redisKey 鎖的key值
* @param expireInSecond 鎖的自動釋放時間(秒)
* @param timeoutSecond 未獲取到鎖的請求,嘗試重試的最久等待時間(秒)
* @return
* @throws DistributionLockException
*/
public String lock(final String redisKey, final int expireInSecond, final int timeoutSecond) throws DistributionLockException {
String lockValue = UUID.randomUUID().toString();
boolean flag = false;
if (StringUtils.isEmpty(redisKey)) {
throw new DistributionLockException("key is empty!");
}
if (expireInSecond <= 0) {
throw new DistributionLockException("expireInSecond must be greater than 0");
}
if (timeoutSecond <= 0) {
throw new DistributionLockException("timeoutSecond must be greater than 0");
}
if (timeoutSecond >= expireInSecond) {
throw new DistributionLockException("timeoutSecond must be less than expireInSecond");
}
try {
long timeoutAt = System.currentTimeMillis() + timeoutSecond * 1000;
while (true) {
boolean success = redisTemplate.opsForValue().setIfAbsent(redisKey, lockValue, expireInSecond, TimeUnit.SECONDS);
if (success) {
flag = true;
break;
}
if (System.currentTimeMillis() >= timeoutAt) {
break;
}
try {
TimeUnit.MILLISECONDS.sleep(waitIntervalInMS);
} catch (Exception ignore) {
logger.warn("redis lock fail: " + ignore.getMessage());
}
}
if (!flag) {
throw new DistributionLockException(Thread.currentThread().getName() + " cannot acquire lock now ...");
}
return lockValue;
} catch (DistributionLockException be) {
throw be;
} catch (Exception e) {
logger.warn("get redis lock error, exception: " + e.getMessage());
throw e;
}
}
/**
* 鎖釋放
*
* @param redisKey
* @param lockValue
*/
public void unlock(final String redisKey, final String lockValue) {
if (StringUtils.isEmpty(redisKey)) {
return;
}
if (StringUtils.isEmpty(lockValue)) {
return;
}
try {
String currLockVal = redisTemplate.opsForValue().get(redisKey);
if (currLockVal != null && currLockVal.equals(lockValue)) {
boolean result = redisTemplate.delete(redisKey);
if (!result) {
logger.warn(Thread.currentThread().getName() + " unlock redis lock fail");
} else {
logger.info(Thread.currentThread().getName() + " unlock redis lock:" + redisKey + " successfully!");
}
}
} catch (Exception je) {
logger.warn(Thread.currentThread().getName() + " unlock redis lock error:" + je.getMessage());
}
}
}
然后寫個spring-boot來測試一下:
package com.cnblogs.yjmyzz.redisdistributionlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@SpringBootApplication
public class RedisDistributionLockApplication {
private static Logger logger = LoggerFactory.getLogger(RedisDistributionLockApplication.class);
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext applicationContext = SpringApplication.run(RedisDistributionLockApplication.class, args);
//初始化
StringRedisTemplate redisTemplate = applicationContext.getBean(StringRedisTemplate.class);
RedisLock redisLock = new RedisLock(redisTemplate);
String lockKey = "lock:test";
CountDownLatch start = new CountDownLatch(1);
CountDownLatch threadsLatch = new CountDownLatch(2);
final int lockExpireSecond = 5;
final int timeoutSecond = 3;
Runnable lockRunnable = () -> {
String lockValue = "";
try {
//等待發令槍響,防止線程搶跑
start.await();
//允許丟數據的簡單鎖示例
lockValue = redisLock.simpleLock(lockKey, lockExpireSecond);
//不允許丟數據的分布式鎖示例
//lockValue = redisLock.lock(lockKey, lockExpireSecond, timeoutSecond);
//停一會兒,故意讓后面的線程搶不到鎖
TimeUnit.SECONDS.sleep(2);
logger.info(String.format("%s get lock successfully, value:%s", Thread.currentThread().getName(), lockValue));
} catch (Exception e) {
e.printStackTrace();
} finally {
redisLock.unlock(lockKey, lockValue);
//執行完后,計數減1
threadsLatch.countDown();
}
};
Thread t1 = new Thread(lockRunnable, "T1");
Thread t2 = new Thread(lockRunnable, "T2");
t1.start();
t2.start();
//預備:開始!
start.countDown();
//等待所有線程跑完
threadsLatch.await();
logger.info("======>done!!!");
}
}
用2個線程模擬並發場景,跑起來后,輸出如下:

可以看到T2線程沒搶到鎖,直接拋出了預期的異常。
把44行的注釋打開,即:換成不允許丟數據的模式,再跑一下:

可以看到,T1先搶到鎖,然后經過2秒的處理后,鎖釋放,這時T2重試拿到了鎖,繼續處理,最終釋放。
