1、引言
在開發中,有時會遇到批量處理的業務。如果單線程處理,速度會非常慢,可能會導致上游超時。這是就需要使用多線程開發。
創建線程時,應當使用線程池。一方面避免了處理任務時創建銷毀線程開銷的代價,另一方面避免了線程數量膨脹導致的過分調度問題,保證了對內核的充分利用。
可以使用J.U.C提供的線程池:ThreadPoolExecutor類。在Spring框架中,也可以使用ThreadPoolTaskExecutor類。ThreadPoolTaskExecutor其實是對ThreadPoolExecutor的一種封裝。
2、使用ThreadPoolExecutor類
假設現有業務,輸入Input類,輸出Output類:
@Data @AllArgsConstructor public class Input { int i; } @Data @AllArgsConstructor public class Output { boolean success; String s; }
這里@Data與@AllArgsConstrutor使用了Lombok工具
處理方法:
public Output singleProcess(Input input) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); return new Output(false, null); } return new Output(true, String.valueOf(2 * input.getI() + 1)) }
現在該業務需要批量處理,輸入List<Input>,輸出List<Output>。那么可以創建一個核心線程數為4的線程池,每個線程把執行結果添加到線程安全的List中。這里List應當使用SynchronizedList而不是CopyOnWriteArrayList,因為這里寫操作有多次,而讀操作只有一次。並使用CountDownLatch等待所有線程執行結束:
public List<Output> multiProcess(List<Input> inputList) { ExecutorService executorService = Executors.newFixedThreadPool(4); CountDownLatch countDownLatch = new CountDownLatch(inputList.size()); List<Output> outputList = Collections.synchronizedList(new ArrayList<>(inputList.size())); for (Input input : inputList) { executorService.submit(() -> { try { // 單個處理 Output output = singleProcess(input); outputList.add(ouput); } catch (Exception e) { // 處理異常 } finally { countDownLatch.countDown(); } }) } // 等待所有線程執行完成 try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } return outputList; }
但是這樣還是有很大的問題:
- 阿里巴巴開發手冊不建議我們使用Executors創建線程池,因為Executors.newFixedThreadPool方法沒有限制線程隊列的容量,如果input數量過多,可能導致OOM。
- multiProcess不適合被多次調用,不適合用在大多數業務場景。
3、在Spring框架中使用ThreadPoolTaskExecutor類
為了應對大多數業務場景,配合Spring Boot框架,我們可以使用ThreadPoolTaskExecutor創建線程池,並把它注入到ioc容器中,全局都可以使用。
首先,配置線程池參數
@Data @Component @ConfigurationProperties(prefix = "thread-pool") public class ThreadPoolProperties {
//線程池創建時候初始化的線程數
private int corePoolSize;
//線程池最大的線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程
private int maxPoolSize;
//用來緩沖執行任務的隊列
private int queueCapacity;
//允許線程的空閑時間:當超過了核心線程出之外的線程在空閑時間到達之后會被銷毀
private int keepAliveSeconds; }
在配置文件application.yml中
thread-pool:
core-pool-size: 4
max-pool-size: 16
queue-capacity: 80
keep-alive-seconds: 120
這里線程池各參數的意義可以參考Java線程池實現原理及其在美團業務中的實踐
其次,將ThreadPoolTaskExecutor加入至ioc容器中
@EnableAsync @Configuration public class ThreadPoolConfig { private final ThreadPoolProperties threadPoolProperties; @Autowired public ThreadPoolConfig(ThreadPoolProperties threadPoolProperties) { this.threadPoolProperties = threadPoolProperties; } @Bean(name = "threadPoolTaskExecutor") public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(threadPoolProperties.getCorePoolSize()); executor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize()); executor.setQueueCapacity(threadPoolProperties.getQueueCapacity()); executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds()); executor.setThreadNamePrefix("thread-pool-"); return executor; } }
這里@EnableAsync是與@Async配合使用,用於執行異步任務,后面會給出示例
最后,在業務類中通過自定義SpringUtils類獲取bean或使用@Async,來使用線程池。
/** * 業務實現類 */ @Service @Slf4j public class Input2OutputServiceImpl implements Input2OutputService { /** * 單個處理 * @param input 輸入對象 * @return 輸出對象 */ @Override public Output singleProcess(Input input) { log.info("Processing..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); return new Output(false, null); } return new Output(true, String.valueOf(2 * input.getI() + 1)); } /** * 批量處理 * @param inputList 輸入對象列表 * @return 輸出對象列表 */ @Override public List<Output> multiProcess(List<Input> inputList) { ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor", ThreadPoolTaskExecutor.class); CountDownLatch latch = new CountDownLatch(inputList.size()); List<Output> outputList = Collections.synchronizedList(new ArrayList<>(inputList.size())); for (Input input : inputList) { executor.execute(() -> { try { Output output = singleProcess(input); outputList.add(output); } catch (Exception e) { e.printStackTrace(); } finally { latch.countDown(); } }); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } return outputList; } /** * 異步處理 * @param input 輸入對象 * @return 輸出Future對象 */ @Async("threadPoolTaskExecutor") @Override public Future<Output> asyncProcess(Input input) { return new AsyncResult<>(singleProcess(input)); } }
以上代碼的完整代碼包括測試代碼在筆者的GitHub項目thread-pool-demo,在項目中用到ThreadPoolTaskExecutor可參考。