springboot定時任務之旅——從單體應用到分布式集群


springboot定時任務

假設場景:單體應用的定時任務,假設我們已經有了一個搭建好的springboot應用,但是需要添加一個定時執行的部分(比如筆者遇到的是定時去請求一個接口數據來更新某個表),這樣作為開發人員,筆者選擇了最簡單的方法,也就是springboot自帶的定時器。

1、使用@Scheduled

demo的結構如下:

 

啟動器:

package com.wh.timerdemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling//開啟定時任務
public class TimerdemoApplication {

   public static void main(String[] args) {
       SpringApplication.run(TimerdemoApplication.class, args);
  }

}

定時器工具類:

package com.wh.timerdemo.util;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
* scheduler定時器執行任務的類
*/
@Component
public class TimerUtil {

   private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

   /**
    * 每2s執行一次
    */
   @Scheduled(fixedRate = 5000)
   public void taskOne(){
       System.out.println("定時任務1執行!!!執行時間:" + dateFormat.format(new Date()));
  }

   /**
    * 每天凌晨3:15執行一次
    */
   @Scheduled(cron = "0 15 03 ? * *")//cron的格式會在后面貼出
   public void taskTwo(){
       System.out.println("定時任務2執行!!!執行時間:" + dateFormat.format(new Date()));
  }
}

輸出結果:

 

最簡單的定時任務就實現啦~

假設場景:分布式應用的定時任務。當這個項目做了一半、第一版即將發布線上時,我司的上雲行動也進行到了白熱化階段,於是筆者就遇到了這樣一個問題:多個實例的定時任務是會同時執行的,這樣不僅會消耗資源,而且可能還會引起數據庫鎖。這時我就想到了quartz。但是要注意,使用quartz的前提是集群的時間要設置統一。

2、使用分布式定時任務框架quartz

首先quartz本身是支持分布式的,通過表來管理各節點之間的關系。

1、去quartz官網下載最新的包 http://www.quartz-scheduler.org/

2、下載之后解壓,進入如下目錄,創建數據庫表

quartz-2.2.3-distribution\quartz-2.2.3\docs\dbTables並選擇對應的數據庫SQL(筆者使用的是MySQL數據庫)

 

3、在pom文件中引入依賴

<!--quartz依賴-->
<dependency>
   <groupId>org.quartz-scheduler</groupId>
   <artifactId>quartz</artifactId>
</dependency>
<dependency>
   <groupId>org.quartz-scheduler</groupId>
   <artifactId>quartz-jobs</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-context-support</artifactId>
</dependency>

4、創建 quartz.proiperties 配置文件

org.quartz.scheduler.instanceId=AUTO
org.quartz.scheduler.makeSchedulerThreadDaemon=true
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.makeThreadsDaemons=true
#線程數量
org.quartz.threadPool.threadCount:20
#線程優先級
org.quartz.threadPool.threadPriority:5
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix=QRTZ_
#特別注意:此處是quartz的數據源,報錯就debug跟蹤一下查看dbName
org.quartz.jobStore.dataSource = springTxDataSource.schedulerFactoryBean
#加入集群
org.quartz.jobStore.isClustered=true
#容許的最大作業延
org.quartz.jobStore.misfireThreshold=25000
#調度實例失效的檢查時間間隔
org.quartz.jobStore.clusterCheckinInterval: 5000

5、quartz的初始化配置,讀取配置文件

package com.wh.timerdemo.config;

import org.quartz.spi.JobFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

import javax.sql.DataSource;
import java.io.IOException;
import java.util.Properties;

public class QuartzConfig {
   // 配置文件路徑
   private static final String QUARTZ_CONFIG = "/quartz.properties";
   // 按照自己注入的數據源自行修改
   @Qualifier("writeDataSource")
   @Autowired
   private DataSource dataSource;

   @Autowired
   private AutoWiredSpringBeanToJobFactory autoWiredSpringBeanToJobFactory;

   /**
    * 從quartz.properties文件中讀取Quartz配置屬性
    * @return
    * @throws IOException
    */
   @Bean
   public Properties quartzProperties() throws IOException {
       PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
       propertiesFactoryBean.setLocation(new ClassPathResource(QUARTZ_CONFIG));
       propertiesFactoryBean.afterPropertiesSet();
       return propertiesFactoryBean.getObject();
  }
   /**
    * JobFactory與schedulerFactoryBean中的JobFactory相互依賴,注意bean的名稱
    * 在這里為JobFactory注入了Spring上下文
    *
    * @param applicationContext
    * @return
    */
   @Bean
   public JobFactory buttonJobFactory(ApplicationContext applicationContext) {
       AutoWiredSpringBeanToJobFactory jobFactory = new AutoWiredSpringBeanToJobFactory();
       jobFactory.setApplicationContext(applicationContext);
       return jobFactory;
  }

   @Bean
   public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
       SchedulerFactoryBean factory = new SchedulerFactoryBean();
       factory.setJobFactory(autoWiredSpringBeanToJobFactory);
       factory.setOverwriteExistingJobs(true);
       factory.setAutoStartup(true); // 設置自行啟動
       // 延時啟動,應用啟動1秒后
       factory.setStartupDelay(1);
       factory.setQuartzProperties(quartzProperties());
       factory.setDataSource(dataSource);// 使用應用的dataSource替換quartz的dataSource
       return factory;
  }
}

6、將任務工廠注入到Spring

package com.wh.timerdemo.config;

import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
import org.springframework.stereotype.Component;

/**
* 為JobFactory注入SpringBean,否則Job無法使用Spring創建的bean
*/
@Component
public class AutoWiredSpringBeanToJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {
   private transient AutowireCapableBeanFactory beanFactory;
   @Override
   public void setApplicationContext(final ApplicationContext context) {
       beanFactory = context.getAutowireCapableBeanFactory();
  }
   @Override
   protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
       final Object job = super.createJobInstance(bundle);
       beanFactory.autowireBean(job);
       return job;
  }

}

7、創建任務調度管理,任務的增刪改查,起動停止等。

package com.wh.timerdemo.config;

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

import java.util.List;

/**
* quartz的調度器 包含了任務的增刪改查 可以配置在頁面上調度任務 這里就省略了
*/
public class QuartzManager {
   private static SchedulerFactory schedulerFactory =  new StdSchedulerFactory();

   private Scheduler scheduler = null;

   /**
    * @Description: 添加一個定時任務
    *
    * @param jobName 任務名
    * @param jobGroupName 任務組名
    * @param triggerName 觸發器名
    * @param triggerGroupName 觸發器組名
    * @param jobClass 任務
    * @param cron   時間設置,參考quartz說明文檔
    */
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public  static void addJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName, Class jobClass, String cron) {
       try {
           // 任務名,任務組,任務執行類
           Scheduler scheduler = schedulerFactory.getScheduler();
           JobDetail jobDetail= JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();


           // 觸發器
           TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
           // 觸發器名,觸發器組
           triggerBuilder.withIdentity(triggerName, triggerGroupName);
           triggerBuilder.startNow();
           // 觸發器時間設定
           triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
           // 創建Trigger對象
           CronTrigger trigger = (CronTrigger) triggerBuilder.build();

           // 調度容器設置JobDetail和Trigger
           scheduler.scheduleJob(jobDetail, trigger);

           // 啟動
           if (!scheduler.isShutdown()) {
               scheduler.start();
          }
      } catch (Exception e) {
           throw new RuntimeException(e);

      }
  }

   /**
    * @Description: 修改一個任務的觸發時間
    *
    * @param jobName
    * @param jobGroupName
    * @param triggerName 觸發器名
    * @param triggerGroupName 觸發器組名
    * @param cron   時間設置,參考quartz說明文檔
    */
   public static  void modifyJobTime(String jobName,String jobGroupName, String triggerName, String triggerGroupName, String cron) {
       try {
           Scheduler scheduler = schedulerFactory.getScheduler();
           TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
           CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
           if (trigger == null) {
               return;
          }

           String oldTime = trigger.getCronExpression();
           if (!oldTime.equalsIgnoreCase(cron)) {
               System.out.println("任務:"+jobName+"被修改");
               /** 方式一 :調用 rescheduleJob 開始 */
              /* // 觸發器
               TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
               // 觸發器名,觸發器組
               triggerBuilder.withIdentity(triggerName, triggerGroupName);
               triggerBuilder.startNow();
               // 觸發器時間設定
               triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
               // 創建Trigger對象
               trigger = (CronTrigger) triggerBuilder.build();
               // 方式一 :修改一個任務的觸發時間
               scheduler.rescheduleJob(triggerKey, trigger);*/
               /** 方式一 :調用 rescheduleJob 結束 */
               /** 方式二:先刪除,然后在創建一個新的Job */
               JobDetail jobDetail = scheduler.getJobDetail(JobKey.jobKey(jobName, jobGroupName));
               Class<? extends Job> jobClass = jobDetail.getJobClass();
               removeJob(jobName, jobGroupName, triggerName, triggerGroupName);
               addJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass,cron);
               /** 方式二 :先刪除,然后在創建一個新的Job */
          }
      } catch (Exception e) {
           throw new RuntimeException(e);
      }
  }

   /**
    * @Description: 移除一個任務
    *
    * @param jobName
    * @param jobGroupName
    * @param triggerName
    * @param triggerGroupName
    */
   public  static  void removeJob(String jobName, String jobGroupName,String triggerName, String triggerGroupName) {
       try {
           Scheduler scheduler = schedulerFactory.getScheduler();
           TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);

           scheduler.pauseTrigger(triggerKey);// 停止觸發器
           scheduler.unscheduleJob(triggerKey);// 移除觸發器
           scheduler.deleteJob(JobKey.jobKey(jobName, jobGroupName));// 刪除任務
      } catch (Exception e) {
           throw new RuntimeException(e);
      }
  }

   /**
    * @Description:啟動所有定時任務
    */
   public  static  void startJobs() {
       try {
           Scheduler scheduler = schedulerFactory.getScheduler();
           scheduler.start();
      } catch (Exception e) {
           throw new RuntimeException(e);
      }
  }

   /**
    * @Description:關閉所有定時任務
    */
   public static void shutdownJobs() {
       try {
           Scheduler scheduler = schedulerFactory.getScheduler();
           if (!scheduler.isShutdown()) {
               scheduler.shutdown();
          }
      } catch (Exception e) {
           throw new RuntimeException(e);
      }
  }

   /**
    * 獲取當前正在執行的任務
    * @return
    */
   public static boolean getCurrentJobs(String name){
       try {
           Scheduler scheduler = schedulerFactory.getScheduler();
           List<JobExecutionContext> jobContexts = scheduler.getCurrentlyExecutingJobs();
           for (JobExecutionContext context : jobContexts) {
               if (name.equals(context.getTrigger().getJobKey().getName())) {
                   return true;
              }
          }
      } catch (Exception e) {
           throw new RuntimeException(e);
      }
       return false;
  }

   public Scheduler getScheduler() {
       return scheduler;
  }

   public void setScheduler(Scheduler scheduler) {
       this.scheduler = scheduler;
  }


}

8、創建一個執行的Job,這里包含定時任務執行的邏輯

package com.wh.timerdemo.task;

import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @DisallowConcurrentExecution : 此標記用在實現Job的類上面,意思是不允許並發執行.
* 注org.quartz.threadPool.threadCount的數量有多個的情況,@DisallowConcurrentExecution才生效
*/
@DisallowConcurrentExecution
public class ButtonTimerJob implements Job {

   private static final Logger logger = LoggerFactory.getLogger(ButtonTimerJob.class);

   /**
    * 核心方法,Quartz Job真正的執行邏輯。
    * @throws JobExecutionException execute()方法只允許拋出JobExecutionException異常
    */

   @Override
   public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
       logger.info("--------------定時任務執行邏輯---------------------");
  }

}

9、創建啟動Job類:負責任務的創建啟動和配置cron等

package com.wh.timerdemo.task;

import com.wh.timerdemo.config.QuartzManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;

/**
* 定時任務的啟動類
*/
@Configuration
public class StartJob implements ApplicationListener<ContextRefreshedEvent> {

   private Logger logger = LoggerFactory.getLogger(this.getClass());

   public void run() {
       logger.info(">> 啟動定時任務...");
       //   QuartzManager.startJobs();
       QuartzManager.addJob(
               "SpecialPeriodJob",
               "SpecialPeriodJobGroup",
               "SpecialPeriodTrigger",
               "SpecialPeriodTriggerGroup",
               ButtonTimerJob.class,
               "0/30 * * * * ?");
  }

   @Override
   public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
       System.out.println("啟動定時任務......");
       run();
  }
}

啟動springboot,此時就不需要 @EnableScheduling 注解啦。

執行結果:

雖然IDEA控制台打印的信息顯示我們不是集群啟動,但是上線后查看日志,定時任務確實實現了三個實例但是只有一個會運行。

需要注意的是:任務第一次啟動后就會存入數據庫,再次啟動的時候任務已經存在,就不需要再添加一個任務了,直接執行啟動任務即可。由於quartz的特性,即使集群中有一個服務掛掉了,其他的定時任務仍會接替執行。

4、擴展

附錄1:cron語法——引用自https://www.cnblogs.com/linjiqin/archive/2013/07/08/3178452.html

0 0 10,14,16 * * ? 每天上午10點,下午2點,4點
0 0/30 9-17 * * ? 朝九晚五工作時間內每半小時
0 0 12 ? * WED 表示每個星期三中午12點
"0 0 12 * * ?" 每天中午12點觸發
"0 15 10 ? * *" 每天上午10:15觸發
"0 15 10 * * ?" 每天上午10:15觸發
"0 15 10 * * ? *" 每天上午10:15觸發
"0 15 10 * * ? 2005" 2005年的每天上午10:15觸發
"0 * 14 * * ?" 在每天下午2點到下午2:59期間的每1分鍾觸發
"0 0/5 14 * * ?" 在每天下午2點到下午2:55期間的每5分鍾觸發
"0 0/5 14,18 * * ?" 在每天下午2點到2:55期間和下午6點到6:55期間的每5分鍾觸發
"0 0-5 14 * * ?" 在每天下午2點到下午2:05期間的每1分鍾觸發
"0 10,44 14 ? 3 WED" 每年三月的星期三的下午2:10和2:44觸發
"0 15 10 ? * MON-FRI" 周一至周五的上午10:15觸發
"0 15 10 15 * ?" 每月15日上午10:15觸發
"0 15 10 L * ?" 每月最后一日的上午10:15觸發
"0 15 10 ? * 6L" 每月的最后一個星期五上午10:15觸發
"0 15 10 ? * 6L 2002-2005" 2002年至2005年的每月的最后一個星期五上午10:15觸發
"0 15 10 ? * 6#3" 每月的第三個星期五上午10:15觸發

附錄2:quartz各張表的作用——引用自https://blog.csdn.net/yhhyhhyhhyhh/article/details/84235374

qrtz_blob_triggers : 以Blob 類型存儲的觸發器。
qrtz_calendars:存放日歷信息, quartz可配置一個日歷來指定一個時間范圍。
qrtz_cron_triggers:存放cron類型的觸發器。
qrtz_fired_triggers:存放已觸發的觸發器。
qrtz_job_details:存放一個jobDetail信息。
qrtz_locks: 存儲程序的悲觀鎖的信息(假如使用了悲觀鎖)。
qrtz_paused_trigger_graps:存放暫停掉的觸發器。
qrtz_scheduler_state:調度器狀態。
qrtz_simple_triggers:簡單觸發器的信息。
qrtz_trigger_listeners:觸發器監聽器。
qrtz_triggers:觸發器的基本信息。
cron方式需要用到的4張數據表:
qrtz_triggers,qrtz_cron_triggers,qrtz_fired_triggers,qrtz_job_details

附錄3:quartz的工作原理——引用自https://blog.51cto.com/simplelife/2314620?source=drh

Quartz實際並不關心你是在相同還是不同的機器上運行節點。當集群放置在不同的機器上時,稱之為水平集群。節點跑在同一台機器上時,稱之為垂直集群。對於垂直集群,存在着單點故障的問題。這對高可用性的應用來說是無法接受的,因為一旦機器崩潰了,所有的節點也就被終止了。對於水平集群,存在着時間同步問題。

節點用時間戳來通知其他實例它自己的最后檢入時間。假如節點的時鍾被設置為將來的時間,那么運行中的Scheduler將再也意識不到那個結點已經宕掉了。另一方面,如果某個節點的時鍾被設置為過去的時間,也許另一節點就會認定那個節點已宕掉並試圖接過它的Job重運行。最簡單的同步計算機時鍾的方式是使用某一個Internet時間服務器(Internet Time Server ITS)。

節點爭搶Job問題:

因為Quartz使用了一個隨機的負載均衡算法, Job以隨機的方式由不同的實例執行。Quartz官網上提到當前,還不存在一個方法來指派(釘住) 一個 Job 到集群中特定的節點。

可以看出采用了Quartz集群采用了悲觀鎖的方式對triggers表進行行加鎖, 以保證任務同步的正確性。

當線程使用上述的SQL對表中的數據執行操作時,數據庫對該行進行行加鎖; 於此同時, 另一個線程對該行數據執行操作前需要獲取鎖, 而此時已被占用, 那么這個線程就只能等待, 直到該行鎖被釋放。

寫在最后:
本次定時任務之旅算是告一段落了,在趟雷的路上踩了不少雷,可惜當時時間緊迫,沒有來得及把很多錯誤記錄下來,只在最后總結出這樣一篇文章和demo分享給各位,小弟才疏學淺寫的不好,如有寫的不對的地方歡迎各位指正。

本文的參考鏈接如下:
https://blog.csdn.net/d984881239/article/details/86569818
https://blog.csdn.net/yhhyhhyhhyhh/article/details/84235374

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM