閱讀一篇「定時任務框架選型」的文章時,一位網友的留言電到了我:
我看過那么多所謂的教程,大部分都是教“如何使用工具”的,沒有多少是教“如何制作工具”的,能教“如何仿制工具”的都已經是鳳毛麟角,中國 軟件行業,缺的是真正可以“制作工具”的程序員,而絕對不缺那些“使用工具”的程序員! ...... ”這個業界最不需要的就是“會使用XX工具的工程師”,而是“有創造力的軟件工程師”!業界所有的飯碗,本質就是“有創造力的軟件工程師”提供出來的啊!
寫這篇文章,想和大家從頭到腳說說任務調度,希望大家讀完之后,能夠理解實現一個任務調度系統的核心邏輯。
1 Quartz
Quartz是一款Java開源任務調度框架,也是很多Java工程師接觸任務調度的起點。
下圖顯示了任務調度的整體流程:
Quartz的核心是三個組件。
- 任務:Job 用於表示被調度的任務;
- 觸發器:Trigger 定義調度時間的元素,即按照什么時間規則去執行任務。一個Job可以被多個Trigger關聯,但是一個Trigger 只能關聯一個Job;
- 調度器 :工廠類創建Scheduler,根據觸發器定義的時間規則調度任務。
上圖代碼中Quartz 的JobStore是 RAMJobStore,Trigger 和 Job 存儲在內存中。
執行任務調度的核心類是 QuartzSchedulerThread 。
- 調度線程從JobStore中獲取需要執行的的觸發器列表,並修改觸發器的狀態;
- Fire觸發器,修改觸發器信息(下次執行觸發器的時間,以及觸發器狀態),並存儲起來。
- 最后創建具體的執行任務對象,通過worker線程池執行任務。
接下來再聊聊 Quartz 的集群部署方案。
Quartz的集群部署方案,需要針對不同的數據庫類型(MySQL , ORACLE) 在數據庫實例上創建Quartz表,JobStore是: JobStoreSupport 。
這種方案是分布式的,沒有負責集中管理的節點,而是利用數據庫行級鎖的方式來實現集群環境下的並發控制。
scheduler實例在集群模式下首先獲取{0}LOCKS表中的行鎖,Mysql 獲取行鎖的語句:
{0}會替換為配置文件默認配置的QRTZ_
。sched_name為應用集群的實例名,lock_name就是行級鎖名。Quartz主要有兩個行級鎖觸發器訪問鎖 (TRIGGER_ACCESS) 和 狀態訪問鎖(STATE_ACCESS)。
這個架構解決了任務的分布式調度問題,同一個任務只能有一個節點運行,其他節點將不執行任務,當碰到大量短任務時,各個節點頻繁的競爭數據庫鎖,節點越多性能就會越差。
2 分布式鎖模式
Quartz的集群模式可以水平擴展,也可以分布式調度,但需要業務方在數據庫中添加對應的表,有一定的強侵入性。
有不少研發同學為了避免這種侵入性,也探索出分布式鎖模式。
業務場景:電商項目,用戶下單后一段時間沒有付款,系統就會在超時后關閉該訂單。
通常我們會做一個定時任務每兩分鍾來檢查前半小時的訂單,將沒有付款的訂單列表查詢出來,然后對訂單中的商品進行庫存的恢復,然后將該訂單設置為無效。
我們使用Spring Schedule的方式做一個定時任務。
@Scheduled(cron = "0 */2 * * * ? ")
public void doTask() {
log.info("定時任務啟動");
//執行關閉訂單的操作
orderService.closeExpireUnpayOrders();
log.info("定時任務結束");
}
在單服務器運行正常,考慮到高可用,業務量激增,架構會演進成集群模式,在同一時刻有多個服務執行一個定時任務,有可能會導致業務紊亂。
解決方案是在任務執行的時候,使用Redis 分布式鎖來解決這類問題。
@Scheduled(cron = "0 */2 * * * ? ")
public void doTask() {
log.info("定時任務啟動");
String lockName = "closeExpireUnpayOrdersLock";
RedisLock redisLock = redisClient.getLock(lockName);
//嘗試加鎖,最多等待3秒,上鎖以后5分鍾自動解鎖
boolean locked = redisLock.tryLock(3, 300, TimeUnit.SECONDS);
if(!locked){
log.info("沒有獲得分布式鎖:{}" , lockName);
return;
}
try{
//執行關閉訂單的操作
orderService.closeExpireUnpayOrders();
} finally {
redisLock.unlock();
}
log.info("定時任務結束");
}
Redis的讀寫性能極好,分布式鎖也比Quartz數據庫行級鎖更輕量級。當然Redis鎖也可以替換成Zookeeper鎖,也是同樣的機制。
在小型項目中,使用:定時任務框架(Quartz/Spring Schedule)和 分布式鎖(redis/zookeeper)有不錯的效果。
但是呢?我們可以發現這種組合有兩個問題:
- 定時任務在分布式場景下有空跑的情況,而且任務也無法做到分片;
- 要想手工觸發任務,必須添加額外的代碼才能完成。
3 ElasticJob-Lite 框架
ElasticJob-Lite 定位為輕量級無中心化解決方案,使用 jar 的形式提供分布式任務的協調服務。
應用內部定義任務類,實現SimpleJob接口,編寫自己任務的實際業務流程即可。
public class MyElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
switch (context.getShardingItem()) {
case 0:
// do something by sharding item 0
break;
case 1:
// do something by sharding item 1
break;
case 2:
// do something by sharding item 2
break;
// case n: ...
}
}
}
舉例:應用A有五個任務需要執行,分別是A,B,C,D,E。任務E需要分成四個子任務,應用部署在兩台機器上。
應用A在啟動后, 5個任務通過 Zookeeper 協調后被分配到兩台機器上,通過Quartz Scheduler 分開執行不同的任務。
ElasticJob 從本質上來講 ,底層任務調度還是通過 Quartz ,相比Redis分布式鎖 或者 Quartz 分布式部署 ,它的優勢在於可以依賴 Zookeeper 這個大殺器 ,將任務通過負載均衡算法分配給應用內的 Quartz Scheduler容器。
從使用者的角度來講,是非常簡單易用的。但從架構來看,調度器和執行器依然在同一個應用方JVM內,而且容器在啟動后,依然需要做負載均衡。應用假如頻繁的重啟,不斷的去選主,對分片做負載均衡,這些都是相對比較重的操作。
另外,ElasticJob 的控制台是比較粗糙的,通過讀取注冊中心數據展現作業狀態,更新注冊中心數據修改全局任務配置。
4 中心化流派
中心化的原理是:把調度和任務執行,隔離成兩個部分:調度中心和執行器。調度中心模塊只需要負責任務調度屬性,觸發調度命令。執行器接收調度命令,去執行具體的業務邏輯,而且兩者都可以進行分布式擴容。
4.1 MQ模式
先談談我在藝龍促銷團隊接觸的第一種中心化架構。
調度中心依賴Quartz集群模式,當任務調度時候,發送消息到RabbitMQ 。業務應用收到任務消息后,消費任務信息。
這種模型充分利用了MQ解耦的特性,調度中心發送任務,應用方作為執行器的角色,接收任務並執行。
但這種設計強依賴消息隊列,可擴展性和功能,系統負載都和消息隊列有極大的關聯。這種架構設計需要架構師對消息隊列非常熟悉。
4.2 XXL-JOB
XXL-JOB 是一個分布式任務調度平台,其核心設計目標是開發迅速、學習簡單、輕量級、易擴展。現已開放源代碼並接入多家公司線上產品線,開箱即用。
我們重點剖析下架構圖 :
▍ 網絡通訊 server-worker 模型
調度中心和執行器 兩個模塊之間通訊是 server-worker 模式。調度中心本身就是一個SpringBoot 工程,啟動會監聽8080端口。
執行器啟動后,會啟動內置服務( EmbedServer )監聽9994端口。這樣雙方都可以給對方發送命令。
那調度中心如何知道執行器的地址信息呢 ?上圖中,執行器會定時發送注冊命令 ,這樣調度中心就可以獲取在線的執行器列表。
通過執行器列表,就可以根據任務配置的路由策略選擇節點執行任務。常見的路由策略有如下三種:
- 隨機節點執行:選擇集群中一個可用的執行節點執行調度任務。適用場景:離線訂單結算。
-
廣播執行:在集群中所有的執行節點分發調度任務並執行。適用場景:批量更新應用本地緩存。
-
分片執行:按照用戶自定義分片邏輯進行拆分,分發到集群中不同節點並行執行,提升資源利用效率。適用場景:海量日志統計。
▍ 調度器
調度器是任務調度系統里面非常核心的組件。XXL-JOB 的早期版本是依賴Quartz。
但在v2.1.0版本中完全去掉了Quartz的依賴,原來需要創建的 Quartz表也替換成了自研的表。
核心的調度類是:JobTriggerPoolHelper 。調用start方法后,會啟動兩個線程:scheduleThread 和 ringThread 。
首先 scheduleThread 會定時從數據庫加載需要調度的任務,這里從本質上還是基於數據庫行鎖保證同時只有一個調度中心節點觸發任務調度。
Connection conn = XxlJobAdminConfig.getAdminConfig()
.getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement(
"select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
preparedStatement.execute();
# 觸發任務調度 (偽代碼)
for (XxlJobInfo jobInfo: scheduleList) {
// 省略代碼
}
# 事務提交
conn.commit();
調度線程會根據任務的「下次觸發時間」,采取不同的動作:
已過期的任務需要立刻執行的,直接放入線程池中觸發執行 ,五秒內需要執行的任務放到 ringData 對象里。
ringThread 啟動后,定時從 ringData 對象里獲取需要執行的任務列表 ,放入到線程池中觸發執行。
5 自研在巨人的肩膀上
2018年,我有一段自研任務調度系統的經歷。
背景是:兼容技術團隊自研的RPC框架,技術團隊不需要修改代碼,RPC注解方法可以托管在任務調度系統中,直接當做一個任務來執行。
自研過程中,研讀了XXL-JOB 源碼,同時從阿里雲分布式任務調度 SchedulerX 吸取了很多營養。
- Schedulerx-console 是任務調度的控制台,用於創建、管理定時任務。負責數據的創建、修改和查詢。在產品內部與 schedulerx-server 交互。
- Schedulerx-server 是任務調度的服務端,是 Scheduler的核心組件。負責客戶端任務的調度觸發以及任務執行狀態的監測。
- Schedulerx-client 是任務調度的客戶端。每個接入客戶端的應用進程就是一個的 Worker。 Worker 負責與 schedulerx-server 建立通信,讓 schedulerx-server發現客戶端的機器。 並向schedulerx-server注冊當前應用所在的分組,這樣 schedulerx-server 才能向客戶端定時觸發任務。
我們模仿了SchedulerX的模塊,架構設計如下圖:
我選擇了 RocketMQ 源碼的通訊模塊 remoting 作為自研調度系統的通訊框架。基於如下兩點:
-
我對業界大名鼎鼎的 Dubbo不熟悉,而remoting我已經做了多個輪子,我相信自己可以搞定;
-
在閱讀 SchedulerX 1.0 client 源碼中,發現 SchedulerX 的通訊框架和RocketMQ Remoting很多地方都很類似。它的源碼里有現成的工程實現,完全就是一個寶藏。
我將 RocketMQ remoting 模塊去掉名字服務代碼,做了一定程度的定制。
在RocketMQ的remoting里,服務端采用 Processor 模式。
調度中心需要注冊兩個處理器:回調結果處理器CallBackProcessor和心跳處理器HeartBeatProcessor 。執行器需要注冊觸發任務處理器TriggerTaskProcessor 。
public void registerProcessor(
int requestCode,
NettyRequestProcessor processor,
ExecutorService executor);
處理器的接口:
public interface NettyRequestProcessor {
RemotingCommand processRequest(
ChannelHandlerContext ctx,
RemotingCommand request) throws Exception;
boolean rejectRequest();
}
對於通訊框架來講,我並不需要關注通訊細節,只需要實現處理器接口即可。
以觸發任務處理器TriggerTaskProcessor舉例:
搞定網絡通訊后,調度器如何設計 ?最終我還是選擇了Quartz 集群模式。主要是基於以下幾點原因:
- 調度量不大的情況下 ,Quartz 集群模式足夠穩定,而且可以兼容原來的XXL-JOB任務;
- 使用時間輪的話,本身沒有足夠的實踐經驗,擔心出問題。 另外,如何讓任務通過不同的調度服務(schedule-server)觸發, 需要有一個協調器。於是想到Zookeeper。但這樣的話,又引入了新的組件。
- 研發周期不能太長,想快點出成果。
自研版的調度服務花費一個半月上線了。系統運行非常穩定,研發團隊接入也很順暢。 調度量也不大 ,四個月總共接近4000萬到5000萬之間的調度量。
坦率的講,自研版的瓶頸,我的腦海里經常能看到。 數據量大,我可以搞定分庫分表,但 Quartz 集群基於行級鎖的模式 ,注定上限不會太高。
為了解除心中的困惑,我寫一個輪子DEMO看看可否work:
- 去掉外置的注冊中心,調度服務(schedule-server)管理會話;
- 引入zookeeper,通過zk協調調度服務。但是HA機制很粗糙,相當於一個任務調度服務運行,另一個服務standby;
- Quartz 替換成時間輪 (參考Dubbo里的時間輪源碼)。
這個Demo版本在開發環境可以運行,但有很多細節需要優化,僅僅是個玩具,並沒有機會運行到生產環境。
最近讀阿里雲的一篇文章《如何通過任務調度實現百萬規則報警》,SchedulerX2.0 高可用架構見下圖:
文章提到:
每個應用都會做三備份,通過 zk 搶鎖,一主兩備,如果某台 Server 掛了,會進行 failover,由其他 Server 接管調度任務。
這次自研任務調度系統從架構來講,並不復雜,實現了XXL-JOB的核心功能,也兼容了技術團隊的RPC框架,但並沒有實現工作流以及mapreduce分片。
SchedulerX 在升級到2.0之后基於全新的Akka 架構,這種架構號稱實現高性能工作流引擎,實現進程間通信,減少網絡通訊代碼。
在我調研的開源任務調度系統中,PowerJob也是基於Akka 架構,同時也實現了工作流和MapReduce執行模式。
我對PowerJob非常感興趣,也會在學習實踐后輸出相關文章,敬請期待。
6 技術選型
首先我們將任務調度開源產品和商業產品 SchedulerX 放在一起,生成一張對照表:
Quartz 和 ElasticJob從本質上還是屬於框架的層面。
中心化產品從架構上來講更加清晰,調度層面更靈活,可以支持更復雜的調度(mapreduce動態分片,工作流)。
XXL-JOB 從產品層面已經做到極簡,開箱即用,調度模式可以滿足大部分研發團隊的需求。簡單易用 + 能打,所以非常受大家歡迎。
其實每個技術團隊的技術儲備不盡相同,面對的場景也不一樣,所以技術選型並不能一概而論。
不管是使用哪種技術,在編寫任務業務代碼時,還是需要注意兩點:
- 冪等。當任務被重復執行的時候,或者分布式鎖失效的時候,程序依然可以輸出正確的結果;
- 任務不跑了,千萬別驚慌。查看調度日志,JVM層面使用Jstack命令查看堆棧,網絡通訊要添加超時時間 ,一般能解決大部分問題。
7 寫到最后
2015年其實是非常有趣的一年。ElasticJob 和 XXL-JOB 這兩種不同流派的任務調度項目都開源了。
在 XXL-JOB 源碼里,至今還保留着許雪里老師在開源中國的一條動態截圖:
剛寫的任務調度框架 ,Web動態管理任務,實時生效,熱乎的。沒有意外的話,明天中午推送到git.osc上去。哈哈,下樓炒個面加個荷包蛋慶祝下。
看到這個截圖,內心深處竟然會有一種共情,嘴角不自禁的上揚。
我又想起:2016年,ElasticJob的作者張亮老師開源了sharding-jdbc 。我在github上創建了一個私有項目,參考sharding-jdbc的源碼,自己實現分庫分表的功能。第一個類名叫:ShardingDataSource,時間定格在 2016/3/29。
我不知道如何定義“有創造力的軟件工程師”,但我相信:一個有好奇心,努力學習,樂於分享,願意去幫助別人的工程師,運氣肯定不會太差。
覺得對您有幫助的話,請給作者一個「點贊」和「收藏」,我們下期見。