業務背景:存儲請求參數token ,token唯一 ,且新的生成舊的失效
思路:因為是多台機器,獲取token存入redis,保持唯一,考慮使用redis來加鎖,其實就是在redis中存一個key,其他機器發現key有值的話就不進行獲取token的請求。
SET操作會覆蓋原有值,SETEX雖然可設置key過期時間,但也會覆蓋原有值,所以考慮可以使用SETNX
SETNX Key value
看上去SETNX 配合 EXPIRE(過期時間)是個不錯的選擇,於是就有了加鎖錯誤示例1:
jedis.setnx("lockName","value");
//這里redis掛掉,就是一個死鎖
jedis.expire("lockName",10);
因為這兩個操作不具備原子性,所以可能出現死鎖,之所以有這樣的示例,是因為低版本的redis的SET還不支持多參數命令
從 Redis 2.6.12 版本開始, SET 命令的行為可以通過一系列參數來修改 EX second :設置鍵的過期時間為 second 秒。 SET key value EX second 效果等同於 SETEX key second value 。 PX millisecond :設置鍵的過期時間為 millisecond 毫秒。 SET key value PX millisecond 效果等同於 PSETEX key millisecond value 。 NX :只在鍵不存在時,才對鍵進行設置操作。 SET key value NX 效果等同於 SETNX key value 。 XX :只在鍵已經存在時,才對鍵進行設置操作。
這里可以引出 redis正確的加鎖示例:
public static boolean lock(Jedis jedis, String lockKey, String uid, int expireTime) {
String result = jedis.set(lockKey, uid,"NX" "PX", expireTime);
if ("OK".equals(result)) {
return true;
}
return false;
}
其實就等於在redis中執行了 :set key value nx px 10000

再來看一下分布式鎖的要求:
互斥性。在任意時刻,只有一個客戶端能持有鎖。
不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖。
性能。排隊等待鎖的節點如果不知道鎖何時會被釋放,則只能隔一段時間嘗試獲取一次鎖,這樣無法保證資源的高效利用,因此當鎖釋放時,要能夠通知等待隊列,使一個等待節點能夠立刻獲得鎖。
重入。同一個線程可以重復拿到同一個資源的鎖。
NX保證互斥性
PS:因為 SET 命令可以通過參數來實現和 SETNX 、 SETEX 和 PSETEX 三個命令的效果,不知道將來的 Redis 版本會不會廢棄 SETNX 、 SETEX 和 PSETEX 這三個命令 ?
下面看一個釋放鎖的錯誤示例:
public static void wrongUnLock1(Jedis jedis, String lockKey, String requestId) {
// 判斷加鎖與解鎖是不是同一個線程
if (requestId.equals(jedis.get(lockKey))) {
// lockkey鎖失效,下一步刪除的就是別人的鎖
jedis.del(lockKey);
}
}
根本問題還是保證操作的原子性,因為是兩步操作,即便判斷到是當前線程的鎖,但是也有可能再刪除之前剛好過期,這樣刪除的就是其他線程的鎖。
如果業務要求精細,我們可以使用lua腳本來進行完美解鎖
/** * redis可以保證lua中的鍵的原子操作 unlock:lock調用完之后需unlock,否則需等待lock自動過期 * * @param lock * uid 只有線程已經獲取了該鎖才能釋放它(uid相同表示已獲取) */ public void unlock( String lock) { Jedis jedis = new Jedis("localhost"); final String uid= tokenMap.get(); if (StringUtil.isBlank(token)) return; try { final String script = "if redis.call(\"get\",\"" + lock + "\") == \"" + uid + "\"then return redis.call(\"del\",\"" + lock + "\") else return 0 end "; jedis.eval(script); } catch (Exception e) { throw new RedisException("error"); } finally { if (jedis != null) jedis.close(); } }
關於lua:
lua腳本優點
-
減少網絡開銷:本來多次網絡請求的操作,可以用一個請求完成,原先多次請求的邏輯放在redis服務器上完成。使用腳本,減少了網絡往返時延
-
原子操作:Redis會將整個腳本作為一個整體執行,中間不會被其他命令插入
-
復用:客戶端發送的腳本會永久存儲在Redis中,意味着其他客戶端可以復用這一腳本而不需要使用代碼完成同樣的邏輯
上面這個腳本很簡單
if redis.call(\"get\",\"" + lock + "\") // redisGET命令 == \"" +uid + // 判斷是否是當前線程 "\"then return redis.call(\"del\",\"" + lock + "\") // 如果是,執行redis DEL操作,刪除鎖 else return 0 end
同理我們可以使用lua給線程加鎖
local lockkey = KEYS[1] --唯一隨機數 local uid = KEYS[2] --失效時間,如果是當前線程,也是續期時間 local time = KEYS[3] if redis.call('set',lockkey,uid,'nx','px',time)=='OK' then return 'OK' else if redis.call('get',lockkey) == uid then if redis.call('EXPIRE',lockkey,time/1000)==1 then return 'OOKK' end end end
lua腳本也可以通過外部文件讀取,方便修改
public void luaUnLock() throws Exception{ Jedis jedis = new Jedis("localhost") ; InputStream input = new FileInputStream("unLock.lua"); byte[] by = new byte[input.available()]; input.read(by); String script = new String(by); Object obj = jedis.eval(script, Arrays.asList("key","123"), Arrays.asList("")); System.out.println("執行結果 " + obj); }
PS:跟同事討論的時候,想到可不可以利用redis的額事物來解鎖,並沒有實際使用,怕有坑。
redis事物解鎖
public boolean unLock(Jedis jedis, String lockName, String uid) throws Exception{ jedis.watch(lockName); //這里的判斷uid和下面的del雖然不是原子性,有了watch可以保證不會誤刪鎖 if (jedis.get(lockName).equals(uid)) { redis.clients.jedis.Transaction transaction = jedis.multi(); transaction.del(lockName); List<Object> exec = transaction.exec(); if (exec.get(0).equals("OK")) { transaction.close(); return true; } } return false; }
可重入鎖
//保存每個線程獨有的token private static ThreadLocal<String> tokenMap = new ThreadLocal<>(); /** * 這個例子還不太完善。 * redis實現分布式可重入鎖,並不保證在過期時間內完成鎖定內的任務,需根據業務邏輯合理分配seconds * * @param lock * 鎖的名稱 * @param mseconds * 鎖定時間,單位 毫秒 * token 對於同一個lock,相同的token可以再次獲取該鎖,不相同的token線程需等待到unlock之后才能獲取 * */ public boolean lock(final String lock, int mseconds ,Jedis jedis) { // token 對於同一個lock,相同的token可以再次獲取該鎖,不相同的token線程需等待到unlock之后才能獲取 String token = tokenMap.get(); if (StringUtil.isBlank(token)) { token = UUID.randomUUID().toString().replaceAll("-",""); tokenMap.set(token); } boolean flag = false; try { String ret = jedis.set(lock, token, "NX", "PX", mseconds); if (ret == null) {// 該lock的鎖已經存在 String origToken = jedis.get(lock);// 即使lock已經過期也可以 if (token.equals(origToken) || origToken==null) { // token相同默認為同一線程,所以token應該盡量長且隨機,保證不同線程的該值不相同 ret = jedis.set(lock, token, "NX", "PX", mseconds);// if ("OK".equalsIgnoreCase(ret)) flag = true; System.out.println("當前線程 " + token); } } else if ("OK".equalsIgnoreCase(ret)) flag = true; System.out.println("當前線程 " + token); } catch (Exception e) { } finally { if (jedis != null) jedis.close(); } return flag; }
繼續正題,說到lua腳本 和 可重入鎖,就不得不提 redission了
redission
可重入鎖(Reentrant Lock)
公平鎖(Fair Lock)
聯鎖(MultiLock)
紅鎖(RedLock)
讀寫鎖(ReadWriteLock)
信號量(Semaphore) 等等
下面分析一下可重入的源碼
/** * redission分布式鎖-重試時間 秒為單位 * @param lockName 鎖名 * @param waitTime 重試時間 * @param leaseTime 鎖過期時間 * @return */ public boolean tryLock(String lockName,long waitTime,long leaseTime){ try{ RLock rLock = redissonClient.getLock(lockName); return rLock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS); }catch (Exception e){ logger.error("redission lock error with waitTime",e); } return false; }
org.redisson.Redisson#getLock()
@Override
public RLock getLock(String name) {
return new RedissonLock(commandExecutor, name, id);
}
-
commandExecutor: 與 Redis 節點通信並發送指令的真正實現。需要說明一下,Redisson 的 CommandExecutor 實現是通過 eval 命令來執行 Lua 腳本,所以要求 Redis 的版本必須為 2.6 或以上
-
name: 鎖的全局名稱,例如上面代碼中的 "foobar",具體業務中通常可能使用共享資源的唯一標識作為該名稱。
-
id: Redisson 客戶端唯一標識。
org.redisson.RedissonLock#lock()
@Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { // 1.嘗試獲取鎖 Long ttl = tryAcquire(leaseTime, unit); // 2.獲得鎖成功 if (ttl == null) { return; } // 3.等待鎖釋放,並訂閱鎖 long threadId = Thread.currentThread().getId(); Future<RedissonLockEntry> future = subscribe(threadId); get(future); try { while (true) { // 4.重試獲取鎖 ttl = tryAcquire(leaseTime, unit); // 5.成功獲得鎖 if (ttl == null) { break; } // 6.等待鎖釋放 if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { // 7.取消訂閱 unsubscribe(future, threadId); } }
-
首先嘗試獲取鎖,具體代碼下面再看,返回結果是已存在的鎖的剩余存活時間,為 null 則說明沒有已存在的鎖並成功獲得鎖。
-
如果獲得鎖則結束流程,回去執行業務邏輯。
-
如果沒有獲得鎖,則需等待鎖被釋放,並通過 Redis 的 channel 訂閱鎖釋放的消息
-
訂閱鎖的釋放消息成功后,進入一個不斷重試獲取鎖的循環,循環中每次都先試着獲取鎖,並得到已存在的鎖的剩余存活時間。
-
如果在重試中拿到了鎖,則結束循環,跳過第 6 步。
-
如果鎖當前是被占用的,那么等待釋放鎖的消息,具體實現使用了 JDK 並發的信號量工具 Semaphore 來阻塞線程,當鎖釋放並發布釋放鎖的消息后,信號量的 release() 方法會被調用,此時被信號量阻塞的等待隊列中的一個線程就可以繼續嘗試獲取鎖了。
-
在成功獲得鎖后,就沒必要繼續訂閱鎖的釋放消息了,因此要取消對 Redis 上相應 channel 的訂閱。
重點看一下 tryAcquire() 方法的實現
private Long tryAcquire(long leaseTime, TimeUnit unit) { return get(tryAcquireAsync(leaseTime, unit, Thread.currentThread().getId())); } private <T> Future<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } // 2.用默認的鎖超時時間去獲取鎖 Future<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener<Long>() { @Override public void operationComplete(Future<Long> future) throws Exception { if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // 成功獲得鎖 if (ttlRemaining == null) { // 3.鎖過期時間刷新任務調度 scheduleExpirationRenewal(); } } }); return ttlRemainingFuture; } <T> Future<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); // 3.使用 EVAL 命令執行 Lua 腳本獲取鎖 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
-
獲取鎖真正執行的命令,Redisson 使用 EVAL 命令執行上面的 Lua 腳本來完成獲取鎖的操作
-
通過 exists 命令發現當前 key 不存在,即鎖沒被占用,則執行 hset 寫入 Hash 類型數據 key:全局鎖名稱(例如共享資源ID), field:鎖實例名稱(Redisson客戶端ID:線程ID), value:1,並執行 pexpire 對該 key 設置失效時間,返回空值 nil,至此獲取鎖成功
-
如果通過 hexists 命令發現 Redis 中已經存在當前 key 和 field 的 Hash 數據,說明當前線程之前已經獲取到鎖,因為這里的鎖是可重入的,則執行 hincrby 對當前 key field 的值加一,並重新設置失效時間,返回空值,至此重入獲取鎖成功。
-
最后是鎖已被占用的情況,即當前 key 已經存在,但是 Hash 中的 Field 與當前值不同,則執行 pttl 獲取鎖的剩余存活時間並返回,至此獲取鎖失敗。
public void unlock() { // 1.通過 EVAL 和 Lua 腳本執行 Redis 命令釋放鎖 Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId())); // 2.非鎖的持有者釋放鎖時拋出異常 if (opStatus == null) { throw new IllegalMonitorStateException( "attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } // 3.釋放鎖后取消刷新鎖失效時間的調度任務 if (opStatus) { cancelExpirationRenewal(); }
-
使用 EVAL 命令執行 Lua 腳本來釋放鎖:
-
key 不存在,說明鎖已釋放,直接執行 publish 命令發布釋放鎖消息並返回 1。
-
key 存在,但是 field 在 Hash 中不存在,說明自己不是鎖持有者,無權釋放鎖,返回 nil。
-
因為鎖可重入,所以釋放鎖時不能把所有已獲取的鎖全都釋放掉,一次只能釋放一把鎖,因此執行 hincrby 對鎖的值減一。
-
釋放一把鎖后,如果還有剩余的鎖,則刷新鎖的失效時間並返回 0;如果剛才釋放的已經是最后一把鎖,則執行 del 命令刪除鎖的 key,並發布鎖釋放消息,返回 1。
-
上面執行結果返回 nil 的情況(即第2中情況),因為自己不是鎖的持有者,不允許釋放別人的鎖,故拋出異常。
-
執行結果返回 1 的情況,該鎖的所有實例都已全部釋放,所以不需要再刷新鎖的失效時間。
可以看到redission最終還是使用了lua腳本來加解鎖 :
加鎖腳本
if (redis.call('exists' KEYS[1]) == 0) then + -- exists 判斷key是否存在 redis.call('hset' KEYS[1] ARGV[2] 1); + --如果不存在,hset存哈希表 redis.call('pexpire' KEYS[1] ARGV[1]); + --設置過期時間 return nil; + -- 返回null 就是加鎖成功 end; + if (redis.call('hexists' KEYS[1] ARGV[2]) == 1) then + -- 如果key存在,查看哈希表中是否存在 redis.call('hincrby' KEYS[1] ARGV[2] 1); + -- 給哈希中的key加1,代表重入1次,以此類推 redis.call('pexpire' KEYS[1] ARGV[1]); + -- 重設過期時間 return nil; + end; + return redis.call('pttl' KEYS[1]); --如果前面的if都沒進去,說明ARGV2 的值不同,也就是不是同 一線程的鎖,這時候直接返回該鎖的過期時間
推薦使用sciTE來編輯lua

解鎖的腳本就不分析了,還是操作的redis命令,主要是lua腳本執行的時候能保證原子性。
lua腳本的缺點
a = 0
while(a < 3) do
print("x = " .. '我是循環')
end
幾個lua腳本示例
示例1——實現訪問頻率限制: 實現訪問者 $ip 在一定的時間 time 內只能訪問 limit 次
local key = "rate.limit:" .. KEYS[1] local limit = tonumber(ARGV[1]) local expire_time = ARGV[2] local is_exists = redis.call("EXISTS", key) if is_exists == 1 then if redis.call("INCR", key) > limit then return '拒絕訪問' else return '可以訪問' end else return redis.call("SET", key, "1","NX","PX",expire_time) end
示例2 —— 搶紅包
-- 腳本:嘗試獲得紅包,如果成功,則返回json字符串,如果不成功,則返回空 -- 參數:紅包隊列名, 已消費的隊列名,去重的Map名,用戶ID -- 返回值:nil 或者 json字符串,包含用戶ID:userId,紅包ID:id,紅包金額:money -- jedis.eval(getScript(), 4, hongBaoList, hongBaoConsumedList, hongBaoConsumedMap, "" + j) if redis.call('hexists', KEYS[3], KEYS[4]) ~= 0 then return nil else -- 先取出一個小紅包 local hongBao = redis.call('rpop', KEYS[1]) -- hongbao : {"Money":9,"Id":8} if hongBao then local x = cjson.decode(hongBao) -- 加入用戶ID信息 x['userId'] = KEYS[4] local re = cjson.encode(x) -- 把用戶ID放到去重的set里 redis.call('hset', KEYS[3], KEYS[4], KEYS[4]) -- 把紅包放到已消費隊列里 redis.call('lpush', KEYS[2], re) return re; end end return nil
SCRIPT LOAD 命令
redis 127.0.0.1:6379> SCRIPT LOAD "return 'hello moto'" "232fd51614574cf0867b83d384a5e898cfd24e5a" redis 127.0.0.1:6379> EVALSHA "232fd51614574cf0867b83d384a5e898cfd24e5a" 0 "hello moto"
-
SCRIPT LOAD "lua-script" 獲得 腳本的SHA1摘要 lua-script-sha1
-
SCRIPT EXISTS lua-script-sha1 可以判斷腳本是否存在 存在-1, 不存在-0
-
SCRIPT FLUSH 清空 SHA1摘要 redis將腳本的SHA1摘要加入到腳本緩存后會永久保留,不會刪除,但可以手動使用SCRIPT FLUSH命令情況腳本緩存
-
SCRIPT KILL 強制終止當前腳本
Java中使用 SCRIPT LOAD
public void scriptLoad()throws Exception{ Jedis jedis = new Jedis("localhost"); //從文件讀取lua腳本 InputStream input = new FileInputStream("return.lua"); byte[] by = new byte[input.available()]; input.read(by); byte[] scriptBy = jedis.scriptLoad(by); String sha1 = new String(scriptBy); //直接解析 String sha2 = jedis.scriptLoad("local key1 = KEYS[1]\n" + "local key2 = KEYS[2]\n" + "local argv1 = ARGV[1]\n" + "return \"key1:\"..key1 ..\" key2:\"..key2.. \" argv1:\"..argv1"); System.out.println("sha1 : " + sha1); System.out.println("sha2 : " + sha2); Object obj = jedis.evalsha(sha1, Arrays.asList("value1","value2"), Arrays.asList("value3")); System.out.println("執行結果: "+ obj); }
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
主要內容是以上這些,寫的不好,如果大家發現bug,請@我
20:49:54
最后是自己學習lua的一些筆記,含金量不高
lua簡介
print("Hello lua!")
標示符
關鍵詞
lua數據類型
print(b)
nil
b=10
print(b)
10
lua數據類型的自動轉換
##function
function fun1(n)
if n == 0 then
return 1
else
return n * fun1(n - 1)
end
end
print(fun1(3))
fun2 = fun1
print(fun2(4))
table 與Java不同,Lua是從1開始排序
tab = {"Hello","World","hello","lua"}
for k,v in pairs(tab) do
print(k.." "..v)
end
##for循環
for i=1,10
do print(i)
end
if判斷
--[ 0 為 true ]
if(0)
then
print("0 為 true")
else
print("0 為 false")
end
if(null)
then
print("nil 為 true")
else
print("nil 為 false")
end
