如何構建延遲任務調度系統


一、需求目標

1.需求描述
之前筆者接觸過一些營銷業務場景,比如說:

用戶注冊未登錄過APP第二天早上10點發一條營銷短信促活
紅包過期前兩天短信通知,下午16:00發送
等等定時任務處理業務。
采用的技術方案是定時任務掃數據匯總表,分頁讀取一定數量然后處理
然而隨着業務的發展,業務多元化,遇到了以下場景:

拼團砍價活動過期前半小時提醒
訂單提交半小時內沒有完成支付,訂單自動取消,庫存退還
用戶幾天內沒有操作過系統,發放激活短信
以上場景處理時間不是固定的某個點,而是業務發生的時間推遲一段時間,針對以上的業務場景,我們考慮可以根據不同業務建表,然后每隔一段時間去定時掃表,各自處理業務。
但是隨着業務增加,表泛濫,而且此類業務其實有很多相同的地方,那么我們可以考慮把相同邏輯抽離出來,利用延遲隊列來處理任務

2.延時隊列設計目標
可靠性:任務進入延時隊列之后,必須被執行一次
高可用性:支持多實例部署
實時性:允許一定時間誤差,當然誤差越小越好
可管理:支持消息刪除
高性能:數據量大的情況下也能保證高性能

二、技術調研

延時隊列實現的幾種方式
java.util.Timer + java.util.TimerTask
java.util.concurrent.ScheduledExecutorService
Quartz
java.util.concurrent.DelayQueue
數據庫輪詢
redis過期鍵通知
rocketMQ中的延時隊列
1. Timer+TimerTask
使用 Timer 實現任務調度的核心類是 Timer 和 TimerTask。其中 Timer 負責設定 TimerTask 的起始與間隔執行時間。使用者只需要創建一個 TimerTask 的繼承類,實現自己的 run 方法,然后將其丟給 Timer 去執行即可

Timer 的設計核心是一個 TaskList 和一個 TaskThread。Timer 將接收到的任務丟到自己的 TaskList 中,TaskList 按照 Task 的最初執行時間進行排序。TimerThread 在創建 Timer 時會啟動成為一個守護線程。這個線程會輪詢所有任務,找到一個最近要執行的任務,然后休眠,當到達最近要執行任務的開始時間點,TimerThread 被喚醒並執行該任務。之后 TimerThread 更新最近一個要執行的任務,繼續休眠。

實現思想:應用維護一個全局的Timer調度器,延時任務實現TimerTask,run方法中實現邏輯。計算好具體的延遲執行時間,交給Timer去調度。

選型評估:簡單易用,但是缺點較多,單線程調度,所有任務都是串行的,性能低,前一個任務的延遲或異常都將會影響到之后的任務,影響實時性,同時也不具備延時隊列的幾點能力

3.2 ScheduledExecutorService
基於Timer的缺陷,JDK5推出了基於線程池設計的 ScheduledExecutor,原理是
每一個被調度的任務都會由線程池中一個線程去執行,因此任務是並發執行的,相互之間不會受到干擾。需要注意的是,只有當任務的執行時間到來時,ScheduedExecutor 才會真正啟動一個線程,其余時間 ScheduledExecutor 都是在輪詢任務的狀態。

選型評估:引入多線程,解決了Timer的一些缺點,但是只適合單機,分布式環境不支持。也不具備延時隊列的幾點能力,需要考慮跟別的技術結合使用評估是否可以滿足延時隊列的能力。

3.3 Quartz
Quartz是個輕量級的任務調度框架,可以跟多個應用集成,並且具有容錯機制,重啟服務的時候內存中丟失的任務可以被持久化

選型評估:

Quartz滿足了我們需要的延時隊列的可靠性: 持久化任務,避免了服務重啟的時候內存中的任務丟失,高可用:執行任務的節點掛了,另外的節點會繼續執行
集群分布式並發環境中使用QUARTZ定時任務調度,會在各個節點會上報任務,存到數據庫中,執行時會從數據庫中取出觸發器來執行,如果觸發器的名稱和執行時間相同,則只有一個節點去執行此任務,Quartz的任務觸發只能在單個節點運行,其它節點不執行任務,性能低,浪費資源
3.4 DelayQueue
DelayQueue 是一個支持延時獲取元素的阻塞隊列, 內部采用優先隊列 PriorityQueue 存儲元素,同時元素必須實現 Delayed 接口;在創建元素時可以指定多久才可以從隊列中獲取當前元素,只有在延遲期滿時才能從隊列中提取元素。

選型評估:

效率高,任務觸發時間延遲低
適用於單機,需要結合其他技術運用,
數據是保存在內存,需要自己實現持久化
不具備分布式能力,需要自己實現高可用
3.5 數據庫輪詢
每隔一段時間去查詢數據庫,處理好的記錄標記狀態

選型評估:定期輪詢數據量大的時候會消耗太多IO資源,效率低

3.6 redis過期鍵通知
需要DBA做一些額外的配置,開啟這個功能

選型評估:Redis的發布/訂閱目前是即發即棄(fire and forget)模式的,因此無法實現事件的可靠通知。如果發布/訂閱的客戶端斷鏈之后又重連,則在客戶端斷鏈期間的所有事件都丟失了

7. rocketMQ中的延時隊列
選型評估:rocketMQ中消息延遲時間為固定時間段:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,粒度不夠,不能很好支持業務

總結
延時隊列的技術點還有很多,比如說時間輪之類的方案,要滿足延時隊列的幾點特性,實現高可用,可靠性,我們需要結合多個技術去實現。

三、架構設計

功能設計
系統功能:
延遲任務調度系統提供統一的任務操作接口給業務方調用,業務方可以提交任務,取消任務,查詢任務狀態。
調度服務屬於底層應用,因此采用MQ的方式解耦,所有觸發的延遲任務都通過消息的方式發送給業務消費方,
由消費方控制流量,業務冪等。同時也保證了任務的重試機制。

采用技術:elastic-job + db + delayQueue + mq

整體架構

 

 

業務調用方

業務方在需要延遲任務的時候調用延遲任務服務操作任務
觸發的延遲任務會放到MQ消息隊列里面,由業務方自行消費
業務方消費消息處理完成之后,調用延遲任務服務通知處理結果
延遲任務節點

以dubbo方式提供延遲任務接口供業務方操作,用於添加延遲任務,取消任務,反饋任務處理結果。
集成elastic-job提供數據分片功能,每個節點按照對應分片從數據庫加載即將觸發的延遲任務放到內存中
任務調度觸發的延遲任務發送到MQ消息隊列中
接收業務調用的延遲消息處理結果反饋
Zookeeper

elastic-job注冊中心,存儲作業信息
elastic-job

高可用的分布式任務調度系統
注冊任務實例信息和分片信息到zk上
數據分片

elastic-job作業數據分片
節點添加/刪除,主節點選舉,重新分片
任務加載作業

由elastic-job實現,使用數據分片功能,提升系統總吞吐量
將未來N分鍾內要觸發的任務加載到內存中
任務在內存中的存儲和調度

任務加載作業將未來N分鍾內觸發的任務加載到內存隊列DelayQueue
任務調度依靠DelayQueue精確觸發
數據庫

延遲任務持久化,存儲任務數據
延遲任務狀態

    INIT(1, "初始化"),
    LOAD(2, "任務已加載"),
    SENDING(3, "消息已發放"),
    SUCCESS(4, "業務處理成功"),
    FAIL(5, "業務處理失敗"),
    CANCEL(6, "業務取消");

 


數據庫設計

 

 

CREATE TABLE `delay_task` (
  `delay_task_id` bigint(20) NOT NULL COMMENT '任務ID',
  `sharding_id` tinyint(4) NOT NULL COMMENT '分片ID',
  `topic` varchar(100) NOT NULL COMMENT '消息topic',
  `tag` varchar(100) NOT NULL COMMENT '消息tag',
  `params` varchar(1000) NOT NULL COMMENT '參數',
  `trigger_time` bigint(19) NOT NULL COMMENT '執行時間',
  `status` tinyint(4) NOT NULL COMMENT '任務狀態:1.初始化 2.任務已加載 3.消息已發放 4.業務處理成功 5.業務處理失敗',
  `extend_field` varchar(100) NOT NULL COMMENT '擴展屬性',
  `create_time` bigint(20) NOT NULL COMMENT '創建時間',
  `op_time` bigint(20) NOT NULL COMMENT '最近一次更新時間',
  `last_ver` int(10) NOT NULL COMMENT '版本號',
  `is_valid` tinyint(2) NOT NULL DEFAULT '1' COMMENT '是否有效 0-失效 1-有效',
  PRIMARY KEY (`delay_task_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='延遲任務表'

數據庫設計就一張表delay_task,用來存儲延遲任務的數據,包括業務方要消費的消息的tag,topic,以及消息體內容

源碼:https://github.com/caisl/delay-task

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM