本文講述了一種利用 XXL-JOB 來進行分片任務處理的方法,另外加入對執行節點數的靈活控制。
場景
現在一張數據表里有大量數據需要某個服務端應用來處理,要求:
- 能夠並行處理;
- 能夠較靈活地控制並行任務數量。
- 壓力較均衡地分散到不同的服務器節點;
思路
因為需要並行處理同一張數據表里的數據,所以比較自然地想到了分片查詢數據,可以利用對 id 取模的方法進行分片,避免同一條數據被重復處理。
根據第 1、2 點要求,本來想通過對線程池的動態配置來實現,但結合第 3 點來考慮,服務器節點數量有可能會變化,節點之間相互無感知無通信,自己在應用內實現一套調度機制可能會很復雜。
如果有現成的獨立於這些服務器節點之外的調度器就好了——順着這個思路,就想到了已經接入的分布式任務調度平台 XXL-JOB,而在閱讀其 官方文檔 后發現「分片廣播 & 動態分片」很貼合這種場景。
方案
- 利用 XXL-JOB 的路由策略「分片廣播」來調度定時任務;
- 通過任務參數傳入執行任務節點數量;
- 定時任務邏輯里,根據獲取到的分片參數、執行任務節點數量,決策當前節點是否需要執行,分片查詢數據並處理:
- 如果 分片序號 > (執行任務節點數量 - 1),則當前節點不執行任務,直接返回;
- 否則,取 分片序號 和 執行任務節點數量 作為分片參數,查詢數據並處理。
這樣,我們可以實現靈活調度 [1, N] 個節點並行執行任務處理數據。
主要代碼示例
JobHandler 示例:
@XxlJob("demoJobHandler")
public void execute() {
String param = XxlJobHelper.getJobParam();
if (StringUtils.isBlank(param)) {
XxlJobHelper.log("任務參數為空");
XxlJobHelper.handleFail();
return;
}
// 執行任務節點數量
int executeNodeNum = Integer.valueOf(param);
// 分片序號
int shardIndex = XxlJobHelper.getShardIndex();
// 分片總數
int shardTotal = XxlJobHelper.getShardTotal();
if (executeNodeNum <= 0 || executeNodeNum > shardTotal) {
XxlJobHelper.log("執行任務節點數量取值范圍[1,節點總數]");
XxlJobHelper.handleFail();
return;
}
if (shardIndex > (executeNodeNum - 1)) {
XxlJobHelper.log("當前分片 {} 無需執行", shardIndex);
XxlJobHelper.handleSuccess();
return;
}
shardTotal = executeNodeNum;
// 分片查詢數據並處理
process(shardIndex, shardTotal);
XxlJobHelper.handleSuccess();
}
分片查詢數據示例:
select field1, field2
from table_name
where ...
and mod(id, #{shardTotal}) = #{shardIndex}
order by id limit #{rows};
進一步思考
-
如果需要更大的並發量,需要有大於應用節點數量的任務並行,如何處理?
兩種思路:
- 通過任務參數傳入一個並發數,單個節點在處理任務時,將查詢到的數據按這個數字進行再分片,交由線程池並行處理;
- 配置 M 個定時任務,指定相同的 JobHandler,給它們編號 0、1、2...M,並將定時任務編號和 M 這兩個數,由任務參數傳入,定時任務邏輯里,先根據分片參數、定時任務編號、M,重新計算出新的分片參數,如 分片序號 = (分片序號 * M) + 定時任務編號,分片總數 = 分片總數 * M,再查詢數據並處理。
-
如果有可能頻繁調整任務執行邏輯,包括可能要新增任務參數等,而不想重啟服務器,如何解決?
可以考慮使用 XXL-JOB 的「GLUE模式」任務,能夠在線編輯和更新定時任務執行邏輯。