目前幾乎很多大型網站及應用都是分布式部署的,分布式場景中的數據一致性問題一直是一個比較重要的話題。分布式的CAP理論告訴我們“任何一個分布式系統都無法同時滿足一致性(Consistency)、可用性(Availability)和分區容錯性(Partition tolerance),最多只能同時滿足兩項。”所以,很多系統在設計之初就要對這三者做出取舍。在互聯網領域的絕大多數的場景中,都需要犧牲強一致性來換取系統的高可用性,系統往往只需要保證“最終一致性”,只要這個最終時間是在用戶可以接受的范圍內即可。
在很多場景中,我們為了保證數據的最終一致性,需要很多的技術方案來支持,比如分布式事務、分布式鎖等。有的時候,我們需要保證一個方法在同一時間內只能被同一個線程執行。在單機環境中,Java中其實提供了很多並發處理相關的API,但是這些API在分布式場景中就無能為力了。也就是說單純的Java Api並不能提供分布式鎖的能力。所以針對分布式鎖的實現目前有多種方案。
針對分布式鎖的實現,目前比較常用的有以下幾種方案:
基於數據庫實現分布式鎖 基於緩存(redis,memcached,tair)實現分布式鎖 基於Zookeeper實現分布式鎖
一、zookeeper中分布式鎖實現原理
(1)、普通節點思路
現在模擬一個使用Zookeeper實現分布式鎖,假設有A,B,C三台客戶端去訪問資源,調用zookeeper獲得鎖。客戶端三個在zookeeper的 /locks節點下創建一個/lock節點,由於節點是唯一性的特性,只有一個人會創建成功,其余兩個創建失敗,會進入監聽/locks節點的變化,如果/locks下子節點/lock節點發生變化,其余兩個可以去拿鎖,這樣是否好呢? 這樣子會導致驚群效應。就是一個觸發使得在短時間呢會觸發大量的watcher事件,但是只有一個客戶端能拿到鎖
--眾人搶,大量watcher事件
(2)、有序節點思路
1、在獲取分布式鎖的時候在locker節點下創建臨時順序節點,釋放鎖的時候刪除該臨時節點。
2、客戶端調用createNode方法在locks下創建臨時順序節點,然后調用getChildren(“locks”)來獲取locks下面的所有子節點,注意此時不用設置任何Watcher。
3、客戶端獲取到所有的子節點path之后,如果發現自己創建的子節點序號最小,那么就認為該客戶端獲取到了鎖。
4、如果發現自己創建的節點並非locks所有子節點中最小的,說明自己還沒有獲取到鎖,此時客戶端需要找到比自己小的那個節點,然后對其調用exist()方法,同時對其注冊事件監聽器。
5、之后,讓這個被關注的節點刪除,則客戶端的Watcher會收到相應通知,此時再次判斷自己創建的節點是否是locks子節點中序號最小的,如果是則獲取到了鎖,如果不是則重復以上步驟繼續獲取到比自己小的一個節點並注冊監聽。
二、代碼實現
package com.lf.zookeeper.lock; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /* *實現分布式鎖 */ public class DestributeLock implements Lock,Watcher{ private ZooKeeper zk = null; private String ROOT_LOCK ="/locks";//定義根節點 private String CURRENT_LOCK;//當前鎖 private String WAIT_LOCK ;//等待前一個對象釋放鎖 private CountDownLatch countDownLatch; public DestributeLock() { try { zk= new ZooKeeper("192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181", 4000, this); //判斷根節點是否存在 Stat stat = zk.exists(ROOT_LOCK, false); if(stat==null){ zk.create(ROOT_LOCK, "1".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.PERSISTENT); } } catch (Exception e) { e.printStackTrace(); } } @Override public void process(WatchedEvent event) { if(countDownLatch != null){ this.countDownLatch.countDown(); } } @Override public void lock() { if(tryLock()){ System.out.println(Thread.currentThread().getName()+"->"+CURRENT_LOCK+",獲取鎖成功!"); return; } try { waitForLock(WAIT_LOCK);//如果沒有獲得鎖,繼續等待 } catch (Exception e) { e.printStackTrace(); } } private boolean waitForLock(String prev) throws Exception, InterruptedException { Stat stat = zk.exists(prev, true); if(stat!=null){ System.out.println(Thread.currentThread().getName()+"->等待"+ROOT_LOCK+prev+"釋放鎖"); countDownLatch = new CountDownLatch(1); countDownLatch.await(); System.out.println(Thread.currentThread().getName()+"->"+"獲得鎖成功!"); } return true; } @Override public void lockInterruptibly() throws InterruptedException { // TODO Auto-generated method stub } @Override public Condition newCondition() { // TODO Auto-generated method stub return null; } @Override public boolean tryLock() { // TODO Auto-generated method stub try { CURRENT_LOCK= zk.create(ROOT_LOCK+"/", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName()+"->"+CURRENT_LOCK+",嘗試競爭鎖!"); //獲取根節點下的所有子節點 List<String> childrens = zk.getChildren(ROOT_LOCK, false); SortedSet<String> sortedSet = new TreeSet();//定義一個有序的集合進行排序 for (String children : childrens) { sortedSet.add(ROOT_LOCK+"/"+children); } //獲取最小的子節點 String firstNode = sortedSet.first(); SortedSet<String> lessthanMe = sortedSet.headSet(CURRENT_LOCK); if(CURRENT_LOCK.equals(firstNode)){//當前節點和最小鎖比較,如果相同,則獲取鎖成功 return true; } if(!lessthanMe.isEmpty()){ WAIT_LOCK = lessthanMe.last();//獲取比當前節點更小的最后一個節點,設置給WAIT_LOCK } } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } return false; } @Override public boolean tryLock(long arg0, TimeUnit arg1) throws InterruptedException { // TODO Auto-generated method stub return false; } @Override public void unlock() { System.out.println(Thread.currentThread().getName()+"->"+CURRENT_LOCK+"釋放鎖"); try { zk.delete(CURRENT_LOCK, -1); CURRENT_LOCK = null; zk.close(); } catch (Exception e) { // TODO: handle exception } } }
測試類
package com.lf.zookeeper.lock; import java.io.IOException; import java.util.concurrent.CountDownLatch; public class LockDemo { public static void main(String[] args) throws IOException { CountDownLatch countDownLatch = new CountDownLatch(10); for (int i = 1; i <= 10; i++) { new Thread(()->{ try { countDownLatch.await(); DestributeLock destributeLock = new DestributeLock(); destributeLock.lock(); } catch (Exception e) { e.printStackTrace(); } },"Thread-"+i).start(); countDownLatch.countDown(); } System.in.read(); } }
運行結果
Thread-4->/locks/0000000072,嘗試競爭鎖! Thread-5->/locks/0000000073,嘗試競爭鎖! Thread-9->/locks/0000000074,嘗試競爭鎖! Thread-8->/locks/0000000075,嘗試競爭鎖! Thread-10->/locks/0000000076,嘗試競爭鎖! Thread-2->/locks/0000000077,嘗試競爭鎖! Thread-6->/locks/0000000078,嘗試競爭鎖! Thread-3->/locks/0000000079,嘗試競爭鎖! Thread-7->/locks/0000000080,嘗試競爭鎖! Thread-1->/locks/0000000071,獲取鎖成功! Thread-4->等待/locks/locks/0000000071釋放鎖 Thread-5->等待/locks/locks/0000000072釋放鎖 Thread-9->等待/locks/locks/0000000073釋放鎖 Thread-8->等待/locks/locks/0000000074釋放鎖 Thread-10->等待/locks/locks/0000000075釋放鎖 Thread-2->等待/locks/locks/0000000076釋放鎖 Thread-6->等待/locks/locks/0000000077釋放鎖 Thread-3->等待/locks/locks/0000000078釋放鎖 Thread-7->等待/locks/locks/0000000079釋放鎖
手動觸發watcher事件,釋放鎖,delete /locks/0000000071
出現 Thread-4->/locks/0000000072,獲取鎖成功!
三、基於curator的實現分布式鎖
代碼
package com.lf.zookeeper.lock; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; public class CuratorLockDemo { public static void main(String[] args) { CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().build(); InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, "/locks");//關注節點 try { interProcessMutex.acquire();//獲取鎖 } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }