【雜談Spring】Spring支持多線程


在我們的應用系統中,經常會處理一些耗時任務,自然而然的會想到使用多線程。JDK給我們提供了非常方便的操作線程的API,JDK5之后更是新增了JUC包的支持,並發編程大師Doug Lea(JDK並發的作者)也是一直在為我們使用線程做着不懈的努力。

為什么還要使用Spring來實現多線程呢?這是句廢話!實際有兩個原因,第一使用Spring比使用JDK原生的並發API更簡單。第二我們的應用環境一般都會集成Spring,我們的Bean也都交給Spring來進行管理,那么使用Spring來實現多線程更加簡單,更加優雅。

在Spring3之后,Spring引入了對多線程的支持,如果你使用的版本在3.1以前,應該還是需要通過傳統的方式來實現多線程的。從Spring3同時也是新增了Java的配置方式,而且Java配置方式也逐漸成為主流的Spring的配置方式,因此后面的例子都是以Java的配置進行演示。

在Spring中實現多線程,其實非常簡單,只需要在配置類中添加@EnableAsync就可以使用多線程。在希望執行的並發方法中使用@Async就可以定義一個線程任務。通過spring給我們提供的ThreadPoolTaskExecutor就可以使用線程池。

快速入門

啟用異步任務,線程池配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @Description spring多線程使用
 */
@Configuration
@EnableAsync  // 啟用異步任務
public class ThreadConfig {

    @Bean("smsExecutor")
    public ThreadPoolTaskExecutor smsExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //設置線程的名稱前綴
        executor.setThreadNamePrefix("短信通知線程");
        //線程池維護線程的最少數量
        executor.setCorePoolSize(5);
        //線程池維護線程的最大數量
        executor.setMaxPoolSize(10);
        //非核心線程數的存活時間
        executor.setKeepAliveSeconds(4);
        //阻塞隊列LinkedBlockingQueue
        executor.setQueueCapacity(25);
        //對拒絕任務的處理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化
        executor.initialize();
        return executor;
    }

    @Bean("mailExecutor")
    public ThreadPoolTaskExecutor mailExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //設置線程的名稱前綴
        executor.setThreadNamePrefix("郵件通知線程");
        //線程池維護線程的最少數量
        executor.setCorePoolSize(5);
        //線程池維護線程的最大數量
        executor.setMaxPoolSize(10);
        //非核心線程數的存活時間
        executor.setKeepAliveSeconds(4);
        //阻塞隊列LinkedBlockingQueue
        executor.setQueueCapacity(25);
        //對拒絕任務的處理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化
        executor.initialize();
        return executor;
    }

}

在需要異步執行的方法上添加@Async注解,@Async注解可以指定線程池名稱

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class MultiThreadProcessService {

    public static final Logger logger = LoggerFactory.getLogger(MultiThreadProcessService.class);

    /**
     * 短信發送服務
     * 默認處理流程耗時1000ms
     */
    @Async("smsExecutor")
    public void sendSms() {
        logger.debug("MultiThreadProcessService-sendSms" + Thread.currentThread() + "......start");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        logger.debug("MultiThreadProcessService-sendSms" + Thread.currentThread() + "......end");
    }

    /**
     * 郵件發送服務
     * 默認處理流程耗時1000ms
     */
    @Async("mailExecutor")
    public void sendMail() {
        logger.debug("MultiThreadProcessService-sendMail" + Thread.currentThread() + "......start");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        logger.debug("MultiThreadProcessService-sendMail" + Thread.currentThread() + "......end");
    }
}

測試

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {RootConfig.class, WebConfig.class})
@WebAppConfiguration
public class MultiThreadTest {

    @Autowired
    private MultiThreadProcessService multiThreadProcessService;

    @Test
    public void test() throws IOException {
        for (int i=0; i<20; i++){
            multiThreadProcessService.sendSms();
            multiThreadProcessService.sendMail();
        }
        System.in.read();
    }
}

下面關於線程池的配置還有一種方式,就是直接實現AsyncConfigurer接口,重寫getAsyncExecutor方法即可,代碼如下:

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.lang.reflect.Method;

@Configuration
@EnableAsync //開啟異步任務支持
//配置類實現AsyncConfigurer接口並重寫getAsyncExecutor方法
public class TaskExecutorConfig implements AsyncConfigurer {

    //返回一個ThreadPoolTaskExecutor,這就就獲得了一個基於線程池的TaskExecutor
    @Override
    public ThreadPoolTaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setQueueCapacity(25);
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncUncaughtExceptionHandler(){
            @Override
            public void handleUncaughtException(Throwable ex, Method method, Object... params) {
                System.out.println( "Exception Caught in Thread - "+ Thread.currentThread().getName());
                System.out.println( "Exception message - "+ ex.getMessage());
                System.out.println( "Method name - "+ method.getName());
                for(Object param : params) {
                    System.out.println( "Parameter value - "+ param);
                }
            }
        };
    }


}

 但是這種方式有一個局限性,就是只能有一個實現AsyncConfigurer接口的類,無法做到隔離不同業務(即不同的業務使用不同的線程池)。如果整個系統確實只需要一個線程池,可以這么做。

異步任務詳解

任務的拒絕策略

JDK內置了四種任務的拒絕策略:

ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常(默認)。
ThreadPoolExecutor.DiscardPolicy:丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務。
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理,重試添加當前的任務,它會自動重復調用execute()方法 

當然我們也可以自定義我們的處理策略,實現java.util.concurrent.RejectedExecutionHandler接口。

/**
 * 自定義拒絕策略
 */
class SmsExecutorRejectedExecutionHandler implements RejectedExecutionHandler{
    //自定義拒絕策略,線程任務隊列滿了的情況下,任務等待入線程隊列
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            System.out.println("wait input queue error");
        }
    }
}

@Async注解

基於@Async標注的方法,稱之為異步方法;這些方法將在執行的時候,將會在獨立的線程中被執行,調用者無需等待它的完成,即可繼續其他的操作。

該注解標注在類上,就代表調用該類的所有方法均會自動異步執行;標注在方法上,該方法就會異步執行;當調用該方法時,Async的切面會先向異步線程池申請一個線程,然后使用該線程執行該方法內的業務邏輯。

@Async注解只有一個參數,可以指定異步任務所用的線程池bean名稱,如@Async("taskExecutor")。

@Async使用的注意事項

(1) 異步化注解@Async標注的方法返回值只能是void或者Future<T>(AsyncResult)。

(2) @Async所修飾的方法不要定義為static類型,這樣異步調用不會生效。

(3) @Async  必須不同類間調用,即異步方法和調用異步方法的方法不能再同一個類,原因是:Async底層是通過代理對注解掃描實現的。

(4) @Async所修飾的方法不能和@Transactional一起使用,因為會啟用新的子線程來執行方法內的業務,主線程內的事務注解無法控制子線程的業務操作,原因就是事務存在線程隔離的原因,如果要加事務,請在方法內嵌套其他事務標注后的方法即可生效。

@Async返回值

1、無返回值

基於@Async無返回值調用,直接在使用類,使用方法(建議在使用方法)上,加上注解。若需要拋出異常,需手動new一個異常拋出。

/**
 * 帶參數的異步調用
 * 對於返回值,異常會被AsyncUncaughtExceptionHandler處理掉,前提是實現了AsyncConfigurer接口並重寫了異常捕獲方法 
 * @param message
 */
@Async
public void sendMessage(String message) {
    logger.info("sendMessage >> message=" + message);
    throw new IllegalArgumentException(message);
}

2、有返回值 

/**
 * 異常調用返回Future
 * 對於返回值是Future,不會被AsyncUncaughtExceptionHandler處理,需要我們在進行捕獲異常並處理
 * 或者在調用方法調用Future.get()方法時捕獲異常並處理
 * @param message
 */
@Async
public Future<String> sendMessage(String message) {
    logger.info("sendMessage >> message=" + message);
    Future<String> future;
    try {
        Thread.sleep(1000);
        future = new AsyncResult("success:" + message);
        throw new IllegalArgumentException("a");
    }catch (InterruptedException e){
        future = new AsyncResult("error");
    }catch (IllegalArgumentException e){
        future = new AsyncResult("error-IllegalArgumentException");
    }
    return future;
}

什么情況下導致@Async失效

1、缺少@EnableAsync注解

2、異步方法不能是static,加了static就不走AOP了

3、異步方法(添加@Async注解)和調用者方法在同一個類中

原因:spring 在掃描bean的時候會掃描方法上是否包含@Async注解,如果包含,spring會為這個bean動態地生成一個子類(即代理類,proxy),代理類是繼承原來那個bean的。此時,當這個有注解的方法被調用的時候,實際上是由代理類來調用的,代理類在調用時增加異步作用。然而,如果這個有注解的方法是被同一個類中的其他方法調用的,那么該方法的調用並沒有通過代理類,而是直接通過原來的那個bean,所以就沒有增加異步作用,我們看到的現象就是@Async注解無效。

解決:將異步方法按照業務統一抽取到對應的bean中,當外部需要使用時將該bean注入,然后調用bean中的異步方法。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM