使用ZooKeeper協調多台Web Server的定時任務處理(方案1)


背景說明: 有一套Web服務程序, 為了保證HA, 需要在多台服務器上部署, 該服務程序有一些定時任務要執行, 現在要保證的是, 同一定時任務不會在多台機器上被同時執行.

方案1 --- 任務級的主備方案:
每個定時任務啟動后, 都發起任務級的主節點的競爭, 勝出者執行具體任務.

方案2 --- 服務器級的主備方案, 需要兩個組件:
組件一: 一個后台線程用來競爭Leader
具體細節為: Web服務程序中開一個后台線程來競爭作為服務級別的Leader, 優勝者將自己的 ${服務器名}+${端口} 記錄到zk 中.
組件二:每個定時作業, 在開始的時候, 先對比本機是否是服務主節點, 如果是主節點即執行具體任務, 否則跳過.


方案1的說明:
1. 每個定時任務都需要競爭Leader, 任務的執行效率較差.
2. 如果兩個服務器的時間不同步, 定時任務耗時又很短, 在這種情況下容易double run, 需要故意延長任務執行時間以避免double run.
3. 有的定時任務在其中一台上執行, 另一些在另一台上執行, 查日志不是很方便.
4. 因為是在每個定時任務啟動的時候競爭leader, 不必關心任務執行過程中, 由於zk客戶端長連接斷開需要進行leader切換的問題.
5. 本方案采用了Curator 的 LeaderLatch 選舉機制.


方案2的說明:
1. 該方案能很好地從幾台服務器中選出一個Master機器, 不僅僅可以用於定時任務場景, 還可以用在其他場景下.
2. 該方案能實現Master節點的自動 failover, 經我測試 failover 過程稍長, 接近1分鍾.
5. 本方案采用了Curator 的 LeaderSelector 選舉機制.


==============================
LeaderLatch 和 LeaderSelector 兩種選舉實現
==============================
LeaderLatch 的方式:
是以一種搶占的方式來決定選主. 比較簡單粗暴, 邏輯相對簡單. 類似非公平鎖的搶占, 所以, 多節點是一個隨機產生主節點的過程, 誰搶到就算誰的.

LeaderSelector 方式:
內部通過一個分布式鎖來實現選主, 並且選主結果是公平的, zk會按照各節點請求的次序成為主節點.

LeaderLatch 和 LeaderSelector 本身也提供 Master 節點的自動failover, 經我測試 failover 過程都稍長, 有時會接近1分鍾.

 

下文先講解 LeaderLatch 相關知識, 以及用 LeaderLatch 實現方案1 的過程.


==============================
Curator 中 LeaderLatch 相關函數
==============================
最簡單的構造子
public LeaderLatch(CuratorFramework client, String latchPath)

leaderLatch.start()
start()讓zk 客戶端立即參與選舉, zk server最終會確定某個客戶端成為leader.

leaderLatch.hasLeadership()
檢查是否是Leader, 返回值為boolean, 該函數調用會立即返回.

leaderLatch.await()
阻塞調用, 直到本客戶端成為Leader才返回.

leaderLatch.await(long timeout, TimeUnit)
阻塞調用, 並設定一個等待時間.

leaderLatch.close()
對於參與者是Leader, 只有調用該方法, 當前參與者才能失去Leader資格, 其他參與者才能獲取Leader資格.
對於其他參與者, 調用該方法將主動退出選舉.

leaderLatch.addListener()
增加一個Listener監聽器, 當參與者成為Leader或失去Leader資格后, 自動觸發該監聽器.

client.getConnectionStateListenable().addListener()
為zk 客戶端增加一個監聽器, 用來監聽連接狀態, 共有三種狀態: RECONNECT/LOST/SUSPEND
當連接狀態為 ConnectionState.LOST 時, 寫代碼強制客戶端重連, 以便該客戶端能繼續參與Leader選舉.
當連接狀態為 ConnectionState.SUSPEND 時, 我們一般不用處理, 輸出log即可.

=============================
環境准備
=============================
在 VM (192.168.1.11) 上啟動一個 zookeeper 容器
docker run -d --name myzookeeper --net host zookeeper:latest

在Windows開發機上, 使用 zkCli.cmd 應該能連上虛機中的 zk server.
zkCli.cmd -server 192.168.1.11:2181

 

=============================
SpringBoot 服務程序
=============================
增加下面三個依賴項, 引入 actuator 僅僅是為了不寫任何代碼就能展現一個web UI.

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <!--org.apache.curator 依賴 slf4j -->
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.7</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
 

 

完整Java 代碼

package com.example.demo;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatch.State;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/*
 * 主程序類
 * */
@EnableScheduling
@SpringBootApplication
public class ZkServiceApplication {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(ZkServiceApplication.class, args);
    }
}

/*
 * 常量工具類
 */
class ZkTaskConst {
    public static final String SERVICE_NAME = "ServiceA";
    public static final String SERVICE_SERVER = "Server1:8080";
    public static final String ZK_URL = "localhost:2181";

    // 為了確保能選出一個leader, 需要等待一會兒,
    public static final int WAIT_SECONDS_ENSURE_BE_LEADER = 20;

    // Task運行完畢后, 再Hold leader一會兒, 以防止多個服務器時間不准導致作業double run
    public static final int SLEEP_SECONDS_AFTER_TASK = 30;

    public static String getZkLatchPath(String taskName) {
        return String.format("/%s/%s", SERVICE_NAME, taskName);
    }
}

/*
 * Leader 選舉Listener
 */
class ZkTaskLeaderLatchListener implements LeaderLatchListener {
    private static final Logger log = LoggerFactory.getLogger(ZkTaskLeaderLatchListener.class);

    @Override
    public void isLeader() {
        log.info(String.format("The server (%s) become the leader", ZkTaskConst.SERVICE_SERVER));
    }

    @Override
    public void notLeader() {
        log.debug(String.format("The server (%s) has not been the leader", ZkTaskConst.SERVICE_SERVER));
    }
}

/*
 * Zk Connection 監聽器
 * 如果 zk client 長連接斷開后, 需要重連以保證該客戶端仍能參與 Leader 選舉.
 * 對於定時任務級的Leader選舉, 這個監聽器並不重要.
 * 對於服務器級別的Leader選舉, 這個監聽器很重要.
 */
class ZkConnectionStateListener implements ConnectionStateListener {
    private static final Logger log = LoggerFactory.getLogger(ZkConnectionStateListener.class);

    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        log.debug("Zk connection change to " + newState);
        if (ConnectionState.CONNECTED != newState) {
            while (true) {
                try {
                    log.error("Disconnected to the Zk server. Try to reconnect Zk server");
                    client.getZookeeperClient().blockUntilConnectedOrTimedOut();
                    log.info("Succeed to reconnect Zk server");
                } catch (InterruptedException e) {
                    log.error(e.getMessage(), e);
                }
            }
        }
    }
}

/*
 * 定時任務控制器類
 */
class ZkTaskController {
    private static final Logger log = LoggerFactory.getLogger(ZkTaskController.class);
    private CuratorFramework client;
    private LeaderLatch leaderLatch;
    private String taskName;
    public boolean isLeader = false;

    public ZkTaskController(String taskName) {
        this.taskName = taskName;
    }

    private void start() throws Exception {
        client = getClient(false);
        client.getConnectionStateListenable().addListener(new ZkConnectionStateListener());
        leaderLatch = new LeaderLatch(client, ZkTaskConst.getZkLatchPath(taskName));
        leaderLatch.addListener(new ZkTaskLeaderLatchListener());
        client.start();
        if (leaderLatch.getState() != State.STARTED) {
            leaderLatch.start();
        }
    }

    private void awaitForLeader(long timeout, TimeUnit unit) {
        try {
            this.leaderLatch.await(timeout, unit);
            isLeader = leaderLatch.hasLeadership();
        } catch (InterruptedException e) {
            // log.error(e.getMessage(), e);
        }
    }

    private void stop(boolean closeLeaderLatch) {
        if (closeLeaderLatch) {
            CloseableUtils.closeQuietly(leaderLatch);
        }
        client.close();
    }

    private static CuratorFramework getClient(boolean autoStart) {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(ZkTaskConst.ZK_URL, retryPolicy);
        if (client.getState() != CuratorFrameworkState.STARTED && autoStart) {
            client.start();
        }
        return client;
    }

    public <T> void runTask(Runnable action) {
        ZkTaskController zkTaskController = new ZkTaskController(taskName);
        try {
            zkTaskController.start();
            zkTaskController.awaitForLeader(ZkTaskConst.WAIT_SECONDS_ENSURE_BE_LEADER, TimeUnit.SECONDS);
            if (zkTaskController.isLeader) {
                log.info(String.format("The task %s will run on this task's leader server", taskName));
                action.run();
            } else {
                log.info(String.format("The task %s will not this task's non-leader server", taskName));
            }
            // 再Hold leader一會兒, 以防止多個服務器時間不准導致作業double run
            Thread.sleep(1000 * ZkTaskConst.SLEEP_SECONDS_AFTER_TASK);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            zkTaskController.stop(true);
        }
    }
}

/*
 * 定時任務類
 */
@Component
class MyTasks {

    /**
     * 一個定時任務 reportCurrentTimeTask 方法 (每分鍾運行)
     */
    @Scheduled(cron = "0 * * * * *")
    public void reportCurrentTimeTask() {
        ZkTaskController zkTaskController = new ZkTaskController("reportCurrentTime");
        zkTaskController.runTask(new ReportCurrentTimeTaskInternal());
    }

    /**
     * 定時任務 reportCurrentTimeTask 真正執行的內容
     */
    class ReportCurrentTimeTaskInternal implements Runnable {
        private final Logger log = LoggerFactory.getLogger(ReportCurrentTimeTaskInternal.class);
        private final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

        @Override
        public void run() {
            log.info(String.format("The server (%s) is now %s", ZkTaskConst.SERVICE_SERVER,
                    dateFormat.format(new Date())));
        }
    }

}

 


======================
參考
======================
https://www.cnblogs.com/leesf456/p/6032716.html
https://www.jianshu.com/p/70151fc0ef5d
https://www.codelast.com/%e5%8e%9f%e5%88%9b-zookeeper%e6%b3%a8%e5%86%8c%e8%8a%82%e7%82%b9%e7%9a%84%e6%8e%89%e7%ba%bf%e8%87%aa%e5%8a%a8%e9%87%8d%e6%96%b0%e6%b3%a8%e5%86%8c%e5%8f%8a%e6%b5%8b%e8%af%95%e6%96%b9%e6%b3%95/
http://www.cnblogs.com/francisYoung/p/5464789.html
https://www.cnblogs.com/LiZhiW/p/4930486.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM