lts分布式任務文檔


LTS分布式任務調度文檔

LTS介紹

LTS(light-task-scheduler)主要用於解決分布式任務調度問題,支持實時任務,定時任務,Cron任務,Repeat任務。有較好的伸縮性,擴展性,健壯穩定性而被多家公司使用,同時也希望開源愛好者一起貢獻。

主要功能

    1. 支持分布式,解決多點故障,支持動態擴容,容錯重試等
    1. Spring擴展支持,SpringBoot支持,Spring Quartz Cron任務的無縫接入支持
    1. 節點監控支持,任務執行監控支持,JVM監控支持
    1. 后台運維操作支持, 可以動態提交,更改,停止 任務

項目地址

LTS技術架構

LTS 着力於解決分布式任務調度問題,將任務的提交者和執行者解耦,解決任務執行的單點故障,支持動態擴容,出錯重試等機制。代碼程序設計上,參考了優秀開源項目Dubbo,Hadoop的部分思想。

LTS目前支持四種任務

  • 實時任務:提交了之后立即就要執行的任務。
  • 定時任務:在指定時間點執行的任務,譬如 今天3點執行(單次)。
  • Cron任務:CronExpression,和quartz類似(但是不是使用quartz實現的)譬如 0 0/1 * ?
  • Repeat任務:譬如每隔5分鍾執行一次,重復50次就停止。

架構設計上,LTS框架中包含以下五種類型的節點

  • JobClient :主要負責提交任務, 並接收任務執行反饋結果。
  • JobTracker :負責任務調度,接收並分配任務。
  • TaskTracker :負責執行任務,執行完反饋給JobTracker。
  • LTS-Monitor :主要負責收集各個節點的監控信息,包括任務監控信息,節點JVM監控信息
  • LTS-Admin :管理后台)主要負責節點管理,任務隊列管理,監控管理等。

架構圖

  • Registry: 注冊中心,LTS提供多種實現,目前支持zookeeper(推薦)和redis, 主要用於LTS的節點信息暴露和master節點選舉。
  • FailStore: 失敗存儲,主要用於在部分場景遠程RPC調用失敗的情況,采取現存儲本地KV文件系統,待遠程通信恢復的時候再進行數據補償。目前FailStore場景,主要有RetryJobClient提交**任務失敗的時候,存儲FailStore;TaskTracker返回任務執行結果給JobTracker的失敗 時候,FailStore;TaskTracker提交BizLogger的失敗的時候,存儲FailStore. 目前FailStore有四種實現:leveldb,rocksdb,berkeleydb,mapdb(當然用戶也可以實現擴展接口實現自己的FailStore)
  • QueueManager: 任務隊列,目前提供mysql(推薦)和mongodb兩種實現(同樣的用戶可以自己擴容展示其他的,譬如oracle等),主要存儲任務數據和任務執行日志等。
  • RPC: 遠程RPC通信框架,目前也支持多種實現,LTS自帶有netty和mina,用戶可以自行選擇,或者自己SPI擴展實現其他的。
  • NodeGroup: 節點組,同一個節點組中的任何節點都是對等的,等效的,對外提供相同的服務。譬如TaskTracker中有10個nodeGroup都是send_msg的節點組,專門執行發送短信的任務。每個節點組中都有一個master節點,這個master節點是由LTS動態選出來的,當一個master節點掛掉之后,LTS會立馬選出另外一個master節點,框架提供API監聽接口給用戶。
  • ClusterName: LTS集群,就如上圖所示,整個圖就是一個集群,包含LTS的五種節點。

節點圖

    1. 一個節點組等同於一個集群,同一個節點組中的各個節點是對等的,外界無論連接節點組中的任務一個節點都是可以的。
    1. 每個節點組中都有一個master節點,采用zookeeper進行master選舉(master宕機,會自動選舉出新的master節點),框架會提供接口API來監聽master節點的變化,用戶可以自己使用master節點做自己想做的事情。
    1. JobClient和TaskTracker都可以存在多個節點組。譬如 JobClient 可以存在多個節點組。 譬如:JobClient 節點組為 ‘lts_WEB’ 中的一個節點提交提交一個 只有節點組為’lts_TRADE’的 TaskTracker 才能執行的任務。
    1. (每個集群中)JobTacker只有一個節點組。
    1. 多個JobClient節點組和多個TaskTracker節點組再加上一個JobTacker節點組, 組成一個大的集群。

工作流程

    1. JobClient 提交一個 任務 給 JobTracker, 這里我提供了兩種客戶端API, 一種是如果JobTracker 不存在或者提交失敗,直接返回提交失敗。另一種客戶端是重試客戶端, 如果提交失敗,先存儲到本地leveldb(可以使用NFS來達到同個節點組共享leveldb文件的目的,多線程訪問,做了文件鎖處理),返回給客戶端提交成功的信息,待JobTracker可用的時候,再將任務提交。
    1. JobTracker 收到JobClient提交來的任務,先生成一個唯一的JobID。然后將任務儲存在Mongo集群中。JobTracker 發現有(任務執行的)可用的TaskTracker節點(組) 之后,將優先級最大,最先提交的任務分發給TaskTracker。這里JobTracker會優先分配給比較空閑的TaskTracker節點,達到負載均衡。
    1. TaskTracker 收到JobTracker分發來的任務之后,執行。執行完畢之后,再反饋任務執行結果給JobTracker(成功or 失敗[失敗有失敗錯誤信息]),如果發現JobTacker不可用,那么存儲本地leveldb,等待TaskTracker可用的時候再反饋。反饋結果的同時,詢問JobTacker有沒有新的任務要執行。
    1. JobTacker收到TaskTracker節點的任務結果信息,生成並插入(mongo)任務執行日志。根據任務信息決定要不要反饋給客戶端。不需要反饋的直接刪除, 需要反饋的(同樣JobClient不可用存儲文件,等待可用重發)。
    1. JobClient 收到任務執行結果,進行自己想要的邏輯處理。

特性

  • Spring/Spring Boot支持

LTS可以完全不用Spring框架,但是考慮到很用用戶項目中都是用了Spring框架,所以LTS也提供了對Spring的支持,包括Xml和注解,引入lts-spring.jar即可。

  • 業務日志記錄器

在TaskTracker端提供了業務日志記錄器,供應用程序使用,通過這個業務日志器,可以將業務日志提交到JobTracker,這些業務日志可以通過任務ID串聯起來,可以在LTS-Admin中實時查看任務的執行進度。

  • SPI擴展支持

SPI擴展可以達到零侵入,只需要實現相應的接口,並實現即可被LTS使用,目前開放出來的擴展接口有 對任務隊列的擴展,用戶可以不選擇使用mysql或者mongo作為隊列存儲,也可以自己實現。 對業務日志記錄器的擴展,目前主要支持console,mysql,mongo,用戶也可以通過擴展選擇往其他地方輸送日志。

  • 節點監控

可以對JobTracker,TaskTracker節點進行資源監控,任務監控等,可以實時的在LTS-Admin管理后台查看,進而進行合理的資源調配。

  • 多樣化任務執行結果支持

LTS框架提供四種執行結果支持,EXECUTE_SUCCESS,EXECUTE_FAILED,EXECUTE_LATER,EXECUTE_EXCEPTION,並對每種結果采取相應的處理機制,譬如重試。 EXECUTE_SUCCESS: 執行成功,這種情況,直接反饋客戶端(如果任務被設置了要反饋給客戶端)。 EXECUTE_FAILED:執行失敗,這種情況,直接反饋給客戶端,不進行重試。 EXECUTE_LATER:稍后執行(需要重試),這種情況,不反饋客戶端,重試策略采用30s的策略,默認最大重試次數為10次,用戶可以通過參數設置修改這些參數。 EXECUTE_EXCEPTION:執行異常, 這中情況也會重試(重試策略,同上)

  • FailStore容錯

采用FailStore機制來進行節點容錯,Fail And Store,不會因為遠程通信的不穩定性而影響當前應用的運行。

  • 負載均衡

    JobClient 和 TaskTracker會隨機連接JobTracker節點組中的一個節點,實現JobTracker負載均衡。當連接上后,將一直保持連接這個節點,保持連接通道,知道這個節點不可用,減少每次都重新連接一個節點帶來的性能開銷。 JobTracker 分發任務時,是優先分配給最空閑的一個TaskTracker節點,實現TaskTracker節點的負載均衡。

  • 健壯性(故障轉移)

    當節點組中的一個節點當機之后,自動轉到其他節點工作。當整個節點組當機之后,將會采用存儲文件的方式,待節點組可用的時候進行重發。 當執行任務的TaskTracker節點當機之后,JobTracker 會將這個TaskTracker上的未完成的任務(死任務),重新分配給節點組中其他節點執行。

  • 動態擴容

因為LTS各個節點都是無狀態的,所以支持動態增加刪除節點,達到負載均衡的目的

環境准備

  • 1. Java JDK

    因為LTS是使用Java語言編寫的,所以必須要有個Java編譯運行環境,目前LTS支持JDK1.6及以上版本。

  • 2. Maven

LTS項目是基於Maven做項目依賴管理的,所以用戶機器上需要配置Maven環境

  • 3. Zookeeper/Redis

因LTS目前支持Zookeeper和Redis作為注冊中心,主要用於節點信息暴露、監聽、master節點選舉。用於選擇其一即可,建議zookeeper。

  • 4. Mysql/Mongodb

LTS目前支持Mysql和mongodb作為任務隊列的存儲引擎。用戶同樣的選擇其中一個即可。

部署

部署建議

  1. Admin后台: 建議Admin后台單獨部署,默認會嵌入一個Monitor
  2. Monitor:默認在Admin后台進程中有一個,如果一個不夠,也可以單獨啟動多個
  3. JobTracker: 建議單獨部署
  4. JobClient:,這個是提交任務的工程,一般是和業務相關的,所以會放在業 務工程中, 當然也要看業務場景
  5. TaskTracker,這個因為是跑任務的,具體看業務場景, 一般情況下也可以是獨立部署

1. JobTracker和LTS-Admin部署

提供(cmd)windows和(shell)linux兩種版本腳本來進行編譯和部署:

  1. 運行根目錄下的sh build.sh或build.cmd腳本,會在dist目錄下生成lts-{version}-bin文件夾
  1. 下面是其目錄結構,其中bin目錄主要是JobTracker和LTS-Admin的啟動腳本。jobtracker 中是 JobTracker的配置文件和需要使用到的jar包,lts-admin是LTS-Admin相關的war包和配置文件。 lts-{version}-bin的文件結構
-- lts-${version}-bin
    |-- bin
    |   |-- jobtracker.cmd
    |   |-- jobtracker.sh
    |   |-- lts-admin.cmd
    |   |-- lts-admin.sh
    |   |-- lts-monitor.cmd
    |   |-- lts-monitor.sh
    |   |-- tasktracker.sh
    |-- conf
    |   |-- log4j.properties
    |   |-- lts-admin.cfg
    |   |-- lts-monitor.cfg
    |   |-- readme.txt
    |   |-- tasktracker.cfg
    |   |-- zoo
    |       |-- jobtracker.cfg
    |       |-- log4j.properties
    |       |-- lts-monitor.cfg
    |-- lib
    |   |-- *.jar
    |-- war
        |-- jetty
        |   |-- lib
        |       |-- *.jar
        |-- lts-admin.war

 

JobTracker啟動

如果你想啟動一個節點,直接修改下conf/zoo下的配置文件,然后運行 sh jobtracker.sh zoo start即可,如果你想啟動兩個JobTracker節點,那么你需要拷貝一份zoo,譬如命名為zoo2,修改下zoo2下的配置文件,然后運行sh jobtracker.sh zoo2 start即可。logs文件夾下生成jobtracker-zoo.out日志。

LTS-Admin啟動

修改conf/lts-monitor.cfg和conf/lts-admin.cfg下的配置,然后運行bin下的sh lts-admin.sh或lts-admin.cmd腳本即可。logs文件夾下會生成lts-admin.out日志,啟動成功在日志中會打印出訪問地址,用戶可以通過這個訪問地址訪問了。

2. JobClient(部署)使用

需要引入lts的jar包有lts-jobclient-{version}.jar,lts-core-{version}.jar 及其它第三方依賴jar。

API方式啟動

JobClient jobClient = new RetryJobClient();
jobClient.setNodeGroup("test_jobClient");
jobClient.setClusterName("test_cluster");
jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
jobClient.start();

// 提交任務
Job job = new Job();
job.setTaskId("3213213123");
job.setParam("shopId", "11111");
job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
// job.setCronExpression("0 0/1 * * * ?");  // 支持 cronExpression表達式
// job.setTriggerTime(new Date()); // 支持指定時間執行
Response response = jobClient.submitJob(job);

  Spring XML方式啟動

<bean id="jobClient" class="com.github.ltsopensource.spring.JobClientFactoryBean">
    <property name="clusterName" value="test_cluster"/>
    <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
    <property name="nodeGroup" value="test_jobClient"/>
    <property name="masterChangeListeners">
        <list>
            <bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/>
        </list>
    </property>
    <property name="jobFinishedHandler">
        <bean class="com.github.ltsopensource.example.support.JobFinishedHandlerImpl"/>
    </property>
    <property name="configs">
        <props>
            <!-- 參數 -->
            <prop key="job.fail.store">leveldb</prop>
        </props>
    </property>
</bean>

  Spring 全注解方式

@Configuration
public class LTSSpringConfig {

    @Bean(name = "jobClient")
    public JobClient getJobClient() throws Exception {
        JobClientFactoryBean factoryBean = new JobClientFactoryBean();
        factoryBean.setClusterName("test_cluster");
        factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181");
        factoryBean.setNodeGroup("test_jobClient");
        factoryBean.setMasterChangeListeners(new MasterChangeListener[]{
                new MasterChangeListenerImpl()
        });
        Properties configs = new Properties();
        configs.setProperty("job.fail.store", "leveldb");
        factoryBean.setConfigs(configs);
        factoryBean.afterPropertiesSet();
        return factoryBean.getObject();
    }
}

  

3. TaskTracker(部署使用)

需要引入lts的jar包有lts-tasktracker-{version}.jar,lts-core-{version}.jar 及其它第三方依賴jar。

定義自己的任務執行類

public class MyJobRunner implements JobRunner {
    private final static BizLogger bizLogger = LtsLoggerFactory.getBizLogger();
    @Override
    public Result run(Job job) throws Throwable {
        try {
            // TODO 業務邏輯
            // 會發送到 LTS (JobTracker上)
            bizLogger.info("測試,業務日志啊啊啊啊啊");

        } catch (Exception e) {
            return new Result(Action.EXECUTE_FAILED, e.getMessage());
        }
        return new Result(Action.EXECUTE_SUCCESS, "執行成功了,哈哈");
    }
}

API方式啟動

TaskTracker taskTracker = new TaskTracker();
taskTracker.setJobRunnerClass(MyJobRunner.class);
taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
taskTracker.setNodeGroup("test_trade_TaskTracker");
taskTracker.setClusterName("test_cluster");
taskTracker.setWorkThreads(20);
taskTracker.start();

Spring XML方式啟動

<bean id="taskTracker" class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init-method="start">
    <property name="jobRunnerClass" value="com.github.ltsopensource.example.support.MyJobRunner"/>
    <property name="bizLoggerLevel" value="INFO"/>
    <property name="clusterName" value="test_cluster"/>
    <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
    <property name="nodeGroup" value="test_trade_TaskTracker"/>
    <property name="workThreads" value="20"/>
    <property name="masterChangeListeners">
        <list>
            <bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/>
        </list>
    </property>
    <property name="configs">
        <props>
            <prop key="job.fail.store">leveldb</prop>
        </props>
    </property>
</bean>

Spring注解方式啟動

@Configuration
public class LTSSpringConfig implements ApplicationContextAware {
    private ApplicationContext applicationContext;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
    @Bean(name = "taskTracker")
    public TaskTracker getTaskTracker() throws Exception {
        TaskTrackerAnnotationFactoryBean factoryBean = new TaskTrackerAnnotationFactoryBean();
        factoryBean.setApplicationContext(applicationContext);
        factoryBean.setClusterName("test_cluster");
        factoryBean.setJobRunnerClass(MyJobRunner.class);
        factoryBean.setNodeGroup("test_trade_TaskTracker");
        factoryBean.setBizLoggerLevel("INFO");
        factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181");
        factoryBean.setMasterChangeListeners(new MasterChangeListener[]{
                new MasterChangeListenerImpl()
        });
        factoryBean.setWorkThreads(20);
        Properties configs = new Properties();
        configs.setProperty("job.fail.store", "leveldb");
        factoryBean.setConfigs(configs);

        factoryBean.afterPropertiesSet();
//        factoryBean.start();
        return factoryBean.getObject();
    }
}

包引入說明

1. JobTracker,JobClient,TaskTracker都需要引入的包

1.1 lts-core
<dependency>
    <groupId>com.github.ltsopensource</groupId>
    <artifactId>lts-core</artifactId>
    <version>${lts版本號}</version>
</dependency>
1.2 zk客戶端包

二選一, 通過 addConfig(“zk.client”, “可選值: curator, zkclient, lts”) 設置, 如果用lts,可以不用引入包

zkclient

<dependency>
    <groupId>com.github.sgroschupf</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.1</version>
</dependency>

curator

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.9.1</version>
</dependency>

zookeeper包

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>${zk.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.jboss.netty</groupId>
            <artifactId>netty</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
1.3 通訊包

netty或者mina, 二選一, 通過 addConfig(“lts.remoting”, “可選值: netty, mina”) 設置

netty

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.0.20.Final</version>
</dependency>

mina

<dependency>
    <groupId>org.apache.mina</groupId>
    <artifactId>mina-core</artifactId>
    <version>2.0.9</version>
</dependency>
1.4 json包

fastjson或者jackson, 二選一, 通過 addConfig(“lts.json”, “可選值: fastjson, jackson”) 設置

fastjson

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.7</version>
</dependency>

jackson

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.6.3</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.6.3</version>
</dependency>
1.5 日志包

可以選用 slf4j, jcl, log4j, 或者使用jdk原生logger LoggerFactory.setLoggerAdapter(“可選值: slf4j, jcl, log4j, jdk”), 不手動設置, 默認按這個順序加載

log4j

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.16</version>
</dependency>

slf4j

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
</dependency>

jcl

<dependency>
    <groupId>commons-logging</groupId>
    <artifactId>commons-logging-api</artifactId>
    <version>1.1</version>
</dependency>
1.6

如果需要spring的話,需要引入lts-spring及spring的相關包

<dependency>
    <groupId>com.github.ltsopensource</groupId>
    <artifactId>lts-spring</artifactId>
    <version>${lts版本號}</version>
</dependency>

2. 對於JobTracker端

2.1 必須引入的包:
<dependency>
    <groupId>com.github.ltsopensource</groupId>
    <artifactId>lts-jobtracker</artifactId>
    <version>${lts版本號}</version>
</dependency>
2.2 除了基礎包之外還需要引入任務隊列的包(可以是mongo或者mysql)

mysql

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.26</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.0.14</version>
</dependency>

mongo

<dependency>
    <groupId>org.mongodb.morphia</groupId>
    <artifactId>morphia</artifactId>
    <version>1.0.0-rc1</version>
</dependency>
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.0.2</version>
</dependency>

3. JobClient需要引入的包

必須引入的包
<dependency>
    <groupId>com.github.ltsopensource</groupId>
    <artifactId>lts-jobclient</artifactId>
    <version>${project.version}</version>
</dependency>

FailStore存儲包(四選一)

通過 jobClient.addConfig(“job.fail.store”, “可選值: leveldb, mapdb, berkeleydb, rocksdb”) 設置

mapdb

<dependency>
    <groupId>org.mapdb</groupId>
    <artifactId>mapdb</artifactId>
    <version>2.0-beta10</version>
</dependency>

leveldb

<dependency>
    <groupId>org.fusesource.leveldbjni</groupId>
    <artifactId>leveldbjni-all</artifactId>
    <version>1.2.7<version>
</dependency>

berkeleydb

<dependency>
    <groupId>com.sleepycat</groupId>
    <artifactId>je</artifactId>
    <version>5.0.73</version>
</dependency>

rocksdb

<dependency>
    <groupId>org.rocksdb</groupId>
    <artifactId>rocksdbjni</artifactId>
    <version>3.10.1</version>
</dependency>

3. TaskTracker需要引入的包

必須引入的包
<dependency>
    <groupId>com.github.ltsopensource</groupId>
    <artifactId>lts-tasktracker</artifactId>
    <version>${project.version}</version>
</dependency>
FailStore存儲包(四選一)

通過 taskTracker.addConfig(“job.fail.store”, “可選值: leveldb, mapdb, berkeleydb, rocksdb”) 設置

mapdb

<dependency>
    <groupId>org.mapdb</groupId>
    <artifactId>mapdb</artifactId>
    <version>2.0-beta10</version>
</dependency>

leveldb

<dependency>
    <groupId>org.fusesource.leveldbjni</groupId>
    <artifactId>leveldbjni-all</artifactId>
    <version>1.2.7<version>
</dependency>

berkeleydb

<dependency>
    <groupId>com.sleepycat</groupId>
    <artifactId>je</artifactId>
    <version>5.0.73</version>
</dependency>

rocksdb

<dependency>
    <groupId>org.rocksdb</groupId>
    <artifactId>rocksdbjni</artifactId>
    <version>3.10.1</version>
</dependency>

 

 

 

原文地址


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM