分布式鎖-基於ZK和Redis實現


一、基於zookeeper實現分布式鎖

1.1 Zookeeper的常用接口

package register;


import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class BaseZookeeper implements Watcher{

    public BaseZookeeper(){}

    public BaseZookeeper(String host){
        this.connectZookeeper(host);
    }

    private ZooKeeper zookeeper;

    //超時時間
    private static final int SESSION_TIME_OUT = 2000;
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public void process(WatchedEvent event) {
        if (event.getState() == KeeperState.SyncConnected) {
            //System.out.println("Watch received event");
            countDownLatch.countDown();
        }
    }

    //連接zookeeper
    protected void connectZookeeper(String host){
        try {
            zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
            countDownLatch.await();
            //System.out.println("zookeeper connection success");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //創建節點
    protected String createNode(String path, String data){
        try {
            //永久節點
            String result = this.zookeeper.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            //臨時節點(會話關閉就刪除了,調用close后就自動刪除了)
            //String result = this.zookeeper.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("createNode: " + result);
            return result;
        } catch (Exception e) {
            //e.printStackTrace();
            return null;
        }
    }

    //創建多級節點
    //String path = "/dubbo/server/com.wzy.server.OrderServer";
    protected boolean createMultNode(String path){

        String[] paths = path.split("/");
        String realPath = "/";
        for (int i=1; i<paths.length; i++) {
            realPath += paths[i];
            String result = createNode(realPath, "");

            if (result == null) {
                return false;
            }
            realPath += "/";
        }
        return true;
    }

    //獲取路徑下所有子節點
    protected List<String> getChildren(String path){
        try {
            List<String> children = zookeeper.getChildren(path, false);
            return children;
        } catch (Exception e) {
            //當路徑已經是根節點(沒有子節點)時,就會拋異常
            return null;
        }

    }

    //獲取節點上面的數據
    protected String getData(String path) throws KeeperException, InterruptedException{
        byte[] data = zookeeper.getData(path, false, null);
        if (data == null) {
            return "";
        }
        return new String(data);
    }

    //設置節點信息
    protected Stat setData(String path, String data){
        try {
            getData(path);
            Stat stat = zookeeper.setData(path, data.getBytes(), -1);
            return stat;
        } catch (Exception e) {
            //String result = createNode(path,"");
            return null;
        }

    }

    //刪除節點
    protected boolean deleteNode(String path){
        if (!path.startsWith("/")) {
            path = "/" + path;
        }
        try {
            zookeeper.delete(path, -1);
        } catch (InterruptedException e) {
            return false;
        } catch (KeeperException e) {
            return false;
        }
        return true;
    }

    //獲取創建時間
    protected String getCTime(String path) throws KeeperException, InterruptedException{
        Stat stat = zookeeper.exists(path, false);
        return String.valueOf(stat.getCtime());
    }

    //獲取某個路徑下孩子的數量
    protected Integer getChildrenNum(String path) throws KeeperException, InterruptedException{
        int childenNum = zookeeper.getChildren(path, false).size();
        return childenNum;
    }

    //監聽節點是否被刪除
    protected void watchIsDel(final String path) throws Exception{
        zookeeper.exists(path, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                Event.EventType type = watchedEvent.getType();
                if (Event.EventType.NodeDeleted.equals(type)) {
                    System.out.println("結點 " + path + "被刪除了");
                }
            }
        });
    }

    //關閉連接
    public void closeConnection() {
        if (zookeeper != null) {
            try {
                zookeeper.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}
package register;

import framework.URL;

import java.util.List;
import java.util.Random;

/**
 * ZooKeeper-backed service registry: stores each provider address as a child
 * node of {@code /dubbo/server/<interface name>}.
 */
public class ZkRegister extends BaseZookeeper {

    private static String ZK_HOST = "127.0.0.1:2181";
    private final String SERVER_ADDRESS = "/dubbo/server";
    private final String ROOT_ADDRESS = "/dubbo";

    /** Connects to the host currently configured in {@code ZK_HOST}. */
    public ZkRegister(){
        super(ZK_HOST);
    }

    /**
     * Changes the host used by instances created afterwards. The instance this
     * is called on is already connected and is not affected.
     *
     * @param host ZooKeeper connect string
     */
    public void setZkHost(String host){ZkRegister.ZK_HOST = host;}

    /**
     * Registers a provider address for a service interface.
     *
     * @param serverInterface service interface whose name becomes the node name
     * @param url provider location; only {@code url.getAddress()} is stored
     * @return {@code true} if the address node was created
     */
    public boolean regist(Class<?> serverInterface, URL url){
        // NOTE(review): this wipes the whole registry subtree before every
        // registration, so previously registered services are lost. Kept as
        // in the original; confirm whether that is intended outside a demo.
        if (null != getChildren(ROOT_ADDRESS)){
            deleteNodeRF(ROOT_ADDRESS);
        }
        return addAddressToNode(SERVER_ADDRESS + "/" + serverInterface.getName(), new String[]{url.getAddress()});
    }

    /**
     * Picks one registered address for a service at random.
     *
     * @param serverInterface service interface to look up
     * @return a randomly chosen address, or {@code null} when the service node
     *         is missing or has no registered addresses
     */
    public String getURLRandom(Class<?> serverInterface){
        List<String> urls = getChildren(SERVER_ADDRESS + "/" + serverInterface.getName());
        if (urls == null || urls.isEmpty()) {
            // Random.nextInt(0) would throw, and a missing node yields null.
            return null;
        }
        return urls.get(new Random().nextInt(urls.size()));
    }

    /**
     * Adds service addresses as child nodes of {@code nodePath}, creating the
     * node and its parents first when the node cannot be listed.
     *
     * <p>Example: {@code nodePath = "/dubbo/server/com.wzy.server.OrderServer"},
     * {@code address = {"192.168.37.1", "192.168.37.2", "192.168.37.3"}}.
     *
     * @param nodePath service node path; a leading slash is added when missing
     * @param address addresses to add, one child node each
     * @return {@code true} if every address node was created; {@code false} as
     *         soon as one step fails (later addresses are not attempted)
     */
    public boolean addAddressToNode (String nodePath, String[] address) {
        if (!nodePath.startsWith("/")) {
            nodePath = "/" + nodePath;
        }

        if (null == getChildren(nodePath) && !createMultNode(nodePath)) {
            // Without the parent node no child can be created.
            return false;
        }
        for (String entry : address) {
            if (null == createNode(nodePath + "/" + entry, "")) {
                return false;
            }
        }
        return true;
    }

    /**
     * Deletes a node together with all of its descendants.
     *
     * @param rootPath node to remove; a leading slash is added when missing
     * @return {@code true} if the node and every descendant were deleted,
     *         {@code false} if the node could not be listed or any deletion failed
     */
    public boolean deleteNodeRF (String rootPath) {
        if (!rootPath.startsWith("/")) {
            rootPath = "/" + rootPath;
        }
        return deleteSubtree(rootPath);
    }

    // Post-order delete: children first, then the node itself, so each node is
    // deleted exactly once and only after it has become empty.
    private boolean deleteSubtree (String path) {
        List<String> children = getChildren(path);
        if (children == null) {
            // Node is missing or could not be listed.
            return false;
        }
        boolean childrenDeleted = true;
        for (String child : children) {
            String childPath = "/".equals(path) ? "/" + child : path + "/" + child;
            childrenDeleted &= deleteSubtree(childPath);
        }
        boolean deleted = deleteNode(path);
        System.out.println("delete: " + path + " " + deleted);
        return childrenDeleted && deleted;
    }
}

1.2 基於zk實現分布式鎖

package lock;

import org.apache.zookeeper.*;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Zookeeper實現分布式鎖
 */
public class ZookeeperLock implements Lock {

    private ThreadLocal<ZooKeeper> zk = new ThreadLocal<ZooKeeper>();
    private String host = "localhost:2181";

    private final String LOCK_NAME = "/LOCK";
    private ThreadLocal<String> CURRENT_NODE = new ThreadLocal<String>();

    private void init() {
        if (zk.get() == null) {
            synchronized (ZookeeperLock.class) {
                if (zk.get() == null) {
                    try {
                        zk.set( new ZooKeeper(host, 2000, new Watcher() {
                            public void process(WatchedEvent watchedEvent) {
                                // do nothing..
                            }
                        }));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                }
            }
        }
    }

    public void lock() {
        init();
        if (tryLock()) {
            System.out.println("get lock success");
        }
    }

    public boolean tryLock() {
        String node = LOCK_NAME + "/zk_";
        try {
            //創建臨時順序節點  /LOCK/zk_1
            CURRENT_NODE.set(zk.get().create(node, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL));
            //zk_1,zk_2
            List<String> list = zk.get().getChildren(LOCK_NAME, false);
            Collections.sort(list);
            System.out.println(list);
            String minNode = list.get(0);

            if ((LOCK_NAME + "/" + minNode).equals(CURRENT_NODE.get())) {
                return true;
            } else {
                //等待鎖
                Integer currentIndex = list.indexOf(CURRENT_NODE.get().substring(CURRENT_NODE.get().lastIndexOf("/") + 1));
                String preNodeName = list.get(currentIndex - 1);

                //監聽前一個節點刪除事件
                final CountDownLatch countDownLatch = new CountDownLatch(1);
                zk.get().exists(LOCK_NAME + "/" + preNodeName, new Watcher() {
                    public void process(WatchedEvent watchedEvent) {
                        if (Event.EventType.NodeDeleted.equals(watchedEvent.getType())) {
                            countDownLatch.countDown();
                            System.out.println(Thread.currentThread().getName() + "喚醒鎖..");
                        }
                    }
                });

                System.out.println(Thread.currentThread().getName() + "等待鎖..");
                countDownLatch.await();//在變成0之前會一直阻塞

            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

        return true;
    }

    public void unlock() {
        try {
            zk.get().delete(CURRENT_NODE.get(), -1);
            CURRENT_NODE.remove();
            zk.get().close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }

    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    public void lockInterruptibly() throws InterruptedException {

    }

    public Condition newCondition() {
        return null;
    }
}

二、基於Redis實現分布式鎖

package lock;

import redis.clients.jedis.Jedis;

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Distributed lock backed by a single Redis key.
 *
 * <p>The lock is taken with one atomic {@code SET key value NX PX ttl} command
 * and released with a Lua script that deletes the key only when it still holds
 * this instance's request id, so one client cannot release a lock owned by
 * another.
 *
 * <p>The key expires after {@code LOCK_TTL_MILLIS}; the lock is not renewed, so
 * a critical section that runs longer than the TTL loses the lock. The lock is
 * not reentrant and does not support {@link Condition}s.
 */
public class RedisLock implements Lock {

    ThreadLocal<Jedis> jedis = new ThreadLocal<Jedis>();

    private static final String LOCK_NAME = "LOCK";
    // Expiry of the lock key in milliseconds; guards against holders that crash.
    private static final int LOCK_TTL_MILLIS = 1000;
    // Pause between acquisition attempts while waiting for the lock.
    private static final long RETRY_INTERVAL_MILLIS = 10;
    // Compare-and-delete must run atomically on the server, hence a script.
    private static final String UNLOCK_SCRIPT =
            "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

    // Per-instance owner token. It used to be a static field, which made every
    // instance share the id passed to the most recently constructed one.
    private final String requestId;

    /**
     * @param requestId unique token identifying this lock owner
     * @throws IllegalArgumentException if {@code requestId} is {@code null}
     */
    public RedisLock (String requestId) {
        if (requestId == null) {
            throw new IllegalArgumentException("requestId must not be null");
        }
        this.requestId = requestId;
        connection();
    }

    // Returns the calling thread's client, creating it on first use so the lock
    // also works on threads other than the one that constructed it.
    private Jedis connection() {
        Jedis client = jedis.get();
        if (client == null) {
            client = new Jedis("localhost");
            jedis.set(client);
        }
        return client;
    }

    /**
     * Blocks until the lock is acquired, retrying at a fixed interval. An
     * interrupt received while waiting does not abort the wait; the interrupt
     * flag is restored before returning.
     */
    public void lock() {
        boolean interrupted = false;
        try {
            while (!tryLock()) {
                try {
                    Thread.sleep(RETRY_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Makes a single, non-blocking attempt to take the lock.
     *
     * @return {@code true} if the key was set by this call, {@code false} if the
     *         lock is currently held
     */
    public boolean tryLock() {
        // Value and expiry are written in one command. A separate SETNX followed
        // by EXPIRE (or by a second SET ... NX) can leave the key without a TTL.
        String reply = connection().set(LOCK_NAME, requestId, "NX", "PX", LOCK_TTL_MILLIS);
        return "OK".equals(reply);
    }


    /**
     * Releases the lock if it is still owned by this instance's request id, then
     * closes and discards the calling thread's Redis connection.
     */
    public void unlock() {
        Jedis client = connection();
        try {
            client.eval(UNLOCK_SCRIPT, Collections.singletonList(LOCK_NAME), Collections.singletonList(requestId));
        } finally {
            try {
                client.close();
            } finally {
                jedis.remove();
            }
        }
    }

    /**
     * Conditions are not supported by this lock.
     *
     * @throws UnsupportedOperationException always
     */
    public Condition newCondition() {
        throw new UnsupportedOperationException("RedisLock does not support conditions");
    }

    /**
     * Tries to take the lock, retrying until the waiting time has elapsed.
     *
     * @param time maximum time to wait; zero or negative means a single attempt
     * @param unit unit of {@code time}
     * @return {@code true} if the lock was acquired, {@code false} on timeout
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        long deadline = System.nanoTime() + Math.max(0L, unit.toNanos(time));
        while (true) {
            if (tryLock()) {
                return true;
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            long pause = Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remaining));
            Thread.sleep(Math.min(RETRY_INTERVAL_MILLIS, pause));
        }
    }

    /**
     * Blocks until the lock is acquired or the thread is interrupted.
     *
     * @throws InterruptedException if the thread is interrupted before or while waiting
     */
    public void lockInterruptibly() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        while (!tryLock()) {
            Thread.sleep(RETRY_INTERVAL_MILLIS);
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM