分布式任務調度平台xxl-job


1,在工作中,經常會涉及到定時任務,那么定時任務的實現方式有哪些?

實現方式1:Thread類下通過sleep方法來實現定時任務的效果

                    缺點:過於簡單,只能實現很簡單的定時任務的邏輯,不夠靈活

public class Demo01 {
    static long count = 0;
    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(1000);
                        count++;
                        System.out.println(count);
                    } catch (Exception e) {
                    }
                }
            }
        };
        Thread thread = new Thread(runnable);
        thread.start();
    }
}

實現方式二:通過TimerTask 來實現

                     邏輯比較簡單,只能實現簡單的功能

package com.aiyuesheng;

import java.util.Timer;
import java.util.TimerTask;

public class Test {
    static long count = 0;

    public static void main(String[] args) {
        TimerTask timerTask = new TimerTask() {

            @Override
            public void run() {
                count++;
                System.out.println(count);
            }
        };
        Timer timer = new Timer();
        // 天數
        long delay = 0;
        // 秒數,每隔2秒來執行下定時任務
        long period = 2000;
        timer.scheduleAtFixedRate(timerTask, delay, period);
    }
}

實現方式三:ScheduledExecutorService 線程池實現方式

public class Test02 {
            public static void main(String[] args) {
                Runnable runnable = new Runnable() {
                    public void run() {
                        // task to run goes here
                        System.out.println("Hello !!");
                    }
                };
                ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
                // 第二個參數為首次執行的延時時間,第三個參數為定時執行的間隔時間
                service.scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);
            }
}

實現方式四:quartz 

maven 包:

<!-- quartz 定時任務 -->
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz-jobs</artifactId>
            <version>2.2.1</version>
        </dependency>

新建定時任務:

package com.aiyuesheng.quartz;

import java.util.Date;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

public class MyJob implements Job {
    @Override
    public void execute(JobExecutionContext arg0) throws JobExecutionException {
        System.out.println("quartz MyJob date:" + new Date().getTime());
    }

}

啟動定時任務

package com.aiyuesheng.quartz;

import java.util.Date;

import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class App {
    
    public static void main(String[] args) throws SchedulerException {
         //1.創建Scheduler的工廠
        SchedulerFactory sf = new StdSchedulerFactory();
        //2.從工廠中獲取調度器實例
        Scheduler scheduler = sf.getScheduler();
        //3.創建JobDetail
        JobDetail jb = JobBuilder.newJob(MyJob.class)
                .withDescription("this is a ram job") //job的描述
                .withIdentity("ramJob", "ramGroup") //job 的name和group
                .build();

        //任務運行的時間,SimpleSchedle類型觸發器有效
        long time=  System.currentTimeMillis() + 3*1000L; //3秒后啟動任務
        Date statTime = new Date(time);

        //4.創建Trigger
            //使用SimpleScheduleBuilder或者CronScheduleBuilder
        Trigger t = TriggerBuilder.newTrigger()
                    .withDescription("")
                    .withIdentity("ramTrigger", "ramTriggerGroup")
                    //.withSchedule(SimpleScheduleBuilder.simpleSchedule())
                    .startAt(statTime)  //默認當前時間啟動
                    .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?")) //兩秒執行一次
                    .build();

        //5.注冊任務和定時器
        scheduler.scheduleJob(jb, t);
        //6.啟動 調度器
        scheduler.start();
    }
}

實現方式五:springboot 項目中通過@schedule 

//或直接指定時間間隔,例如:5秒
//@Scheduled(fixedRate=5000)

@Component
public class SecheduleJob {
    @Scheduled(cron = "0/5 * * * * ?")
    private void configureTasks() {
        System.err.println("執行靜態定時任務時間: " + LocalDateTime.now());
    }
}

 

在集群環境下:定時任務可能會出現重復執行的情況

怎么避免定時任務重復執行?

1,可以使用zookeeper 實現分布式鎖,獲得鎖的JVM 執行定時任務

     缺點:不易擴展,需要使用創建臨時節點,事件通知,比較繁瑣

2,XXLJOB 分布式調度平台

簡介:

XXLJOB介紹
1、簡單:支持通過Web頁面對任務進行CRUD操作,操作簡單,一分鍾上手;
2、動態:支持動態修改任務狀態、暫停/恢復任務,以及終止運行中任務,即時生效;
3、調度中心HA(中心式):調度采用中心式設計,“調度中心”基於集群Quartz實現,可保證調度中心HA;
4、執行器HA(分布式):任務分布式執行,任務"執行器"支持集群部署,可保證任務執行HA;
5、任務Failover:執行器集群部署時,任務路由策略選擇"故障轉移"情況下調度失敗時將會平滑切換執行器進行Failover;
6、一致性:“調度中心”通過DB鎖保證集群分布式調度的一致性, 一次任務調度只會觸發一次執行;
7、自定義任務參數:支持在線配置調度任務入參,即時生效;
8、調度線程池:調度系統多線程觸發調度運行,確保調度精確執行,不被堵塞;
9、彈性擴容縮容:一旦有新執行器機器上線或者下線,下次調度時將會重新分配任務;
10、郵件報警:任務失敗時支持郵件報警,支持配置多郵件地址群發報警郵件;
11、狀態監控:支持實時監控任務進度;
12、Rolling執行日志:支持在線查看調度結果,並且支持以Rolling方式實時查看執行器輸出的完整的執行日志;
13、GLUE:提供Web IDE,支持在線開發任務邏輯代碼,動態發布,實時編譯生效,省略部署上線的過程。支持30個版本的歷史版本回溯。
14、數據加密:調度中心和執行器之間的通訊進行數據加密,提升調度信息安全性;
15、任務依賴:支持配置子任務依賴,當父任務執行結束且執行成功后將會主動觸發一次子任務的執行, 多個子任務用逗號分隔;
16、推送maven中央倉庫: 將會把最新穩定版推送到maven中央倉庫, 方便用戶接入和使用;
17、任務注冊: 執行器會周期性自動注冊任務, 調度中心將會自動發現注冊的任務並觸發執行。同時,也支持手動錄入執行器地址;
18、路由策略:執行器集群部署時提供豐富的路由策略,包括:第一個、最后一個、輪詢、隨機、一致性HASH、最不經常使用、最近最久未使用、故障轉移、忙碌轉移等;
19、運行報表:支持實時查看運行數據,如任務數量、調度次數、執行器數量等;以及調度報表,如調度日期分布圖,調度成功分布圖等;
20、腳本任務:支持以GLUE模式開發和運行腳本任務,包括Shell、Python等類型腳本;
21、阻塞處理策略:調度過於密集執行器來不及處理時的處理策略,策略包括:單機串行(默認)、丟棄后續調度、覆蓋之前調度;
22、失敗處理策略;調度失敗時的處理策略,策略包括:失敗告警(默認)、失敗重試;
23、分片廣播任務:執行器集群部署時,任務路由策略選擇"分片廣播"情況下,一次任務調度將會廣播觸發對應集群中所有執行器執行一次任務,同時傳遞分片參數;可根據分片參數開發分片任務;
24、動態分片:分片廣播任務以執行器為維度進行分片,支持動態擴容執行器集群從而動態增加分片數量,協同進行業務處理;在進行大數據量業務操作時可顯著提升任務處理能力和速度。
25、事件觸發:除了"Cron方式"和"任務依賴方式"觸發任務執行之外,支持基於事件的觸發任務方式。調度中心提供觸發任務單次執行的API服務,可根據業務事件靈活觸發。

開源項目下載:https://github.com/xuxueli/xxl-job

文檔:http://www.xuxueli.com/xxl-job/#/?id=%e4%b8%80%e3%80%81%e7%ae%80%e4%bb%8b

 

項目下載完成,放入到本地,解決jar包問題,之后,如下:

- /doc :文檔資料

- /db :“調度數據庫”建表腳本

- /xxl-job-admin :調度中心,項目源碼

- /xxl-job-core :公共Jar依賴

- /xxl-job-executor-samples :執行器,Sample示例項目(大家可以在該項目上進行開發,也可以將現有項目改造生成執行器項目)

第一步:將xxl-job-master 里面的sql 執行到mysql 數據庫

第二步:配置xxl-job-admin (job 注冊中心)

### xxl-job db  (use &amp; replace & in xml)
xxl.job.db.driverClass=com.mysql.jdbc.Driver
xxl.job.db.url=jdbc:mysql://localhost:3306/xxl-job?useUnicode=true&characterEncoding=UTF-8
xxl.job.db.user=root
xxl.job.db.password=123456

### xxl-job email
xxl.job.mail.host=smtp.qq.com
xxl.job.mail.port=25
xxl.job.mail.ssl=false
xxl.job.mail.username=12222240325@qq.com
xxl.job.mail.password=tusupjsyygechdbb  
xxl.job.mail.sendNick=《任務調度平台XXL-JOB》

### xxl-job login
xxl.job.login.username=admin
xxl.job.login.password=123456

### xxl-job, access token
xxl.job.accessToken=

### xxl-job, i18n (default empty as chinese, "en" as english)
xxl.job.i18n=
xxl.job.mail.password 是郵件的授權碼,不是密碼

第三步:配置完成之后,放到tomcat 運行,啟動,打開頁面

 

在執行器管理界新增執行器:執行器:代表了這個job 真實的服務器,比如一個springboot 定時任務,里面會有一個內嵌的服務,就是執行器,端口默認9999,springboot 項目啟動,會將服務信息注冊到xxl-admin 容器,xxl-admin 頁面執行任務,分發給這個執行器,會找到這個springboot 服務,然后開始執行

新建執行任務:指定哪個執行器,jobHandler,執行事件,路由策略等

路由策略:第一台(指定第一台機器執行)

                  輪詢(輪詢機制)

                  分片廣播:任務場景:10個執行器的集群來處理10w條數據,每台機器只需要處理1w條數據,耗時降低10倍;

 

第四部:新建springboot job

# web port
server.port=8081

# log config
logging.config=classpath:logback.xml


### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin

### xxl-job executor address
xxl.job.executor.appname=aiyuesheng-job
xxl.job.executor.ip=
xxl.job.executor.port=9999

### xxl-job, access token
xxl.job.accessToken=

### xxl-job log path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job log retention days
xxl.job.executor.logretentiondays=-1

Job 邏輯 @JobHandler(value = "demoJobHandler") 是和執行任務里面的JobHandler 相對應

@JobHandler(value = "demoJobHandler")
@Component
public class DemoJobHandler extends IJobHandler {
    @Value("${xxl.job.executor.port}")
    private String executorPort;

    // @JobHandler 底層實現 value demoJobHandler 名稱 對應存放類的class地址
    // com.xxl.job.executor.service.jobhandler 在使用反射機制 執行execute
    // 目的是為了反射統一執行方法
    @Override
    public ReturnT<String> execute(String param) throws Exception {
        // 任務調度執行地址
        int i = 1/0;
        System.out.println("####DemoJobHandler####execute()執行 executorPort:" + executorPort);
        return SUCCESS;
    }

}

 

調度中心集群

如果要配置集群的話,采用Nginx 做負載均衡就可以
調度中心支持集群部署,提升調度系統容災和可用性。
調度中心集群部署時,幾點要求和建議:

DB配置保持一致;

登陸賬號配置保持一致;

群機器時鍾保持一致(單機集群忽視);

建議:推薦通過nginx為調度中心集群做負載均衡,分配域名。調度中心訪問、執行器回調配置、調用API服務等操作均通過該域名進行。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM