關鍵詞:
1. 定時任務
2. 分布式
3. 可動態配置觸發時間
一般通過Quartz實現定時任務很簡單。如果實現分布式定時任務需要結合分布式框架選擇master節點觸發也可以實現。但我們有個實際需求是,頁面可動態配置定時任務觸發周期(比如,假如下班前如果把先決條件完成了,正常可以18:00觸發完成批量任務,假如完不成,需要將任務設置到很晚,等到先決條件完成再觸發)。這個時候需要滿足1,2,3都要滿足。這樣實現起來就有一定難度了。
下面來看看我是怎樣實現的,如有更好的實現方案,歡迎在評論區提出,謝謝!
1. Quartz實現定時任務
我通過一個工具類來實現,如下:
import java.text.SimpleDateFormat
import java.util.Date
import org.quartz.{CronScheduleBuilder, CronTrigger, JobBuilder, JobKey, TriggerBuilder, TriggerKey, _}
import org.quartz.impl.StdSchedulerFactory
import scala.collection.JavaConverters._
/**
* 定時任務管理類
*
* @author BarryWang create at 2018/5/11 14:22
* @version 0.0.1
*/
object QuartzManager {
private val stdSchedulerFactory = new StdSchedulerFactory
private val JOB_GROUP_NAME = "JOB_GROUP_NAME"
private val TRIGGER_GROUP_NAME = "TRIGGER_NAME"
/**
* 根據指定格式(yyyy-MM-dd HH:mm:ss)時間字符串添加定時任務,使用默認的任務組名,觸發器名,觸發器組名
* @param jobName 任務名
* @param time 時間設置,參考quartz說明文檔
* @param jobClass 任務類名
*/
def addJobByTime(jobName: String, time: String, jobClass: Class[_ <: Job]) : Unit = {
QuartzManager.addJobByTime(jobName, time, jobClass, Map("1"->"otherData"))
}
/**
* 根據指定時間(java.util.Date)添加定時任務,使用默認的任務組名,觸發器名,觸發器組名
*
* @param jobName 任務名
* @param date 日期
* @param jobClass 任務類名
*/
def addJobByDate(jobName: String, date: Date, jobClass: Class[_ <: Job]): Unit = {
QuartzManager.addJobByDate(jobName, date, jobClass, Map("1"->"otherData"))
}
/**
* 根據指定cron表達式添加定時任務,使用默認的任務組名,觸發器名,觸發器組名
*
* @param jobName 任務名
* @param jobClass 任務類名
* @param cron cron表達式
*/
def addJobByCron(jobName: String, cron : String, jobClass: Class[_ <: Job]): Unit = {
QuartzManager.addJobByCron(jobName, cron, jobClass, Map("1"->"otherData"))
}
/**
* 函數描述: 添加一個定時任務,使用默認的任務組名,觸發器名,觸發器組名
* @param jobName 任務名
* @param time 時間字符串, 格式為(yyyy-MM-dd HH:mm:ss)
* @param jobClass 任務類名
* @param paramsMap 定時器需要額外數據
*/
def addJobByTime(jobName: String, time: String, jobClass: Class[_ <: Job], paramsMap: Map[_ <: String, _ <: AnyRef]): Unit = {
addJobByTime(jobName, time, "yyyy-MM-dd HH:mm:ss", jobClass, paramsMap)
}
/**
* 函數描述: 添加一個定時任務,使用默認的任務組名,觸發器名,觸發器組名
* @param jobName 任務名
* @param time 時間設置,參考quartz說明文檔
* @param jobClass 任務類名
* @param paramsMap 定時器需要額外數據
*/
def addJobByTime(jobName: String, time: String, timePattern: String, jobClass: Class[_ <: Job], paramsMap: Map[_ <: String, _ <: AnyRef]): Unit = {
val df = new SimpleDateFormat(timePattern)
val cron = getCron(df.parse(time))
addJobByCron(jobName, cron, jobClass, paramsMap)
}
/**
* Description: 添加一個定時任務,使用默認的任務組名,觸發器名,觸發器組名
*
* @param jobName 任務名
* @param date 日期
* @param cls 任務
* @param paramsMap 定時器需要額外數據
*/
def addJobByDate(jobName: String, date: Date, cls: Class[_ <: Job], paramsMap: Map[_ <: String, _ <: AnyRef]): Unit = {
val cron = getCron(date)
addJobByCron(jobName, cron, cls, paramsMap)
}
/**
* 函數描述: 根據cron表達式添加定時任務(默認觸發器組名及任務組名)
* @param jobId 任務ID
* @param cron 時間設置 表達式,參考quartz說明文檔
* @param jobClass 任務的類
* @param paramsMap 可變參數需要進行傳參的值
*/
def addJobByCron(jobId: String, cron: String, jobClass: Class[_ <: Job], paramsMap: Map[_ <: String, _ <: AnyRef]): Unit = {
addJob(jobId, cron, jobClass, paramsMap, JOB_GROUP_NAME, TRIGGER_GROUP_NAME)
}
/**
* 函數描述: 根據cron表達式添加定時任務
* @param jobId 任務ID
* @param cron 時間設置 表達式,參考quartz說明文檔
* @param jobClass 任務的類類型 eg:TimedMassJob.class
* @param paramsMap 可變參數需要進行傳參的值
* @param jobGroupName 任務組名
* @param triggerGroupName 觸發器組名
*/
def addJob(jobId: String, cron: String, jobClass: Class[_ <: Job], paramsMap: Map[_ <: String, _ <: AnyRef],
jobGroupName: String, triggerGroupName: String): Unit = {
val scheduler = stdSchedulerFactory.getScheduler
// 任務名,任務組,任務執行類
val jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobId, jobGroupName).build
//設置參數
jobDetail.getJobDataMap.putAll(paramsMap.asJava)
val triggerBuilder = TriggerBuilder.newTrigger
// 觸發器名,觸發器組
//默認設置觸發器名與任務ID相同
val triggerName = jobId
triggerBuilder.withIdentity(triggerName, triggerGroupName)
triggerBuilder.startNow
// 觸發器時間設定
triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron))
// 創建Trigger對象
val trigger = triggerBuilder.build.asInstanceOf[CronTrigger]
// 調度容器設置JobDetail和Trigger
scheduler.scheduleJob(jobDetail, trigger)
// 啟動
if (!scheduler.isShutdown) scheduler.start()
}
/**
* 函數描述: 修改一個任務的觸發時間(使用默認的任務組名,觸發器名,觸發器組名)
* @param cron 時間字符串
*/
def modifyJobTime(jobId: String, cron: String, jobClass: Class[_ <: Job]): Unit = {
modifyJobTime(jobId, cron, jobClass, Map("1"->"otherData"), JOB_GROUP_NAME, TRIGGER_GROUP_NAME)
}
/**
* 函數描述: 修改一個任務的觸發時間(使用默認的任務組名,觸發器名,觸發器組名)
* @param cron 時間字符串
*/
def modifyJobTime(jobId: String, cron: String, jobClass: Class[_ <: Job], paramsMap: Map[_ <: String, _ <: AnyRef]): Unit = {
modifyJobTime(jobId, cron, jobClass, paramsMap, JOB_GROUP_NAME, TRIGGER_GROUP_NAME)
}
/**
* 函數描述: 修改一個任務的觸發時間
* @param jobId 任務ID
* @param cron cron表達式
* @param jobClass 任務類名
* @param paramsMap 其他參數
* @param jobGroupName 任務組名
* @param triggerGroupName 觸發器組
*/
def modifyJobTime(jobId: String, cron: String, jobClass: Class[_ <: Job], paramsMap: Map[_ <: String, _ <: AnyRef],
jobGroupName: String, triggerGroupName: String): Unit = {
val scheduler = stdSchedulerFactory.getScheduler()
//默認設置觸發器名與任務ID相同
val triggerName = jobId
val triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName)
var trigger = scheduler.getTrigger(triggerKey).asInstanceOf[CronTrigger]
if (trigger != null) {
removeJob(jobId)
}
addJob(jobId, cron, jobClass, paramsMap, jobGroupName, triggerGroupName)
}
/**
* 函數描述: 移除一個任務(使用默認的任務組名,觸發器名,觸發器組名)
* @param jobId 任務名稱
*/
def removeJob(jobId: String): Unit = {
val scheduler = stdSchedulerFactory.getScheduler
//默認設置觸發器名與任務ID相同
val triggerName = jobId
val triggerKey = TriggerKey.triggerKey(triggerName, TRIGGER_GROUP_NAME)
// 停止觸發器
scheduler.pauseTrigger(triggerKey)
// 移除觸發器
scheduler.unscheduleJob(triggerKey)
// 刪除任務
scheduler.deleteJob(JobKey.jobKey(jobId , JOB_GROUP_NAME))
}
/**
* 函數描述: 移除一個任務
* @param jobId 任務ID
* @param jobGroupName 任務組
* @param triggerName 觸發器名稱
* @param triggerGroupName 觸發器組名
*/
def removeJob(jobId: String, jobGroupName: String, triggerName: String, triggerGroupName: String): Unit = {
val scheduler = stdSchedulerFactory.getScheduler
val triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName)
// 停止觸發器
scheduler.pauseTrigger(triggerKey)
// 移除觸發器
scheduler.unscheduleJob(triggerKey)
// 刪除任務
scheduler.deleteJob(JobKey.jobKey(jobId , jobGroupName))
}
/**
* 函數描述:啟動所有定時任務
*/
def startJobs(): Unit = {
stdSchedulerFactory.getScheduler.start()
}
/**
* 函數描述:關閉所有定時任務
*
*/
def shutdownJobs(): Unit = {
val sched = stdSchedulerFactory.getScheduler
if (!sched.isShutdown) sched.shutdown()
}
/**
* 根據時間獲取Cron表達式
* @param date 日期
* @return
*/
def getCron(date: Date): String = {
val dateFormat = "ss mm HH dd MM ? yyyy"
formatDateByPattern(date, dateFormat)
}
/**
* 日期格式轉換
* @param date 日期
* @param dateFormat 格式
* @return
*/
def formatDateByPattern(date : Date, dateFormat : String): String = {
val sdf = new SimpleDateFormat(dateFormat)
sdf.format(date)
}
}
2.分布式定時任務實現
分布式定時任務:即在分布式服務的環境下,啟動定時任務。分布式定時任務需要解決的問題是:同一定時任務,同一時間點,只能在一台服務器上啟動。
為了解決分布式定時任務的問題,有兩種方案可以解決:
1)執行定時任務是添加分布式鎖
分布式鎖實現請參考:
分布式鎖的三種實現方式
2)執行定時任務選取一台服務作為master節點執行該定時任務
我們需要框架層面解決定時任務觸發時,選舉一台服務器作為master節點。
實現思路如下:
1. 服務啟動注冊服務時,為服務編號;
2.從注冊服務中隨機選中一台服務器作為master節點;
3. 服務掛掉或添加時重新選舉。
代碼如下:
import java.util.HashMap;
import java.util.Map;
/**
* Created by tangliu on 2016/7/13.
*/
public class MasterHelper {
public static Map<String, Boolean> isMaster = new HashMap<>();
/**
* 根據serviceName, versionName,判斷當前服務是否集群中的master
* todo 服務版本號是否作為master判斷的依據??
* @param servieName
* @param versionName
* @return
*/
public static boolean isMaster(String servieName, String versionName) {
String key = generateKey(servieName, versionName);
if (!isMaster.containsKey(key))
return false;
else
return isMaster.get(key);
}
public static String generateKey(String serviceName, String versionName) {
return serviceName + ":" + versionName;
}
}
競選master:
/**
* 監聽服務節點下面的子節點(臨時節點,實例信息)變化
*/
public void watchInstanceChange(RegisterContext context) {
String watchPath = context.getServicePath();
try {
List<String> children = zk.getChildren(watchPath, event -> {
LOGGER.warn("ServerZk::watchInstanceChange zkEvent:" + event);
//Children發生變化,則重新獲取最新的services列表
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
LOGGER.info("容器狀態:{}, {}子節點發生變化,重新獲取子節點...", ContainerFactory.getContainer().status(), event.getPath());
if (ContainerFactory.getContainer().status() == Container.STATUS_SHUTTING
|| ContainerFactory.getContainer().status() == Container.STATUS_DOWN) {
LOGGER.warn("Container is shutting down");
return;
}
watchInstanceChange(context);
}
});
boolean _isMaster = false;
if (children.size() > 0) {
_isMaster = checkIsMaster(children, MasterHelper.generateKey(context.getService(), context.getVersion()), context.getInstanceInfo());
}
//masterChange響應
LifecycleProcessorFactory.getLifecycleProcessor().onLifecycleEvent(
new LifeCycleEvent(LifeCycleEvent.LifeCycleEventEnum.MASTER_CHANGE,
context.getService(), _isMaster));
} catch (KeeperException | InterruptedException e) {
LOGGER.error(e.getMessage(), e);
create(context.getServicePath() + "/" + context.getInstanceInfo(), context, true);
}
}
//-----競選master---
private static Map<String, Boolean> isMaster = MasterHelper.isMaster;
/**
* @param children 當前方法下的實例列表, eg 127.0.0.1:9081:1.0.0,192.168.1.12:9081:1.0.0
* @param serviceKey 當前服務信息 eg com.github.user.UserService:1.0.0
* @param instanceInfo 當前服務節點實例信息 eg 192.168.10.17:9081:1.0.0
*/
public boolean checkIsMaster(List<String> children, String serviceKey, String instanceInfo) {
if (children.size() <= 0) {
return false;
}
boolean _isMaster = false;
/**
* 排序規則
* a: 192.168.100.1:9081:1.0.0:0000000022
* b: 192.168.100.1:9081:1.0.0:0000000014
* 根據 lastIndexOf : 之后的數字進行排序,由小到大,每次取zk臨時有序節點中的序列最小的節點作為master
*/
try {
Collections.sort(children, (o1, o2) -> {
Integer int1 = Integer.valueOf(o1.substring(o1.lastIndexOf(":") + 1));
Integer int2 = Integer.valueOf(o2.substring(o2.lastIndexOf(":") + 1));
return int1 - int2;
});
String firstNode = children.get(0);
LOGGER.info("serviceInfo firstNode {}", firstNode);
String firstInfo = firstNode.replace(firstNode.substring(firstNode.lastIndexOf(":")), "");
if (firstInfo.equals(instanceInfo)) {
isMaster.put(serviceKey, true);
_isMaster = true;
LOGGER.info("({})競選master成功, master({})", serviceKey, CURRENT_CONTAINER_ADDR);
} else {
isMaster.put(serviceKey, false);
_isMaster = false;
LOGGER.info("({})競選master失敗,當前節點為({})", serviceKey);
}
} catch (NumberFormatException e) {
LOGGER.error("臨時節點格式不正確,請使用新版,正確格式為 etc. 192.168.100.1:9081:1.0.0:0000000022");
}
return _isMaster;
}
分布式環境判斷是否是master:
if (!MasterHelper.isMaster("com.today.api.financetask.service.FinanceScheduledService", "1.0.0")) {
//excute the task
}
3. 動態配置定時任務觸發時間
實現定時任務可動態配置,需要通過數據庫表保存最新一次修改的cron表達式來實現:
建表如下:
CREATE TABLE t_scheduled_task ( job_name varchar(50) NOT NULL COMMENT 'job名稱' , job_id varchar(40) NOT NULL COMMENT 'job ID' , job_cron varchar(50) NOT NULL COMMENT 'Job cron表達式' , job_type int COMMENT 'Job類型' , is_start tinyint(2) DEFAULT 1 NOT NULL COMMENT '是否已啟動,0:否(no);1:是(yes)' , remark varchar(256) COMMENT '備注' , updated_at timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL COMMENT '更新時間' , created_at timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL COMMENT '創建時間' , has_deleted tinyint(2) DEFAULT 0 NOT NULL COMMENT '是否已刪除,0:否(no);1:是(yes)' ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='定時任務配置表'; CREATE UNIQUE INDEX uk_t_scheduled_task ON t_scheduled_task ( job_id);
建表后需要頁面調用接口實現先停掉上次的定時任務,再根據最新修改的觸發時間新建一個新的定時任務:
如下:
//頁面設置每天觸發的時間,格式:HH:mm
val cron = convertHourMinuteToCron(processTime)
//修改定時任務時間, 保存入庫
ScheduledTaskQuerySql.isExists(jobId) match {
case true => ScheduledTaskActionSql.updateTaskCron(jobId, cron)
case false => ScheduledTaskActionSql.insertTScheduledTask(
TScheduledTask( report.name,
jobId,
cron,
None,
TScheduledTaskIsStartEnum.YES.id,
None,
null,
null,
TScheduledTaskHasDeletedEnum.NO.id))
}
//關掉老的定時任務,添加新的定時任務
QuartzManager.modifyJobTime(jobId, cron, classOf[DailyGenIncomeDetailJob])
/**
* 每天定時觸發-轉換時分格式(hh:mm)為cron表達式
* @param hourMibuteStr
* @return
*/
def convertHourMinuteToCron(hourMibuteStr : String) : String = {
val splitHm = hourMibuteStr.split(":")
s"0 ${splitHm(1).trim.toInt} ${splitHm(0).trim.toInt} * * ?"
}
/**
* 每天定時觸發-轉換時分格式(hh:hh:mm)為cron表達式
* @param dayStr
* @return
*/
def convertDayToCron(dayStr : String) : String = {
val splitHm = dayStr.split(":")
s"0 ${splitHm(1).trim.toInt} ${splitHm(0).trim.toInt} * * ?"
}
定義Job及父類:
Job定義
import java.util.Calendar
import com.today.api.checkaccount.scala.enums.FlatFormTypeEnum
import com.today.api.checkaccount.scala.request.ReconciliationRequest
import com.today.api.checkaccount.scala.CheckAccountServiceClient
import com.today.service.financetask.job.define.{AbstractJob, JobEnum}
import org.quartz.JobExecutionContext
class CheckAccountJob extends AbstractJob{
/**
* get the api information
*
* @return (interface name, interface version, JobEnum)
*/
override def getJobAndApiInfo(context: JobExecutionContext): (String, String, JobEnum) = {
("com.today.api.financetask.service.CloseAccountScheduleService", "1.0.0", JobEnum.CHECK_ACCOUNT_PROCESS)
}
/**
* start up the scheduled task
*
* @param context JobExecutionContext
*/
override def run(context: JobExecutionContext): Unit = {
val cal = Calendar.getInstance
cal.add(Calendar.DATE, -1)
new CheckAccountServiceClient().appReconciliation(new ReconciliationRequest(FlatFormTypeEnum.TODAY_APP,None))
}
}
公共父類
import java.io.{PrintWriter, StringWriter} import com.github.dapeng.core.helper.MasterHelper import com.today.api.financetask.scala.enums.TScheduledTaskLogEnum import com.today.service.financetask.action.sql.ScheduledTaskLogSql import com.today.service.financetask.util.{AppContextHolder, Debug} import org.quartz.{Job, JobExecutionContext} import org.slf4j.LoggerFactory import org.springframework.transaction.TransactionStatus import org.springframework.transaction.support.TransactionTemplate import scala.util.{Failure, Success, Try} /** * the abstract class for job */ trait AbstractJob extends Job{ /** 日志 */ val logger = LoggerFactory.getLogger(getClass) /** * execute the job * @param context */ override def execute(context: JobExecutionContext): Unit = { val jobAndApiInfo = getJobAndApiInfo(context) if (!MasterHelper.isMaster(jobAndApiInfo._1, jobAndApiInfo._2)) { logger.info(s"Can't select master to run the job ${jobAndApiInfo._3.jobId}: ${jobAndApiInfo._3.jobName}") return } val logId = ScheduledTaskLogSql.insertScheduledTaskLog(jobAndApiInfo._3) context.put("logId", logId) logger.info(s"Starting the job ${jobAndApiInfo._3.jobId}: ${jobAndApiInfo._3.jobName} ...") /** * 事物處理 */ val transactionTemplate: TransactionTemplate = AppContextHolder.getBean("transactionTemplate") transactionTemplate.execute((status: TransactionStatus) =>{ Debug.reset() Try(Debug.trace(s"${jobAndApiInfo._3.jobId}:${jobAndApiInfo._3.jobName}")(run(context))) match { case Success(x) => { logger.info(s"Successfully execute the job ${jobAndApiInfo._3.jobId}: ${jobAndApiInfo._3.jobName}") successLog(logId) } case Failure(e) => { logger.error(s"Failure execute the job ${jobAndApiInfo._3.jobId}: ${jobAndApiInfo._3.jobName}", e) failureLog(logId, status, e) } } Debug.info() }) } /** * get the api information * @return (interface name, interface version, JobEnum) */ def getJobAndApiInfo(context: JobExecutionContext) : (String, String, JobEnum) /** * start up the scheduled task * @param context JobExecutionContext */ def run(context: JobExecutionContext) /** * 成功日志記錄 * @param logId */ def successLog(logId: Long): Unit ={ ScheduledTaskLogSql.updateExportReportRecord(logId, TScheduledTaskLogEnum.SUCCESS, "Success") } /** * 失敗日志記錄 * @param logId */ def failureLog(logId: Long, status: TransactionStatus, e: Throwable): Unit ={ status.setRollbackOnly() ScheduledTaskLogSql.updateExportReportRecord(logId, TScheduledTaskLogEnum.FAILURE, getExceptionStack(e)) } /** * * 功能說明:在日志文件中 ,打印異常堆棧 * @param e : Throwable * @return : String */ def getExceptionStack(e: Throwable): String = { val errorsWriter = new StringWriter e.printStackTrace(new PrintWriter(errorsWriter)) errorsWriter.toString } }
4. 重啟服務器,啟動所有定時任務
重啟定時任務,需要重啟所有定時任務
這個過程需要將所有定時任務及觸發周期保存到數據庫,重啟后,讀取數據庫啟動恢復所有定時任務
代碼如下(Spring框架下啟動服務自動會啟動ApplicationListener.onApplicationEvent(event: ContextRefreshedEvent)):
import com.today.api.financetask.scala.enums.{TReportTypeEnum, TScheduledTaskIsStartEnum}
import com.today.api.financetask.scala.request.QueryAutoConfigRequest
import com.today.service.financetask.job._
import com.today.service.financetask.job.define.JobEnum
import com.today.service.financetask.query.sql.{AutoConfigQuerySql, ScheduledTaskQuerySql}
import com.today.service.financetask.util.QuartzManager
import org.slf4j.LoggerFactory
import org.springframework.context.ApplicationListener
import org.springframework.context.event.ContextRefreshedEvent
import org.springframework.stereotype.Service
/**
* 類功能描述: 定時器監聽器, 服務啟動時啟動定時器
*
* @author BarryWang create at 2018/5/11 12:04
* @version 0.0.1
*/
@Service
class ScheduleStartListener extends ApplicationListener[ContextRefreshedEvent] {
/** 日志 */
val logger = LoggerFactory.getLogger(getClass)
/**
* 啟動加載執行定時任務
*/
override def onApplicationEvent(event: ContextRefreshedEvent): Unit = {
logger.info("=======服務器重啟定時任務啟動start=======")
//1. 恢復日次處理定時任務
recoveryDayTimeProcessJob()
//2. 恢復每天營收明細報表生成定時任務
recoveryImcomeDetailGenJob()
logger.info("=======服務器重啟定時任務啟動end=======")
}
/**
* 恢復日次處理定時任務
*/
private def recoveryDayTimeProcessJob(): Unit ={
try {
ScheduledTaskQuerySql.queryByJobId(JobEnum.DAY_TIME_PROCESS.jobId) match {
case Some(x) =>
if(x.isStart == TScheduledTaskIsStartEnum.YES.id)
QuartzManager.addJobByCron(JobEnum.DAY_TIME_PROCESS.jobId, x.jobCron, classOf[DayTimeProcessJob])
else
logger.info("定時任務:" + JobEnum.DAY_TIME_PROCESS.jobName + "is_start標志為0,不啟動")
case None =>
QuartzManager.addJobByCron(JobEnum.DAY_TIME_PROCESS.jobId, "0 30 2 * * ?", classOf[DayTimeProcessJob])
}
} catch {
case e : Exception => logger.error(JobEnum.DAY_TIME_PROCESS.jobName + "啟動失敗, 失敗原因:", e)
}
}
/**
* 恢復營收明細報表生成定時任務
*/
private def recoveryImcomeDetailGenJob(): Unit = {
val jobName = TReportTypeEnum.INCOMEDETAIL_REPORT.name
try {
val jobId = TReportTypeEnum.INCOMEDETAIL_REPORT.id.toString
ScheduledTaskQuerySql.queryByJobId(jobId) match {
case Some(x) =>
if (x.isStart == TScheduledTaskIsStartEnum.YES.id)
QuartzManager.addJobByCron(jobId, x.jobCron, classOf[DailyGenIncomeDetailJob])
else
logger.info("定時任務:" + jobName + "is_start標志為0,不啟動")
case None =>
QuartzManager.addJobByCron(jobId, "0 10 0 * * ?", classOf[DailyGenIncomeDetailJob])
}
}catch {
case e : Exception => logger.error(jobName + "啟動失敗, 失敗原因:", e)
}
}
}
大家也看到上面代碼會使用使用JobEnum的枚舉來定義不同定時任務信息。
如下就是使用Scala枚舉定義定時信息如下:
case class JobEnum(val jobId: String, val jobName: String) extends Enumeration /** * 所有Job 枚舉定義在此類, 不能重復 * jobId不能重復 * @author BarryWang create at 2018/5/12 10:45 * @version 0.0.1 */ object JobEnum { val DAY_TIME_PROCESS = JobEnum("DAY_TIME_PROCESS", "日次處理定時任務") val INVOICE_SYNC_PROCESS = JobEnum("INVOICE_SYNC_PROCESS", "采購系統同步單據數據定時任務") val RETIREMENT_SYNC_PROCESS = JobEnum("RETIREMENT_SYNC_PROCESS", "采購系統同步報廢單據數據定時任務") val CLOSE_ACCOUNT_STATE_PROCESS = JobEnum("CLOSE_ACCOUNT_STATE_PROCESS","更新關賬狀態定時任務") val PURCHASE_ORDER_2_SYNC_PROCESS = JobEnum("PURCHASE_ORDER_2_SYNC_PROCESS","采購系統同步PO2數據定時任務") val SEND_EMAIL_PROCESS = JobEnum("SEND_EMAIL_PROCESS","計划付款通知和已付款通知定時任務") val CLOSE_ACCOUNT_BASE_DATA_PROCESS = JobEnum("CLOSE_ACCOUNT_BASE_DATA_PROCESS","關賬基礎數據同步定時任務") }
這是可動態配置的分布式定時任務第一版已實現,但還是還是有優化的地方:
1.枚舉里面只有jobId及jobName,為了啟動服務時不需要每個定時任務都寫一個恢復方法,可以將枚舉里面定時任務信息多添加jobClass及cron表達式,這樣以后添加新定時任務只需要維護枚舉信息就可以;
2.每個Job子類里面都需要實現 override def getJobAndApiInfo(context: JobExecutionContext): (String, String, JobEnum) 方法,這個也可以省掉;
3.新添加定時任務完全可以制定一個Job子類,其他操作自動維護進去;
1,2請參考: Scala 定義復雜枚舉 是實現,這種方式雖然做了簡化,只需要定義Job子類及對應JobEnum。但理想狀態所有定時任務信息只需要維護在Job子類就行了
3通過使用反射實現將所有定時任務信息只需要維護在Job子類,請參考: Java Scala獲取注解的類信息
至此整個實現分布式環境下實現動態可配置的定時任務過程已完成
