輕量級分布式任務調度框架(三、LTS簡單集成springboot項目)


接上文......

(三) LTS簡單集成springboot項目

特別說明:本示例的主要目的僅僅是告訴大家如何使用LTS,所以偷了個懶,將所有節點都揉合到了一個工程,實際項目是分開部署的,因需而定。

整個工程其實很簡單:(一定要先搞明白這個項目結構)

(1) 准備工作

  • 新建SpringBoot工程
  • 導入相應的依賴,完成項目pom文件

(1.1) 項目依賴

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--lts-->
        <dependency>
            <groupId>com.github.ltsopensource</groupId>
            <artifactId>lts</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.25.Final</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
        <dependency>
            <groupId>org.mapdb</groupId>
            <artifactId>mapdb</artifactId>
            <version>2.0-beta10</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.14</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.26</version>
        </dependency>
        <dependency>
            <groupId>org.javassist</groupId>
            <artifactId>javassist</artifactId>
            <version>3.20.0-GA</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

注意:

如果你用的是Redis作為注冊中心,mongodb作為任務隊列,那么請引入相應的依賴,我這里用的是Zookeeper和mysql。

(1.2) 項目配置文件

配置文件:

# 應用名稱
spring.application.name=lts
server.port=8081

##########################################
# jobclient->負責提交任務以及接收任務執行結果 #
##########################################
#集群名稱
lts.jobclient.cluster-name=test_cluster
#注冊中心
lts.jobclient.registry-address=zookeeper://127.0.0.1:2181
#JobClient節點組名稱
lts.jobclient.node-group=test_jobClient
#是否使用RetryClient
lts.jobclient.use-retry-client=true
#失敗存儲,用於服務正常后再次執行(容錯處理)
lts.jobclient.configs.job.fail.store=mapdb

#######################################
# jobtracker->負責調度任務 接收並分配任務 #
#######################################
lts.jobtracker.cluster-name=test_cluster
lts.jobtracker.listen-port=35001
lts.jobtracker.registry-address=zookeeper://127.0.0.1:2181
lts.jobtracker.configs.job.logger=mysql
lts.jobtracker.configs.job.queue=mysql
lts.jobtracker.configs.jdbc.url=jdbc:mysql://rm-2ze29e1gr6iu0p31oko.mysql.rds.aliyuncs.com:3306/lts?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC
lts.jobtracker.configs.jdbc.username=root
lts.jobtracker.configs.jdbc.password=自己mysql的密碼

###########################################################
# tasktracker->負責執行任務 執行完任務將執行結果反饋給JobTracker #
###########################################################
#lts.tasktracker.cluster-name=test_cluster
lts.tasktracker.registry-address=zookeeper://127.0.0.1:2181
#TaskTracker節點組默認是64個線程用於執行任務
#lts.tasktracker.work-threads=64
lts.tasktracker.node-group=test_trade_TaskTracker
#lts.tasktracker.dispatch-runner.enable=true
#lts.tasktracker.dispatch-runner.shard-value=taskId
lts.tasktracker.configs.job.fail.store=mapdb


################################################################
# jmonitor->負責收集各個節點的監控信息,包括任務監控信息,節點JVM監控信息 #
################################################################
lts.monitor.cluster-name=test_cluster
lts.monitor.registry-address=zookeeper://127.0.0.1:2181
lts.monitor.configs.job.logger=mysql
lts.monitor.configs.job.queue=mysql
lts.monitor.configs.jdbc.url=jdbc:mysql://rm-2ze29e1gr6iu0p31oko.mysql.rds.aliyuncs.com:3306/lts?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC
lts.monitor.configs.jdbc.username=root
lts.monitor.configs.jdbc.password=自己mysql的密碼

################################################################
################ log4j.properties日志配置文件	####################
################################################################
log4j.rootLogger=INFO,stdout
log4j.appender.stdout.Threshold=INFO
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%t] (%F:%L) %-5p %c %x - %m%n

(1.3) 數據庫操作

在mysql數據庫中創建一個新的數據庫lts即可,項目初始化會自動創建相應表格

完成以上准備工作之后,接着便是實現任務提交以及任務執行了。
另外說明:以上配置信息,均可以在官方示例lts-example找到。

(2) 項目啟動類設置

開啟相應注解即可:

@SpringBootApplication
@EnableJobClient        //JobClient
@EnableTaskTracker      //TaskTracker
@EnableJobTracker       //JobTracker 
@EnableMonitor          //Monitor
public class LtstestApplication {

    public static void main(String[] args) {
        SpringApplication.run(LtstestApplication.class, args);
    }

}

(3)JobClient提交任務

關於Jobclient使用的官方建議

一般在一個JVM中只需要一個JobClient實例即可,不要為每種任務都新建一個JobClient實例,這樣會大大的浪費資源,因為一個JobClient可以提交多種任務。

本示例中,我直接寫在了TestController中,模擬了提交兩個不同的任務:

 	@Autowired
    private JobClient jobClient;	

	@GetMapping("test01")
    public Map<String, Object> test01() {
        //模擬提交一個任務
        Job job = new Job();
        job.setTaskId("task-AAAAAAAAAAAAAAA");
        job.setCronExpression("0/3 * * * * ?");
        //設置任務類型 區分不同的任務 執行不同的業務邏輯
        job.setParam("type", "aType");
        job.setNeedFeedback(true);
        //任務觸發時間 如果設置了 cron 則該設置無效
//        job.setTriggerTime(DateUtils.addDay(new Date(), 1).getTime());
        //任務執行節點組
        job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
        //當任務隊列中存在這個任務的時候,是否替換更新
        job.setReplaceOnExist(false);
        Map<String, Object> submitResult = new HashMap<String, Object>(4);
        try {
            //任務提交返回值 response
            Response response = jobClient.submitJob(job);
            submitResult.put("success", response.isSuccess());
            submitResult.put("msg", response.getMsg());
            submitResult.put("code", response.getCode());
        } catch (Exception e) {
            log.error("提交任務失敗", e);
            throw new RuntimeException("提交任務失敗");
        }
        return submitResult;
    }

    @GetMapping("test02")
    public Map<String, Object> test02() {
        //模擬提交一個任務
        Job job = new Job();
        job.setTaskId("task-BBBBBBBBBBBBBBB");
        job.setCronExpression("0/6 * * * * ?");
        //設置任務類型 區分不同的任務 執行不同的業務邏輯
        job.setParam("type", "bType");
        job.setNeedFeedback(true);
        //任務觸發時間 如果設置了 cron 則該設置無效
//        job.setTriggerTime(DateUtils.addDay(new Date(), 1).getTime());
        //任務執行節點組
        job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
        //當任務隊列中存在這個任務的時候,是否替換更新
        job.setReplaceOnExist(false);
        Map<String, Object> submitResult = new HashMap<String, Object>(4);
        try {
            Response response = jobClient.submitJob(job);
            submitResult.put("success", response.isSuccess());
            submitResult.put("msg", response.getMsg());
            submitResult.put("code", response.getCode());
        } catch (Exception e) {
            log.error("提交任務失敗", e);
            throw new RuntimeException("提交任務失敗");
        }
        return submitResult;
    }

注意:JobClient我們可以直接引入,然后構建一個Job,通過JobClient進行提交。任務提交之后,JobTracker會對任務進行分發,分發方式有如下兩種:

TaskTracker會定時發送pull請求給JobTracker, 默認1s一次, 在發送pull請求之前,會檢查當前TaskTracker是否有可用的空閑線程,如果沒有則不會發送pull請求,同時也會檢查本節點機器資源是否足夠,主要是檢查cpu和內存使用率,默認超過90%就不會發送pull請求,當JobTracker收到TaskTracker節點的pull請求之后,再從任務隊列中取出相應的已經到了執行時間點的任務 push給TaskTracker,這里push的個數等於TaskTracker的空余線程數。

還有一種途徑是,每個TaskTracker線程處理完當前任務之后,在反饋給JobTracker的時候,同時也會詢問JobTracker是否有新的任務需要執行,如果有JobTracker會同時返回給TaskTracker一個新的任務執行。所以在任務量足夠大的情況下,每個TaskTracker基本上是滿負荷的執行的。

(4) TaskTracker執行任務

關於TaskTracker使用的官方建議

一個JVM一般也盡量保持只有一個TaskTracker實例即可,多了就可能造成資源浪費。
當遇到一個TaskTracker要運行多種任務的時候,在一個JVM中,最好使用一個TaskTracker去運行多種任務,因為一個JVM中使用多個TaskTracker實例比較浪費資源(當然當你某種任務量比較多的時候,可以將這個任務單獨使用一個TaskTracker節點來執行)。

上面提交了兩個任務,分別是任務A任務B,所以這里演示的是一個TaskTracker執行多種不同的任務
任務的執行必須實現JobRunner接口,如下任務A:

public class JobRunnerA implements JobRunner {

    @Override
    public Result run(JobContext jobContext) throws Throwable {
        //  TODO A類型Job的邏輯
        System.out.println("我是Runner A");
        return null;
    }

}

任務B同理,就不重復貼出代碼了

需要指出的是,在SpringBoot中,任務的執行需要添加@JobRunner4TaskTracker注解,但是有且只能有一個@JobRunner4TaskTracker注解。所以,對於同一個TaskTracker執行不同的任務,需要進行調度執行,如下:

/**
 * 總入口,在 taskTracker.setJobRunnerClass(JobRunnerDispatcher.class)
 * JobClient 提交 任務時指定 Job 類型  job.setParam("type", "aType")
 */
@JobRunner4TaskTracker
public class JobRunnerDispatcher implements JobRunner {

    private static final Logger log = LoggerFactory.getLogger(JobRunnerDispatcher.class);

    private static final ConcurrentHashMap<String/*type*/, JobRunner>
            JOB_RUNNER_MAP = new ConcurrentHashMap<String, JobRunner>();

    static {
        JOB_RUNNER_MAP.put("aType", new JobRunnerA()); // 也可以從Spring中拿
        JOB_RUNNER_MAP.put("bType", new JobRunnerB());
    }

    @Override
    public Result run(JobContext jobContext) throws Throwable {
        Job job = jobContext.getJob();
        String type = job.getParam("type");
        return JOB_RUNNER_MAP.get(type).run(jobContext);
    }

}

說明:該JobRunnerDispatcher 類同樣實現了JobRunner接口,並且添加了 @JobRunner4TaskTracker注解,表示該類才是真正會執行任務的地方。通過該類,實現不同的任務執行。

實際上,到這里基本整個LTS任務從提交到執行就已經完成了,也就是簡單的集成完成了。
可以直接啟動項目了~

(5) master節點監聽以及任務完成處理類

這個不必多說,直接看代碼好了(來自lts-example):

/**
 * 主節點監聽
 */
@MasterNodeListener
public class MasterNodeChangeListener implements MasterChangeListener {

    private static final Logger log = LoggerFactory.getLogger(MasterNodeChangeListener.class);


    /**
     * @param master   master節點
     * @param isMaster 表示當前節點是不是master節點
     */
    @Override
    public void change(Node master, boolean isMaster) {
        // 一個節點組master節點變化后的處理 , 譬如我多個JobClient, 但是有些事情只想只有一個節點能做。
        if (isMaster) {
            log.info("我變成了節點組中的master節點了, 恭喜, 我要放大招了");
        } else {
            log.info(StringUtils.format("master節點變成了{},不是我,我不能放大招,要猥瑣", master));
        }
    }

}
/**
 * 任務完成處理類
 */
@Component
public class JobCompletedHandlerImpl implements JobCompletedHandler {

    private static final Logger log = LoggerFactory.getLogger(JobCompletedHandlerImpl.class);

    @Override
    public void onComplete(List<JobResult> jobResults) {
        //對任務執行結果進行處理 打印相應的日志信息
        if (CollectionUtils.isNotEmpty(jobResults)) {
            for (JobResult jobResult : jobResults) {
                String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
                log.info("任務執行完成taskId={}, 執行完成時間={}, job={}",
                        jobResult.getJob().getTaskId(), time, jobResult.getJob().toString());
            }
        }
    }

}

(6) Admin后台管理

(6.1) 下載源碼

下載源碼

(6.2) 修改配置文件

lts-admin項目下\lightTaskScheduler\lts-admin\src\main\resources
lts-admin.cfg配置文件:(主要修改mysql和zookeeper地址)

// 后台的用戶名密碼
console.username=admin
console.password=admin

# 注冊中心地址,可以是zk,也可以是redis
registryAddress=zookeeper://127.0.0.1:2181
# registryAddress=redis://127.0.0.1:6379

# 集群名稱
clusterName=test_cluster

# zk客戶端,可選值 zkclient, curator
configs.zk.client=zkclient

# ------ 這個是Admin存儲數據的地方,也可以和JobQueue的地址一樣 ------
configs.jdbc.url=jdbc:mysql://127.0.0.1:3306/lts
configs.jdbc.username=fiora
configs.jdbc.password=fiora

# admin 數據使用mysql 默認 mysql, 可以自行擴展
jdbc.datasource.provider=mysql

# 使用 可選值  fastjson, jackson
# configs.lts.json=fastjson

# 是否在admin啟動monitor服務, monitor服務也可以單獨啟動
lts.monitorAgent.enable=true

#======================以下相關配置是JobTracker的JobQueue和JobLogger的相關配置 要保持和JobTracker一樣==========================
## (可選配置)jobT. 開頭的, 因為JobTracker和Admin可能使用的數據庫不是同一個
# LTS業務日志, 可選值 mysql, mongo
jobT.job.logger=mysql
# ---------以下是任務隊列配置-----------
# 任務隊列,可選值 mysql, mongo
jobT.job.queue=mysql

# ------ 1. 如果是mysql作為任務隊列 (如果不配置,表示和Admin的在一個數據庫)------
# jobT.jdbc.url=jdbc:mysql://127.0.0.1:3306/lts
# jobT.jdbc.username=root
# jobT.jdbc.password=root

# ------ 2. 如果是mongo作為任務隊列 ------
# jobT.mongo.addresses=127.0.0.1:27017
# jobT.mongo.database=lts
# jobT.mongo.username=xxx #如果有的話
# jobT.mongo.password=xxx #如果有的話

# admin 數據使用mysql 默認 mysql, 可以自行擴展
# jobT.jdbc.datasource.provider=mysql

lts-monitor.cfg,自行修改zookeeper和mysql配置


# 注冊中心地址,可以是zk,也可以是redis
registryAddress=zookeeper://127.0.0.1:2181
# registryAddress=redis://127.0.0.1:6379

# 集群名稱
clusterName=test_cluster

# LTS業務日志, 可選值 mysql, mongo
configs.job.logger=mysql

# zk客戶端,可選值 zkclient, curator
configs.zk.client=zkclient

# ---------以下是任務隊列配置-----------
# 任務隊列,可選值 mysql, mongo
configs.job.queue=mysql

# ------ 1. 如果是mysql作為任務隊列 ------
configs.jdbc.url=jdbc:mysql://127.0.0.1:3306/lts
configs.jdbc.username=fiora
configs.jdbc.password=fiora

# ------ 2. 如果是mongo作為任務隊列 ------
configs.mongo.addresses=127.0.0.1:27017
configs.mongo.database=lts
# configs.mongo.username=xxx #如果有的話
# configs.mongo.password=xxx #如果有的話

# admin 數據使用mysql, h2 默認 h2 embedded
jdbc.datasource.provider=mysql

# 使用 可選值  fastjson, jackson
# configs.lts.json=fastjson

(6.2) 編譯打包

下載完的源碼中,總目錄下,有build.cmd腳本,執行該腳本

編譯,獲得lts-admin管理項目war

(6.3) 啟動訪問

war包扔給tomcat執行即可
訪問路徑:

http://localhost:8080/項目名/


注意:默認賬戶密碼為admin,admin


輕量級分布式任務調度框架LTS系列

輕量級分布式任務調度框架(一、LTS簡介、特點、工作流程)
輕量級分布式任務調度框架(二、LTS編譯、打包、部署)
輕量級分布式任務調度框架(三、LTS簡單集成springboot項目)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM