接上文......
(三) LTS簡單集成springboot項目
特別說明:本示例的主要目的僅僅是告訴大家如何使用LTS,所以偷了個懶,將所有節點都揉合到了一個工程,實際項目是分開部署的,因需而定。
整個工程其實很簡單:(一定要先搞明白這個項目結構)

(1) 准備工作
- 新建SpringBoot工程
- 導入相應的依賴,完成項目pom文件
(1.1) 項目依賴
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--lts-->
<dependency>
<groupId>com.github.ltsopensource</groupId>
<artifactId>lts</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<dependency>
<groupId>org.mapdb</groupId>
<artifactId>mapdb</artifactId>
<version>2.0-beta10</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.14</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.26</version>
</dependency>
<dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
<version>3.20.0-GA</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
注意:
如果你用的是Redis作為注冊中心,mongodb作為任務隊列,那么請引入相應的依賴,我這里用的是Zookeeper和mysql。
(1.2) 項目配置文件
配置文件:
# 應用名稱
spring.application.name=lts
server.port=8081
##########################################
# jobclient->負責提交任務以及接收任務執行結果 #
##########################################
#集群名稱
lts.jobclient.cluster-name=test_cluster
#注冊中心
lts.jobclient.registry-address=zookeeper://127.0.0.1:2181
#JobClient節點組名稱
lts.jobclient.node-group=test_jobClient
#是否使用RetryClient
lts.jobclient.use-retry-client=true
#失敗存儲,用於服務正常后再次執行(容錯處理)
lts.jobclient.configs.job.fail.store=mapdb
#######################################
# jobtracker->負責調度任務 接收並分配任務 #
#######################################
lts.jobtracker.cluster-name=test_cluster
lts.jobtracker.listen-port=35001
lts.jobtracker.registry-address=zookeeper://127.0.0.1:2181
lts.jobtracker.configs.job.logger=mysql
lts.jobtracker.configs.job.queue=mysql
lts.jobtracker.configs.jdbc.url=jdbc:mysql://rm-2ze29e1gr6iu0p31oko.mysql.rds.aliyuncs.com:3306/lts?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC
lts.jobtracker.configs.jdbc.username=root
lts.jobtracker.configs.jdbc.password=自己mysql的密碼
###########################################################
# tasktracker->負責執行任務 執行完任務將執行結果反饋給JobTracker #
###########################################################
#lts.tasktracker.cluster-name=test_cluster
lts.tasktracker.registry-address=zookeeper://127.0.0.1:2181
#TaskTracker節點組默認是64個線程用於執行任務
#lts.tasktracker.work-threads=64
lts.tasktracker.node-group=test_trade_TaskTracker
#lts.tasktracker.dispatch-runner.enable=true
#lts.tasktracker.dispatch-runner.shard-value=taskId
lts.tasktracker.configs.job.fail.store=mapdb
################################################################
# jmonitor->負責收集各個節點的監控信息,包括任務監控信息,節點JVM監控信息 #
################################################################
lts.monitor.cluster-name=test_cluster
lts.monitor.registry-address=zookeeper://127.0.0.1:2181
lts.monitor.configs.job.logger=mysql
lts.monitor.configs.job.queue=mysql
lts.monitor.configs.jdbc.url=jdbc:mysql://rm-2ze29e1gr6iu0p31oko.mysql.rds.aliyuncs.com:3306/lts?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC
lts.monitor.configs.jdbc.username=root
lts.monitor.configs.jdbc.password=自己mysql的密碼
################################################################
################ log4j.properties日志配置文件 ####################
################################################################
log4j.rootLogger=INFO,stdout
log4j.appender.stdout.Threshold=INFO
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%t] (%F:%L) %-5p %c %x - %m%n
(1.3) 數據庫操作
在mysql數據庫中創建一個新的數據庫lts即可,項目初始化會自動創建相應表格
完成以上准備工作之后,接着便是實現任務提交以及任務執行了。
另外說明:以上配置信息,均可以在官方示例lts-example找到。
(2) 項目啟動類設置
開啟相應注解即可:
@SpringBootApplication
@EnableJobClient //JobClient
@EnableTaskTracker //TaskTracker
@EnableJobTracker //JobTracker
@EnableMonitor //Monitor
public class LtstestApplication {
public static void main(String[] args) {
SpringApplication.run(LtstestApplication.class, args);
}
}
(3)JobClient提交任務
關於Jobclient使用的官方建議:
一般在一個JVM中只需要一個JobClient實例即可,不要為每種任務都新建一個JobClient實例,這樣會大大的浪費資源,因為一個JobClient可以提交多種任務。
本示例中,我直接寫在了TestController中,模擬了提交兩個不同的任務:
@Autowired
private JobClient jobClient;
@GetMapping("test01")
public Map<String, Object> test01() {
//模擬提交一個任務
Job job = new Job();
job.setTaskId("task-AAAAAAAAAAAAAAA");
job.setCronExpression("0/3 * * * * ?");
//設置任務類型 區分不同的任務 執行不同的業務邏輯
job.setParam("type", "aType");
job.setNeedFeedback(true);
//任務觸發時間 如果設置了 cron 則該設置無效
// job.setTriggerTime(DateUtils.addDay(new Date(), 1).getTime());
//任務執行節點組
job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
//當任務隊列中存在這個任務的時候,是否替換更新
job.setReplaceOnExist(false);
Map<String, Object> submitResult = new HashMap<String, Object>(4);
try {
//任務提交返回值 response
Response response = jobClient.submitJob(job);
submitResult.put("success", response.isSuccess());
submitResult.put("msg", response.getMsg());
submitResult.put("code", response.getCode());
} catch (Exception e) {
log.error("提交任務失敗", e);
throw new RuntimeException("提交任務失敗");
}
return submitResult;
}
@GetMapping("test02")
public Map<String, Object> test02() {
//模擬提交一個任務
Job job = new Job();
job.setTaskId("task-BBBBBBBBBBBBBBB");
job.setCronExpression("0/6 * * * * ?");
//設置任務類型 區分不同的任務 執行不同的業務邏輯
job.setParam("type", "bType");
job.setNeedFeedback(true);
//任務觸發時間 如果設置了 cron 則該設置無效
// job.setTriggerTime(DateUtils.addDay(new Date(), 1).getTime());
//任務執行節點組
job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
//當任務隊列中存在這個任務的時候,是否替換更新
job.setReplaceOnExist(false);
Map<String, Object> submitResult = new HashMap<String, Object>(4);
try {
Response response = jobClient.submitJob(job);
submitResult.put("success", response.isSuccess());
submitResult.put("msg", response.getMsg());
submitResult.put("code", response.getCode());
} catch (Exception e) {
log.error("提交任務失敗", e);
throw new RuntimeException("提交任務失敗");
}
return submitResult;
}
注意:JobClient我們可以直接引入,然后構建一個Job,通過JobClient進行提交。任務提交之后,JobTracker會對任務進行分發,分發方式有如下兩種:
TaskTracker會定時發送pull請求給JobTracker, 默認1s一次, 在發送pull請求之前,會檢查當前TaskTracker是否有可用的空閑線程,如果沒有則不會發送pull請求,同時也會檢查本節點機器資源是否足夠,主要是檢查cpu和內存使用率,默認超過90%就不會發送pull請求,當JobTracker收到TaskTracker節點的pull請求之后,再從任務隊列中取出相應的已經到了執行時間點的任務 push給TaskTracker,這里push的個數等於TaskTracker的空余線程數。
還有一種途徑是,每個TaskTracker線程處理完當前任務之后,在反饋給JobTracker的時候,同時也會詢問JobTracker是否有新的任務需要執行,如果有JobTracker會同時返回給TaskTracker一個新的任務執行。所以在任務量足夠大的情況下,每個TaskTracker基本上是滿負荷的執行的。
(4) TaskTracker執行任務
關於TaskTracker使用的官方建議:
一個JVM一般也盡量保持只有一個TaskTracker實例即可,多了就可能造成資源浪費。
當遇到一個TaskTracker要運行多種任務的時候,在一個JVM中,最好使用一個TaskTracker去運行多種任務,因為一個JVM中使用多個TaskTracker實例比較浪費資源(當然當你某種任務量比較多的時候,可以將這個任務單獨使用一個TaskTracker節點來執行)。
上面提交了兩個任務,分別是任務A和任務B,所以這里演示的是一個TaskTracker執行多種不同的任務。
任務的執行必須實現JobRunner接口,如下任務A:
public class JobRunnerA implements JobRunner {
@Override
public Result run(JobContext jobContext) throws Throwable {
// TODO A類型Job的邏輯
System.out.println("我是Runner A");
return null;
}
}
任務B同理,就不重復貼出代碼了
需要指出的是,在SpringBoot中,任務的執行需要添加@JobRunner4TaskTracker注解,但是有且只能有一個@JobRunner4TaskTracker注解。所以,對於同一個TaskTracker執行不同的任務,需要進行調度執行,如下:
/**
* 總入口,在 taskTracker.setJobRunnerClass(JobRunnerDispatcher.class)
* JobClient 提交 任務時指定 Job 類型 job.setParam("type", "aType")
*/
@JobRunner4TaskTracker
public class JobRunnerDispatcher implements JobRunner {
private static final Logger log = LoggerFactory.getLogger(JobRunnerDispatcher.class);
private static final ConcurrentHashMap<String/*type*/, JobRunner>
JOB_RUNNER_MAP = new ConcurrentHashMap<String, JobRunner>();
static {
JOB_RUNNER_MAP.put("aType", new JobRunnerA()); // 也可以從Spring中拿
JOB_RUNNER_MAP.put("bType", new JobRunnerB());
}
@Override
public Result run(JobContext jobContext) throws Throwable {
Job job = jobContext.getJob();
String type = job.getParam("type");
return JOB_RUNNER_MAP.get(type).run(jobContext);
}
}
說明:該JobRunnerDispatcher 類同樣實現了JobRunner接口,並且添加了 @JobRunner4TaskTracker注解,表示該類才是真正會執行任務的地方。通過該類,實現不同的任務執行。
實際上,到這里基本整個LTS任務從提交到執行就已經完成了,也就是簡單的集成完成了。
可以直接啟動項目了~
(5) master節點監聽以及任務完成處理類
這個不必多說,直接看代碼好了(來自lts-example):
/**
* 主節點監聽
*/
@MasterNodeListener
public class MasterNodeChangeListener implements MasterChangeListener {
private static final Logger log = LoggerFactory.getLogger(MasterNodeChangeListener.class);
/**
* @param master master節點
* @param isMaster 表示當前節點是不是master節點
*/
@Override
public void change(Node master, boolean isMaster) {
// 一個節點組master節點變化后的處理 , 譬如我多個JobClient, 但是有些事情只想只有一個節點能做。
if (isMaster) {
log.info("我變成了節點組中的master節點了, 恭喜, 我要放大招了");
} else {
log.info(StringUtils.format("master節點變成了{},不是我,我不能放大招,要猥瑣", master));
}
}
}
/**
* 任務完成處理類
*/
@Component
public class JobCompletedHandlerImpl implements JobCompletedHandler {
private static final Logger log = LoggerFactory.getLogger(JobCompletedHandlerImpl.class);
@Override
public void onComplete(List<JobResult> jobResults) {
//對任務執行結果進行處理 打印相應的日志信息
if (CollectionUtils.isNotEmpty(jobResults)) {
for (JobResult jobResult : jobResults) {
String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
log.info("任務執行完成taskId={}, 執行完成時間={}, job={}",
jobResult.getJob().getTaskId(), time, jobResult.getJob().toString());
}
}
}
}
(6) Admin后台管理
(6.1) 下載源碼
(6.2) 修改配置文件
lts-admin項目下\lightTaskScheduler\lts-admin\src\main\resources
lts-admin.cfg配置文件:(主要修改mysql和zookeeper地址)
// 后台的用戶名密碼
console.username=admin
console.password=admin
# 注冊中心地址,可以是zk,也可以是redis
registryAddress=zookeeper://127.0.0.1:2181
# registryAddress=redis://127.0.0.1:6379
# 集群名稱
clusterName=test_cluster
# zk客戶端,可選值 zkclient, curator
configs.zk.client=zkclient
# ------ 這個是Admin存儲數據的地方,也可以和JobQueue的地址一樣 ------
configs.jdbc.url=jdbc:mysql://127.0.0.1:3306/lts
configs.jdbc.username=fiora
configs.jdbc.password=fiora
# admin 數據使用mysql 默認 mysql, 可以自行擴展
jdbc.datasource.provider=mysql
# 使用 可選值 fastjson, jackson
# configs.lts.json=fastjson
# 是否在admin啟動monitor服務, monitor服務也可以單獨啟動
lts.monitorAgent.enable=true
#======================以下相關配置是JobTracker的JobQueue和JobLogger的相關配置 要保持和JobTracker一樣==========================
## (可選配置)jobT. 開頭的, 因為JobTracker和Admin可能使用的數據庫不是同一個
# LTS業務日志, 可選值 mysql, mongo
jobT.job.logger=mysql
# ---------以下是任務隊列配置-----------
# 任務隊列,可選值 mysql, mongo
jobT.job.queue=mysql
# ------ 1. 如果是mysql作為任務隊列 (如果不配置,表示和Admin的在一個數據庫)------
# jobT.jdbc.url=jdbc:mysql://127.0.0.1:3306/lts
# jobT.jdbc.username=root
# jobT.jdbc.password=root
# ------ 2. 如果是mongo作為任務隊列 ------
# jobT.mongo.addresses=127.0.0.1:27017
# jobT.mongo.database=lts
# jobT.mongo.username=xxx #如果有的話
# jobT.mongo.password=xxx #如果有的話
# admin 數據使用mysql 默認 mysql, 可以自行擴展
# jobT.jdbc.datasource.provider=mysql
lts-monitor.cfg,自行修改zookeeper和mysql配置
# 注冊中心地址,可以是zk,也可以是redis
registryAddress=zookeeper://127.0.0.1:2181
# registryAddress=redis://127.0.0.1:6379
# 集群名稱
clusterName=test_cluster
# LTS業務日志, 可選值 mysql, mongo
configs.job.logger=mysql
# zk客戶端,可選值 zkclient, curator
configs.zk.client=zkclient
# ---------以下是任務隊列配置-----------
# 任務隊列,可選值 mysql, mongo
configs.job.queue=mysql
# ------ 1. 如果是mysql作為任務隊列 ------
configs.jdbc.url=jdbc:mysql://127.0.0.1:3306/lts
configs.jdbc.username=fiora
configs.jdbc.password=fiora
# ------ 2. 如果是mongo作為任務隊列 ------
configs.mongo.addresses=127.0.0.1:27017
configs.mongo.database=lts
# configs.mongo.username=xxx #如果有的話
# configs.mongo.password=xxx #如果有的話
# admin 數據使用mysql, h2 默認 h2 embedded
jdbc.datasource.provider=mysql
# 使用 可選值 fastjson, jackson
# configs.lts.json=fastjson
(6.2) 編譯打包
下載完的源碼中,總目錄下,有build.cmd腳本,執行該腳本

編譯,獲得lts-admin管理項目war包
(6.3) 啟動訪問
將war包扔給tomcat執行即可
訪問路徑:

注意:默認賬戶密碼為admin,admin
輕量級分布式任務調度框架LTS系列
輕量級分布式任務調度框架(一、LTS簡介、特點、工作流程)
輕量級分布式任務調度框架(二、LTS編譯、打包、部署)
輕量級分布式任務調度框架(三、LTS簡單集成springboot項目)
