1、LTS初步認識
1.1、lts是什么?
LTS(light-task-scheduler)是一個分布式任務調度框架,支持設置節點組搭建多節點集群水平拓展框架架構,提供多種任務類型包括實時任務、定時任務、corn任務的支持, 主要用於系統中的任務調度以及調度信息監控等
1.2、lts能夠解決什么問題?
- 支持分布式,避免單點故障,支持動態擴容,容錯重試
- Spring擴展支持,SpringBoot支持,Spring Quartz Cron任務的無縫接入支持
- 節點監控、任務執行狀態監控、jvm等信息監控
- 多節點任務單一執行、故障轉移
2、LTS架構說明
2.1、lts節點類型
- JobClient:主要負責提交任務, 並接收任務執行反饋結果。
- JobTracker:負責接收並分配任務,任務調度。
- TaskTracker:負責執行任務,執行完反饋給JobTracker。
- LTS-Admin:(管理后台)主要負責節點管理,任務隊列管理,監控管理等。
2.2、注冊中心支持
- zookeeper
- redis
2.3、節點通信方式
- netty
2.4、任務隊列和日志存儲
- mysql
- mongodb
2.5、任務類型
- 實時任務:提交了之后立即就要執行的任務。
- 定時任務:在指定時間點執行的任務,譬如 今天3點執行(單次)。
- Cron任務:CronExpression,和quartz類似(但是不是使用quartz實現的)譬如 0 0/1 * ?
2.6、官方架構圖
2.7、任務執行流程圖
3、LTS簡單實戰示例
3.1、JobClient提交示例
@Before public void before(){ job = new Job(); job.setTaskId(randomTaskId); job.setParam("shardValue", shardValue); job.setSubmitNodeGroup(nodeGroup); job.setTaskTrackerNodeGroup("taskTracker_node_group"); job.setNeedFeedback(true); // 當任務隊列中存在這個任務的時候,是否替換更新 job.setReplaceOnExist(true); } /** 提交實時job */ @Test public void submitRealTimeJob(){ response = jobClient.submitJob(job); } /** 提交corn任務 */ @Test public void submitCronJob() { // corn表達式, 每5s執行一次 job.setCronExpression("0/5 * * * * ? "); response = jobClient.submitJob(job); } /** 提交重復任務 */ @Test public void submitRepeatJob() { // 一共執行5次 job.setRepeatCount(5); // 10s執行一次 job.setRepeatInterval(10 * 1000L); response = jobClient.submitJob(job); } /** 提交定時觸發任務 */ @Test public void submitTriggerTimeJob() { // 1分鍾后執行 job.setTriggerTime(DateUtils.addMinute(new Date(), 1).getTime()); Response response = jobClient.submitJob(job); System.out.println(response); }
3.2、TaskTracker job定義說明
@LTS public class JobApiScheduler { public static final Logger logger = Logger.getLogger(JobApiScheduler.class); @JobRunnerItem(shardValue = "apiTestJob") public Result apiTestJob(JobContext jobContext){ try { logger.info(">>>>>>>>> apiTestJob run..."); BizLogger bizLogger = jobContext.getBizLogger(); // 會發送到 LTS (JobTracker上) bizLogger.info("run apiTestJob success"); } catch (Exception e) { logger.info(">>>>>>>>> run apiTestJob failed!", e); return new Result(Action.EXECUTE_LATER, e.getMessage()); } return new Result(Action.EXECUTE_SUCCESS, "success"); } }
3.3、web控制台功能演示
3.4、多節點任務單一執行
- 啟動多個TaskTracker節點, 發起一個任務查看是否只有一個TaskTracker執行
3.5、故障轉移
- 一個TaskTracker節點掛掉后, 重新選舉Master節點, 其它任務負載在未故障的節點
3.6、failStore
- TaskTracker節點掛掉后未能成功接收到JobTracker發送的任務, 當TaskTracker重新上線后恢復執行
3.7、用戶自定義參數
@LTS public class JobParamScheduler { public static final Logger logger = Logger.getLogger(JobParamScheduler.class); @JobRunnerItem(shardValue = "paramJob") public Result paramJob(JobContext jobContext){ try{ logger.info(">>>>>>>>> paramJob run..."); // 獲取用戶輸入參數 Map<String, String> extParams = jobContext.getJob().getExtParams(); Integer orgId = Integer.parseInt(extParams.get("orgId").toString()); logger.info(String.format("開始租戶%s相關任務處理...")); BizLogger bizLogger = jobContext.getBizLogger(); // 會發送到 LTS (JobTracker上) bizLogger.info("run paramJob success"); }catch(Exception e) { logger.info(">>>>>>>>> run paramJob failed!", e); return new Result(Action.EXECUTE_LATER, e.getMessage()); } return new Result(Action.EXECUTE_SUCCESS, "success"); } }
4、原理分析, 相關類介紹
4.1、配置常量
- com.github.ltsopensource.core.constant.ExtConfig
4.2、mysql相關
- com.github.ltsopensource.monitor.access.mysql.MysqlAbstractJdbcAccess
- com.github.ltsopensource.monitor.access.mysql.MysqlJobClientMAccess
- com.github.ltsopensource.monitor.access.mysql.MysqlJobTrackerMAccess
- com.github.ltsopensource.monitor.access.mysql.MysqlTaskTrackerMAccess
4.3、任務分發
- com.github.ltsopensource.spring.tasktracker.JobDispatcher
4.4、任務處理器
- com.github.ltsopensource.jobtracker.support.JobReceiver
4.5、負載均衡
- com.github.ltsopensource.core.loadbalance.LoadBalance
- com.github.ltsopensource.core.loadbalance.AbstractLoadBalance
4.6、任務拉取
- com.github.ltsopensource.tasktracker.support.JobPullMachine
4.7、任務推送
- com.github.ltsopensource.jobtracker.processor.JobPullProcessor
5、任務執行結果
LTS框架提供四種執行結果支持,EXECUTE_SUCCESS,EXECUTE_FAILED,EXECUTE_LATER,EXECUTE_EXCEPTION,並對每種結果采取相應的處理機制,譬如重試。
- EXECUTE_SUCCESS: 執行成功,這種情況,直接反饋客戶端(如果任務被設置了要反饋給客戶端)。
- EXECUTE_FAILED:執行失敗,這種情況,直接反饋給客戶端,不進行重試。
- EXECUTE_LATER:稍后執行(需要重試),這種情況,不反饋客戶端,重試策略采用30s的策略,默認最大重試次數為10次,用戶可以通過參數設置修改這些參數。
- EXECUTE_EXCEPTION:執行異常, 這中情況也會重試(重試策略,同上)
6、任務調度方案比較
主要根據LTS支持的幾種任務(實時任務、定時任務、Cron任務,Repeat任務)和其他一些開源框架在應用場景上做比較
實時任務,實時執行
這種場景下,當任務量比較小的時候,單機都可以完成的時候.自己采用線程池或者去輪訓數據庫取任務的方式(或者其他方式)就可以解決 · 但如果是任務執行時間比較長或者任務量比較大,單機不足以滿足需求的時候,就需要自己去做分布式的功能,還有很重要的是,怎么做容錯,怎么保證同一個任務只被一個節點執行,怎么解決執行失敗異常的情形等等,你就需要自己去做很多事情,頭可能就大了。這時候 LTS 就派上用場了.因為這些問題 LTS 都幫你解決了,你只需關注你的業務邏輯,而不用為上面的這些事情而煩惱。當然這時候有人可能會想到如果用 MQ 去解決,利用 MQ 的異步去解耦,也同時可以實現分布還有容錯等。當然有時候是可以的,為什么說是可以的呢,因為 LTS 的架構也和 MQ 的類似, JobClient 相當於 MQ 的 Producer , JobTracker 相當於 MQ 的 Broker , TaskTracker 相當於 MQ 的 Consumer ,經過我這么一說,是不是覺得貌似是很像哈。但是我又為什么說是有時候是可以的呢,而不是一定是可以的呢,因為如果你同一個任務(消息)提交 MQ 兩次. MQ 隊列中有兩條同樣的任務消息,那么當你這個任務不能有兩個節點同時執行的時候(同時執行一個任務可能出現各種問題) , MQ 就不能滿足了,因為他不知道你這兩條消息是同一個任務,它會把這兩條消息可能會發給兩個不同的節點同時執行(或者同一個節點的不同線程去執行),這時候你就需要自己去做一些事情去保證同一個任務不能同時被兩個線程(或節點)去執行問題,這時候頭又大了,那么 LTS 又派上用場了,以為 LTS 就可以保證這一點。說到任務調度.很多人一下就想到了 QuartZ ,對於這種實時任務的情況. QuartZ 是不太適合的,它也不能很好的解決故障轉移(譬如執行中的節點突然停掉了, QuartZ 不能將這個執行中的任務立馬分配給其他節點執行,最多設置了 QuartZ 的可恢復性,在停掉的節點重啟之后重新執行該任務.但如果這個節點再也不啟動起來了呢?那就只能呵呵了)等問題,這類場景 QuartZ 就不做比較了。有些人可能問,說了這么多,你倒是舉個例子呀。嗯,我舉幾個例子: 1 .發短信驗證碼,這種可以用 MQ 去實現,也可以單機去實現(如果你量不大的話),當然 LTS 也是可以的.如果你量非常非常大的話,建議還是用性能比較好的 MQ 代替 2 .實時的給在線用戶算數據,觸發者是用戶自己(自己手動點),但是算任務的只能同時由一個線程去執行,這是就可以用 LTS 了
定時任務
某個時間點觸發這種場景下,和實時任務相比,只有一個不同,就是要指定一個時間點去執行,可能是 1 個小時之后,可能是 1 天之后.也可能是 1 天,小時之后。有些人可能用輪訓的業務數據庫的方式去解決,輪訓業務數據庫有什么問題呢.當然你數據量很小我就不說了。如果你數據量還比較大的情況下,輪訓數據庫,勢必會影響業務查詢,如果有其他業務查詢的話。還有就是對於分布式的支持不是很好,還有當表中存在多種不同執行(延遲)時間的任務,這個輪訓頻率就比較關鍵了,太短,影響性能,太長,影響業務,執行不及時.導致任務執行延遲太久,等等。這時候如果用MQ ,雖然有些 MQ 支持延遲隊列 (RabbitMQ , RocketMQ 等).但他們都是指定的一些特定的 Level 級別延遲,但是不支持任意時間精度.譬如, 1 min , 5 min . 10 min 等等,但如果是 7 分鍾,或者 20 天之后呢。如果 MQ 支持任意時間精度,那么它的性能就只能呵呵了,這種情況 MQ 就排除了,但是如果 MQ 的這些特定的 Level 剛好滿足你的需求,那么選 MQ 也是可以的。再說說 Quartz吧, Quartz 可以支持定時任務.支持某個時間點觸發,也支持集群,它在架構上是分布式的,沒有負責幾種管理的節點。 Quartz 是通過數據庫行級鎖的方式實現多線程之間任務爭用的問題。行鎖有嘟些特點呢,開銷大,加鎖慢,會出現死鎖,並發度相比表級鎖,頁級鎖高一點。但是在任務量比較大的時候,並發度較大的時候,行級鎖就顯得比較吃力了,而且很容易發生死鎖。那么 LTS 是怎么解決並發性的問題的呢, LTS 采用預加載和樂觀鎖的方式,批量的將部分要執行的任務預加載到內存中,所以在取任務的時候只需要兩步: 1 .從內存中取到一個任務,當然內存中保證同一個線程拿到同一個任務是很容易的也是很高效的, 2 .將拿到的這個任務對應的數據庫記錄鎖住,那么這里采用樂觀鎖, CAS 的方式去修改記錄(如果任務己經被別的節點拿走了,那么重新執行 1 , 2 步,這種己經被別的節點拿走的情況,主要是在多個 JobTracker 的情形下,單個 JobTracker 不會出現這種情況,但是在多個 JobTracker 下,內存中的預加載數據采用不同步長的方式來減小兩個 JobTracker 內存中數據重復的概率,很好的解決了這個問題,這里稍微提下 》 ,所以這個時候LTS相對於QuartZ 的優勢一下就體現出來了。還有就是上面說的 Quartz 對故障轉移做的不是很好。還有就是當 QuartZ 對應的 MySQL 數據庫掛了,這時候問題就來了客戶端提交的任務提交不成功了,那么有人會想將這些數據保存在內存中,等 MySOL 重啟起來了再重試提交,那么如果你當前節點也掛了呢,你內存中的數據就會全部丟失了.所以這時候你需要自己額外的去做一些失敗任務本地持久化的工作.不過如果你用LTS的話, LTS 支持Failstore ,任務提交失敗了,自動幫你本地持久化,然后待 JobTracker 可用的時候重試,不管你是 JobTracker 掛了,還是 JobTracker 對應的數據庫掛了,都是 ok 的。舉個例子吧,在一個小時之后給某些用戶發短信,或者當用戶點擊退款操作之后,從點擊退貨的這個時間點開始, n 天后將這個退款關閉
cron、repeat任務
這種場景下,和定時任務相比,不一樣的地方,就只有.這個是會重復執行的,相當於重復執行的定時任務。所以這種場景下的對比,可以繼續考照定時任務的對比。 LTS在這種場景下提供的特性有,提供統一的后台監控和后台管理。當某次定時任務執行失敗,會執行重試操作,並提供執行日志
模塊說明
開發步驟
I、編寫Java代碼
@LTS public class SampleJob { @JobRunnerItem(shardValue = "sampleJob") public Result sampleJob(JobContext jobContext){ try { BizLogger bizLogger = jobContext.getBizLogger(); // 會發送到 LTS (JobTracker上) bizLogger.info(jobContext.getJob().getExtParams().get("shardValue") + " execute completed!"); } catch (Exception e) { return new Result(Action.EXECUTE_LATER, e.getMessage()); } return new Result(Action.EXECUTE_SUCCESS, "success"); } }
- @LTS用來標注該類為一個job類, 只有添加了該注解, 在這個類下面的job方法才會被掃描到
- @JobRunnerItem用來指定一個job方法, 需要通過該注解的shardValue屬性來指定該job方法的ID, 后續添加任務會用到, 需保證唯一性
- 通過com.github.ltsopensource.tasktracker.runner.JobContext可以獲取添加任務時指定的任務參數, 也可以獲取BizLogger對象, 用來打印日志, 只有通過該類打印的日志才會被反饋到jobTracker去, 並被記錄到job日志信息去
- 方法返回信息請參考如上寫法