原文地址:http://blog.csdn.net/dailywater/article/details/51470779
一、问题描述
使用Quartz配置定时任务,配置了超过10个定时任务,这些定时任务配置的触发时间都是5分钟执行一次,实际运行时,发现总有几个定时任务不能执行到。
二、示例程序
1、简单介绍
采用spring+quartz整合方案实现定时任务,Quartz的SchedulerFactoryBean配置参数中不注入taskExecutor属性,使用默认自带的线程池。准备了15个定时任务,全部设置为每隔10秒触发一次,定时任务的实现逻辑是使用休眠8秒的方式模拟执行定时任务的时间耗费。
2、配置文件信息如下(节选):
<bean id="startQuertz" lazy-init="false" autowire="no" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> <property name="triggers"> <list> <ref bean="testMethod1Trigger"/> <ref bean="testMethod2Trigger"/> // 以下省略13个 触发器的配置 </list> </property> </bean> <bean id="testMethod1Trigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean"> <property name="jobDetail" ref="testMethod1" /> <!-- 指定Cron表达式:每10秒触发一次 --> <property name="cronExpression" value="0/10 * * * * ?"/> </bean> <bean id="testMethod1" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean"> <property name="targetObject" ref="triggerService" /> <!-- 要执行的方法名称 --> <property name="targetMethod" value="method1" /> </bean> // 以下省略14个定时任务的配置
3、Java定时任务类程序如下(节选)
@Service("triggerService") public class TriggerService { private int cnt1; public void method1() { try { Thread.sleep(8000); } catch (InterruptedException e) { } cnt1++; } public void print() { StringBuffer sb = new StringBuffer(); sb.append("\nmethod1:" + cnt1); sb.append("\nmethod2:" + cnt2); sb.append("\nmethod3:" + cnt3); sb.append("\nmethod4:" + cnt4); sb.append("\nmethod5:" + cnt5); sb.append("\nmethod6:" + cnt6); sb.append("\nmethod7:" + cnt7); sb.append("\nmethod8:" + cnt8); sb.append("\nmethod9:" + cnt9); sb.append("\nmethod10:" + cnt10); sb.append("\nmethod11:" + cnt11); sb.append("\nmethod12:" + cnt12); sb.append("\nmethod13:" + cnt13); sb.append("\nmethod14:" + cnt14); sb.append("\nmethod15:" + cnt15); System.out.println(sb.toString()); } }
实现逻辑很简单,总共定义15个方法,方法内休眠6秒,同时每个方法都使用一个成员变量记录被调用的次数,并在该类的print()方法里统一输出所有方法调用次数的概况。
4、client启动程序如下:
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:applicationContext.xml") public class TriggerServiceTest extends TestCase { @Autowired private TriggerService triggerService; @Test public void testService() { try { while (true) { Thread.sleep(11000); triggerService.print(); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
一个简单的单元测试用例,每隔11秒调用一次定时任务服务类的print()方法,输出定时任务调用次数的统计值。
5、运行结果
我们让这个demo程序跑了几分钟,控制台输出的取样结果如下:
method1:25 method2:25 method3:25 method4:25 method5:12 method6:12 method7:12 method8:12 method9:12 method10:25 method11:25 method12:25 method13:25 method14:25 method15:25
6、结果分析
此次采样的数据结果表示:15个任务中,有10个执行了25次,另外5个只执行了12次,执行的次数不一样,说明在定时任务调度过程中,有的任务会被遗漏不执行,目前的实验结果能够重现上文描述的问题。
三、源码分析
刚开始我们对此也是感觉到很疑惑,因为任务被漏执行时,没有任何警告或报错的日志信息,这个问题若在实际生产中出现了,很难查明原因。
我们来看一下相关的源码实现,希望能在源码中发现一些有价值的信息:
1)SchedulerFactoryBean类的初始化操作
其中关于线程池属性注入的相关代码如下(省略了部分代码):
/** * Load and/or apply Quartz properties to the given SchedulerFactory. * @param schedulerFactory the SchedulerFactory to initialize */ private void initSchedulerFactory(SchedulerFactory schedulerFactory) throws SchedulerException, IOException { if (!(schedulerFactory instanceof StdSchedulerFactory)) { if (this.configLocation != null || this.quartzProperties != null || this.taskExecutor != null || this.dataSource != null) { throw new IllegalArgumentException( "StdSchedulerFactory required for applying Quartz properties: " + schedulerFactory); } // Otherwise assume that no initialization is necessary... return; } // 省略其他代码... // 此为需要关注的代码 if (this.taskExecutor != null) { mergedProps.setProperty(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, LocalTaskExecutorThreadPool.class.getName()); } else { // Set necessary default properties here, as Quartz will not apply // its default configuration when explicitly given properties. mergedProps.setProperty(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName()); mergedProps.setProperty(PROP_THREAD_COUNT, Integer.toString(DEFAULT_THREAD_COUNT)); } // 省略其他代码... }
此代码的逻辑是,如果taskExecutor属性有注入值,就使用指定的线程池,一般Spring是会配置线程池的,线程池的参数可以自行指定。如果taskExecutor未注入值,就使用org.quartz.simple.SimpleThreadPool线程池,DEFAULT_THREAD_COUNT的值为10,即该线程池的大小为10。
我们现在演示的场景是未设置taskExecutor的,所以线程池是SimpleThreadPool的实例对象,池的大小为10。
2)运行过程中,定时任务的触发过程
首先,要从线程池获取可用资源,该实现在org.quartz.core.QuartzSchedulerThread线程类的run方法中,如代码所示:
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); //这个方法的实现在SimpleThreadPool类里 public int blockForAvailableThreads() { synchronized(nextRunnableLock) { while((availWorkers.size() < 1 || handoffPending) && !isShutdown) { try { nextRunnableLock.wait(500); } catch (InterruptedException ignore) { } } return availWorkers.size(); } }
注意这个获取线程池资源的方法是阻塞式的,若线程池资源不够用,会一直等待直至获取到可用的资源。这里是产生等待的原因。
然后我们看一下定时任务允许被触发的条件,实现的源码还是在
org.quartz.core.QuartzSchedulerThread线程类的run方法中:
try { triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); lastAcquireFailed = false; if (log.isDebugEnabled()) log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers"); } catch (JobPersistenceException jpe) { if(!lastAcquireFailed) { qs.notifySchedulerListenersError( "An error occurred while scanning for the next triggers to fire.",jpe); } lastAcquireFailed = true; continue; } catch (RuntimeException e) { if(!lastAcquireFailed) { getLog().error("quartzSchedulerThreadLoop: RuntimeException " +e.getMessage(), e); } lastAcquireFailed = true; continue; }
最关键的是acquireNextTriggers方法,这个方法是获取所有可用的触发器,定位到org.quartz.simpl.RAMJobStore实现类中,代码如下:
/** * <p> * Get a handle to the next trigger to be fired, and mark it as 'reserved' * by the calling scheduler. * </p> * * @see #releaseAcquiredTrigger(OperableTrigger) */ public List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow) { synchronized (lock) { List<OperableTrigger> result = new ArrayList<OperableTrigger>(); Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>(); Set<TriggerWrapper> excludedTriggers = new HashSet<TriggerWrapper>(); long firstAcquiredTriggerFireTime = 0; // return empty list if store has no triggers. if (timeTriggers.size() == 0) return result; while (true) { TriggerWrapper tw; try { tw = timeTriggers.first(); if (tw == null) break; timeTriggers.remove(tw); } catch (java.util.NoSuchElementException nsee) { break; } if (tw.trigger.getNextFireTime() == null) { continue; } if (applyMisfire(tw)) { if (tw.trigger.getNextFireTime() != null) { timeTriggers.add(tw); } continue; } if (tw.getTrigger().getNextFireTime().getTime() > noLaterThan + timeWindow) { timeTriggers.add(tw); break; } // 省略部分代码... if (result.size() == maxCount) break; } // If we did excluded triggers to prevent ACQUIRE state due to DisallowConcurrentExecution, we need to add them back to store. if (excludedTriggers.size() > 0) timeTriggers.addAll(excludedTriggers); return result; } }
请注意一下while循环内调用的applyMisfire方法,实现如下:
protected boolean applyMisfire(TriggerWrapper tw) {
long misfireTime = System.currentTimeMillis();
if (getMisfireThreshold() > 0) { misfireTime -= getMisfireThreshold(); } Date tnft = tw.trigger.getNextFireTime(); if (tnft == null || tnft.getTime() > misfireTime || tw.trigger.getMisfireInstruction() == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY) { return false; } // 省略其他代码... return true; }
以上源码为了节省篇幅有部分省略,有兴趣的可以自行阅读完整代码。
注意一下这里返回为false的判断逻辑,这个方法返回为false,表示acquireNextTriggers将不再接收这个定时任务,并且没有任何信息输出,这样该定时任务在触发过程中就被忽略不执行了。
顺便留意一下misfireTime,它取当前的时间点,另外减小了5秒钟(减小的时间参数可以设置,默认是5秒),如果我们把tnft.getTime()理解为定时任务预先设定的执行时间,那么”nextFireTime + misfireThreshold”我们可以理解为任务执行的过期时间,misfireTime这个变量是用来跟nextFireTime比较的参数,如果nextFireTime大于misfireTime,即任务当前执行的时间点大于过期时间”nextFireTime + misfireThreshold”,表示任务已经超过了等待的限度,那么这个任务就不再被执行了。
简单地说,就是一个定时任务经过获取可用的线程池资源,到执行这段逻辑的时间,如果5秒内无法完成的话, 这个任务就不再执行了。
回想我们的演示案例,定时任务是超过了10个,就肯定存在线程池资源获取等待的问题,而每个定时任务的方法是休眠6秒钟,又超过了5秒的限度,所以每次调度时,总有一些任务是被略过了的。
四、解决方案
经过以上分析,我们已经了解到出现些问题的原因,解决方案有两种:
1、注入taskExecutor属性,保证线程池资源是够用的。
2、各个定时任务错峰触发。
演示案例的定时任务触发时间均为10秒一次,错峰时间配置可以参照素数原理,减小冲突可能性,比如配置时间为5分钟,7分钟,11分钟,13分钟,17分钟等,这样高峰相遇的概率会低一些。
以上两个方案可根据实际情况挑选,也可以组合使用。
五、总结
1、经过阅读源码分析,可以了解到两个关键点:线程池资源获取等待定时任务过期作废机制。
2、Quartz框架的定时任务执行是绝对时间触发的,所以存在“过期不候”的现象。
3、在使用Quartzs框架时,一定要预先计算好triggers数量与线程池大小的匹配程度,资源一定要够,或者任务执行密度不能太大,否则等到线程任务释放完,trigger早已过期,就无法按预期时间触发了。
六、FAQ
Q1、Quartz框架使用绝对时间触发机制有什么好处?
A1、我个人觉得这种机制对运行环境是一种过载保护,如果任务负荷过重,已经来不及执行的,就适当放弃。如此一来,我们使用就需要注意实际业务场景这种特性的存在,并通过适当增加线程资源,减小任务执行密度,任务错峰触发等方法来避免这种情况发生。只是个人见解,仅作参考。