在分布式環境中 ,為了保證數據的一致性,經常在程序的某個運行點(例如,減庫存操作或者流水號生成等)需要進行同步控制。以一個"流水號生成"的場景為例,普通的后台應用通常都是使用時間戳來生成流水號,但是在用戶訪問量很大的情況下,可能會出現並發問題。下面通過示例程序就演示一個典型的並發問題:
public static void main(String[] args) throws Exception { CountDownLatch down = new CountDownLatch(1); for (int i=0;i<10;i++){ new Thread(new Runnable() { @Override public void run() { try { down.await(); } catch (InterruptedException e) { e.printStackTrace(); } SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS"); String orderNo = sdf.format(new Date()); System.out.println("生成的訂單號是:"+orderNo); } }).start(); } down.countDown(); }
程序運行,輸出結果如下:
生成的訂單號是:15:30:28|365 生成的訂單號是:15:30:28|365 生成的訂單號是:15:30:28|367 生成的訂單號是:15:30:28|367 生成的訂單號是:15:30:28|367 生成的訂單號是:15:30:28|367 生成的訂單號是:15:30:28|369 生成的訂單號是:15:30:28|398 生成的訂單號是:15:30:28|367 生成的訂單號是:15:30:28|367
不難發現,生成的10個訂單不少都是重復的,如果是實際的生產環境中,這顯然沒有滿足我們的也無需求。究其原因,就是因為在沒有進行同步的情況下,出現了並發問題。下面我們來看看如何使用Curator實現分布式鎖功能。
Recipes實現的鎖有五種
Shared Reentrant Lockf分布式可重入鎖
官網地址:http://curator.apache.org/curator-recipes/shared-reentrant-lock.html
Shared Lock 分布式非可重入鎖
官網地址:http://curator.apache.org/curator-recipes/shared-lock.html
Shared Reentrant Read Write Lock可重入讀寫鎖
官網地址:http://curator.apache.org/curator-recipes/shared-reentrant-read-write-lock.html
Shared Semaphore共享信號量
官網地址:http://curator.apache.org/curator-recipes/shared-semaphore.html
Multi Shared Lock 多共享鎖
官網地址:http://curator.apache.org/curator-recipes/multi-shared-lock.html
Shared Reentrant Lock(分布式可重入鎖)
全局同步的可重入分布式鎖,任何時刻不會有兩個客戶端同時持有該鎖。Reentrant和JDK的ReentrantLock類似, 意味着同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。
相關的類
InterProcessMutex
使用
創建InterProcessMutex實例
InterProcessMutex提供了兩個構造方法,傳入一個CuratorFramework實例和一個要使用的節點路徑,InterProcessMutex還允許傳入一個自定義的驅動類,默認是使用StandardLockInternalsDriver。
public InterProcessMutex(CuratorFramework client, String path); public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver);
獲取鎖
使用acquire方法獲取鎖,acquire方法有兩種:
public void acquire() throws Exception;
獲取鎖,一直阻塞到獲取到鎖為止。獲取鎖的線程在獲取鎖后仍然可以調用acquire() 獲取鎖(可重入)。 鎖獲取使用完后,調用了幾次acquire(),就得調用幾次release()釋放。
public boolean acquire(long time, TimeUnit unit) throws Exception;
與acquire()類似,等待time * unit時間獲取鎖,如果仍然沒有獲取鎖,則直接返回false。
釋放鎖
使用release()方法釋放鎖
線程通過acquire()獲取鎖時,可通過release()進行釋放,如果該線程多次調用 了acquire()獲取鎖,則如果只調用 一次release()該鎖仍然會被該線程持有。
注意:同一個線程中InterProcessMutex實例是可重用的,也就是不需要在每次獲取鎖的時候都new一個InterProcessMutex實例,用同一個實例就好。
鎖撤銷
InterProcessMutex 支持鎖撤銷機制,可通過調用makeRevocable()將鎖設為可撤銷的,當另一線程希望你釋放該鎖時,實例里的listener會被調用。 撤銷機制是協作的。
public void makeRevocable(RevocationListener<T> listener);
如果你請求撤銷當前的鎖, 調用Revoker類中的靜態方法attemptRevoke()要求鎖被釋放或者撤銷。如果該鎖上注冊有RevocationListener監聽,該監聽會被調用。
public static void attemptRevoke(CuratorFramework client, String path) throws Exception;
示例代碼(官網)
共享資源
public class FakeLimitedResource { //總共250張火車票 private Integer ticket = 250; public void use() throws InterruptedException { try { System.out.println("火車票還剩"+(--ticket)+"張!"); }catch (Exception e){ e.printStackTrace(); } } }
使用鎖操作資源
public class ExampleClientThatLocks { /** 鎖 */ private final InterProcessMutex lock; /** 共享資源 */ private final FakeLimitedResource resource; /** 客戶端名稱 */ private final String clientName; public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) { this.resource = resource; this.clientName = clientName; lock = new InterProcessMutex(client, lockPath); } public void doWork(long time, TimeUnit unit) throws Exception { if ( !lock.acquire(time, unit) ) { throw new IllegalStateException(clientName + " could not acquire the lock"); } try { System.out.println(clientName + " has the lock"); //操作資源 resource.use(); } finally { System.out.println(clientName + " releasing the lock"); lock.release(); //總是在Final塊中釋放鎖。 } } }
客戶端
public class LockingExample { private static final int QTY = 5; private static final int REPETITIONS = QTY * 10; private static final String CONNECTION_STRING = "172.20.10.9:2181"; private static final String PATH = "/examples/locks"; public static void main(String[] args) throws Exception { //FakeLimitedResource模擬某些外部資源,這些外部資源一次只能由一個進程訪問 final FakeLimitedResource resource = new FakeLimitedResource(); ExecutorService service = Executors.newFixedThreadPool(QTY); try { for ( int i = 0; i < QTY; ++i ){ final int index = i; Callable<Void> task = new Callable<Void>() { @Override public Void call() throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECTION_STRING, new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE)); try { client.start(); ExampleClientThatLocks example = new ExampleClientThatLocks(client, PATH, resource, "Client " + index); for ( int j = 0; j < REPETITIONS; ++j ) { example.doWork(10, TimeUnit.SECONDS); } }catch ( InterruptedException e ){ Thread.currentThread().interrupt(); }catch ( Exception e ){ e.printStackTrace(); }finally{ CloseableUtils.closeQuietly(client); } return null; } }; service.submit(task); } service.shutdown(); service.awaitTermination(10, TimeUnit.MINUTES); }catch (Exception e){ e.printStackTrace(); } } }
起五個線程,即五個窗口賣票,五個客戶端分別有50張票可以賣,先是嘗試獲取鎖,操作資源后,釋放鎖。
Shared Lock(不可重入鎖)
與Shared Reentrant Lock類似,但是不能重入。
相關的類
InterProcessSemaphoreMutex
使用
創建InterProcessSemaphoreMutex實例
public InterProcessSemaphoreMutex(CuratorFramework client, String path);
示例代碼
我們只需要將上面的例子修改一下,測試一下它的重入。 修改ExampleClientThatLocks,修改鎖的類型,並連續兩次acquire:
public class ExampleClientThatLocks { /** 鎖 */ private final InterProcessSemaphoreMutex lock; /** 共享資源 */ private final FakeLimitedResource resource; /** 客戶端名稱 */ private final String clientName; public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) { this.resource = resource; this.clientName = clientName; lock = new InterProcessSemaphoreMutex(client, lockPath); } public void doWork(long time, TimeUnit unit) throws Exception { if ( !lock.acquire(time, unit) ) { throw new IllegalStateException(clientName + " could not acquire the lock"); } System.out.println(clientName + " has the lock"); if ( !lock.acquire(time, unit) ) { throw new IllegalStateException(clientName + " could not acquire the lock"); } System.out.println(clientName + " has the lock again"); try { //操作資源 resource.use(); } finally { System.out.println(clientName + " releasing the lock"); lock.release(); //總是在Final塊中釋放鎖。 lock.release(); //調用兩次acquire釋放兩次 } } }
注意我們也需要調用release兩次。這和JDK的ReentrantLock用法一致。如果少調用一次release,則此線程依然擁有鎖。 上面的代碼沒有問題,我們可以多次調用acquire,后續的acquire也不會阻塞。
將上面的InterProcessMutex換成不可重入鎖InterProcessSemaphoreMutex,如果再運行上面的代碼,結果就會發現線程被阻塞再第二個acquire上。直到超時報異常:
java.lang.IllegalStateException: Client 1 could not acquire the lock 說明鎖是不可重入的。
Shared Reentrant Read Write Lock分布式可重入讀寫鎖
讀寫鎖負責管理一對相關的鎖,一個負責讀操作,一個負責寫操作。讀鎖在沒有寫鎖沒被使用時能夠被多個讀進行使用。但是寫鎖只能被一個進得持有。 只有當寫鎖釋放時,讀鎖才能被持有,一個擁有寫鎖的線程可重入讀鎖,但是讀鎖卻不能進入寫鎖。 這也意味着寫鎖可以降級成讀鎖, 比如請求寫鎖 —>讀鎖 —->釋放寫鎖。 從讀鎖升級成寫鎖是不行的。可重入讀寫鎖是“公平的”,每個用戶將按請求的順序獲取鎖。
相關的類
InterProcessReadWriteLock
InterProcessLock
使用
創建InterProcessReadWriteLock
public InterProcessReadWriteLock(CuratorFramework client, String basePath);
獲取鎖
可通過readLock()和writeLock())分別獲取鎖類型,再通過acquire()獲取鎖。
public InterProcessMutex readLock(); public InterProcessMutex writeLock();
示例代碼
public class CuratorLockSharedReentrantReadWriteLockZookeeper { private static final int SECOND = 1000; private static final String PATH="/examples/locks"; private static final String CONNECTION_STRING = "192.168.58.42:2181"; public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECTION_STRING, new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE)); client.start(); // todo 在此可添加ConnectionStateListener監聽 System.out.println("Server connected..."); final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, PATH); final CountDownLatch down = new CountDownLatch(1); for (int i = 0; i < 30; i++) { final int index = i; new Thread(new Runnable() { @Override public void run() { try { down.await(); if (index % 2 == 0) { lock.readLock().acquire(); SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS"); String orderNo = sdf.format(new Date()); System.out.println("[READ]生成的訂單號是:" + orderNo); } else { lock.writeLock().acquire(); SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS"); String orderNo = sdf.format(new Date()); System.out.println("[WRITE]生成的訂單號是:" + orderNo); } } catch (Exception e) { e.printStackTrace(); } finally { try { if (index % 2 == 0) { lock.readLock().release(); } else { lock.writeLock().release(); } } catch (Exception e) { e.printStackTrace(); } } } }).start(); } // 保證所有線程內部邏輯執行時間一致 down.countDown(); Thread.sleep(10 * SECOND); if (client != null) { client.close(); } System.out.println("Server closed..."); } }
運行程序,打印如下結果:
Server connected… [WRITE]生成的訂單號是:11:40:25|042 [WRITE]生成的訂單號是:11:40:25|098 [READ]生成的訂單號是:11:40:25|116 [READ]生成的訂單號是:11:40:25|127 [READ]生成的訂單號是:11:40:25|137 [READ]生成的訂單號是:11:40:25|141 [READ]生成的訂單號是:11:40:25|175 [READ]生成的訂單號是:11:40:25|214 [WRITE]生成的訂單號是:11:40:25|244 [READ]生成的訂單號是:11:40:25|276 [READ]生成的訂單號是:11:40:25|276 [WRITE]生成的訂單號是:11:40:25|347 [WRITE]生成的訂單號是:11:40:25|370 [READ]生成的訂單號是:11:40:25|378 [WRITE]生成的訂單號是:11:40:25|413 [WRITE]生成的訂單號是:11:40:25|469 [WRITE]生成的訂單號是:11:40:25|499 [WRITE]生成的訂單號是:11:40:25|519 [READ]生成的訂單號是:11:40:25|574 [WRITE]生成的訂單號是:11:40:25|595 [WRITE]生成的訂單號是:11:40:25|636 [WRITE]生成的訂單號是:11:40:25|670 [READ]生成的訂單號是:11:40:25|698 [WRITE]生成的訂單號是:11:40:25|719 [WRITE]生成的訂單號是:11:40:25|742 [READ]生成的訂單號是:11:40:25|756 [READ]生成的訂單號是:11:40:25|771 [READ]生成的訂單號是:11:40:25|776 [WRITE]生成的訂單號是:11:40:25|789 [READ]生成的訂單號是:11:40:25|805 Server closed…
可以看到通過獲得read鎖生成的訂單中是有重復的,而獲取的寫鎖中是沒有重復數據的。符合讀寫鎖的特點。
共享信號量Shared Semaphore
一個計數的信號量類似JDK的Semaphore,所有使用相同鎖定路徑的jvm中所有進程都將實現進程間有限的租約。此外,這個信號量大多是“公平的” - 每個用戶將按照要求的順序獲得租約。
有兩種方式決定信號號的最大租約數。一種是由用戶指定的路徑來決定最大租約數,一種是通過SharedCountReader來決定。
如果未使用SharedCountReader,則不會進行內部檢查比如A表現為有10個租約,進程B表現為有20個。因此,請確保所有進程中的所有實例都使用相同的numberOfLeases值。
acuquire()方法返回的是Lease對象,客戶端在使用完后必須要關閉該lease對象(一般在finally中進行關閉),否則該對象會丟失。如果進程session丟失(如崩潰),該客戶端擁有的所有lease會被自動關閉,此時其他端能夠使用這些lease。
相關的類
InterProcessSemaphoreV2
Lease
SharedCountReader
使用
創建實例
public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases); public InterProcessSemaphoreV2(CuratorFramework client, String path, SharedCountReader count);
獲取Lease
請求獲取lease,如果Semaphore當前的租約不夠,該方法會一直阻塞,直到最大租約數增大或者其他客戶端釋放了一個lease。 當lease對象獲取成功后,處理完成后,客戶端必須調用close該lease(可通過return()方法釋放lease)。最好在finally塊中close。
//獲取一個租約 public Lease acquire() throws Exception; //獲取多個租約 public Collection<Lease> acquire(int qty) throws Exception; //對應的有阻塞時間的acquire()方法 public Lease acquire(long time, TimeUnit unit) throws Exception; public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception;
釋放lease
public void returnAll(Collection<Lease> leases); public void returnLease(Lease lease);
示例代碼
public class InterProcessSemaphoreExample { private static final int MAX_LEASE=10; private static final String PATH="/examples/locks"; private static final String CONNECTION_STRING = "172.20.10.9:2181"; public static void main(String[] args) throws Exception { FakeLimitedResource resource = new FakeLimitedResource(); try{ CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECTION_STRING, new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE)); client.start(); InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE); Collection<Lease> leases = semaphore.acquire(5); System.out.println("get " + leases.size() + " leases"); Lease lease = semaphore.acquire(); System.out.println("get another lease"); resource.use(); Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS); System.out.println("Should timeout and acquire return " + leases2); System.out.println("return one lease"); semaphore.returnLease(lease); System.out.println("return another 5 leases"); semaphore.returnAll(leases); }catch (Exception e){ e.printStackTrace(); } } }
構造參數中最多有10個租約,首先我們先獲得了5個租約,然后再獲取一個,這個時候semaphore還剩4個, 接着再請求了5個租約,因為semaphore還有4個租約,因為租約不夠,阻塞到超時,還是沒能滿足,返回結果為null。
Multi Shared Lock(多共享分布式鎖)
多個鎖作為一個鎖,可以同時在多個資源上加鎖。一個維護多個鎖對象的容器。當調用 acquire()時,獲取容器中所有的鎖對象,請求失敗時,釋放所有鎖對象。同樣調用release()也會釋放所有的鎖。
相關的類
InterProcessMultiLock
InterProcessLock
使用
創建InterProcessMultiLock
public InterProcessMultiLock(CuratorFramework client, List<String> paths); public InterProcessMultiLock(List<InterProcessLock> locks);
使用方式和Shared Lock相同。
示例代碼
public class InterProcessMultiLockExample { private static final String PATH1 = "/examples/locks1"; private static final String PATH2 = "/examples/locks2"; private static final String CONNECTION_STRING = "172.20.10.9:2181"; public static void main(String[] args) throws Exception { FakeLimitedResource resource = new FakeLimitedResource(); try { CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECTION_STRING, new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE)); client.start(); InterProcessLock lock1 = new InterProcessMutex(client, PATH1); InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2); InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2)); if (!lock.acquire(10, TimeUnit.SECONDS)) { throw new IllegalStateException("could not acquire the lock"); } System.out.println("has the lock"); System.out.println("has the lock1: " + lock1.isAcquiredInThisProcess()); System.out.println("has the lock2: " + lock2.isAcquiredInThisProcess()); try { resource.use(); //操作資源 } finally { System.out.println("releasing the lock"); lock.release(); //在finally中釋放鎖 } System.out.println("has the lock1: " + lock1.isAcquiredInThisProcess()); System.out.println("has the lock2: " + lock2.isAcquiredInThisProcess()); }catch (Exception e){ e.printStackTrace(); } } }
新建一個InterProcessMultiLock, 包含一個重入鎖和一個非重入鎖。 調用acquire后可以看到線程同時擁有了這兩個鎖。 調用release看到這兩個鎖都被釋放了。
統一的錯誤處理
看過官網的朋友一定發現,每一個鎖的文章最下面都有一個Error Handling,內容直接一鍵翻譯過來:
強烈建議您添加ConnectionStateListener並監視SUSPENDED和LOST狀態更改。如果報告了SUSPENDED狀態,則除非您隨后收到RECONNECTED狀態,否則您無法確定是否仍然持有該鎖。如果報告了LOST狀態,則確定您不再持有鎖。
當連接出現異常, 將通過ConnectionStateListener接口進行監聽, 並進行相應的處理, 這些狀態變化包括:
暫停(SUSPENDED): 當連接丟失, 將暫停所有操作, 直到連接重新建立, 如果在規定時間內無法建立連接, 將觸發LOST通知
重連(RECONNECTED): 連接丟失, 執行重連時, 將觸發該通知
丟失(LOST): 連接超時時, 將觸發該通知
新建一個類實現ConnectionStateListener
public class MyConnectionStateListener implements ConnectionStateListener { /** 節點路徑 */ private String zkRegPathPrefix; /** 節點內容 */ private String regContent; public MyConnectionStateListener(String zkRegPathPrefix, String regContent) { this.zkRegPathPrefix = zkRegPathPrefix; this.regContent = regContent; } @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { if (newState == ConnectionState.LOST) { //連接丟失 System.out.println("lost session with zookeeper"); System.out.println("鎖已經釋放,不再擁有該鎖"); while(true){ try { System.err.println("嘗試重新連接......"); if(client.getZookeeperClient().blockUntilConnectedOrTimedOut()){ client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkRegPathPrefix, regContent.getBytes("UTF-8")); break; } } catch (InterruptedException e) { break; } catch (Exception e){ //TODO: log something } } } else if (newState == ConnectionState.CONNECTED) { //連接新建 System.out.println("connected with zookeeper"); } else if (newState == ConnectionState.RECONNECTED) { //重新連接 System.out.println("reconnected with zookeeper"); } } }
在client中添加該監聽
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.58.42:2181",3000,3000, new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE));
client.start();
// todo 在此可添加ConnectionStateListener監聽
MyConnectionStateListener connectionStateListener = new MyConnectionStateListener(PATH,"123456");
client.getConnectionStateListenable().addListener(connectionStateListener);
System.out.println("Server connected...");
啟動程序,然后斷掉網絡,就會觸發監聽,接收到ConnectionState.LOST狀態,表明該客戶端已經不再持有該鎖。
