Spring任務執行和任務調度


介紹

Spring框架分別通過TaskExecutor和TaskScheduler接口提供了異步執行和任務調度的抽象。 Spring還提供了那些接口的實現,這些接口在應用程序服務器環境中支持線程池或委托給CommonJ。 最終,在公共接口后面使用這些實現可以抽象化Java SE 5,Java SE 6和Java EE環境之間的差異。

Spring還具有集成類,用於支持與Timer(自1.3起成為JDK的一部分)和Quartz Scheduler(https://www.quartz-scheduler.org/)一起進行調度。 這兩個調度程序都是使用FactoryBean設置的,分別帶有對Timer或Trigger實例的可選引用。 此外,還提供了Quartz Scheduler和Timer的便捷類,該類允許您調用現有目標對象的方法(類似於常規的MethodInvokingFactoryBean操作)。

Spring TaskExecutor抽象

Executors 是線程池概念的JDK名稱。 “executor”的命名是由於無法保證基礎實現實際上是一個池; 執行程序可能是單線程的,甚至是同步的。 Spring的抽象隱藏了Java SE和Java EE環境之間的實現細節

Spring的TaskExecutor接口與java.util.concurrent.Executor接口相同。 實際上,最初,它存在的主要原因是在使用線程池時抽象出對Java 5的需求。 該接口具有單個方法execute(Runnable task),該方法根據線程池的語義和配置接受要執行的任務。

最初創建TaskExecutor的目的是為其他Spring組件提供所需的線程池抽象。 ApplicationEventMulticaster,JMS的AbstractMessageListenerContainer和Quartz集成等組件都使用TaskExecutor抽象來池化線程。 但是,如果您的bean需要線程池行為,則可以根據自己的需要使用此抽象。

TaskExecutor類型

Spring發行版中包含許多TaskExecutor的預構建實現。 您極有可能無需實現自己的方法。 常見的即用型變體是:

  • SyncTaskExecutor此實現不會異步執行調用。 而是,每個調用都在調用線程中進行。 它主要用於不需要多線程的情況下,例如在簡單的測試案例中。
  • SimpleAsyncTaskExecutor此實現不重用任何線程,而是為每次調用啟動一個新線程。 但是,它確實支持並發限制,該限制將阻止超出限制的所有調用,直到釋放插槽為止。 如果您正在尋找真正的池,請參見下面的ThreadPoolTaskExecutor。
  • ConcurrentTaskExecutor此實現是java.util.concurrent.Executor實例的適配器。 還有一個替代方法ThreadPoolTaskExecutor,它將Executor配置參數公開為bean屬性。 很少需要直接使用ConcurrentTaskExecutor,但是如果ThreadPoolTaskExecutor不夠靈活,無法滿足您的需求,那么可以選擇ConcurrentTaskExecutor。
  • ThreadPoolTaskExecutor此實現是最常用的實現。 它公開了用於配置 java.util.concurrent.ThreadPoolExecutor的bean屬性,並將其包裝在TaskExecutor中。 如果需要適應其他類型的java.util.concurrent.Executor,建議您改用ConcurrentTaskExecutor。
  • WorkManagerTaskExecutor此實現使用CommonJ WorkManager作為其支持服務提供者,並且是在Spring應用程序上下文中在WebLogic / WebSphere上設置基於CommonJ的線程池集成的中央便利類。
  • DefaultManagedTaskExecutor此實現在兼容JSR-236的運行時環境(例如Java EE 7+應用程序服務器)中使用JNDI獲得的ManagedExecutorService,為此替換了CommonJ WorkManager。

使用TaskExecutor

Spring的TaskExecutor實現用作簡單的JavaBean。 在下面的示例中,我們定義一個使用ThreadPoolTaskExecutor異步打印出一組消息的bean。

import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.TaskExecutor;

/**
 * @author Created by niugang on 2020/4/7/20:46
 */
public class TaskExecutorExample implements InitializingBean {
    @Override
    public void afterPropertiesSet() throws Exception {
        printMessages();
    }

    private class MessagePrinterTask implements Runnable {

        private String message;

        public MessagePrinterTask(String message) {
            this.message = message;
        }


        @Override
        public void run() {
            System.out.println(message);
        }
    }

    private TaskExecutor taskExecutor;

    public TaskExecutorExample(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }


    public void printMessages() {
        for (int i = 0; i < 25; i++) {
            taskExecutor.execute(new MessagePrinterTask("Message" + i));
        }
    }
}

如您所見,您可以將Runnable添加到隊列,而TaskExecutor使用其內部規則來確定何時執行任務,而不是從池中檢索線程並執行自己。

為了配置TaskExecutor將使用的規則,已經公開了簡單的bean屬性。

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="5"/>
        <property name="maxPoolSize" value="10"/>
        <property name="queueCapacity" value="25"/>
    </bean>

    <bean id="taskExecutorExample" class="com.xdja.dsc.system.springvalidate.task.TaskExecutorExample">
        <constructor-arg ref="taskExecutor"/>
    </bean>

Java config

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * @author Created by niugang on 2020/4/8/14:41
 */
@Configuration
public class ThreadPoolConfig {

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        threadPool.setMaxPoolSize(10);
        threadPool.setCorePoolSize(2);
        threadPool.setQueueCapacity(100);
        return  threadPool;
    }

}
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;


/**
 * @author Created by niugang on 2020/4/7/20:46
 */
@Component
public class TaskExecutorExample implements InitializingBean {

    private final ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Autowired
    public TaskExecutorExample(ThreadPoolTaskExecutor threadPoolTaskExecutor) {
        this.threadPoolTaskExecutor = threadPoolTaskExecutor;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        printMessages();
    }

    private class MessagePrinterTask implements Runnable {

        private String message;

        public MessagePrinterTask(String message) {
            this.message = message;
        }


        @Override
        public void run() {
            System.out.println(message);
        }
    }


    public void printMessages() {
        for (int i = 0; i < 25; i++) {
            threadPoolTaskExecutor.execute(new MessagePrinterTask("Message" + i));
        }
    }
}

Spring TaskScheduler抽象

除了TaskExecutor抽象之外,Spring 3.0還引入了TaskScheduler,它具有多種用於計划任務在將來某個時間點運行的方法。

public interface TaskScheduler {

    ScheduledFuture schedule(Runnable task, Trigger trigger);

    ScheduledFuture schedule(Runnable task, Date startTime);

    ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period);

    ScheduledFuture scheduleAtFixedRate(Runnable task, long period);

    ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay);

    ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay);
}

最簡單的方法是一個名為“schedule”的方法,它僅接受Runnable和Date。 這將導致任務在指定時間后運行一次。 所有其他方法都可以調度任務以重復運行。 固定速率和固定延遲方法用於簡單的定期執行,但是接受觸發器的方法則更加靈活。

Trigger接口

Trigger接口本質上是受JSR-236(從Spring 3.0開始尚未正式實現)的啟發。 觸發器的基本思想是可以根據過去的執行結果甚至任意條件來確定執行時間。 如果這些確定確實考慮了先前執行的結果,則該信息在TriggerContext中可用。 Trigger接口本身非常簡單:

public interface Trigger {

    Date nextExecutionTime(TriggerContext triggerContext);
}

如您所見,TriggerContext是最重要的部分。 它封裝了所有相關數據,並在將來必要時開放以進行擴展。 TriggerContext是一個接口(默認情況下使用SimpleTriggerContext實現)。 在這里,您可以查看哪些方法可用於Trigger實現。

public interface TriggerContext {

    Date lastScheduledExecutionTime();

    Date lastActualExecutionTime();

    Date lastCompletionTime();
}

Trigger 實現

Spring提供了Trigger接口的兩種實現。 最有趣的是CronTrigger。 它啟用了基於cron表達式的任務調度。 例如,以下任務計划在每小時的15分鍾后運行,但僅在工作日的9到17個“工作時間”內運行。

scheduler.schedule(task, new CronTrigger("0 15 9-17 * * MON-FRI"));

在這里插入圖片描述

另一種現成的實現是PeriodicTrigger,它接受一個固定的時間段,一個可選的初始延遲值和一個布爾值,以指示該時間段應解釋為固定速率還是固定延遲。 由於TaskScheduler接口已經定義了以固定速率或固定延遲計划任務的方法,因此應盡可能直接使用這些方法。 PeriodicTrigger實現的價值在於它可以在依賴於觸發器抽象的組件中使用。 例如,允許周期性觸發器,基於cron的觸發器,甚至自定義觸發器實現可互換使用可能很方便。 這樣的組件可以利用依賴注入的優勢,從而可以在外部配置此類觸發器,因此可以輕松地對其進行修改或擴展。

TaskScheduler實現

在這里插入圖片描述

與Spring的TaskExecutor抽象一樣,TaskScheduler安排的主要好處是應用程序的調度需求與部署環境分離。 當部署到不應由應用程序本身直接創建線程的應用程序服務器環境時,此抽象級別特別重要。 對於此類情況,Spring提供了一個委托給WebLogic / WebSphere上的CommonJ TimerManager TimerManagerTaskScheduler,以及一個委托給Java EE 7+環境中的JSR-236 ManagedScheduledExecutorService的更新的DefaultManagedTaskScheduler,通常都配置了JNDI查找。

每當不需要外部線程管理時,一個更簡單的選擇是在應用程序中進行本地ScheduledExecutorService設置,可以通過Spring的ConcurrentTaskScheduler對其進行調整。 為了方便起見,Spring還提供了一個ThreadPoolTaskScheduler,它在內部委托給ScheduledExecutorService,從而按照ThreadPoolTaskExecutor的方式提供了通用的bean樣式配置。 這些變體也適用於寬松的應用程序服務器環境中的本地嵌入式線程池設置,尤其是在Tomcat和Jetty上

    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
        <property name="poolSize" value="10"/>
    </bean>

    <bean id="taskScheduleExample" class="com.xdja.dsc.system.springvalidate.task.TaskScheduleExample">
        <constructor-arg ref="taskExecutor"/>
    </bean>
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import java.util.Date;


/**
 * @author Created by niugang on 2020/4/7/20:46
 */

public class TaskScheduleExample implements InitializingBean {

   private   ThreadPoolTaskScheduler  threadPoolTaskScheduler;

    public TaskScheduleExample(ThreadPoolTaskScheduler threadPoolTaskScheduler) {
        this.threadPoolTaskScheduler=threadPoolTaskScheduler;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        threadPoolTaskScheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                 //5秒執行一次
                System.out.println("定時任務執行:"+new Date());
            }
        },new Date(),5000);
    }
}

基於注解的任務計划和任務執行

Spring為任務調度和異步方法執行提供注解支持

啟用計划任務注解

要啟用對@Scheduled和@Async批注的支持,請將@EnableScheduling和@EnableAsync添加到您的@Configuration類之一:

@Configuration
@EnableAsync
@EnableScheduling
public class AppConfig {
}

您可以自由選擇適合您的應用程序的相關注釋。 例如,如果只需要對@Scheduled的支持,則只需省略@EnableAsync。 為了獲得更細粒度的控制,您還可以實現SchedulingConfigurerAsyncConfigurer接口

如果您更喜歡XML配置,請使用<task:annotation-driven>元素。

<task:annotation-driven executor="myExecutor" scheduler="myScheduler"/>
<task:executor id="myExecutor" pool-size="5"/>
<task:scheduler id="myScheduler" pool-size="10"/>

請注意,使用上面的XML,將提供執行程序引用來處理與帶有@Async批注的方法相對應的那些任務,並且提供調度程序引用來管理以@Scheduled注釋的那些方法。

處理@Async批注的默認建議模式是“ proxy”,它僅允許通過代理來攔截呼叫。 同一類中的本地調用無法以這種方式被攔截。 對於更高級的偵聽模式,請考慮結合編譯時或加載時編織切換到“ aspectj”模式。

注意:上面同一個類中無法進行異步執行

@Scheduled注解

可以將@Scheduled注釋與觸發器元數據一起添加到方法中。 例如,以下方法將每隔5秒以固定的延遲被調用一次,這意味着將從每個先前調用的完成時間開始計算該時間段。

@Scheduled(fixedDelay=5000)
public void doSomething() {
    // something that should execute periodically
}

如果需要固定速率執行,只需更改注釋中指定的屬性名稱。 在每次調用的連續開始時間之間測量的每5秒執行一次以下操作。

@Scheduled(fixedRate=5000)
public void doSomething() {
    // something that should execute periodically
}

對於固定延遲和固定速率的任務,可以指定初始延遲,以指示在第一次執行該方法之前要等待的毫秒數。

@Scheduled(initialDelay=1000, fixedRate=5000)
public void doSomething() {
    // something that should execute periodically
}

如果簡單的周期性調度不足以表現出來,則可以提供cron表達式。 例如,以下僅在工作日執行。

@Scheduled(cron="*/5 * * * * MON-FRI")
public void doSomething() {
    // something that should execute on weekdays only
}

請注意,要調度的方法必須具有空返回值,並且不能期望任何參數。 如果該方法需要與應用程序上下文中的其他對象進行交互,則通常將通過依賴項注入來提供這些對象。

從Spring Framework 4.3開始,任何范圍的bean都支持@Scheduled方法。

確保不要在運行時初始化同一@Scheduled注釋類的多個實例,除非您確實希望為每個此類實例計划回調。 與此相關,請確保不要在使用@Scheduled注釋並通過容器注冊為常規Spring Bean的bean類上使用@Configurable:否則,您將獲得雙重初始化,一次通過容器,一次通過@Configurable方面 ,每個@Scheduled方法都會被調用兩次。

@Async 注解

可以在方法上提供@Async注解,以便對該方法的調用將異步發生。 換句話說,調用者將在調用后立即返回,並且該方法的實際執行將在已提交給Spring TaskExecutor的任務中發生。 在最簡單的情況下,可以將注解應用於沒有返回值的方法。

@Async
void doSomething() {
    // this will be executed asynchronously
}

與用@Scheduled注解的方法不同,這些方法可以使用參數,因為它們將在運行時由調用者以“常規”方式調用,而不是從容器管理的計划任務中調用。 例如,以下是@Async注解的合法應用程序。

@Async
void doSomething(String s) {
    // this will be executed asynchronously
}

@Async方法不僅可以聲明常規的java.util.concurrent.Future返回類型,還可以聲明Spring的org.springframework.util.concurrent.ListenableFuture,或者從Spring 4.2起,聲明JDK 8的java.util.concurrent.CompletableFuture:以實現更豐富的交互 與異步任務並通過進一步的處理步驟立即合成。

異步實例:

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

/**
 * @author Created by niugang on 2020/4/8/16:48
 */
@Configuration
/**
 * 處理@Async批注的默認建議模式是“ proxy”,它僅允許通過代理來攔截呼叫。
 * 同一類中的本地調用無法以這種方式被攔截。
 * 對於更高級的偵聽模式,請考慮結合編譯時或加載時編織切換到“ aspectj”模式
 */
@EnableAsync
public class AppAsyncConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);
        executor.setMaxPoolSize(42);
        executor.setQueueCapacity(11);
        executor.setThreadNamePrefix("MyExecutor-");
        executor.initialize();
        return executor;
    }
}

controller

   @GetMapping("async")
    public Object findByCodeAndAuthor() {
        asyncService.asyncMethod();
        return "success";

    }

    @GetMapping("async2")
    public Object findByCodeAndAuthor2() {
        asyncService.asyncMethod2();
        return "success";

    }

service

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * @author Created by niugang on 2020/4/8/17:07
 */
@Service
@Slf4j
public class AsyncService {

    private final ApplicationContext applicationContext;

    @Autowired
    public AsyncService(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }


    @Async
    public void asyncMethod() {


        log.info("開始執行異步方法:{},Thread Name:{}", new Date(), Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(5);
            log.info("異步方法執行結束:{}", new Date());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

    }


    /**
     * 解決同類方法調用
     */
    public void asyncMethod2() {
        AsyncService asyncService = applicationContext.getBean(AsyncService.class);
        asyncService.asyncMethod3();

    }

    @Async
    void asyncMethod3() {

        log.info("開始執行異步本類方法:{},Thread Name:{}", new Date(), Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(5);
            log.info("異步方法執行本類結束:{}", new Date());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

    }
}

@Async的異常管理

當@Async方法具有Future類型的返回值時,很容易管理在方法執行期間引發的異常,因為在調用Future結果時將引發此異常。 但是,對於void返回類型,該異常不會被捕獲,並且無法傳輸。 對於這些情況,可以提供AsyncUncaughtExceptionHandler來處理此類異常。

public class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {

    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        // handle exception
    }
}

默認情況下,僅記錄異常。 可以通過AsyncConfigurer或task:annotation驅動的XML元素定義自定義AsyncUncaughtExceptionHandler。

在這里插入圖片描述


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM