分布式鎖介紹##
分布式執行一些不需要同時執行的復雜任務,curator利用zk的特質,實現了這個選舉過程。其實就是利用了多個zk客戶端在同一個位置建節點,只會有一個客戶端建立成功這個特性。來實現同一時間,只會選擇一個客戶端執行任務
代碼###
//分布式鎖
InterProcessMutex lock = new InterProcessMutex(cc,"/lock_path");
CountDownLatch down = new CountDownLatch(1);
for (int i = 0; i < 30; i++) {
new Thread(()->{
try {
down.await();
lock.acquire();
} catch (Exception e) {
e.printStackTrace();
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
System.out.println(sdf.format(new Date()));
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
down.countDown();
InterProcessMutex 是一個可重入的排他鎖,獲取鎖的過程是通過往ZK下面成功建立節點來實現的,下面是獲取鎖的過程
//獲取當前線程
Thread currentThread = Thread.currentThread();
//獲取當前線程的鎖數據
LockData lockData = threadData.get(currentThread);
//如果不為null,則將鎖的數量+1,實現可重入
if ( lockData != null )
{
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
//第一次會到這里,嘗試在我們之前設置的路徑下建立節點
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
//建立成功后就初始化lockdata 並按當前線程進行保存,所以可以通過創建多個thread來模擬鎖競爭,而不需要建多個client。
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
下面是attemptLock的重要代碼
while ( !isDone )
{
isDone = true;
try
{
//建立節點
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//下面就是獲取鎖和加鎖的循環了
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
//session過期時走這里,按策略處理,允許重試就重試,否則就拋出異常
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
下面是internalLockLoop的重要代碼
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
//排序是因為要實現公平鎖,加上maxleases參數限制取首位
List<String> children = getSortedChildren();
//得到子節點名稱,比如 _c_ce2a26cb-9721-4f56-91fd-a6d00b00b12c-lock-0000000030
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
//獲取是否獲得鎖的狀態,重要方法,maxLeases為1,后面要通過這個參數進行對比,通過判斷小於這個來實現 公平鎖
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
//如果沒有獲得鎖,就給當前節點加個watcher,繼續等待,一旦被刪掉就調用這個watcher notifyall。
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// 這個watcher只有一個作用就是喚醒其他線程進行競爭 notifyAll();
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
...
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}