如何實現分布式定時任務(xxl的實現)


1、前言

    定時任務在任何系統中都非常重要,如:訂單48小時自動完成,每日重新給會員送優惠券,游戲中每隔半小時給玩家添加體力等等。

對於小型系統我們可以用quartz和spring task實現定時任務,這樣都任務存在如下幾個任務:

1)單點問題,如果任務服務器掛了,定時任務就掛了;

2)如果任務服務和業務代碼耦合在一起,業務服務部署多台主機,任務服務在每天機器上都會觸發,引起任務重復執行;

3)任務不可預知執行情況,需要開發人員每天去檢查日志,查看是否執行成功;

4)當任務失敗了之后,沒辦法手動執行任務 

   這時候分布式任務就該出場了。那么分布式任務是如何解決上面當問題當昵?

2、名詞說明

    調度中心:負責任務調度當服務;

    執行器:   執行任務當服務器;

    管理中心:負責任務的創建更新刪除,查看任務狀態,執行過程的服務器。

3、架構圖

     

  說明

   1)服務注冊中心可以是zookeeper,eureka,也可以是自己實現的。

   2)leader選擇器可以替換為分布式鎖(redission),在調度任務的時候控制只有一個調度中心在分配任務,當然也可以使用select * from for update。

         目前xxl-job就是采用select * from for update 加時間輪的方式實現的。

package  com.xxl.job.admin.core.thread;
 
import  com.xxl.job.admin.core.conf.XxlJobAdminConfig;
import  com.xxl.job.admin.core.cron.CronExpression;
import  com.xxl.job.admin.core.model.XxlJobInfo;
import  com.xxl.job.admin.core.trigger.TriggerTypeEnum;
import  org.slf4j.Logger;
import  org.slf4j.LoggerFactory;
 
import  java.sql.Connection;
import  java.sql.PreparedStatement;
import  java.sql.SQLException;
import  java.util.*;
import  java.util.concurrent.ConcurrentHashMap;
import  java.util.concurrent.TimeUnit;
 
/**
  * @author xuxueli 2019-05-21
  */
public  class  JobScheduleHelper {
     private  static  Logger logger = LoggerFactory.getLogger(JobScheduleHelper. class );
 
     private  static  JobScheduleHelper instance =  new  JobScheduleHelper();
     public  static  JobScheduleHelper getInstance(){
         return  instance;
     }
 
     private  Thread scheduleThread;
     private  Thread ringThread;
     private  volatile  boolean  toStop =  false ;
     private  volatile  static  Map<Integer, List<Integer>> ringData =  new  ConcurrentHashMap<>();
 
     public  void  start(){
 
         // schedule thread
         scheduleThread =  new  Thread( new  Runnable() {
             @Override
             public  void  run() {
 
                 try  {
                     TimeUnit.MILLISECONDS.sleep( 5000  - System.currentTimeMillis()% 1000  );
                 catch  (InterruptedException e) {
                     if  (!toStop) {
                         logger.error(e.getMessage(), e);
                     }
                 }
                 logger.info( ">>>>>>>>> init xxl-job admin scheduler success." );
 
                 while  (!toStop) {
 
                     // 掃描任務
                     long  start = System.currentTimeMillis();
                     Connection conn =  null ;
                     PreparedStatement preparedStatement =  null ;
                     try  {
                         if  (conn== null  || conn.isClosed()) {
                             conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                         }
                         conn.setAutoCommit( false );
 
                         preparedStatement = conn.prepareStatement(   "select * from xxl_job_lock where lock_name = 'schedule_lock' for update"  );
                         preparedStatement.execute();
 
                         // tx start
 
                         // 1、預讀10s內調度任務
                         long  maxNextTime = System.currentTimeMillis() +  10000 ;
                         long  nowTime = System.currentTimeMillis();
                         List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(maxNextTime);
                         if  (scheduleList!= null  && scheduleList.size()> 0 ) {
                             // 2、推送時間輪
                             for  (XxlJobInfo jobInfo: scheduleList) {
 
                                 // 時間輪刻度計算
                                 int  ringSecond = - 1 ;
                                 if  (jobInfo.getTriggerNextTime() < nowTime -  10000 ) {    // 過期超10s:本地忽略,當前時間開始計算下次觸發時間
                                     ringSecond = - 1 ;
 
                                     jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                                     jobInfo.setTriggerNextTime(
                                             new  CronExpression(jobInfo.getJobCron())
                                                     .getNextValidTimeAfter( new  Date())
                                                     .getTime()
                                     );
                                 else  if  (jobInfo.getTriggerNextTime() < nowTime) {     // 過期10s內:立即觸發一次,當前時間開始計算下次觸發時間
                                     ringSecond = ( int )((nowTime/ 1000 )% 60 );
 
                                     jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                                     jobInfo.setTriggerNextTime(
                                             new  CronExpression(jobInfo.getJobCron())
                                                     .getNextValidTimeAfter( new  Date())
                                                     .getTime()
                                     );
                                 else  {     // 未過期:正常觸發,遞增計算下次觸發時間
                                     ringSecond = ( int )((jobInfo.getTriggerNextTime()/ 1000 )% 60 );
 
                                     jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                                     jobInfo.setTriggerNextTime(
                                             new  CronExpression(jobInfo.getJobCron())
                                                     .getNextValidTimeAfter( new  Date(jobInfo.getTriggerNextTime()))
                                                     .getTime()
                                     );
                                 }
                                 if  (ringSecond == - 1 ) {
                                     continue ;
                                 }
 
                                 // push async ring
                                 List<Integer> ringItemData = ringData.get(ringSecond);
                                 if  (ringItemData ==  null ) {
                                     ringItemData =  new  ArrayList<Integer>();
                                     ringData.put(ringSecond, ringItemData);
                                 }
                                 ringItemData.add(jobInfo.getId());
 
                                 logger.debug( ">>>>>>>>>>> xxl-job, push time-ring : "  + ringSecond +  " = "  + Arrays.asList(ringItemData) );
                             }
 
                             // 3、更新trigger信息
                             for  (XxlJobInfo jobInfo: scheduleList) {
                                 XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                             }
 
                         }
 
                         // tx stop
 
                         conn.commit();
                     catch  (Exception e) {
                         if  (!toStop) {
                             logger.error( ">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}" , e);
                         }
                     finally  {
                         if  (conn !=  null ) {
                             try  {
                                 conn.close();
                             catch  (SQLException e) {
                             }
                         }
                         if  ( null  != preparedStatement) {
                             try  {
                                 preparedStatement.close();
                             catch  (SQLException ignore) {
                             }
                         }
                     }
                     long  cost = System.currentTimeMillis()-start;
 
                     // next second, align second
                     try  {
                         if  (cost <  1000 ) {
                             TimeUnit.MILLISECONDS.sleep( 1000  - System.currentTimeMillis()% 1000 );
                         }
                     catch  (InterruptedException e) {
                         if  (!toStop) {
                             logger.error(e.getMessage(), e);
                         }
                     }
 
                 }
                 logger.info( ">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop" );
             }
         });
         scheduleThread.setDaemon( true );
         scheduleThread.setName( "xxl-job, admin JobScheduleHelper#scheduleThread" );
         scheduleThread.start();
 
 
         // ring thread
         ringThread =  new  Thread( new  Runnable() {
             @Override
             public  void  run() {
 
                 // align second
                 try  {
                     TimeUnit.MILLISECONDS.sleep( 1000  - System.currentTimeMillis()% 1000  );
                 catch  (InterruptedException e) {
                     if  (!toStop) {
                         logger.error(e.getMessage(), e);
                     }
                 }
 
                 int  lastSecond = - 1 ;
                 while  (!toStop) {
 
                     try  {
                         // second data
                         List<Integer> ringItemData =  new  ArrayList<>();
                         int  nowSecond = ( int )((System.currentTimeMillis()/ 1000 )% 60 );    // 避免處理耗時太長,跨過刻度;
                         if  (lastSecond == - 1 ) {
                             lastSecond = (nowSecond+ 59 )% 60 ;
                         }
                         for  ( int  i =  1 ; i <= 60 ; i++) {
                             int  secondItem = (lastSecond+i)% 60 ;
 
                             List<Integer> tmpData = ringData.remove(secondItem);
                             if  (tmpData !=  null ) {
                                 ringItemData.addAll(tmpData);
                             }
 
                             if  (secondItem == nowSecond) {
                                 break ;
                             }
                         }
                         lastSecond = nowSecond;
 
                         // ring trigger
                         logger.debug( ">>>>>>>>>>> xxl-job, time-ring beat : "  + nowSecond +  " = "  + Arrays.asList(ringItemData) );
                         if  (ringItemData!= null  && ringItemData.size()> 0 ) {
                             // do trigger
                             for  ( int  jobId: ringItemData) {
                                 // do trigger
                                 JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, - 1 null null );
                             }
                             // clear
                             ringItemData.clear();
                         }
                     catch  (Exception e) {
                         if  (!toStop) {
                             logger.error( ">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}" , e);
                         }
                     }
 
                     // next second, align second
                     try  {
                         TimeUnit.MILLISECONDS.sleep( 1000  - System.currentTimeMillis()% 1000 );
                     catch  (InterruptedException e) {
                         if  (!toStop) {
                             logger.error(e.getMessage(), e);
                         }
                     }
                 }
                 logger.info( ">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop" );
             }
         });
         ringThread.setDaemon( true );
         ringThread.setName( "xxl-job, admin JobScheduleHelper#ringThread" );
         ringThread.start();
     }
 
     public  void  toStop(){
         toStop =  true ;
 
         // interrupt and wait
         scheduleThread.interrupt();
         try  {
             scheduleThread.join();
         catch  (InterruptedException e) {
             logger.error(e.getMessage(), e);
         }
 
         // interrupt and wait
         ringThread.interrupt();
         try  {
             ringThread.join();
         catch  (InterruptedException e) {
             logger.error(e.getMessage(), e);
         }
     }
 
}

                       通過代碼,我們可以發現調度中心由兩個線程完成,第一個線程不停的取最近10s鍾待開始的任務,把任務放入時間輪中,第二個線程從時間輪中獲取需要開始的任務,開始執行任務。

                      當然任務調度還可以使用DelayQueue(https://soulmachine.gitbooks.io/system-design/content/cn/task-scheduler.html

 

 

        定時任務一直有一個頭疼的問題,就是高頻調度的執行時間比較長的任務,一般建議指定到單獨一台主機上並保證在單機上任務不會並發執行來解決。

 

 

    4、分布式定時任務中依賴任務的解決方案

          1)任務依賴不支持環,只支持DAG;

              如:A->B->(C,D)->E    其中CD並行,其余串行

          2)下游任務只支持上游所有任務都成功並調度時間到了,才執行任務;

                如:

               

                JobA只有在Job1,Job2,Job3都執行完,並且3點時間到了才能執行。

          3)不支持有不同調度周期的任務存在依賴關系

               如:A->B      B的前置任務為A, A的調度周期為每15分鍾調度一次, B為每天早上1點調度,該任務不建議分布式調度中心執行。

               不支持原因:

               1)改種情況在具體業務中比較少;

               2)支持改種流程會提升分布式定時任務對負責度同時很難判斷前置任務是成功還是失敗;

               3)建議把A任務拆分為兩個任務,一個為B對前置任務A1,一個為每15分鍾執行一次(調度時間過濾掉A1)的任務

       

       實現:

       

 

          在任務回調成功之后,查詢任務到依賴任務,開始執行。

         這里面有幾個問題需要解決:

         1、任務重復執行:

         

           如上面任務,JobA依賴Job1,Job2,Job3執行,同時JobA3點也會調度執行,在3點左右時,Job3執行完后會執行JobA,同時cron調度也會執行JobA,在這種情況怎么保證JobA只被執行一次。

                 解決辦法:在JobA執行前需要把JobA的狀態修改為正在執行中,此時,通過update  where jobId = #{jobId} and status=#{未開始執行} 方法執行更新,如果更新記錄為1的,任務可以進行執行,如果更新記錄為0,拋棄該任務的執行。

           2、怎么判斷任務該不該執行

                

                 條件一:1點鍾Job1執行完了,開始找后置任務JobA,JobA是否該執行?怎么判斷?

                                JobA不該執行,前置任務Job2,Job3 都沒開始執行,Job1不能執行;

                 條件二:3點鍾Job3執行完了,開始找后置任務JobA,JobA是否該執行?怎么判斷?

                                JobA不該執行,前置任務Job1,Job2,Job3 都執行完了,但是Cron時間還沒到,Job1不能執行;

                 條件三:3點15分調度器開始調度,JobA是否該執行,怎么判斷?

                               JobA該執行,前置任務Job1,Job2,Job3 都執行完了,Cron時間也到了;   

                 判斷任務是否執行的邏輯: 如果JobA執行時,需要判斷Job1,Job2,Job3是否執行,下面拿Job1為例

                 假設Job1的歷史任務都是正常執行成功的。

                 情況1:  2019-06-26 00:30:00(today)時,Job1的上一次執行成功時間為2019-06-25:01:00:00 (lastDay),下一次執行時間為:2019-06-26 01:00:00(nextDay).

   

                情況2:  2019-06-26 01:30:00時,Job1的上一次執行成功時間為2019-06-26:01:00:00,下一次執行時間為:2019-06-27 01:00:00.

       

                 

             

             3、任務失敗了,怎么辦?

                    任務失敗應該同時執行帶依賴執行和不帶依賴執行,由頁面配置控制。

             

              4、任務失敗了,頁面配置執行任務時,是否可傳參數,參數怎么在任務間傳遞?

                    頁面配置傳參數時,參數需要傳遞給依賴任務。

              5、查看任務執行狀態時,是否可以查看依賴到表執行情況? 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM