在我們的應用系統中,經常會處理一些耗時任務,自然而然的會想到使用多線程。JDK給我們提供了非常方便的操作線程的API,JDK5之后更是新增了JUC包的支持,並發編程大師Doug Lea(JDK並發的作者)也是一直在為我們使用線程做着不懈的努力。
為什么還要使用Spring來實現多線程呢?這是句廢話!實際有兩個原因,第一使用Spring比使用JDK原生的並發API更簡單。第二我們的應用環境一般都會集成Spring,我們的Bean也都交給Spring來進行管理,那么使用Spring來實現多線程更加簡單,更加優雅。
在Spring3之后,Spring引入了對多線程的支持,如果你使用的版本在3.1以前,應該還是需要通過傳統的方式來實現多線程的。從Spring3同時也是新增了Java的配置方式,而且Java配置方式也逐漸成為主流的Spring的配置方式,因此后面的例子都是以Java的配置進行演示。
在Spring中實現多線程,其實非常簡單,只需要在配置類中添加@EnableAsync就可以使用多線程。在希望執行的並發方法中使用@Async就可以定義一個線程任務。通過spring給我們提供的ThreadPoolTaskExecutor就可以使用線程池。
快速入門
啟用異步任務,線程池配置
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; /** * @Description spring多線程使用 */ @Configuration @EnableAsync // 啟用異步任務 public class ThreadConfig { @Bean("smsExecutor") public ThreadPoolTaskExecutor smsExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //設置線程的名稱前綴 executor.setThreadNamePrefix("短信通知線程"); //線程池維護線程的最少數量 executor.setCorePoolSize(5); //線程池維護線程的最大數量 executor.setMaxPoolSize(10); //非核心線程數的存活時間 executor.setKeepAliveSeconds(4); //阻塞隊列LinkedBlockingQueue executor.setQueueCapacity(25); //對拒絕任務的處理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化 executor.initialize(); return executor; } @Bean("mailExecutor") public ThreadPoolTaskExecutor mailExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //設置線程的名稱前綴 executor.setThreadNamePrefix("郵件通知線程"); //線程池維護線程的最少數量 executor.setCorePoolSize(5); //線程池維護線程的最大數量 executor.setMaxPoolSize(10); //非核心線程數的存活時間 executor.setKeepAliveSeconds(4); //阻塞隊列LinkedBlockingQueue executor.setQueueCapacity(25); //對拒絕任務的處理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化 executor.initialize(); return executor; } }
在需要異步執行的方法上添加@Async注解,@Async注解可以指定線程池名稱
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @Service public class MultiThreadProcessService { public static final Logger logger = LoggerFactory.getLogger(MultiThreadProcessService.class); /** * 短信發送服務 * 默認處理流程耗時1000ms */ @Async("smsExecutor") public void sendSms() { logger.debug("MultiThreadProcessService-sendSms" + Thread.currentThread() + "......start"); try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } logger.debug("MultiThreadProcessService-sendSms" + Thread.currentThread() + "......end"); } /** * 郵件發送服務 * 默認處理流程耗時1000ms */ @Async("mailExecutor") public void sendMail() { logger.debug("MultiThreadProcessService-sendMail" + Thread.currentThread() + "......start"); try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } logger.debug("MultiThreadProcessService-sendMail" + Thread.currentThread() + "......end"); } }
測試
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = {RootConfig.class, WebConfig.class}) @WebAppConfiguration public class MultiThreadTest { @Autowired private MultiThreadProcessService multiThreadProcessService; @Test public void test() throws IOException { for (int i=0; i<20; i++){ multiThreadProcessService.sendSms(); multiThreadProcessService.sendMail(); } System.in.read(); } }
下面關於線程池的配置還有一種方式,就是直接實現AsyncConfigurer接口,重寫getAsyncExecutor方法即可,代碼如下:
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.lang.reflect.Method; @Configuration @EnableAsync //開啟異步任務支持 //配置類實現AsyncConfigurer接口並重寫getAsyncExecutor方法 public class TaskExecutorConfig implements AsyncConfigurer { //返回一個ThreadPoolTaskExecutor,這就就獲得了一個基於線程池的TaskExecutor @Override public ThreadPoolTaskExecutor getAsyncExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(5); taskExecutor.setMaxPoolSize(10); taskExecutor.setQueueCapacity(25); taskExecutor.initialize(); return taskExecutor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncUncaughtExceptionHandler(){ @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { System.out.println( "Exception Caught in Thread - "+ Thread.currentThread().getName()); System.out.println( "Exception message - "+ ex.getMessage()); System.out.println( "Method name - "+ method.getName()); for(Object param : params) { System.out.println( "Parameter value - "+ param); } } }; } }
但是這種方式有一個局限性,就是只能有一個實現AsyncConfigurer接口的類,無法做到隔離不同業務(即不同的業務使用不同的線程池)。如果整個系統確實只需要一個線程池,可以這么做。
異步任務詳解
任務的拒絕策略
JDK內置了四種任務的拒絕策略:
ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常(默認)。
ThreadPoolExecutor.DiscardPolicy:丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務。
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理,重試添加當前的任務,它會自動重復調用execute()方法
當然我們也可以自定義我們的處理策略,實現java.util.concurrent.RejectedExecutionHandler接口。
/** * 自定義拒絕策略 */ class SmsExecutorRejectedExecutionHandler implements RejectedExecutionHandler{ //自定義拒絕策略,線程任務隊列滿了的情況下,任務等待入線程隊列 @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { System.out.println("wait input queue error"); } } }
@Async注解
基於@Async標注的方法,稱之為異步方法;這些方法將在執行的時候,將會在獨立的線程中被執行,調用者無需等待它的完成,即可繼續其他的操作。
該注解標注在類上,就代表調用該類的所有方法均會自動異步執行;標注在方法上,該方法就會異步執行;當調用該方法時,Async的切面會先向異步線程池申請一個線程,然后使用該線程執行該方法內的業務邏輯。
@Async注解只有一個參數,可以指定異步任務所用的線程池bean名稱,如@Async("taskExecutor")。
@Async使用的注意事項
(1) 異步化注解@Async標注的方法返回值只能是void或者Future<T>(AsyncResult)。
(2) @Async所修飾的方法不要定義為static類型,這樣異步調用不會生效。
(3) @Async 必須不同類間調用,即異步方法和調用異步方法的方法不能再同一個類,原因是:Async底層是通過代理對注解掃描實現的。
(4) @Async所修飾的方法不能和@Transactional一起使用,因為會啟用新的子線程來執行方法內的業務,主線程內的事務注解無法控制子線程的業務操作,原因就是事務存在線程隔離的原因,如果要加事務,請在方法內嵌套其他事務標注后的方法即可生效。
@Async返回值
1、無返回值
基於@Async無返回值調用,直接在使用類,使用方法(建議在使用方法)上,加上注解。若需要拋出異常,需手動new一個異常拋出。
/** * 帶參數的異步調用 * 對於返回值,異常會被AsyncUncaughtExceptionHandler處理掉,前提是實現了AsyncConfigurer接口並重寫了異常捕獲方法 * @param message */ @Async public void sendMessage(String message) { logger.info("sendMessage >> message=" + message); throw new IllegalArgumentException(message); }
2、有返回值
/** * 異常調用返回Future * 對於返回值是Future,不會被AsyncUncaughtExceptionHandler處理,需要我們在進行捕獲異常並處理 * 或者在調用方法調用Future.get()方法時捕獲異常並處理 * @param message */ @Async public Future<String> sendMessage(String message) { logger.info("sendMessage >> message=" + message); Future<String> future; try { Thread.sleep(1000); future = new AsyncResult("success:" + message); throw new IllegalArgumentException("a"); }catch (InterruptedException e){ future = new AsyncResult("error"); }catch (IllegalArgumentException e){ future = new AsyncResult("error-IllegalArgumentException"); } return future; }
什么情況下導致@Async失效
1、缺少@EnableAsync注解
2、異步方法不能是static,加了static就不走AOP了
3、異步方法(添加@Async注解)和調用者方法在同一個類中
原因:spring 在掃描bean的時候會掃描方法上是否包含@Async注解,如果包含,spring會為這個bean動態地生成一個子類(即代理類,proxy),代理類是繼承原來那個bean的。此時,當這個有注解的方法被調用的時候,實際上是由代理類來調用的,代理類在調用時增加異步作用。然而,如果這個有注解的方法是被同一個類中的其他方法調用的,那么該方法的調用並沒有通過代理類,而是直接通過原來的那個bean,所以就沒有增加異步作用,我們看到的現象就是@Async注解無效。
解決:將異步方法按照業務統一抽取到對應的bean中,當外部需要使用時將該bean注入,然后調用bean中的異步方法。