04.Curator Leader選舉


    在分布式計算中,leader election是很重要的一個功能,這個選舉過程是這樣子的:指派一個進程作為組織者,將任務分發給各節點。在任務開始前,哪個節點都不知道誰是leader或者coordinator。當選舉算法開始執行后,每個節點最終會得到一個唯一的節點作為任務leader。 除此之外,選舉還經常會發生在leader意外宕機的情況下,新的leader要被選舉出來。
     Curator有兩種選舉recipe,你可以根據你的需求選擇合適的。

1.Leader Latch

1.LeaderLatch的簡單介紹
     首先我們看一個使用LeaderLatch類來選舉的例子。 它的常用方法如下:
    
    
    
            
  1. // 構造方法
  2. public LeaderLatch(CuratorFramework client, String latchPath)
  3. public LeaderLatch(CuratorFramework client, String latchPath, String id)
  4. public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
  5. // 查看當前LeaderLatch實例是否是leader
  6. public boolean hasLeadership()
  7. // 嘗試讓當前LeaderLatch實例稱為leader
  8. public void await() throws InterruptedException, EOFException
  9. public boolean await(long timeout, TimeUnit unit) throws InterruptedException
必須啟動LeaderLatch: leaderLatch.start(); 一旦啟動,LeaderLatch會和其它使用相同latch path的其它LeaderLatch交涉,然后隨機的選擇其中一個作為leader。
一旦不使用LeaderLatch了,必須調用close方法。如果它是leader,會釋放leadership,其它的參與者將會選舉一個leader。
2.異常處理
    LeaderLatch實例可以增加ConnectionStateListener來監聽網絡連接問題。當 SUSPENDED 或 LOST 時,leader不再認為自己還是leader.當LOST 連接重連后 RECONNECTED,LeaderLatch會刪除先前的ZNode然后重新創建一個.
    LeaderLatch用戶必須考慮導致leadershi丟失的連接問題。強烈推薦你使用ConnectionStateListener。
3.示例程序
   
   
   
           
  1. public class LeaderLatchExample
  2. {
  3. private static final int CLIENT_QTY = 10;
  4. private static final String PATH = "/examples/leader";
  5. public static void main(String[] args) throws Exception
  6. {
  7. List<CuratorFramework> clients = Lists.newArrayList();
  8. List<LeaderLatch> examples = Lists.newArrayList();
  9. try
  10. {
  11. for (int i = 0; i < CLIENT_QTY; ++i)
  12. {
  13. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
  14. clients.add(client);
  15. client.start();
  16. LeaderLatch example = new LeaderLatch(client, PATH, "Client #" + i);
  17. examples.add(example);
  18. example.start();
  19. }
  20. System.out.println("LeaderLatch初始化完成!");
  21. Thread.sleep(10 * 1000);// 等待Leader選舉完成
  22. LeaderLatch currentLeader = null;
  23. for (int i = 0; i < CLIENT_QTY; ++i)
  24. {
  25. LeaderLatch example = examples.get(i);
  26. if (example.hasLeadership())
  27. {
  28. currentLeader = example;
  29. }
  30. }
  31. System.out.println("當前leader:" + currentLeader.getId());
  32. currentLeader.close();
  33. examples.get(0).await(10, TimeUnit.SECONDS);
  34. System.out.println("當前leader:" + examples.get(0).getLeader());
  35. System.out.println("輸入回車退出");
  36. new BufferedReader(new InputStreamReader(System.in)).readLine();
  37. }
  38. catch (Exception e)
  39. {
  40. e.printStackTrace();
  41. }
  42. finally
  43. {
  44. for (LeaderLatch exampleClient : examples)
  45. {
  46. System.out.println("當前leader:" + exampleClient.getLeader());
  47. try
  48. {
  49. CloseableUtils.closeQuietly(exampleClient);
  50. }
  51. catch (Exception e)
  52. {
  53. System.out.println(exampleClient.getId() + " -- " + e.getMessage());
  54. }
  55. }
  56. for (CuratorFramework client : clients)
  57. {
  58. CloseableUtils.closeQuietly(client);
  59. }
  60. }
  61. System.out.println("OK!");
  62. }
  63. }
首先我們創建了10個LeaderLatch,啟動后它們中的一個會被選舉為leader。因為選舉會花費一些時間,start后並不能馬上就得到leader。
通過hasLeadership查看自己是否是leader,如果是的話返回true。
可以通過.getLeader().getId()可以得到當前的leader的ID。
只能通過close釋放當前的領導權。
await是一個阻塞方法, 嘗試獲取leader地位,但是未必能上位。
注意: LeaderLatch 類不能 close () 多次, LeaderLatch . hasLeadership () LeaderLatch . getLeader () 得到的結果不一定一致,需要通過 LeaderLatch . getLeader ().isLeader() 來判斷。
4.運行結果及其分析
    上面測試程序運行結果如下:
   
   
   
           
  1. LeaderLatch初始化完成!
  2. 當前leaderClient #1
  3. 當前leaderParticipant{id='Client #8', isLeader=true}
  4. 輸入回車退出
  5. 當前leaderParticipant{id='Client #8', isLeader=true}
  6. 當前leaderParticipant{id='Client #8', isLeader=true}
  7. Client #1 -- Already closed or has not been started
  8. 當前leaderParticipant{id='Client #8', isLeader=true}
  9. 當前leaderParticipant{id='Client #8', isLeader=true}
  10. 當前leaderParticipant{id='Client #8', isLeader=true}
  11. 當前leaderParticipant{id='Client #8', isLeader=true}
  12. 當前leaderParticipant{id='Client #8', isLeader=true}
  13. 當前leaderParticipant{id='Client #8', isLeader=true}
  14. 當前leaderParticipant{id='Client #8', isLeader=true}
  15. 當前leaderParticipant{id='Client #9', isLeader=true}
  16. OK!
使用ZooInspector工具查看Zookeeper數據如下圖:

每創建一個 LeaderLatch 實例並調用其 start () 方法就會在其Path下創建一個節點,當調用 close () 方法時就會刪除節點。

2.Leader Election

     Curator還提供了另外一種選舉方法。與Leader latch不同的是 這種方法 可以對領導權進行控制,在適當的時候釋放領導權,這樣每個節點都有可能獲得領導權。主要 涉及以下四個類:
  • LeaderSelector - 選舉Leader的角色。
  • LeaderSelectorListener - 選舉Leader時的事件監聽。
  • LeaderSelectorListenerAdapter - 選舉Leader時的事件監聽,官方提供的適配器,用於用戶擴展。
  • CancelLeadershipException - 取消Leader權異常
1.主要類介紹
    重要的是LeaderSelector類,它的構造函數為:
   
   
   
           
  1. public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener)
  2. public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener)
  3. public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener)
    類似LeaderLatch,必須start: leaderSelector.start(); 一旦啟動,當實例取得領導權時 LeaderSelectorListener 的takeLeadership()方法被調用。而takeLeadership()方法執行完畢時 領導權會自動釋放重新選舉 當你不再使用LeaderSelector實例時,應該調用它的close方法。 LeaderSelector類中也有 hasLeadership()、getLeader()方法。
2.異常處理
    LeaderSelectorListener類繼承ConnectionStateListener。LeaderSelector必須小心連接狀態的改變。如果實例成為leader,它應該相應SUSPENDED 或 LOST。當 SUSPENDED 狀態出現時,實例必須假定在重新連接成功之前它可能不再是leader了。如果LOST狀態出現,實例不再是leader,takeLeadership方法返回.
    注意 : 推薦處理方式是當收到SUSPENDED 或 LOST時拋出CancelLeadershipException異常. 這會導致LeaderSelector實例中斷並取消執行takeLeadership方法的異常。這非常重要,你必須考慮擴展LeaderSelectorListenerAdapter。LeaderSelectorListenerAdapter提供了推薦的處理邏輯。
3.示例程序
    首先創建一個ExampleClient類,它繼承LeaderSelectorListenerAdapter,它實現了takeLeadership方法:
   
   
   
           
  1. public class ExampleClient extends LeaderSelectorListenerAdapter implements Closeable
  2. {
  3. private final String name;
  4. private final LeaderSelector leaderSelector;
  5. private final AtomicInteger leaderCount = new AtomicInteger();
  6. public ExampleClient(CuratorFramework client, String path, String name)
  7. {
  8. this.name = name;
  9. leaderSelector = new LeaderSelector(client, path, this);
  10. leaderSelector.autoRequeue();
  11. }
  12. public void start() throws IOException
  13. {
  14. leaderSelector.start();
  15. }
  16. @Override
  17. public void close() throws IOException
  18. {
  19. leaderSelector.close();
  20. }
  21. @Override
  22. public void takeLeadership(CuratorFramework client) throws Exception
  23. {
  24. final int waitSeconds = 1;
  25. System.out.println(name + " 是當前的leader(" + leaderSelector.hasLeadership() + ") 等待" + waitSeconds + "秒...");
  26. System.out.println(name + " 之前成為leader的次數:" + leaderCount.getAndIncrement() + "次");
  27. try
  28. {
  29. Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
  30. }
  31. catch (InterruptedException e)
  32. {
  33. System.err.println(name + " 已被中斷");
  34. Thread.currentThread().interrupt();
  35. }
  36. finally
  37. {
  38. System.out.println(name + " 放棄leader\n");
  39. }
  40. }
  41. }
    你可以在 takeLeadership 方法中 進行任務的分配等業務處理,並且不要返回(一返回就會釋放Leader權),如果你想要要此實例一直是leader的話可以加一個死循環。 leaderSelector.autoRequeue();保證在此實例釋放領導權之后還可能獲得領導權。 在這里我們使用AtomicInteger來記錄此client獲得領導權的次數,每個client有平等的機會獲得領導權。
    測試代碼:
   
   
   
           
  1. public class LeaderSelectorExample
  2. {
  3. private static final int CLIENT_QTY = 10;
  4. private static final String PATH = "/examples/leader";
  5. public static void main(String[] args) throws Exception
  6. {
  7. List<CuratorFramework> clients = Lists.newArrayList();
  8. List<ExampleClient> examples = Lists.newArrayList();
  9. try
  10. {
  11. for (int i = 0; i < CLIENT_QTY; ++i)
  12. {
  13. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
  14. clients.add(client);
  15. client.start();
  16. ExampleClient example = new ExampleClient(client, PATH, "Client #" + i);
  17. examples.add(example);
  18. example.start();
  19. }
  20. System.out.println("輸入回車退出:");
  21. new BufferedReader(new InputStreamReader(System.in)).readLine();
  22. }
  23. finally
  24. {
  25. for (ExampleClient exampleClient : examples)
  26. {
  27. CloseableUtils.closeQuietly(exampleClient);
  28. }
  29. for (CuratorFramework client : clients)
  30. {
  31. CloseableUtils.closeQuietly(client);
  32. }
  33. }
  34. System.out.println("OK!");
  35. }
  36. }
4.示例運行結果及分析
    運行結果控制台:
   
   
   
           
  1. 輸入回車退出:
  2. Client #4 是當前的leader(true) 等待1秒...
  3. Client #4 之前成為leader的次數:0
  4. Client #4 放棄leader
  5. Client #5 是當前的leader(true) 等待1秒...
  6. Client #5 之前成為leader的次數:0
  7. Client #5 已被中斷
  8. Client #5 放棄leader
  9. OK!
可以看出: LeaderSelector 與LeaderLatch的區別,通過 LeaderSelectorListener 可以對領導權進行控制,在適當的時候釋放領導權,這樣每個節點都有可能獲得領導權。而LeaderLatch一根筋到死,除非調用close方法,否則它不會釋放領導權。
-------------------------------------------------------------------------------------------------------------------------------




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM