在分布式集群部署模式下,為了維護數據一致性,通常需要選舉出一個leader來進行協調,並且在leader掛掉后能從集群中選舉出一個新的leader。選舉leader的方案有很多種,對Paxos和Raft協議有過了解的同學應該對leader選舉有一些認識,一般都是按照少數服從多數的原則來實現,但是因為分布式環境中無法避免的網絡不穩定、數據不同步、時間偏差等問題,要想搞好leader選舉並不是一件特別容易的事。這篇文章將提供一個使用Consul做leader選舉的簡單方案。
原理
Consul 的leader選舉只有兩步:
1、Create Session:參與選舉的應用分別去創建Session,Session的存活狀態關聯到健康檢查。
2、Acquire KV:多個應用帶着創建好的Session去鎖定同一個KV,只能有一個應用鎖定住,鎖定成功的應用就是leader。
如上圖所示,這里假設App2用Session鎖定住了KV,其實就是KV的Session屬性設置為了Session2。
什么時候會觸發重新選舉呢?
- Session失效:Session被刪除、Session關聯的健康檢查失敗、Session TTL過期等。
- KV被刪除:這個沒什么需要特別說明的。
那應用們怎么感知這些情況呢?
應用在選舉結束后,應該保持一個到KV的阻塞查詢,這個查詢會在超時或者KV發生變化的時候返回結果,這時候應用可以根據返回結果判斷是否發起新的選舉。
示例
這里給出一個Java的例子:這是一個控制台程序,程序會創建一個Session,然后嘗試使用這個Session鎖定key為“program/leader”的Consul KV,同時也會嘗試設置KV的值為當前節點Id“007”。不管捕獲成功還是失敗,程序隨后都會啟動一個針對“program/leader”的阻塞查詢,在阻塞查詢返回時會判斷KV是否存在或者綁定的Session是否存在,如果有任何一個不存在,則發起選舉,否則繼續阻塞查詢。這個“阻塞查詢->選舉”的操作是一個無限循環操作。
package cn.bossma; import com.ecwid.consul.v1.ConsulClient; import com.ecwid.consul.v1.QueryParams; import com.ecwid.consul.v1.kv.model.GetValue; import com.ecwid.consul.v1.kv.model.PutParams; import com.ecwid.consul.v1.session.model.NewSession; import com.ecwid.consul.v1.session.model.Session; import org.apache.commons.lang3.StringUtils; /** * consul leader 選舉演示程序 * * @author: bossma.cn */ public class Main { private static ConsulClient client = new ConsulClient(); private static String sesssionId = ""; private static String nodeId = "007"; private static String electName = "program/leader"; /** * @param args */ public static void main(String[] args) { System.out.println("starting"); watch(); } /** * 監控選舉 * * @param: * @return: * @author: bossma.cn */ private static void watch() { System.out.println("start first leader election"); // 上來就先選舉一次,看看結果 ElectResponse electResponse = elect(); System.out.printf("elect result: %s, current manager: %s" + System.getProperty("line.separator"), electResponse.getElectResult(), electResponse.getLeaderId()); long waitIndex = electResponse.modifyIndex++; int waitTime = 30; do { try { System.out.println("start leader watch query"); // 阻塞查詢 GetValue kv = getKVValue(electName, waitTime, waitIndex); // kv被刪除或者kv綁定的session不存在 if (null == kv || StringUtils.isEmpty(kv.getSession())) { System.out.println("leader missing, start election right away"); electResponse = elect(); waitIndex = electResponse.modifyIndex++; System.out.printf("elect result: %s, current manager: %s" + System.getProperty("line.separator"), electResponse.getElectResult(), electResponse.getLeaderId()); } else { long kvModifyIndex = kv.getModifyIndex(); waitIndex = kvModifyIndex++; } } catch (Exception ex) { System.out.print("leader watch異常:" + ex.getMessage()); try { Thread.sleep(3000); } catch (Exception ex2) { System.out.printf(ex2.getMessage()); } } } while (true); } /** * 執行選舉 * * @param: * @return: * @author: bossma.cn */ private static ElectResponse elect() { ElectResponse response = new ElectResponse(); Boolean electResult = false; // 創建一個關聯到當前節點的Session if (StringUtils.isNotEmpty(sesssionId)) { Session s = getSession(sesssionId); if (null == s) { // 這里session關聯的健康檢查只綁定了consul節點的健康檢查 // 實際使用的時候建議把當前應用程序的健康檢查也綁定上,否則如果只是程序關掉了,session也不會失效 sesssionId = createSession(10); } } else { sesssionId = createSession(10); } // 獲取選舉要鎖定的Consul KV對象 GetValue kv = getKVValue(electName); if (null == kv) { kv = new GetValue(); } // 誰先捕獲到KV,誰就是leader // 注意:如果程序關閉后很快啟動,session關聯的健康檢查可能不會失敗,所以session不會失效 // 這時候可以把程序創建的sessionId保存起來,重啟后首先嘗試用上次的sessionId, electResult = acquireKV(electName, nodeId, sesssionId); // 無論參選成功與否,獲取當前的Leader kv = getKVValue(electName); response.setElectResult(electResult); response.setLeaderId(kv.getDecodedValue()); response.setModifyIndex(kv.getModifyIndex()); return response; } /** * 創建Session * * @param: lockDealy session從kv釋放后,kv再次綁定session的延遲時間 * @return: * @author: bossma.cn */ private static String createSession(int lockDelay) { NewSession session = new NewSession(); session.setLockDelay(lockDelay); return client.sessionCreate(session, QueryParams.DEFAULT).getValue(); } /** * 獲取指定的session信息 * * @param: sessionId * @return: Session對象 * @author: bossma.cn */ private static Session getSession(String sessionId) { return client.getSessionInfo(sessionId, QueryParams.DEFAULT).getValue(); } /** * 使用Session捕獲KV * * @param key * @param value * @param sessionId * @return * @author: bossma.cn */ public static Boolean acquireKV(String key, String value, String sessionId) { PutParams putParams = new PutParams(); putParams.setAcquireSession(sessionId); return client.setKVValue(key, value, putParams).getValue(); } /** * 獲取指定key對應的值 * * @param: key * @return: * @author: bossma.cn */ private static GetValue getKVValue(String key) { return client.getKVValue(key).getValue(); } /** * block獲取指定key對應的值 * * @param: key, waitTime, waitIndex * @return: * @author: bossma.cn */ private static GetValue getKVValue(String key, int waitTime, long waitIndex) { QueryParams paras = QueryParams.Builder.builder() .setWaitTime(waitTime) .setIndex(waitIndex) .build(); return client.getKVValue(key, paras).getValue(); } /** * leader選舉結果 * * @author: bossma.cn */ private static class ElectResponse { private Boolean electResult = false; private long modifyIndex = 0; private String leaderId; public String getLeaderId() { return leaderId; } public void setLeaderId(String leaderId) { this.leaderId = leaderId; } public Boolean getElectResult() { return electResult; } public void setElectResult(Boolean electResult) { this.electResult = electResult; } public long getModifyIndex() { return modifyIndex; } public void setModifyIndex(long modifyIndex) { this.modifyIndex = modifyIndex; } } }
坑
1、用於選舉的Consul KV必須使用鎖定的session進行更新,如果通過其它方式更新,KV綁定的Session不會有影響,也就是說KV還是被原來的程序鎖定,但是卻被其它的程序修改了,這不符合leader的規則。
2、Session 關聯的健康檢查默認只有當前節點的健康檢查,如果應用程序停止,Session並不會失效,所以建議將Session 關聯的健康檢查包含應用的健康檢查;但是如果只有應用的健康檢查,服務器停止,應用的健康檢查仍可能是健康的,所以Session的健康檢查應該把應用程序和Consul 節點的健康檢查都納入進來。
3、如果程序關閉后很快啟動,session關聯的健康檢查可能不會失敗,所以session不會失效,程序啟動后如果創建一個新的Session去鎖定KV,就不能成功鎖定KV,這時候建議將SessionId持久化存儲,如果Session還存在,就還是用這個Session 去鎖定。
4、lockdelay:這不是一個坑,是一個保護機制,但需要考慮好用不用。它可以在session失效后短暫的不允許其它session來鎖定KV,這是因為此時應用程序可能還是正常的,session關聯的健康檢查誤報了,應用程序可能還在處理業務,需要一段時間來結束處理。也可以使用0值把這個機制禁止掉。
5、可能leader加載的東西比較多,leader切換比較麻煩,考慮到session關聯的健康檢查誤報的問題,希望leader選舉優先上次鎖定KV的程序,這樣可以提高效率。此時可以在選舉程序中增加一些邏輯:如果選舉的時候發現上次的leader是當前程序,則立即選舉;如果發現上次的leader不是當前程序,則等待兩個固定的時間周期再提交選舉。
整體上看,Consul提供的Leader選舉方案還是比較簡單的,無論是集群部署中的leader選舉,還是傳統主備部署,都可以適用。其中的關鍵是Session,一定要結合自己的業務考慮周全。最后歡迎加入800人Consul交流群234939415,一起探討使用Consul的各種場景和問題。