LTS分布式任務調度文檔
LTS介紹
LTS(light-task-scheduler)主要用於解決分布式任務調度問題,支持實時任務,定時任務,Cron任務,Repeat任務。有較好的伸縮性,擴展性,健壯穩定性而被多家公司使用,同時也希望開源愛好者一起貢獻。
主要功能
-
- 支持分布式,解決多點故障,支持動態擴容,容錯重試等
-
- Spring擴展支持,SpringBoot支持,Spring Quartz Cron任務的無縫接入支持
-
- 節點監控支持,任務執行監控支持,JVM監控支持
-
- 后台運維操作支持, 可以動態提交,更改,停止 任務
項目地址
- github地址:https://github.com/ltsopensource/light-task-scheduler
- oschina地址:http://git.oschina.net/hugui/light-task-scheduler
- 例子: https://github.com/ltsopensource/lts-examples
- 文檔地址(正在更新中): https://www.gitbook.com/book/qq254963746/lts/details
LTS技術架構
LTS 着力於解決分布式任務調度問題,將任務的提交者和執行者解耦,解決任務執行的單點故障,支持動態擴容,出錯重試等機制。代碼程序設計上,參考了優秀開源項目Dubbo,Hadoop的部分思想。
LTS目前支持四種任務
- 實時任務:提交了之后立即就要執行的任務。
- 定時任務:在指定時間點執行的任務,譬如 今天3點執行(單次)。
- Cron任務:CronExpression,和quartz類似(但是不是使用quartz實現的)譬如 0 0/1 * ?
- Repeat任務:譬如每隔5分鍾執行一次,重復50次就停止。
架構設計上,LTS框架中包含以下五種類型的節點
- JobClient :主要負責提交任務, 並接收任務執行反饋結果。
- JobTracker :負責任務調度,接收並分配任務。
- TaskTracker :負責執行任務,執行完反饋給JobTracker。
- LTS-Monitor :主要負責收集各個節點的監控信息,包括任務監控信息,節點JVM監控信息
- LTS-Admin :管理后台)主要負責節點管理,任務隊列管理,監控管理等。
架構圖
- Registry: 注冊中心,LTS提供多種實現,目前支持zookeeper(推薦)和redis, 主要用於LTS的節點信息暴露和master節點選舉。
- FailStore: 失敗存儲,主要用於在部分場景遠程RPC調用失敗的情況,采取現存儲本地KV文件系統,待遠程通信恢復的時候再進行數據補償。目前FailStore場景,主要有RetryJobClient提交**任務失敗的時候,存儲FailStore;TaskTracker返回任務執行結果給JobTracker的失敗 時候,FailStore;TaskTracker提交BizLogger的失敗的時候,存儲FailStore. 目前FailStore有四種實現:leveldb,rocksdb,berkeleydb,mapdb(當然用戶也可以實現擴展接口實現自己的FailStore)
- QueueManager: 任務隊列,目前提供mysql(推薦)和mongodb兩種實現(同樣的用戶可以自己擴容展示其他的,譬如oracle等),主要存儲任務數據和任務執行日志等。
- RPC: 遠程RPC通信框架,目前也支持多種實現,LTS自帶有netty和mina,用戶可以自行選擇,或者自己SPI擴展實現其他的。
- NodeGroup: 節點組,同一個節點組中的任何節點都是對等的,等效的,對外提供相同的服務。譬如TaskTracker中有10個nodeGroup都是send_msg的節點組,專門執行發送短信的任務。每個節點組中都有一個master節點,這個master節點是由LTS動態選出來的,當一個master節點掛掉之后,LTS會立馬選出另外一個master節點,框架提供API監聽接口給用戶。
- ClusterName: LTS集群,就如上圖所示,整個圖就是一個集群,包含LTS的五種節點。
節點圖
-
- 一個節點組等同於一個集群,同一個節點組中的各個節點是對等的,外界無論連接節點組中的任務一個節點都是可以的。
-
- 每個節點組中都有一個master節點,采用zookeeper進行master選舉(master宕機,會自動選舉出新的master節點),框架會提供接口API來監聽master節點的變化,用戶可以自己使用master節點做自己想做的事情。
-
- JobClient和TaskTracker都可以存在多個節點組。譬如 JobClient 可以存在多個節點組。 譬如:JobClient 節點組為 ‘lts_WEB’ 中的一個節點提交提交一個 只有節點組為’lts_TRADE’的 TaskTracker 才能執行的任務。
-
- (每個集群中)JobTacker只有一個節點組。
-
- 多個JobClient節點組和多個TaskTracker節點組再加上一個JobTacker節點組, 組成一個大的集群。
工作流程
-
- JobClient 提交一個 任務 給 JobTracker, 這里我提供了兩種客戶端API, 一種是如果JobTracker 不存在或者提交失敗,直接返回提交失敗。另一種客戶端是重試客戶端, 如果提交失敗,先存儲到本地leveldb(可以使用NFS來達到同個節點組共享leveldb文件的目的,多線程訪問,做了文件鎖處理),返回給客戶端提交成功的信息,待JobTracker可用的時候,再將任務提交。
-
- JobTracker 收到JobClient提交來的任務,先生成一個唯一的JobID。然后將任務儲存在Mongo集群中。JobTracker 發現有(任務執行的)可用的TaskTracker節點(組) 之后,將優先級最大,最先提交的任務分發給TaskTracker。這里JobTracker會優先分配給比較空閑的TaskTracker節點,達到負載均衡。
-
- TaskTracker 收到JobTracker分發來的任務之后,執行。執行完畢之后,再反饋任務執行結果給JobTracker(成功or 失敗[失敗有失敗錯誤信息]),如果發現JobTacker不可用,那么存儲本地leveldb,等待TaskTracker可用的時候再反饋。反饋結果的同時,詢問JobTacker有沒有新的任務要執行。
-
- JobTacker收到TaskTracker節點的任務結果信息,生成並插入(mongo)任務執行日志。根據任務信息決定要不要反饋給客戶端。不需要反饋的直接刪除, 需要反饋的(同樣JobClient不可用存儲文件,等待可用重發)。
-
- JobClient 收到任務執行結果,進行自己想要的邏輯處理。
特性
-
Spring/Spring Boot支持
LTS可以完全不用Spring框架,但是考慮到很用用戶項目中都是用了Spring框架,所以LTS也提供了對Spring的支持,包括Xml和注解,引入lts-spring.jar即可。
-
業務日志記錄器
在TaskTracker端提供了業務日志記錄器,供應用程序使用,通過這個業務日志器,可以將業務日志提交到JobTracker,這些業務日志可以通過任務ID串聯起來,可以在LTS-Admin中實時查看任務的執行進度。
-
SPI擴展支持
SPI擴展可以達到零侵入,只需要實現相應的接口,並實現即可被LTS使用,目前開放出來的擴展接口有 對任務隊列的擴展,用戶可以不選擇使用mysql或者mongo作為隊列存儲,也可以自己實現。 對業務日志記錄器的擴展,目前主要支持console,mysql,mongo,用戶也可以通過擴展選擇往其他地方輸送日志。
-
節點監控
可以對JobTracker,TaskTracker節點進行資源監控,任務監控等,可以實時的在LTS-Admin管理后台查看,進而進行合理的資源調配。
-
多樣化任務執行結果支持
LTS框架提供四種執行結果支持,EXECUTE_SUCCESS,EXECUTE_FAILED,EXECUTE_LATER,EXECUTE_EXCEPTION,並對每種結果采取相應的處理機制,譬如重試。 EXECUTE_SUCCESS: 執行成功,這種情況,直接反饋客戶端(如果任務被設置了要反饋給客戶端)。 EXECUTE_FAILED:執行失敗,這種情況,直接反饋給客戶端,不進行重試。 EXECUTE_LATER:稍后執行(需要重試),這種情況,不反饋客戶端,重試策略采用30s的策略,默認最大重試次數為10次,用戶可以通過參數設置修改這些參數。 EXECUTE_EXCEPTION:執行異常, 這中情況也會重試(重試策略,同上)
-
FailStore容錯
采用FailStore機制來進行節點容錯,Fail And Store,不會因為遠程通信的不穩定性而影響當前應用的運行。
-
負載均衡
JobClient 和 TaskTracker會隨機連接JobTracker節點組中的一個節點,實現JobTracker負載均衡。當連接上后,將一直保持連接這個節點,保持連接通道,知道這個節點不可用,減少每次都重新連接一個節點帶來的性能開銷。 JobTracker 分發任務時,是優先分配給最空閑的一個TaskTracker節點,實現TaskTracker節點的負載均衡。
-
健壯性(故障轉移)
當節點組中的一個節點當機之后,自動轉到其他節點工作。當整個節點組當機之后,將會采用存儲文件的方式,待節點組可用的時候進行重發。 當執行任務的TaskTracker節點當機之后,JobTracker 會將這個TaskTracker上的未完成的任務(死任務),重新分配給節點組中其他節點執行。
-
動態擴容
因為LTS各個節點都是無狀態的,所以支持動態增加刪除節點,達到負載均衡的目的
環境准備
-
1. Java JDK
因為LTS是使用Java語言編寫的,所以必須要有個Java編譯運行環境,目前LTS支持JDK1.6及以上版本。
-
2. Maven
LTS項目是基於Maven做項目依賴管理的,所以用戶機器上需要配置Maven環境
- 3. Zookeeper/Redis
因LTS目前支持Zookeeper和Redis作為注冊中心,主要用於節點信息暴露、監聽、master節點選舉。用於選擇其一即可,建議zookeeper。
- 4. Mysql/Mongodb
LTS目前支持Mysql和mongodb作為任務隊列的存儲引擎。用戶同樣的選擇其中一個即可。
部署
部署建議
- Admin后台: 建議Admin后台單獨部署,默認會嵌入一個Monitor
- Monitor:默認在Admin后台進程中有一個,如果一個不夠,也可以單獨啟動多個
- JobTracker: 建議單獨部署
- JobClient:,這個是提交任務的工程,一般是和業務相關的,所以會放在業 務工程中, 當然也要看業務場景
- TaskTracker,這個因為是跑任務的,具體看業務場景, 一般情況下也可以是獨立部署
1. JobTracker和LTS-Admin部署
提供(cmd)windows和(shell)linux兩種版本腳本來進行編譯和部署:
- 運行根目錄下的sh build.sh或build.cmd腳本,會在dist目錄下生成lts-{version}-bin文件夾
- 下面是其目錄結構,其中bin目錄主要是JobTracker和LTS-Admin的啟動腳本。jobtracker 中是 JobTracker的配置文件和需要使用到的jar包,lts-admin是LTS-Admin相關的war包和配置文件。 lts-{version}-bin的文件結構
-- lts-${version}-bin |-- bin | |-- jobtracker.cmd | |-- jobtracker.sh | |-- lts-admin.cmd | |-- lts-admin.sh | |-- lts-monitor.cmd | |-- lts-monitor.sh | |-- tasktracker.sh |-- conf | |-- log4j.properties | |-- lts-admin.cfg | |-- lts-monitor.cfg | |-- readme.txt | |-- tasktracker.cfg | |-- zoo | |-- jobtracker.cfg | |-- log4j.properties | |-- lts-monitor.cfg |-- lib | |-- *.jar |-- war |-- jetty | |-- lib | |-- *.jar |-- lts-admin.war
JobTracker啟動
如果你想啟動一個節點,直接修改下conf/zoo下的配置文件,然后運行 sh jobtracker.sh zoo start即可,如果你想啟動兩個JobTracker節點,那么你需要拷貝一份zoo,譬如命名為zoo2,修改下zoo2下的配置文件,然后運行sh jobtracker.sh zoo2 start即可。logs文件夾下生成jobtracker-zoo.out日志。
LTS-Admin啟動
修改conf/lts-monitor.cfg和conf/lts-admin.cfg下的配置,然后運行bin下的sh lts-admin.sh或lts-admin.cmd腳本即可。logs文件夾下會生成lts-admin.out日志,啟動成功在日志中會打印出訪問地址,用戶可以通過這個訪問地址訪問了。
2. JobClient(部署)使用
需要引入lts的jar包有lts-jobclient-{version}.jar,lts-core-{version}.jar 及其它第三方依賴jar。
API方式啟動
JobClient jobClient = new RetryJobClient(); jobClient.setNodeGroup("test_jobClient"); jobClient.setClusterName("test_cluster"); jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181"); jobClient.start(); // 提交任務 Job job = new Job(); job.setTaskId("3213213123"); job.setParam("shopId", "11111"); job.setTaskTrackerNodeGroup("test_trade_TaskTracker"); // job.setCronExpression("0 0/1 * * * ?"); // 支持 cronExpression表達式 // job.setTriggerTime(new Date()); // 支持指定時間執行 Response response = jobClient.submitJob(job);
Spring XML方式啟動
<bean id="jobClient" class="com.github.ltsopensource.spring.JobClientFactoryBean"> <property name="clusterName" value="test_cluster"/> <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/> <property name="nodeGroup" value="test_jobClient"/> <property name="masterChangeListeners"> <list> <bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/> </list> </property> <property name="jobFinishedHandler"> <bean class="com.github.ltsopensource.example.support.JobFinishedHandlerImpl"/> </property> <property name="configs"> <props> <!-- 參數 --> <prop key="job.fail.store">leveldb</prop> </props> </property> </bean>
Spring 全注解方式
@Configuration public class LTSSpringConfig { @Bean(name = "jobClient") public JobClient getJobClient() throws Exception { JobClientFactoryBean factoryBean = new JobClientFactoryBean(); factoryBean.setClusterName("test_cluster"); factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181"); factoryBean.setNodeGroup("test_jobClient"); factoryBean.setMasterChangeListeners(new MasterChangeListener[]{ new MasterChangeListenerImpl() }); Properties configs = new Properties(); configs.setProperty("job.fail.store", "leveldb"); factoryBean.setConfigs(configs); factoryBean.afterPropertiesSet(); return factoryBean.getObject(); } }
3. TaskTracker(部署使用)
需要引入lts的jar包有lts-tasktracker-{version}.jar,lts-core-{version}.jar 及其它第三方依賴jar。
定義自己的任務執行類
public class MyJobRunner implements JobRunner { private final static BizLogger bizLogger = LtsLoggerFactory.getBizLogger(); @Override public Result run(Job job) throws Throwable { try { // TODO 業務邏輯 // 會發送到 LTS (JobTracker上) bizLogger.info("測試,業務日志啊啊啊啊啊"); } catch (Exception e) { return new Result(Action.EXECUTE_FAILED, e.getMessage()); } return new Result(Action.EXECUTE_SUCCESS, "執行成功了,哈哈"); } }
API方式啟動
TaskTracker taskTracker = new TaskTracker(); taskTracker.setJobRunnerClass(MyJobRunner.class); taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181"); taskTracker.setNodeGroup("test_trade_TaskTracker"); taskTracker.setClusterName("test_cluster"); taskTracker.setWorkThreads(20); taskTracker.start();
Spring XML方式啟動
<bean id="taskTracker" class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init-method="start"> <property name="jobRunnerClass" value="com.github.ltsopensource.example.support.MyJobRunner"/> <property name="bizLoggerLevel" value="INFO"/> <property name="clusterName" value="test_cluster"/> <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/> <property name="nodeGroup" value="test_trade_TaskTracker"/> <property name="workThreads" value="20"/> <property name="masterChangeListeners"> <list> <bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/> </list> </property> <property name="configs"> <props> <prop key="job.fail.store">leveldb</prop> </props> </property> </bean>
Spring注解方式啟動
@Configuration public class LTSSpringConfig implements ApplicationContextAware { private ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Bean(name = "taskTracker") public TaskTracker getTaskTracker() throws Exception { TaskTrackerAnnotationFactoryBean factoryBean = new TaskTrackerAnnotationFactoryBean(); factoryBean.setApplicationContext(applicationContext); factoryBean.setClusterName("test_cluster"); factoryBean.setJobRunnerClass(MyJobRunner.class); factoryBean.setNodeGroup("test_trade_TaskTracker"); factoryBean.setBizLoggerLevel("INFO"); factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181"); factoryBean.setMasterChangeListeners(new MasterChangeListener[]{ new MasterChangeListenerImpl() }); factoryBean.setWorkThreads(20); Properties configs = new Properties(); configs.setProperty("job.fail.store", "leveldb"); factoryBean.setConfigs(configs); factoryBean.afterPropertiesSet(); // factoryBean.start(); return factoryBean.getObject(); } }
包引入說明
1. JobTracker,JobClient,TaskTracker都需要引入的包
1.1 lts-core
<dependency> <groupId>com.github.ltsopensource</groupId> <artifactId>lts-core</artifactId> <version>${lts版本號}</version> </dependency>
1.2 zk客戶端包
二選一, 通過 addConfig(“zk.client”, “可選值: curator, zkclient, lts”) 設置, 如果用lts,可以不用引入包
zkclient
<dependency> <groupId>com.github.sgroschupf</groupId> <artifactId>zkclient</artifactId> <version>0.1</version> </dependency>
curator
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.9.1</version> </dependency>
zookeeper包
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>${zk.version}</version> <exclusions> <exclusion> <groupId>org.jboss.netty</groupId> <artifactId>netty</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency>
1.3 通訊包
netty或者mina, 二選一, 通過 addConfig(“lts.remoting”, “可選值: netty, mina”) 設置
netty
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.0.20.Final</version> </dependency>
mina
<dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-core</artifactId> <version>2.0.9</version> </dependency>
1.4 json包
fastjson或者jackson, 二選一, 通過 addConfig(“lts.json”, “可選值: fastjson, jackson”) 設置
fastjson
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.7</version> </dependency>
jackson
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.6.3</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.6.3</version> </dependency>
1.5 日志包
可以選用 slf4j, jcl, log4j, 或者使用jdk原生logger LoggerFactory.setLoggerAdapter(“可選值: slf4j, jcl, log4j, jdk”), 不手動設置, 默認按這個順序加載
log4j
<dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.16</version> </dependency>
slf4j
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.5</version> </dependency>
jcl
<dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging-api</artifactId> <version>1.1</version> </dependency>
1.6
如果需要spring的話,需要引入lts-spring及spring的相關包
<dependency> <groupId>com.github.ltsopensource</groupId> <artifactId>lts-spring</artifactId> <version>${lts版本號}</version> </dependency>
2. 對於JobTracker端
2.1 必須引入的包:
<dependency> <groupId>com.github.ltsopensource</groupId> <artifactId>lts-jobtracker</artifactId> <version>${lts版本號}</version> </dependency>
2.2 除了基礎包之外還需要引入任務隊列的包(可以是mongo或者mysql)
mysql
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.26</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.0.14</version> </dependency>
mongo
<dependency> <groupId>org.mongodb.morphia</groupId> <artifactId>morphia</artifactId> <version>1.0.0-rc1</version> </dependency> <dependency> <groupId>org.mongodb</groupId> <artifactId>mongo-java-driver</artifactId> <version>3.0.2</version> </dependency>
3. JobClient需要引入的包
必須引入的包
<dependency> <groupId>com.github.ltsopensource</groupId> <artifactId>lts-jobclient</artifactId> <version>${project.version}</version> </dependency>
FailStore存儲包(四選一)
通過 jobClient.addConfig(“job.fail.store”, “可選值: leveldb, mapdb, berkeleydb, rocksdb”) 設置
mapdb
<dependency> <groupId>org.mapdb</groupId> <artifactId>mapdb</artifactId> <version>2.0-beta10</version> </dependency>
leveldb
<dependency> <groupId>org.fusesource.leveldbjni</groupId> <artifactId>leveldbjni-all</artifactId> <version>1.2.7<version> </dependency>
berkeleydb
<dependency> <groupId>com.sleepycat</groupId> <artifactId>je</artifactId> <version>5.0.73</version> </dependency>
rocksdb
<dependency> <groupId>org.rocksdb</groupId> <artifactId>rocksdbjni</artifactId> <version>3.10.1</version> </dependency>
3. TaskTracker需要引入的包
必須引入的包
<dependency> <groupId>com.github.ltsopensource</groupId> <artifactId>lts-tasktracker</artifactId> <version>${project.version}</version> </dependency>
FailStore存儲包(四選一)
通過 taskTracker.addConfig(“job.fail.store”, “可選值: leveldb, mapdb, berkeleydb, rocksdb”) 設置
mapdb
<dependency> <groupId>org.mapdb</groupId> <artifactId>mapdb</artifactId> <version>2.0-beta10</version> </dependency>
leveldb
<dependency> <groupId>org.fusesource.leveldbjni</groupId> <artifactId>leveldbjni-all</artifactId> <version>1.2.7<version> </dependency>
berkeleydb
<dependency> <groupId>com.sleepycat</groupId> <artifactId>je</artifactId> <version>5.0.73</version> </dependency>
rocksdb
<dependency> <groupId>org.rocksdb</groupId> <artifactId>rocksdbjni</artifactId> <version>3.10.1</version> </dependency>