分布式鎖一般有三種實現方式:1. 數據庫樂觀鎖;2. 基於Redis的分布式鎖;3. 基於ZooKeeper的分布式鎖。本篇博客將介紹第二種方式,基於Redis實現分布式鎖。雖然網上已經有各種介紹Redis分布式鎖實現的博客,然而他們的實現卻有着各種各樣的問題,為了避免誤人子弟,本篇將介紹如何正確地實現Redis分布式鎖。
首先,為了確保分布式鎖可用,我們至少要確保鎖的實現同時滿足以下四個條件:
-
互斥性。在任意時刻,只有一個客戶端能持有鎖。
-
不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖。
-
具有容錯性。只要大部分的Redis節點正常運行,客戶端就可以加鎖和解鎖。
-
解鈴還須系鈴人。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了。
分布式鎖的簡單實現代碼:
package com.gdut.redis.lock.test1; import java.util.Collections; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; public class DistributedLock { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; private static void validParam(JedisPool jedisPool, String lockKey, String requestId, int expireTime) { if (null == jedisPool) { throw new IllegalArgumentException("jedisPool obj is null"); } if (null == lockKey || "".equals(lockKey)) { throw new IllegalArgumentException("lock key is blank"); } if (null == requestId || "".equals(requestId)) { throw new IllegalArgumentException("requestId is blank"); } if (expireTime < 0) { throw new IllegalArgumentException("expireTime is not allowed less zero"); } } /** * * @param jedis * @param lockKey * @param requestId * @param expireTime * @return */ public boolean tryLock(JedisPool jedisPool, String lockKey, String requestId, int expireTime) { validParam(jedisPool, lockKey, requestId, expireTime); Jedis jedis = null; try { jedis = jedisPool.getResource(); String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } } catch (Exception e) { throw e; } finally { if (null != jedis) { jedis.close(); } } return false; } /** * * @param jedis * @param lockKey * @param requestId * @param expireTime */ public void lock(JedisPool jedisPool, String lockKey, String requestId, int expireTime) { validParam(jedisPool, lockKey, requestId, expireTime); while (true) { if (tryLock(jedisPool, lockKey, requestId, expireTime)) { System.out.println("lock "+ Thread.currentThread().getName()+ " requestId:" + requestId); return; } } } /** * * @param jedis * @param lockKey * @param requestId * @return */ public boolean unLock(JedisPool jedisPool, String lockKey, String requestId) { validParam(jedisPool, lockKey, requestId, 1); String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Jedis jedis = null; try { jedis = jedisPool.getResource(); Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { System.out.println("unlock "+ Thread.currentThread().getName()+ " requestId:" + requestId); return true; } } catch (Exception e) { throw e; } finally { if (null != jedis) { jedis.close(); } } return false; } }
說明:String redis.clients.jedis.Jedis.set(String key, String value, String nxxx, String expx, int time) 方法參數說明
- 其中前面兩個是key,value值;
- nxxx為模式,這里我們設置為NX,意思是說如果key不存在則插入該key對應的value並返回OK,否者什么都不做返回null;
- 參數expx這里我們設置為PX,意思是設置key的過期時間為time 毫秒
通過tryLock方法嘗試獲取鎖,內部是具體調用Redis的set方法,多個線程同時調用tryLock時候會同時調用set方法,但是set方法本身是保證原子性的,對應同一個key來說,多個線程調用set方法時候只有一個線程返回OK,其它線程因為key已經存在會返回null,所以返回OK的線程就相當與獲取到了鎖,其它返回null的線程則相當於獲取鎖失敗。
另外這里我們要保證value(requestId)值唯一是為了保證只有獲取到鎖的線程才能釋放鎖,這個下面釋放鎖時候會講解。
通過lock 方法讓使用tryLock獲取鎖失敗的線程本地自旋轉重試獲取鎖,這類似JUC里面的CAS。
Redis有一個叫做eval的函數,支持Lua腳本執行,並且能夠保證腳本執行的原子性,也就是在執行腳本期間,其它執行redis命令的線程都會被阻塞。這里解鎖時候使用下面腳本:
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end
其中keys[1]為unLock方法傳遞的key,argv[1]為unLock方法傳遞的requestId;腳本redis.call(‘get’, KEYS[1])的作用是獲取key對應的value值,這里會返回通過Lock方法傳遞的requetId, 然后看當前傳遞的RequestId是否等於key對應的值,等於則說明當前要釋放鎖的線程就是獲取鎖的線程,則繼續執行redis.call(‘del’, KEYS[1])腳本,刪除key對應的值。
測試剛才實現的分布式鎖
例子中使用50個線程模擬秒殺一個商品,使用–運算符來實現商品減少,從結果有序性就可以看出是否為加鎖狀態。
模擬秒殺服務,在其中配置了jedis線程池,在初始化的時候傳給分布式鎖,供其使用。
package com.gdut.redis.lock.test1; import java.util.UUID; import com.gdut.redis.lock.test1.DistributedLock; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class Service1 { private static JedisPool pool = null; private DistributedLock lock = new DistributedLock(); static { JedisPoolConfig config = new JedisPoolConfig(); // 設置最大連接數 config.setMaxTotal(500); // 設置最大空閑數 config.setMaxIdle(100); // 設置最大等待時間 config.setMaxWaitMillis(1000 * 100); // 在borrow一個jedis實例時,是否需要驗證,若為true,則所有jedis實例均是可用的 config.setTestOnBorrow(true); pool = new JedisPool(config, "127.0.0.1", 6379, 300000); } public void seckill() throws InterruptedException { String requestId = UUID.randomUUID().toString(); lock.lock(pool, "resource", requestId, 3000); lock.unLock(pool, "resource", requestId); } }
模擬線程進行秒殺服務:
package com.gdut.redis.lock.test1; import com.gdut.redis.lock.test1.Service1; public class TaskThread extends Thread { private Service1 service; public TaskThread(Service1 service) { this.service = service; } @Override public void run() { try { synchronized (this) { service.seckill(); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { Service1 service = new Service1(); for (int i = 0; i < 400; i++) { TaskThread thread = new TaskThread(service); thread.start(); } } }
console結果:
一共800行輸出,lock 和unlock的輸出都是400行,表示400個線程都獲得了鎖和釋放了鎖
總結:
本文使用redis單實例結合redis的set方法和eval函數實現了一個簡單的分布式鎖,但是這個實現還是明顯有問題的。雖然使用set方法設置了超時時間,以避免線程獲取到鎖后redis掛了后鎖沒有被釋放的情況,但是超時時間設置為多少合適那?如果設置太小,可能會存在線程獲取鎖后執行業務邏輯時間大於鎖超時時間,那么就會存在邏輯還沒執行完,鎖已經因為超時自動釋放了,而其他線程可能獲取到鎖,那么之前獲取鎖的線程的業務邏輯的執行就沒有保證原子性。
另外還有一個問題是Lock方法里面是自旋調用tryLock進行重試,這就會導致像JUC中的AtomicLong一樣,在高並發下多個線程競爭同一個資源時候造成大量線程占用cpu進行重試操作。這時候其實可以隨機生成一個等待時間,等時間到后在進行重試,以減少潛在的同時對一個資源進行競爭的並發量。
資料:http://ifeve.com/redis-distributedlock/