一. 簡介
分布式鎖在很多場景中都非常的有用,分布式鎖是一個概念,實現他的方式有很多,本篇文章是基於Redis實現的單機分布式鎖。
主要解決多並發編程中由於鎖競爭而帶來的數據不一致的問題。
二. 應用場景
在本篇文章中主要解決Redis中緩存擊穿問題。
並發的訪問一條數據,數據庫有,但是緩存中不存在(沒人訪問這條數據或者Redis中數據剛好過期),導致一瞬間多個請求訪問數據庫,數據庫壓力增大,這類數據通常為熱點數據。
三. 模擬緩存擊穿
以下程序模擬100個線程同時去訪問一條沒有緩存的數據。
1. 業務代碼(service層)
@Override
public Object listByRedis(String id) {
HashMap<Object, Object> result = new HashMap<>();
//通過布隆過濾器 解決緩存穿透問題. 會有誤判 但是沒有關系 不會有太多誤判。
if (!bloomFilter.isExist(id)){
result.put("status", 400);
result.put("msg", "非法訪問");
return result;
}
//查詢緩存
Object redisData = redisTemplate.opsForValue().get(id);
//是否命中
if(redisData != null){
//返回結果
result.put("status", 200);
result.put("msg", "緩存命中");
result.put("data", redisData);
return result;
}
try {
UserInfo userInfo = userInfoMapper.selectById(id);
if (userInfo != null){
redisTemplate.opsForValue().set(id, userInfo, 10, TimeUnit.MINUTES);
result.put("status", 200);
result.put("msg", "查詢數據庫");
result.put("data", userInfo);
return result;
}else{
result.put("status", 200);
result.put("msg", "沒有數據");
return result;
}
}finally {
}
}
2. 並發模擬
並發訪問id=1096這條數據
public class ReadTest {
private static CountDownLatch countDownLatch = new CountDownLatch(99);
@Test
public void test() throws InterruptedException {
TicketsRunBle ticketsRunBle = new TicketsRunBle();
for (int i = 0;i<=99;i++){
new Thread(ticketsRunBle, "窗口"+i).start();
countDownLatch.countDown();
}
Thread.currentThread().join();
}
public class TicketsRunBle implements Runnable{
@Override
public void run() {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
RestTemplate restTemplate = new RestTemplate();
R forObject = restTemplate.getForObject("http://localhost:8082/user?id=1096", R.class);
System.out.println("結果:" + forObject);
}
}
}
3. 執行結果
截取部分,都是數據庫查詢出來的,日志打印也有Mybatis的記錄。
四. 單機Redis分布式鎖的實現
Redis分布式鎖原理上是使用Setnx命令實現:
SET resource_name my_random_value NX PX 30000。
這個命令僅在不存在key的時候才能被執行成功(NX選項),並且這個key有一個30秒的自動失效時間(PX屬性)。這個key的值是“my_random_value”(一個隨機值),這個值在所有的客戶端必須是唯一的,所有同一key的獲取者(競爭者)這個值都不能一樣。
當client嘗試獲取鎖時,我們將事先定義的key設置一個值,之后的client再設置時則會不成功。
釋放鎖時實現:
為什么value要使用一個唯一的值,主要是為了更安全的釋放鎖,釋放鎖的時候使用腳本告訴Redis:只有key存在並且存儲的值和我指定的值一樣才能告訴我刪除成功。可以通過以下Lua腳本實現:
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end使用這種方式可以避免刪除別的Client獲得的鎖。舉個栗子:
客戶端A取得資源鎖,但是緊接着被一個其他操作阻塞了,當客戶端A運行完畢其他操作后要釋放鎖時,原來的鎖早已超時並且被Redis自動釋放,並且在這期間資源鎖又被客戶端B再次獲取到。如果僅使用DEL命令將key刪除,那么這種情況就會把客戶端B的鎖給刪除掉。使用Lua腳本就不會存在這種情況,因為腳本僅會刪除value等於客戶端A的value的key(value相當於客戶端的一個簽名)。
本篇文章獲取鎖有一點不同,上面說的是設置一個固定的key,而本篇文章解決的問題是基於單條數據的一個並發查詢。
所以需要對單條數據的ID作為key進行加鎖,防止查詢同一條數據多次訪問數據庫。
1. 鎖實現:
/**
* 自定義分布式鎖
* 這里主要實現對單條數據進行加鎖,通過id進行加鎖
* 多個線程同時訪問該數據會阻塞
* @author
* @Date 2022/1/6
*/
@Component
public class RedisLock {
private static JedisPool jedisPool;
@Autowired
public void setJedisPool(JedisPool jedisPool){
RedisLock.jedisPool = jedisPool;
}
/**
* 鎖健
*/
private final static String KEY = "lock_key_";
/**
* 鎖過期時間
*/
private final static long LOCK_EXPIRED = 30000;
/**
* 鎖競爭超時時間
*/
private final static long LOCK_WAIT_TIME_OUT = 999999;
/**
* SET命令參數
*/
static SetParams params = SetParams.setParams().nx().px(LOCK_EXPIRED);
/**
* ThreadLocal用於保存某個線程共享變量:對於同一個static ThreadLocal
* 不同的線程只能從中get,set到自己線程的副本
*/
private static ThreadLocal<String> threadLocal = new ThreadLocal<>();
/**
* 嘗試獲取鎖
* @param key
* @return
*/
public Boolean tryLock(String key){
String value = UUID.randomUUID().toString();
Jedis resource = jedisPool.getResource();
long startTime = System.currentTimeMillis();
try {
for(;;){
//SET命令返回OK,獲取鎖成功
String set = resource.set(KEY.concat(key), value, params);
if ("OK".equals(set)){
threadLocal.set(value);
return true;
}
//增加一個超時時間判斷
if(System.currentTimeMillis() - startTime > LOCK_WAIT_TIME_OUT){
return false;
}
//休眠一段時間 遞歸調用
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}finally {
resource.close();
}
}
/**
* 釋放鎖 通過lua腳本實現
* @param key
* @return
*/
public boolean unLock(String key){
Jedis resource = null;
try {
resource = jedisPool.getResource();
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then" +
" return redis.call('del', KEYS[1]) " +
"else" +
" return 0 " +
"end";
Object eval = resource.eval(script, Collections.singletonList(KEY.concat(key)), Collections.singletonList(threadLocal.get()));
if ("1".equals(eval.toString())) {
return true;
}
return false;
}catch (Exception e){
e.printStackTrace();
return false;
}finally {
if (resource != null){
resource.close();
}
}
}
}
2. 業務代碼(service層)
@Override
public Object listByRedis(String id) {
HashMap<Object, Object> result = new HashMap<>();
//通過布隆過濾器 解決緩存穿透問題. 會有誤判 但是沒有關系 不會有太多誤判。
if (!bloomFilter.isExist(id)){
result.put("status", 400);
result.put("msg", "非法訪問");
return result;
}
//查詢緩存
Object redisData = redisTemplate.opsForValue().get(id);
//是否命中
if(redisData != null){
//返回結果
result.put("status", 200);
result.put("msg", "緩存命中");
result.put("data", redisData);
return result;
}
try {
//添加分布式鎖,進來后在查詢一次緩存,如果上一個線程已經查詢並且存入緩存
Boolean lock = redisLock.tryLock(id);
if (!lock){
result.put("status", 500);
result.put("msg", "訪問超時,稍后再試");
return result;
}
//查詢緩存
redisData = redisTemplate.opsForValue().get(id);
//是否命中
if(redisData != null){
//返回結果
result.put("status", 200);
result.put("msg", "緩存命中");
result.put("data", redisData);
return result;
}
UserInfo userInfo = userInfoMapper.selectById(id);
if (userInfo != null){
redisTemplate.opsForValue().set(id, userInfo, 10, TimeUnit.MINUTES);
result.put("status", 200);
result.put("msg", "查詢數據庫");
result.put("data", userInfo);
return result;
}else{
result.put("status", 200);
result.put("msg", "沒有數據");
return result;
}
}finally {
redisLock.unLock(id);
}
}
3. 測試結果
打印日志顯示,只會有一次數據庫查詢。
返回的結果除了第一次是查詢數據庫,后面的都是緩存命中
五. 總結
該鎖的實現有很多不足之處,不斷了解學習的一個過程,可使用Redisson中實現的分布式鎖可用性更高。