本文介紹如何使用Spring Boot的@Scheduled實現任務的定時調度,並將調度的任務實現為並發的方式。
1、定時調度配置scheduled
1)注冊定時任務
package com.xiaoju.dqa.sentinel.scheduler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Demo component that registers two scheduled tasks, one driven by a cron
 * expression and one by a fixed rate. Both are meant to fire every two minutes.
 *
 * <p>Each task logs the current timestamp and the executing thread, then sleeps
 * for one second to simulate work.
 */
@Component
public class Scheduler {
    private static final Logger logger = LoggerFactory.getLogger(Scheduler.class);

    /** Runs at second 0 of every second minute (cron {@code 0 0/2 * * * ?}). */
    @Scheduled(cron = "0 0/2 * * * ?")
    public void cronTask() {
        long timestamp = System.currentTimeMillis();
        try {
            Thread thread = Thread.currentThread();
            logger.info("cron任務開始, timestamp={}, threadId={}, threadName={}", timestamp, thread.getId(), thread.getName());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: restore the flag so the executor can observe it.
            Thread.currentThread().interrupt();
            logger.warn("cronTask interrupted, timestamp={}", timestamp, e);
        }
    }

    /**
     * Runs at a fixed rate of two minutes.
     *
     * <p>{@code fixedRate} is expressed in milliseconds, so two minutes is
     * {@code 2 * 60 * 1000}; the previous value {@code 2 * 60} meant 120 ms.
     */
    @Scheduled(fixedRate = 2 * 60 * 1000)
    public void rateTask() {
        long timestamp = System.currentTimeMillis();
        try {
            Thread thread = Thread.currentThread();
            logger.info("fixedRate任務開始, timestamp={}, threadId={}, threadName={}", timestamp, thread.getId(), thread.getName());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: restore the flag so the executor can observe it.
            Thread.currentThread().interrupt();
            logger.warn("rateTask interrupted, timestamp={}", timestamp, e);
        }
    }
}
2)啟動定時任務
package com.xiaoju.dqa.sentinel;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Application entry point. {@link EnableScheduling} switches on processing of
 * {@code @Scheduled} methods found by component scanning.
 */
@Configuration
@ComponentScan
@EnableAutoConfiguration
@EnableScheduling
public class Application {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
這里就介紹兩種配置調度時間的方式:
1)cron表達式
2)fixedRate,調度頻率也就是調度間隔,單位為毫秒
如下代碼中設置的都是每兩分鍾調度一次。你只需要將任務用@Scheduled裝飾即可。
我這里只寫了兩個調度任務,而且只sleep1s,如果你sleep 10s的話你就能清晰的看到,兩個任務是串行執行的。
springboot中定時任務的執行是串行的!
開始把他改成並行的。
2、定時調度並行化
定時調度的並行化,線程池實現,非常簡單,只需要添加一個configuration,實現SchedulingConfigurer接口就可以了。
package com.xiaoju.dqa.sentinel.configuration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * Scheduling configuration that hands the task registrar a multi-threaded
 * scheduled pool, so that {@code @Scheduled} tasks can run in parallel.
 */
@Configuration
@EnableScheduling
public class ScheduleConfiguration implements SchedulingConfigurer {

    /** Core size of the scheduled thread pool backing all scheduled tasks. */
    private static final int SCHEDULER_POOL_SIZE = 100;

    /**
     * Installs the pool from {@link #taskExecutor()} as the scheduler.
     *
     * @param taskRegistrar registrar that holds the scheduled tasks
     */
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        Executor schedulerPool = taskExecutor();
        taskRegistrar.setScheduler(schedulerPool);
    }

    /**
     * Creates the scheduled thread pool; the bean's {@code shutdown} method is
     * registered as its destroy method.
     *
     * @return a scheduled thread pool with {@value #SCHEDULER_POOL_SIZE} core threads
     */
    @Bean(destroyMethod = "shutdown")
    public Executor taskExecutor() {
        return Executors.newScheduledThreadPool(SCHEDULER_POOL_SIZE);
    }
}
然后你重啟服務,可以看到兩個任務並行的執行起來。
3、將任務里的方法設置為異步
package com.xiaoju.dqa.sentinel.scheduler; import com.xiaoju.dqa.sentinel.service.AuditCollect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Component public class Scheduler { private static final Logger logger = LoggerFactory.getLogger(Scheduler.class); @Autowired private AuditCollect auditCollect; @Scheduled(cron = "0 0/2 * * * ?") public void cronTask() { long timestamp = System.currentTimeMillis(); try { Thread thread = Thread.currentThread(); logger.info("cron任務開始, timestamp={}, threadId={}, threadName={}", timestamp, thread.getId(), thread.getName()); auditCollect.doAuditCollect(); } catch (InterruptedException e) { } } @Scheduled(fixedRate = 2 * 60 ) public void rateTask() { long timestamp = System.currentTimeMillis(); try { Thread thread = Thread.currentThread(); logger.info("fixedRate任務開始, timestamp={}, threadId={}, threadName={}", timestamp, thread.getId(), thread.getName()); auditCollect.doAuditCollect(); } catch (InterruptedException e) { } } }
比如這里有個函數執行的是數據收集,可以把他實現為異步的,並同樣扔到線程池里並發的執行。
看看是怎么實現的。
package com.xiaoju.dqa.sentinel.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xiaoju.dqa.sentinel.client.es.ESDao;
import com.xiaoju.dqa.sentinel.client.es.entity.ESDocument;
import com.xiaoju.dqa.sentinel.client.es.entity.SearchDocument;
import com.xiaoju.dqa.sentinel.client.redis.RedisClient;
import com.xiaoju.dqa.sentinel.mapper.sentinel.SentinelMapper;
import com.xiaoju.dqa.sentinel.model.SentinelClan;
import com.xiaoju.dqa.sentinel.utils.ESSQLUtil;
import com.xiaoju.dqa.sentinel.utils.GlobalStaticConf;
import com.xiaoju.dqa.sentinel.utils.TimeFunction;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;
import java.util.*;

/**
 * Service whose collection entry point is annotated {@code @Async} and bound
 * to the executor bean named {@code sentinelSimpleAsync}.
 */
@Component
public class AuditCollect {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Audits every entry of the clan's {@code "families"} array.
     *
     * @param clanObject  clan JSON holding a {@code "families"} array of topic objects
     * @param currentTime timestamp forwarded unchanged to each audit call
     */
    @Async("sentinelSimpleAsync")
    public void doAuditCollect(JSONObject clanObject, long currentTime) {
        JSONArray families = clanObject.getJSONArray("families");
        // Iterate over all topics. audit(...) is not shown in this excerpt.
        for (int index = 0; index < families.size(); index++) {
            audit(clanObject, families.getJSONObject(index), currentTime);
        }
    }
}
可以看到只是用@Async注解一下,並且指定了異步的executor=sentinelSimpleAsync。
sentinelSimpleAsync是我們自己實現的、用來定制化線程池的bean。
package com.xiaoju.dqa.sentinel.configuration;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Declares the thread pools used for {@code @Async} execution. Pool sizes and
 * queue capacity are read from the {@code executor.*} properties.
 */
@Configuration
@EnableAsync
public class ExecutorConfiguration {

    @Value("${executor.pool.core.size}")
    private int corePoolSize;

    @Value("${executor.pool.max.size}")
    private int maxPoolSize;

    @Value("${executor.queue.capacity}")
    private int queueCapacity;

    /**
     * Executor with the configured sizes and no explicit rejection handler.
     *
     * @return the initialized executor, threads prefixed {@code SentinelSimpleExecutor-}
     */
    @Bean
    public Executor sentinelSimpleAsync() {
        ThreadPoolTaskExecutor pool = newSizedExecutor("SentinelSimpleExecutor-");
        pool.initialize();
        return pool;
    }

    /**
     * Executor with the configured sizes and a caller-runs rejection policy.
     *
     * @return the initialized executor, threads prefixed {@code SentinelSwapExecutor-}
     */
    @Bean
    public Executor sentinelAsync() {
        ThreadPoolTaskExecutor pool = newSizedExecutor("SentinelSwapExecutor-");
        // Rejection policy: how new tasks are handled once the pool has reached max size.
        // CALLER_RUNS: the task is not run on a new thread but on the caller's own thread.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        pool.initialize();
        return pool;
    }

    /** Builds an uninitialized executor with the configured sizes and the given thread-name prefix. */
    private ThreadPoolTaskExecutor newSizedExecutor(String threadNamePrefix) {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(corePoolSize);
        pool.setMaxPoolSize(maxPoolSize);
        pool.setQueueCapacity(queueCapacity);
        pool.setThreadNamePrefix(threadNamePrefix);
        return pool;
    }
}
配置文件如下:
#============== 線程池 ===================
executor.pool.core.size=100
executor.pool.max.size=150
executor.queue.capacity=2000
想讓異步生效的話,只需要在Application類上加上@EnableAsync注解就好了。
package com.xiaoju.dqa.sentinel; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; @Configuration @EnableScheduling @EnableAutoConfiguration @EnableAsync @ComponentScan public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }