Curator典型應用場景之-分布式鎖


在分布式環境中 ,為了保證數據的一致性,經常在程序的某個運行點(例如,減庫存操作或者流水號生成等)需要進行同步控制。以一個"流水號生成"的場景為例,普通的后台應用通常都是使用時間戳來生成流水號,但是在用戶訪問量很大的情況下,可能會出現並發問題。下面通過示例程序就演示一個典型的並發問題:

public static void main(String[] args) throws Exception {

    CountDownLatch down = new CountDownLatch(1);
    for (int i=0;i<10;i++){
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    down.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                String orderNo = sdf.format(new Date());
                System.out.println("生成的訂單號是:"+orderNo);
            }
        }).start();
    }
    down.countDown();
}

程序運行,輸出結果如下:

生成的訂單號是:15:30:28|365
生成的訂單號是:15:30:28|365
生成的訂單號是:15:30:28|367
生成的訂單號是:15:30:28|367
生成的訂單號是:15:30:28|367
生成的訂單號是:15:30:28|367
生成的訂單號是:15:30:28|369
生成的訂單號是:15:30:28|398
生成的訂單號是:15:30:28|367
生成的訂單號是:15:30:28|367

不難發現,生成的10個訂單不少都是重復的,如果是實際的生產環境中,這顯然沒有滿足我們的也無需求。究其原因,就是因為在沒有進行同步的情況下,出現了並發問題。下面我們來看看如何使用Curator實現分布式鎖功能。

Recipes實現的鎖有五種
Shared Reentrant Lockf分布式可重入鎖
官網地址:http://curator.apache.org/curator-recipes/shared-reentrant-lock.html
Shared Lock 分布式非可重入鎖
官網地址:http://curator.apache.org/curator-recipes/shared-lock.html
Shared Reentrant Read Write Lock可重入讀寫鎖
官網地址:http://curator.apache.org/curator-recipes/shared-reentrant-read-write-lock.html
Shared Semaphore共享信號量
官網地址:http://curator.apache.org/curator-recipes/shared-semaphore.html
Multi Shared Lock 多共享鎖
官網地址:http://curator.apache.org/curator-recipes/multi-shared-lock.html

Shared Reentrant Lock(分布式可重入鎖)
全局同步的可重入分布式鎖,任何時刻不會有兩個客戶端同時持有該鎖。Reentrant和JDK的ReentrantLock類似, 意味着同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。

相關的類
InterProcessMutex

使用
創建InterProcessMutex實例
InterProcessMutex提供了兩個構造方法,傳入一個CuratorFramework實例和一個要使用的節點路徑,InterProcessMutex還允許傳入一個自定義的驅動類,默認是使用StandardLockInternalsDriver。

public InterProcessMutex(CuratorFramework client, String path);
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver);

獲取鎖

使用acquire方法獲取鎖,acquire方法有兩種:

public void acquire() throws Exception;

獲取鎖,一直阻塞到獲取到鎖為止。獲取鎖的線程在獲取鎖后仍然可以調用acquire() 獲取鎖(可重入)。 鎖獲取使用完后,調用了幾次acquire(),就得調用幾次release()釋放。

public boolean acquire(long time, TimeUnit unit) throws Exception;

與acquire()類似,等待time * unit時間獲取鎖,如果仍然沒有獲取鎖,則直接返回false。

釋放鎖
使用release()方法釋放鎖
線程通過acquire()獲取鎖時,可通過release()進行釋放,如果該線程多次調用 了acquire()獲取鎖,則如果只調用 一次release()該鎖仍然會被該線程持有。

注意:同一個線程中InterProcessMutex實例是可重用的,也就是不需要在每次獲取鎖的時候都new一個InterProcessMutex實例,用同一個實例就好。

鎖撤銷
InterProcessMutex 支持鎖撤銷機制,可通過調用makeRevocable()將鎖設為可撤銷的,當另一線程希望你釋放該鎖時,實例里的listener會被調用。 撤銷機制是協作的。

public void makeRevocable(RevocationListener<T> listener);

如果你請求撤銷當前的鎖, 調用Revoker類中的靜態方法attemptRevoke()要求鎖被釋放或者撤銷。如果該鎖上注冊有RevocationListener監聽,該監聽會被調用。

public static void attemptRevoke(CuratorFramework client, String path) throws Exception;

示例代碼(官網)
共享資源

public class FakeLimitedResource {

    //總共250張火車票
    private Integer ticket = 250;

    public void use() throws InterruptedException {
        try {
            System.out.println("火車票還剩"+(--ticket)+"張!");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

使用鎖操作資源

 

public class ExampleClientThatLocks {

    /***/
    private final InterProcessMutex lock;
    /** 共享資源 */
    private final FakeLimitedResource resource;
    /** 客戶端名稱 */
    private final String clientName;

    public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if ( !lock.acquire(time, unit) ) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " has the lock");
            //操作資源
            resource.use();
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); //總是在Final塊中釋放鎖。
        }
    }
}

客戶端

 

public class LockingExample {
    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String CONNECTION_STRING = "172.20.10.9:2181";
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {

        //FakeLimitedResource模擬某些外部資源,這些外部資源一次只能由一個進程訪問
        final FakeLimitedResource resource = new FakeLimitedResource();

        ExecutorService service = Executors.newFixedThreadPool(QTY);
        try {
            for ( int i = 0; i < QTY; ++i ){
                final int index = i;
                Callable<Void>  task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECTION_STRING, new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE));
                        try {
                            client.start();
                            ExampleClientThatLocks example = new ExampleClientThatLocks(client, PATH, resource, "Client " + index);
                            for ( int j = 0; j < REPETITIONS; ++j ) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        }catch ( InterruptedException e ){
                            Thread.currentThread().interrupt();
                        }catch ( Exception e ){
                            e.printStackTrace();
                        }finally{
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

起五個線程,即五個窗口賣票,五個客戶端分別有50張票可以賣,先是嘗試獲取鎖,操作資源后,釋放鎖。

Shared Lock(不可重入鎖)
與Shared Reentrant Lock類似,但是不能重入。

相關的類
InterProcessSemaphoreMutex

使用
創建InterProcessSemaphoreMutex實例

public InterProcessSemaphoreMutex(CuratorFramework client, String path);

示例代碼
我們只需要將上面的例子修改一下,測試一下它的重入。 修改ExampleClientThatLocks,修改鎖的類型,並連續兩次acquire:

public class ExampleClientThatLocks {

    /***/
    private final InterProcessSemaphoreMutex lock;
    /** 共享資源 */
    private final FakeLimitedResource resource;
    /** 客戶端名稱 */
    private final String clientName;

    public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if ( !lock.acquire(time, unit) ) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        System.out.println(clientName + " has the lock");
        if ( !lock.acquire(time, unit) ) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        System.out.println(clientName + " has the lock again");
        try {
            //操作資源
            resource.use();
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); //總是在Final塊中釋放鎖。
            lock.release(); //調用兩次acquire釋放兩次
        }
    }
}

注意我們也需要調用release兩次。這和JDK的ReentrantLock用法一致。如果少調用一次release,則此線程依然擁有鎖。 上面的代碼沒有問題,我們可以多次調用acquire,后續的acquire也不會阻塞。

將上面的InterProcessMutex換成不可重入鎖InterProcessSemaphoreMutex,如果再運行上面的代碼,結果就會發現線程被阻塞再第二個acquire上。直到超時報異常:
java.lang.IllegalStateException: Client 1 could not acquire the lock 說明鎖是不可重入的。

Shared Reentrant Read Write Lock分布式可重入讀寫鎖
讀寫鎖負責管理一對相關的鎖,一個負責讀操作,一個負責寫操作。讀鎖在沒有寫鎖沒被使用時能夠被多個讀進行使用。但是寫鎖只能被一個進得持有。 只有當寫鎖釋放時,讀鎖才能被持有,一個擁有寫鎖的線程可重入讀鎖,但是讀鎖卻不能進入寫鎖。 這也意味着寫鎖可以降級成讀鎖, 比如請求寫鎖 —>讀鎖 —->釋放寫鎖。 從讀鎖升級成寫鎖是不行的。可重入讀寫鎖是“公平的”,每個用戶將按請求的順序獲取鎖。

相關的類
InterProcessReadWriteLock
InterProcessLock

使用
創建InterProcessReadWriteLock

public InterProcessReadWriteLock(CuratorFramework client, String basePath);

獲取鎖

可通過readLock()和writeLock())分別獲取鎖類型,再通過acquire()獲取鎖。

public InterProcessMutex readLock();
public InterProcessMutex writeLock();

示例代碼

public class CuratorLockSharedReentrantReadWriteLockZookeeper {

    private static final int SECOND = 1000;
    private static final String PATH="/examples/locks";
    private static final String CONNECTION_STRING = "192.168.58.42:2181";

    public static void main(String[] args) throws Exception {

        CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECTION_STRING, new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE));
        client.start();
        // todo 在此可添加ConnectionStateListener監聽
        System.out.println("Server connected...");
        final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, PATH);
        final CountDownLatch down = new CountDownLatch(1);
        for (int i = 0; i < 30; i++) {
            final int index = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        down.await();
                        if (index % 2 == 0) {
                            lock.readLock().acquire();
                            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                            String orderNo = sdf.format(new Date());
                            System.out.println("[READ]生成的訂單號是:" + orderNo);
                        } else {
                            lock.writeLock().acquire();
                            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                            String orderNo = sdf.format(new Date());
                            System.out.println("[WRITE]生成的訂單號是:" + orderNo);
                        }

                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            if (index % 2 == 0) {
                                lock.readLock().release();
                            } else {
                                lock.writeLock().release();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
        }
        // 保證所有線程內部邏輯執行時間一致
        down.countDown();
        Thread.sleep(10 * SECOND);
        if (client != null) {
            client.close();
        }
        System.out.println("Server closed...");
    }
}

 

運行程序,打印如下結果:

Server connected…
[WRITE]生成的訂單號是:11:40:25|042
[WRITE]生成的訂單號是:11:40:25|098
[READ]生成的訂單號是:11:40:25|116
[READ]生成的訂單號是:11:40:25|127
[READ]生成的訂單號是:11:40:25|137
[READ]生成的訂單號是:11:40:25|141
[READ]生成的訂單號是:11:40:25|175
[READ]生成的訂單號是:11:40:25|214
[WRITE]生成的訂單號是:11:40:25|244
[READ]生成的訂單號是:11:40:25|276
[READ]生成的訂單號是:11:40:25|276
[WRITE]生成的訂單號是:11:40:25|347
[WRITE]生成的訂單號是:11:40:25|370
[READ]生成的訂單號是:11:40:25|378
[WRITE]生成的訂單號是:11:40:25|413
[WRITE]生成的訂單號是:11:40:25|469
[WRITE]生成的訂單號是:11:40:25|499
[WRITE]生成的訂單號是:11:40:25|519
[READ]生成的訂單號是:11:40:25|574
[WRITE]生成的訂單號是:11:40:25|595
[WRITE]生成的訂單號是:11:40:25|636
[WRITE]生成的訂單號是:11:40:25|670
[READ]生成的訂單號是:11:40:25|698
[WRITE]生成的訂單號是:11:40:25|719
[WRITE]生成的訂單號是:11:40:25|742
[READ]生成的訂單號是:11:40:25|756
[READ]生成的訂單號是:11:40:25|771
[READ]生成的訂單號是:11:40:25|776
[WRITE]生成的訂單號是:11:40:25|789
[READ]生成的訂單號是:11:40:25|805
Server closed…

可以看到通過獲得read鎖生成的訂單中是有重復的,而獲取的寫鎖中是沒有重復數據的。符合讀寫鎖的特點。

共享信號量Shared Semaphore
一個計數的信號量類似JDK的Semaphore,所有使用相同鎖定路徑的jvm中所有進程都將實現進程間有限的租約。此外,這個信號量大多是“公平的” - 每個用戶將按照要求的順序獲得租約。
有兩種方式決定信號號的最大租約數。一種是由用戶指定的路徑來決定最大租約數,一種是通過SharedCountReader來決定。
如果未使用SharedCountReader,則不會進行內部檢查比如A表現為有10個租約,進程B表現為有20個。因此,請確保所有進程中的所有實例都使用相同的numberOfLeases值。
acuquire()方法返回的是Lease對象,客戶端在使用完后必須要關閉該lease對象(一般在finally中進行關閉),否則該對象會丟失。如果進程session丟失(如崩潰),該客戶端擁有的所有lease會被自動關閉,此時其他端能夠使用這些lease。

相關的類
InterProcessSemaphoreV2
Lease
SharedCountReader

使用
創建實例

public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases);
public InterProcessSemaphoreV2(CuratorFramework client, String path, SharedCountReader count);

獲取Lease
請求獲取lease,如果Semaphore當前的租約不夠,該方法會一直阻塞,直到最大租約數增大或者其他客戶端釋放了一個lease。 當lease對象獲取成功后,處理完成后,客戶端必須調用close該lease(可通過return()方法釋放lease)。最好在finally塊中close。

//獲取一個租約
public Lease acquire() throws Exception;
//獲取多個租約
public Collection<Lease> acquire(int qty) throws Exception;
//對應的有阻塞時間的acquire()方法
public Lease acquire(long time, TimeUnit unit) throws Exception;
public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception;

釋放lease

public void returnAll(Collection<Lease> leases);
public void returnLease(Lease lease);

示例代碼

public class InterProcessSemaphoreExample {
    private static final int MAX_LEASE=10;
    private static final String PATH="/examples/locks";
    private static final String CONNECTION_STRING = "172.20.10.9:2181";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try{

            CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECTION_STRING, new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

}

構造參數中最多有10個租約,首先我們先獲得了5個租約,然后再獲取一個,這個時候semaphore還剩4個, 接着再請求了5個租約,因為semaphore還有4個租約,因為租約不夠,阻塞到超時,還是沒能滿足,返回結果為null。

Multi Shared Lock(多共享分布式鎖)
多個鎖作為一個鎖,可以同時在多個資源上加鎖。一個維護多個鎖對象的容器。當調用 acquire()時,獲取容器中所有的鎖對象,請求失敗時,釋放所有鎖對象。同樣調用release()也會釋放所有的鎖。

相關的類
InterProcessMultiLock
InterProcessLock

使用
創建InterProcessMultiLock

public InterProcessMultiLock(CuratorFramework client, List<String> paths);
public InterProcessMultiLock(List<InterProcessLock> locks);

使用方式和Shared Lock相同。

示例代碼

public class InterProcessMultiLockExample {
    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";
    private static final String CONNECTION_STRING = "172.20.10.9:2181";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try {
            CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECTION_STRING, new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has the lock");

            System.out.println("has the lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has the lock2: " + lock2.isAcquiredInThisProcess());

            try {
                resource.use(); //操作資源
            } finally {
                System.out.println("releasing the lock");
                lock.release(); //在finally中釋放鎖
            }
            System.out.println("has the lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has the lock2: " + lock2.isAcquiredInThisProcess());
        }catch (Exception e){
            e.printStackTrace();
        }
    }

}

新建一個InterProcessMultiLock, 包含一個重入鎖和一個非重入鎖。 調用acquire后可以看到線程同時擁有了這兩個鎖。 調用release看到這兩個鎖都被釋放了。

統一的錯誤處理
看過官網的朋友一定發現,每一個鎖的文章最下面都有一個Error Handling,內容直接一鍵翻譯過來:
強烈建議您添加ConnectionStateListener並監視SUSPENDED和LOST狀態更改。如果報告了SUSPENDED狀態,則除非您隨后收到RECONNECTED狀態,否則您無法確定是否仍然持有該鎖。如果報告了LOST狀態,則確定您不再持有鎖。

當連接出現異常, 將通過ConnectionStateListener接口進行監聽, 並進行相應的處理, 這些狀態變化包括:
暫停(SUSPENDED): 當連接丟失, 將暫停所有操作, 直到連接重新建立, 如果在規定時間內無法建立連接, 將觸發LOST通知
重連(RECONNECTED): 連接丟失, 執行重連時, 將觸發該通知
丟失(LOST): 連接超時時, 將觸發該通知

新建一個類實現ConnectionStateListener

public class MyConnectionStateListener implements ConnectionStateListener {

    /** 節點路徑 */
    private String zkRegPathPrefix;
    /** 節點內容 */
    private String regContent;

    public MyConnectionStateListener(String zkRegPathPrefix, String regContent) {
        this.zkRegPathPrefix = zkRegPathPrefix;
        this.regContent = regContent;
    }

    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        if (newState == ConnectionState.LOST) {
            //連接丟失
            System.out.println("lost session with zookeeper");
            System.out.println("鎖已經釋放,不再擁有該鎖");
            while(true){
                try {
                    System.err.println("嘗試重新連接......");
                    if(client.getZookeeperClient().blockUntilConnectedOrTimedOut()){
                        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkRegPathPrefix, regContent.getBytes("UTF-8"));
                        break;
                    }
                } catch (InterruptedException e) {
                    break;
                } catch (Exception e){
                    //TODO: log something
                }
            }
        } else if (newState == ConnectionState.CONNECTED) {
            //連接新建
            System.out.println("connected with zookeeper");
        } else if (newState == ConnectionState.RECONNECTED) {
            //重新連接
            System.out.println("reconnected with zookeeper");
        }
    }
}

在client中添加該監聽

CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.58.42:2181",3000,3000, new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE));
client.start();
// todo 在此可添加ConnectionStateListener監聽
MyConnectionStateListener connectionStateListener = new MyConnectionStateListener(PATH,"123456");
client.getConnectionStateListenable().addListener(connectionStateListener);
System.out.println("Server connected...");

啟動程序,然后斷掉網絡,就會觸發監聽,接收到ConnectionState.LOST狀態,表明該客戶端已經不再持有該鎖。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM