spring boot / cloud (十五) 分布式調度中心進階


spring boot / cloud (十五) 分布式調度中心進階

<spring boot / cloud (十) 使用quartz搭建調度中心>這篇文章中介紹了如何在spring boot項目中集成quartz.

今天這篇文章則會進一步跟大家討論一下設計和搭建分布式調度中心所需要關注的事情.

下面先看一下,總體的邏輯架構圖:

分布式調度-邏輯架構示意

分布式調度-邏輯架構示意

架構設計

總體思路是,將調度執行兩個概念分離開來,形成調度中心執行節點兩個模塊:

調度中心

是一個公共的平台,負責所有任務的調度,以及任務的管理,不涉及任何業務邏輯,從上圖可以看到,它主要包括如下模塊:

  • 核心調度器quartz : 調度中心的核心,按照jobDetail和trigger的設定發起作業調度,並且提供底層的管理api

  • 管理功能 : 可通過restful和web頁面的方式動態的管理作業,觸發器的CURD操作,並且實時生效,而且還可以記錄調度日志,以及可以以圖表,表格,等各種可視化的方式展現調度中心的各個維度的指標信息

  • RmsJob和RmsJobDisallowConcurrent : 基於http遠程調用(RMS)的作業和禁止並發執行的作業

  • Callback : 用於接收"執行節點"異步執行完成后的信息

執行節點

是嵌入在各個微服務中的一個執行模塊,負責接收調度中心的調度,專注於執行業務邏輯,無需關系調度細節,並且理論上來說,它主要包括如下模塊:

  • 同步執行器 : 同步執行並且返回調度中心觸發的任務

  • 異步執行器 : 異步執行調度中心觸發的任務,並且通過callback將執行結果反饋給調度中心

  • 作業鏈 : 可任意組合不同任務的執行順序和依賴關系,滿足更復雜的業務需求

  • 業務bean : 業務邏輯的載體

架構優點

這樣一來,調度中心只負責調度,執行節點只負責業務,相互通過http協議進行溝通,兩部分可以完全解耦合,增強系統整體的擴展性

並且引入了異步執行器的概念,這一樣一來,調度中心就能以非阻塞的形式觸發執行器,可以不受任務業務邏輯帶來的性能影響,進一步提高了系統的性能

然后理論上來說執行節點是不局限於任何的語言或者平台的,並且與調度中心采用的是通用的http協議,真正的可以做到跨平台

特點

集群,高可用,故障轉移

整體的解決方案是建立在spring cloud基礎上的,依賴於服務發現eureka,可使所有的服務去中心化,來實現集群和高可用

調度中心的核心依賴於quartz,而quartz是原生支持集群的,它通過將作業和觸發器的細節持久化到數據庫中,然后在通過db鎖的方式,與集群中的各個節點通訊,從而實現了去中心化

執行節點調度中心都是注冊在eureka上的,通過ribbon的客戶端負載均衡的特性,自動屏蔽壞掉的節點,自動發現新增加的節點,可使雙方的http通信都做到高可用.

如下是quartz集群配置的片段:

#Configure scheduler
org.quartz.scheduler.instanceName=clusterQuartzScheduler #實例名稱
org.quartz.scheduler.instanceId=AUTO #自動設定實例ID
org.quartz.scheduler.skipUpdateCheck=true

#Configure JobStore and Cluster
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX #使用jdbc持久化到數據中
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate #sql代理,mysql
org.quartz.jobStore.useProperties=true
org.quartz.jobStore.tablePrefix=QRTZ_ #表前綴
org.quartz.jobStore.isClustered=true #開啟集群模式
org.quartz.jobStore.clusterCheckinInterval=20000
org.quartz.jobStore.misfireThreshold=60000

線程池調優

quartz的默認配置,可根據實際情況進行調整.

#Configure ThreadPool
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool #線程池類型
org.quartz.threadPool.threadCount=5 #線程池數量
org.quartz.threadPool.threadPriority=5 #優先級

這里就體現出了分離調度的業務邏輯的好處,在傳統的做法中,調度器承載着業務邏輯,必然會占用執行線程更長時間,並發能力受業務邏輯限制.

將業務邏輯分離出去后,並且采用異步任務的方式,調度器觸發某個任務后,將立即返回,這時占用執行線程的時間會大幅縮短.

所以在相同的線程池數量下,采用這種架構,是可以大幅度的提高調度中心的並發能力的.

集中化配置管理

同樣,整個解決方案也依賴於spring cloud config server.

我們在系統中抽象出了一系列的元數據用於做系統配置,這些元數據在org.itkk.udf.scheduler.meta包下,大家可以查看,這些元數據基本囊括了所有作業和觸發器的屬性,通過@ConfigurationProperties特性,可輕松的將這些元數據類轉化為配置文件.

並且設計上簡化了后續管理api的復雜度,我們某個作業或者某個觸發器的一套屬性歸納到一個CODE中,然后后續通過這個CODE就能操作所對應的作業或者觸發器.

配置片段如下:

#jobGroup
org.itkk.scheduler.properties.jobGroup.general=通用
#triggerGroup
org.itkk.scheduler.properties.triggerGroup.general=通用
#rmsJob
org.itkk.scheduler.properties.jobDetail.rmsJob.name=generalJob
org.itkk.scheduler.properties.jobDetail.rmsJob.group=general
org.itkk.scheduler.properties.jobDetail.rmsJob.className=org.itkk.udf.scheduler.job.RmsJob
org.itkk.scheduler.properties.jobDetail.rmsJob.description=通用作業
org.itkk.scheduler.properties.jobDetail.rmsJob.recovery=false
org.itkk.scheduler.properties.jobDetail.rmsJob.durability=true
org.itkk.scheduler.properties.jobDetail.rmsJob.autoInit=true
#rmsJobDisallowConcurrent
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.name=generalJobDisallowConcurrent
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.group=general
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.className=org.itkk.udf.scheduler.job.RmsJobDisallowConcurrent
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.description=通用作業(禁止並發)
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.recovery=false
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.durability=true
org.itkk.scheduler.properties.jobDetail.rmsJobDisallowConcurrent.autoInit=true
#simpleTrigger
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.jobCode=rmsJob
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.name=testSimpleTrigger
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.group=general
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.intervalInMilliseconds=10000
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.autoInit=true
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.description=測試簡單觸發器
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.serviceCode=SCH_CLIENT_UDF_SERVICE_A_DEMO
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.beanName=testBean
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.async=true
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.param1=a
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.param2=b
org.itkk.scheduler.properties.simpleTrigger.testSimpleTrigger.dataMap.param3=123

以上可以看,我們可以通過properties配置文件設定作業和觸發器的任何屬性,並且通過如:simpleTrigger這個code,就能隨意的通過管理api進行curd操作.

基於rms的JobDetail

從上面的配置可以看到,解決方案中內置了兩個默認的jobDetail,一個是rmsJob另一個是rmsJobDisallowConcurrent.

想要使用它們很簡單,為它們配置一個觸發器即可,rmsjob通過以下屬性來確定自己將要調用那個任務:

#配置simple或者corn觸發器的dataMap屬性,並且添加如下值:

#指定要調用那個rms,這里設定的是rmscode,不太清楚的話可以回看第八篇文章
省略.serviceCode=SCH_CLIENT_UDF_SERVICE_A_DEMO 
#指定要調用哪一個bean
省略.beanName=testBean 
#是否采用異步方式
省略.async=true 
#業務參數
省略.param1=a 
省略.param2=b
省略.param3=123

如下方式可以在執行節點中定義一個執行器

@Component("testBean")
public class TestSch extends AbstractExecutor {
    @Override
    public void handle(String id, Map<String, Object> jobDataMap) {
        try {
            LOGGER.info("任務執行了------id:{}, jobDataMap:{}", id, jobDataMap);
        } catch (JsonProcessingException e) {
            throw new SchException(e);
        }
    }
}

這樣就能為某一個執行器設定觸發器,從而做到調度的功能.

而rmsJob是可以並發的觸發執行器的.

禁止並發的基於rms的JobDetail

在這個解決方案中禁止並發有兩個層次

第一個層次就是默認實現的rmsJobDisallowConcurrent,大家看源碼就知道,這個類上標注了@DisallowConcurrentExecution,這個注解的含義是禁止作業並發執行.

在傳統的做法中jobdetail中包含了業務邏輯,沒有異步的遠程操作,所以說在類上標注這個注解能做到禁止並發.

但是現在有了異步任務的概念,觸發器觸發執行器后立即就返回結束了,如果這個時候,觸發器的觸發間隔小於執行器的執行時間,那么依然還是會有任務並發執行的.

這顯然是不希望發生的,既然禁止並發,那么就一定要完全的做到禁止並發,如下設定保證了這一點:

protected void disallowConcurrentExecute(RmsJobParam rmsJobParam) throws JobExecutionException {
    if (!this.hasRunning(rmsJobParam)) { //沒有正在運行的任務才能運行
        this.execute(rmsJobParam);
    } else { //跳過執行,並且記錄
        RmsJobResult result = new RmsJobResult();
        result.setId(rmsJobParam.getId());
        result.setStats(RmsJobStats.SKIP.value());
        save(rmsJobParam, result);
    }
}

在禁止並發的異步任務觸發前,會校驗當前這個任務是否正在執行,如果正在執行的話,跳過並且記錄.

異步任務,異步回調

執行節點中的任務即可同步執行也可異步執行,通過配置觸發器的async屬性來控制的,

同步執行 : 的任務適合執行時間短,執行時間穩定,並且有必要立即知道返回結果的任務

異步執行 : 高並發,高性能的執行方式,沒有特別的限制,推薦使用

如下實現片段:

//SchClientController中
public RestResponse<RmsJobResult> execute(@RequestBody RmsJobParam param) {
    //記錄來接收時間
    Date receiveTime = new Date();
    //定義返回值
    RmsJobResult result = new RmsJobResult();
    result.setClientReceiveTime(receiveTime);
    result.setId(param.getId());
    result.setClientStartExecuteTime(new Date());
    //執行(區分同步跟異步)
    if (param.getAsync()) {
        schClientHandle.asyncHandle(param, result);
        result.setStats(RmsJobStats.EXECUTING.value());
    } else {
        schClientHandle.handle(param);
        result.setClientEndExecuteTime(new Date());
        result.setStats(RmsJobStats.COMPLETE.value());
    }
    //返回
    return new RestResponse<>(result);
}
//SchClientHandle中
//異步執行
@Async
public void asyncHandle(RmsJobParam param, RmsJobResult result) {
    try {
        //執行
        this.handle(param);
        result.setClientEndExecuteTime(new Date());
        result.setStats(RmsJobStats.COMPLETE.value());
        //回調
        this.callback(result);
    } catch (Exception e) {
        result.setClientEndExecuteTime(new Date());
        result.setStats(RmsJobStats.ERROR.value());
        result.setErrorMsg(ExceptionUtils.getStackTrace(e));
        //回調
        this.callback(result);
        //拋出異常
        log.error("asyncHandle error:", e);
        throw new SchException(e);
    }

}
//同步執行
public void handle(RmsJobParam param) {
    //判斷bean是否存在
    if (!applicationContext.containsBean(param.getBeanName())) {
        throw new SchException(param.getBeanName() + " not definition");
    }
    //獲得bean
    AbstractExecutor bean = applicationContext.getBean(param.getBeanName(), AbstractExecutor.class);
    //執行
    bean.handle(param);
}
//異步回調(重處理)
@Retryable(maxAttempts = 3, value = Exception.class)
private void callback(RmsJobResult result) {
    log.info("try to callback");
    final String serviceCode = "SCH_CLIENT_CALLBACK_1";
    rms.call(serviceCode, result, null, new ParameterizedTypeReference<RestResponse<String>>() {
    }, null);
}
//回調失敗后的處理
@Recover
public void recover(Exception e) {
    log.error("try to callback failed:", e);
}

任務鏈

在執行器父類中提供如下方法,可在執行節點觸發其他執行器:

//調用鏈 (允許並發,異步調用)
protected String chain(boolean isConcurrent, String parentId, String serviceCode, 
                String beanName, boolean async, Map<String, String> param)

而在執行器中的使用樣例:

@Component("testBean")
public class TestSch extends AbstractExecutor {
    @Override
    public void handle(String id, Map<String, Object> jobDataMap) {
        try {
            LOGGER.info("任務執行了------id:{}, jobDataMap:{}", id, xssObjectMapper.writeValueAsString(jobDataMap)); //NOSONAR
            if (!jobDataMap.containsKey(TriggerDataMapKey.PARENT_TRIGGER_ID.value())) {
                LOGGER.info("job鏈---->"); //NOSONAR
                Map<String, String> param = new HashMap<>();
                param.put("chain1", "1");
                param.put("chain2", "2");
                this.chain(id, "SCH_CLIENT_UDF_SERVICE_A_DEMO", "testBean", param);
            }
        } catch (JsonProcessingException e) {
            throw new SchException(e);
        }
    }
}

這樣可以使得執行器更加靈活,可以隨意組合

管理api

依賴於quartz的底層管理api,我們可以抽象出一系列restFul的api,目前實現的功能如下:

作業管理 : 保存作業 , 保存作業(覆蓋) , 移除作業 , 立即觸發作業

觸發器管理 : 保存簡單觸發器 , 保存簡單觸發器(覆蓋) , 保存CRON觸發器 , 保存CRON觸發器(覆蓋) , 刪除觸發器

計划任務管理 : 清理數據

misfire設定

quartz原生的設定,表示那些錯過了觸發時間的觸發器,后續處理的規則,可能是因為 : 服務不可用 , 線程阻塞,線程池耗盡 , 等..

simple觸發器

MISFIRE_INSTRUCTION_FIRE_NOW

以當前時間為觸發頻率立即觸發執行

MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_EXISTING_COUNT

不觸發立即執行
等待下次觸發頻率周期時刻執行
以總次數-已執行次數作為剩余周期次數,重新計算FinalTime
調整后的FinalTime會略大於根據starttime計算的到的FinalTime值

MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT

不觸發立即執行
等待下次觸發頻率周期時刻,執行至FinalTime的剩余周期次數
保持FinalTime不變,重新計算剩余周期次數(相當於錯過的當做已執行)

MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT

以當前時間為觸發頻率立即觸發執行
以總次數-已執行次數作為剩余周期次數,重新計算FinalTime
調整后的FinalTime會略大於根據starttime計算的到的FinalTime值

MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT

以當前時間為觸發頻率立即觸發執行
保持FinalTime不變,重新計算剩余周期次數(相當於錯過的當做已執行)

MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY

以錯過的第一個頻率時間立刻開始執行

MISFIRE_INSTRUCTION_SMART_POLICY(默認)

智能根據trigger屬性選擇策略:
repeatCount為0,則策略同MISFIRE_INSTRUCTION_FIRE_NOW
repeatCount為REPEAT_INDEFINITELY,則策略同MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT
否則策略同MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT

cron觸發器

MISFIRE_INSTRUCTION_DO_NOTHING

是什么都不做,繼續等下一次預定時間再觸發

MISFIRE_INSTRUCTION_FIRE_ONCE_NOW

是立即觸發一次,觸發后恢復正常的頻率

MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY

以錯過的第一個頻率時間立刻開始執行

MISFIRE_INSTRUCTION_SMART_POLICY(默認)

根據創建CronTrigger時選擇的MISFIRE_INSTRUCTION_XXX更新CronTrigger的狀態。
如果misfire指令設置為MISFIRE_INSTRUCTION_SMART_POLICY,則將使用以下方案:
指令將解釋為MISFIRE_INSTRUCTION_FIRE_ONCE_NOW

大家可根據自身情況進行設定

結束

今天跟大家分享了分布式調度的設計思路和想法,由於個人時間問題,這個設計的核心部分雖然已經完成,但是比如web界面,restful api,都還沒有完成,后續有空就會把這些東西都弄上去的.

不過總體來說,把核心的思想講出來了,也歡迎大家提出意見和建議

代碼倉庫 (博客配套代碼)


想獲得最快更新,請關注公眾號

想獲得最快更新,請關注公眾號


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM