分布式鎖和Master選舉相似點
分布式鎖和 Master選舉有幾種相似點,實際上其實現機制也相近:
同一時刻只有一個獲取鎖 / 只能有一個leader
對於分布式排他鎖來說,任意時刻,只能有一個進程(對於單進程內的鎖是單線程)可以獲得鎖。
對於領導選舉來說,任意時刻,只能有一個成功當選為leader。否則就會出現腦裂。
鎖重入 / 確認自己是leader
對於分布式鎖,需要保證獲得鎖的進程在釋放鎖之前可再次獲得鎖,即鎖的可重入性。
對於領導選舉,Leader需要能夠確認自己已經獲得領導權,即確認自己是Leader。
釋放鎖 / 放棄領導權
鎖的獲得者應該能夠正確釋放已經獲得的鎖,並且當獲得鎖的進程宕機時,鎖應該自動釋放,從而使得其它競爭方可以獲得該鎖,從而避免出現死鎖的狀態。
領導應該可以主動放棄領導權,並且當領導所在進程宕機時,領導權應該自動釋放,從而使得其它參與者可重新競爭領導而避免進入無主狀態。
感知鎖釋放 / 感知領導權釋放
當獲得鎖的一方釋放鎖時,其它對於鎖的競爭方需要能夠感知到鎖的釋放,並再次嘗試獲取鎖。
原來的Leader放棄領導權時,其它參與方應該能夠感知該事件,並重新發起選舉流程。
Curator中選舉分為兩種:
Leader Latch和Leader Election
Leader Latch
LeaderLatch方式就是以一種搶占方式來決定選主,是一種非公平的領導選舉,誰搶到就是誰,會隨機從候選者中選擇一台作為leader, 選中后除非leader自己 調用close()釋放leadership,否則其他的候選者不能成為leader。
選主過程
假設現在有三個zookeeper的客戶端,如下圖所示,同時競爭leader。這三個客戶端同時向zookeeper集群注冊Ephemeral且Non-sequence類型的節點,路徑都為/zkroot/leader。
如上圖所示,由於是Non-sequence節點,這三個客戶端只會有一個創建成功,其它節點均創建失敗。此時,創建成功的客戶端(即上圖中的Client 1)即成功競選為 Leader 。其它客戶端(即上圖中的Client 2和Client 3)此時勻為 Follower。
放棄領導權
如果Leader打算主動放棄領導權,直接刪除/zkroot/leader節點即可。
如果Leader進程意外宕機,其與Zookeeper間的Session也結束,該節點由於是Ephemeral類型的節點,因此也會自動被刪除。
此時/zkroot/leader節點不復存在,對於其它參與競選的客戶端而言,之前的Leader已經放棄了領導權。
感知領導權的放棄
由上圖可見,創建節點失敗的節點,除了成為 Follower 以外,還會向/zkroot/leader注冊一個 Watch ,一旦 Leader 放棄領導權,也即該節點被刪除,所有的 Follower 會收到通知。
重新選舉
感知到舊 Leader 放棄領導權后,所有的 Follower 可以再次發起新一輪的領導選舉,如下圖所示。
從上圖中可見
新一輪的領導選舉方法與最初的領導選舉方法完全一樣,都是發起節點創建請求,創建成功即為Leader,否則為Follower,且Follower會Watch該節點。
新一輪的選舉結果,無法預測,與它們在第一輪選舉中的順序無關。這也是該方案被稱為非公平模式的原因。
Leader Latch模式總結
1. Leader Latch實現很簡單,每一輪的選舉算法都一樣。 2. 非公平模式,每一次選舉都是隨機,誰搶到就是誰的,假如是第二次選舉,每個 Follower 通過 Watch 感知到節點被刪除的時間不完全一樣,只要有一個 Follower 得到通知即發起競選。 3. 給zookeeper造成的負載大,假如有上萬個客戶端都參與競選,意味着同時會有上萬個寫請求發送給 Zookeper。同時一旦 Leader 放棄領導權,Zookeeper 需要同時通知上萬個 Follower,負載較大。
使用過程
相關的類
LeaderLatch
構造LeaderLatch ,構造方法如下:
public LeaderLatch(CuratorFramework client, String latchPath); public LeaderLatch(CuratorFramework client, String latchPath, String id);
啟動
通過start()方法啟動之后,再等待幾秒鍾后,Curator會自動從中選舉出Leader。
public void start() throws Exception;
可以調用實例的hasLeadership()判斷該實例是否為leader。
public boolean hasLeadership();
嘗試獲取leadership
調用await()方法會使線程一直阻塞到獲得leadership為止。
public void await() throws InterruptedException, EOFException; public boolean await(long timeout, TimeUnit unit) throws InterruptedException;
釋放leadership
只能通過close()釋放leadership, 只有leader將leadership釋放時,其他的候選者才有機會被選為leader
public void close() throws IOException; public synchronized void close(CloseMode closeMode) throws IOException;
示例代碼
public class TestLeaderLatch { private static final String PATH = "/demo/leader"; /** 5個客戶端 */ private static final Integer CLIENT_COUNT = 5; public static void main(String[] args) throws Exception { //5個線程,5個客戶端 ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT); for (int i = 0; i < CLIENT_COUNT ; i++) { final int index = i; service.submit(new Runnable() { @Override public void run() { try { new TestLeaderLatch().schedule(index); } catch (Exception e) { e.printStackTrace(); } } }); } //休眠50秒之后結束main方法 Thread.sleep(30 * 1000); service.shutdownNow(); } private void schedule(int thread) throws Exception { //獲取一個client CuratorFramework client = this.getClient(thread); //獲取一個latch LeaderLatch latch = new LeaderLatch(client, PATH,String.valueOf(thread)); //給latch添加監聽,在 latch.addListener(new LeaderLatchListener() { @Override public void notLeader() { //如果不是leader System.out.println("Client [" + thread + "] I am the follower !"); } @Override public void isLeader() { //如果是leader System.out.println("Client [" + thread + "] I am the leader !"); } }); //開始選取 leader latch.start(); //每個線程 休眠時間不一樣,但是最大不能超過 main方法中的那個休眠時間,那個是50秒 到時候main方法結束 會中斷休眠時間 Thread.sleep(2 * (thread + 5) * 1000); if (latch != null) { //釋放leadership //CloseMode.NOTIFY_LEADER 節點狀態改變時,通知LeaderLatchListener latch.close(LeaderLatch.CloseMode.NOTIFY_LEADER); } if (client != null) { client.close(); } System.out.println("Client [" + latch.getId() + "] Server closed..."); } private CuratorFramework getClient(final int thread) { RetryPolicy rp = new ExponentialBackoffRetry(1000, 3); // Fluent風格創建 CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("192.168.58.42:2181") .sessionTimeoutMs(1000000) .connectionTimeoutMs(3000) .retryPolicy(rp) .build(); client.start(); System.out.println("Client [" + thread + "] Server connected..."); return client; } }
程序運行,輸出以下結果:
Client [3] Server connected… Client [2] Server connected… Client [4] Server connected… Client [0] Server connected… Client [1] Server connected… Client [1] I am the leader ! Client [0] Server closed… Client [1] I am the follower ! Client [1] Server closed… Client [2] I am the leader ! Client [2] I am the follower ! Client [2] Server closed… Client [4] I am the leader ! Client [3] Server closed… Client [4] I am the follower ! Client [4] Server closed…
在上面的程序中,啟動了5個zookeeper客戶端,程序會隨機選中其中一個作為leader。通過注冊監聽的方式來判斷自己是否成為leader。調用close()方法釋放當前領導權。有可能優先close的並不是leader節點,但是當leader節點close的時候,可以繼續在已有的節點中重新選舉leader節點。
LeaderElection
上面講了怎么使用LeaderLatch方式進行master選舉,Curator提供了兩種選舉,一種是LeaderLatch,提供的另一種Leader選舉策略是Leader Election。
跟LeaderLatch選舉策略相比,LeaderElection選舉策略不同之處在於每個實例都能公平獲取領導權,而且當獲取領導權的實例在釋放領導權之后,該實例還有機會再次獲取領導權。
另外,選舉出來的leader不會一直占有領導權,當 takeLeadership(CuratorFramework client) 方法執行結束之后會自動釋放領導權。LeaderElection屬於公平的選舉方式,通過LeaderSelectorListener可以對領導權進行控制, 在適當的時候釋放領導權,這樣每個節點都有可能獲得領導權。 而LeaderLatch則一直持有leadership, 除非調用close方法,否則它不會釋放領導權。
選主過程
如下圖所示,LeaderElection選舉中,各客戶端均創建/zkroot/leader節點,且其類型為Ephemeral與Sequence。
由於是Sequence類型節點,故上圖中三個客戶端均創建成功,只是序號不一樣。此時,每個客戶端都會判斷自己創建成功的節點的序號是不是當前最小的。如果是,則該客戶端為 Leader,否則即為 Follower。
在上圖中,Client1 創建的節點序號為1 ,Client2創建的節點序號為2,Client3創建的節點序號為3。由於最小序號為 1 ,且該節點由Client1創建,故Client 1為 Leader 。
放棄領導權
Leader 如果主動放棄領導權,直接刪除其創建的節點即可。
如果 Leader 所在進程意外宕機,其與 Zookeeper 間的 Session 結束,由於其創建的節點為Ephemeral類型,故該節點自動被刪除。
感知領導權的放棄
與LeaderLatch方式不同,每個 Follower 並非都 Watch 由 Leader 創建出來的節點,而是 Watch 序號剛好比自己序號小的節點。
在上圖中,總共有 1、2、3 共三個節點,因此Client 2 Watch /zkroot/leader1,Client 3 Watch /zkroot/leader2。(注:序號應該是10位數字,而非一位數字,序號最大為int最大值)。
一旦Leader棄權或者宕機,/zkroot/leader1被刪除,Client2可得到通知。此時Client3由於 Watch 的是/zkroot/leader2,故不會得到通知。
重新選舉
Client2得到/zkroot/leader1被刪除的通知后,不會立即成為新的 Leader 。而是先判斷自己的序號2是不是當前最小的序號。在該場景下,其序號確為最小。因此Client 2成為新的 Leader 。
這里要注意,如果在Client1放棄領導權之前,Client2就宕機了,Client3會收到通知。此時Client3不會立即成為Leader,而是要先判斷自己的序號3是否為當前最小序號。很顯然,由於Client1創建的/zkroot/leader1還在,因此Client 3不會成為新的 Leader ,並向Client2序號2 前面的序號,也即 1 創建 Watch。該過程如下圖所示。
LeaderElection模式總結
擴展性好,每個客戶端都只Watch 一個節點且每次節點被刪除只須通知一個客戶端
舊 Leader 放棄領導權時,其它客戶端根據競選的先后順序(也即節點序號)成為新 Leader,這也是公平模式的由來。
延遲相對非公平模式要高,因為它必須等待特定節點得到通知才能選出新的 Leader。
使用過程
相關的類
LeaderSelector
LeaderSelectorListener
LeaderSelectorListenerAdapter
CancelLeadershipException
使用方法 創建 LeaderSelector
public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener); public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener); public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener);
啟動
leaderSelector.start();
一旦啟動,如果獲取了leadership的話,takeLeadership()會被調用,只有當leader釋放了leadership的時候,takeLeadership()才會返回。
釋放
調用close()釋放 leadership
leaderSelector.close();
示例代碼
LeaderSelectorListener的實現類
實現LeaderSelectorListener 或者 繼承LeaderSelectorListenerAdapter,用於定義獲取領導權后的業務邏輯:
public class CustomLeaderSelectorListenerAdapter extends LeaderSelectorListenerAdapter implements Closeable { /** 客戶端名稱 */ private String name; /** leaderSelector */ private LeaderSelector leaderSelector; /** 原子性的 用來記錄獲取 leader的次數 */ public AtomicInteger leaderCount = new AtomicInteger(1); public CustomLeaderSelectorListenerAdapter(CuratorFramework client,String path,String name ){ this.name = name; this.leaderSelector = new LeaderSelector(client, path, this); /** * 自動重新排隊 * 該方法的調用可以確保此實例在釋放領導權后還可能獲得領導權 */ leaderSelector.autoRequeue(); } /** * 啟動 調用leaderSelector.start() * @throws IOException */ public void start() throws IOException { leaderSelector.start(); } /** * 獲取領導權之后執行的業務邏輯,執行完自動放棄領導權 * @param client * @throws Exception */ @Override public void takeLeadership(CuratorFramework client) throws Exception { final int waitSeconds = 2; System.out.println(name + "成為當前leader" + " 共成為leader的次數:" + leaderCount.getAndIncrement() + "次"); try{ //模擬業務邏輯執行2秒 Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds)); }catch ( InterruptedException e ){ System.err.println(name + "已被中斷"); Thread.currentThread().interrupt(); }finally{ System.out.println(name + "放棄領導權"); } } @Override public void close() throws IOException { leaderSelector.close(); } }
多個客戶端測試
public class TestLeaderElection { private static final String PATH = "/demo/leader"; /** 3個客戶端 */ private static final Integer CLIENT_COUNT = 3; public static void main(String[] args) throws Exception { ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT); for (int i = 0; i < CLIENT_COUNT; i++) { final int index = i; service.submit(new Runnable() { @Override public void run() { try { new TestLeaderElection().schedule(index); } catch (Exception e) { e.printStackTrace(); } } }); } Thread.sleep(30 * 1000); service.shutdownNow(); } private void schedule(final int thread) throws Exception { CuratorFramework client = this.getClient(thread); CustomLeaderSelectorListenerAdapter leaderSelectorListener = new CustomLeaderSelectorListenerAdapter(client, PATH, "Client #" + thread); leaderSelectorListener.start(); } private CuratorFramework getClient(final int thread) { RetryPolicy rp = new ExponentialBackoffRetry(1000, 3); // Fluent風格創建 CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("192.168.58.42:2181") .sessionTimeoutMs(1000000) .connectionTimeoutMs(3000) .retryPolicy(rp) .build(); client.start(); System.out.println("Client [" + thread + "] Server connected..."); return client; } }
運行程序,輸出以下內容:
Client [0] Server connected… Client [1] Server connected… Client [2] Server connected… Client #2成為當前leader 共成為leader的次數:1次 Client #2放棄領導權 Client #0成為當前leader 共成為leader的次數:1次 Client #0放棄領導權 Client #1成為當前leader 共成為leader的次數:1次 Client #1放棄領導權 Client #2成為當前leader 共成為leader的次數:2次 Client #2放棄領導權 Client #0成為當前leader 共成為leader的次數:2次 Client #0放棄領導權 Client #1成為當前leader 共成為leader的次數:2次 Client #1放棄領導權 Client #2成為當前leader 共成為leader的次數:3次 Client #2放棄領導權 Client #0成為當前leader 共成為leader的次數:3次 Client #0放棄領導權 Client #1成為當前leader 共成為leader的次數:3次 Client #1放棄領導權 Client #2成為當前leader 共成為leader的次數:4次 Client #2放棄領導權 Client #0成為當前leader 共成為leader的次數:4次 Client #0放棄領導權 Client #1成為當前leader 共成為leader的次數:4次 Client #1放棄領導權
上面只是簡單測試代碼,並沒有關閉client等操作,每個實例在獲取領導權后,如果 takeLeadership(CuratorFramework client) 方法執行結束,將會釋放其領導權。而且獲取領導權 也是按照 Client #2, Client #0 ,Client #1 順序來的,正好驗證了它的公平性。
LeaderSelectorListener類繼承了ConnectionStateListener。一旦LeaderSelector啟動,它會向curator客戶端添加監聽器。 使用LeaderSelector必須時刻注意連接的變化。一旦出現連接問題如SUSPENDED,curator實例必須確保它可能不再是leader,直至它重新收到RECONNECTED。如果LOST出現,curator實例不再是leader並且其takeLeadership()應該直接退出。
推薦的做法是,如果發生SUSPENDED或者LOST連接問題,最好直接拋CancelLeadershipException,此時,leaderSelector實例會嘗試中斷並且取消正在執行takeLeadership()方法的線程。
建議擴展LeaderSelectorListenerAdapter, LeaderSelectorListenerAdapter中已經提供了推薦的處理方式 。