在多線程開發中我們使用鎖來避免線程爭奪共享資源。在分布式系統中,程序在多個節點上運行無法使用單機鎖來避免資源競爭,因此我們需要一個鎖服務來避免多個節點上的進程爭奪資源。
Redis數據庫基於內存,具有高吞吐量、便於執行原子性操作等特點非常適合開發對一致性要求不高的鎖服務。
本文介紹了簡單分布式鎖、Redisson分布式鎖的實現以及解決單點服務的RedLock分布式鎖概念。
Redis是一致性較低的數據庫,若對鎖服務的一致性要求較高建議使用zookeeper等中間件開發鎖服務。
基於單點Redis的分布式鎖
Redis實現分布式鎖的原理非常簡單, 節點在訪問共享資源前先查詢redis中是否有該資源對應的鎖記錄, 若不存在鎖記錄則寫入一條鎖記錄(即獲取鎖)隨后訪問共享資源. 若節點查詢到redis中已經存在了資源對應的鎖記錄, 則放棄操作共享資源.
下面給出一個非常簡單的分布式鎖示例:
import redis.clients.jedis.Jedis;
import java.util.Random;
import java.util.UUID;
public class MyRedisLock {
private Jedis jedis;
private String lockKey;
private String value;
private static final Integer DEFAULT_TIMEOUT = 30;
private static final String SUFFIX = ":lock";
public MyRedisLock(Jedis jedis) {
this.jedis = jedis;
}
public boolean acquire(String key, long time) throws InterruptedException {
Long outdatedTime = System.currentTimeMillis() + time;
lockKey = key + SUFFIX;
while (true) {
if (System.currentTimeMillis() >= outdatedTime) {
return false;
}
value = UUID.randomUUID().toString(); // 1
return "OK".equals(jedis.set(lockKey, value, "NX", DEFAULT_TIMEOUT)); // 2
}
}
public boolean check() {
return value != null && value.equals(jedis.get(lockKey)); // 3
}
public boolean release() {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
return 1L.equals(jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(value))); // 3
}
}
加鎖后所有對共享資源的操作都應該先檢查當前線程是否仍持有鎖。
在分布式鎖的實現中有幾點需要注意:
- 加鎖過程:
- 鎖的過期時間應設置到redis中,保證在加鎖客戶端故障的情況下鎖可以被自動釋放
- 使用
set key value EX seconds NX
命令進行加鎖,不要使用setnx和expire兩個命令加鎖。
若setnx執行成功而expire失敗(如執行setnx后客戶端崩潰),則可能造成死鎖。 - 鎖記錄的值不能使用固定值。 使用固定值可能導致嚴重錯誤: 線程A的鎖因為超時被釋放, 隨后線程B成功加鎖。 B寫入的鎖記錄與A的鎖記錄沒有區別, 因此A在檢查時會誤判為自己仍持有鎖。
- 解鎖過程:
- 解鎖操作使用lua腳本執行get和del兩個操作,為了保證兩個操作的原子性。若兩個操作不具有原子性則可能出現錯誤時序: 線程A執行get操作判斷自己仍持有鎖 -> 鎖超時釋放 -> 線程B成功加鎖 -> 線程A刪除鎖記錄(線程A認為刪除了自己的鎖記錄,實際上刪除了線程B的鎖記錄)。
上文只是提供了簡單示例,還有一些重要功能沒有實現:
- 阻塞加鎖:可以使用redis的發布訂閱功能,獲取鎖失敗的線程訂閱鎖被釋放的消息再次嘗試加鎖
- 無限期鎖:應寫入有TTL的鎖記錄,設置定時任務在鎖失效前刷新鎖過期的時間。這種方式可以避免持有鎖的線程崩潰導致的死鎖
- 可重入鎖(持有鎖的線程可以再次加鎖):示例中持有鎖的線程無法對同一個資源再次加鎖,即不可重入鎖。實現可重入鎖需要鎖記錄由(key:資源標記, value:持有者標記)的鍵值對結構變為(key:資源標記, field:持有者標記, value:計數器)這樣的hash結構。持有鎖的線程每次重入鎖計數器加1,每次釋放鎖計數器減1,計數器為0時刪除鎖記錄。
總結來看實現Redis分布式鎖有幾點需要注意:
- 加解鎖操作應保證原子性,避免多個線程同時操作出現異常
- 應考慮進程崩潰、Redis崩潰、操作成功執行但未收到成功響應等異常狀況,避免死鎖
- 解鎖操作必須避免 某個線程釋放了不屬於自己的鎖 的異常
Redisson
這里我們以基於Java的Redisson為例討論一下成熟的Redis分布式鎖的實現。
redisson實現了java.util.concurrent.locks.Lock
接口,可以像使用普通鎖一樣使用redisson:
RLock lock = redisson.getLock("key");
lock.lock();
try {
// do sth.
} finally {
lock.unlock();
}
分析一下RLock的實現類org.redisson.RedissonLock
:
加鎖操作
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
再看等待加鎖的方法lockInterruptibly
:
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
}
lockInterruptibly
方法會嘗試獲取鎖,若獲取失敗則會訂閱釋放鎖的消息。收到鎖被釋放的通知后再次嘗試獲取鎖,直到成功或者超時。
接下來分析tryAcquire
:
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId)); // 調用異步獲得鎖的實現,使用get(future)實現同步
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
// 設置了超時時間
if (leaseTime != -1) {
// tryLockInnerAsync 加鎖成功返回 null, 加鎖失敗在 Future 中返回鎖記錄剩余的有效時間
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 未設置超時時間,嘗試獲得無限期的鎖
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
// 避免對共享資源操作完成前鎖就被釋放掉,定期刷新鎖失效的時間
// 默認鎖失效時間的三分之一即進行刷新
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
tryAcquireAsync
中主要邏輯是無限期鎖的實現,Redisson並非設置了永久的鎖記錄,而是定期刷新鎖失效的時間。
這種方式避免了持有鎖的進程崩潰無法釋放鎖導致死鎖。
真正實現獲取鎖邏輯的是tryLockInnerAsync
方法:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(
getName(),
LongCodec.INSTANCE,
command,
"if (redis.call('exists', KEYS[1]) == 0) then " + // 資源未被加鎖
"redis.call('hset', KEYS[1], ARGV[2], 1); " + // 寫入鎖記錄, 鎖記錄是一個hash; key:共享資源名稱, field:鎖實例名稱(Redisson客戶端ID:線程ID), value: 1(value是一個計數器,記錄當前線程獲取該鎖的次數,實現可重入鎖)
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 設置鎖記錄過期時間
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 若當前線程已經持有該資源的鎖
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 將鎖計數器加1,
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);", // 資源已被其它線程加鎖,加鎖失敗。獲取鎖剩余生存時間后返回
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
上述操作使用eval命令執行lua腳本保證了操作的原子性。
unlock
解鎖過程相對簡單:
@Override
public void unlock() {
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
cancelExpirationRenewal();
}
}
unlockInnerAsync
方法實現了具體的解鎖邏輯:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " + // 資源未被加鎖,可能鎖已被超時釋放
"redis.call('publish', KEYS[2], ARGV[1]); " + // 發布鎖被釋放的消息
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 鎖的持有者不是自己,拋出異常
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 自己持有鎖,因為鎖是可重入的將計數器減1
"if (counter > 0) then " + // 計數器大於0,鎖未被完全釋放,刷新鎖過期時間
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " + // 鎖被完全釋放,刪除鎖記錄,發布鎖被釋放的消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
RedLock
基於單點的分布式鎖無法解決redis故障的問題. 為了保證redis的可用性我們通常采用主從備份的方法, 即 使用一個master實例和至少一個slave實例.
當有寫入請求時先寫入master然后寫入到所有slave, 當master實例故障時選擇一個slave實例升級為master實例繼續提供服務.
其中存在的問題是, 寫入master和寫入slave存在時間差. 若線程A成功將鎖記錄寫入了master, 隨后在同步寫入slave之前, master故障轉移到slave.
因為slave(新master)中沒有鎖記錄, 因此線程B也可以成功加鎖, 因此可能出現A和B同時持有鎖的錯誤.
為了解決redis失效可能造成的問題, redis的作者antirez提出了RedLock實現方案:
-
客戶端獲取當前時間
-
客戶端嘗試獲取N個節點的鎖, 每個節點使用相同的key和value. 請求超時時間要遠小於鎖超時時間, 避免在節點或者網絡故障時浪費時間.
-
客戶端計算在加鎖時消耗的時間, 只有客戶端成功獲得超過一半節點的鎖且總時間小於鎖超時間時才能成功加鎖. 客戶端持有鎖的時間為鎖超時時間減去加鎖消耗的時間.
-
若獲取鎖失敗則訪問所有節點, 發起釋放鎖的請求.
釋放鎖時需要向所有Redis節點發出釋放鎖的請求, 原因在於可能某個Redis實例中成功寫入了鎖記錄, 但是沒有響應沒有到達客戶端.
為了保證所有鎖記錄都被正確釋放, 所以需要向所有Redis實例發送釋放請求.
關於安全性的討論
關於RedLock的安全性問題, Martin Kleppmann和作者antirez進行了一些討論:
- Martin Kleppmann: How to do distributed locking
- [antirez:Is Redlock safe?](http://antirez.com/news/101)
關於這場討論的分析可以參考: