在分布式計算中,leader election是很重要的一個功能,這個選舉過程是這樣子的:指派一個進程作為組織者,將任務分發給各節點。在任務開始前,哪個節點都不知道誰是leader或者coordinator。當選舉算法開始執行后,每個節點最終會得到一個唯一的節點作為任務leader。
除此之外,選舉還經常會發生在leader意外宕機的情況下,新的leader要被選舉出來。
Curator有兩種選舉recipe,你可以根據你的需求選擇合適的。
1.Leader Latch
1.LeaderLatch的簡單介紹
首先我們看一個使用LeaderLatch類來選舉的例子。
它的常用方法如下:
// 構造方法
public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath, String id)
public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
// 查看當前LeaderLatch實例是否是leader
public boolean hasLeadership()
// 嘗試讓當前LeaderLatch實例稱為leader
public void await() throws InterruptedException, EOFException
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
必須啟動LeaderLatch: leaderLatch.start();
一旦啟動,LeaderLatch會和其它使用相同latch path的其它LeaderLatch交涉,然后隨機的選擇其中一個作為leader。
一旦不使用LeaderLatch了,必須調用close方法。如果它是leader,會釋放leadership,其它的參與者將會選舉一個leader。
2.異常處理
LeaderLatch實例可以增加ConnectionStateListener來監聽網絡連接問題。當 SUSPENDED 或 LOST 時,leader不再認為自己還是leader.當LOST 連接重連后 RECONNECTED,LeaderLatch會刪除先前的ZNode然后重新創建一個.
LeaderLatch用戶必須考慮導致leadershi丟失的連接問題。強烈推薦你使用ConnectionStateListener。
3.示例程序
public class LeaderLatchExample
{
private static final int CLIENT_QTY = 10;
private static final String PATH = "/examples/leader";
public static void main(String[] args) throws Exception
{
List<CuratorFramework> clients = Lists.newArrayList();
List<LeaderLatch> examples = Lists.newArrayList();
try
{
for (int i = 0; i < CLIENT_QTY; ++i)
{
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
clients.add(client);
client.start();
LeaderLatch example = new LeaderLatch(client, PATH, "Client #" + i);
examples.add(example);
example.start();
}
System.out.println("LeaderLatch初始化完成!");
Thread.sleep(10 * 1000);// 等待Leader選舉完成
LeaderLatch currentLeader = null;
for (int i = 0; i < CLIENT_QTY; ++i)
{
LeaderLatch example = examples.get(i);
if (example.hasLeadership())
{
currentLeader = example;
}
}
System.out.println("當前leader:" + currentLeader.getId());
currentLeader.close();
examples.get(0).await(10, TimeUnit.SECONDS);
System.out.println("當前leader:" + examples.get(0).getLeader());
System.out.println("輸入回車退出");
new BufferedReader(new InputStreamReader(System.in)).readLine();
}
catch (Exception e)
{
e.printStackTrace();
}
finally
{
for (LeaderLatch exampleClient : examples)
{
System.out.println("當前leader:" + exampleClient.getLeader());
try
{
CloseableUtils.closeQuietly(exampleClient);
}
catch (Exception e)
{
System.out.println(exampleClient.getId() + " -- " + e.getMessage());
}
}
for (CuratorFramework client : clients)
{
CloseableUtils.closeQuietly(client);
}
}
System.out.println("OK!");
}
}
首先我們創建了10個LeaderLatch,啟動后它們中的一個會被選舉為leader。因為選舉會花費一些時間,start后並不能馬上就得到leader。
通過hasLeadership查看自己是否是leader,如果是的話返回true。
可以通過.getLeader().getId()可以得到當前的leader的ID。
只能通過close釋放當前的領導權。
await是一個阻塞方法, 嘗試獲取leader地位,但是未必能上位。
注意:
LeaderLatch
類不能
close
()
多次,
LeaderLatch
.
hasLeadership
()
與
LeaderLatch
.
getLeader
()
得到的結果不一定一致,需要通過
LeaderLatch
.
getLeader
().isLeader()
來判斷。
4.運行結果及其分析
上面測試程序運行結果如下:
LeaderLatch初始化完成!
當前leader:Client #1
當前leader:Participant{id='Client #8', isLeader=true}
輸入回車退出
當前leader:Participant{id='Client #8', isLeader=true}
當前leader:Participant{id='Client #8', isLeader=true}
Client #1 -- Already closed or has not been started
當前leader:Participant{id='Client #8', isLeader=true}
當前leader:Participant{id='Client #8', isLeader=true}
當前leader:Participant{id='Client #8', isLeader=true}
當前leader:Participant{id='Client #8', isLeader=true}
當前leader:Participant{id='Client #8', isLeader=true}
當前leader:Participant{id='Client #8', isLeader=true}
當前leader:Participant{id='Client #8', isLeader=true}
當前leader:Participant{id='Client #9', isLeader=true}
OK!
使用ZooInspector工具查看Zookeeper數據如下圖:

每創建一個
LeaderLatch
實例並調用其
start
()
方法就會在其Path下創建一個節點,當調用
close
()
方法時就會刪除節點。
2.Leader Election
Curator還提供了另外一種選舉方法。與Leader latch不同的是
這種方法
可以對領導權進行控制,在適當的時候釋放領導權,這樣每個節點都有可能獲得領導權。主要
涉及以下四個類:
- LeaderSelector - 選舉Leader的角色。
- LeaderSelectorListener - 選舉Leader時的事件監聽。
- LeaderSelectorListenerAdapter - 選舉Leader時的事件監聽,官方提供的適配器,用於用戶擴展。
- CancelLeadershipException - 取消Leader權異常
1.主要類介紹
重要的是LeaderSelector類,它的構造函數為:
public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener)
類似LeaderLatch,必須start: leaderSelector.start();
一旦啟動,當實例取得領導權時
LeaderSelectorListener
的takeLeadership()方法被調用。而takeLeadership()方法執行完畢時
領導權會自動釋放重新選舉
。
當你不再使用LeaderSelector實例時,應該調用它的close方法。
LeaderSelector類中也有
hasLeadership()、getLeader()方法。
2.異常處理
LeaderSelectorListener類繼承ConnectionStateListener。LeaderSelector必須小心連接狀態的改變。如果實例成為leader,它應該相應SUSPENDED 或 LOST。當 SUSPENDED 狀態出現時,實例必須假定在重新連接成功之前它可能不再是leader了。如果LOST狀態出現,實例不再是leader,takeLeadership方法返回.
注意
: 推薦處理方式是當收到SUSPENDED 或 LOST時拋出CancelLeadershipException異常. 這會導致LeaderSelector實例中斷並取消執行takeLeadership方法的異常。這非常重要,你必須考慮擴展LeaderSelectorListenerAdapter。LeaderSelectorListenerAdapter提供了推薦的處理邏輯。
3.示例程序
首先創建一個ExampleClient類,它繼承LeaderSelectorListenerAdapter,它實現了takeLeadership方法:
public class ExampleClient extends LeaderSelectorListenerAdapter implements Closeable
{
private final String name;
private final LeaderSelector leaderSelector;
private final AtomicInteger leaderCount = new AtomicInteger();
public ExampleClient(CuratorFramework client, String path, String name)
{
this.name = name;
leaderSelector = new LeaderSelector(client, path, this);
leaderSelector.autoRequeue();
}
public void start() throws IOException
{
leaderSelector.start();
}
@Override
public void close() throws IOException
{
leaderSelector.close();
}
@Override
public void takeLeadership(CuratorFramework client) throws Exception
{
final int waitSeconds = 1;
System.out.println(name + " 是當前的leader(" + leaderSelector.hasLeadership() + ") 等待" + waitSeconds + "秒...");
System.out.println(name + " 之前成為leader的次數:" + leaderCount.getAndIncrement() + "次");
try
{
Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
}
catch (InterruptedException e)
{
System.err.println(name + " 已被中斷");
Thread.currentThread().interrupt();
}
finally
{
System.out.println(name + " 放棄leader\n");
}
}
}
你可以在
takeLeadership
方法中
進行任務的分配等業務處理,並且不要返回(一返回就會釋放Leader權),如果你想要要此實例一直是leader的話可以加一個死循環。
leaderSelector.autoRequeue();保證在此實例釋放領導權之后還可能獲得領導權。
在這里我們使用AtomicInteger來記錄此client獲得領導權的次數,每個client有平等的機會獲得領導權。
測試代碼:
public class LeaderSelectorExample
{
private static final int CLIENT_QTY = 10;
private static final String PATH = "/examples/leader";
public static void main(String[] args) throws Exception
{
List<CuratorFramework> clients = Lists.newArrayList();
List<ExampleClient> examples = Lists.newArrayList();
try
{
for (int i = 0; i < CLIENT_QTY; ++i)
{
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
clients.add(client);
client.start();
ExampleClient example = new ExampleClient(client, PATH, "Client #" + i);
examples.add(example);
example.start();
}
System.out.println("輸入回車退出:");
new BufferedReader(new InputStreamReader(System.in)).readLine();
}
finally
{
for (ExampleClient exampleClient : examples)
{
CloseableUtils.closeQuietly(exampleClient);
}
for (CuratorFramework client : clients)
{
CloseableUtils.closeQuietly(client);
}
}
System.out.println("OK!");
}
}
4.示例運行結果及分析
運行結果控制台:
輸入回車退出:
Client #4 是當前的leader(true) 等待1秒...
Client #4 之前成為leader的次數:0次
Client #4 放棄leader
Client #5 是當前的leader(true) 等待1秒...
Client #5 之前成為leader的次數:0次
Client #5 已被中斷
Client #5 放棄leader
OK!
可以看出:
LeaderSelector
與LeaderLatch的區別,通過
LeaderSelectorListener
可以對領導權進行控制,在適當的時候釋放領導權,這樣每個節點都有可能獲得領導權。而LeaderLatch一根筋到死,除非調用close方法,否則它不會釋放領導權。