Redis實現分布式鎖
1.根據lockKey區進行setnx(set not exist,如果key值為空,則正常設置,返回1,否則不會進行設置並返回0)操作,如果設置成功,表示已經獲得鎖,否則並沒有獲取鎖。
2.如果沒有獲得鎖,去Redis上拿到該key對應的值,在該key上我們存儲一個時間戳(用毫秒表示,t1),為了避免死鎖以及其他客戶端占用該鎖超過一定時間(5秒),使用該客戶端當前時間戳,與存儲的時間戳作比較。
3.如果沒有超過該key的使用時限,返回false,表示其他人正在占用該key,不能強制使用;如果已經超過時限,那我們就可以進行解鎖,使用我們的時間戳來代替該字段的值。
4.但是如果在setnx失敗后,get該值卻無法拿到該字段時,說明操作之前該鎖已經被釋放,這個時候,最好的辦法就是重新執行一遍setnx方法來獲取其值以獲得該鎖。
釋放鎖:刪除redis中key
1 public class RedisKeyLock { 2 private static Logger logger = Logger.getLogger(RedisKeyLock.class); 3 private final static long ACCQUIRE_LOCK_TIMEOUT_IN_MS = 10 * 1000; 4 private final static int EXPIRE_IN_SECOND = 5;//鎖失效時間 5 private final static long WAIT_INTERVAL_IN_MS = 100; 6 private static RedisKeyLock lock; 7 private JedisPool jedisPool; 8 private RedisKeyLock(JedisPool pool){ 9 this.jedisPool = pool; 10 } 11 public static RedisKeyLock getInstance(JedisPool pool){ 12 if(lock == null){ 13 lock = new RedisKeyLock(pool); 14 } 15 return lock; 16 } 17 18 public void lock(final String redisKey) { 19 Jedis resource = null; 20 try { 21 long now = System.currentTimeMillis(); 22 resource = jedisPool.getResource(); 23 long timeoutAt = now + ACCQUIRE_LOCK_TIMEOUT_IN_MS; 24 boolean flag = false; 25 while (true) { 26 String expireAt = String.valueOf(now + EXPIRE_IN_SECOND * 1000); 27 long ret = resource.setnx(redisKey, expireAt); 28 if (ret == 1) {//已獲取鎖 29 flag = true; 30 break; 31 } else {//未獲取鎖,重試獲取鎖 32 String oldExpireAt = resource.get(redisKey); 33 if (oldExpireAt != null && Long.parseLong(oldExpireAt) < now) { 34 oldExpireAt = resource.getSet(redisKey, expireAt); 35 if (Long.parseLong(oldExpireAt) < now) { 36 flag = true; 37 break; 38 } 39 } 40 } 41 if (timeoutAt < now) { 42 break; 43 } 44 TimeUnit.NANOSECONDS.sleep(WAIT_INTERVAL_IN_MS); 45 } 46 if (!flag) { 47 throw new RuntimeException("canot acquire lock now ..."); 48 } 49 } catch (JedisException je) { 50 logger.error("lock", je); 51 je.printStackTrace(); 52 if (resource != null) { 53 jedisPool.returnBrokenResource(resource); 54 } 55 } catch (Exception e) { 56 e.printStackTrace(); 57 logger.error("lock", e); 58 } finally { 59 if (resource != null) { 60 jedisPool.returnResource(resource); 61 } 62 } 63 } 64 public boolean unlock(final String redisKey) { 65 Jedis resource = null; 66 try { 67 resource = jedisPool.getResource(); 68 resource.del(redisKey); 69 return true; 70 } catch (JedisException je) { 71 je.printStackTrace(); 72 if (resource != null) { 73 jedisPool.returnBrokenResource(resource); 74 } 75 return false; 76 } catch (Exception e) { 77 logger.error("lock", e); 78 return false; 79 } finally { 80 if (resource != null) { 81 jedisPool.returnResource(resource); 82 } 83 } 84 } 85 }
另一個版本:
SET my:lock 隨機值 NX PX 30000
這個的NX的意思就是只有key不存在的時候才會設置成功,PX 30000的意思是30秒后鎖自動釋放。別人創建的時候如果發現已經有了就不能加鎖了。
釋放鎖就是刪除key,但是一般可以用lua腳本刪除,判斷value一樣才刪除
為啥要用隨機值呢?因為如果某個客戶端獲取到了鎖,但是阻塞了很長時間才執行完,此時可能已經自動釋放鎖了,此時可能別的客戶端已經獲取到了這個鎖,要是你這個時候直接刪除key的話會有問題,所以得用隨機值加上面的lua腳本來釋放鎖。(就是根據這個隨機值來判斷這個鎖是不是自己加的)
如果是Redis是單機,會有問題。因為如果是普通的redis單實例,那就是單點故障。單節點掛了會導致鎖失效。
如果是redis普通主從,那redis主從異步復制,如果主節點掛了,key還沒同步到從節點,此時從節點切換為主節點,別人就會拿到鎖。
RedLock算法
這個場景是假設有一個redis cluster,有5個redis master實例。然后執行如下步驟獲取一把鎖:
獲取當前時間戳,單位是毫秒
跟上面類似,輪流嘗試在每個master節點上創建鎖,過期時間較短,一般就幾十毫秒
嘗試在大多數節點上建立一個鎖,比如5個節點就要求是3個節點(n / 2 +1)
客戶端計算建立好鎖的時間,如果建立鎖的時間小於超時時間,就算建立成功了
要是鎖建立失敗了,那么就依次刪除這個鎖
只要別人建立了一把分布式鎖,你就得不斷輪詢去嘗試獲取鎖

Zookeeper實現分布式鎖
基於臨時順序節點:
1.客戶端調用create()方法創建名為“locknode/guid-lock-”的節點,需要注意的是,這里節點的創建類型需要設置為EPHEMERAL_SEQUENTIAL。
2.客戶端調用getChildren(“locknode”)方法來獲取所有已經創建的子節點。
3.客戶端獲取到所有子節點path之后,如果發現自己在步驟1中創建的節點是所有節點中序號最小的,那么就認為這個客戶端獲得了鎖。
4.如果創建的節點不是所有節點中序號最小的,那么則監視比自己創建節點的序列號小的最大的節點,進入等待。直到下次監視的子節點變更的時候,再進行子節點的獲取,判斷是否獲取鎖。
釋放鎖的過程相對比較簡單,就是刪除自己創建的那個子節點即可。
不太嚴謹的代碼:
1 public class ZooKeeperDistributedLock implements Watcher{ 2 3 private ZooKeeper zk; 4 private String locksRoot= "/locks"; 5 private String productId; 6 private String waitNode; 7 private String lockNode; 8 private CountDownLatch latch; 9 private CountDownLatch connectedLatch = new CountDownLatch(1); 10 private int sessionTimeout = 30000; 11 12 public ZooKeeperDistributedLock(String productId){ 13 this.productId = productId; 14 try { 15 String address = "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181"; 16 zk = new ZooKeeper(address, sessionTimeout, this); 17 connectedLatch.await(); 18 } catch (IOException e) { 19 throw new LockException(e); 20 } catch (KeeperException e) { 21 throw new LockException(e); 22 } catch (InterruptedException e) { 23 throw new LockException(e); 24 } 25 } 26 27 public void process(WatchedEvent event) { 28 if(event.getState()==KeeperState.SyncConnected){ 29 connectedLatch.countDown(); 30 return; 31 } 32 33 if(this.latch != null) { 34 this.latch.countDown(); 35 } 36 } 37 38 public void acquireDistributedLock() { 39 try { 40 if(this.tryLock()){ 41 return; 42 } 43 else{ 44 waitForLock(waitNode, sessionTimeout); 45 } 46 } catch (KeeperException e) { 47 throw new LockException(e); 48 } catch (InterruptedException e) { 49 throw new LockException(e); 50 } 51 } 52 53 public boolean tryLock() { 54 try { 55 // 傳入進去的locksRoot + “/” + productId 56 // 假設productId代表了一個商品id,比如說1 57 // locksRoot = locks 58 // /locks/10000000000,/locks/10000000001,/locks/10000000002 59 lockNode = zk.create(locksRoot + "/" + productId, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); 60 61 // 看看剛創建的節點是不是最小的節點 62 // locks:10000000000,10000000001,10000000002 63 List<String> locks = zk.getChildren(locksRoot, false); 64 Collections.sort(locks); 65 66 if(lockNode.equals(locksRoot+"/"+ locks.get(0))){ 67 //如果是最小的節點,則表示取得鎖 68 return true; 69 } 70 71 //如果不是最小的節點,找到比自己小1的節點 72 int previousLockIndex = -1; 73 for(int i = 0; i < locks.size(); i++) { 74 if(lockNode.equals(locksRoot + “/” + locks.get(i))) { 75 previousLockIndex = i - 1; 76 break; 77 } 78 } 79 80 this.waitNode = locks.get(previousLockIndex); 81 } catch (KeeperException e) { 82 throw new LockException(e); 83 } catch (InterruptedException e) { 84 throw new LockException(e); 85 } 86 return false; 87 } 88 89 private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException { 90 Stat stat = zk.exists(locksRoot + "/" + waitNode, true); 91 if(stat != null){ 92 this.latch = new CountDownLatch(1); 93 this.latch.await(waitTime, TimeUnit.MILLISECONDS); this.latch = null; 94 } 95 return true; 96 } 97 98 public void unlock() { 99 try { 100 // 刪除/locks/10000000000節點 101 // 刪除/locks/10000000001節點 102 System.out.println("unlock " + lockNode); 103 zk.delete(lockNode,-1); 104 lockNode = null; 105 zk.close(); 106 } catch (InterruptedException e) { 107 e.printStackTrace(); 108 } catch (KeeperException e) { 109 e.printStackTrace(); 110 } 111 } 112 113 public class LockException extends RuntimeException { 114 private static final long serialVersionUID = 1L; 115 public LockException(String e){ 116 super(e); 117 } 118 public LockException(Exception e){ 119 super(e); 120 } 121 } 122 123 // 如果有一把鎖,被多個人給競爭,此時多個人會排隊,第一個拿到鎖的人會執行,然后釋放鎖,后面的每個人都會去監聽排在自己前面的那個人創建的node上,一旦某個人釋放了鎖,排在自己后面的人就會被zookeeper給通知,一旦被通知了之后,就ok了,自己就獲取到了鎖,就可以執行代碼了 124 125 }
另一個版本:
zk分布式鎖,就是某個節點嘗試創建臨時znode,此時創建成功了就獲取了這個鎖;這個時候別的客戶端來創建鎖會失敗,只能注冊個監聽器監聽這個鎖。
釋放鎖就是刪除這個znode,一旦釋放掉就會通知客戶端,然后有一個等待着的客戶端就可以再次重新加鎖。
redis分布式鎖,其實需要自己不斷去嘗試獲取鎖,比較消耗性能
zk分布式鎖,獲取不到鎖,注冊個監聽器即可,不需要不斷主動嘗試獲取鎖,性能開銷較小
另外一點就是,如果是redis獲取鎖的那個客戶端bug了或者掛了,那么只能等待超時時間之后才能釋放鎖;而zk的話,因為創建的是臨時znode,只要客戶端掛了,znode就沒了,此時就自動釋放鎖
