Zookeeper使用實例——分布式共享鎖


前一講中我們知道,Zookeeper通過維護一個分布式目錄數據結構,實現分布式協調服務。本文主要介紹利用Zookeeper有序目錄的創建和刪除,實現分布式共享鎖。

舉個例子,性能管理系統中,告警規則只允許最多創建450條,我們如何保證這個約束呢?

如果只有一個web節點,我們只需要簡單的把規則數量查詢服務,入庫服務加一個鎖即可以解決,代碼如下

synchronized(this)
{
    if(450 >  queryRuleCount())
    {
        insertRule(rule);
    }
}

實際上,性能管理系統至少有兩個以上的web節點,一方面保障服務性能,一方面用於容災備份。這種場景兩個規則創建請求可能在兩個web節點上執行,synchronized就無用武之地了。這種沖突在規則導入場景下更容易發生。所以,使用分布式共享鎖就勢在必行了。

我們知道,zookeeper維護的分布式目錄數據結構視圖,對於各個zookeeper節點都是相同。zookeeper允許客戶端創建一個有序的目錄——在CreateMode.EPHEMERAL_SEQUENTIAL創建模式下,zookeeper會自動在客戶端創建的目錄名稱后面添加一個自增長的id。關鍵代碼

            // 關鍵方法,創建包含自增長id名稱的目錄,這個方法支持了分布式鎖的實現
            // 四個參數:
            // 1、目錄名稱 2、目錄文本信息 
            // 3、文件夾權限,Ids.OPEN_ACL_UNSAFE表示所有權限 
            // 4、目錄類型,CreateMode.EPHEMERAL_SEQUENTIAL表示會在目錄名稱后面加一個自增加數字
            String lockPath = getZkClient().create(
                    ROOT_LOCK_PATH + '/' + PRE_LOCK_NAME,
                    Thread.currentThread().getName().getBytes(), 
                    Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);

利用zookeeper允許客戶端創建一個有序的目錄的特性,可以實現一個可靠的分布式共享鎖。

分布式進程在讀寫一個共享數據時,可以先在某個公共目錄下創建一個有序子目錄,然后判斷該目錄id是否最小。
目錄id最小則獲得鎖並消費共享數據,然后刪除該目錄。否則則等待,直到自己的目錄id成為最小后,才獲得鎖。

zookeeper所有目錄操作事件都可以注冊監聽器,所以分布式進程不必循環查詢子目錄判斷自己的目錄id是否最小,可以注冊一個監聽器在前一個目錄上,監聽前一個目錄是否被刪除。

下面是一個分布式進程消費共享消息的例子

1、 zookeeper共享鎖

package com.coshaho.learn.zookeeper;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

/**
 * zookeeper分布式共享鎖
 * @author coshaho
 *
 */
public class ZookeeperLock 
{
    private String ROOT_LOCK_PATH = "/Locks";
    private String PRE_LOCK_NAME = "mylock_";
    private static ZookeeperLock lock;
    public static ZookeeperLock getInstance()
    {
        if(null == lock)
        {
            lock = new ZookeeperLock();
        }
        return lock;
    }
    
    /**
     * 獲取鎖:實際上是創建線程目錄,並判斷線程目錄序號是否最小
     * @return
     */
    public String getLock() 
    {
        try 
        {
            // 關鍵方法,創建包含自增長id名稱的目錄,這個方法支持了分布式鎖的實現
            // 四個參數:
            // 1、目錄名稱 2、目錄文本信息 
            // 3、文件夾權限,Ids.OPEN_ACL_UNSAFE表示所有權限 
            // 4、目錄類型,CreateMode.EPHEMERAL_SEQUENTIAL表示會在目錄名稱后面加一個自增加數字
            String lockPath = getZkClient().create(
                    ROOT_LOCK_PATH + '/' + PRE_LOCK_NAME,
                    Thread.currentThread().getName().getBytes(), 
                    Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(Thread.currentThread().getName() + " create lock path : " + lockPath);
            tryLock(lockPath);
            return lockPath;
        } 
        catch (Exception e) 
        {
            e.printStackTrace();
        }
        return null;
    }
    
    private boolean tryLock(String lockPath) throws KeeperException, InterruptedException 
    {
        // 獲取ROOT_LOCK_PATH下所有的子節點,並按照節點序號排序
        List<String> lockPaths = getZkClient().getChildren(ROOT_LOCK_PATH, false);
        Collections.sort(lockPaths);
        int index = lockPaths.indexOf(lockPath.substring(ROOT_LOCK_PATH.length() + 1));
        if (index == 0) 
        {
            System.out.println(Thread.currentThread().getName() + " get lock, lock path: " + lockPath);
            return true;
        } 
        else 
        {
            // 創建Watcher,監控lockPath的前一個節點
            Watcher watcher = new Watcher() 
            {
                @Override
                public void process(WatchedEvent event) 
                {
                    // 創建的鎖目錄只有刪除事件
                    System.out.println("Received delete event, node path is " + event.getPath());
                    synchronized (this) 
                    {
                        notifyAll();
                    }
                }
            };
            
            String preLockPath = lockPaths.get(index - 1);
            // 查詢前一個目錄是否存在,並且注冊目錄事件監聽器,監聽一次事件后即刪除
            Stat state = getZkClient().exists(ROOT_LOCK_PATH + "/" + preLockPath, watcher);
            // 返回值為目錄詳細信息
            if (state == null) 
            {
                return tryLock(lockPath);
            } 
            else 
            {
                System.out.println(Thread.currentThread().getName() + " wait for " + preLockPath);
                synchronized (watcher) 
                {
                    // 等待目錄刪除事件喚醒
                    watcher.wait();
                }
                return tryLock(lockPath);
            }
        }
    }

    /**
     * 釋放鎖:實際上是刪除當前線程目錄
     * @param lockPath
     */
    public void releaseLock(String lockPath) 
    {
        try 
        {
            getZkClient().delete(lockPath, -1);
            System.out.println("Release lock, lock path is" + lockPath);
        } 
        catch (InterruptedException | KeeperException e) 
        {
            e.printStackTrace();
        }
    }
    
    private String zookeeperIp = "192.168.1.104:12181";
    private static ZooKeeper zkClient  = null;
    public ZooKeeper getZkClient() 
    {
        if(null == zkClient)
        {
            try
            {
                zkClient = new ZooKeeper(zookeeperIp, 3000, null);
            } 
            catch (IOException e) 
            {
                e.printStackTrace();
            }
        }
        return zkClient;
    }
}

2、 模擬分布式進程消費共享消息

package com.coshaho.learn.zookeeper;

import java.util.ArrayList;
import java.util.List;

import org.springframework.util.CollectionUtils;

/**
 * 分布式進程消費共享消息
 * @author coshaho
 *
 */
public class DistributeCache 
{
    private static List<String> msgCache = new ArrayList<String>();
    
    static class MsgConsumer extends Thread 
    {
        @Override
        public void run() 
        {
            while(!CollectionUtils.isEmpty(msgCache))
            {
                String lock = ZookeeperLock.getInstance().getLock();
                if(CollectionUtils.isEmpty(msgCache))
                {
                    return;
                }
                String msg = msgCache.get(0);
                System.out.println(Thread.currentThread().getName() + " consume msg: " + msg);
                try 
                {
                    Thread.sleep(1000);
                } 
                catch (InterruptedException e) 
                {
                    e.printStackTrace();
                }
                msgCache.remove(msg);
                ZookeeperLock.getInstance().releaseLock(lock);
            }
        }
    }
    
    public static void main(String[] args)
    {
        for(int i = 0; i < 10; i++)
        {
            msgCache.add("msg" + i);
        }
        MsgConsumer consumer1 = new MsgConsumer();
        MsgConsumer consumer2 = new MsgConsumer();
        consumer1.start();
        consumer2.start();
    }
}

3、 測試結果

log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Thread-1 create lock path : /Locks/mylock_0000000217
Thread-0 create lock path : /Locks/mylock_0000000216
Thread-0 get lock, lock path: /Locks/mylock_0000000216
Thread-0 consume msg: msg0
Thread-1 wait for mylock_0000000216
Received delete event, node path is /Locks/mylock_0000000216
Release lock, lock path is/Locks/mylock_0000000216
Thread-1 get lock, lock path: /Locks/mylock_0000000217
Thread-1 consume msg: msg1
Thread-0 create lock path : /Locks/mylock_0000000218
Thread-0 wait for mylock_0000000217
Received delete event, node path is /Locks/mylock_0000000217
Release lock, lock path is/Locks/mylock_0000000217
Thread-0 get lock, lock path: /Locks/mylock_0000000218
Thread-0 consume msg: msg2
Thread-1 create lock path : /Locks/mylock_0000000219
Thread-1 wait for mylock_0000000218
Received delete event, node path is /Locks/mylock_0000000218
Release lock, lock path is/Locks/mylock_0000000218
Thread-1 get lock, lock path: /Locks/mylock_0000000219
Thread-1 consume msg: msg3
Thread-0 create lock path : /Locks/mylock_0000000220
Thread-0 wait for mylock_0000000219

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM