這里利用zookeeper的EPHEMERAL_SEQUENTIAL類型節點及watcher機制,來簡單實現分布式鎖。
主要思想:
1、開啟10個線程,在disLocks節點下各自創建名為sub的EPHEMERAL_SEQUENTIAL節點;
2、獲取disLocks節點下所有子節點,排序,如果自己的節點編號最小,則獲取鎖;
3、否則watch排在自己前面的節點,監聽到其刪除后,進入第2步(重新檢測排序是防止監聽的節點發生連接失效,導致的節點刪除情況);
4、刪除自身sub節點,釋放連接;
這里插播下zookeeper的4種節點類型:
- public enum CreateMode {
- /**
- * 持久節點:節點創建后,會一直存在,不會因客戶端會話失效而刪除;
- */
- PERSISTENT (0, false, false),
- /**
- * 持久順序節點:基本特性與持久節點一致,創建節點的過程中,zookeeper會在其名字后自動追加一個單調增長的數字后綴,作為新的節點名;
- */
- PERSISTENT_SEQUENTIAL (2, false, true),
- /**
- * 臨時節點:客戶端會話失效或連接關閉后,該節點會被自動刪除,且不能再臨時節點下面創建子節點,否則報如下錯:org.apache.zookeeper.KeeperException$NoChildrenForEphemeralsException;
- */
- EPHEMERAL (1, true, false),
- /**
- * 臨時順序節點:基本特性與臨時節點一致,創建節點的過程中,zookeeper會在其名字后自動追加一個單調增長的數字后綴,作為新的節點名;
- */
- EPHEMERAL_SEQUENTIAL (3, true, true);
- private static final Logger LOG = LoggerFactory.getLogger(CreateMode.class);
- private boolean ephemeral;
- private boolean sequential;
- private int flag;
- CreateMode(int flag, boolean ephemeral, boolean sequential) {
- this.flag = flag;
- this.ephemeral = ephemeral;
- this.sequential = sequential;
- }
- public boolean isEphemeral() {
- return ephemeral;
- }
- public boolean isSequential() {
- return sequential;
- }
- public int toFlag() {
- return flag;
- }
- static public CreateMode fromFlag(int flag) throws KeeperException {
- switch(flag) {
- case 0: return CreateMode.PERSISTENT;
- case 1: return CreateMode.EPHEMERAL;
- case 2: return CreateMode.PERSISTENT_SEQUENTIAL;
- case 3: return CreateMode.EPHEMERAL_SEQUENTIAL ;
- default:
- LOG.error("Received an invalid flag value to convert to a CreateMode");
- throw new KeeperException.BadArgumentsException();
- }
- }
- }
測試代碼:
- package zookeeper;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
- import java.util.List;
- import java.io.IOException;
- import java.util.Collections;
- import java.util.concurrent.CountDownLatch;
- public class DistributedLock implements Watcher{
- private int threadId;
- private ZooKeeper zk = null;
- private String selfPath;
- private String waitPath;
- private String LOG_PREFIX_OF_THREAD;
- private static final int SESSION_TIMEOUT = 10000;
- private static final String GROUP_PATH = "/disLocks";
- private static final String SUB_PATH = "/disLocks/sub";
- private static final String CONNECTION_STRING = "192.168.*.*:2181";
- private static final int THREAD_NUM = 10;
- //確保連接zk成功;
- private CountDownLatch connectedSemaphore = new CountDownLatch(1);
- //確保所有線程運行結束;
- private static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);
- private static final Logger LOG = LoggerFactory.getLogger(AllZooKeeperWatcher.class);
- public DistributedLock(int id) {
- this.threadId = id;
- LOG_PREFIX_OF_THREAD = "【第"+threadId+"個線程】";
- }
- public static void main(String[] args) {
- for(int i=0; i < THREAD_NUM; i++){
- final int threadId = i+1;
- new Thread(){
- @Override
- public void run() {
- try{
- DistributedLock dc = new DistributedLock(threadId);
- dc.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);
- //GROUP_PATH不存在的話,由一個線程創建即可;
- synchronized (threadSemaphore){
- dc.createPath(GROUP_PATH, "該節點由線程" + threadId + "創建", true);
- }
- dc.getLock();
- } catch (Exception e){
- LOG.error("【第"+threadId+"個線程】 拋出的異常:");
- e.printStackTrace();
- }
- }
- }.start();
- }
- try {
- threadSemaphore.await();
- LOG.info("所有線程運行結束!");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- /**
- * 獲取鎖
- * @return
- */
- private void getLock() throws KeeperException, InterruptedException {
- selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
- LOG.info(LOG_PREFIX_OF_THREAD+"創建鎖路徑:"+selfPath);
- if(checkMinPath()){
- getLockSuccess();
- }
- }
- /**
- * 創建節點
- * @param path 節點path
- * @param data 初始數據內容
- * @return
- */
- public boolean createPath( String path, String data, boolean needWatch) throws KeeperException, InterruptedException {
- if(zk.exists(path, needWatch)==null){
- LOG.info( LOG_PREFIX_OF_THREAD + "節點創建成功, Path: "
- + this.zk.create( path,
- data.getBytes(),
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT )
- + ", content: " + data );
- }
- return true;
- }
- /**
- * 創建ZK連接
- * @param connectString ZK服務器地址列表
- * @param sessionTimeout Session超時時間
- */
- public void createConnection( String connectString, int sessionTimeout ) throws IOException, InterruptedException {
- zk = new ZooKeeper( connectString, sessionTimeout, this);
- connectedSemaphore.await();
- }
- /**
- * 獲取鎖成功
- */
- public void getLockSuccess() throws KeeperException, InterruptedException {
- if(zk.exists(this.selfPath,false) == null){
- LOG.error(LOG_PREFIX_OF_THREAD+"本節點已不在了...");
- return;
- }
- LOG.info(LOG_PREFIX_OF_THREAD + "獲取鎖成功,趕緊干活!");
- Thread.sleep(2000);
- LOG.info(LOG_PREFIX_OF_THREAD + "刪除本節點:"+selfPath);
- zk.delete(this.selfPath, -1);
- releaseConnection();
- threadSemaphore.countDown();
- }
- /**
- * 關閉ZK連接
- */
- public void releaseConnection() {
- if ( this.zk !=null ) {
- try {
- this.zk.close();
- } catch ( InterruptedException e ) {}
- }
- LOG.info(LOG_PREFIX_OF_THREAD + "釋放連接");
- }
- /**
- * 檢查自己是不是最小的節點
- * @return
- */
- public boolean checkMinPath() throws KeeperException, InterruptedException {
- List<String> subNodes = zk.getChildren(GROUP_PATH, false);
- Collections.sort(subNodes);
- int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1));
- switch (index){
- case -1:{
- LOG.error(LOG_PREFIX_OF_THREAD+"本節點已不在了..."+selfPath);
- return false;
- }
- case 0:{
- LOG.info(LOG_PREFIX_OF_THREAD+"子節點中,我果然是老大"+selfPath);
- return true;
- }
- default:{
- this.waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1);
- LOG.info(LOG_PREFIX_OF_THREAD+"獲取子節點中,排在我前面的"+waitPath);
- try{
- zk.getData(waitPath, true, new Stat());
- return false;
- }catch(KeeperException e){
- if(zk.exists(waitPath,false) == null){
- LOG.info(LOG_PREFIX_OF_THREAD+"子節點中,排在我前面的"+waitPath+"已失蹤,幸福來得太突然?");
- return checkMinPath();
- }else{
- throw e;
- }
- }
- }
- }
- }
- @Override
- public void process(WatchedEvent event) {
- if(event == null){
- return;
- }
- Event.KeeperState keeperState = event.getState();
- Event.EventType eventType = event.getType();
- if ( Event.KeeperState.SyncConnected == keeperState) {
- if ( Event.EventType.None == eventType ) {
- LOG.info( LOG_PREFIX_OF_THREAD + "成功連接上ZK服務器" );
- connectedSemaphore.countDown();
- }else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
- LOG.info(LOG_PREFIX_OF_THREAD + "收到情報,排我前面的家伙已掛,我是不是可以出山了?");
- try {
- if(checkMinPath()){
- getLockSuccess();
- }
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }else if ( Event.KeeperState.Disconnected == keeperState ) {
- LOG.info( LOG_PREFIX_OF_THREAD + "與ZK服務器斷開連接" );
- } else if ( Event.KeeperState.AuthFailed == keeperState ) {
- LOG.info( LOG_PREFIX_OF_THREAD + "權限檢查失敗" );
- } else if ( Event.KeeperState.Expired == keeperState ) {
- LOG.info( LOG_PREFIX_OF_THREAD + "會話失效" );
- }
- }
- }
log配置文件:
- # DEFAULT
- log4j.rootLogger=INFO,CONSOLE
- #
- # Log INFO level and above messages to the console
- #
- log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
- log4j.appender.CONSOLE.Threshold=INFO
- log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
- log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %m%n
- log4j.appender.COMMONSTAT=org.apache.log4j.DailyRollingFileAppender
- log4j.appender.COMMONSTAT.Threshold=INFO
- log4j.appender.COMMONSTAT.File=/home/zookeeper/zookeeper-test-agent/logs/test.log
- log4j.appender.COMMONSTAT.DatePattern='.'yyyy-MM-dd
- log4j.appender.COMMONSTAT.layout=org.apache.log4j.PatternLayout
- log4j.appender.COMMONSTAT.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss}] - %m%n
- log4j.logger.org.displaytag=WARN
- log4j.logger.org.apache.zookeeper=ERROR
- log4j.logger.org.springframework=WARN
- log4j.logger.org.I0Itec=WARN
- log4j.logger.commonStat=INFO,COMMONSTAT
運行結果:
- 2014-11-19 11:34:10,894 - 【第9個線程】成功連接上ZK服務器
- 2014-11-19 11:34:10,895 - 【第8個線程】成功連接上ZK服務器
- 2014-11-19 11:34:10,894 - 【第1個線程】成功連接上ZK服務器
- 2014-11-19 11:34:10,894 - 【第7個線程】成功連接上ZK服務器
- 2014-11-19 11:34:10,894 - 【第4個線程】成功連接上ZK服務器
- 2014-11-19 11:34:10,895 - 【第5個線程】成功連接上ZK服務器
- 2014-11-19 11:34:10,896 - 【第2個線程】成功連接上ZK服務器
- 2014-11-19 11:34:10,894 - 【第10個線程】成功連接上ZK服務器
- 2014-11-19 11:34:10,894 - 【第3個線程】成功連接上ZK服務器
- 2014-11-19 11:34:10,895 - 【第6個線程】成功連接上ZK服務器
- 2014-11-19 11:34:10,910 - 【第9個線程】節點創建成功, Path: /disLocks, content: 該節點由線程9創建
- 2014-11-19 11:34:10,912 - 【第9個線程】創建鎖路徑:/disLocks/sub0000000000
- 2014-11-19 11:34:10,917 - 【第6個線程】創建鎖路徑:/disLocks/sub0000000001
- 2014-11-19 11:34:10,917 - 【第9個線程】子節點中,我果然是老大/disLocks/sub0000000000
- 2014-11-19 11:34:10,921 - 【第3個線程】創建鎖路徑:/disLocks/sub0000000002
- 2014-11-19 11:34:10,922 - 【第6個線程】獲取子節點中,排在我前面的/disLocks/sub0000000000
- 2014-11-19 11:34:10,923 - 【第9個線程】獲取鎖成功,趕緊干活!
- 2014-11-19 11:34:10,924 - 【第10個線程】創建鎖路徑:/disLocks/sub0000000003
- 2014-11-19 11:34:10,924 - 【第3個線程】獲取子節點中,排在我前面的/disLocks/sub0000000001
- 2014-11-19 11:34:10,928 - 【第10個線程】獲取子節點中,排在我前面的/disLocks/sub0000000002
- 2014-11-19 11:34:10,929 - 【第1個線程】創建鎖路徑:/disLocks/sub0000000004
- 2014-11-19 11:34:10,932 - 【第5個線程】創建鎖路徑:/disLocks/sub0000000005
- 2014-11-19 11:34:10,935 - 【第1個線程】獲取子節點中,排在我前面的/disLocks/sub0000000003
- 2014-11-19 11:34:10,936 - 【第2個線程】創建鎖路徑:/disLocks/sub0000000006
- 2014-11-19 11:34:10,936 - 【第5個線程】獲取子節點中,排在我前面的/disLocks/sub0000000004
- 2014-11-19 11:34:10,940 - 【第4個線程】創建鎖路徑:/disLocks/sub0000000007
- 2014-11-19 11:34:10,941 - 【第2個線程】獲取子節點中,排在我前面的/disLocks/sub0000000005
- 2014-11-19 11:34:10,943 - 【第8個線程】創建鎖路徑:/disLocks/sub0000000008
- 2014-11-19 11:34:10,944 - 【第4個線程】獲取子節點中,排在我前面的/disLocks/sub0000000006
- 2014-11-19 11:34:10,945 - 【第7個線程】創建鎖路徑:/disLocks/sub0000000009
- 2014-11-19 11:34:10,946 - 【第8個線程】獲取子節點中,排在我前面的/disLocks/sub0000000007
- 2014-11-19 11:34:10,947 - 【第7個線程】獲取子節點中,排在我前面的/disLocks/sub0000000008
- 2014-11-19 11:34:12,923 - 【第9個線程】刪除本節點:/disLocks/sub0000000000
- 2014-11-19 11:34:12,926 - 【第6個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?
- 2014-11-19 11:34:12,928 - 【第6個線程】子節點中,我果然是老大/disLocks/sub0000000001
- 2014-11-19 11:34:12,930 - 【第9個線程】釋放連接
- 2014-11-19 11:34:12,930 - 【第6個線程】獲取鎖成功,趕緊干活!
- 2014-11-19 11:34:14,930 - 【第6個線程】刪除本節點:/disLocks/sub0000000001
- 2014-11-19 11:34:14,937 - 【第3個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?
- 2014-11-19 11:34:14,941 - 【第3個線程】子節點中,我果然是老大/disLocks/sub0000000002
- 2014-11-19 11:34:14,943 - 【第6個線程】釋放連接
- 2014-11-19 11:34:14,946 - 【第3個線程】獲取鎖成功,趕緊干活!
- 2014-11-19 11:34:16,946 - 【第3個線程】刪除本節點:/disLocks/sub0000000002
- 2014-11-19 11:34:16,949 - 【第10個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?
- 2014-11-19 11:34:16,951 - 【第10個線程】子節點中,我果然是老大/disLocks/sub0000000003
- 2014-11-19 11:34:16,953 - 【第3個線程】釋放連接
- 2014-11-19 11:34:16,953 - 【第10個線程】獲取鎖成功,趕緊干活!
- 2014-11-19 11:34:18,953 - 【第10個線程】刪除本節點:/disLocks/sub0000000003
- 2014-11-19 11:34:18,957 - 【第1個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?
- 2014-11-19 11:34:18,960 - 【第10個線程】釋放連接
- 2014-11-19 11:34:18,961 - 【第1個線程】子節點中,我果然是老大/disLocks/sub0000000004
- 2014-11-19 11:34:18,964 - 【第1個線程】獲取鎖成功,趕緊干活!
- 2014-11-19 11:34:20,964 - 【第1個線程】刪除本節點:/disLocks/sub0000000004
- 2014-11-19 11:34:20,967 - 【第5個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?
- 2014-11-19 11:34:20,969 - 【第5個線程】子節點中,我果然是老大/disLocks/sub0000000005
- 2014-11-19 11:34:20,971 - 【第1個線程】釋放連接
- 2014-11-19 11:34:20,971 - 【第5個線程】獲取鎖成功,趕緊干活!
- 2014-11-19 11:34:22,971 - 【第5個線程】刪除本節點:/disLocks/sub0000000005
- 2014-11-19 11:34:22,974 - 【第2個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?
- 2014-11-19 11:34:22,978 - 【第2個線程】子節點中,我果然是老大/disLocks/sub0000000006
- 2014-11-19 11:34:22,979 - 【第5個線程】釋放連接
- 2014-11-19 11:34:22,981 - 【第2個線程】獲取鎖成功,趕緊干活!
- 2014-11-19 11:34:24,981 - 【第2個線程】刪除本節點:/disLocks/sub0000000006
- 2014-11-19 11:34:24,985 - 【第4個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?
- 2014-11-19 11:34:24,989 - 【第2個線程】釋放連接
- 2014-11-19 11:34:24,989 - 【第4個線程】子節點中,我果然是老大/disLocks/sub0000000007
- 2014-11-19 11:34:24,995 - 【第4個線程】獲取鎖成功,趕緊干活!
- 2014-11-19 11:34:26,995 - 【第4個線程】刪除本節點:/disLocks/sub0000000007
- 2014-11-19 11:34:26,998 - 【第8個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?
- 2014-11-19 11:34:27,000 - 【第8個線程】子節點中,我果然是老大/disLocks/sub0000000008
- 2014-11-19 11:34:27,004 - 【第8個線程】獲取鎖成功,趕緊干活!
- 2014-11-19 11:34:27,004 - 【第4個線程】釋放連接
- 2014-11-19 11:34:29,004 - 【第8個線程】刪除本節點:/disLocks/sub0000000008
- 2014-11-19 11:34:29,007 - 【第7個線程】收到情報,排我前面的家伙已掛,我是不是可以出山了?
- 2014-11-19 11:34:29,009 - 【第7個線程】子節點中,我果然是老大/disLocks/sub0000000009
- 2014-11-19 11:34:29,010 - 【第8個線程】釋放連接
- 2014-11-19 11:34:29,011 - 【第7個線程】獲取鎖成功,趕緊干活!
- 2014-11-19 11:34:31,011 - 【第7個線程】刪除本節點:/disLocks/sub0000000009
- 2014-11-19 11:34:31,017 - 【第7個線程】釋放連接
- 2014-11-19 11:34:31,017 - 所有線程運行結束!