zk分布鎖的java實現


只做記錄,直接上代碼

父類:

package com.ylcloud.common.lock;

import com.alibaba.fastjson.JSON;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


/**
 * @author cjh
 * @Description: zk分布鎖
 * @date: 2018/9/27 11:36
 */
public class ZkLock {

    private static Logger logger = LogManager.getLogger(ZkLock.class);

    public static String ZOOKEEPER_IP_PORT = "127.0.0.1:2181";

    public static Integer sessionTimeout = 30000;

    public static Integer connectTimeout = 30000;

    /**
     * 節點鎖標記
     */
    public String lockPath;

    /**
     * 前一個節點(設置用戶添加監聽器)
     */
    public String beforeNode;

    /**
     * 當前執行節點(設置用於刪除)
     */
    public String currentNode;

    /**
     * 當前請求節點
     */
    public String threadTag = null;

    private String lock1 = null;

    private String lock2 = null;

    public static final ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, sessionTimeout, connectTimeout, new SerializableSerializer());

    private static final ThreadLocal<String> NODES = new ThreadLocal<String>();

    public ZkLock() {
    }

    public void init(String code) {
        this.lockPath = code;
        this.lock1 = code + "LOCK";
        this.lock2 = code + "UNLOCK";
        client.deleteRecursive(lockPath);
    }

    public void lock() {

        synchronized (lock1) {
            if (!client.exists(lockPath)) {
                client.createPersistent(lockPath);
            }

            if (!tryLock()) {
            }

        }

    }

    public void unlock() {

        List<String> childrens = client.getChildren(lockPath);
        Collections.sort(childrens);

        String nodes = NODES.get();
        logger.info(JSON.toJSONString(childrens) + " ==== " + nodes + " ==== " + (nodes.equals(lockPath + '/' + childrens.get(0))));
        if (childrens.size() > 0 && nodes.equals(lockPath + '/' + childrens.get(0))) {
            client.delete(nodes);
        }

    }

    private boolean tryLock() {

        threadTag = client.createEphemeralSequential(lockPath + '/', "");

        NODES.set(threadTag);

        List<String> childrens = client.getChildren(lockPath);
        Collections.sort(childrens);

        currentNode = lockPath + '/' + childrens.get(0);

        if (threadTag.equals(currentNode)) {
            return true;
        } else {

            currentNode = threadTag;

            int wz = Collections.binarySearch(childrens, threadTag.substring(lockPath.length() + 1));
            beforeNode = lockPath + '/' + childrens.get(wz - 1);

            final CountDownLatch latch = new CountDownLatch(1);
            try {

                client.subscribeDataChanges(beforeNode, new IZkDataListener() {

                    @Override
                    public void handleDataDeleted(String dataPath) throws Exception {
                        if (latch != null && latch.getCount() > 0) {
                            latch.countDown();
                        }
                    }

                    @Override
                    public void handleDataChange(String dataPath, Object data) throws Exception {
                    }
                });

                if (client.exists(beforeNode)) {
                    latch.await(sessionTimeout, TimeUnit.MILLISECONDS);
                }

            } catch (Exception e) {
                return true;
            } finally {
            }

        }
        return false;
    }

}

 

子類

package com.ylcloud.common.lock.ext;

import com.ylcloud.common.lock.ZkLock;

/**
  * @Description: 用戶編碼鎖
  * @author cjh
  * @date: 2018/10/10 14:47
  */
public class ZkLockUserCode extends ZkLock {

    public ZkLockUserCode() {
        super.init("/USER_CODE");
    }

}

 

使用示例:

private ZkLock zkLock = new ZkLockUserCode();

public void addUser() {
    
    try {

        zkLock.lock();

            /**
             *業務實現
             */
            
    } catch (Exception e) {
        logger.info("err {} {} ", e.getMessage(), e.getCause());


    } finally {
        zkLock.unlock();
    }
        
}     

 

注意:unlock必須寫在finally里面,否則一旦業務出現運行錯誤造成沒有解鎖,下一次訪問的人就需要等待一個sessionTime了

題外話:zk在linux上啟動命令 ./zkServer.sh start

 

自己實現的很多細節沒考慮到導致在高並發的項目中出現了問題,然后我改用了Curator框架來操控zookeeper實現分布鎖:

    <curator.version>4.0.0</curator.version> 

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>${curator.version}</version>
        </dependency>    

 

鎖實現代碼:

package com.ylcloud.common.lock;

import com.ylcloud.common.lock.ext.ZkLockRoleCode;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


/**
 * @author cjh
 * @Description: zk分布鎖
 * @date: 2018/9/27 11:36
 */
public class ZkLock {

    private static Logger logger = LogManager.getLogger(ZkLock.class);

    public static String ZOOKEEPER_IP_PORT = "127.0.0.1:2181";

    public static String NAME_SPACE = "YL_CLOUD";

    public static Integer sessionTimeout = 15000;

    public static Integer connectTimeout = 30000;

    private static Object initLock = new Object();

    public static CuratorFramework client = null;

    private static Map<String,ZkLock> tagLocks = new HashMap<String,ZkLock>();

    public ZkLock() {
    }

    public InterProcessMutex mutex = null;

    static {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client =
                CuratorFrameworkFactory.builder()
                        .connectString(ZOOKEEPER_IP_PORT)
                        .sessionTimeoutMs(sessionTimeout)
                        .connectionTimeoutMs(connectTimeout)
                        .retryPolicy(retryPolicy)
                        .namespace(NAME_SPACE)
                        .build();

        client.start();
    }

    public void init(String code) {
        this.mutex = new InterProcessMutex(client,code);
    }

    //prefix 標記  code 主鍵
    public static ZkLock getTagLock(String prefix,String code){
        String tag = prefix + "/" + code;

        if(!tagLocks.containsKey(tag)){
            synchronized (initLock){
                if(!tagLocks.containsKey(tag)){
                    ZkLock zkLock = new ZkLock();
                    zkLock.init(tag);
                    tagLocks.put(tag,zkLock);
                }
            }
        }

        return tagLocks.get(tag);
    }

    public void lock(){
        try {
            this.mutex.acquire();
        } catch (Exception e) {
            logger.error("加鎖失敗");
        }
    }

    public void unlock(){
        try {
            this.mutex.release();
        } catch (Exception e) {
            logger.error("鎖釋放失敗");
        }
    }

}

 

調用代碼(不變)

 

 

轉載請注明博客出處:http://www.cnblogs.com/cjh-notes/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM