使用Consul做leader選舉的方案


在分布式集群部署模式下,為了維護數據一致性,通常需要選舉出一個leader來進行協調,並且在leader掛掉后能從集群中選舉出一個新的leader。選舉leader的方案有很多種,對Paxos和Raft協議有過了解的同學應該對leader選舉有一些認識,一般都是按照少數服從多數的原則來實現,但是因為分布式環境中無法避免的網絡不穩定、數據不同步、時間偏差等問題,要想搞好leader選舉並不是一件特別容易的事。這篇文章將提供一個使用Consul做leader選舉的簡單方案。

原理

Consul 的leader選舉只有兩步:

1、Create Session:參與選舉的應用分別去創建Session,Session的存活狀態關聯到健康檢查。

2、Acquire KV:多個應用帶着創建好的Session去鎖定同一個KV,只能有一個應用鎖定住,鎖定成功的應用就是leader。

如上圖所示,這里假設App2用Session鎖定住了KV,其實就是KV的Session屬性設置為了Session2。

什么時候會觸發重新選舉呢?

  • Session失效:Session被刪除、Session關聯的健康檢查失敗、Session TTL過期等。
  • KV被刪除:這個沒什么需要特別說明的。

那應用們怎么感知這些情況呢?

應用在選舉結束后,應該保持一個到KV的阻塞查詢,這個查詢會在超時或者KV發生變化的時候返回結果,這時候應用可以根據返回結果判斷是否發起新的選舉。

示例

這里給出一個Java的例子:這是一個控制台程序,程序會創建一個Session,然后嘗試使用這個Session鎖定key為“program/leader”的Consul KV,同時也會嘗試設置KV的值為當前節點Id“007”。不管捕獲成功還是失敗,程序隨后都會啟動一個針對“program/leader”的阻塞查詢,在阻塞查詢返回時會判斷KV是否存在或者綁定的Session是否存在,如果有任何一個不存在,則發起選舉,否則繼續阻塞查詢。這個“阻塞查詢->選舉”的操作是一個無限循環操作。

package cn.bossma;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.kv.model.GetValue;
import com.ecwid.consul.v1.kv.model.PutParams;
import com.ecwid.consul.v1.session.model.NewSession;
import com.ecwid.consul.v1.session.model.Session;
import org.apache.commons.lang3.StringUtils;

/**
 * consul leader 選舉演示程序
 *
 * @author: bossma.cn
 */
public class Main {

    private static ConsulClient client = new ConsulClient();
    private static String sesssionId = "";
    private static String nodeId = "007";
    private static String electName = "program/leader";

    /**
     * @param args
     */
    public static void main(String[] args) {
        System.out.println("starting");
        watch();
    }

    /**
     * 監控選舉
     *
     * @param:
     * @return:
     * @author: bossma.cn
     */
    private static void watch() {

        System.out.println("start first leader election");

        // 上來就先選舉一次,看看結果
        ElectResponse electResponse = elect();
        System.out.printf("elect result: %s, current manager: %s" + System.getProperty("line.separator"), electResponse.getElectResult(), electResponse.getLeaderId());

        long waitIndex = electResponse.modifyIndex++;
        int waitTime = 30;

        do {
            try {
                System.out.println("start leader watch query");

                // 阻塞查詢
                GetValue kv = getKVValue(electName, waitTime, waitIndex);

                // kv被刪除或者kv綁定的session不存在
                if (null == kv || StringUtils.isEmpty(kv.getSession())) {
                    System.out.println("leader missing, start election right away");
                    electResponse = elect();
                    waitIndex = electResponse.modifyIndex++;
                    System.out.printf("elect result: %s, current manager: %s" + System.getProperty("line.separator"), electResponse.getElectResult(), electResponse.getLeaderId());
                } else {
                    long kvModifyIndex = kv.getModifyIndex();
                    waitIndex = kvModifyIndex++;
                }
            } catch (Exception ex) {
                System.out.print("leader watch異常:" + ex.getMessage());

                try {
                    Thread.sleep(3000);
                } catch (Exception ex2) {
                    System.out.printf(ex2.getMessage());
                }
            }
        }
        while (true);
    }

    /**
     * 執行選舉
     *
     * @param:
     * @return:
     * @author: bossma.cn
     */
    private static ElectResponse elect() {

        ElectResponse response = new ElectResponse();

        Boolean electResult = false;

        // 創建一個關聯到當前節點的Session
        if (StringUtils.isNotEmpty(sesssionId)) {
            Session s = getSession(sesssionId);
            if (null == s) {
                // 這里session關聯的健康檢查只綁定了consul節點的健康檢查
                // 實際使用的時候建議把當前應用程序的健康檢查也綁定上,否則如果只是程序關掉了,session也不會失效
                sesssionId = createSession(10);
            }
        } else {
            sesssionId = createSession(10);
        }

        // 獲取選舉要鎖定的Consul KV對象
        GetValue kv = getKVValue(electName);
        if (null == kv) {
            kv = new GetValue();
        }

        // 誰先捕獲到KV,誰就是leader
        // 注意:如果程序關閉后很快啟動,session關聯的健康檢查可能不會失敗,所以session不會失效
        // 這時候可以把程序創建的sessionId保存起來,重啟后首先嘗試用上次的sessionId,
        electResult = acquireKV(electName, nodeId, sesssionId);

        // 無論參選成功與否,獲取當前的Leader
        kv = getKVValue(electName);
        response.setElectResult(electResult);
        response.setLeaderId(kv.getDecodedValue());
        response.setModifyIndex(kv.getModifyIndex());
        return response;
    }

    /**
     * 創建Session
     *
     * @param: lockDealy session從kv釋放后,kv再次綁定session的延遲時間
     * @return:
     * @author: bossma.cn
     */
    private static String createSession(int lockDelay) {
        NewSession session = new NewSession();
        session.setLockDelay(lockDelay);
        return client.sessionCreate(session, QueryParams.DEFAULT).getValue();
    }

    /**
     * 獲取指定的session信息
     *
     * @param: sessionId
     * @return: Session對象
     * @author: bossma.cn
     */
    private static Session getSession(String sessionId) {
        return client.getSessionInfo(sessionId, QueryParams.DEFAULT).getValue();
    }


    /**
     * 使用Session捕獲KV
     *
     * @param key
     * @param value
     * @param sessionId
     * @return
     * @author: bossma.cn
     */
    public static Boolean acquireKV(String key, String value, String sessionId) {
        PutParams putParams = new PutParams();
        putParams.setAcquireSession(sessionId);

        return client.setKVValue(key, value, putParams).getValue();
    }

    /**
     * 獲取指定key對應的值
     *
     * @param: key
     * @return:
     * @author: bossma.cn
     */
    private static GetValue getKVValue(String key) {
        return client.getKVValue(key).getValue();
    }

    /**
     * block獲取指定key對應的值
     *
     * @param: key, waitTime, waitIndex
     * @return:
     * @author: bossma.cn
     */
    private static GetValue getKVValue(String key, int waitTime, long waitIndex) {
        QueryParams paras = QueryParams.Builder.builder()
                .setWaitTime(waitTime)
                .setIndex(waitIndex)
                .build();
        return client.getKVValue(key, paras).getValue();
    }

    /**
     * leader選舉結果
     *
     * @author: bossma.cn
     */
    private static class ElectResponse {

        private Boolean electResult = false;
        private long modifyIndex = 0;
        private String leaderId;

        public String getLeaderId() {
            return leaderId;
        }

        public void setLeaderId(String leaderId) {
            this.leaderId = leaderId;
        }

        public Boolean getElectResult() {
            return electResult;
        }

        public void setElectResult(Boolean electResult) {
            this.electResult = electResult;
        }

        public long getModifyIndex() {
            return modifyIndex;
        }

        public void setModifyIndex(long modifyIndex) {
            this.modifyIndex = modifyIndex;
        }
    }
}

1、用於選舉的Consul KV必須使用鎖定的session進行更新,如果通過其它方式更新,KV綁定的Session不會有影響,也就是說KV還是被原來的程序鎖定,但是卻被其它的程序修改了,這不符合leader的規則。

2、Session 關聯的健康檢查默認只有當前節點的健康檢查,如果應用程序停止,Session並不會失效,所以建議將Session 關聯的健康檢查包含應用的健康檢查;但是如果只有應用的健康檢查,服務器停止,應用的健康檢查仍可能是健康的,所以Session的健康檢查應該把應用程序和Consul 節點的健康檢查都納入進來。

3、如果程序關閉后很快啟動,session關聯的健康檢查可能不會失敗,所以session不會失效,程序啟動后如果創建一個新的Session去鎖定KV,就不能成功鎖定KV,這時候建議將SessionId持久化存儲,如果Session還存在,就還是用這個Session 去鎖定。

4、lockdelay:這不是一個坑,是一個保護機制,但需要考慮好用不用。它可以在session失效后短暫的不允許其它session來鎖定KV,這是因為此時應用程序可能還是正常的,session關聯的健康檢查誤報了,應用程序可能還在處理業務,需要一段時間來結束處理。也可以使用0值把這個機制禁止掉。

5、可能leader加載的東西比較多,leader切換比較麻煩,考慮到session關聯的健康檢查誤報的問題,希望leader選舉優先上次鎖定KV的程序,這樣可以提高效率。此時可以在選舉程序中增加一些邏輯:如果選舉的時候發現上次的leader是當前程序,則立即選舉;如果發現上次的leader不是當前程序,則等待兩個固定的時間周期再提交選舉。

整體上看,Consul提供的Leader選舉方案還是比較簡單的,無論是集群部署中的leader選舉,還是傳統主備部署,都可以適用。其中的關鍵是Session,一定要結合自己的業務考慮周全。最后歡迎加入800人Consul交流群234939415,一起探討使用Consul的各種場景和問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM