Java多線程批量處理、線程池的使用


1、引言

在開發中,有時會遇到批量處理的業務。如果單線程處理,速度會非常慢,可能會導致上游超時。這是就需要使用多線程開發。

創建線程時,應當使用線程池。一方面避免了處理任務時創建銷毀線程開銷的代價,另一方面避免了線程數量膨脹導致的過分調度問題,保證了對內核的充分利用。

可以使用J.U.C提供的線程池:ThreadPoolExecutor類。在Spring框架中,也可以使用ThreadPoolTaskExecutor類。ThreadPoolTaskExecutor其實是對ThreadPoolExecutor的一種封裝。

2、使用ThreadPoolExecutor類

假設現有業務,輸入Input類,輸出Output類:

@Data
@AllArgsConstructor
public class Input {
    int i;
}

@Data
@AllArgsConstructor
public class Output {
    boolean success;
    String s;
}

這里@Data與@AllArgsConstrutor使用了Lombok工具

處理方法:

public Output singleProcess(Input input) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
        return new Output(false, null);
    }
    return new Output(true, String.valueOf(2 * input.getI() + 1))
}

 

現在該業務需要批量處理,輸入List<Input>,輸出List<Output>。那么可以創建一個核心線程數為4的線程池,每個線程把執行結果添加到線程安全的List中。這里List應當使用SynchronizedList而不是CopyOnWriteArrayList,因為這里寫操作有多次,而讀操作只有一次。並使用CountDownLatch等待所有線程執行結束:

public List<Output> multiProcess(List<Input> inputList) {
    ExecutorService executorService = Executors.newFixedThreadPool(4);
    CountDownLatch countDownLatch = new CountDownLatch(inputList.size());
    List<Output> outputList = Collections.synchronizedList(new ArrayList<>(inputList.size()));

    for (Input input : inputList) {
        executorService.submit(() -> {
            try {
                // 單個處理
                Output output = singleProcess(input);
                outputList.add(ouput);
            } catch (Exception e) {
                // 處理異常
            } finally {
                countDownLatch.countDown();
            }
        })
    }

    // 等待所有線程執行完成
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    return outputList;
}

 

但是這樣還是有很大的問題:

  1. 阿里巴巴開發手冊不建議我們使用Executors創建線程池,因為Executors.newFixedThreadPool方法沒有限制線程隊列的容量,如果input數量過多,可能導致OOM。
  2. multiProcess不適合被多次調用,不適合用在大多數業務場景。

3、在Spring框架中使用ThreadPoolTaskExecutor類

為了應對大多數業務場景,配合Spring Boot框架,我們可以使用ThreadPoolTaskExecutor創建線程池,並把它注入到ioc容器中,全局都可以使用。

首先,配置線程池參數

@Data
@Component
@ConfigurationProperties(prefix = "thread-pool")
public class ThreadPoolProperties {
//線程池創建時候初始化的線程數 private int corePoolSize;
//線程池最大的線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程 private int maxPoolSize;
//用來緩沖執行任務的隊列 private int queueCapacity;
//允許線程的空閑時間:當超過了核心線程出之外的線程在空閑時間到達之后會被銷毀 private int keepAliveSeconds; }

  

在配置文件application.yml中

thread-pool:
  core-pool-size: 4
  max-pool-size: 16
  queue-capacity: 80
  keep-alive-seconds: 120

 

這里線程池各參數的意義可以參考Java線程池實現原理及其在美團業務中的實踐

其次,將ThreadPoolTaskExecutor加入至ioc容器中

  

@EnableAsync
@Configuration
public class ThreadPoolConfig {
    private final ThreadPoolProperties threadPoolProperties;

    @Autowired
    public ThreadPoolConfig(ThreadPoolProperties threadPoolProperties) {
        this.threadPoolProperties = threadPoolProperties;
    }

    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(threadPoolProperties.getCorePoolSize());
        executor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
        executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
        executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
        executor.setThreadNamePrefix("thread-pool-");
        return executor;
    }
}

 

這里@EnableAsync是與@Async配合使用,用於執行異步任務,后面會給出示例

最后,在業務類中通過自定義SpringUtils類獲取bean或使用@Async,來使用線程池。

/**
 * 業務實現類
 */
@Service
@Slf4j
public class Input2OutputServiceImpl implements Input2OutputService {
    /**
     * 單個處理
     * @param input 輸入對象
     * @return 輸出對象
     */
    @Override
    public Output singleProcess(Input input) {
        log.info("Processing...");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return new Output(false, null);
        }
        return new Output(true, String.valueOf(2 * input.getI() + 1));
    }

    /**
     * 批量處理
     * @param inputList 輸入對象列表
     * @return 輸出對象列表
     */
    @Override
    public List<Output> multiProcess(List<Input> inputList) {
        ThreadPoolTaskExecutor executor
                = SpringUtils.getBean("threadPoolTaskExecutor", ThreadPoolTaskExecutor.class);
        CountDownLatch latch = new CountDownLatch(inputList.size());
        List<Output> outputList = Collections.synchronizedList(new ArrayList<>(inputList.size()));

        for (Input input : inputList) {
            executor.execute(() -> {
                try {
                    Output output = singleProcess(input);
                    outputList.add(output);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    latch.countDown();
                }
            });
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return outputList;
    }

    /**
     * 異步處理
     * @param input 輸入對象
     * @return 輸出Future對象
     */
    @Async("threadPoolTaskExecutor")
    @Override
    public Future<Output> asyncProcess(Input input) {
        return new AsyncResult<>(singleProcess(input));
    }
}

 

以上代碼的完整代碼包括測試代碼在筆者的GitHub項目thread-pool-demo,在項目中用到ThreadPoolTaskExecutor可參考。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM