首先配置ThreadPoolTaskScheduler线程池:
package cn.demo.support.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @Configuration public class ScheduleConfig { @Bean(destroyMethod = "shutdown") public ThreadPoolTaskScheduler taskExecutor() { ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler(); executor.setPoolSize(20); executor.setThreadNamePrefix("taskExecutor-"); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setAwaitTerminationSeconds(60); return executor; } }
配置定时任务,业务类中注入HiveClusterSyncScheduler即可调用:
1 package cn.jsfund.ngdp.support.batchSchedule; 2 3 import java.util.ArrayList; 4 import java.util.Date; 5 import java.util.List; 6 import java.util.Map; 7 import java.util.Map.Entry; 8 import java.util.concurrent.ConcurrentHashMap; 9 import java.util.concurrent.ScheduledFuture; 10 11 import javax.annotation.PostConstruct; 12 import javax.annotation.Resource; 13 14 import org.apache.commons.lang.StringUtils; 15 import org.slf4j.Logger; 16 import org.slf4j.LoggerFactory; 17 import org.springframework.beans.factory.annotation.Autowired; 18 import org.springframework.jdbc.core.JdbcTemplate; 19 import org.springframework.scheduling.Trigger; 20 import org.springframework.scheduling.TriggerContext; 21 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; 22 import org.springframework.scheduling.support.CronSequenceGenerator; 23 import org.springframework.scheduling.support.CronTrigger; 24 import org.springframework.stereotype.Component; 25 import org.springframework.util.ObjectUtils; 26 27 import cn.jsfund.ngdp.support.config.BasicConfig; 28 import cn.jsfund.ngdp.support.exception.ServiceException; 29 import cn.jsfund.ngdp.support.model.bigdata.TaskDef; 30 import cn.jsfund.ngdp.support.web.service.bigdata.HiveClusterSyncService; 31 32 @SuppressWarnings("rawtypes") 33 @Component 34 public class HiveClusterSyncScheduler { 35 36 @Resource 37 private JdbcTemplate dmJdbcTemplate; 38 39 @Resource 40 private BasicConfig basicConfig; 41 42 @Resource 43 HiveClusterSyncService hiveClusterSyncService;//业务类 44 45 @Autowired 46 ThreadPoolTaskScheduler threadPoolTaskScheduler; 47 48 private static final Logger logger = LoggerFactory.getLogger(HiveClusterSyncScheduler.class); 49 50 private static final String MAPTASKKEY = "map_task_key"; 51 52 private static Map<String, ScheduledFuture> scheduledFutureMap = new ConcurrentHashMap<String, ScheduledFuture>(); 53 54 @PostConstruct 55 public void init() { 56 refreshTasks(); 57 } 58 59 // @Scheduled(cron = "${bigData.hive.backup.schduler.refresh.crontab}") 60 public void refreshTasks() { 61 62 if (!"true".equals(basicConfig.getBackupEnabled())) { 63 logger.info("*************hive集群同步开关未开启,结束计划任务刷新.*************"); 64 return; 65 } 66 for (Entry<String, ScheduledFuture> entry : scheduledFutureMap.entrySet()) { 67 ScheduledFuture sf = entry.getValue(); 68 if (sf != null) { 69 sf.cancel(false); 70 } 71 } 72 scheduledFutureMap.clear(); 73 74 logger.info("*************开始扫描数据库,刷新定时任务*************"); 75 76 List<TaskDef> list = new ArrayList<>(); 77 78 try { 79 list = hiveClusterSyncService.queryTaskDefs(null, null, null, null, null, "1", null) 80 .getContent(); 81 } catch (Exception e) { 82 logger.info("查询数据库异常,代码执行结束,异常信息:", e); 83 } 84 if (ObjectUtils.isEmpty(list)) { 85 logger.info("查询启动状态的任务记录为空,代码执行结束。"); 86 return; 87 } 88 for (TaskDef taskDef : list) { 89 String taskId = taskDef.getId(); 90 String crontab = taskDef.getCrontab(); 91 if (StringUtils.isBlank(crontab)) { 92 continue; 93 } 94 95 TaskThread taskThread = new TaskThread(taskId, crontab); 96 boolean isValidExp = CronSequenceGenerator.isValidExpression(crontab); 97 if (!isValidExp) { 98 logger.error("当前cron表达式无效,taskId={},cronExpression={}", taskId, crontab); 99 continue; 100 } 101 ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread, 102 new Trigger() { 103 104 @Override 105 public Date nextExecutionTime(TriggerContext triggerContext) { 106 107 return new CronTrigger(crontab).nextExecutionTime(triggerContext); 108 } 109 }); 110 111 scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture); 112 } 113 logger.info("*************刷新定时任务完成*************"); 114 } 115 116 //添加计划 117 public void addTask(String taskId, String crontab) { 118 if (!"true".equals(basicConfig.getBackupEnabled())) { 119 return; 120 } 121 TaskThread taskThread = new TaskThread(taskId, crontab); 122 ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread, 123 new Trigger() { 124 @Override 125 public Date nextExecutionTime(TriggerContext triggerContext) { 126 return new CronTrigger(crontab).nextExecutionTime(triggerContext); 127 } 128 }); 129 scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture); 130 } 131 132 //取消计划 133 public void cancelTask(Object... taskId) { 134 for (int i = 0; i < taskId.length; i++) { 135 ScheduledFuture sf = scheduledFutureMap.get(MAPTASKKEY + taskId[i]); 136 if (sf != null) { 137 sf.cancel(false); 138 scheduledFutureMap.remove(MAPTASKKEY + taskId[i]); 139 } 140 } 141 } 142 143 //更新计划:先取消再添加 144 public void updateScheduleTask(String taskId, String crontab) throws ServiceException { 145 if (!"true".equals(basicConfig.getBackupEnabled())) { 146 return; 147 } 148 ScheduledFuture sf = scheduledFutureMap.get(MAPTASKKEY + taskId); 149 if (sf != null) { 150 sf.cancel(false); 151 scheduledFutureMap.remove(MAPTASKKEY + taskId); 152 } 153 TaskThread taskThread = new TaskThread(taskId, crontab); 154 ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread, 155 new Trigger() { 156 @Override 157 public Date nextExecutionTime(TriggerContext triggerContext) { 158 return new CronTrigger(crontab).nextExecutionTime(triggerContext); 159 } 160 }); 161 scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture); 162 } 163 164 class TaskThread extends Thread { 165 166 private String taskId; 167 168 private String crontab; 169 170 public TaskThread(String taskId, String crontab) { 171 this.taskId = taskId; 172 this.crontab = crontab; 173 } 174 175 public void run() { 176 try { 177 hiveClusterSyncService.bootTask(taskId, crontab); 178 } catch (Exception e) { 179 logger.info("任务(taskId={})结束初始化,异常信息:{}", taskId, e.getMessage()); 180 } 181 } 182 } 183 184 }