今天是2022-02-19,周六。今天我們來聊聊java的線程池。大家聽到線程池,第一反應是聯想到線程。那么何為線程池?跟線程有啥區別?以及線程池、線程都是怎么使用?帶着這些疑問,看完這篇大家有幾本的了解。
一、線程池是什么?
線程池就是可以創建固定線程數量、最大線程數、等待隊列數、每一個線程的執行時間、線程的名稱等參數的線程。大家可以理解成,線程池就是多個線程組成,但是跟線程又有區別。線程是單一且需要時就創建,執行完任務就銷毀,而線程池就不會,需要就取一個創建好的線程,用完就放回去。
二、創建線程池有哪些方式?(使用Executors頂層容器靜態類實現創建)
1、Executors.newCachedThreadPool(); //創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE
2、Executors.newSingleThreadExecutor(); //創建容量為1的緩沖池
3、Executory.newFixedThreadPool(int); //創建固定容量大小的線程池
三、使用ThreadPoolExecutor創建線程池(參數可以定義配置到配置到配置文件)
下面直接上代碼:
import cn.hutool.json.JSONUtil; import com.huawei.wps.config.properties.AsyncConvertProgressThreadPoolProperties; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; @Configuration @EnableAsync //這個注解是因為我后面使用的是異步線程池,所以加了這個開啟異步 @Slf4j //日志 public class AsyncConvertProgressThreadPoolTaskConfig { /** * 默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務, * 當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中; * 當隊列滿了,就繼續創建線程,當線程數量大於等於maxPoolSize后,開始使用拒絕策略拒絕 */ @Autowired private AsyncThreadPoolProperties asyncThreadPoolProperties; //配置線程池 @Bean("AsycnTaskExecutor") public ThreadPoolTaskExecutor convertProgressTaskExecutor() { log.info("加載異步線程池:{}", JSONUtil.toJsonStr(asyncThreadPoolProperties)); //加載參數 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //使用ThreadPoolTaskExector線程池類 executor.setCorePoolSize(asyncThreadPoolProperties.getCorePoolSize()); //配置核心線程數 executor.setMaxPoolSize(asyncThreadPoolProperties.getMaxPoolSize()); //配置最大線程數 executor.setQueueCapacity(asyncThreadPoolProperties.getQueueCapacity());//配置等待線程池數的容量 executor.setKeepAliveSeconds(asyncThreadPoolProperties.getKeepAliveTime()); // executor.setThreadNamePrefix(asyncThreadPoolProperties.getThreadNamePrefix()); // // 線程池對拒絕任務的處理策略 // CallerRunsPolicy:由調用線程(提交任務的線程)處理該任務 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //配置拒絕策略 // 初始化 executor.initialize(); return executor; } }
上面是配置,那么如何使用這個線程池?
@Async("AsyncTaskExecutor")//該注解是實現下面的方法異步,至於:AsyncTaskExecutory是因為綁定了上面的Bean實例,也就是使用線程池。完整的解釋是:使用異步線程池來完成方法:asyncTask的業務處理 public void asyncTask(Integer paramater,String phone) {
}
上面是參數配置,以及異步線程池的使用。目前我在實際工作中,文件進行處理(分片上傳,分片下載),使用該異步線程池,都能滿足業務的需要(這里不能說是最優,畢竟其他小伙伴肯定其他更好的方案)。
四、扒拉原理(待講解)
大家看到了異步線程池主要是,那么我們來看看里面有哪些邏輯代碼:ThreadPoolTaskExecutor
ThreadPoolTaskExecutor: 原來是繼承了ExecutorConfigurationSupport和實現AnyncListanbleTaskExecutory、SchedulingTaskExecutor
ExecutorConfigurationSupport.class如下:
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory implements BeanNameAware, InitializingBean, DisposableBean { protected final Log logger = LogFactory.getLog(this.getClass()); private ThreadFactory threadFactory = this; private boolean threadNamePrefixSet = false; private RejectedExecutionHandler rejectedExecutionHandler = new AbortPolicy(); //給了默認拒絕策略 private boolean waitForTasksToCompleteOnShutdown = false; private long awaitTerminationMillis = 0L; @Nullable private String beanName; @Nullable private ExecutorService executor; public ExecutorConfigurationSupport() { }
//省了其它get、set代碼
}
AsyncListenableTaskExecutor.class如下:
public interface AsyncListenableTaskExecutor extends AsyncTaskExecutor { /** * Submit a {@code Runnable} task for execution, receiving a {@code ListenableFuture} * representing that task. The Future will return a {@code null} result upon completion. * @param task the {@code Runnable} to execute (never {@code null}) * @return a {@code ListenableFuture} representing pending completion of the task * @throws TaskRejectedException if the given task was not accepted */ ListenableFuture<?> submitListenable(Runnable task); /** * Submit a {@code Callable} task for execution, receiving a {@code ListenableFuture} * representing that task. The Future will return the Callable's result upon * completion. * @param task the {@code Callable} to execute (never {@code null}) * @return a {@code ListenableFuture} representing pending completion of the task * @throws TaskRejectedException if the given task was not accepted */ <T> ListenableFuture<T> submitListenable(Callable<T> task); }
接着:AsyncTaskExecutor接口如下:你會發現只有execute、sumbit抽象方法
public interface AsyncTaskExecutor extends TaskExecutor { /** Constant that indicates immediate execution. */ long TIMEOUT_IMMEDIATE = 0; /** Constant that indicates no time limit. */ long TIMEOUT_INDEFINITE = Long.MAX_VALUE; /** * Execute the given {@code task}. * @param task the {@code Runnable} to execute (never {@code null}) * @param startTimeout the time duration (milliseconds) within which the task is * supposed to start. This is intended as a hint to the executor, allowing for * preferred handling of immediate tasks. Typical values are {@link #TIMEOUT_IMMEDIATE} * or {@link #TIMEOUT_INDEFINITE} (the default as used by {@link #execute(Runnable)}). * @throws TaskTimeoutException in case of the task being rejected because * of the timeout (i.e. it cannot be started in time) * @throws TaskRejectedException if the given task was not accepted */ void execute(Runnable task, long startTimeout); /** * Submit a Runnable task for execution, receiving a Future representing that task. * The Future will return a {@code null} result upon completion. * @param task the {@code Runnable} to execute (never {@code null}) * @return a Future representing pending completion of the task * @throws TaskRejectedException if the given task was not accepted * @since 3.0 */ Future<?> submit(Runnable task); /** * Submit a Callable task for execution, receiving a Future representing that task. * The Future will return the Callable's result upon completion. * @param task the {@code Callable} to execute (never {@code null}) * @return a Future representing pending completion of the task * @throws TaskRejectedException if the given task was not accepted * @since 3.0 */ <T> Future<T> submit(Callable<T> task); }
接着繼續看:TaskExecutor接口:只有重寫了Executor接口 的execute抽象方法
@FunctionalInterface public interface TaskExecutor extends Executor { /** * Execute the given {@code task}. * <p>The call might return immediately if the implementation uses * an asynchronous execution strategy, or might block in the case * of synchronous execution. * @param task the {@code Runnable} to execute (never {@code null}) * @throws TaskRejectedException if the given task was not accepted */ @Override void execute(Runnable task); }
那么我們繼續看:Executor接口————————你會發現,原來異步線程池,主要是還是使用了頂層容器:Executor接口
public interface Executor { /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command); }
那么,我們一起看看execute方法,具體邏輯:ThreadPoolTaskExecutor--》execute,看源碼如下:
public void execute(Runnable task) { ThreadPoolExecutor executor = this.getThreadPoolExecutor(); try { executor.execute(task); //我們繼續進入該方法,看看里面的邏輯是什么 } catch (RejectedExecutionException var4) { //這里是拒絕策略 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4); } }
我們繼續進入該方法,看看里面的邏輯是什么:executor.execute(task);
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) {
//這里不用講解了吧。。。。大家都看得懂 if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); //上面這個變量的定義是:AtomicInteger ctl=new AtomicInteger(ctlOf(RUNNING,0)) //得到工作隊列數 if (workerCountOf(c) < corePoolSize) { // if (addWorker(command, true)) //添加進工作隊列 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //判斷如果當前線程任務正在運行,那么把線程放入工作隊列中,並且成功,進入if方法 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //再次判斷該線程不運行了,並且從工作隊列中移除成功 reject(command); //拒絕該線程 else if (workerCountOf(recheck) == 0) //如果正在工作的線程數量為0,進入下面的addWorker addWorker(null, false); //null參數的意思是,沒有線程可以加入工作隊列 } else if (!addWorker(command, false)) reject(command); }