// redis 鎖的優勢
對於分布式服務的情況下,當只使用java原生相關鎖(ReentrantLock)操作時,只能保證一個jvm進程中的操作受到鎖的保護,但對於多個jvm進程就無法進行有效鎖保護控制;
因此為了滿足分布式場景, 需要使用一個統一管理位置,因此通過redis 來做作為鎖控制
spring 提供的redis支持
https://docs.spring.io/spring-integration/reference/html/redis.html#redis-lock-registry
其利用java 本地鎖和 redis SET相關指令 雙重保證
- 本地鎖是使用的java.util.concurrent.locks.ReentrantLock#ReentrantLock() 來實現可重入的特性
- 使用redis相關命令 ,從以下代碼中可以看出實際其利用 redis SET 中的指令
// 來源於 spring-integration-redis-4.3.11.RELEASE 版本中 org.springframework.integration.redis.util.RedisLockRegistry
private boolean obtainLock() {
Thread currentThread = Thread.currentThread();
if (currentThread.equals(this.thread)) {
this.reLock++;
return true;
}
toHardThreadStorage(this);
/*
* Set these now so they will be persisted if successful.
*/
this.lockedAt = System.currentTimeMillis();
this.threadName = currentThread.getName();
Boolean success = false;
try {
success = RedisLockRegistry.this.redisTemplate.execute(new RedisCallback<Boolean>() {
@Override
public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
/* 核心代碼點
Perform Redis command 'SET resource-name anystring NX EX max-lock-time' directly.
As it is recommended by Redis: http://redis.io/commands/set.
This command isn't supported directly by RedisTemplate.
*/
long expireAfter = TimeoutUtils.toSeconds(RedisLockRegistry.this.expireAfter,
TimeUnit.MILLISECONDS);
RedisSerializer<String> serializer = RedisLockRegistry.this.redisTemplate.getStringSerializer();
byte[][] actualArgs = new byte[][] {
// 指定存儲到redis 中key的名稱為 "創建RedisLockRegistry對象時傳遞的registryKey名稱" + ":" + "在RedisLockRegistry對象調用obtain方法時傳遞的lockKey"
serializer.serialize(constructLockKey()),
// 指定存儲到redis中的值為 當前新建的RedisLock對象序列化 ; 對於當前序列化操作做了一定的自定義性能提升
RedisLockRegistry.this.lockSerializer.serialize(RedisLock.this),
// 對於redis SET NX 命令的含義為 如果key不存在則會執行新增操作
// 官方解釋 : NX -- Only set the key if it does not already exist.
serializer.serialize("NX"),
// 對於 redis SET EX 命令代表的是, 指定當前key過期時間
// EX seconds -- Set the specified expire time, in seconds.
serializer.serialize("EX"),
// 指定當前key 的過期時間 默認單位為 秒
// EX seconds -- Set the specified expire time, in seconds.
// 對於"RedisLockRegistry#expireAfter"表示當前key過期時間,默認為60s
serializer.serialize(String.valueOf(expireAfter))
};
return connection.execute("SET", actualArgs) != null;
}
});
}
finally {
if (!success) {
this.lockedAt = 0;
this.threadName = null;
toWeakThreadStorage(this);
}
else {
this.thread = currentThread;
if (logger.isDebugEnabled()) {
logger.debug("New lock; " + this);
}
}
}
return success;
}
-
關於redis中存儲的value使用到的 LockSerializer 序列化和反序列化進行分析
private class LockSerializer implements RedisSerializer<RedisLockRegistry.RedisLock> { /** * 其序列化操作並未直接將 {@code RedisLock} 對象進行序列化操作, 而主要通過自定義字節數組長度 根據 redisLock中的必需屬性值的字節長度來定義; * 其主要存儲了以下屬性 * {@code RedisLock#localHost} 當前請求redis的服務器ip * {@code RedisLock#lockKey} 當前redis 存儲的數據 key * {@code RedisLock#threadName} 記錄當前執行加鎖操作的線程名 * {@code RedisLock#lockedAt} 記錄當前加鎖時間 */ @Override public byte[] serialize(RedisLockRegistry.RedisLock t) throws SerializationException { int hostLength = t.lockHost.length; int keyLength = t.lockKey.length(); int threadNameLength = t.threadName.length(); byte[] value = new byte[1 + hostLength + 1 + keyLength + 1 + threadNameLength + 8]; // 使用 nio中的 最新io操作 API {@code ByteBuffer} 來快速操作字節數組 {@code value} ByteBuffer buff = ByteBuffer.wrap(value); buff.put((byte) hostLength) .put(t.lockHost) .put((byte) keyLength) .put(t.lockKey.getBytes()) .put((byte) threadNameLength) .put(t.threadName.getBytes()) .putLong(t.lockedAt); return value; } /** * 對於反序列化操作其通過在序列化過程中存儲了 每個保存的屬性值的字節長度,通過獲取到每個屬性的長度,利用 {@link java.nio.ByteBuffer#get(byte[])} 實現快速讀取操作 */ @Override public RedisLockRegistry.RedisLock deserialize(byte[] bytes) throws SerializationException { if (bytes == null) { return null; } ByteBuffer buff = ByteBuffer.wrap(bytes); byte[] host = new byte[buff.get()]; buff.get(host); byte[] lockKey = new byte[buff.get()]; buff.get(lockKey); byte[] threadName = new byte[buff.get()]; buff.get(threadName); long lockedAt = buff.getLong(); RedisLockRegistry.RedisLock lock = new RedisLockRegistry.RedisLock(new String(lockKey)); lock.lockedAt = lockedAt; lock.lockHost = host; lock.threadName = new String(threadName); return lock; } }
關於當前 redis分布式鎖的缺點(不適合redis集群操作)
由於在redis數據寫入過程中,其實際只在一台redis機器上進行了數據保存,對於redis主從或集群模式下由於存在多台機器,這就會導致 多個相同服務的java機器會請求到不同的redis機器從而導致 相同意義的redis 鎖數據被保存到了不同的redis機器上,使不同來源的客戶端都加鎖成功;
舉例而言: 存在 S1 和 S2 兩台分布式 java服務, 存在 R1(master) 和 R2 HA模式的redis機器, 此時 S1請求加鎖操作,會在R1中存儲相關redis數據,而此時S2也准備請求相同key的加鎖操作,而在此時R1機器宕機了,R2機器作為備份機器開始工作,但對於 R1中的數據在R2中是不會存在的,因此當S2請求到R2執行 SET NX 命令時,其會寫入成功,從而導致 從結果體現而言,同一個redis鎖變量同時被兩個不同的客戶端獲取,這實際就沒有達到分布式鎖排他性的目的了
//TODO : 因此這里引入我們下一章討論的 redis集群模式下采用RedLock操作來實現分布式鎖
