0003-spring 中線程池配置


0003-spring 中線程池配置

引用alibaba編碼規范中的話

【強制】線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣

的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。

說明:Executors 返回的線程池對象的弊端如下:

1)FixedThreadPool 和 SingleThreadPool:

允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。

2)CachedThreadPool 和 ScheduledThreadPool:

允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。

1. spring 中自己已經提供了線程池策略 ThreadPoolTaskExecutor

// 部分源碼
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
		implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {

	private final Object poolSizeMonitor = new Object();

	private int corePoolSize = 1;

    // 最大線程數 Integer.MAX_VALUE;
	private int maxPoolSize = Integer.MAX_VALUE;

	private int keepAliveSeconds = 60;

	private int queueCapacity = Integer.MAX_VALUE;

	private boolean allowCoreThreadTimeOut = false;

上面的代碼可知,spring默認的線程池最大線程數跟等待隊列都是 Integer.MAX_VALUE直接使用的話也會存在OOM問題。

2. 所以我們在使用時候會自定義我們自己的線程池策略

2.1 通過配置文件yml配置

spring:
  task:
    execution:
      pool:
        core-size: 8 # 核心線程數
        max-size: 20 # 最大線程數
        queue-capacity: 100 # 等待隊列
        allow-core-thread-timeout: true # 是否允許超時回收
        keep-alive: 60s # 線程存活時間
      thread-name-prefix: my_task_

使用:

​ 只需要給方法頭加上 @Async,這個方法在調用的時候就是個異步方法

    @Async
    @Override
    public void testThread2() {
        log.info("testThread2 正在執行,線程{}",Thread.currentThread().getName());
    }

2.2 如果上面不能滿足我們的需求,我們還可以自己新增線程池

package com.zhoust.threadbatch.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

/**
 * @author zhoust
 */
@EnableAsync
@Configuration
@Slf4j
public class ThreadPoolConfig {

    /**
     * 核心線程數(默認線程數)
     */
    @Value("${my-task.execution.pool.corePoolSize}")
    private int corePoolSize;
    /**
     * 最大線程數
     */
    @Value("${my-task.execution.pool.maxPoolSize}")
    private int maxPoolSize;
    /**
     * 允許線程空閑時間(單位:默認為秒)
     */
    @Value("${my-task.execution.pool.keepAliveTime}")
    private int keepAliveTime;
    /**
     * 緩沖隊列大小
     */
    @Value("${my-task.execution.pool.queueCapacity}")
    private int queueCapacity;
    /**
     * 線程池名前綴
     */
    @Value("${my-task.execution.threadNamePrefix}")
    private String threadNamePrefix;
    /**
     * 拒絕策略 1. AbortPolicy 丟棄任務並拋異常,2. DiscardPolicy 丟棄任務不拋異常 3.DiscardOldestPolicy 丟棄隊列最前面的線程(棄老)4.CallerRunsPolicy:由調用線程處理該任務
     */
    @Value("${my-task.execution.rejectedExecutionHandler}")
    private String rejectedExecutionHandler;
    /**
     * 是否回收核心線程
     */
    @Value("${my-task.execution.allowCoreThreadTimeOut}")
    private boolean allowCoreThreadTimeOut;
    /**
     * 執行完畢是否關閉線程池
     */
    @Value("${my-task.execution.waitForTasksToCompleteOnShutdown}")
    private boolean waitForTasksToCompleteOnShutdown;

    
    @Bean("asyncServiceThread")
    public Executor getThreadPool(){

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心線程數
        executor.setCorePoolSize(getCorePoolSize(corePoolSize));
        // 隊列允許等待的線程數
        executor.setQueueCapacity(queueCapacity);
        // 最大線程數,包含核心線程數
        executor.setMaxPoolSize(maxPoolSize);
        // 線程回收時間
        executor.setKeepAliveSeconds(keepAliveTime);
        // 4種拒絕策略 1. AbortPolicy 丟棄任務並拋異常,2. DiscardPolicy 丟棄任務不拋異常 3.DiscardOldestPolicy 丟棄隊列最前面的線程(棄老)4.CallerRunsPolicy:由調用線程處理該任務
        executor.setRejectedExecutionHandler(getRejectedExecutionHandler(rejectedExecutionHandler));
        // 是否允許超時回收線程
        executor.setAllowCoreThreadTimeOut(allowCoreThreadTimeOut);
        // 線程執行完畢后,關閉線程池
        executor.setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown);
        // 線程名前綴
        executor.setThreadNamePrefix(threadNamePrefix);
        // 初始化
        executor.initialize();
        return executor;
    }

    private int getCorePoolSize(int corePoolSize){
        // 如果是-1 去機器核心數的2倍
        if(-1 == corePoolSize){
            int processors = Runtime.getRuntime().availableProcessors();
            log.info("機器核心數:{},核心線程數:{}",processors,processors * 2);
            return processors * 2;
        }
        log.info("核心線程數:{}",corePoolSize);
        return corePoolSize;
    }

    private RejectedExecutionHandler getRejectedExecutionHandler(String rejectedExecutionHandler){
        RejectedExecutionHandler reh = null;
        try{
            log.info("配置的線程拒絕策略:{}",rejectedExecutionHandler);
            reh = (RejectedExecutionHandler) Class.forName("java.util.concurrent.ThreadPoolExecutor$" + rejectedExecutionHandler).newInstance();
        }catch (Exception e){
            log.warn("獲取拒絕策略失敗,默認使用 CallerRunsPolicy");
            reh = new ThreadPoolExecutor.CallerRunsPolicy();
        }
        return reh;
    }

}


yml:

my-task:
  execution:
    pool:
      corePoolSize: 8 # 核心線程數
      maxPoolSize: 20 # 最大線程數
      queueCapacity: 100 # 等待隊列
      allow-core-thread-timeout: true # 是否允許超時回收
      keepAliveTime: 60 # 線程存活時間
    threadNamePrefix: spring_task_
    rejectedExecutionHandler: CallerRunsPolicy # 拒絕策略 1. AbortPolicy 丟棄任務並拋異常,2. DiscardPolicy 丟棄任務不拋異常 3.DiscardOldestPolicy 丟棄隊列最前面的線程(棄老)4.CallerRunsPolicy:由調用線程處理該任務
    allowCoreThreadTimeOut: true # 是否回收核心線程
    waitForTasksToCompleteOnShutdown: true # 執行完畢是否關閉線程池

使用:

指定使用哪個線程池

    @Async(value = "asyncServiceThread")
    @Override
    public void testThread1() {
        log.info("testThread1 正在執行,線程{}",Thread.currentThread().getName());
    }

調用:

    @GetMapping("/testThread")
    public String testThread(){

        for (int i = 0; i < 100; i++) {
            taskServer.testThread1();
            taskServer.testThread2();
        }
        return "succeed";
    }

補充:拒絕策略

拒絕策略就是當線程池已經滿了,還有新的線程過來,此時我們怎么處理這些線程。

1.AbortPolicy(默認線程策略):丟棄任務並拋出RejectedExecutionException異常

​ 如果是比較關鍵的業務,推薦使用此拒絕策略,這樣子在系統不能承載更大的並發量的時候,能夠及時的通過異常發現。

2.DiscardPolicy:丟棄任務,但是不拋出異常

​ 無關緊要的業務可以這樣做。

3.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新提交被拒絕的任務。(棄老策略)

4.CallerRunsPolicy:由調用線程處理該任務


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM