簡介
由於最近工作比較忙,前前后后花了兩個月的時間把TBSchedule的源碼翻了個底朝天。關於TBSchedule的使用,網上也有很多參考資料,這里不做過多的闡述。本文着重介紹TBSchedule的運行機制,架構設計以及優化建議。通過學習別人的經驗,來提高自己的技術能力,感受阿里人的智慧,也向阿里空玄,阿里玄難為開源貢獻致敬。
zookeeper依賴
TBSchedule依賴於ZK存儲調度數據,在使用中充當着nosql的角色,zk的watch機制只用於zk重連,提高可靠性。下圖是zk與tbschedule的部署圖。
TBSchedule有很多特性,包括批量任務,多主機,多線程,動態擴展,實時或定時任務,分片,並發,不重復執行。在介紹這些特性之前,先來了解一下整個zk目錄結構,有助於理解整個調度過程。下圖是zk調度數據結構圖。其中()內表示zk目錄保存的數據。
TBSchedule原理
1)TBSchedule在zookeeper初始化完成之后初始化數據,其中創建basetasktype,stractegy,factory目錄。調用registerManagerFactory,在factory目錄下創建瞬時有序節點,節點名稱(IP+$+HostName+$+UUID+$Sequence),然后根據ip是否在ip管理范圍內,在strategy目錄下添加或刪除對應的(IP+$+HostName+$+UUID+$Sequence)瞬時目錄節點。最后啟動默認的refresh()操作。
2)TBSchedule在每2s中zk正常情況下執行一次refresh操作,該操作如果查詢zk管理信息異常則停止所有調度任務后重新注冊管理器工廠,如果管理器start狀態=false,則停止所有調度任務。具體實現在TBScheduleManagerFactory的reRegisterManagerFactory()中。具體代碼如下:
public void reRegisterManagerFactory() throws Exception { // 根據UUID,在/factory目錄下查找對應目錄,並在/strategy目錄下更具IP數組, //確定可管理的strtegyName下創建(IP+$+HostName+$+UUID+$Sequence)目錄 // 返回不可管理的調度策略類型名稱,並停止對應的調度處理器 List<String> stopList = this.getScheduleStrategyManager() .registerManagerFactory(this); for (String strategyName : stopList) { this.stopServer(strategyName); //停止對應的調度處理器 } //根據策略重新分配調度任務機器的任務數,並在zk上更新對應的ScheduleStrategyRunntime中的AssignNum this.assignScheduleServer(); //注意,一個strategyName下只有唯一表示當前調度服務器的節點(IP+$+HostName+$+UUID+$Sequence) //同時一個strategyName對應該調度服務器多個IStrategyTask任務管理器,一個taskItem對應一個任務管理器 //多則刪停,少則加起 this.reRunScheduleServer(); }
這邊再介紹下tbschedule的任務分配策略,列如當前有4台機器(A,B,C,D),共10個任務(0,1..9)。首先將10個任務均等分,每個服務器可以分配到2個任務,最后剩余兩個任務將給A,B服務器獲得,具體算法如下:
/** * 分配任務數量 * @param serverNum 總的服務器數量 * @param taskItemNum 任務項數量 * @param maxNumOfOneServer 每個server最大任務項數目 * @param maxNum 總的任務數量 * @return */ public static int[] assignTaskNumber(int serverNum,int taskItemNum,int maxNumOfOneServer){ int[] taskNums = new int[serverNum]; int numOfSingle = taskItemNum / serverNum; int otherNum = taskItemNum % serverNum; //20150323 刪除, 任務分片保證分配到所有的線程組數上。 開始 // if (maxNumOfOneServer >0 && numOfSingle >= maxNumOfOneServer) { // numOfSingle = maxNumOfOneServer; // otherNum = 0; // } //20150323 刪除, 任務分片保證分配到所有的線程組數上。 結束 for (int i = 0; i < taskNums.length; i++) { if (i < otherNum) { taskNums[i] = numOfSingle + 1; } else { taskNums[i] = numOfSingle; } } return taskNums; }
3)接下來根據每個strategyName下獲得的任務數,來創建對應任務調度管理器數。具體實現在reRunScheduleServer()方法中,循環創建IStrategyTask,根據調度類型Schedule,Java,bean實例化不同的任務管理器TBScheduleManagerStatic,也包括自定義管理器,只要繼承IStrategyTask接口就可以了。列如自定義管理器,需要配置taskname為java全類名或者bean的名稱。

package com.taobao.pamirs.schedule.test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.taobao.pamirs.schedule.strategy.IStrategyTask; /** * 自定義任務管理器,調度類型為Java,Bean * @author Administrator * */ public class JavaTaskDemo implements IStrategyTask,Runnable { protected static transient Logger log = LoggerFactory.getLogger(JavaTaskDemo.class); private String parameter; private boolean stop = false; public void initialTaskParameter(String strategyName,String taskParameter) { parameter = taskParameter; new Thread(this).start(); } @Override public void stop(String strategyName) throws Exception { this.stop = true; } @Override public void run() { while(stop == false){ log.error("執行任務:" + this.parameter); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
對於常用的Schedule調度類型,使用的是TBScheduleManagerStatic管理器。
4)任務調度分配器TBScheduleManager,能夠使得任務分片被不重復,不遺漏的快速處理。該功能也是TBSchedule的核心實現,一個JVM可以包含不同taskType的多個任務調度分配器。也就是說可以有相同任務taskType的多個任務管理器,也可以存在不同的tasktype的任務管理器。每個任務管理器包含一個任務處理器IScheduleProcessor,IScheduleProcessor實際上是個Runnnable對象,根據任務類型的線程數來初始化調度線程。任務處理器分為SLEEP和NotSleep模式。
下面是創建TBScheduleManager的操作。
TBScheduleManager(TBScheduleManagerFactory aFactory,String baseTaskType,String ownSign ,IScheduleDataManager aScheduleCenter) throws Exception{ this.factory = aFactory; this.currentSerialNumber = serialNumber(); this.scheduleCenter = aScheduleCenter; this.taskTypeInfo = this.scheduleCenter.loadTaskTypeBaseInfo(baseTaskType); log.info("create TBScheduleManager for taskType:"+baseTaskType); //清除已經過期1天的TASK,OWN_SIGN的組合。超過一天沒有活動server的視為過期 this.scheduleCenter.clearExpireTaskTypeRunningInfo(baseTaskType,ScheduleUtil.getLocalIP() + "清除過期OWN_SIGN信息",this.taskTypeInfo.getExpireOwnSignInterval()); Object dealBean = aFactory.getBean(this.taskTypeInfo.getDealBeanName()); if (dealBean == null) { throw new Exception( "SpringBean " + this.taskTypeInfo.getDealBeanName() + " 不存在"); } if (dealBean instanceof IScheduleTaskDeal == false) { throw new Exception( "SpringBean " + this.taskTypeInfo.getDealBeanName() + " 沒有實現 IScheduleTaskDeal接口"); } this.taskDealBean = (IScheduleTaskDeal)dealBean; if(this.taskTypeInfo.getJudgeDeadInterval() < this.taskTypeInfo.getHeartBeatRate() * 5){ throw new Exception("數據配置存在問題,死亡的時間間隔,至少要大於心跳線程的5倍。當前配置數據:JudgeDeadInterval = " + this.taskTypeInfo.getJudgeDeadInterval() + ",HeartBeatRate = " + this.taskTypeInfo.getHeartBeatRate()); } //生成ScheduleServer信息。 this.currenScheduleServer = ScheduleServer.createScheduleServer(this.scheduleCenter,baseTaskType,ownSign,this.taskTypeInfo.getThreadNumber()); //設置ScheduleServer的ManagerFactoryUUID this.currenScheduleServer.setManagerFactoryUUID(this.factory.getUuid()); //在/server下注冊ScheduleServer信息,實際上可以看成在server目錄下的每一個子節點表示一個任務調度管理器 scheduleCenter.registerScheduleServer(this.currenScheduleServer); this.mBeanName = "pamirs:name=" + "schedule.ServerMananger." +this.currenScheduleServer.getUuid(); this.heartBeatTimer = new Timer(this.currenScheduleServer.getTaskType() +"-" + this.currentSerialNumber +"-HeartBeat"); this.heartBeatTimer.schedule(new HeartBeatTimerTask(this), new java.util.Date(System.currentTimeMillis() + 500), this.taskTypeInfo.getHeartBeatRate()); initial(); }
5)上面有兩個重要的操作,一個是心跳調度器,主要職責是更新/server目錄下對應的調度管理器心跳信息,清除過期的scheduleServer,如果是leader則進行任務項的分配。
class HeartBeatTimerTask extends java.util.TimerTask { private static transient Logger log = LoggerFactory .getLogger(HeartBeatTimerTask.class); TBScheduleManager manager; public HeartBeatTimerTask(TBScheduleManager aManager) { manager = aManager; } public void run() { try { Thread.currentThread().setPriority(Thread.MAX_PRIORITY); manager.refreshScheduleServerInfo(); } catch (Exception ex) { log.error(ex.getMessage(), ex); } } }
/** * 如果發現本次更新的時間如果已經超過了,服務器死亡的心跳周期,則不能在向服務器更新信息。 * 而應該當作新的服務器,進行重新注冊。 * @throws Exception */ public void refreshScheduleServerInfo() throws Exception { try{ //在/server下更新任務調度服務器的心跳時間,調度信息 rewriteScheduleInfo(); //如果任務信息沒有初始化成功,不做任務相關的處理,未完成init() if(this.isRuntimeInfoInitial == false){ return; } //重新分配任務,leader重新檢查可用調度管理器,並修改taskItem下的current_server,req_server. this.assignScheduleTask(); //判斷是否需要重新加載任務隊列,避免任務處理進程不必要的檢查和等待 //思路:每一次修改了taskitem的任務分配之后,會在/taskitem下保存leader信息,及默認版本號-1 //比較保存的上一次任務加載的版本號是否 < 當前的版本號 boolean tmpBoolean = this.isNeedReLoadTaskItemList(); if(tmpBoolean != this.isNeedReloadTaskItem){ //只要不相同,就設置需要重新裝載,因為在心跳異常的時候,做了清理隊列的事情,恢復后需要重新裝載。 synchronized (NeedReloadTaskItemLock) { this.isNeedReloadTaskItem = true; } rewriteScheduleInfo(); } if(this.isPauseSchedule == true || this.processor != null && processor.isSleeping() == true){ //如果服務已經暫停了,則需要重新定時更新 cur_server 和 req_server //如果服務沒有暫停,一定不能調用的 //調度服務策略如果已經失效,會拋出異常 //加載任務list<taskDefine> this.getCurrentScheduleTaskItemListNow(); } }catch(Throwable e){ //清除內存中所有的已經取得的數據和任務隊列,避免心跳線程失敗時候導致的數據重復 this.clearMemoInfo(); if(e instanceof Exception){ throw (Exception)e; }else{ throw new Exception(e.getMessage(),e); } } }
其中的this.assignScheduleTask();實現了任務調度管理器的變化而相應的修改/taskItem下curr_server和req_server的調度變化。核心思想:rewriteScheduleInfo()中沒有相應的調度服務器,則在/server下注冊。然后獲取有效的所有調度服務器,遍歷所有任務項,如果發現該任務項的curr_server表示的manager不存在,則設置null。然后對所有的任務分片重新分配調度服務器,具體算法如下:

public void assignTaskItem(String taskType, String currentUuid,int maxNumOfOneServer, List<String> taskServerList) throws Exception { if(this.isLeader(currentUuid,taskServerList)==false){ if(log.isDebugEnabled()){ log.debug(currentUuid +":不是負責任務分配的Leader,直接返回"); } return; } if(log.isDebugEnabled()){ log.debug(currentUuid +":開始重新分配任務......"); } if(taskServerList.size()<=0){ //在服務器動態調整的時候,可能出現服務器列表為空的清空 return; } String baseTaskType = ScheduleUtil.splitBaseTaskTypeFromTaskType(taskType); String zkPath = this.PATH_BaseTaskType + "/" + baseTaskType + "/" + taskType + "/" + this.PATH_TaskItem; List<String> children = this.getZooKeeper().getChildren(zkPath, false); // Collections.sort(children); // 20150323 有些任務分片,業務方其實是用數字的字符串排序的。優先以數字進行排序,否則以字符串排序 Collections.sort(children,new Comparator<String>(){ public int compare(String u1, String u2) { if(StringUtils.isNumeric(u1) && StringUtils.isNumeric(u2)){ int iU1= Integer.parseInt(u1); int iU2= Integer.parseInt(u2); /*if(iU1==iU2){ return 0 ; }else if(iU1>iU2){ return 1 ; }else{ return -1; }*/ return iU1-iU2; }else{ return u1.compareTo(u2); } } }); int unModifyCount =0; int[] taskNums = ScheduleUtil.assignTaskNumber(taskServerList.size(), children.size(), maxNumOfOneServer); int point =0; int count = 0; String NO_SERVER_DEAL = "沒有分配到服務器"; for(int i=0;i <children.size();i++){ String name = children.get(i); if(point <taskServerList.size() && i >= count + taskNums[point]){ count = count + taskNums[point]; point = point + 1; } String serverName = NO_SERVER_DEAL; if(point < taskServerList.size() ){ serverName = taskServerList.get(point); } byte[] curServerValue = this.getZooKeeper().getData(zkPath + "/" + name + "/cur_server",false,null); byte[] reqServerValue = this.getZooKeeper().getData(zkPath + "/" + name + "/req_server",false,null); if(curServerValue == null || new String(curServerValue).equals(NO_SERVER_DEAL)){ //對沒有分配的任務分片,添加調度服務器 this.getZooKeeper().setData(zkPath + "/" + name + "/cur_server",serverName.getBytes(),-1); this.getZooKeeper().setData(zkPath + "/" + name + "/req_server",null,-1); }else if(new String(curServerValue).equals(serverName)==true && reqServerValue == null ){ //不需要做任何事情 當前執行的調度器正好和重新分配的調度器一致 unModifyCount = unModifyCount + 1; }else{ //調度服務器請求轉換 this.getZooKeeper().setData(zkPath + "/" + name + "/req_server",serverName.getBytes(),-1); } } if(unModifyCount < children.size()){ //設置需要所有的服務器重新裝載任務 log.info("設置需要所有的服務器重新裝載任務:updateReloadTaskItemFlag......"+taskType+ " ,currentUuid "+currentUuid ); //設置/server[v.2][reload=true] this.updateReloadTaskItemFlag(taskType); } if(log.isDebugEnabled()){ StringBuffer buffer = new StringBuffer(); for(ScheduleTaskItem taskItem: this.loadAllTaskItem(taskType)){ buffer.append("\n").append(taskItem.toString()); } log.debug(buffer.toString()); } }
在第4點附上的源碼最后有個initial();操作,首先啟動一個獨立的線程,判斷isRuntimeInfoInitial標志位判斷是否已經初始化數據,如果沒有則leader調度器執行initialRunningInfo(),刪除/TaskItem目錄,根據ScheduleTaskType,獲取到的任務項數組,創建任務項節點,同時在/taskItem下設置leader數據。initial()源碼如下:

public void initial() throws Exception{ new Thread(this.currenScheduleServer.getTaskType() +"-" + this.currentSerialNumber +"-StartProcess"){ @SuppressWarnings("static-access") public void run(){ try{ log.info("開始獲取調度任務隊列...... of " + currenScheduleServer.getUuid()); //並發啟動調度管理器,直至leader初始化任務項完成 while (isRuntimeInfoInitial == false) { if(isStopSchedule == true){ log.debug("外部命令終止調度,退出調度隊列獲取:" + currenScheduleServer.getUuid()); return; } //log.error("isRuntimeInfoInitial = " + isRuntimeInfoInitial); try{ initialRunningInfo(); //在/taskitem下的數據判斷是否為leader的數據 isRuntimeInfoInitial = scheduleCenter.isInitialRunningInfoSucuss( currenScheduleServer.getBaseTaskType(), currenScheduleServer.getOwnSign()); }catch(Throwable e){ //忽略初始化的異常 log.error(e.getMessage(),e); } if(isRuntimeInfoInitial == false){ Thread.currentThread().sleep(1000); } } int count =0; lastReloadTaskItemListTime = scheduleCenter.getSystemTime(); //此處會給currentTaskItemList添加元素,直至加載到任務 while(getCurrentScheduleTaskItemListNow().size() <= 0){ if(isStopSchedule == true){ log.debug("外部命令終止調度,退出調度隊列獲取:" + currenScheduleServer.getUuid()); return; } Thread.currentThread().sleep(1000); count = count + 1; // log.error("嘗試獲取調度隊列,第" + count + "次 ") ; } String tmpStr ="TaskItemDefine:"; for(int i=0;i< currentTaskItemList.size();i++){ if(i>0){ tmpStr = tmpStr +","; } tmpStr = tmpStr + currentTaskItemList.get(i); } log.info("獲取到任務處理隊列,開始調度:" + tmpStr +" of "+ currenScheduleServer.getUuid()); //任務總量 taskItemCount = scheduleCenter.loadAllTaskItem(currenScheduleServer.getTaskType()).size(); //只有在已經獲取到任務處理隊列后才開始啟動任務處理器 computerStart(); }catch(Exception e){ log.error(e.getMessage(),e); String str = e.getMessage(); if(str.length() > 300){ str = str.substring(0,300); } startErrorInfo = "啟動處理異常:" + str; } } }.start(); }
最后的computerStart()方法是實現周期執行的關鍵,TBSchedule基於cronExpression表達式實現周期性調度,執行類型分為兩種TYPE_PAUSE,TYPE_RESUME。並更新setNextRunStartTime和setNextRunEndTime。

/** * 開始的時候,計算第一次執行時間 * @throws Exception */ public void computerStart() throws Exception{ //只有當存在可執行隊列后再開始啟動隊列 boolean isRunNow = false; if(this.taskTypeInfo.getPermitRunStartTime() == null){ isRunNow = true; }else{ String tmpStr = this.taskTypeInfo.getPermitRunStartTime(); if(tmpStr.toLowerCase().startsWith("startrun:")){ isRunNow = true; tmpStr = tmpStr.substring("startrun:".length()); } CronExpression cexpStart = new CronExpression(tmpStr); Date current = new Date( this.scheduleCenter.getSystemTime()); Date firstStartTime = cexpStart.getNextValidTimeAfter(current); this.heartBeatTimer.schedule( new PauseOrResumeScheduleTask(this,this.heartBeatTimer, PauseOrResumeScheduleTask.TYPE_RESUME,tmpStr), firstStartTime); this.currenScheduleServer.setNextRunStartTime(ScheduleUtil.transferDataToString(firstStartTime)); if( this.taskTypeInfo.getPermitRunEndTime() == null || this.taskTypeInfo.getPermitRunEndTime().equals("-1")){ this.currenScheduleServer.setNextRunEndTime("當不能獲取到數據的時候pause"); }else{ try { String tmpEndStr = this.taskTypeInfo.getPermitRunEndTime(); CronExpression cexpEnd = new CronExpression(tmpEndStr); Date firstEndTime = cexpEnd.getNextValidTimeAfter(firstStartTime); Date nowEndTime = cexpEnd.getNextValidTimeAfter(current); if(!nowEndTime.equals(firstEndTime) && current.before(nowEndTime)){ isRunNow = true; firstEndTime = nowEndTime; } this.heartBeatTimer.schedule( new PauseOrResumeScheduleTask(this,this.heartBeatTimer, PauseOrResumeScheduleTask.TYPE_PAUSE,tmpEndStr), firstEndTime); this.currenScheduleServer.setNextRunEndTime(ScheduleUtil.transferDataToString(firstEndTime)); } catch (Exception e) { log.error("計算第一次執行時間出現異常:" + currenScheduleServer.getUuid(), e); throw new Exception("計算第一次執行時間出現異常:" + currenScheduleServer.getUuid(), e); } } } //如果沒有getPermitRunStartTime,則跳過timer調度,立即執行 if(isRunNow == true){ this.resume("開啟服務立即啟動"); } this.rewriteScheduleInfo(); }
從上面的代碼中,我們注意到了這個調度使用同一個timer對象,每次調度執行后在timer添加新的調度task。如果是PAUSE類型調度,則執行manager.pause("到達終止時間,pause調度"),如果是RESUME,則執行manager.resume("到達開始時間,resume調度");,並計算下次調度時間,重新添加到調度隊列。具體實現如下:

class PauseOrResumeScheduleTask extends java.util.TimerTask { private static transient Logger log = LoggerFactory .getLogger(HeartBeatTimerTask.class); public static int TYPE_PAUSE = 1; public static int TYPE_RESUME = 2; TBScheduleManager manager; Timer timer; int type; String cronTabExpress; public PauseOrResumeScheduleTask(TBScheduleManager aManager,Timer aTimer,int aType,String aCronTabExpress) { this.manager = aManager; this.timer = aTimer; this.type = aType; this.cronTabExpress = aCronTabExpress; } public void run() { try { Thread.currentThread().setPriority(Thread.MAX_PRIORITY); this.cancel();//取消調度任務 Date current = new Date(System.currentTimeMillis()); CronExpression cexp = new CronExpression(this.cronTabExpress); Date nextTime = cexp.getNextValidTimeAfter(current); if(this.type == TYPE_PAUSE){ manager.pause("到達終止時間,pause調度"); this.manager.getScheduleServer().setNextRunEndTime(ScheduleUtil.transferDataToString(nextTime)); }else{ manager.resume("到達開始時間,resume調度"); this.manager.getScheduleServer().setNextRunStartTime(ScheduleUtil.transferDataToString(nextTime)); } this.timer.schedule(new PauseOrResumeScheduleTask(this.manager,this.timer,this.type,this.cronTabExpress) , nextTime); } catch (Throwable ex) { log.error(ex.getMessage(), ex); } } }
resume即在可執行時間區間恢復調度,根據SchduleTaskType配置的處理器類型模式Sleep或者NotSleep來初始化處理器。默認使用TBScheduleProcessorSleep處理器。
/** * 處在了可執行的時間區間,恢復運行 * @throws Exception */ public void resume(String message) throws Exception{ if (this.isPauseSchedule == true) { if(log.isDebugEnabled()){ log.debug("恢復調度:" + this.currenScheduleServer.getUuid()); } this.isPauseSchedule = false; this.pauseMessage = message; if (this.taskDealBean != null) { if (this.taskTypeInfo.getProcessorType() != null && this.taskTypeInfo.getProcessorType().equalsIgnoreCase("NOTSLEEP")==true){ this.taskTypeInfo.setProcessorType("NOTSLEEP"); this.processor = new TBScheduleProcessorNotSleep(this, taskDealBean,this.statisticsInfo); }else{ this.processor = new TBScheduleProcessorSleep(this, taskDealBean,this.statisticsInfo); this.taskTypeInfo.setProcessorType("SLEEP"); } } rewriteScheduleInfo();//更新心跳信息 } }
6)多線程執行,TBScheduleProcessorSleep是一個Runnable對象,多個調度線程共享如下變量:
final LockObject m_lockObject = new LockObject(); //緩存線程對象 List<Thread> threadList = new CopyOnWriteArrayList<Thread>(); /** * 任務管理器 */ protected TBScheduleManager scheduleManager; /** * 任務類型 */ ScheduleTaskType taskTypeInfo; /** * 任務處理的接口類 */ protected IScheduleTaskDeal<T> taskDealBean; /** * 當前任務隊列的版本號 */ protected long taskListVersion = 0; final Object lockVersionObject = new Object(); final Object lockRunningList = new Object(); //任務隊列 protected List<T> taskList = new CopyOnWriteArrayList<T>(); /** * 是否可以批處理 */ boolean isMutilTask = false; /** * 是否已經獲得終止調度信號 */ boolean isStopSchedule = false;// 用戶停止隊列調度 boolean isSleeping = false;
在初始化執行處理器,會啟動ThreadNumber個線程數,
for (int i = 0; i < taskTypeInfo.getThreadNumber(); i++) {
this.startThread(i);
}
下面具體看一下線程的run()操作。一個執行線程的職責主要是執行自定義的IScheduleTaskDealSingle,而IScheduleTaskDealMulti可以實現批量處理,實現區別也是大同小異。其核心思想:
對開始執行的線程計數+1,在沒有停止調度的前提下即resume狀態下,執行客戶自定義ScheduleTask的execute()方法,並完成執行統計。當任務隊列中的所有任務Item都執行完成,隊列為空時,如果正在執行任務的線程數不是最后一個線程,則等待。反之,則加載任務,有數據喚醒所有等待線程繼續執行,沒數據線程sleep SleepTimeNoData時間,並繼續加載任務數據。

public void run(){ try { long startTime =0; while(true){ this.m_lockObject.addThread(); Object executeTask; while (true) { if(this.isStopSchedule == true){//停止隊列調度 this.m_lockObject.realseThread(); this.m_lockObject.notifyOtherThread();//通知所有的休眠線程 synchronized (this.threadList) { this.threadList.remove(Thread.currentThread()); if(this.threadList.size()==0){ this.scheduleManager.unRegisterScheduleServer(); } } return; } //加載調度任務 if(this.isMutilTask == false){ executeTask = this.getScheduleTaskId(); }else{ executeTask = this.getScheduleTaskIdMulti(); } if(executeTask == null){ break; } try {//運行相關的程序 startTime =scheduleManager.scheduleCenter.getSystemTime(); if (this.isMutilTask == false) { if (((IScheduleTaskDealSingle) this.taskDealBean).execute(executeTask,scheduleManager.getScheduleServer().getOwnSign()) == true) { addSuccessNum(1, scheduleManager.scheduleCenter.getSystemTime() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run"); } else { addFailNum(1, scheduleManager.scheduleCenter.getSystemTime() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run"); } } else { if (((IScheduleTaskDealMulti) this.taskDealBean) .execute((Object[]) executeTask,scheduleManager.getScheduleServer().getOwnSign()) == true) { addSuccessNum(((Object[]) executeTask).length,scheduleManager.scheduleCenter.getSystemTime() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run"); } else { addFailNum(((Object[]) executeTask).length,scheduleManager.scheduleCenter.getSystemTime() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run"); } } }catch (Throwable ex) { if (this.isMutilTask == false) { addFailNum(1,scheduleManager.scheduleCenter.getSystemTime()- startTime, "TBScheduleProcessor.run"); } else { addFailNum(((Object[]) executeTask).length, scheduleManager.scheduleCenter.getSystemTime() - startTime, "TBScheduleProcessor.run"); } logger.warn("Task :" + executeTask + " 處理失敗", ex); } } //當前隊列中所有的任務都已經完成了。 if(logger.isTraceEnabled()){ logger.trace(Thread.currentThread().getName() +":當前運行線程數量:" +this.m_lockObject.count()); } if (this.m_lockObject.realseThreadButNotLast() == false) { int size = 0; Thread.currentThread().sleep(100); startTime =scheduleManager.scheduleCenter.getSystemTime(); // 裝載數據 size = this.loadScheduleData(); if (size > 0) { this.m_lockObject.notifyOtherThread(); } else { //判斷當沒有數據的是否,是否需要退出調度 if (this.isStopSchedule == false && this.scheduleManager.isContinueWhenData()== true ){ if(logger.isTraceEnabled()){ logger.trace("沒有裝載到數據,start sleep"); } this.isSleeping = true; Thread.currentThread().sleep(this.scheduleManager.getTaskTypeInfo().getSleepTimeNoData()); this.isSleeping = false; if(logger.isTraceEnabled()){ logger.trace("Sleep end"); } }else{ //沒有數據,退出調度,喚醒所有沉睡線程 this.m_lockObject.notifyOtherThread(); } } this.m_lockObject.realseThread(); } else {// 將當前線程放置到等待隊列中。直到有線程裝載到了新的任務數據 if(logger.isTraceEnabled()){ logger.trace("不是最后一個線程,sleep"); } this.m_lockObject.waitCurrentThread(); } } } catch (Throwable e) { logger.error(e.getMessage(), e); } }
TBSchedule思考與挑戰
1)Zookeeper節點遍歷優化
列如存在上圖目錄節點,原有的查找節點數有些問題,不能完全刪除目錄。我這邊提供的思想是遞歸查找目錄樹。最終結果為A-B-E-C-F-D,刪除節點的時候從最后一個節點刪除,不會出現子目錄存在而直接刪除父節點的操作。代碼如下:
/** * 使用遞歸遍歷所有結點 * * @param zk * @param path * @param dealList * @throws Exception * @throws InterruptedException */ private static void getTree(ZooKeeper zk, String path, List<String> dealList) throws Exception, InterruptedException { //添加父目錄 dealList.add(path); List<String> children = zk.getChildren(path, false); if (path.charAt(path.length() - 1) != '/') { path = path + "/"; } //添加子目錄 for (int i = 0; i < children.size(); i++) { getTree(zk, path + children.get(i), dealList); } }
2)線程優化
通過上面TBSchedule的源碼分析,我們知道一個任務調度處理器,會創建一個timer根據cron表達式執行resume和pause操作。每一次resume都會創建TBScheduleProcessorSleep,然后初始化多個線程。
當該timer進行N次調度resume的時候,也就是系統會創建N*threadNum個線程,執行pause操作,則這些線程將會銷毀。我的建議是每一個任務調度處理器,都指定1個線程數的cacheThreadPool線程池。可能會有人說,為何不指定一個ThreadNum數的fixedThreadPool。因為當timer執行多次resume的時候,如果上一次的resume還沒有完成,線程池中沒有空閑的線程來執行新的task,會造成線程依賴而下一調度的超時或者失敗。指定cacheThreadPool,根據ThreadNum值向線程池submit ThreadNum個runnable對象。
3)鎖優化
在任務執行器TBScheduleProcessorSleep中,t通過加載任務item (List<TaskItemDefine> taskItems),執行taskDealBean.selectTasks方法。獲取到的數據存放在CopyOnWriteArrayList中。這里簡單的介紹下寫時拷貝容器CopyOnWriteArrayList,其對並發讀不會加鎖,而對並發寫同步,當有一個寫請求,首先獲取鎖 ,然后復制當前容器內數據,進行增刪改,最后替換掉原有的數組引用,從而達到現場安全的目的,實際上該容器非常適合讀多寫少的場景。而目前的場景並沒有讀get的操作。獲取容器元素調用remove()方法,同樣獲取鎖。
既然讀已經同步,那么在獲取任務的時候,就不需要加synchronized關鍵字了。原有代碼如下:
public synchronized Object getScheduleTaskId() { //可以去除synchronized
if (this.taskList.size() > 0) return this.taskList.remove(0); // 按正序處理 return null; }
4)設計優化
整個TBSchedule的調度,默認兩秒內會執行refresh()操作,停止所有的任務調度器然后重新創建新的任務調度器。這樣的好處可以使得某一個調度節點宕機,或者網絡原因導致心跳失敗,再或者在控制台修改了調度策略配置信息。可以動態的生效。但是如果能夠基於ZK的watch機制,對系統的消耗會更小。由於在factory目錄下創建的都是瞬時節點,如果某一個server宕機。zk會watch到相應的事件。同樣,在ScheduleTaskType下的數據發生改變,zk同樣可以watch到相應的事件。如果發現出現了上述幾種情況,那么TBSchedule可以執行refresh()操作了。
總結
TBSchedule的使用場景還是非常廣泛,如定時數據同步,日志上報等等。不同於quartz的搶占式任務調度,TBSchedule更側重於任務多分片並行處理,基於分布式集群提高任務處理能力。知其然且知其所以然有助於更好的使用框架,並解決實際問題。
更多資料:http://geek.csdn.net/news/detail/65738
源碼:http://code.taobao.org/p/tbschedule/src/
,