寫在前面
在實際工作中,很多小伙伴在開發定時任務時,會采取定時掃描數據表的方式實現。然而,這種方式存在着重大的缺陷:如果數據量大的話,頻繁的掃描數據表會對數據庫造成巨大的壓力;難以支撐大規模的分布式定時任務;難以支持精准的定時任務;大量浪費CPU的資源;掃描的數據大部分是不需要執行的任務。那么,既然定時掃描數據表存在這么多的弊端,那么,有沒有一種方式來解決這些問題呢?今天,冰河就帶着他的開源項目mykit-delay來了!!開源地址:https://github.com/sunshinelyz/mykit-delay
在使用框架過程中如有任何問題,都可以添加冰河微信【sun_shine_lyz】進行交流。
文章已收錄到https://github.com/sunshinelyz/technology-binghe
項目簡述
Mykit體系中提供的簡單、穩定、可擴展的延遲消息隊列框架,提供精准的定時任務和延遲隊列處理功能。
項目模塊說明
- mykit-delay-common: mykit-delay 延遲消息隊列框架通用工具模塊,提供全局通用的工具類
- mykit-delay-config: mykit-delay 延遲消息隊列框架通用配置模塊,提供全局配置
- mykit-delay-queue: mykit-delay 延遲消息隊列框架核心實現模塊,目前所有主要的功能都在此模塊實現
- mykit-delay-controller: mykit-delay 延遲消息隊列框架Restful接口實現模塊,對外提供Restful接口訪問,兼容各種語言調用
- mykit-delay-core: mykit-delay 延遲消息隊列框架的入口,整個框架的啟動程序在此模塊實現
- mykit-delay-test: mykit-delay 延遲消息隊列框架通用測試模塊,主要提供Junit單元測試用例
需求背景
- 用戶下訂單后未支付,30分鍾后支付超時
- 在某個時間點通知用戶參加系統活動
- 業務執行失敗之后隔10分鍾重試一次
類似的場景比較多 簡單的處理方式就是使用定時任務 假如數據比較多的時候 有的數據可能延遲比較嚴重,而且越來越多的定時業務導致任務調度很繁瑣不好管理。
隊列設計
整體架構設計如下圖所示。
開發前需要考慮的問題
- 及時性 消費端能按時收到
- 同一時間消息的消費權重
- 可靠性 消息不能出現沒有被消費掉的情況
- 可恢復 假如有其他情況 導致消息系統不可用了 至少能保證數據可以恢復
- 可撤回 因為是延遲消息 沒有到執行時間的消息支持可以取消消費
- 高可用 多實例 這里指HA/主備模式並不是多實例同時一起工作
- 消費端如何消費
當然初步選用redis作為數據緩存的主要原因是因為redis自身支持zset的數據結構(score 延遲時間毫秒) 這樣就少了排序的煩惱而且性能還很高,正好我們的需求就是按時間維度去判定執行的順序 同時也支持map list數據結構。
簡單定義一個消息數據結構
private String topic;/***topic**/
private String id;/***自動生成 全局惟一 snowflake**/
private String bizKey;
private long delay;/***延時毫秒數**/
private int priority;//優先級
private long ttl;/**消費端消費的ttl**/
private String body;/***消息體**/
private long createTime=System.currentTimeMillis();
private int status= Status.WaitPut.ordinal();
運行原理:
- 用Map來存儲元數據。id作為key,整個消息結構序列化(json/…)之后作為value,放入元消息池中。
- 將id放入其中(有N個)一個zset有序列表中,以createTime+delay+priority作為score。修改狀態為正在延遲中
- 使用timer實時監控zset有序列表中top 10的數據 。 如果數據score<=當前時間毫秒就取出來,根據topic重新放入一個新的可消費列表(list)中,在zset中刪除已經取出來的數據,並修改狀態為待消費
- 客戶端獲取數據只需要從可消費隊列中獲取就可以了。並且狀態必須為待消費 運行時間需要<=當前時間的 如果不滿足 重新放入zset列表中,修改狀態為正在延遲。如果滿足修改狀態為已消費。或者直接刪除元數據。
客戶端
因為涉及到不同程序語言的問題,所以當前默認支持http訪問方式。
- 添加延時消息添加成功之后返回消費唯一ID POST /push {…..消息體}
- 刪除延時消息 需要傳遞消息ID GET /delete?id=
- 恢復延時消息 GET /reStore?expire=true|false expire是否恢復已過期未執行的消息。
- 恢復單個延時消息 需要傳遞消息ID GET /reStore/id
- 獲取消息 需要長連接 GET /get/topic
用nginx暴露服務,配置為輪詢 在添加延遲消息的時候就可以流量平均分配。
目前系統中客戶端並沒有采用HTTP長連接的方式來消費消息,而是采用MQ的方式來消費數據這樣客戶端就可以不用關心延遲消息隊列。只需要在發送MQ的時候攔截一下 如果是延遲消息就用延遲消息系統處理。
消息可恢復
實現恢復的原理 正常情況下一般都是記錄日志,比如mysql的binlog等。
這里我們直接采用mysql數據庫作為記錄日志。
目前創建以下2張表:
- 消息表 字段包括整個消息體
- 消息流轉表 字段包括消息ID、變更狀態、變更時間、zset掃描線程Name、host/ip
定義zset掃描線程Name是為了更清楚的看到消息被分發到具體哪個zset中。前提是zset的key和監控zset的線程名稱要有點關系 這里也可以是zset key。
支持消息恢復
假如redis服務器宕機了,重啟之后發現數據也沒有了。所以這個恢復是很有必要的,只需要從表1也就是消息表中把消息狀態不等於已消費的數據全部重新分發到延遲隊列中去,然后同步一下狀態就可以了。
當然恢復單個任務也可以這么干。
數據表設計
這里,我就直接給出創建數據表的SQL語句。
DROP TABLE IF EXISTS `mykit_delay_queue_job`;
CREATE TABLE `mykit_delay_queue_job` (
`id` varchar(128) NOT NULL,
`bizkey` varchar(128) DEFAULT NULL,
`topic` varchar(128) DEFAULT NULL,
`subtopic` varchar(250) DEFAULT NULL,
`delay` bigint(20) DEFAULT NULL,
`create_time` bigint(20) DEFAULT NULL,
`body` text,
`status` int(11) DEFAULT NULL,
`ttl` int(11) DEFAULT NULL,
`update_time` datetime(3) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `mykit_delay_queue_job_ID_STATUS` (`id`,`status`),
KEY `mykit_delay_queue_job_STATUS` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for mykit_delay_queue_job_log
-- ----------------------------
DROP TABLE IF EXISTS `mykit_delay_queue_job_log`;
CREATE TABLE `mykit_delay_queue_job_log` (
`id` varchar(128) NOT NULL,
`status` int(11) DEFAULT NULL,
`thread` varchar(60) DEFAULT NULL,
`update_time` datetime(3) DEFAULT NULL,
`host` varchar(128) DEFAULT NULL,
KEY `mykit_delay_queue_job_LOG_ID_STATUS` (`id`,`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
關於高可用
分布式協調還是選用zookeeper。
如果有多個實例最多同時只能有1個實例工作 這樣就避免了分布式競爭鎖帶來的壞處,當然如果業務需要多個實例同時工作也是支持的,也就是一個消息最多只能有1個實例處理,可以選用zookeeper或者redis就能實現分布式鎖了。
最終做了一下測試多實例同時運行,可能因為會涉及到鎖的問題性能有所下降,反而單機效果很好。所以比較推薦基於docker的主備部署模式。
運行模式
- 支持 master,slave (HA)需要配置
mykit.delay.registry.serverList
zk集群地址列表 - 支持 cluster 會涉及到分布式鎖競爭 效果不是很明顯 分布式鎖采用
redis
的setNx
實現 - StandAlone
目前,經過測試,推薦使用master slave的模式,后期會優化Cluster模式
如何接入
為了提供一個統一的精准定時任務和延時隊列框架,mykit-delay提供了HTTP Rest接口供其他業務系統調用,接口使用簡單方便,只需要簡單的調用接口,傳遞相應的參數即可。
消息體
以JSON數據格式參數 目前只提供了http
協議
- body 業務消息體
- delay 延時毫秒 距
createTime
的間隔毫秒數 - id 任務ID 系統自動生成 任務創建成功返回
- status 狀態 默認不填寫
- topic 標題
- subtopic 保留字段
- ttl 保留字段
- createTime 創建任務時間 非必填 系統默認
添加任務
/push
POST application/json
{"body":"{hello world}","delay":10000,"id":"20","status":0,"topic":"ces","subtopic":"",ttl":12}
刪除任務
刪除任務 需要記錄一個JobId
/delete?jobId=xxx
GET
恢復單個任務
用於任務錯亂 腦裂情況 根據日志恢復任務
/reStoreJob?JobId=xxx
GET
恢復所有未完成的任務
根據日志恢復任務
/reStore?expire=true
GET
參數expire
表示是否需要恢復已過期還未執行的數據
清空隊列數據
根據日志中未完成的數據清空隊列中全部數據。清空之后 會刪除緩存中的所有任務
/clearAll
GET
客戶端獲取隊列方式
目前默認實現了RocketMQ
與ActiveMQ
的推送方式。依賴MQ的方式來實現延時框架與具體業務系統的耦合。
消息體中消息與RocketMQ
和 ActiveMQ
消息字段對應關系
mykit-delay | RocketMQ | ActiveMQ | 備注 | |
---|---|---|---|---|
topic | topic | topic | 點對點發送隊列名稱或者主題名稱 | |
subtopic | subtopic | subtopic | 點對點發送隊列子名稱或者主題子名稱 | |
body | 消息內容 | 消息內容 | 消息內容 |
關於系統配置
延遲框架與具體執行業務系統的交互方式通過延遲框架配置實現,具體配置文件位置為mykit-delay-config項目下的resources/properties/starter.properties
文件中。
測試
需要配置好數據庫地址和Redis的地址 如果不是單機模式 也需要配置好Zookeeper
運行mykit-delay-test模塊下的測試類io.mykit.delay.test.PushTest
添加任務到隊列中
啟動mykit-delay-test模塊下的io.mykit.delay.TestDelayQueue
消費前面添加數據 為了方便查詢效果 默認的消費方式是consoleCQ
控制台輸出
擴展
支持zset隊列個數可配置 避免大數據帶來高延遲的問題。
近期規划
- 分區(buck)支持動態設置
- redis與數據庫數據一致性的問題 (
重要
) - 實現自己的推拉機制
- 支持可切換實現方式 目前只是依賴Redis實現,后續待優化
- 支持Web控制台管理隊列
- 實現消息消費
TTL
機制
如果這款開源框架對你有幫助,請小伙伴們打開github鏈接:https://github.com/sunshinelyz/mykit-delay ,給個Star,讓更多的小伙伴看到,減輕工作中繁瑣的掃描數據表的定時任務開發。也希望能夠有越來越多的小伙伴參與這個開源項目,我們一起養肥它!!
好了,不早了,今天就到這兒吧,我是冰河,我們下期見!!
重磅福利
微信搜一搜【冰河技術】微信公眾號,關注這個有深度的程序員,每天閱讀超硬核技術干貨,公眾號內回復【PDF】有我准備的一線大廠面試資料和我原創的超硬核PDF技術文檔,以及我為大家精心准備的多套簡歷模板(不斷更新中),希望大家都能找到心儀的工作,學習是一條時而郁郁寡歡,時而開懷大笑的路,加油。如果你通過努力成功進入到了心儀的公司,一定不要懈怠放松,職場成長和新技術學習一樣,不進則退。如果有幸我們江湖再見!
另外,我開源的各個PDF,后續我都會持續更新和維護,感謝大家長期以來對冰河的支持!!