目前分布式鎖,比較成熟、主流的方案有基於redis及基於zookeeper的二種方案。
大體來講,基於redis的分布式鎖核心指令為SETNX,即如果目標key存在,寫入緩存失敗返回0,反之如果目標key不存在,寫入緩存成功返回1,通過區分這二個不同的返回值,可以認為SETNX成功即為獲得了鎖。
redis分布式鎖,看上去很簡單,但其實要考慮周全,並不容易,網上有一篇文章討論得很詳細:http://blog.csdn.net/ugg/article/details/41894947/,有興趣的可以閱讀一下。
其主要問題在於某些異常情況下,鎖的釋放會有問題,比如SETNX成功,應用獲得鎖,這時出於某種原因,比如網絡中斷,或程序出異常退出,會導致鎖無法及時釋放,只能依賴於緩存的過期時間,但是過期時間這個值設置多大,也是一個糾結的問題,設置小了,應用處理邏輯很復雜的話,可能會導致鎖提前釋放,如果設置大了,又會導致鎖不能及時釋放,所以那篇文章中針對這些細節討論了很多。
而基於zk的分布式鎖,在鎖的釋放問題上處理起來要容易一些,其大體思路是利用zk的“臨時順序”節點,需要獲取鎖時,在某個約定節點下注冊一個臨時順序節點,然后將所有臨時節點按小從到大排序,如果自己注冊的臨時節點正好是最小的,表示獲得了鎖。(zk能保證臨時節點序號始終遞增,所以如果后面有其它應用也注冊了臨時節點,序號肯定比獲取鎖的應用更大)
當應用處理完成,或者處理過程中出現某種原因,導致與zk斷開,超過時間閾值(可配置)后,zk server端會自動刪除該臨時節點,即:鎖被釋放。所有參與鎖競爭的應用,只要監聽父路徑的子節點變化即可,有變化時(即:有應用斷開或注冊時),開始搶鎖,搶完了大家都在一邊等着,直到有新變化時,開始新一輪搶鎖。
關於zk的分布式鎖,網上也有一篇文章寫得不錯,見http://blog.csdn.net/desilting/article/details/41280869
個人感覺:zk做分布式鎖機制更完善,但zk抗並發的能力弱於redis,性能上略差,建議如果並發要求高,鎖競爭激烈,可考慮用redis,如果搶鎖的頻度不高,用zk更適合。
最后送福利時間到:
文中提到的基於zk分布式鎖的那篇文章,邏輯上雖然沒有問題,但是有些場景下,鎖的數量限制可能要求不止1個,比如:某些應用,我希望同時啟動2個實例來處理,但是出於HA的考慮,又擔心這二個實例會掛掉,這時可以啟動4個(或者更多),這些實例中,只允許2個搶到鎖的實例可以進行業務處理,其它實例處於standby狀態(即:備胎),如果這二個搶到鎖的實例掛了(比如異常退出),那么standby的實例會得到鎖,即:備胎轉正,開始正常業務處理,從而保證了系統的HA。
對於這些場景,我封裝了一個抽象類,大家可在此基礎上自行修改:(主要看明白思路就行,代碼細節並不重要)
package cn.cnblogs.yjmyzz.zookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Created by yangjunming on 5/27/16.
* 基於Zookeeper的分布式鎖
*/
public abstract class AbstractLock {
private int lockNumber = 1; //允許獲取的鎖數量(默認為1,即最小節點=自身時,認為獲得鎖)
private ZkClient zk = null;
private String rootNode = "/lock"; //根節點名稱
private String selfNode;
private final String className = this.getClass().getSimpleName(); //當前實例的className
private String selfNodeName;//自身注冊的臨時節點名
private boolean handling = false;
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
private static final JsonUtil jsonUtil = new FastJsonUtil();
private static final String SPLIT = "/";
private String selfNodeFullName;
/**
* 通過Zk獲取分布式鎖
*/
protected void getLock(int lockNumber) {
setLockNumber(lockNumber);
initBean();
initNode();
subscribe();
register();
heartBeat();
remainRunning();
}
protected void getLock() {
getLock(1);
}
/**
* 初始化結點
*/
private void initNode() {
String error;
if (!rootNode.startsWith(SPLIT)) {
error = "rootNode必須以" + SPLIT + "開頭";
logger.error(error);
throw new RuntimeException(error);
}
if (rootNode.endsWith(SPLIT)) {
error = "不能以" + SPLIT + "結尾";
logger.error(error);
throw new RuntimeException(error);
}
int start = 1;
int index = rootNode.indexOf(SPLIT, start);
String path;
while (index != -1) {
path = rootNode.substring(0, index);
if (!zk.exists(path)) {
zk.createPersistent(path);
}
start = index + 1;
if (start >= rootNode.length()) {
break;
}
index = rootNode.indexOf(SPLIT, start);
}
if (start < rootNode.length()) {
if (!zk.exists(rootNode)) {
zk.createPersistent(rootNode);
}
}
selfNode = rootNode + SPLIT + className;
if (!zk.exists(selfNode)) {
zk.createPersistent(selfNode);
}
}
/**
* 向zk注冊自身節點
*/
private void register() {
selfNodeName = zk.createEphemeralSequential(selfNode + SPLIT, StringUtils.EMPTY);
if (!StringUtils.isEmpty(selfNodeName)) {
selfNodeFullName = selfNodeName;
logger.info("自身節點:" + selfNodeName + ",注冊成功!");
selfNodeName = selfNodeName.substring(selfNode.length() + 1);
}
checkMin();
}
/**
* 訂閱zk的節點變化
*/
private void subscribe() {
zk.subscribeChildChanges(selfNode, (parentPath, currentChilds) -> {
checkMin();
});
}
/**
* 檢測是否獲得鎖
*/
private void checkMin() {
List<String> list = zk.getChildren(selfNode);
if (CollectionUtils.isEmpty(list)) {
logger.error(selfNode + " 無任何子節點!");
lockFail();
handling = false;
return;
}
//按序號從小到大排
Collections.sort(list);
//如果自身ID在前N個鎖中,則認為獲取成功
int max = Math.min(getLockNumber(), list.size());
for (int i = 0; i < max; i++) {
if (list.get(i).equals(selfNodeName)) {
if (!handling) {
lockSuccess();
handling = true;
logger.info("獲得鎖成功!");
}
return;
}
}
int selfIndex = list.indexOf(selfNodeName);
if (selfIndex > 0) {
logger.info("前面還有節點" + list.get(selfIndex - 1) + ",獲取鎖失敗!");
} else {
logger.info("獲取鎖失敗!");
}
lockFail();
handling = false;
}
/**
* 獲得鎖成功的處理回調
*/
protected abstract void lockSuccess();
/**
* 獲得鎖失敗的處理回調
*/
protected abstract void lockFail();
/**
* 初始化相關的Bean對象
*/
protected abstract void initBean();
protected void setZkClient(ZkClient zk) {
this.zk = zk;
}
protected int getLockNumber() {
return lockNumber;
}
protected void setLockNumber(int lockNumber) {
this.lockNumber = lockNumber;
}
protected void setRootNode(String value) {
this.rootNode = value;
}
/**
* 防程序退出
*/
private void remainRunning() {
byte[] lock = new byte[0];
synchronized (lock) {
try {
lock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("remainRunning出錯:", e);
}
}
}
/**
* 定時向zk發送心跳
*/
private void heartBeat() {
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
service.scheduleAtFixedRate(() -> {
HeartBeat heartBeat = new HeartBeat();
heartBeat.setHostIp(NetworkUtil.getHostAddress());
heartBeat.setHostName(NetworkUtil.getHostName());
heartBeat.setLastTime(new Date());
heartBeat.setPid(RuntimeUtil.getPid());
zk.writeData(selfNodeFullName, jsonUtil.toJson(heartBeat));
}, 0, 15, TimeUnit.SECONDS);
}
}
這個類中,提供了三個抽象方法:
/**
* 獲得鎖成功的處理回調
*/
protected abstract void lockSuccess();
/**
* 獲得鎖失敗的處理回調
*/
protected abstract void lockFail();
/**
* 初始化相關的Bean對象
*/
protected abstract void initBean();
用於處理搶鎖成功、搶鎖失敗、及開搶前的一些對象初始化處理,子類繼承后,只要實現這3個具體的方法即可,同時該抽象類默認還提供了心跳機制,用於定時向zk匯報自身的健康狀態。
