ZooKeeper 筆記(6) 分布式鎖


  目前分布式鎖,比較成熟、主流的方案有基於redis及基於zookeeper的二種方案。

  大體來講,基於redis的分布式鎖核心指令為SETNX,即如果目標key存在,寫入緩存失敗返回0,反之如果目標key不存在,寫入緩存成功返回1,通過區分這二個不同的返回值,可以認為SETNX成功即為獲得了鎖。

  redis分布式鎖,看上去很簡單,但其實要考慮周全,並不容易,網上有一篇文章討論得很詳細:http://blog.csdn.net/ugg/article/details/41894947/,有興趣的可以閱讀一下。

  其主要問題在於某些異常情況下,鎖的釋放會有問題,比如SETNX成功,應用獲得鎖,這時出於某種原因,比如網絡中斷,或程序出異常退出,會導致鎖無法及時釋放,只能依賴於緩存的過期時間,但是過期時間這個值設置多大,也是一個糾結的問題,設置小了,應用處理邏輯很復雜的話,可能會導致鎖提前釋放,如果設置大了,又會導致鎖不能及時釋放,所以那篇文章中針對這些細節討論了很多。

  而基於zk的分布式鎖,在鎖的釋放問題上處理起來要容易一些,其大體思路是利用zk的“臨時順序”節點,需要獲取鎖時,在某個約定節點下注冊一個臨時順序節點,然后將所有臨時節點按小從到大排序,如果自己注冊的臨時節點正好是最小的,表示獲得了鎖。(zk能保證臨時節點序號始終遞增,所以如果后面有其它應用也注冊了臨時節點,序號肯定比獲取鎖的應用更大)

  當應用處理完成,或者處理過程中出現某種原因,導致與zk斷開,超過時間閾值(可配置)后,zk server端會自動刪除該臨時節點,即:鎖被釋放。所有參與鎖競爭的應用,只要監聽父路徑的子節點變化即可,有變化時(即:有應用斷開或注冊時),開始搶鎖,搶完了大家都在一邊等着,直到有新變化時,開始新一輪搶鎖。

  關於zk的分布式鎖,網上也有一篇文章寫得不錯,見http://blog.csdn.net/desilting/article/details/41280869

 

個人感覺:zk做分布式鎖機制更完善,但zk抗並發的能力弱於redis,性能上略差,建議如果並發要求高,鎖競爭激烈,可考慮用redis,如果搶鎖的頻度不高,用zk更適合。

 

最后送福利時間到:

  文中提到的基於zk分布式鎖的那篇文章,邏輯上雖然沒有問題,但是有些場景下,鎖的數量限制可能要求不止1個,比如:某些應用,我希望同時啟動2個實例來處理,但是出於HA的考慮,又擔心這二個實例會掛掉,這時可以啟動4個(或者更多),這些實例中,只允許2個搶到鎖的實例可以進行業務處理,其它實例處於standby狀態(即:備胎),如果這二個搶到鎖的實例掛了(比如異常退出),那么standby的實例會得到鎖,即:備胎轉正,開始正常業務處理,從而保證了系統的HA。

對於這些場景,我封裝了一個抽象類,大家可在此基礎上自行修改:(主要看明白思路就行,代碼細節並不重要)

package cn.cnblogs.yjmyzz.zookeeper;

import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Created by yangjunming on 5/27/16.
 * 基於Zookeeper的分布式鎖
 */
public abstract class AbstractLock {

    private int lockNumber = 1; //允許獲取的鎖數量(默認為1,即最小節點=自身時,認為獲得鎖)
    private ZkClient zk = null;
    private String rootNode = "/lock"; //根節點名稱
    private String selfNode;
    private final String className = this.getClass().getSimpleName(); //當前實例的className
    private String selfNodeName;//自身注冊的臨時節點名
    private boolean handling = false;
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    private static final JsonUtil jsonUtil = new FastJsonUtil();
    private static final String SPLIT = "/";
    private String selfNodeFullName;


    /**
     * 通過Zk獲取分布式鎖
     */
    protected void getLock(int lockNumber) {
        setLockNumber(lockNumber);
        initBean();
        initNode();
        subscribe();
        register();
        heartBeat();
        remainRunning();
    }


    protected void getLock() {
        getLock(1);
    }


    /**
     * 初始化結點
     */
    private void initNode() {

        String error;
        if (!rootNode.startsWith(SPLIT)) {
            error = "rootNode必須以" + SPLIT + "開頭";
            logger.error(error);
            throw new RuntimeException(error);
        }

        if (rootNode.endsWith(SPLIT)) {
            error = "不能以" + SPLIT + "結尾";
            logger.error(error);
            throw new RuntimeException(error);
        }

        int start = 1;
        int index = rootNode.indexOf(SPLIT, start);
        String path;
        while (index != -1) {
            path = rootNode.substring(0, index);
            if (!zk.exists(path)) {
                zk.createPersistent(path);
            }
            start = index + 1;
            if (start >= rootNode.length()) {
                break;
            }
            index = rootNode.indexOf(SPLIT, start);
        }

        if (start < rootNode.length()) {
            if (!zk.exists(rootNode)) {
                zk.createPersistent(rootNode);
            }
        }

        selfNode = rootNode + SPLIT + className;

        if (!zk.exists(selfNode)) {
            zk.createPersistent(selfNode);
        }
    }

    /**
     * 向zk注冊自身節點
     */
    private void register() {
        selfNodeName = zk.createEphemeralSequential(selfNode + SPLIT, StringUtils.EMPTY);
        if (!StringUtils.isEmpty(selfNodeName)) {
            selfNodeFullName = selfNodeName;
            logger.info("自身節點:" + selfNodeName + ",注冊成功!");
            selfNodeName = selfNodeName.substring(selfNode.length() + 1);
        }
        checkMin();
    }

    /**
     * 訂閱zk的節點變化
     */
    private void subscribe() {
        zk.subscribeChildChanges(selfNode, (parentPath, currentChilds) -> {
            checkMin();
        });
    }

    /**
     * 檢測是否獲得鎖
     */
    private void checkMin() {
        List<String> list = zk.getChildren(selfNode);
        if (CollectionUtils.isEmpty(list)) {
            logger.error(selfNode + " 無任何子節點!");
            lockFail();
            handling = false;
            return;
        }
        //按序號從小到大排
        Collections.sort(list);

        //如果自身ID在前N個鎖中,則認為獲取成功
        int max = Math.min(getLockNumber(), list.size());
        for (int i = 0; i < max; i++) {
            if (list.get(i).equals(selfNodeName)) {
                if (!handling) {
                    lockSuccess();
                    handling = true;
                    logger.info("獲得鎖成功!");
                }
                return;
            }
        }

        int selfIndex = list.indexOf(selfNodeName);
        if (selfIndex > 0) {
            logger.info("前面還有節點" + list.get(selfIndex - 1) + ",獲取鎖失敗!");
        } else {
            logger.info("獲取鎖失敗!");
        }
        lockFail();

        handling = false;
    }

    /**
     * 獲得鎖成功的處理回調
     */
    protected abstract void lockSuccess();

    /**
     * 獲得鎖失敗的處理回調
     */
    protected abstract void lockFail();

    /**
     * 初始化相關的Bean對象
     */
    protected abstract void initBean();


    protected void setZkClient(ZkClient zk) {
        this.zk = zk;
    }

    protected int getLockNumber() {
        return lockNumber;
    }

    protected void setLockNumber(int lockNumber) {
        this.lockNumber = lockNumber;
    }

    protected void setRootNode(String value) {
        this.rootNode = value;
    }

    /**
     * 防程序退出
     */
    private void remainRunning() {
        byte[] lock = new byte[0];
        synchronized (lock) {
            try {
                lock.wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error("remainRunning出錯:", e);
            }
        }
    }

    /**
     * 定時向zk發送心跳
     */
    private void heartBeat() {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        service.scheduleAtFixedRate(() -> {
            HeartBeat heartBeat = new HeartBeat();
            heartBeat.setHostIp(NetworkUtil.getHostAddress());
            heartBeat.setHostName(NetworkUtil.getHostName());
            heartBeat.setLastTime(new Date());
            heartBeat.setPid(RuntimeUtil.getPid());
            zk.writeData(selfNodeFullName, jsonUtil.toJson(heartBeat));
        }, 0, 15, TimeUnit.SECONDS);
    }
}

這個類中,提供了三個抽象方法:

    /**
     * 獲得鎖成功的處理回調
     */
    protected abstract void lockSuccess();

    /**
     * 獲得鎖失敗的處理回調
     */
    protected abstract void lockFail();

    /**
     * 初始化相關的Bean對象
     */
    protected abstract void initBean();

用於處理搶鎖成功、搶鎖失敗、及開搶前的一些對象初始化處理,子類繼承后,只要實現這3個具體的方法即可,同時該抽象類默認還提供了心跳機制,用於定時向zk匯報自身的健康狀態。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM