前段時間,公司要改造現有的單節點調度為分布式任務調度,然后就研究了目前市面上主流的開源分布式任務調度框架,用起來就一個感覺:麻煩!特別是之前在一個類里寫了好多個調度任務,改造起來更加麻煩。我這人又比較懶,總感覺用了別人寫好的工具還要改一大堆,心里就有點不舒服。於是我就想自己寫一個框架,畢竟自己覺得分布式任務調度在所有分布式系統中是最簡單的,因為一般公司任務調度本身不可能同時調度海量的任務,很大的並發,改造成分布式主要還是為了分散任務到多個節點,以便同一時間處理更多的任務。后面有一天,我在公司前台取快遞,看到這樣一個現象:我們好幾個同事(包括我)在前台那從頭到尾看快遞是不是自己的,是自己的就取走,不是就忽略,然后我就收到了啟發。這個場景類比到分布式調度系統中,我們可以認為是快遞公司或者快遞員已經把每個快遞按照我們名字電話分好了快遞,我們只需要取走自己的就行了。但是從另外一個角度看,也可以理解成我們每個人都是從頭到尾看了所有快遞,然后按照某種約定的規則,如果是自己的快遞就拿走,不是自己的就忽略繼續看下一個。如果把快遞想象成任務,一堆人去拿一堆快遞也可以很順利的拿到各自的快遞,那么一堆節點自己去取任務是不是也可以很好的處理各自的任務呢?
傳統的分布式任務調度都有一個調度中心,這個調度中心也都要部署稱多節點的集群,避免單點故障,然后還有一堆執行器,執行器負責執行調度中心分發的任務。按照上面的啟發,我的思路是放棄中心式的調度中心直接由各個執行器節點去公共的地方按照約定的規則去取任務,然后執行。設計示意圖如下
有人可能懷疑那任務db庫不是有單點問題嗎,我想反問下,難道其他的分布式任務調度框架沒有這個問題嗎?針對數據庫單點我們可以單獨類似業務庫那樣考慮高可用方案,這里不是這篇文章的討論重點。很明顯我們重點放在執行節點那里到底怎么保證高可用,單個任務不會被多個節點同時執行,單個節點執行到一半突然失聯了,這個任務怎么辦等復雜的問題。后續我們使用未經修飾的代碼的方式一一解決這個問題(未經修飾主要是沒有優化結構流水賬式的代碼風格,主要是很多人包括我自己看別人源碼時總是感覺暈頭轉向的,仿佛置身迷宮般,看起來特別費勁,可能是我自己境界未到吧)
既然省略了集中式的調度,那么既然叫任務調度很明顯必須要有調度的過程,不然多個節點去搶一個任務怎么避免沖突呢?我這里解決方式是:首先先明確一個任務的幾種狀態:待執行,執行中,有異常,已完成。每個節點起一個線程一直去查很快就要開始執行的待執行任務,然后遍歷這些任務,使用樂觀鎖的方式先更新這個任務的版本號(版本號+1)和狀態(變成執行中),如果更新成功就放入節點自己的延時隊列中等待執行。由於每個節點的線程都是去數據庫查待執行的任務,很明顯變成執行中的任務下次就不會被其他節點再查詢到了,至於對於那些在本節點更新狀態之前就查到的待執行任務也會經過樂觀鎖嘗試后更新失敗從而跳過這個任務,這樣就可以避免一個任務同時被多個節點重復執行。關鍵代碼如下:
package com.rdpaas.task.scheduler; import com.rdpaas.task.common.*; import com.rdpaas.task.config.EasyJobConfig; import com.rdpaas.task.repository.NodeRepository; import com.rdpaas.task.repository.TaskRepository; import com.rdpaas.task.strategy.Strategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Date; import java.util.List; import java.util.concurrent.*; /** * 任務調度器 * @author rongdi * @date 2019-03-13 21:15 */ @Component public class TaskExecutor { private static final Logger logger = LoggerFactory.getLogger(TaskExecutor.class); @Autowired private TaskRepository taskRepository; @Autowired private NodeRepository nodeRepository; @Autowired private EasyJobConfig config;/** * 創建任務到期延時隊列 */ private DelayQueue<DelayItem<Task>> taskQueue = new DelayQueue<>(); /** * 可以明確知道最多只會運行2個線程,直接使用系統自帶工具就可以了 */ private ExecutorService bossPool = Executors.newFixedThreadPool(2); /** * 聲明工作線程池 */ private ThreadPoolExecutor workerPool; @PostConstruct public void init() { /** * 自定義線程池,初始線程數量corePoolSize,線程池等待隊列大小queueSize,當初始線程都有任務,並且等待隊列滿后 * 線程數量會自動擴充最大線程數maxSize,當新擴充的線程空閑60s后自動回收.自定義線程池是因為Executors那幾個線程工具 * 各有各的弊端,不適合生產使用 */ workerPool = new ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(), 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(config.getQueueSize())); /** * 執行待處理任務加載線程 */ bossPool.execute(new Loader()); /** * 執行任務調度線程 */ bossPool.execute(new Boss()); } class Loader implements Runnable { @Override public void run() { for(;;) { try { /** * 查找還有指定時間(單位秒)開始的主任務列表 */ List<Task> tasks = taskRepository.listPeddingTasks(config.getFetchDuration()); if(tasks == null || tasks.isEmpty()) { continue; } for(Task task:tasks) { task.setStatus(TaskStatus.DOING); task.setNodeId(config.getNodeId()); /** * 使用樂觀鎖嘗試更新狀態,如果更新成功,其他節點就不會更新成功。如果在查詢待執行任務列表 * 和當前這段時間有節點已經更新了這個任務,version必然和查出來時候的version不一樣了,這里更新 * 必然會返回0了 */ int n = taskRepository.updateWithVersion(task); Date nextStartTime = task.getNextStartTime(); if(n == 0 || nextStartTime == null) { continue; } /** * 封裝成延時對象放入延時隊列 */ task = taskRepository.get(task.getId()); DelayItem<Task> delayItem = new DelayItem<Task>(nextStartTime.getTime() - new Date().getTime(), task); taskQueue.offer(delayItem); } Thread.sleep(config.getFetchPeriod()); } catch(Exception e) { logger.error("fetch task list failed,cause by:{}", e); } } } } class Boss implements Runnable { @Override public void run() { for (;;) { try { /** * 時間到了就可以從延時隊列拿出任務對象,然后交給worker線程池去執行 */ DelayItem<Task> item = taskQueue.take(); if(item != null && item.getItem() != null) { Task task = item.getItem(); workerPool.execute(new Worker(task)); } } catch (Exception e) { logger.error("fetch task failed,cause by:{}", e); } } } } class Worker implements Runnable { private Task task; public Worker(Task task) { this.task = task; } @Override public void run() { logger.info("Begin to execute task:{}",task.getId()); TaskDetail detail = null; try { //開始任務 detail = taskRepository.start(task); if(detail == null) return; //執行任務 task.getInvokor().invoke(); //完成任務 finish(task,detail); logger.info("finished execute task:{}",task.getId()); } catch (Exception e) { logger.error("execute task:{} error,cause by:{}",task.getId(), e); try { taskRepository.fail(task,detail,e.getCause().getMessage()); } catch(Exception e1) { logger.error("fail task:{} error,cause by:{}",task.getId(), e); } } } } /** * 完成子任務,如果父任務失敗了,子任務不會執行 * @param task * @param detail * @throws Exception */ private void finish(Task task,TaskDetail detail) throws Exception { //查看是否有子類任務 List<Task> childTasks = taskRepository.getChilds(task.getId()); if(childTasks == null || childTasks.isEmpty()) { //當沒有子任務時完成父任務 taskRepository.finish(task,detail); return; } else { for (Task childTask : childTasks) { //開始任務 TaskDetail childDetail = null; try { //將子任務狀態改成執行中 childTask.setStatus(TaskStatus.DOING); childTask.setNodeId(config.getNodeId()); //開始子任務 childDetail = taskRepository.startChild(childTask,detail); //使用樂觀鎖更新下狀態,不然這里可能和恢復線程產生並發問題 int n = taskRepository.updateWithVersion(childTask); if (n > 0) { //再從數據庫取一下,避免上面update修改后version不同步 childTask = taskRepository.get(childTask.getId()); //執行子任務 childTask.getInvokor().invoke(); //完成子任務 finish(childTask, childDetail); } } catch (Exception e) { logger.error("execute child task error,cause by:{}", e); try { taskRepository.fail(childTask, childDetail, e.getCause().getMessage()); } catch (Exception e1) { logger.error("fail child task error,cause by:{}", e); } } } /** * 當有子任務時完成子任務后再完成父任務 */ taskRepository.finish(task,detail); } } }
如上所述,可以保證一個任務同一個時間只會被一個節點調度執行。這時候如果部署多個節點,正常應該可以很順利的將任務庫中的任務都執行到,就像一堆人去前台取快遞一樣,可以很順利的拿走所有快遞。畢竟對於每個快遞不是自己的就是其他人的,自己的快遞也不會是其他人的。但是這里的調度和取快遞有一點不一樣,取快遞的每個人都知道怎么去區分到底哪個快遞是自己的。這里的調度完全沒這個概念,完全是哪個節點運氣好使用樂觀鎖更新了這個任務狀態就是哪個節點的。總的來說區別就是需要一個約定的規則,快遞是不是自己的,直接看快遞上的名字和手機號碼就知道了。任務到底該不該自己執行我們也可以出一個這種規則,明確哪些任務那些應該是哪些節點可以執行,從而避免無謂的鎖競爭。這里可以借鑒負載均衡的那些策略,目前我想實現如下規則:
1) id_hash : 按照任務自增id的對節點個數取余,余數值和當前節點的實時序號匹配,可以匹配就可以拿走執行,否則請自覺忽略掉這個任務
2) least_count:最少執行任務的節點優先去取任務
3) weight:按照節點權重去取任務
4) default: 默認先到先得,沒有其它規則
根據上面規則也可以說是任務的負載均衡策略可以知道除了默認規則,其余規則都需要知道全局的節點信息,比如節點執行次數,節點序號,節點權重等,所以我們需要給節點添加一個心跳,隔一個心跳周期上報一下自己的信息到數據庫,心跳核心代碼如下:
/** * 創建節點心跳延時隊列 */ private DelayQueue<DelayItem<Node>> heartBeatQueue = new DelayQueue<>(); /** * 可以明確知道最多只會運行2個線程,直接使用系統自帶工具 */ private ExecutorService bossPool = Executors.newFixedThreadPool(2);
@PostConstruct public void init() { /** * 如果恢復線程開關是開着,並且心跳開關也是開着 */ if(config.isRecoverEnable() && config.isHeartBeatEnable()) { /** * 初始化一個節點到心跳隊列,延時為0,用來注冊節點 */ heartBeatQueue.offer(new DelayItem<>(0,new Node(config.getNodeId()))); /** * 執行心跳線程 */ bossPool.execute(new HeartBeat()); /** * 執行異常恢復線程 */ bossPool.execute(new Recover()); } }
class HeartBeat implements Runnable { @Override public void run() { for(;;) { try { /** * 時間到了就可以從延時隊列拿出節點對象,然后更新時間和序號, * 最后再新建一個超時時間為心跳時間的節點對象放入延時隊列,形成循環的心跳 */ DelayItem<Node> item = heartBeatQueue.take(); if(item != null && item.getItem() != null) { Node node = item.getItem(); handHeartBeat(node); } heartBeatQueue.offer(new DelayItem<>(config.getHeartBeatSeconds() * 1000,new Node(config.getNodeId()))); } catch (Exception e) { logger.error("task heart beat error,cause by:{} ",e); } } } } /** * 處理節點心跳 * @param node */ private void handHeartBeat(Node node) { if(node == null) { return; } /** * 先看看數據庫是否存在這個節點 * 如果不存在:先查找下一個序號,然后設置到node對象中,最后插入 * 如果存在:直接根據nodeId更新當前節點的序號和時間 */ Node currNode= nodeRepository.getByNodeId(node.getNodeId()); if(currNode == null) { node.setRownum(nodeRepository.getNextRownum()); nodeRepository.insert(node); } else { nodeRepository.updateHeartBeat(node.getNodeId()); } }
數據庫有了節點信息后,我們就可以實現各種花式的取任務的策略了,代碼如下:
/** * 抽象的策略接口 * @author rongdi * @date 2019-03-16 12:36 */ public interface Strategy { /** * 默認策略 */ String DEFAULT = "default"; /** * 按任務ID hash取余再和自己節點序號匹配 */ String ID_HASH = "id_hash"; /** * 最少執行次數 */ String LEAST_COUNT = "least_count"; /** * 按節點權重 */ String WEIGHT = "weight"; public static Strategy choose(String key) { switch(key) { case ID_HASH: return new IdHashStrategy(); case LEAST_COUNT: return new LeastCountStrategy(); case WEIGHT: return new WeightStrategy(); default: return new DefaultStrategy(); } } public boolean accept(List<Node> nodes,Task task,Long myNodeId); }
/** * 按照任務ID hash方式針對有效節點個數取余,然后余數+1后和各個節點的順序號匹配, * 這種方式效果其實等同於輪詢,因為任務id是自增的 * @author rongdi * @date 2019-03-16 */ public class IdHashStrategy implements Strategy { /** * 這里的nodes集合必然不會為空,外面調度那判斷了,而且是按照nodeId的升序排列的 */ @Override public boolean accept(List<Node> nodes, Task task, Long myNodeId) { int size = nodes.size(); long taskId = task.getId(); /** * 找到自己的節點 */ Node myNode = nodes.stream().filter(node -> node.getNodeId() == myNodeId).findFirst().get(); return myNode == null ? false : (taskId % size) + 1 == myNode.getRownum(); } }
/** * 最少處理任務次數策略,也就是每次任務來了,看看自己是不是處理任務次數最少的,是就可以消費這個任務 * @author rongdi * @date 2019-03-16 21:56 */ public class LeastCountStrategy implements Strategy { @Override public boolean accept(List<Node> nodes, Task task, Long myNodeId) { /** * 獲取次數最少的那個節點,這里可以類比成先按counts升序排列然后取第一個元素 * 然后是自己就返回true */ Optional<Node> min = nodes.stream().min((o1, o2) -> o1.getCounts().compareTo(o2.getCounts())); return min.isPresent()? min.get().getNodeId() == myNodeId : false; } }
/** * 按權重的分配策略,方案如下,假如 * 節點序號 1 ,2 ,3 ,4 * 節點權重 2 ,3 ,3 ,2 * 則取余后 0,1 | 2,3,4 | 5,6,7 | 8,9 * 序號1可以消費按照權重的和取余后小於2的 * 序號2可以消費按照權重的和取余后大於等於2小於2+3的 * 序號3可以消費按照權重的和取余后大於等於2+3小於2+3+3的 * 序號3可以消費按照權重的和取余后大於等於2+3+3小於2+3+3+2的 * 總結:本節點可以消費的按照權重的和取余后大於等於前面節點的權重和小於包括自己的權重和的這個范圍 * 不知道有沒有大神有更好的算法思路 * @author rongdi * @date 2019-03-16 23:16 */ public class WeightStrategy implements Strategy { @Override public boolean accept(List<Node> nodes, Task task, Long myNodeId) { Node myNode = nodes.stream().filter(node -> node.getNodeId() == myNodeId).findFirst().get(); if(myNode == null) { return false; } /** * 計算本節點序號前面的節點的權重和 */ int preWeightSum = nodes.stream().filter(node -> node.getRownum() < myNode.getRownum()).collect(Collectors.summingInt(Node::getWeight)); /** * 計算全部權重的和 */ int weightSum = nodes.stream().collect(Collectors.summingInt(Node::getWeight)); /** * 計算對權重和取余的余數 */ int remainder = (int)(task.getId() % weightSum); return remainder >= preWeightSum && remainder < preWeightSum + myNode.getWeight(); } }
然后我們再改造下調度類
/** * 獲取任務的策略 */ private Strategy strategy; @PostConstruct public void init() { /** * 根據配置選擇一個節點獲取任務的策略 */ strategy = Strategy.choose(config.getNodeStrategy()); /** * 自定義線程池,初始線程數量corePoolSize,線程池等待隊列大小queueSize,當初始線程都有任務,並且等待隊列滿后 * 線程數量會自動擴充最大線程數maxSize,當新擴充的線程空閑60s后自動回收.自定義線程池是因為Executors那幾個線程工具 * 各有各的弊端,不適合生產使用 */ workerPool = new ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(), 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(config.getQueueSize())); /** * 執行待處理任務加載線程 */ bossPool.execute(new Loader()); /** * 執行任務調度線程 */ bossPool.execute(new Boss()); } class Loader implements Runnable { @Override public void run() { for(;;) { try { /** * 先獲取可用的節點列表 */ List<Node> nodes = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2); if(nodes == null || nodes.isEmpty()) { continue; } /** * 查找還有指定時間(單位秒)開始的主任務列表 */ List<Task> tasks = taskRepository.listPeddingTasks(config.getFetchDuration()); if(tasks == null || tasks.isEmpty()) { continue; } for(Task task:tasks) { boolean accept = strategy.accept(nodes, task, config.getNodeId()); /** * 不該自己拿就不要搶 */ if(!accept) { continue; } task.setStatus(TaskStatus.DOING); task.setNodeId(config.getNodeId()); /** * 使用樂觀鎖嘗試更新狀態,如果更新成功,其他節點就不會更新成功。如果在查詢待執行任務列表 * 和當前這段時間有節點已經更新了這個任務,version必然和查出來時候的version不一樣了,這里更新 * 必然會返回0了 */ int n = taskRepository.updateWithVersion(task); Date nextStartTime = task.getNextStartTime(); if(n == 0 || nextStartTime == null) { continue; } /** * 封裝成延時對象放入延時隊列 */ task = taskRepository.get(task.getId()); DelayItem<Task> delayItem = new DelayItem<Task>(nextStartTime.getTime() - new Date().getTime(), task); taskQueue.offer(delayItem); } Thread.sleep(config.getFetchPeriod()); } catch(Exception e) { logger.error("fetch task list failed,cause by:{}", e); } } } }
如上可以通過各種花式的負載策略來平衡各個節點獲取的任務,同時也可以顯著降低各個節點對同一個任務的競爭。但是還有個問題,假如某個節點拿到了任務更新成了執行中,執行到一半,沒執行完也沒發生異常,突然這個節點由於各種原因掛了,那么這時候這個任務永遠沒有機會再執行了。這就是傳說中的占着茅坑不拉屎。解決這個問題可以用最終一致系統常見的方法,異常恢復線程。在這種場景下只需要檢測一下指定心跳超時時間(比如默認3個心跳周期)下沒有更新心跳時間的節點所屬的未完成任務,將這些任務狀態重新恢復成待執行,並且下次執行時間改成當前就可以了。核心代碼如下:
class Recover implements Runnable { @Override public void run() { for (;;) { try { /** * 查找需要恢復的任務,這里界定需要恢復的任務是任務還沒完成,並且所屬執行節點超過3個 * 心跳周期沒有更新心跳時間。由於這些任務由於當時執行節點沒有來得及執行完就掛了,所以 * 只需要把狀態再改回待執行,並且下次執行時間改成當前時間,讓任務再次被調度一次 */ List<Task> tasks = taskRepository.listRecoverTasks(config.getHeartBeatSeconds() * 3); if(tasks == null || tasks.isEmpty()) { return; } /** * 先獲取可用的節點列表 */ List<Node> nodes = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2); if(nodes == null || nodes.isEmpty()) { return; } long maxNodeId = nodes.get(nodes.size() - 1).getNodeId(); for (Task task : tasks) { /** * 每個節點有一個恢復線程,為了避免不必要的競爭,從可用節點找到一個最靠近任務所屬節點的節點 */ long currNodeId = chooseNodeId(nodes,maxNodeId,task.getNodeId()); long myNodeId = config.getNodeId(); /** * 如果不該當前節點處理直接跳過 */ if(currNodeId != myNodeId) { continue; } /** * 直接將任務狀態改成待執行,並且節點改成當前節點 */ task.setStatus(TaskStatus.PENDING); task.setNextStartTime(new Date()); task.setNodeId(config.getNodeId()); taskRepository.updateWithVersion(task); } Thread.sleep(config.getRecoverSeconds() * 1000); } catch (Exception e) { logger.error("Get next task failed,cause by:{}", e); } } } } /** * 選擇下一個節點 * @param nodes * @param maxNodeId * @param nodeId * @return */ private long chooseNodeId(List<Node> nodes,long maxNodeId,long nodeId) { if(nodes.size() == 0 || nodeId >= maxNodeId) { return nodes.get(0).getNodeId(); } return nodes.stream().filter(node -> node.getNodeId() > nodeId).findFirst().get().getNodeId(); }
如上為了避免每個節點的異常恢復線程對同一個任務做無謂的競爭,每個異常任務只能被任務所屬節點ID的下一個正常節點去恢復。這樣處理后就能確保就算出現了上面那種任務沒執行完節點掛了的情況,一段時間后也可以自動恢復。總的來說上面那些不考慮優化應該可以做為一個還不錯的任務調度框架了。如果你們以為這樣就完了,我只能說抱歉了,還有,哈哈!前面提到我是嫌棄其它任務調度用起來麻煩,特別是習慣用spring的注解寫調度的,那些很可能一個類里寫了n個帶有@Scheduled注解的調度方法,這樣改造起來更加麻煩,我是希望做到如下方式就可以直接整合到分布式任務調度里:
/** * 測試調度功能 * @author rongdi * @date 2019-03-17 16:54 */ @Component public class SchedulerTest { @Scheduled(cron = "0/10 * * * * ?") public void test1() throws InterruptedException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Thread.sleep(2000); System.out.println("當前時間1:"+sdf.format(new Date())); } @Scheduled(cron = "0/20 * * * * ?",parent = "test1") public void test2() throws InterruptedException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Thread.sleep(2000); System.out.println("當前時間2:"+sdf.format(new Date())); } @Scheduled(cron = "0/10 * * * * ?",parent = "test2") public void test3() throws InterruptedException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Thread.sleep(2000); System.out.println("當前時間3:"+sdf.format(new Date())); } @Scheduled(cron = "0/10 * * * * ?",parent = "test3") public void test4() throws InterruptedException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Thread.sleep(2000); System.out.println("當前時間4:"+sdf.format(new Date())); } }
為了達到上述目標,我們還需要在spring啟動后加載自定義的注解(名稱和spring的一樣),代碼如下
/** * spring容器啟動完后,加載自定義注解 * @author rongdi * @date 2019-03-15 21:07 */ @Component public class ContextRefreshedListener implements ApplicationListener<ContextRefreshedEvent> { @Autowired private TaskExecutor taskExecutor; /** * 用來保存方法名/任務名和任務插入后數據庫的ID的映射,用來處理子任務新增用 */ private Map<String,Long> taskIdMap = new HashMap<>(); @Override public void onApplicationEvent(ContextRefreshedEvent event) { /** * 判斷根容器為Spring容器,防止出現調用兩次的情況(mvc加載也會觸發一次) */ if(event.getApplicationContext().getParent()==null){ /** * 判斷調度開關是否打開 * 如果打開了:加載調度注解並將調度添加到調度管理中 */ ApplicationContext context = event.getApplicationContext(); Map<String,Object> beans = context.getBeansWithAnnotation(org.springframework.scheduling.annotation.EnableScheduling.class); if(beans == null) { return; } /** * 用來存放被調度注解修飾的方法名和Method的映射 */ Map<String,Method> methodMap = new HashMap<>(); /** * 查找所有直接或者間接被Component注解修飾的類,因為不管Service,Controller等都包含了Component,也就是 * 只要是被納入了spring容器管理的類必然直接或者間接的被Component修飾 */ Map<String,Object> allBeans = context.getBeansWithAnnotation(org.springframework.stereotype.Component.class); Set<Map.Entry<String,Object>> entrys = allBeans.entrySet(); /** * 遍歷bean和里面的method找到被Scheduled注解修飾的方法,然后將任務放入任務調度里 */ for(Map.Entry entry:entrys){ Object obj = entry.getValue(); Class clazz = obj.getClass(); Method[] methods = clazz.getMethods(); for(Method m:methods) { if(m.isAnnotationPresent(Scheduled.class)) { methodMap.put(clazz.getName() + Delimiters.DOT + m.getName(),m); } } } /** * 處理Sheduled注解 */ handleSheduledAnn(methodMap); /** * 由於taskIdMap只是啟動spring完成后使用一次,這里可以直接清空 */ taskIdMap.clear(); } } /** * 循環處理方法map中的所有Method * @param methodMap */ private void handleSheduledAnn(Map<String,Method> methodMap) { if(methodMap == null || methodMap.isEmpty()) { return; } Set<Map.Entry<String,Method>> entrys = methodMap.entrySet(); /** * 遍歷bean和里面的method找到被Scheduled注解修飾的方法,然后將任務放入任務調度里 */ for(Map.Entry<String,Method> entry:entrys){ Method m = entry.getValue(); try { handleSheduledAnn(methodMap,m); } catch (Exception e) { e.printStackTrace(); continue; } } } /** * 遞歸添加父子任務 * @param methodMap * @param m * @throws Exception */ private void handleSheduledAnn(Map<String,Method> methodMap,Method m) throws Exception { Class<?> clazz = m.getDeclaringClass(); String name = m.getName(); Scheduled sAnn = m.getAnnotation(Scheduled.class); String cron = sAnn.cron(); String parent = sAnn.parent(); /** * 如果parent為空,說明該方法代表的任務是根任務,則添加到任務調度器中,並且保存在全局map中 * 如果parent不為空,則表示是子任務,子任務需要知道父任務的id * 先根據parent里面代表的方法全名或者方法名(父任務方法和子任務方法在同一個類直接可以用方法名, * 不然要帶上類的全名)從taskIdMap獲取父任務ID * 如果找不到父任務ID,先根據父方法全名在methodMap找到父任務的method對象,調用本方法遞歸下去 * 如果找到父任務ID,則添加子任務 */ if(StringUtils.isEmpty(parent)) { if(!taskIdMap.containsKey(clazz.getName() + Delimiters.DOT + name)) { Long taskId = taskExecutor.addTask(name, cron, new Invocation(clazz, name, new Class[]{}, new Object[]{})); taskIdMap.put(clazz.getName() + Delimiters.DOT + name, taskId); } } else { String parentMethodName = parent.lastIndexOf(Delimiters.DOT) == -1 ? clazz.getName() + Delimiters.DOT + parent : parent; Long parentTaskId = taskIdMap.get(parentMethodName); if(parentTaskId == null) { Method parentMethod = methodMap.get(parentMethodName); handleSheduledAnn(methodMap,parentMethod); /** * 遞歸回來一定要更新一下這個父任務ID */ parentTaskId = taskIdMap.get(parentMethodName); } if(parentTaskId != null && !taskIdMap.containsKey(clazz.getName() + Delimiters.DOT + name)) { Long taskId = taskExecutor.addChildTask(parentTaskId, name, cron, new Invocation(clazz, name, new Class[]{}, new Object[]{})); taskIdMap.put(clazz.getName() + Delimiters.DOT + name, taskId); } } } }
上述代碼就完成了spring初始化完成后加載了自己的自定義任務調度的注解,並且也受spring的調度開關@EnableScheduling的控制,實現無縫整合到spring或者springboot中去,達到了我這種的懶人的要求。
好了其實寫這個框架差不多就用了5天業余時間,估計會有一些隱藏的坑,不過明顯的坑我自己都解決了,開源出來的目的既是為了拋磚引玉,也為了廣大屌絲程序員提供一種新的思路,希望對大家有所幫助,同時也希望大家多幫忙找找bug,一起來完善這個東西,大神們請忽略。文筆不好,主要是好久沒寫作文了,請大家多多擔待。詳細的流水賬式的源碼加上長篇大論式的漢語注釋盡情查看:https://github.com/rongdi/easy-job