From Choosing a Timer to Reading the XXL-Job Source (Part 2)


Reading xxl-job Through Its Source

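(Note: this article is based on xxl-job v2.0.2, the latest release at the time of writing, with quartz v2.3.1. "Scheduling center" below always refers to the xxl-job-admin project.)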

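As covered last time, xxl-job is a centralized design split into two parts: the scheduling center and the executor. At its core it is still a wrapper around quartz. So let's look at how it works through the "scheduling center" and the "executor" in turn.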

Scheduling Center

Initialization

Since it is a Spring Boot application, we start with the configuration.

XxlJobDynamicSchedulerConfig

The relevant initialization is done in XxlJobDynamicSchedulerConfig, so let's read its source.

@Configuration
public class XxlJobDynamicSchedulerConfig {

  @Bean
  public SchedulerFactoryBean getSchedulerFactoryBean(DataSource dataSource){
    SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
    schedulerFactory.setDataSource(dataSource);
    schedulerFactory.setAutoStartup(true); // start automatically
    schedulerFactory.setStartupDelay(20); // delayed start: wait until the application is up
    schedulerFactory.setOverwriteExistingJobs(true); // true: overwrite jobs in the DB; false: keep what the DB already has
    schedulerFactory.setApplicationContextSchedulerContextKey("applicationContext");
    schedulerFactory.setConfigLocation(new ClassPathResource("quartz.properties"));
    return schedulerFactory;
  }

  @Bean(initMethod = "start", destroyMethod = "destroy")
  public XxlJobDynamicScheduler getXxlJobDynamicScheduler(SchedulerFactoryBean schedulerFactory){
    Scheduler scheduler = schedulerFactory.getScheduler();
    XxlJobDynamicScheduler xxlJobDynamicScheduler = new XxlJobDynamicScheduler();
    xxlJobDynamicScheduler.setScheduler(scheduler);
    return xxlJobDynamicScheduler;
  }
}

As shown above, XxlJobDynamicSchedulerConfig mainly creates the SchedulerFactoryBean and XxlJobDynamicScheduler objects. SchedulerFactoryBean creates the scheduler (Scheduler; yes, this is quartz's Scheduler object), and XxlJobDynamicScheduler holds a reference to that Scheduler.

So how does SchedulerFactoryBean create the Scheduler? Let's look at SchedulerFactoryBean next.

 

SchedulerFactoryBean

SchedulerFactoryBean implements InitializingBean; its main initialization flow is in the afterPropertiesSet method.

@Override
public void afterPropertiesSet() throws Exception {
  if (this.dataSource == null && this.nonTransactionalDataSource != null) {
  this.dataSource = this.nonTransactionalDataSource;
  }

  if (this.applicationContext != null && this.resourceLoader == null) {
  this.resourceLoader = this.applicationContext;
  }

  // initialize the scheduler
  this.scheduler = prepareScheduler(prepareSchedulerFactory());
  try {
  registerListeners();
  registerJobsAndTriggers();
  } catch (Exception ex) {
    try {
    this.scheduler.shutdown(true);
    } catch (Exception ex2) {
      logger.debug("Scheduler shutdown exception after registration failure", ex2);
    }
    throw ex;
  }
}

Above, prepareSchedulerFactory first creates the SchedulerFactory object (quartz), and then prepareScheduler creates the Scheduler object.

private SchedulerFactory prepareSchedulerFactory() throws SchedulerException, IOException {
    SchedulerFactory schedulerFactory = this.schedulerFactory;
    if (schedulerFactory == null) {
      // defaults to StdSchedulerFactory
      schedulerFactory = BeanUtils.instantiateClass(this.schedulerFactoryClass);
      if (schedulerFactory instanceof StdSchedulerFactory) {
      // parse and apply the configuration
        initSchedulerFactory((StdSchedulerFactory) schedulerFactory);
      } else if (this.configLocation != null || this.quartzProperties != null ||  this.taskExecutor != null || this.dataSource != null) {
        throw new IllegalArgumentException("StdSchedulerFactory required for applying Quartz properties: " + schedulerFactory);
      }
    }
    return schedulerFactory;
}

 

Next, let's see how prepareScheduler creates the Scheduler object. It delegates the actual creation to createScheduler, shown here:

protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName) throws SchedulerException {
.........
  // code that does not matter for this walkthrough has been omitted
  try {
    SchedulerRepository repository = SchedulerRepository.getInstance();
    synchronized (repository) {
      Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
      // create the scheduler through schedulerFactory; this is the key call, so we follow it into quartz
      Scheduler newScheduler = schedulerFactory.getScheduler();
      if (newScheduler == existingScheduler) {
        throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " + "in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
      }
      if (!this.exposeSchedulerInRepository) {
        SchedulerRepository.getInstance().remove(newScheduler.getSchedulerName());
      }
      return newScheduler;
    }
  } finally {
    if (overrideClassLoader) {
      // Reset original thread context ClassLoader.
      currentThread.setContextClassLoader(threadContextClassLoader);
    }
  }
}

 

OK, on to quartz.

 

StdSchedulerFactory

public Scheduler getScheduler() throws SchedulerException {
  if (cfg == null) {
    initialize();
  }
  SchedulerRepository schedRep = SchedulerRepository.getInstance();
  Scheduler sched = schedRep.lookup(getSchedulerName());

  // if a scheduler with this name already exists (and is not shut down), return it directly
  if (sched != null) {
    if (sched.isShutdown()) {
      schedRep.remove(getSchedulerName());
    } else {
      return sched;
    }
  }

  // the key call
  sched = instantiate();
  return sched;
}

 

Now let's focus on the instantiate method.

private Scheduler instantiate() throws SchedulerException {
....

  QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
  rsrcs.setName(schedName);
  rsrcs.setThreadName(threadName);
  rsrcs.setInstanceId(schedInstId);
  rsrcs.setJobRunShellFactory(jrsf);
  rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
  rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
  rsrcs.setBatchTimeWindow(batchTimeWindow);
  rsrcs.setMaxBatchSize(maxBatchSize);
  rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown);
  rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait);
  rsrcs.setJMXExport(jmxExport);
  rsrcs.setJMXObjectName(jmxObjectName);

  SchedulerDetailsSetter.setDetails(tp, schedName, schedInstId);

  rsrcs.setThreadExecutor(threadExecutor);
  threadExecutor.initialize();

  rsrcs.setThreadPool(tp);
  if(tp instanceof SimpleThreadPool) {
    if(threadsInheritInitalizersClassLoader)
    ((SimpleThreadPool)tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader);
  }
  tp.initialize();
  tpInited = true;

  rsrcs.setJobStore(js);

  // add plugins
  for (int i = 0; i < plugins.length; i++) {
    rsrcs.addSchedulerPlugin(plugins[i]);
  }

  // create the QuartzScheduler object; this is the key part, the core of Quartz
  qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
  qsInited = true;

  // create the Scheduler object; QuartzScheduler does not implement the Scheduler interface itself, it is the delegate that the Scheduler forwards to
  Scheduler scheduler = instantiate(rsrcs, qs);
  ...
}

protected Scheduler instantiate(QuartzSchedulerResources rsrcs, QuartzScheduler qs) {
  Scheduler scheduler = new StdScheduler(qs);
  return scheduler;
}

 

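The code above has been cut down drastically; the original is very long. Overall it creates a series of objects according to the configuration, and all of them end up held by the QuartzSchedulerResources object shown above. These objects work together to assemble the Quartz "machine". The code also gives a rough idea of which instances get created; most of them can be configured through quartz.properties.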

Of these, the one to focus on is the creation of the QuartzScheduler object, which is effectively the heart of Quartz.

 

QuartzScheduler

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval) throws SchedulerException {
  this.resources = resources;
  if (resources.getJobStore() instanceof JobListener) {
  addInternalJobListener((JobListener)resources.getJobStore());
  }

  // create the QuartzSchedulerThread object; this is the key part, the thread that does the job scheduling
  this.schedThread = new QuartzSchedulerThread(this, resources);
  ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
  // a DefaultThreadExecutor object; this call starts the schedThread thread
  schedThreadExecutor.execute(this.schedThread);
  if (idleWaitTime > 0) {
    this.schedThread.setIdleWaitTime(idleWaitTime);
  }

  jobMgr = new ExecutingJobsManager();
  addInternalJobListener(jobMgr);
  errLogger = new ErrorLogger();
  addInternalSchedulerListener(errLogger);

  signaler = new SchedulerSignalerImpl(this, this.schedThread);

  getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}

 

The code above mainly creates the QuartzSchedulerThread object and then starts it through DefaultThreadExecutor.

 

QuartzSchedulerThread

QuartzSchedulerThread extends Thread. Let's look at its core code.

@Override
public void run() {
  int acquiresFailed = 0;

  // loop until scheduling is halted
  while (!halted.get()) {
    try {
      synchronized (sigLock) {
      // while paused, block here until the state is changed from outside
        while (paused && !halted.get()) {
          try {
            sigLock.wait(1000L);
          } catch (InterruptedException ignore) {
          }
          acquiresFailed = 0;
        }

        if (halted.get()) {
          break;
        }
      }

      ......

      // get the number of available worker threads
      int availThreadCount =qsRsrcs.getThreadPool().blockForAvailableThreads();
      if(availThreadCount > 0) { 
        List<OperableTrigger> triggers;
        long now = System.currentTimeMillis();
        clearSignaledSchedulingChange();
      // fetch a batch of Triggers due to fire soon from the DB; their state in the DB is updated at the same time
        try {
          triggers = qsRsrcs.getJobStore().acquireNextTriggers(
          now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
          acquiresFailed = 0;
          if (log.isDebugEnabled())
          log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
        } catch (JobPersistenceException jpe) {
          if (acquiresFailed == 0) {
            qs.notifySchedulerListenersError("An error occurred while scanning for the next triggers to fire.",jpe);
          }
          if (acquiresFailed < Integer.MAX_VALUE)
          acquiresFailed++;
          continue;
        } catch (RuntimeException e) {
          if (acquiresFailed == 0) {
            getLog().error("quartzSchedulerThreadLoop: RuntimeException " +e.getMessage(), e);
          }
          if (acquiresFailed < Integer.MAX_VALUE)
          acquiresFailed++;
          continue;
        }

        if (triggers != null && !triggers.isEmpty()) {

          ......


          List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

          boolean goAhead = true;
          synchronized(sigLock) {
            goAhead = !halted.get();
          }
          if(goAhead) {
            // load the jobs for these triggers, update the related DB record state, and compute the next fire time
            try {
              List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
              if(res != null)
                bndles = res;
            } catch (SchedulerException se) {
              qs.notifySchedulerListenersError("An error occurred while firing triggers '"+ triggers + "'", se);
              for (int i = 0; i < triggers.size(); i++) {
                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
              }
              continue;
            }

          }

          // the actual execution: wrap each job in a JobRunShell and run it on a thread from the pool
          for (int i = 0; i < bndles.size(); i++) {
            TriggerFiredResult result = bndles.get(i);
            TriggerFiredBundle bndle = result.getTriggerFiredBundle();
            Exception exception = result.getException();

            if (exception instanceof RuntimeException) {
              getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
              qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
              continue;
            }

            if (bndle == null) {
              qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
              continue;
            }

            JobRunShell shell = null;
            try {
              shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
              shell.initialize(qs);
            } catch (SchedulerException se) {
              qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
              continue;
            }

            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
              getLog().error("ThreadPool.runInThread() return false!");
              qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
            }

          }
          continue; // while (!halted)
        }
      } else {
        continue; // while (!halted)
    }

    ......
}

 

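The code above has also been trimmed. This method is quartz's core scheduling loop. Its internals are fairly complex, so the code carries only brief comments, but the main flow is: fetch Triggers and their Jobs from the DB, update the DB row state at the same time so that nodes in a cluster do not "fight" over the same trigger, coordinate the scheduling thread through wait and notify, and finally take a thread from the thread pool to run our job.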
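The wait/notify coordination mentioned above can be sketched in plain Java. This is a minimal illustration and not Quartz code: `MiniSchedulerThread`, its `loops` counter and the `runDemo` helper are hypothetical names, and the "work" is a counter increment standing in for trigger acquisition and firing.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical miniature of a scheduler thread coordinated through wait/notify. */
class MiniSchedulerThread extends Thread {
    private final Object sigLock = new Object();
    private final AtomicBoolean halted = new AtomicBoolean(false);
    private boolean paused = true;      // guarded by sigLock; the thread starts paused
    private boolean signaled = false;   // guarded by sigLock
    final AtomicInteger loops = new AtomicInteger();

    /** Pauses or resumes the loop and wakes the thread so it re-checks the flag. */
    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
            sigLock.notifyAll();
        }
    }

    /** Called when a job is added or changed: cuts the idle wait short. */
    void signalSchedulingChange() {
        synchronized (sigLock) {
            signaled = true;
            sigLock.notifyAll();
        }
    }

    void halt() {
        synchronized (sigLock) {
            halted.set(true);
            sigLock.notifyAll();
        }
    }

    @Override
    public void run() {
        while (!halted.get()) {
            synchronized (sigLock) {
                // Block here while paused, until someone changes the state.
                while (paused && !halted.get()) {
                    try {
                        sigLock.wait(1000L);
                    } catch (InterruptedException ignore) {
                        // keep looping; halt() is the only way out
                    }
                }
                if (halted.get()) {
                    break;
                }
            }
            loops.incrementAndGet(); // stand-in for "acquire triggers and fire them"
            synchronized (sigLock) {
                // Idle until the next pass, unless a scheduling change was signaled.
                if (!signaled && !halted.get()) {
                    try {
                        sigLock.wait(50L);
                    } catch (InterruptedException ignore) {
                        // fall through to the next pass
                    }
                }
                signaled = false;
            }
        }
    }

    /** Starts a thread, resumes it, lets it run briefly, halts it, and returns the pass count. */
    static int runDemo() {
        MiniSchedulerThread t = new MiniSchedulerThread();
        t.setDaemon(true);
        t.start();
        t.togglePause(false);
        try {
            Thread.sleep(200L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            t.halt();
        }
        try {
            t.join(2000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return t.loops.get();
    }
}
```

Every state change happens while holding `sigLock` and is followed by `notifyAll()`, so the loop never has to poll faster than its wait timeout to notice a pause, a halt, or a newly scheduled job.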

OK, at this point the little heart of quartz is beating.

So, are we done?

No, this is only the beginning! What about the xxl-job we were promised?

 

XxlJobDynamicScheduler

Back in XxlJobDynamicSchedulerConfig, the XxlJobDynamicScheduler bean declares initMethod = "start", so its start method is called once the object has been initialized. Let's step into start.

public void start() throws Exception {
  // valid
  Assert.notNull(scheduler, "quartz scheduler is null");

  // internationalization
  initI18n();

  // start the daemon thread that maintains executor registration info
  JobRegistryMonitorHelper.getInstance().start();

  // start the daemon thread that scans for failed job executions
  JobFailMonitorHelper.getInstance().start();

  // initialize RPC (receives executor registrations, callbacks, etc.); covered with the executor, skipped here
  initRpcProvider();

  logger.info(">>>>>>>>> init xxl-job admin success.");
}

 

Once an executor has registered itself, how does the scheduling center keep track of it? The answer is in the JobRegistryMonitorHelper thread.

 

JobRegistryMonitorHelper

public void start(){
  registryThread = new Thread(new Runnable() {
    @Override
    public void run() {
      while (!toStop) {
        try {
          // fetch the auto-registered executor groups from the XXL_JOB_QRTZ_TRIGGER_GROUP table
          List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
          if (groupList!=null && !groupList.isEmpty()) {
            // registrations live in the XXL_JOB_QRTZ_TRIGGER_REGISTRY table; delete machines with no heartbeat for 90 seconds
            XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(RegistryConfig.DEAD_TIMEOUT);

            HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
            // load the live machines from the XXL_JOB_QRTZ_TRIGGER_REGISTRY table
            List<XxlJobRegistry> list =XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT);
            // machines with the same appname form a cluster
            if (list != null) {
              for (XxlJobRegistry item: list) {
                if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                  String appName = item.getRegistryKey();
                  List<String> registryList = appAddressMap.get(appName);
                  if (registryList == null) {
                    registryList = new ArrayList<String>();
                  }

                  if (!registryList.contains(item.getRegistryValue())) {
                    registryList.add(item.getRegistryValue());
                  }
                  appAddressMap.put(appName, registryList);
                }
              }
            }

            // maintain the cluster address list (XXL_JOB_QRTZ_TRIGGER_GROUP table, comma-separated addresses)
            for (XxlJobGroup group: groupList) {
              List<String> registryList = appAddressMap.get(group.getAppName());
              String addressListStr = null;
                if (registryList!=null && !registryList.isEmpty()) {
                  Collections.sort(registryList);  
                  addressListStr = "";
                  for (String item:registryList) {
                    addressListStr += item + ",";
                  }                  
                  addressListStr = addressListStr.substring(0, addressListStr.length()-1);
                }
                group.setAddressList(addressListStr);
                XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
              }
            }
          } catch (Exception e) {
            ......
          }
          try {
            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
          } catch (InterruptedException e) {
            ......
          }
        }
        logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
      }
    });
    registryThread.setDaemon(true);
    registryThread.setName("xxl-job, admin JobRegistryMonitorHelper");
    registryThread.start();
  }

 

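Maintaining the registry is fairly simple: periodically check for heartbeats, which live in the DB (each update of the row's timestamp signals liveness). A machine that has not refreshed its heartbeat within the time window (90 seconds by default) is considered dead and removed, after which the address list of the live machines is refreshed.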
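The maintenance pass described above can be sketched without a database. This is only an illustration under stated assumptions: `RegistrySketch`, its `Entry` row type and the `liveAddresses` helper are hypothetical names, and an in-memory list stands in for the registry table.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical in-memory version of the registry maintenance pass. */
class RegistrySketch {
    static final long DEAD_TIMEOUT_MS = 90_000L; // 90 seconds, as in the text

    /** One heartbeat row; the field names are assumptions made for this sketch. */
    static final class Entry {
        final String appName;
        final String address;
        final long updatedAtMs;

        Entry(String appName, String address, long updatedAtMs) {
            this.appName = appName;
            this.address = address;
            this.updatedAtMs = updatedAtMs;
        }
    }

    /**
     * Skips rows whose heartbeat is older than the timeout, groups the rest by
     * appName, and joins each group's de-duplicated, sorted addresses with commas.
     * Apps without any live address are simply absent from the result.
     */
    static Map<String, String> liveAddresses(List<Entry> rows, long nowMs) {
        Map<String, TreeSet<String>> byApp = new TreeMap<>();
        for (Entry row : rows) {
            if (nowMs - row.updatedAtMs > DEAD_TIMEOUT_MS) {
                continue; // no heartbeat within the window: treated as dead
            }
            byApp.computeIfAbsent(row.appName, k -> new TreeSet<>()).add(row.address);
        }
        Map<String, String> joined = new TreeMap<>();
        for (Map.Entry<String, TreeSet<String>> e : byApp.entrySet()) {
            joined.put(e.getKey(), String.join(",", e.getValue()));
        }
        return joined;
    }
}
```

Sorting the addresses before joining them keeps the stored string stable from one pass to the next, which matches the `Collections.sort` call in the quoted source.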

 

JobFailMonitorHelper

When a job fails, we want an email alert, and sometimes we want the job retried automatically. How does xxl-job do this? The answer is in JobFailMonitorHelper.

public void start(){
  monitorThread = new Thread(new Runnable() {

  @Override
  public void run() {

  // monitor
    while (!toStop) {
      try {
        // the XXL_JOB_QRTZ_TRIGGER_LOG table records job executions
        // fetch the failed execution records from XXL_JOB_QRTZ_TRIGGER_LOG
        List<Integer> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
        if (failLogIds!=null && !failLogIds.isEmpty()) {
          for (int failLogId: failLogIds) {
          // lock the log record (alarm status 0 -> -1); skip it if another node got there first
          int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
          if (lockRet < 1) {
            continue;
          }
          XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
          // load the job details from the XXL_JOB_QRTZ_TRIGGER_INFO table
          XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
  
          // retries remain, so retry; the log row stores the number of retries left
          if (log.getExecutorFailRetryCount() > 0) {
          // start the retry (the trigger flow is covered in a later section)
            JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), null);
            .......
            // update the log
            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
          }

          // failure alarm
          // 0 = default, -1 = locked, 1 = no alarm needed, 2 = alarm sent, 3 = alarm failed
          int newAlarmStatus = 0;    
          if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
            boolean alarmResult = true;
            try {
              alarmResult = failAlarm(info, log);
            } catch (Exception e) {
              alarmResult = false;
              logger.error(e.getMessage(), e);
            }
            newAlarmStatus = alarmResult?2:3;
          } else {
            newAlarmStatus = 1;
          }
          // update the alarm status
          XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
        }
      }

      TimeUnit.SECONDS.sleep(10);
      } catch (Exception e) {
        ......
      }
    }
    logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");
    }
  });
  monitorThread.setDaemon(true);
  monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
  monitorThread.start();
}
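The "lock the log record" step works because `updateAlarmStatus(failLogId, 0, -1)` only succeeds for one caller. A minimal sketch of that compare-and-set idea follows; `AlarmLockSketch` is a hypothetical in-memory stand-in, and the SQL in the comment is an assumption about the shape of the mapper, not a quote from it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory stand-in for the alarm-status column of the log table. */
class AlarmLockSketch {
    static final int DEFAULT = 0;
    static final int LOCKED = -1;
    static final int ALARM_SENT = 2;

    private final ConcurrentMap<Integer, Integer> alarmStatusByLogId = new ConcurrentHashMap<>();

    void addFailedLog(int logId) {
        alarmStatusByLogId.put(logId, DEFAULT);
    }

    /**
     * Mimics a conditional update of the assumed form
     * "UPDATE ... SET alarm_status = new WHERE id = ? AND alarm_status = old"
     * and returns the number of affected rows (0 or 1).
     */
    int updateAlarmStatus(int logId, int oldStatus, int newStatus) {
        return alarmStatusByLogId.replace(logId, oldStatus, newStatus) ? 1 : 0;
    }

    int status(int logId) {
        return alarmStatusByLogId.getOrDefault(logId, DEFAULT);
    }
}
```

If two scheduling-center nodes scan the same failed log, only the one whose update affects a row goes on to retry and alarm; the other sees `lockRet < 1` and skips the record.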

 

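At this point the main flows we care about in the scheduling center have been initialized, and we have a rough picture of how xxl-job starts up. From our point of view, the scheduling center's core features are job management (create, read, update, delete) plus triggering and stopping jobs. The management part is just CRUD against the DB, but how are triggering and stopping done? Since xxl-job separates the scheduling center from the executors, the question is really how the two communicate.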

The answer is RPC. Next, we schedule a job and follow its execution flow.

 

Execution Flow

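Open the scheduling center page and click the "Start" button in a job's action column; the request goes to "/jobinfo/start". Anyone who has done web development knows where to look next: the handler for /jobinfo/start.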

@Controller
@RequestMapping("/jobinfo")
public class JobInfoController {
  ......

  @RequestMapping("/start")
  @ResponseBody
  public ReturnT<String> start(int id) {
    return xxlJobService.start(id);
  }

  ......
}

 

@Service
public class XxlJobServiceImpl implements XxlJobService {
  ......

  @Override
  public ReturnT<String> start(int id) {
    // load the job from the XXL_JOB_QRTZ_TRIGGER_INFO table
    XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
    String name = String.valueOf(xxlJobInfo.getId());
    // get the cron expression
    String cronExpression = xxlJobInfo.getJobCron();

    try {
      boolean ret = XxlJobDynamicScheduler.addJob(name, cronExpression);
      return ret?ReturnT.SUCCESS:ReturnT.FAIL;
    } catch (SchedulerException e) {
      logger.error(e.getMessage(), e);
      return ReturnT.FAIL;
    }
  }
}

 

public class XxlJobDynamicScheduler {
  public static boolean addJob(String jobName, String cronExpression) throws SchedulerException {
  ......

  CronTrigger cronTrigger = TriggerBuilder.newTrigger().
  withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();

  // every job ends up being represented by RemoteHttpJobBean
  Class<? extends Job> jobClass_ = RemoteHttpJobBean.class; 
  JobDetail jobDetail = JobBuilder.newJob(jobClass_).withIdentity(jobKey).build();

  // schedule the job through quartz's scheduler (StdScheduler)
  Date date = scheduler.scheduleJob(jobDetail, cronTrigger);
  
  return true;
  }
}

 

public class StdScheduler {

  ...
  private QuartzScheduler sched;
  ...

  public Date scheduleJob(JobDetail jobDetail, Trigger trigger) throws SchedulerException {
  // here we reach QuartzScheduler, the heart of quartz described earlier
    return sched.scheduleJob(jobDetail, trigger);
  }
}

 

public class QuartzScheduler implements RemotableQuartzScheduler {

  ......
  private SchedulerSignaler signaler;
  ......

  public Date scheduleJob(JobDetail jobDetail,
    Trigger trigger) throws SchedulerException {

    ......
    // wake the scheduler thread
    notifySchedulerThread(trigger.getNextFireTime().getTime());
    ......

    return ft;
  }

  protected void notifySchedulerThread(long candidateNewNextFireTime) {
    if (isSignalOnSchedulingChange()) {
      // SchedulerSignalerImpl forwards this to signalSchedulingChange on the scheduler thread:
      // SchedulerSignalerImpl.schedThread.signalSchedulingChange(candidateNewNextFireTime);
      signaler.signalSchedulingChange(candidateNewNextFireTime);
    }
  }


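  // note: this method belongs to QuartzSchedulerThread (which owns sigLock); it is shown here for continuity
  public void signalSchedulingChange(long candidateNewNextFireTime) {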
    synchronized(sigLock) {
      signaled = true;
      signaledNextFireTime = candidateNewNextFireTime;
      // wake the thread
      sigLock.notifyAll();
    }
  }
}

 

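At this point everything is back at the QuartzSchedulerThread introduced earlier, since signalSchedulingChange wakes that thread through sigLock. We just mentioned that the job is registered with the job class RemoteHttpJobBean; where is it actually instantiated? In the initialize method of JobRunShell (as mentioned earlier, every job is wrapped in a JobRunShell object and then run on a thread from the thread pool).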

public class JobRunShell extends SchedulerListenerSupport implements Runnable {
  ......

  public void initialize(QuartzScheduler sched) throws SchedulerException {
    this.qs = sched;

    Job job = null;
    JobDetail jobDetail = firedTriggerBundle.getJobDetail();

    try {
      // create the instance from the JobDetail's jobClass,
      // which is the RemoteHttpJobBean we set above
      job = sched.getJobFactory().newJob(firedTriggerBundle, scheduler);
    } catch (SchedulerException se) {
      sched.notifySchedulerListenersError("An error occured instantiating job to be executed. job= '"+ jobDetail.getKey() + "'", se);
      throw se;
    } catch (Throwable ncdfe) { // such as NoClassDefFoundError
      SchedulerException se = new SchedulerException("Problem instantiating class '"+ jobDetail.getJobClass().getName() + "' - ", ncdfe);
      sched.notifySchedulerListenersError("An error occured instantiating job to be executed. job= '"+ jobDetail.getKey() + "'", se);
      throw se;
    }

    this.jec = new JobExecutionContextImpl(scheduler, firedTriggerBundle, job);
  }

  ......

  // run
  public void run() {
    qs.addInternalSchedulerListener(this);

      try {
        OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
        JobDetail jobDetail = jec.getJobDetail();
      
        do {

          JobExecutionException jobExEx = null;
          Job job = jec.getJobInstance();

          ......
          try {
            log.debug("Calling execute on job " + jobDetail.getKey());
            // run the job; this reaches RemoteHttpJobBean's executeInternal method
            job.execute(jec);
            endTime = System.currentTimeMillis();
          } catch (JobExecutionException jee) {
            ......
          } catch (Throwable e) {
            ......
          }

          jec.setJobRunTime(endTime - startTime);

          ......

          qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
          break;
        } while (true);

      } finally {
        qs.removeInternalSchedulerListener(this);
      }
    }
  }
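The two ideas in this class, creating the job instance from a stored class at fire time and reaching `executeInternal` through `execute`, can be sketched in plain Java. Everything here is hypothetical (`JobFactorySketch`, `Job`, `JobBean`, `RemoteJob`); it mirrors the shape of the flow and does not use the Quartz or Spring types.

```java
/** Hypothetical miniature of "instantiate the job class at fire time, then execute it". */
class JobFactorySketch {
    /** Stand-in for a job interface; the StringBuilder plays the role of the execution context. */
    interface Job {
        void execute(StringBuilder context) throws Exception;
    }

    /** Template-method base class: execute() is fixed and forwards to executeInternal(). */
    static abstract class JobBean implements Job {
        @Override
        public final void execute(StringBuilder context) throws Exception {
            executeInternal(context);
        }

        protected abstract void executeInternal(StringBuilder context) throws Exception;
    }

    /** Stand-in for a job whose only work is to hand the trigger off elsewhere. */
    static class RemoteJob extends JobBean {
        public RemoteJob() {
        }

        @Override
        protected void executeInternal(StringBuilder context) {
            context.append("remote-trigger");
        }
    }

    /** Creates a fresh instance from the stored class, as a job factory would on each firing. */
    static Job newJob(Class<? extends Job> jobClass) throws ReflectiveOperationException {
        return jobClass.getDeclaredConstructor().newInstance();
    }

    /** Instantiates and runs the job, wrapping any failure with the class name for diagnosis. */
    static String fire(Class<? extends Job> jobClass) {
        try {
            StringBuilder context = new StringBuilder();
            newJob(jobClass).execute(context);
            return context.toString();
        } catch (Exception e) {
            throw new IllegalStateException("Problem instantiating or running " + jobClass.getName(), e);
        }
    }
}
```

Because only the class is stored with the job definition, each firing gets its own instance, so job objects do not need to be thread-safe across firings.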

 

So when the JobRunShell runs, it ends up calling RemoteHttpJobBean's executeInternal method.

public class RemoteHttpJobBean extends QuartzJobBean {
  private static Logger logger = LoggerFactory.getLogger(RemoteHttpJobBean.class);

  @Override
  protected void executeInternal(JobExecutionContext context) throws JobExecutionException {

    // load jobId
    JobKey jobKey = context.getTrigger().getJobKey();
    Integer jobId = Integer.valueOf(jobKey.getName());

    // this ends up in JobTriggerPoolHelper.addTrigger; see the code below
    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
  }

}

 

public class JobTriggerPoolHelper {
  ......
  private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
  ......

  public static void trigger(int jobId, TriggerTypeEnum triggerType,int failRetryCount,String executorShardingParam,String executorParam) {
    helper.addTrigger(jobId, triggerType, failRetryCount, 
    executorShardingParam, executorParam);
  }


  public void addTrigger(final int jobId, final TriggerTypeEnum triggerType,final int failRetryCount, final String executorShardingParam, final String executorParam) {

    // thread pools are isolated by trigger duration: a fast pool and a slow pool, fast by default
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    // a job that took longer than 500 ms more than 10 times within the window (1 minute by default) goes to the slow pool
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {
      triggerPool_ = slowTriggerPool;
    }

    // run on the chosen pool
    triggerPool_.execute(new Runnable() {
      @Override
      public void run() {
        long start = System.currentTimeMillis();
        try {
          // the key call: this is where the trigger really happens
          XxlJobTrigger.trigger(jobId, triggerType, failRetryCount,executorShardingParam, executorParam);
        } catch (Exception e) {
          logger.error(e.getMessage(), e);
        } finally {

          // the time window is 1 minute; once it rolls over, clear the counters and start a new cycle
          long minTim_now = System.currentTimeMillis()/60000;
          if (minTim != minTim_now) {
            minTim = minTim_now;
            jobTimeoutCountMap.clear();
          }

          // each trigger that takes more than 500 ms counts as one timeout
          long cost = System.currentTimeMillis()-start;
          if (cost > 500) {
            AtomicInteger timeoutCount = jobTimeoutCountMap.put(jobId, new AtomicInteger(1));
            if (timeoutCount != null) {
              timeoutCount.incrementAndGet();
            }
          }

        }

      }
    });
  }
}

 

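From the code above we can clearly see how xxl-job isolates thread pools, a rule worth borrowing when designing similar systems. The part of this code that matters most, though, is the call to XxlJobTrigger's trigger method, which is what finally triggers the job. After all this preparation, the real show is only now starting.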
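Before moving on, the fast/slow isolation rule can be sketched on its own. `PoolIsolationSketch` and its method names are hypothetical, and pool names are returned as strings instead of real executors to keep the example small. One deliberate difference from the quoted code: the sketch counts with `computeIfAbsent(...).incrementAndGet()`, because `Map.put` returns the previous counter while storing a fresh `AtomicInteger(1)`, so incrementing the returned object would not change what the map holds.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of routing slow jobs to a separate pool within a one-minute window. */
class PoolIsolationSketch {
    static final long SLOW_THRESHOLD_MS = 500L; // a trigger slower than this counts as a timeout
    static final int SLOW_COUNT_LIMIT = 10;     // more timeouts than this per window means "slow"

    private final ConcurrentMap<Integer, AtomicInteger> timeoutCounts = new ConcurrentHashMap<>();
    private volatile long currentMinute = -1L;

    /** Returns "slow" once a job exceeded the limit in the current window, otherwise "fast". */
    String choosePool(int jobId) {
        AtomicInteger count = timeoutCounts.get(jobId);
        return (count != null && count.get() > SLOW_COUNT_LIMIT) ? "slow" : "fast";
    }

    /** Records how long one trigger took; nowMs is passed in so the window is easy to test. */
    void recordTrigger(int jobId, long costMs, long nowMs) {
        long minute = nowMs / 60_000L;
        if (minute != currentMinute) {
            // The window rolled over: forget the old counts and start a new cycle.
            currentMinute = minute;
            timeoutCounts.clear();
        }
        if (costMs > SLOW_THRESHOLD_MS) {
            timeoutCounts.computeIfAbsent(jobId, k -> new AtomicInteger()).incrementAndGet();
        }
    }
}
```

The point of the design is that a handful of slow jobs cannot occupy every trigger thread: once a job is flagged, its triggers queue in the slow pool while the fast pool keeps serving everything else.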

 

public class XxlJobTrigger {
  ......

  public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) {
  // load the job from the XXL_JOB_QRTZ_TRIGGER_INFO table
  XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
  if (jobInfo == null) {
    logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
    return;
  }
  if (executorParam != null) {
    jobInfo.setExecutorParam(executorParam);
  }
  // work out the fail-retry count
  int finalFailRetryCount = failRetryCount >= 0 ? failRetryCount :
  jobInfo.getExecutorFailRetryCount();

  // load the executor group from the XXL_JOB_QRTZ_TRIGGER_GROUP table
  XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

  // if a sharding parameter was passed in, parse it
  int[] shardingParam = null;
  if (executorShardingParam!=null){
    String[] shardingArr = executorShardingParam.split("/");
      if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
        shardingParam = new int[2];
        // shard index
        shardingParam[0] = Integer.valueOf(shardingArr[0]);
        // total number of shards
        shardingParam[1] = Integer.valueOf(shardingArr[1]);
      }
    }

    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==
      ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
      && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
      && shardingParam==null) {
      // with SHARDING_BROADCAST (the sharding broadcast strategy), every executor in the group is triggered
        for (int i = 0; i < group.getRegistryList().size(); i++) {
          // the trigger method; the key call, shown right below
          processTrigger(group, jobInfo, finalFailRetryCount, 
          triggerType, i, group.getRegistryList().size());
        }
    } else {
      if (shardingParam == null) {
        shardingParam = new int[]{0, 1};
    }
    // trigger only once
    processTrigger(group, jobInfo, finalFailRetryCount, 
    triggerType, shardingParam[0], shardingParam[1]);
  }

}

......

private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
  // block strategy
  ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(),ExecutorBlockStrategyEnum.SERIAL_EXECUTION);
  // route strategy
  ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);
  // sharding parameter
  String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

  // save the log record
  XxlJobLog jobLog = new XxlJobLog();
  ......
  XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);

  // assemble the TriggerParam
  TriggerParam triggerParam = new TriggerParam();
  ......

  // pick the executor address
  String address = null;
  ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
      if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
        // for sharding broadcast, pick the executor address by the current shard index
        if (index < group.getRegistryList().size()) {
          address = group.getRegistryList().get(index);
        } else {
          address = group.getRegistryList().get(0);
        }
      } else {
      // otherwise pick the executor address by route strategy
      // the route strategies all extend ExecutorRouter
      routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
      if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
        address = routeAddressResult.getContent();
      }
    }
  } else {
    routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
  }

  // execute
  ReturnT<String> triggerResult = null;
  if (address != null) {
    // after assembling parameters and routing to an address, execution finally starts; the method is below and is the key part
    triggerResult = runExecutor(triggerParam, address);
  } else {
    triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
  }
  ......
  // update the log
  XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

  logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
    

  /**
  * Where execution finally happens
  * @param triggerParam
  * @param address
  * @return
  */
  public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
  ReturnT<String> runResult = null;
    try {
      // what we get here is a proxy object (note this)
      ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
      // the call actually goes through the proxy
      runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
      ......
    }

    ......

    return runResult;
  }
}
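The sharding branch of trigger can be sketched as a small planning function: parse an optional "index/total" parameter, fan out to every address under broadcast, and otherwise trigger once. `ShardingSketch` and its string output format are hypothetical; real routing, logging and the RPC call are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the "index/total" sharding parameter and the broadcast fan-out. */
class ShardingSketch {
    static boolean isNumeric(String s) {
        if (s.isEmpty() || s.length() > 9) {
            return false; // the length cap keeps Integer.parseInt from overflowing
        }
        for (int i = 0; i < s.length(); i++) {
            char ch = s.charAt(i);
            if (ch < '0' || ch > '9') {
                return false;
            }
        }
        return true;
    }

    /** Parses "index/total"; returns null when the text is missing or malformed. */
    static int[] parse(String text) {
        if (text == null) {
            return null;
        }
        String[] parts = text.split("/");
        if (parts.length != 2 || !isNumeric(parts[0]) || !isNumeric(parts[1])) {
            return null;
        }
        return new int[] { Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) };
    }

    /** Lists the triggers that would be sent, one "target <- index/total" line each. */
    static List<String> plan(boolean broadcast, List<String> addresses, String shardingParam) {
        int[] parsed = parse(shardingParam);
        List<String> triggers = new ArrayList<>();
        if (broadcast && !addresses.isEmpty() && parsed == null) {
            // Broadcast without an explicit shard: one trigger per registered executor.
            for (int i = 0; i < addresses.size(); i++) {
                triggers.add(addresses.get(i) + " <- " + i + "/" + addresses.size());
            }
        } else {
            if (parsed == null) {
                parsed = new int[] { 0, 1 }; // default: shard 0 of 1
            }
            triggers.add("single <- " + parsed[0] + "/" + parsed[1]);
        }
        return triggers;
    }
}
```

An explicit sharding parameter, as passed on a retry, bypasses the fan-out so that only the shard that failed is triggered again.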

 

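We are now one step from the truth. runExecutor obtains an ExecutorBiz object and executes through it; note in particular that the ExecutorBiz it gets is a proxy object. If you jump straight into run without first opening XxlJobDynamicScheduler.getExecutorBiz, you will think you took a wrong turn.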

So, finally, let's use this proxy object to solve the last puzzle.

 

public final class XxlJobDynamicScheduler {
  ......
  private static ConcurrentHashMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();

  /**
  * Get the ExecutorBiz proxy object
  * @param address
  * @return
  * @throws Exception
  */
  public static ExecutorBiz getExecutorBiz(String address) throws Exception {
  if (address==null || address.trim().length()==0) {
    return null;
  }

  // look in the cache first
  address = address.trim();
  ExecutorBiz executorBiz = executorBizRepository.get(address);
  if (executorBiz != null) {
    return executorBiz;
  }

    // create the proxy object (look closely at the getObject method)
    executorBiz = (ExecutorBiz) new XxlRpcReferenceBean(
    NetEnum.NETTY_HTTP,
    Serializer.SerializeEnum.HESSIAN.getSerializer(),
    CallType.SYNC,
    LoadBalance.ROUND,
    ExecutorBiz.class,
    null,
    5000,
    address,
    XxlJobAdminConfig.getAdminConfig().getAccessToken(),
    null,
    null).getObject();

    // cache it
    executorBizRepository.put(address, executorBiz);
    return executorBiz;
  }
}
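The per-address cache in getExecutorBiz follows a common pattern that can be sketched generically. `ClientCacheSketch` is a hypothetical name, and the sketch uses `computeIfAbsent` where the quoted code uses a separate get and put; that is a substitution made for the example, which also guarantees at most one client is created per address under concurrent calls.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical per-address cache of client objects, keyed by the trimmed address. */
class ClientCacheSketch<C> {
    private final ConcurrentMap<String, C> byAddress = new ConcurrentHashMap<>();
    private final Function<String, C> factory;

    ClientCacheSketch(Function<String, C> factory) {
        this.factory = factory;
    }

    /** Returns the cached client for the address, creating it on first use; null for blank input. */
    C get(String address) {
        if (address == null || address.trim().isEmpty()) {
            return null;
        }
        return byAddress.computeIfAbsent(address.trim(), factory);
    }
}
```

A caller would pass the proxy-creating code as the factory, so an expensive client is built once per executor address and reused for every later trigger.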

 

 

Now let's look inside the proxy object.

public class XxlRpcReferenceBean {

  ......

  // the key method: calling run on the proxy ends up in the invoke method below
  public Object getObject() {
    return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] { iface },
      new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
          ......
          // assemble the RPC request
          XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
          xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
          xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
          xxlRpcRequest.setAccessToken(accessToken);
          xxlRpcRequest.setClassName(className);
          xxlRpcRequest.setMethodName(methodName);
          xxlRpcRequest.setParameterTypes(parameterTypes);
          xxlRpcRequest.setParameters(parameters);

          ......
          // every call is eventually sent as an RPC through this method
          // client here corresponds to the NetEnum.NETTY_HTTP passed in when the proxy was created above,
          // i.e. a NettyHttpClient object
          // it talks to the executor over netty; we won't trace the details any further
          client.asyncSend(finalAddress, xxlRpcRequest);
          ......
        }
      });
  }
}
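To make the proxy mechanism concrete, here is a minimal, self-contained sketch of a JDK dynamic proxy that turns an interface call into a request-like record, which is roughly what getObject does before handing the request to the client. GreetingService, CapturedCall and the sent list are hypothetical stand-ins for ExecutorBiz, XxlRpcRequest and the network client; nothing here is xxl-rpc code.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class ProxySketch {

    // Hypothetical stand-in for ExecutorBiz: any interface can be proxied.
    interface GreetingService {
        String greet(String name);
    }

    // Hypothetical stand-in for XxlRpcRequest: what the proxy captures per call.
    static final class CapturedCall {
        final String className;
        final String methodName;
        final Object[] parameters;

        CapturedCall(String className, String methodName, Object[] parameters) {
            this.className = className;
            this.methodName = methodName;
            this.parameters = parameters;
        }
    }

    // Calls recorded by the proxy; a real client would send them over the network instead.
    static final List<CapturedCall> sent = new ArrayList<>();

    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Every interface method call lands here; there is no local implementation.
            sent.add(new CapturedCall(method.getDeclaringClass().getName(), method.getName(), args));
            // Pretend the remote side answered; a real client would wait for the response.
            return "remote result of " + method.getName();
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    public static void main(String[] args) {
        GreetingService service = createProxy(GreetingService.class);
        System.out.println(service.greet("xxl-job"));
        System.out.println(sent.get(0).className + "#" + sent.get(0).methodName);
    }
}
```

The caller only ever sees the interface, which is why stepping into run from the IDE shows no implementation on the scheduling center side.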

 

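By now we have a rough picture of how the xxl-job scheduling center initializes and triggers jobs. So what does the executor do when the scheduling center sends it a trigger request?

To answer that, we have to start from the executor's initialization.

Executor

We again take the Spring Boot version of the executor as the example.

Initialization

First, an XxlJobSpringExecutor instance is created and initialized, as follows: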

@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobSpringExecutor xxlJobExecutor() {

    logger.info(">>>>>>>>>>> xxl-job config init.");
    XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
    xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
    xxlJobSpringExecutor.setAppName(appName);
    xxlJobSpringExecutor.setIp(ip);
    xxlJobSpringExecutor.setPort(port);
    xxlJobSpringExecutor.setAccessToken(accessToken);
    xxlJobSpringExecutor.setLogPath(logPath);
    xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
    return xxlJobSpringExecutor;
}

 

Once initialization is complete, the start method of XxlJobSpringExecutor is called.

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware {

    @Override
    public void start() throws Exception {
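        // map each JobHandler annotation name to the Spring-managed bean (our job) and cache it in a Map
        initJobHandlerRepository(applicationContext);

        // use SpringGlueFactory; out of scope for this discussion, so we skip it for now
        GlueFactory.refreshInstance(1);

        // call the start method of the parent class XxlJobExecutor
        super.start();
    }
}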

 

Let's look at the initJobHandlerRepository method.

private void initJobHandlerRepository(ApplicationContext applicationContext){

    if (applicationContext == null) {
        return;
    }
    // get the beans annotated with @JobHandler
    Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
    if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
        for (Object serviceBean : serviceBeanMap.values()) {
            if (serviceBean instanceof IJobHandler){
                // read the @JobHandler value
                String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
                IJobHandler handler = (IJobHandler) serviceBean;
                if (loadJobHandler(name) != null) {
                    throw new RuntimeException("xxl-job jobhandler naming conflicts.");
                }
                // cache it in the Map to establish the mapping
                registJobHandler(name, handler);
            }
        }
    }
}

public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
    return jobHandlerRepository.put(name, jobHandler);
}
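Stripped of Spring, the handler repository is a name-to-handler map with a conflict check. The sketch below shows that idea on its own; Handler, register and load are hypothetical names standing in for IJobHandler, registJobHandler and loadJobHandler, and putIfAbsent is used here to fold the lookup and the insert into one step, which is a choice of this sketch rather than what the source does.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class HandlerRegistrySketch {

    // Hypothetical minimal handler contract standing in for IJobHandler.
    interface Handler {
        String execute(String param);
    }

    private static final ConcurrentMap<String, Handler> REPOSITORY = new ConcurrentHashMap<>();

    // Registers a handler under a name and rejects duplicates.
    static void register(String name, Handler handler) {
        // putIfAbsent returns the previous value, so a non-null result means the name is taken.
        if (REPOSITORY.putIfAbsent(name, handler) != null) {
            throw new IllegalStateException("handler naming conflict: " + name);
        }
    }

    // Returns the handler registered under the name, or null if there is none.
    static Handler load(String name) {
        return REPOSITORY.get(name);
    }

    public static void main(String[] args) {
        register("demoJobHandler", param -> "handled " + param);
        System.out.println(load("demoJobHandler").execute("p1"));
        try {
            register("demoJobHandler", param -> "other");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

When a trigger arrives later, the executor only needs the handler name from the request to find the bean to run.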

 

Next, let's look at the start method of the parent class XxlJobExecutor.

public class XxlJobExecutor  {
    ......
    public void start() throws Exception {
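        // set the job log directory
        XxlJobFileAppender.initLogPath(logPath);
        // initialize the AdminBiz proxy objects, used for RPC communication with the scheduling center
        initAdminBizList(adminAddresses, accessToken);
        // log cleanup thread
        JobLogFileCleanThread.getInstance().start(logRetentionDays);
        // callback thread (RPC callbacks to the scheduling center)
        TriggerCallbackThread.getInstance().start();
        // start the server and send a registration request to the scheduling center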
        port = port>0?port: NetUtil.findAvailablePort(9999);
        ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
        initRpcProvider(ip, port, appName, accessToken);
    }
    ......
}

 

Of these, we focus on two methods: initAdminBizList and initRpcProvider.

......

private static List<AdminBiz> adminBizList;

private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
    serializer = Serializer.SerializeEnum.HESSIAN.getSerializer();
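    // if several scheduling center addresses are configured, create one instance per address
    if (adminAddresses!=null && adminAddresses.trim().length()>0) {
        for (String address: adminAddresses.trim().split(",")) {
            if (address!=null && address.trim().length()>0) {
                // http://<scheduling center address>/api
                String addressUrl = address.concat(AdminBiz.MAPPING);
                // create the proxy object; look familiar?
                // we already covered this in the scheduling center part.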
                AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
                    NetEnum.NETTY_HTTP,
                    serializer,
                    CallType.SYNC,
                    LoadBalance.ROUND,
                    AdminBiz.class,
                    null,
                    10000,
                    addressUrl,
                    accessToken,
                    null,
                    null
                ).getObject();

                if (adminBizList == null) {
                    adminBizList = new ArrayList<AdminBiz>();
                }

                // add the proxy object to the list
                adminBizList.add(adminBiz);
            }
        }
    }
}
......

 

 

Next, let's look at initRpcProvider, one of the most important methods, since it is where the server gets started.

 

......

private XxlRpcProviderFactory xxlRpcProviderFactory = null;

private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {

    // address of this service (ip:port)
    String address = IpUtil.getIpPort(ip, port);
    // assemble the registration parameters
    Map<String, String> serviceRegistryParam = new HashMap<String, String>();
    serviceRegistryParam.put("appName", appName);
    serviceRegistryParam.put("address", address);
    xxlRpcProviderFactory = new XxlRpcProviderFactory();

    // the important points:
    // NetEnum.NETTY_HTTP selects NettyHttpServer as our server
    // ExecutorServiceRegistry is what performs our service registration
    xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);

    // add services
    xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());

    // start the server and send a registration request to the scheduling center
    xxlRpcProviderFactory.start();
}

......

 

Next, we go straight to the method that starts the server.

 

public class XxlRpcProviderFactory {

    ......
    private Server server;
    private ServiceRegistry serviceRegistry;
    private String serviceAddress;

    public void start() throws Exception {
        // address of this (executor) service
        serviceAddress = IpUtil.getIpPort(this.ip, port);
        // i.e. the NettyHttpServer specified above
        server = netType.serverClass.newInstance();
        // this callback runs once the server has started
        server.setStartedCallback(new BaseCallback() {
            @Override
            public void run() throws Exception {
                if (serviceRegistryClass != null) {
                    // i.e. the ExecutorServiceRegistry specified above
                    serviceRegistry = serviceRegistryClass.newInstance();
                    // send a registration request to the scheduling center
                    serviceRegistry.start(serviceRegistryParam);
                    if (serviceData.size() > 0) {
                        serviceRegistry.registry(serviceData.keySet(), serviceAddress);
                    }
                }
            }
        });
        ......

        // start
        server.start(this);
    }
    ......
}

 

 

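The code above starts the NettyHttpServer and uses a started-callback to send the registration request to the scheduling center. Next, let's see how the registration is done.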

 

@Override
public void start(Map<String, String> param) {
    // call the start method of ExecutorRegistryThread
    ExecutorRegistryThread.getInstance()
        .start(param.get("appName"), param.get("address"));
}

 

public class ExecutorRegistryThread {

    private static Logger logger = LoggerFactory.getLogger(ExecutorRegistryThread.class);
    private static ExecutorRegistryThread instance = new ExecutorRegistryThread();
    public static ExecutorRegistryThread getInstance(){
        return instance;
    }
    private Thread registryThread;
    private volatile boolean toStop = false;
    public void start(final String appName, final String address){
        ......
       registryThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!toStop) {
                    try {
                        // registration parameters
                        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
                        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                            try {
                                // the call that actually performs the registration
                                // adminBiz is the proxy object created above,
                                // so what really runs is the proxy's invoke method
                                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                    registryResult = ReturnT.SUCCESS;
                                    logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                    break;
                                } else {
                                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                }
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                            }
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                    try {
                        if (!toStop) {
                            // by default, register once every 30s
                            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                        }
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                        }
                    }
                }

                // remove the registration info
                ........
            }
        });
        registryThread.setDaemon(true);
        registryThread.setName("xxl-job, executor ExecutorRegistryThread");
        // start the thread
        registryThread.start();
    }

    ......

    public void toStop() {
        toStop = true;
        // interrupt and wait
        registryThread.interrupt();
        try {
            registryThread.join();
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
    }
}

 

 

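As shown above, registration is carried out by starting the ExecutorRegistryThread thread. The RPC request is ultimately sent by the same kind of proxy object we described for the scheduling center, so we won't repeat the details. By default this thread sends a heartbeat to the scheduling center every 30s.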
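The shape of that registry thread (a daemon loop guarded by a volatile flag, a sleep between rounds, and interrupt plus join to stop) can be sketched on its own. HeartbeatSketch and its members are hypothetical names, and the interval is passed in milliseconds only so that the demo finishes quickly; the real thread sleeps for RegistryConfig.BEAT_TIMEOUT seconds.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HeartbeatSketch {

    private volatile boolean toStop = false;
    private Thread thread;
    // Number of heartbeats that completed without throwing.
    final AtomicInteger beats = new AtomicInteger();

    // Starts a daemon thread that runs the heartbeat action, then sleeps for the interval.
    void start(Runnable heartbeat, long intervalMillis) {
        thread = new Thread(() -> {
            while (!toStop) {
                try {
                    heartbeat.run();
                    beats.incrementAndGet();
                } catch (RuntimeException e) {
                    // A failed heartbeat is not fatal; the next round tries again.
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    // Woken up by stop(); the loop condition decides whether to exit.
                }
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    // Sets the flag, wakes the sleeping thread, and waits for it to finish.
    void stop() {
        toStop = true;
        thread.interrupt();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isAlive() {
        return thread != null && thread.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        HeartbeatSketch sketch = new HeartbeatSketch();
        sketch.start(() -> System.out.println("register appName@address"), 100);
        Thread.sleep(350);
        sketch.stop();
        System.out.println("beats sent: " + sketch.beats.get());
    }
}
```

The interrupt only shortens the sleep; it is the volatile flag that actually ends the loop, which matches the toStop method shown above.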

 

That covers the main initialization flow. So how does the executor actually receive the trigger requests sent by the scheduling center?

 

 

 

Execution flow

Let's go back to the startup of NettyHttpServer.

 

public class NettyHttpServer extends Server  {
    private Thread thread;

    @Override
    public void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception {
        thread = new Thread(new Runnable() {
            @Override
            public void run() {
                ......
                try {
                    // start server
                    ServerBootstrap bootstrap = new ServerBootstrap();
                    bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new HttpServerCodec());
                                ch.pipeline().addLast(new HttpObjectAggregator(5*1024*1024));
                                // the part to focus on
                                ch.pipeline().addLast(new NettyHttpServerHandler(xxlRpcProviderFactory, serverHandlerPool));
                            }
                        }).childOption(ChannelOption.SO_KEEPALIVE, true);
                    ......
                }
                ......
            }
        });
        thread.setDaemon(true);
        thread.start();
    }
    ......
}

 

The point to note above is that a NettyHttpServerHandler is set up when the server starts; when a request arrives, it reaches the channelRead0 method of NettyHttpServerHandler.

public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    private static final Logger logger = LoggerFactory.getLogger(NettyHttpServerHandler.class);
    private XxlRpcProviderFactory xxlRpcProviderFactory;
    private ThreadPoolExecutor serverHandlerPool;

    public NettyHttpServerHandler(final XxlRpcProviderFactory xxlRpcProviderFactory, final ThreadPoolExecutor serverHandlerPool) {
        this.xxlRpcProviderFactory = xxlRpcProviderFactory;
        this.serverHandlerPool = serverHandlerPool;
    }
    // handle the request
    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
        // request parse
        final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());
        final String uri = msg.uri();
        final boolean keepAlive = HttpUtil.isKeepAlive(msg);
        // run asynchronously on the thread pool
        serverHandlerPool.execute(new Runnable() {
            @Override
            public void run() {
                process(ctx, uri, requestBytes, keepAlive);
            }
        });
    }

    private void process(ChannelHandlerContext ctx, String uri, byte[] requestBytes, boolean keepAlive){
        String requestId = null;
        try {
            if ("/services".equals(uri)) {    // services mapping
                // request
                StringBuffer stringBuffer = new StringBuffer("<ui>");
                for (String serviceKey: xxlRpcProviderFactory.getServiceData().keySet()) {
                    stringBuffer.append("<li>").append(serviceKey).append(": ").append(xxlRpcProviderFactory.getServiceData().get(serviceKey)).append("</li>");
                }
                stringBuffer.append("</ui>");
                // response serialize
                byte[] responseBytes = stringBuffer.toString().getBytes("UTF-8");
                // response-write
                writeResponse(ctx, keepAlive, responseBytes);
            } else {
                // valid
                if (requestBytes.length == 0) {
                    throw new XxlRpcException("xxl-rpc request data empty.");
                }
                // request deserialize
                XxlRpcRequest xxlRpcRequest = (XxlRpcRequest) xxlRpcProviderFactory.getSerializer().deserialize(requestBytes, XxlRpcRequest.class);
                requestId = xxlRpcRequest.getRequestId();
                // handle the request
                XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);
                // response serialize
                byte[] responseBytes = xxlRpcProviderFactory.getSerializer().serialize(xxlRpcResponse);
                // response-write
                writeResponse(ctx, keepAlive, responseBytes);
            }
        } catch (Exception e) {
            ......
        }
    }
    ......
}

 

public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {

    ......
    String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
    // fetch the ExecutorBizImpl instance
    Object serviceBean = serviceData.get(serviceKey);
    ......
    try {
        // invoke the run method of ExecutorBizImpl via reflection
        Class<?> serviceClass = serviceBean.getClass();
        String methodName = xxlRpcRequest.getMethodName();
        Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
        Object[] parameters = xxlRpcRequest.getParameters();
        Method method = serviceClass.getMethod(methodName, parameterTypes);
        method.setAccessible(true);
        Object result = method.invoke(serviceBean, parameters);
        xxlRpcResponse.setResult(result);
    } catch (Throwable t) {
        // catch error
        logger.error("xxl-rpc provider invokeService error.", t);
        xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
    }
    return xxlRpcResponse;

}
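The server side of the call is the mirror image of the proxy: look up a bean by key, resolve the method from its name and parameter types, and invoke it reflectively, turning any failure into an error message on the response. A minimal sketch under those assumptions follows; EchoService, Response and the key "echo" are hypothetical and only stand in for ExecutorBizImpl, XxlRpcResponse and the service key.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class ReflectiveDispatchSketch {

    // Hypothetical service standing in for ExecutorBizImpl.
    public static class EchoService {
        public String run(String param) {
            return "ran " + param;
        }
    }

    // Hypothetical response holder standing in for XxlRpcResponse.
    static final class Response {
        Object result;
        String errorMsg;
    }

    // Service key -> service instance, playing the role of serviceData.
    private final Map<String, Object> services = new HashMap<>();

    void addService(String key, Object bean) {
        services.put(key, bean);
    }

    // Looks up the bean, resolves the method by name and parameter types, and invokes it.
    Response invoke(String key, String methodName, Class<?>[] parameterTypes, Object[] parameters) {
        Response response = new Response();
        Object bean = services.get(key);
        if (bean == null) {
            response.errorMsg = "service not found: " + key;
            return response;
        }
        try {
            Method method = bean.getClass().getMethod(methodName, parameterTypes);
            response.result = method.invoke(bean, parameters);
        } catch (ReflectiveOperationException e) {
            // Errors travel back to the caller as data instead of being thrown.
            response.errorMsg = e.toString();
        }
        return response;
    }

    public static void main(String[] args) {
        ReflectiveDispatchSketch dispatcher = new ReflectiveDispatchSketch();
        dispatcher.addService("echo", new EchoService());
        Response ok = dispatcher.invoke("echo", "run", new Class<?>[] {String.class}, new Object[] {"job-1"});
        System.out.println(ok.result);
        Response bad = dispatcher.invoke("echo", "missing", new Class<?>[0], new Object[0]);
        System.out.println(bad.errorMsg);
    }
}
```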
public class ExecutorBizImpl implements ExecutorBiz {

    ......
    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {

        // get the JobThread from the cache
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
        String removeOldReason = null;

        GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
        if (GlueTypeEnum.BEAN == glueTypeEnum) {
            // get the IJobHandler (our business job) from the cache
            // it was cached earlier by scanning the annotations
            IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
            // valid old jobThread
            if (jobThread!=null && jobHandler != newJobHandler) {
                // change handler, need kill old thread
                removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
                jobThread = null;
                jobHandler = null;
            }
            // valid handler
            if (jobHandler == null) {
                jobHandler = newJobHandler;
                if (jobHandler == null) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                }
            }
        } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof GlueJobHandler
                        && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change handler or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";
                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                try {
                    // the source code comes from the DB; load and instantiate it with groovy
                    IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                    jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
                }
            }
        } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof ScriptJobHandler
                            && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change script or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";
                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                // read the script, write it to a file, and finally run it via commons-exec
                jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
            }
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
        }

        // block strategy
        if (jobThread != null) {
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                // discard later triggers
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                // cover (replace) earlier triggers
                if (jobThread.isRunningOrHasQueue()) {
                    removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                    jobThread = null;
                }
            } else {
                // just queue trigger
            }
        }

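        // first run, or the existing thread was dropped above (handler changed or cover-early strategy)
        if (jobThread == null) {
            // start a thread to run the job
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

        // enqueue the trigger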
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }
}
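The block-strategy branch in run boils down to a small decision table over "is the existing job thread busy" and the configured strategy. The sketch below isolates just that decision. Decision and decide are hypothetical names, and SERIAL_EXECUTION is used here as the label for the fall-through "just queue trigger" branch; only DISCARD_LATER and COVER_EARLY appear in the code above.

```java
public class BlockStrategySketch {

    // Strategy names; SERIAL_EXECUTION labels the fall-through "just queue" branch.
    enum BlockStrategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

    // Hypothetical outcome of the decision, not a type from xxl-job.
    enum Decision { ENQUEUE, REJECT, REPLACE_THREAD_THEN_ENQUEUE }

    // Decides what to do with a new trigger when a job thread already exists.
    static Decision decide(BlockStrategy strategy, boolean runningOrHasQueue) {
        if (!runningOrHasQueue) {
            // An idle thread simply takes the new trigger, whatever the strategy.
            return Decision.ENQUEUE;
        }
        switch (strategy) {
            case DISCARD_LATER:
                // The thread is busy: refuse the new trigger.
                return Decision.REJECT;
            case COVER_EARLY:
                // The thread is busy: stop it and hand the trigger to a fresh thread.
                return Decision.REPLACE_THREAD_THEN_ENQUEUE;
            default:
                // Queue behind whatever is already running.
                return Decision.ENQUEUE;
        }
    }

    public static void main(String[] args) {
        for (BlockStrategy strategy : BlockStrategy.values()) {
            System.out.println(strategy + " / busy -> " + decide(strategy, true));
            System.out.println(strategy + " / idle -> " + decide(strategy, false));
        }
    }
}
```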

 

Now only the last step remains: let's look at XxlJobExecutor.registJobThread.

 

......
private static ConcurrentHashMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();

public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
    // create a new thread for the job
    JobThread newJobThread = new JobThread(jobId, handler);
    // start the thread
    newJobThread.start();
    logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
    // put it in the cache
    JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);
    if (oldJobThread != null) {
        // stop the old job thread (e.g. under the cover strategy)
        oldJobThread.toStop(removeOldReason);
        oldJobThread.interrupt();
    }
    return newJobThread;
}
......

 

public class JobThread extends Thread{

    private static Logger logger = LoggerFactory.getLogger(JobThread.class);
    private int jobId;
    private IJobHandler handler;
    private LinkedBlockingQueue<TriggerParam> triggerQueue;
    private Set<Integer> triggerLogIdSet;        // avoid repeat trigger for the same TRIGGER_LOG_ID
    private volatile boolean toStop = false;
    private String stopReason;
    private boolean running = false;    // if running job
    private int idleTimes = 0;            // idle times

    public JobThread(int jobId, IJobHandler handler) {
        this.jobId = jobId;
        this.handler = handler;
        this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
        this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Integer>());
    }

    public IJobHandler getHandler() {
        return handler;
    }

    /**
     * Enqueue a trigger; it is dequeued when executed.
     *
     * @param triggerParam
     * @return
     */
    public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
        // avoid repeat
        if (triggerLogIdSet.contains(triggerParam.getLogId())) {
            logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
            return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
        }

        triggerLogIdSet.add(triggerParam.getLogId());
        triggerQueue.add(triggerParam);
        return ReturnT.SUCCESS;
    }

    /**
     * kill job thread
     *
     * @param stopReason
     */
    public void toStop(String stopReason) {
        /**
         * Thread.interrupt only ends a thread's blocked state (wait, join, sleep):
         * an InterruptedException is thrown at the blocking point, but the running thread itself is not terminated.
         * So note that fully stopping this thread has to be done through a shared variable.
         */
        this.toStop = true;
        this.stopReason = stopReason;
    }

    /**
     * is running job
     * @return
     */
    public boolean isRunningOrHasQueue() {
        return running || triggerQueue.size()>0;
    }

    @Override
    public void run() {

        // init
        try {
            handler.init();
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }

        // execute
        while(!toStop){
            running = false;
            idleTimes++;

            TriggerParam triggerParam = null;
            ReturnT<String> executeResult = null;
            try {
                // dequeue and consume; returns null if nothing arrives within 3 seconds
                triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
                if (triggerParam!=null) {
                    running = true;
                    idleTimes = 0;
                    triggerLogIdSet.remove(triggerParam.getLogId());

                    // log file name, like "logPath/yyyy-MM-dd/9999.log"
                    String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
                    XxlJobFileAppender.contextHolder.set(logFileName);
                    // sharding data for the job
                    ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));

                    // execute
                    XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());

                    if (triggerParam.getExecutorTimeout() > 0) {
                        // with a timeout limit
                        Thread futureThread = null;
                        try {
                            final TriggerParam triggerParamTmp = triggerParam;
                            FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
                                @Override
                                public ReturnT<String> call() throws Exception {
                                    // run the business job
                                    return handler.execute(triggerParamTmp.getExecutorParams());
                                }
                            });
                            futureThread = new Thread(futureTask);
                            futureThread.start();

                            // may time out
                            executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                        } catch (TimeoutException e) {
                            XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
                            XxlJobLogger.log(e);
                            executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
                        } finally {
                            futureThread.interrupt();
                        }
                    } else {
                        // no timeout limit: execute directly
                        executeResult = handler.execute(triggerParam.getExecutorParams());
                    }
                    ......

        // destroy
        try {
            handler.destroy();
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }

        logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
    }
}
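The timeout branch of JobThread.run is a general pattern: wrap the job in a FutureTask, run it on a helper thread, and wait for the result with a bounded get. Here is a minimal sketch of that pattern in isolation. TimeoutRunSketch and runWithTimeout are hypothetical names, the timeout is in milliseconds only to keep the demo short, and plain strings replace ReturnT.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutRunSketch {

    // Runs the job on a helper thread and waits at most timeoutMillis for its result.
    static String runWithTimeout(Callable<String> job, long timeoutMillis) {
        FutureTask<String> task = new FutureTask<>(job);
        Thread worker = new Thread(task);
        worker.setDaemon(true);
        worker.start();
        try {
            return task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            // Ask the helper thread to stop; the job itself has to react to the interrupt.
            worker.interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(() -> "done", 1000));
        System.out.println(runWithTimeout(() -> {
            Thread.sleep(5000);
            return "too late";
        }, 100));
    }
}
```

As the comment in toStop points out, an interrupt only breaks blocking calls, so a job that never blocks and never checks the interrupt flag would keep running after the timeout is reported.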

 

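Our business jobs basically all implement the execute method of IJobHandler, so this call finally reaches our own business method.

That wraps up our tour of xxl-job for now. There is still plenty in it worth digging into; if you are interested, keep reading the source.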

