redis 讀寫鎖實現


先搞清楚讀寫鎖要做什么。

 

基本就是
讀讀不互斥,讀寫互斥,寫寫互斥。可重入。

 

關於redis讀寫鎖,我寫了一次之后,總覺得很怪,然后就上網看到大神的redisson了,果斷借鑒一番。

讀行為

當寫鎖未獲取,加上讀鎖(通知其他請求數據在讀狀態),讀數據

當寫鎖被獲取,等待,直到寫鎖未獲取,加讀鎖,讀數據

寫行為

當寫鎖未獲取,等待獲取寫鎖

當寫鎖被獲取,加寫鎖。讀鎖未獲取,等待獲取讀鎖

當寫鎖被獲取,讀鎖被獲取,寫數據

 

可以看出讀鎖可重入一定意義都沒有,寫鎖才有意義

 

三 初版

先說下總結

1.重入也只是本機重入,不能實現鎖在其他服務器的重入。

2.讀寫鎖獲取鎖的時候,是兩個redis操作,原子性不行,所以要用redis的eval命令或者直接使用lua腳本。

3.用switch來判斷讀寫模式太蠢了,代碼可讀性低,早期想的簡單,但是邏輯一復雜就很麻煩了。

 

ps.

spring自帶的redisTemplate則沒有提供eval的接口,只提供使用lua腳本,相應的讀寫鎖代碼要自己寫。

netty自帶的redisson則是用了eval命令,則已經寫好了代碼,只需要傻瓜式調用就好了。

 

代碼

--存放讀寫鎖的信息
public
enum LockModel { READ("%s:READ"), WRITE("%s:WRITE"),; String lockFormat; LockModel(String lockFormat) { this.lockFormat = lockFormat; } public String getLockModelName() { return super.name(); } public String getLockFormat() { return lockFormat; } public static void main(String[] args) { LockModel read = LockModel.READ; System.out.println(read.getLockFormat()); System.out.println(read.getLockModelName()); } }
--實現java自帶的讀寫鎖接口
public class ReadWriteLock implements java.util.concurrent.locks.ReadWriteLock {
  /**
    * 應該是唯一標識組成的key,可以使線程id,可以使用戶id,可以使服務器id
   */

String name;
/**
* 毫秒
* */
Long timeInterval;

public ReadWriteLock(String name, Long timeInterval) {
this.name = name;
this.timeInterval = timeInterval;
}

@Override
public Lock readLock() {
return new ReentrantLock(this, LockModel.READ);
}

@Override
public Lock writeLock() {
return new ReentrantLock(this, LockModel.WRITE);
}

}

--重入鎖
public class ReentrantLock implements Lock {

@Autowired
RedisTemplate redisTemplate;

ReadWriteLock rwLock;
LockModel lockModel;
String lockName;
Long deadTime = 0L;
boolean localWriteLocked = false;

public ReentrantLock(ReadWriteLock rwLock, LockModel lockModel) {
this.rwLock = rwLock;
this.lockModel = lockModel;
setLockName(lockModel);
}

@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

@Override
public void lockInterruptibly() throws InterruptedException {
switch (getLockModel()) {
case WRITE:
if (!isLocalWriteLocked()) {
setLockModel(LockModel.READ);
while (!tryLock()) {
Thread.sleep(500);
}
redisTemplate.opsForValue().set(getLockName(), getDeadTime(), getRwLock().getTimeInterval());

setLockModel(LockModel.WRITE);
while (!tryLock()) {
Thread.sleep(500);
}
setLocalWriteLocked(true);
} else {
/**
* 本機持有寫鎖,重入,但要等待之前的寫操作完成
* */
while (!isLocalWriteLocked()) {
Thread.sleep(500);
}

/**
* 更新寫鎖的過期時間
* */
redisTemplate.opsForValue().set(getLockName(), getDeadTime(), getRwLock().getTimeInterval());
setLocalWriteLocked(true);
}
break;
case READ:
while (!tryLock()) {
Thread.sleep(500);
}
setDeadTime();
redisTemplate.opsForValue().set(getLockName(), getDeadTime(), getRwLock().getTimeInterval());
break;
}
}

@Override
public boolean tryLock() {
return null != redisTemplate.opsForValue().get(getOpposeLockName());
}

@Override
public void unlock() {
switch (getLockModel()) {
case WRITE:
if (isLocalWriteLocked()) {
setLocalWriteLocked(false);
}
redisTemplate.delete(getLockName());
break;
case READ:
redisTemplate.delete(getLockName());
break;
}
}


public Long getTimeInterval() {
return rwLock.getTimeInterval();
}

public void setDeadTime() {
this.deadTime = System.currentTimeMillis() + getTimeInterval();
}

private String getOpposeLockName() {
String opposeLockName = "";
switch (getLockModel()) {
case READ:
opposeLockName = String.format(LockModel.WRITE.getLockFormat(), getRwLock().getName());
break;
case WRITE:
opposeLockName = String.format(LockModel.READ.getLockFormat(), getRwLock().getName());
break;
default:
break;
}
return opposeLockName;
}

}

 

 

四 redisson分析

還是先總結

1.用hashmap存讀寫鎖的信息。讀鎖寫鎖的本質則是model的不同。讀鎖寫鎖只是不同的mapfield。而讀鎖還有過期時間為屬性。

2.用頻道記錄線程的操作。具體為什么用頻道就要看LockPubSub和PublishSubscribe,這里因為不涉及我就不細說了。

 

RedissonReadLock

//    判斷有沒有鎖
  @Override
public boolean isLocked() { RFuture<String> future = commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode"); String res = get(future); return "read".equals(res); }

可以看出嘗試獲取鎖的狀態的代碼都寫的很簡單,但是redisson用了hashmap來存放。

 

    @Override
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                                "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                     //鎖出錯 "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'read'); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('set', KEYS[2] .. ':1', 1); " + "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " +
                     //在讀模式或者本線程獲取寫鎖的時候進行讀 "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " + "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local key = KEYS[2] .. ':' .. ind;" + "redis.call('set', key, 1); " + "redis.call('pexpire', key, ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end;" + "return redis.call('pttl', KEYS[1]);", Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId)); }

注意里面的getWriteLockName(threadId)

 

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
        String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                "if (mode == false) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end; " +
          //鎖不存在 "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " + "if (lockExists == 0) then " + "return nil;" + "end; " + //給讀鎖的值-1,返回結果值 "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
          // 結果值為0,刪除讀鎖 "if (counter == 0) then " + "redis.call('hdel', KEYS[1], ARGV[2]); " + "end;" +
          //  把自己的超時標記刪除 "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " + //還有其他讀 "if (redis.call('hlen', KEYS[1]) > 1) then " + "local maxRemainTime = -3; " + "local keys = redis.call('hkeys', KEYS[1]); " + "for n, key in ipairs(keys) do " + "counter = tonumber(redis.call('hget', KEYS[1], key)); " + "if type(counter) == 'number' then " + "for i=counter, 1, -1 do " + "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + "maxRemainTime = math.max(remainTime, maxRemainTime);" + "end; " + "end; " + "end; " + "if maxRemainTime > 0 then " + "redis.call('pexpire', KEYS[1], maxRemainTime); " + "return 0; " + "end;" + //有寫鎖直接返回 "if mode == 'write' then " + "return 0;" + "end; " + "end; " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; ", Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix), LockPubSub.unlockMessage, getLockName(threadId)); }

解鎖還給其他鎖續命,,,最大存活時間maxRemainTime很有意思,存在就給他加上等量的剩余存活時間,而不是固定加多少。那是不是無限續然后過期不了?但是這里是讀寫鎖的存活時間而不是讀鎖的時間。

並且publish到相應的頻道,更新狀態。

 

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
        String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
        
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "local counter = redis.call('hget', KEYS[1], ARGV[2]); " +
            //不是false "if (counter ~= false) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "if (redis.call('hlen', KEYS[1]) > 1) then " + "local keys = redis.call('hkeys', KEYS[1]); " + "for n, key in ipairs(keys) do " + "counter = tonumber(redis.call('hget', KEYS[1], key)); " + "if type(counter) == 'number' then " + "for i=counter, 1, -1 do " + "redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " + "end; " + "end; " + "end; " + "end; " + "return 1; " + "end; " + "return 0;", Arrays.<Object>asList(getName(), keyPrefix), internalLockLeaseTime, getLockName(threadId)); }

刷新存活時間沒啥特殊的

 

    @Override
    public RFuture<Boolean> forceUnlockAsync() {
        cancelExpirationRenewal(null);
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hget', KEYS[1], 'mode') == 'read') then " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return 0; ",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage);
    }

沒有之前續命的操作了。並且整個刪除

 

RedissonWriteLock

    @Override
    public RFuture<Boolean> forceUnlockAsync() {
        cancelExpirationRenewal(null);
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
              "if (redis.call('hget', KEYS[1], 'mode') == 'write') then " +
                  "redis.call('del', KEYS[1]); " +
                  "redis.call('publish', KEYS[2], ARGV[1]); " +
                  "return 1; " +
              "end; " +
              "return 0; ",
              Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.readUnlockMessage);
    }

    @Override
    public boolean isLocked() {
        RFuture<String> future = commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.HGET, getName(), "mode");
        String res = get(future);
        return "write".equals(res);
    }

這兩方法和讀鎖類似就不說了,而且增加過期時間寫鎖不支持這功能

 

 @Override
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                            "if (mode == false) then " +
                                  "redis.call('hset', KEYS[1], 'mode', 'write'); " +
                                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                  "return nil; " +
                              "end; " +
                              "if (mode == 'write') then " +
                                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
                                      "local currentExpire = redis.call('pttl', KEYS[1]); " +
                                      "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
                                      "return nil; " +
                                  "end; " +
                                "end;" +
                                "return redis.call('pttl', KEYS[1]);",
                        Arrays.<Object>asList(getName()), 
                        internalLockLeaseTime, getLockName(threadId));
    }

顯然,如果寫鎖是這個線程持有的才可以進行寫操作。

 

    @Override
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
        String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                "if (mode == false) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
                "if (lockExists == 0) then " +
                    "return nil;" +
                "end; " +
                    
                "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + 
                "if (counter == 0) then " +
                    "redis.call('hdel', KEYS[1], ARGV[2]); " + 
                "end;" +
                "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +
                
                "if (redis.call('hlen', KEYS[1]) > 1) then " +
                    "local maxRemainTime = -3; " + 
                    "local keys = redis.call('hkeys', KEYS[1]); " + 
                    "for n, key in ipairs(keys) do " + 
                        "counter = tonumber(redis.call('hget', KEYS[1], key)); " + 
                        "if type(counter) == 'number' then " + 
                            "for i=counter, 1, -1 do " + 
                                "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + 
                                "maxRemainTime = math.max(remainTime, maxRemainTime);" + 
                            "end; " + 
                        "end; " + 
                    "end; " +
                            
                    "if maxRemainTime > 0 then " +
                        "redis.call('pexpire', KEYS[1], maxRemainTime); " +
                        "return 0; " +
                    "end;" + 
                        
                    "if mode == 'write' then " + 
                        "return 0;" + 
                    "end; " +
                "end; " +
                    
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; ",
                Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix), 
                LockPubSub.unlockMessage, getLockName(threadId));
    }

這里的也是給讀寫鎖續命,看來就是數據使用次數越多讀寫鎖存活的時間越長,而具體的讀鎖寫鎖的存活時間則是hashmap里面的一個屬性。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM