基於zookeeper簡單實現分布式鎖


這里利用zookeeper的EPHEMERAL_SEQUENTIAL類型節點及watcher機制,來簡單實現分布式鎖。

主要思想:
1、開啟10個線程,在disLocks節點下各自創建名為sub的EPHEMERAL_SEQUENTIAL節點;
2、獲取disLocks節點下所有子節點,排序,如果自己的節點編號最小,則獲取鎖;
3、否則watch排在自己前面的節點,監聽到其刪除后,進入第2步(重新檢測排序是防止監聽的節點發生連接失效,導致的節點刪除情況);
4、刪除自身sub節點,釋放連接;
 
這里插播下zookeeper的4種節點類型:
[java]  view plain  copy
 
  1. public enum CreateMode {  
  2.      
  3.     /** 
  4.      * 持久節點:節點創建后,會一直存在,不會因客戶端會話失效而刪除; 
  5.      */  
  6.     PERSISTENT (0, false, false),  
  7.   
  8.     /** 
  9.     * 持久順序節點:基本特性與持久節點一致,創建節點的過程中,zookeeper會在其名字后自動追加一個單調增長的數字后綴,作為新的節點名;  
  10.     */  
  11.     PERSISTENT_SEQUENTIAL (2, false, true),  
  12.   
  13.     /** 
  14.      *  臨時節點:客戶端會話失效或連接關閉后,該節點會被自動刪除,且不能再臨時節點下面創建子節點,否則報如下錯:org.apache.zookeeper.KeeperException$NoChildrenForEphemeralsException; 
  15.      */  
  16.     EPHEMERAL (1, true, false),  
  17.   
  18.     /** 
  19.      * 臨時順序節點:基本特性與臨時節點一致,創建節點的過程中,zookeeper會在其名字后自動追加一個單調增長的數字后綴,作為新的節點名;  
  20.      */  
  21.     EPHEMERAL_SEQUENTIAL (3, true, true);  
  22.     private static final Logger LOG = LoggerFactory.getLogger(CreateMode.class);  
  23.     private boolean ephemeral;  
  24.     private boolean sequential;  
  25.     private int flag;  
  26.     CreateMode(int flag, boolean ephemeral, boolean sequential) {  
  27.         this.flag = flag;  
  28.         this.ephemeral = ephemeral;  
  29.         this.sequential = sequential;  
  30.     }  
  31.     public boolean isEphemeral() {  
  32.         return ephemeral;  
  33.     }  
  34.     public boolean isSequential() {  
  35.         return sequential;  
  36.     }  
  37.     public int toFlag() {  
  38.         return flag;  
  39.     }  
  40.     static public CreateMode fromFlag(int flag) throws KeeperException {  
  41.         switch(flag) {  
  42.         case 0: return CreateMode.PERSISTENT;  
  43.         case 1: return CreateMode.EPHEMERAL;  
  44.         case 2: return CreateMode.PERSISTENT_SEQUENTIAL;  
  45.         case 3: return CreateMode.EPHEMERAL_SEQUENTIAL ;  
  46.         default:  
  47.             LOG.error("Received an invalid flag value to convert to a CreateMode");  
  48.             throw new KeeperException.BadArgumentsException();  
  49.         }  
  50.     }  
  51. }  


測試代碼:
[java]  view plain  copy
 
  1. package zookeeper;  
  2. import org.slf4j.Logger;  
  3. import org.slf4j.LoggerFactory;  
  4. import org.apache.zookeeper.*;  
  5. import org.apache.zookeeper.data.Stat;  
  6. import java.util.List;  
  7. import java.io.IOException;  
  8. import java.util.Collections;  
  9. import java.util.concurrent.CountDownLatch;  
  10.   
  11. public class DistributedLock implements Watcher{  
  12.     private int threadId;  
  13.     private ZooKeeper zk = null;  
  14.     private String selfPath;  
  15.     private String waitPath;  
  16.     private String LOG_PREFIX_OF_THREAD;  
  17.     private static final int SESSION_TIMEOUT = 10000;  
  18.     private static final String GROUP_PATH = "/disLocks";  
  19.     private static final String SUB_PATH = "/disLocks/sub";  
  20.     private static final String CONNECTION_STRING = "192.168.*.*:2181";  
  21.       
  22.     private static final int THREAD_NUM = 10;   
  23.     //確保連接zk成功;  
  24.     private CountDownLatch connectedSemaphore = new CountDownLatch(1);  
  25.     //確保所有線程運行結束;  
  26.     private static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);  
  27.     private static final Logger LOG = LoggerFactory.getLogger(AllZooKeeperWatcher.class);  
  28.     public DistributedLock(int id) {  
  29.         this.threadId = id;  
  30.         LOG_PREFIX_OF_THREAD = "【第"+threadId+"個線程】";  
  31.     }  
  32.     public static void main(String[] args) {  
  33.         for(int i=0; i < THREAD_NUM; i++){  
  34.             final int threadId = i+1;  
  35.             new Thread(){  
  36.                 @Override  
  37.                 public void run() {  
  38.                     try{  
  39.                         DistributedLock dc = new DistributedLock(threadId);  
  40.                         dc.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);  
  41.                         //GROUP_PATH不存在的話,由一個線程創建即可;  
  42.                         synchronized (threadSemaphore){  
  43.                             dc.createPath(GROUP_PATH, "該節點由線程" + threadId + "創建", true);  
  44.                         }  
  45.                         dc.getLock();  
  46.                     } catch (Exception e){  
  47.                         LOG.error("【第"+threadId+"個線程】 拋出的異常:");  
  48.                         e.printStackTrace();  
  49.                     }  
  50.                 }  
  51.             }.start();  
  52.         }  
  53.         try {  
  54.             threadSemaphore.await();  
  55.             LOG.info("所有線程運行結束!");  
  56.         } catch (InterruptedException e) {  
  57.             e.printStackTrace();  
  58.         }  
  59.     }  
  60.     /** 
  61.      * 獲取鎖 
  62.      * @return 
  63.      */  
  64.     private void getLock() throws KeeperException, InterruptedException {  
  65.         selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);  
  66.         LOG.info(LOG_PREFIX_OF_THREAD+"創建鎖路徑:"+selfPath);  
  67.         if(checkMinPath()){  
  68.             getLockSuccess();  
  69.         }  
  70.     }  
  71.     /** 
  72.      * 創建節點 
  73.      * @param path 節點path 
  74.      * @param data 初始數據內容 
  75.      * @return 
  76.      */  
  77.     public boolean createPath( String path, String data, boolean needWatch) throws KeeperException, InterruptedException {  
  78.         if(zk.exists(path, needWatch)==null){  
  79.             LOG.info( LOG_PREFIX_OF_THREAD + "節點創建成功, Path: "  
  80.                     + this.zk.create( path,  
  81.                     data.getBytes(),  
  82.                     ZooDefs.Ids.OPEN_ACL_UNSAFE,  
  83.                     CreateMode.PERSISTENT )  
  84.                     + ", content: " + data );  
  85.         }  
  86.         return true;  
  87.     }  
  88.     /** 
  89.      * 創建ZK連接 
  90.      * @param connectString  ZK服務器地址列表 
  91.      * @param sessionTimeout Session超時時間 
  92.      */  
  93.     public void createConnection( String connectString, int sessionTimeout ) throws IOException, InterruptedException {  
  94.             zk = new ZooKeeper( connectString, sessionTimeout, this);  
  95.             connectedSemaphore.await();  
  96.     }  
  97.     /** 
  98.      * 獲取鎖成功 
  99.     */  
  100.     public void getLockSuccess() throws KeeperException, InterruptedException {  
  101.         if(zk.exists(this.selfPath,false) == null){  
  102.             LOG.error(LOG_PREFIX_OF_THREAD+"本節點已不在了...");  
  103.             return;  
  104.         }  
  105.         LOG.info(LOG_PREFIX_OF_THREAD + "獲取鎖成功,趕緊干活!");  
  106.         Thread.sleep(2000);  
  107.         LOG.info(LOG_PREFIX_OF_THREAD + "刪除本節點:"+selfPath);  
  108.         zk.delete(this.selfPath, -1);  
  109.         releaseConnection();  
  110.         threadSemaphore.countDown();  
  111.     }  
  112.     /** 
  113.      * 關閉ZK連接 
  114.      */  
  115.     public void releaseConnection() {  
  116.         if ( this.zk !=null ) {  
  117.             try {  
  118.                 this.zk.close();  
  119.             } catch ( InterruptedException e ) {}  
  120.         }  
  121.         LOG.info(LOG_PREFIX_OF_THREAD + "釋放連接");  
  122.     }  
  123.     /** 
  124.      * 檢查自己是不是最小的節點 
  125.      * @return 
  126.      */  
  127.     public boolean checkMinPath() throws KeeperException, InterruptedException {  
  128.          List<String> subNodes = zk.getChildren(GROUP_PATH, false);  
  129.          Collections.sort(subNodes);  
  130.          int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1));  
  131.          switch (index){  
  132.              case -1:{  
  133.                  LOG.error(LOG_PREFIX_OF_THREAD+"本節點已不在了..."+selfPath);  
  134.                  return false;  
  135.              }  
  136.              case 0:{  
  137.                  LOG.info(LOG_PREFIX_OF_THREAD+"子節點中,我果然是老大"+selfPath);  
  138.                  return true;  
  139.              }  
  140.              default:{  
  141.                  this.waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1);  
  142.                  LOG.info(LOG_PREFIX_OF_THREAD+"獲取子節點中,排在我前面的"+waitPath);  
  143.                  try{  
  144.                      zk.getData(waitPath, true, new Stat());  
  145.                      return false;  
  146.                  }catch(KeeperException e){  
  147.                      if(zk.exists(waitPath,false) == null){  
  148.                          LOG.info(LOG_PREFIX_OF_THREAD+"子節點中,排在我前面的"+waitPath+"已失蹤,幸福來得太突然?");  
  149.                          return checkMinPath();  
  150.                      }else{  
  151.                          throw e;  
  152.                      }  
  153.                  }  
  154.              }  
  155.                    
  156.          }  
  157.        
  158.     }  
  159.     @Override  
  160.     public void process(WatchedEvent event) {  
  161.         if(event == null){  
  162.             return;  
  163.         }  
  164.         Event.KeeperState keeperState = event.getState();  
  165.         Event.EventType eventType = event.getType();  
  166.         if ( Event.KeeperState.SyncConnected == keeperState) {  
  167.             if ( Event.EventType.None == eventType ) {  
  168.                 LOG.info( LOG_PREFIX_OF_THREAD + "成功連接上ZK服務器" );  
  169.                 connectedSemaphore.countDown();  
  170.             }else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {  
  171.                 LOG.info(LOG_PREFIX_OF_THREAD + "收到情報,排我前面的家伙已掛,我是不是可以出山了?");  
  172.                 try {  
  173.                     if(checkMinPath()){  
  174.                         getLockSuccess();  
  175.                     }  
  176.                 } catch (KeeperException e) {  
  177.                     e.printStackTrace();  
  178.                 } catch (InterruptedException e) {  
  179.                     e.printStackTrace();  
  180.                 }  
  181.             }  
  182.         }else if ( Event.KeeperState.Disconnected == keeperState ) {  
  183.             LOG.info( LOG_PREFIX_OF_THREAD + "與ZK服務器斷開連接" );  
  184.         } else if ( Event.KeeperState.AuthFailed == keeperState ) {  
  185.             LOG.info( LOG_PREFIX_OF_THREAD + "權限檢查失敗" );  
  186.         } else if ( Event.KeeperState.Expired == keeperState ) {  
  187.             LOG.info( LOG_PREFIX_OF_THREAD + "會話失效" );  
  188.         }  
  189.     }  
  190. }  

log配置文件:
[html]  view plain  copy
 
  1. # DEFAULT   
  2. log4j.rootLogger=INFO,CONSOLE  
  3.   
  4. #  
  5. # Log INFO level and above messages to the console  
  6. #  
  7. log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender  
  8. log4j.appender.CONSOLE.Threshold=INFO  
  9. log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout  
  10. log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %m%n  
  11.   
  12.   
  13. log4j.appender.COMMONSTAT=org.apache.log4j.DailyRollingFileAppender  
  14. log4j.appender.COMMONSTAT.Threshold=INFO  
  15. log4j.appender.COMMONSTAT.File=/home/zookeeper/zookeeper-test-agent/logs/test.log  
  16. log4j.appender.COMMONSTAT.DatePattern='.'yyyy-MM-dd  
  17.   
  18. log4j.appender.COMMONSTAT.layout=org.apache.log4j.PatternLayout  
  19. log4j.appender.COMMONSTAT.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss}] - %m%n  
  20.   
  21. log4j.logger.org.displaytag=WARN  
  22. log4j.logger.org.apache.zookeeper=ERROR  
  23. log4j.logger.org.springframework=WARN  
  24. log4j.logger.org.I0Itec=WARN  
  25. log4j.logger.commonStat=INFO,COMMONSTAT  

運行結果:
[plain]  view plain  copy
 
  1. 2014-11-19 11:34:10,894 - 【第9個線程】成功連接上ZK服務器  
  2. 2014-11-19 11:34:10,895 - 【第8個線程】成功連接上ZK服務器  
  3. 2014-11-19 11:34:10,894 - 【第1個線程】成功連接上ZK服務器  
  4. 2014-11-19 11:34:10,894 - 【第7個線程】成功連接上ZK服務器  
  5. 2014-11-19 11:34:10,894 - 【第4個線程】成功連接上ZK服務器  
  6. 2014-11-19 11:34:10,895 - 【第5個線程】成功連接上ZK服務器  
  7. 2014-11-19 11:34:10,896 - 【第2個線程】成功連接上ZK服務器  
  8. 2014-11-19 11:34:10,894 - 【第10個線程】成功連接上ZK服務器  
  9. 2014-11-19 11:34:10,894 - 【第3個線程】成功連接上ZK服務器  
  10. 2014-11-19 11:34:10,895 - 【第6個線程】成功連接上ZK服務器  
  11. 2014-11-19 11:34:10,910 - 【第9個線程】節點創建成功, Path: /disLocks, content: 該節點由線程9創建  
  12. 2014-11-19 11:34:10,912 - 【第9個線程】創建鎖路徑:/disLocks/sub0000000000  
  13. 2014-11-19 11:34:10,917 - 【第6個線程】創建鎖路徑:/disLocks/sub0000000001  
  14. 2014-11-19 11:34:10,917 - 【第9個線程】子節點中,我果然是老大/disLocks/sub0000000000  
  15. 2014-11-19 11:34:10,921 - 【第3個線程】創建鎖路徑:/disLocks/sub0000000002  
  16. 2014-11-19 11:34:10,922 - 【第6個線程】獲取子節點中,排在我前面的/disLocks/sub0000000000  
  17. 2014-11-19 11:34:10,923 - 【第9個線程】獲取鎖成功,趕緊干活!  
  18. 2014-11-19 11:34:10,924 - 【第10個線程】創建鎖路徑:/disLocks/sub0000000003  
  19. 2014-11-19 11:34:10,924 - 【第3個線程】獲取子節點中,排在我前面的/disLocks/sub0000000001  
  20. 2014-11-19 11:34:10,928 - 【第10個線程】獲取子節點中,排在我前面的/disLocks/sub0000000002  
  21. 2014-11-19 11:34:10,929 - 【第1個線程】創建鎖路徑:/disLocks/sub0000000004  
  22. 2014-11-19 11:34:10,932 - 【第5個線程】創建鎖路徑:/disLocks/sub0000000005  
  23. 2014-11-19 11:34:10,935 - 【第1個線程】獲取子節點中,排在我前面的/disLocks/sub0000000003  
  24. 2014-11-19 11:34:10,936 - 【第2個線程】創建鎖路徑:/disLocks/sub0000000006  
  25. 2014-11-19 11:34:10,936 - 【第5個線程】獲取子節點中,排在我前面的/disLocks/sub0000000004  
  26. 2014-11-19 11:34:10,940 - 【第4個線程】創建鎖路徑:/disLocks/sub0000000007  
  27. 2014-11-19 11:34:10,941 - 【第2個線程】獲取子節點中,排在我前面的/disLocks/sub0000000005  
  28. 2014-11-19 11:34:10,943 - 【第8個線程】創建鎖路徑:/disLocks/sub0000000008  
  29. 2014-11-19 11:34:10,944 - 【第4個線程】獲取子節點中,排在我前面的/disLocks/sub0000000006  
  30. 2014-11-19 11:34:10,945 - 【第7個線程】創建鎖路徑:/disLocks/sub0000000009  
  31. 2014-11-19 11:34:10,946 - 【第8個線程】獲取子節點中,排在我前面的/disLocks/sub0000000007  
  32. 2014-11-19 11:34:10,947 - 【第7個線程】獲取子節點中,排在我前面的/disLocks/sub0000000008  
  33. 2014-11-19 11:34:12,923 - 【第9個線程】刪除本節點:/disLocks/sub0000000000  
  34. 2014-11-19 11:34:12,926 - 【第6個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?  
  35. 2014-11-19 11:34:12,928 - 【第6個線程】子節點中,我果然是老大/disLocks/sub0000000001  
  36. 2014-11-19 11:34:12,930 - 【第9個線程】釋放連接  
  37. 2014-11-19 11:34:12,930 - 【第6個線程】獲取鎖成功,趕緊干活!  
  38. 2014-11-19 11:34:14,930 - 【第6個線程】刪除本節點:/disLocks/sub0000000001  
  39. 2014-11-19 11:34:14,937 - 【第3個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?  
  40. 2014-11-19 11:34:14,941 - 【第3個線程】子節點中,我果然是老大/disLocks/sub0000000002  
  41. 2014-11-19 11:34:14,943 - 【第6個線程】釋放連接  
  42. 2014-11-19 11:34:14,946 - 【第3個線程】獲取鎖成功,趕緊干活!  
  43. 2014-11-19 11:34:16,946 - 【第3個線程】刪除本節點:/disLocks/sub0000000002  
  44. 2014-11-19 11:34:16,949 - 【第10個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?  
  45. 2014-11-19 11:34:16,951 - 【第10個線程】子節點中,我果然是老大/disLocks/sub0000000003  
  46. 2014-11-19 11:34:16,953 - 【第3個線程】釋放連接  
  47. 2014-11-19 11:34:16,953 - 【第10個線程】獲取鎖成功,趕緊干活!  
  48. 2014-11-19 11:34:18,953 - 【第10個線程】刪除本節點:/disLocks/sub0000000003  
  49. 2014-11-19 11:34:18,957 - 【第1個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?  
  50. 2014-11-19 11:34:18,960 - 【第10個線程】釋放連接  
  51. 2014-11-19 11:34:18,961 - 【第1個線程】子節點中,我果然是老大/disLocks/sub0000000004  
  52. 2014-11-19 11:34:18,964 - 【第1個線程】獲取鎖成功,趕緊干活!  
  53. 2014-11-19 11:34:20,964 - 【第1個線程】刪除本節點:/disLocks/sub0000000004  
  54. 2014-11-19 11:34:20,967 - 【第5個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?  
  55. 2014-11-19 11:34:20,969 - 【第5個線程】子節點中,我果然是老大/disLocks/sub0000000005  
  56. 2014-11-19 11:34:20,971 - 【第1個線程】釋放連接  
  57. 2014-11-19 11:34:20,971 - 【第5個線程】獲取鎖成功,趕緊干活!  
  58. 2014-11-19 11:34:22,971 - 【第5個線程】刪除本節點:/disLocks/sub0000000005  
  59. 2014-11-19 11:34:22,974 - 【第2個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?  
  60. 2014-11-19 11:34:22,978 - 【第2個線程】子節點中,我果然是老大/disLocks/sub0000000006  
  61. 2014-11-19 11:34:22,979 - 【第5個線程】釋放連接  
  62. 2014-11-19 11:34:22,981 - 【第2個線程】獲取鎖成功,趕緊干活!  
  63. 2014-11-19 11:34:24,981 - 【第2個線程】刪除本節點:/disLocks/sub0000000006  
  64. 2014-11-19 11:34:24,985 - 【第4個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?  
  65. 2014-11-19 11:34:24,989 - 【第2個線程】釋放連接  
  66. 2014-11-19 11:34:24,989 - 【第4個線程】子節點中,我果然是老大/disLocks/sub0000000007  
  67. 2014-11-19 11:34:24,995 - 【第4個線程】獲取鎖成功,趕緊干活!  
  68. 2014-11-19 11:34:26,995 - 【第4個線程】刪除本節點:/disLocks/sub0000000007  
  69. 2014-11-19 11:34:26,998 - 【第8個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?  
  70. 2014-11-19 11:34:27,000 - 【第8個線程】子節點中,我果然是老大/disLocks/sub0000000008  
  71. 2014-11-19 11:34:27,004 - 【第8個線程】獲取鎖成功,趕緊干活!  
  72. 2014-11-19 11:34:27,004 - 【第4個線程】釋放連接  
  73. 2014-11-19 11:34:29,004 - 【第8個線程】刪除本節點:/disLocks/sub0000000008  
  74. 2014-11-19 11:34:29,007 - 【第7個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?  
  75. 2014-11-19 11:34:29,009 - 【第7個線程】子節點中,我果然是老大/disLocks/sub0000000009  
  76. 2014-11-19 11:34:29,010 - 【第8個線程】釋放連接  
  77. 2014-11-19 11:34:29,011 - 【第7個線程】獲取鎖成功,趕緊干活!  
  78. 2014-11-19 11:34:31,011 - 【第7個線程】刪除本節點:/disLocks/sub0000000009  
  79. 2014-11-19 11:34:31,017 - 【第7個線程】釋放連接  
  80. 2014-11-19 11:34:31,017 - 所有線程運行結束!  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM