目前分布式鎖,比較成熟、主流的方案有基於redis及基於zookeeper的二種方案。
大體來講,基於redis的分布式鎖核心指令為SETNX,即如果目標key存在,寫入緩存失敗返回0,反之如果目標key不存在,寫入緩存成功返回1,通過區分這二個不同的返回值,可以認為SETNX成功即為獲得了鎖。
redis分布式鎖,看上去很簡單,但其實要考慮周全,並不容易,網上有一篇文章討論得很詳細:http://blog.csdn.net/ugg/article/details/41894947/,有興趣的可以閱讀一下。
其主要問題在於某些異常情況下,鎖的釋放會有問題,比如SETNX成功,應用獲得鎖,這時出於某種原因,比如網絡中斷,或程序出異常退出,會導致鎖無法及時釋放,只能依賴於緩存的過期時間,但是過期時間這個值設置多大,也是一個糾結的問題,設置小了,應用處理邏輯很復雜的話,可能會導致鎖提前釋放,如果設置大了,又會導致鎖不能及時釋放,所以那篇文章中針對這些細節討論了很多。
而基於zk的分布式鎖,在鎖的釋放問題上處理起來要容易一些,其大體思路是利用zk的“臨時順序”節點,需要獲取鎖時,在某個約定節點下注冊一個臨時順序節點,然后將所有臨時節點按小從到大排序,如果自己注冊的臨時節點正好是最小的,表示獲得了鎖。(zk能保證臨時節點序號始終遞增,所以如果后面有其它應用也注冊了臨時節點,序號肯定比獲取鎖的應用更大)
當應用處理完成,或者處理過程中出現某種原因,導致與zk斷開,超過時間閾值(可配置)后,zk server端會自動刪除該臨時節點,即:鎖被釋放。所有參與鎖競爭的應用,只要監聽父路徑的子節點變化即可,有變化時(即:有應用斷開或注冊時),開始搶鎖,搶完了大家都在一邊等着,直到有新變化時,開始新一輪搶鎖。
關於zk的分布式鎖,網上也有一篇文章寫得不錯,見http://blog.csdn.net/desilting/article/details/41280869
個人感覺:zk做分布式鎖機制更完善,但zk抗並發的能力弱於redis,性能上略差,建議如果並發要求高,鎖競爭激烈,可考慮用redis,如果搶鎖的頻度不高,用zk更適合。
最后送福利時間到:
文中提到的基於zk分布式鎖的那篇文章,邏輯上雖然沒有問題,但是有些場景下,鎖的數量限制可能要求不止1個,比如:某些應用,我希望同時啟動2個實例來處理,但是出於HA的考慮,又擔心這二個實例會掛掉,這時可以啟動4個(或者更多),這些實例中,只允許2個搶到鎖的實例可以進行業務處理,其它實例處於standby狀態(即:備胎),如果這二個搶到鎖的實例掛了(比如異常退出),那么standby的實例會得到鎖,即:備胎轉正,開始正常業務處理,從而保證了系統的HA。
對於這些場景,我封裝了一個抽象類,大家可在此基礎上自行修改:(主要看明白思路就行,代碼細節並不重要)
package cn.cnblogs.yjmyzz.zookeeper; import org.I0Itec.zkclient.ZkClient; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.Date; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * Created by yangjunming on 5/27/16. * 基於Zookeeper的分布式鎖 */ public abstract class AbstractLock { private int lockNumber = 1; //允許獲取的鎖數量(默認為1,即最小節點=自身時,認為獲得鎖) private ZkClient zk = null; private String rootNode = "/lock"; //根節點名稱 private String selfNode; private final String className = this.getClass().getSimpleName(); //當前實例的className private String selfNodeName;//自身注冊的臨時節點名 private boolean handling = false; protected final Logger logger = LoggerFactory.getLogger(this.getClass()); private static final JsonUtil jsonUtil = new FastJsonUtil(); private static final String SPLIT = "/"; private String selfNodeFullName; /** * 通過Zk獲取分布式鎖 */ protected void getLock(int lockNumber) { setLockNumber(lockNumber); initBean(); initNode(); subscribe(); register(); heartBeat(); remainRunning(); } protected void getLock() { getLock(1); } /** * 初始化結點 */ private void initNode() { String error; if (!rootNode.startsWith(SPLIT)) { error = "rootNode必須以" + SPLIT + "開頭"; logger.error(error); throw new RuntimeException(error); } if (rootNode.endsWith(SPLIT)) { error = "不能以" + SPLIT + "結尾"; logger.error(error); throw new RuntimeException(error); } int start = 1; int index = rootNode.indexOf(SPLIT, start); String path; while (index != -1) { path = rootNode.substring(0, index); if (!zk.exists(path)) { zk.createPersistent(path); } start = index + 1; if (start >= rootNode.length()) { break; } index = rootNode.indexOf(SPLIT, start); } if (start < rootNode.length()) { if (!zk.exists(rootNode)) { zk.createPersistent(rootNode); } } selfNode = rootNode + SPLIT + className; if (!zk.exists(selfNode)) { zk.createPersistent(selfNode); } } /** * 向zk注冊自身節點 */ private void register() { selfNodeName = zk.createEphemeralSequential(selfNode + SPLIT, StringUtils.EMPTY); if (!StringUtils.isEmpty(selfNodeName)) { selfNodeFullName = selfNodeName; logger.info("自身節點:" + selfNodeName + ",注冊成功!"); selfNodeName = selfNodeName.substring(selfNode.length() + 1); } checkMin(); } /** * 訂閱zk的節點變化 */ private void subscribe() { zk.subscribeChildChanges(selfNode, (parentPath, currentChilds) -> { checkMin(); }); } /** * 檢測是否獲得鎖 */ private void checkMin() { List<String> list = zk.getChildren(selfNode); if (CollectionUtils.isEmpty(list)) { logger.error(selfNode + " 無任何子節點!"); lockFail(); handling = false; return; } //按序號從小到大排 Collections.sort(list); //如果自身ID在前N個鎖中,則認為獲取成功 int max = Math.min(getLockNumber(), list.size()); for (int i = 0; i < max; i++) { if (list.get(i).equals(selfNodeName)) { if (!handling) { lockSuccess(); handling = true; logger.info("獲得鎖成功!"); } return; } } int selfIndex = list.indexOf(selfNodeName); if (selfIndex > 0) { logger.info("前面還有節點" + list.get(selfIndex - 1) + ",獲取鎖失敗!"); } else { logger.info("獲取鎖失敗!"); } lockFail(); handling = false; } /** * 獲得鎖成功的處理回調 */ protected abstract void lockSuccess(); /** * 獲得鎖失敗的處理回調 */ protected abstract void lockFail(); /** * 初始化相關的Bean對象 */ protected abstract void initBean(); protected void setZkClient(ZkClient zk) { this.zk = zk; } protected int getLockNumber() { return lockNumber; } protected void setLockNumber(int lockNumber) { this.lockNumber = lockNumber; } protected void setRootNode(String value) { this.rootNode = value; } /** * 防程序退出 */ private void remainRunning() { byte[] lock = new byte[0]; synchronized (lock) { try { lock.wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("remainRunning出錯:", e); } } } /** * 定時向zk發送心跳 */ private void heartBeat() { ScheduledExecutorService service = Executors.newScheduledThreadPool(1); service.scheduleAtFixedRate(() -> { HeartBeat heartBeat = new HeartBeat(); heartBeat.setHostIp(NetworkUtil.getHostAddress()); heartBeat.setHostName(NetworkUtil.getHostName()); heartBeat.setLastTime(new Date()); heartBeat.setPid(RuntimeUtil.getPid()); zk.writeData(selfNodeFullName, jsonUtil.toJson(heartBeat)); }, 0, 15, TimeUnit.SECONDS); } }
這個類中,提供了三個抽象方法:
/** * 獲得鎖成功的處理回調 */ protected abstract void lockSuccess(); /** * 獲得鎖失敗的處理回調 */ protected abstract void lockFail(); /** * 初始化相關的Bean對象 */ protected abstract void initBean();
用於處理搶鎖成功、搶鎖失敗、及開搶前的一些對象初始化處理,子類繼承后,只要實現這3個具體的方法即可,同時該抽象類默認還提供了心跳機制,用於定時向zk匯報自身的健康狀態。