前言:學習一個技術最好的方式就是先學會運行一個demo,去了解一個大概后,再去深究細節;
所以,本篇教大家首先如何部署任務的調度中心控制台,然后書寫自己的任務以及部署執行器,最后通過控制台來實現任務調度;
源碼鏈接:
xxl-job-master(任務調度管理中心控制台): https://files.cnblogs.com/files/blogs/721495/xxl-job-master.zip?t=1647765632
xxljob-demo(任務編寫,任務執行器):https://files.cnblogs.com/files/blogs/721495/xxljob-demo.zip?t=1647765603
一、部署調度中心
下載地址:https://files.cnblogs.com/files/blogs/721495/xxl-job-master.zip?t=1647765632 步驟如下
1、復制下載好代碼中doc目錄下的db目錄中的sql語句到MySQL執行一遍創建數據庫。

2、xxl-job-admin即為調度中心控制台,需要修改application.properties文件中數據源的配置信息為自己剛剛創建的數據庫。
### xxl-job, datasource
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
3、啟動xxl-job-admin項目,訪問http://localhost:8080/xxl-job-admin,用戶名:admin,密碼:123456,到此調度中心部署完畢。

二、編寫任務,部署執行器
剛才下載的代碼中,其實xxl-job-executor-samples模塊中包含官方提供的編寫任務部署執行器的相關代碼,但這邊為了更好地理解,我們將這部分代碼重新抽離了出來;
源碼鏈接: https://files.cnblogs.com/files/blogs/721495/xxljob-demo.zip?t=1647765603 主要步驟如下:
2.1、添加依賴
<!-- 任務調度xxl-job -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.2.0</version>
</dependency>
2.2、編寫application.yml配置文件
注意點:xxl.job.admin.addresses這邊地址的書寫不要用127.0.0.1,通過ipconfig去查看具體是多少,不然后面執行任務會報錯,端口為9999,后續會用到;
server:
port: 8186
spring:
application:
name: xxljob-demo
# xxl-job配置
xxl:
job:
admin:
# 調度中心部署跟地址 [選填]:如調度中心集群部署存在多個地址則用逗號分隔。執行器將會使用該地址進行"執行器心跳注冊"和"任務結果回調";為空則關閉自動注冊;
addresses: http://127.0.0.1:8080/xxl-job-admin
executor:
# 執行器注冊 [選填]:優先使用該配置作為注冊地址,為空時使用內嵌服務 ”IP:PORT“ 作為注冊地址。從而更靈活的支持容器類型執行器動態IP和動態映射端口問題。
address:
# 執行器AppName [選填]:執行器心跳注冊分組依據;為空則關閉自動注冊
appname: demo-app
# 執行器IP [選填]:默認為空表示自動獲取IP,多網卡時可手動設置指定IP,該IP不會綁定Host僅作為通訊實用;地址信息用於 "執行器注冊" 和 "調度中心請求並觸發任務";
ip:
# 執行器端口號 [選填]:小於等於0則自動獲取;默認端口為9999,單機部署多個執行器時,注意要配置不同執行器端口;
port: 9999
# 執行器運行日志文件存儲磁盤路徑 [選填] :需要對該路徑擁有讀寫權限;為空則使用默認路徑;
logpath: /Users/luoyu/Documents/log/xxl-job/jobhandler
# 執行器日志文件保存天數 [選填] : 過期日志自動清理, 限制值大於等於3時生效; 否則, 如-1, 關閉自動清理功能;
logretentiondays: 15
# 執行器通訊TOKEN [選填]:非空時啟用;
accessToken:
2.2、編寫配置類
這邊配置類主要是用到了剛才配置文件中定義的信息來配置XxlJobConfig;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
log.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
/**
* 針對多網卡、容器內部署等情況,可借助 "spring-cloud-commons" 提供的 "InetUtils" 組件靈活定制注冊IP;
*
* 1、引入依賴:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器啟動變量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、獲取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/
}
2.3、創建各種任務
簡單任務、分片廣播任務、命令行任務、跨平台Http任務、生命周期任務示例:任務初始化與銷毀時,支持自定義相關邏輯;
@XxlJob("demoJobHandler") :該注解中的value待會在控制台會用到,用來關聯該任務以便后續的啟動執行;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.TimeUnit;
/**
* XxlJob開發示例(Bean模式)
*
* 開發步驟:
* 1、在Spring Bean實例中,開發Job方法,方式格式要求為 "public ReturnT<String> execute(String param)"
* 2、為Job方法添加注解 "@XxlJob(value="自定義jobhandler名稱", init = "JobHandler初始化方法", destroy = "JobHandler銷毀方法")",注解value值對應的是調度中心新建任務的JobHandler屬性的值。
* 3、執行日志:需要通過 "XxlJobLogger.log" 打印執行日志;
*
* @author xuxueli 2019-12-11 21:52:51
*/
@Slf4j
@Component
public class XxlJobService {
/**
* 1、簡單任務示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public ReturnT<String> demoJobHandler(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobLogger.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
return ReturnT.SUCCESS;
}
/**
* 2、分片廣播任務
*/
@XxlJob("shardingJobHandler")
public ReturnT<String> shardingJobHandler(String param) throws Exception {
// 分片參數
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
XxlJobLogger.log("分片參數:當前分片序號 = {}, 總分片數 = {}", shardingVO.getIndex(), shardingVO.getTotal());
// 業務邏輯
for (int i = 0; i < shardingVO.getTotal(); i++) {
if (i == shardingVO.getIndex()) {
XxlJobLogger.log("第 {} 片, 命中分片開始處理", i);
} else {
XxlJobLogger.log("第 {} 片, 忽略", i);
}
}
return ReturnT.SUCCESS;
}
/**
* 3、命令行任務
*/
@XxlJob("commandJobHandler")
public ReturnT<String> commandJobHandler(String param) throws Exception {
String command = param;
int exitValue = -1;
BufferedReader bufferedReader = null;
try {
// command process
Process process = Runtime.getRuntime().exec(command);
BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));
// command log
String line;
while ((line = bufferedReader.readLine()) != null) {
XxlJobLogger.log(line);
}
// command exit
process.waitFor();
exitValue = process.exitValue();
} catch (Exception e) {
XxlJobLogger.log(e);
} finally {
if (bufferedReader != null) {
bufferedReader.close();
}
}
if (exitValue == 0) {
return IJobHandler.SUCCESS;
} else {
return new ReturnT<String>(IJobHandler.FAIL.getCode(), "command exit value("+exitValue+") is failed");
}
}
/**
* 4、跨平台Http任務
*/
@XxlJob("httpJobHandler")
public ReturnT<String> httpJobHandler(String param) throws Exception {
// request
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try {
// connection
URL realUrl = new URL(param);
connection = (HttpURLConnection) realUrl.openConnection();
// connection setting
connection.setRequestMethod("GET");
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setUseCaches(false);
connection.setReadTimeout(5 * 1000);
connection.setConnectTimeout(3 * 1000);
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
// do connection
connection.connect();
//Map<String, List<String>> map = connection.getHeaderFields();
// valid StatusCode
int statusCode = connection.getResponseCode();
if (statusCode != 200) {
throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
}
// result
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
StringBuilder result = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
result.append(line);
}
String responseMsg = result.toString();
XxlJobLogger.log(responseMsg);
return ReturnT.SUCCESS;
} catch (Exception e) {
XxlJobLogger.log(e);
return ReturnT.FAIL;
} finally {
try {
if (bufferedReader != null) {
bufferedReader.close();
}
if (connection != null) {
connection.disconnect();
}
} catch (Exception e2) {
XxlJobLogger.log(e2);
}
}
}
/**
* 5、生命周期任務示例:任務初始化與銷毀時,支持自定義相關邏輯;
*/
@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
public ReturnT<String> demoJobHandler2(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, Hello World.");
return ReturnT.SUCCESS;
}
public void init(){
log.info("init");
}
public void destroy(){
log.info("destory");
}
}
2.4、啟動XxljobApplication
控制台信息:

三、控制台調度執行任務
3.1、觀察已存在的任務信息
可以看到控制台上已經有了一個任務,通過操作中的編輯,我們可以注意到JobHandler與第一個簡單任務注解中@XxlJob("demoJobHandler")的value相同;


3.2、啟動任務執行
這邊的機器地址需要與任務執行器的地址一致,端口因為我們在編寫任務的配置文件中定義了9999;

3.3、查詢日志



5 總結
通過簡單任務模式去了解了 如何去編寫任務以及控制台如何去管理調度執行任務,其他的幾種任務方式大家可以自行去理解哈, 覺得有幫助的話可以關注下公眾號哈,歡迎關注微信公眾號<彭曉琪>
少年的志向,不應該是房子,他們應該伏案疾書,或為心中的夢想而揮灑汗水,暢想着自己未來光明的人生,少年的夢想,也不應該是生活,他們應該想要集齊七顆龍珠,或者幻想着擁有一顆皮卡丘。歡迎關注微信公眾號<彭曉琪>

