任務管理界面 (WEB)
支持在 WEB 界面
中對任務進行管理,例如:新增任務、編輯任務、啟用/禁用任務、手動執行任務 等。
任務的屬性包括:
- 任務名稱
- 執行方式
SHELL
HTTP
- 表達式(*/5 * * * *)
- 命令
- 超時時間(秒)
- 重試次數
- 重試間隔(秒)
- 執行結束是否通知
- 不通知
- 失敗通知
- 結束通知
- 結果關鍵字匹配通知
- 狀態
- 備注
當執行方式為 HTTP
時,支持選擇請求方式 GET
或 POST
;
當設置執行結束通知時,支持選擇通知方式 郵件 或 Webhook
;
當設置郵件通知時,支持輸入郵箱地址多個用,分割;
當設置結果關鍵字匹配通知時,支持輸入關鍵字多個用,分割;
任務增加完成后,會把任務數據持久化到 MySQL
中。
任務調度器
參考了兩個開源組件:
最終選擇使用 jakecoffman/cron ,后者是在前者的基礎上做了一定的補充,例如 AddFunc()
增加了 name
參數,同時還增加了 RemoveJob(name string)
支持刪除特定的任務。
// AddFunc adds a func to the Cron to be run on the given schedule.
func (c *Cron) AddFunc(spec string, cmd func(), name string) {
c.AddJob(spec, FuncJob(cmd), name)
}
...
// RemoveJob removes a Job from the Cron based on name.
func (c *Cron) RemoveJob(name string) {
if !c.running {
i := c.entries.pos(name)
if i == -1 {
return
}
c.entries = c.entries[:i+copy(c.entries[i:], c.entries[i+1:])]
return
}
c.remove <- name
}
對其簡單封裝下就可以使用了,下面是封裝的方法,方法的具體實現與使用從 go-gin-api 中獲取。
type Server interface {
i()
// Start 啟動 cron 服務
Start()
// Stop 停止 cron 服務
Stop()
// AddTask 增加定時任務
AddTask(task *cron_task_repo.CronTask)
// RemoveTask 刪除定時任務
RemoveTask(taskId int)
// AddJob 增加定時任務執行的工作內容
AddJob(task *cron_task_repo.CronTask) cron.FuncJob
}
當調用 Start()
啟動服務時,會把 MySQL
中的任務列表加載到調度器中。
通過以上方法,當從 WEB 界面
操作 新增、編輯、啟用/禁用、手動執行任務時,可以動態的對調度器中的任務進行管理。
任務執行器
任務執行器指的是任務真實執行所在的機器。
我的思路是使用 Kafka
的發布與訂閱功能,當調度器發現需要執行的任務時,將任務信息寫到 Kafka
的 Topic
中,任務執行器訂閱相關的 Topic
獲取任務信息然后執行任務。
如果任務的執行方式為 HTTP
,那么任務執行器可以為一組集群,專門處理調用 HTTP
任務,這里可以為一個消費組(Consumer Group
),也可適具體場景而定。
如果任務的執行方式為 SHELL
,那么任務執行器必須在腳本所在的宿主機上,這里可以為一個具體任務的消費者。
如果任務量過多,可以考慮根據業務場景多設置幾個 Topic
。
在項目中為了便於演示,不寫入到 Kafka
中,僅記錄了日志。
func (s *server) AddJob(task *cron_task_repo.CronTask) cron.FuncJob {
return func() {
s.taskCount.Add()
defer s.taskCount.Done()
msg := fmt.Sprintf("開始執行任務:(%d)%s [%s]", task.Id, task.Name, task.Spec)
s.logger.Info(msg)
}
}
日志目錄:/logs/go-gin-api-cron.log
。
小結
本文純屬拋磚引玉,有問題,歡迎批評指正。
go-gin-api 項目安裝簡單,開箱即用,創建一個后台任務試試吧。