java線程池的使用(jdk1.8)


今天是2022-02-19,周六。今天我們來聊聊java的線程池。大家聽到線程池,第一反應是聯想到線程。那么何為線程池?跟線程有啥區別?以及線程池、線程都是怎么使用?帶着這些疑問,看完這篇大家有幾本的了解。

 

一、線程池是什么?

線程池就是可以創建固定線程數量、最大線程數、等待隊列數、每一個線程的執行時間、線程的名稱等參數的線程。大家可以理解成,線程池就是多個線程組成,但是跟線程又有區別。線程是單一且需要時就創建,執行完任務就銷毀,而線程池就不會,需要就取一個創建好的線程,用完就放回去。

 

二、創建線程池有哪些方式?(使用Executors頂層容器靜態類實現創建)

  1、Executors.newCachedThreadPool(); //創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE

  2、Executors.newSingleThreadExecutor(); //創建容量為1的緩沖池

  3、Executory.newFixedThreadPool(int); //創建固定容量大小的線程池

 

三、使用ThreadPoolExecutor創建線程池(參數可以定義配置到配置到配置文件)

下面直接上代碼:

import cn.hutool.json.JSONUtil;
import com.huawei.wps.config.properties.AsyncConvertProgressThreadPoolProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync //這個注解是因為我后面使用的是異步線程池,所以加了這個開啟異步
@Slf4j //日志
public class AsyncConvertProgressThreadPoolTaskConfig {
    /**
     * 默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務,
     * 當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;
     * 當隊列滿了,就繼續創建線程,當線程數量大於等於maxPoolSize后,開始使用拒絕策略拒絕
     */

    @Autowired
    private AsyncThreadPoolProperties asyncThreadPoolProperties;

    //配置線程池
    @Bean("AsycnTaskExecutor")
    public ThreadPoolTaskExecutor convertProgressTaskExecutor() {
        log.info("加載異步線程池:{}", JSONUtil.toJsonStr(asyncThreadPoolProperties)); //加載參數
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //使用ThreadPoolTaskExector線程池類
        executor.setCorePoolSize(asyncThreadPoolProperties.getCorePoolSize()); //配置核心線程數
        executor.setMaxPoolSize(asyncThreadPoolProperties.getMaxPoolSize()); //配置最大線程數
        executor.setQueueCapacity(asyncThreadPoolProperties.getQueueCapacity());//配置等待線程池數的容量
        executor.setKeepAliveSeconds(asyncThreadPoolProperties.getKeepAliveTime()); //
        executor.setThreadNamePrefix(asyncThreadPoolProperties.getThreadNamePrefix()); //

        // 線程池對拒絕任務的處理策略
        // CallerRunsPolicy:由調用線程(提交任務的線程)處理該任務
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //配置拒絕策略
        // 初始化
        executor.initialize();
        return executor;
    }
}

 

上面是配置,那么如何使用這個線程池?

 @Async("AsyncTaskExecutor")//該注解是實現下面的方法異步,至於:AsyncTaskExecutory是因為綁定了上面的Bean實例,也就是使用線程池。完整的解釋是:使用異步線程池來完成方法:asyncTask的業務處理
 public void asyncTask(Integer paramater,String phone) {
}

上面是參數配置,以及異步線程池的使用。目前我在實際工作中,文件進行處理(分片上傳,分片下載),使用該異步線程池,都能滿足業務的需要(這里不能說是最優,畢竟其他小伙伴肯定其他更好的方案)。

 

四、扒拉原理(待講解)

 大家看到了異步線程池主要是,那么我們來看看里面有哪些邏輯代碼:ThreadPoolTaskExecutor

ThreadPoolTaskExecutor: 原來是繼承了ExecutorConfigurationSupport和實現AnyncListanbleTaskExecutory、SchedulingTaskExecutor

 

 

ExecutorConfigurationSupport.class如下:
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory implements BeanNameAware, InitializingBean, DisposableBean {
    protected final Log logger = LogFactory.getLog(this.getClass());
    private ThreadFactory threadFactory = this;
    private boolean threadNamePrefixSet = false;
    private RejectedExecutionHandler rejectedExecutionHandler = new AbortPolicy();  //給了默認拒絕策略
    private boolean waitForTasksToCompleteOnShutdown = false;
    private long awaitTerminationMillis = 0L;
    @Nullable
    private String beanName;
    @Nullable
    private ExecutorService executor;

    public ExecutorConfigurationSupport() {
    }

//省了其它get、set代碼
}

 

AsyncListenableTaskExecutor.class如下:
public interface AsyncListenableTaskExecutor extends AsyncTaskExecutor {

    /**
     * Submit a {@code Runnable} task for execution, receiving a {@code ListenableFuture}
     * representing that task. The Future will return a {@code null} result upon completion.
     * @param task the {@code Runnable} to execute (never {@code null})
     * @return a {@code ListenableFuture} representing pending completion of the task
     * @throws TaskRejectedException if the given task was not accepted
     */
    ListenableFuture<?> submitListenable(Runnable task);

    /**
     * Submit a {@code Callable} task for execution, receiving a {@code ListenableFuture}
     * representing that task. The Future will return the Callable's result upon
     * completion.
     * @param task the {@code Callable} to execute (never {@code null})
     * @return a {@code ListenableFuture} representing pending completion of the task
     * @throws TaskRejectedException if the given task was not accepted
     */
    <T> ListenableFuture<T> submitListenable(Callable<T> task);

}

 

接着:AsyncTaskExecutor接口如下:你會發現只有execute、sumbit抽象方法 

public interface AsyncTaskExecutor extends TaskExecutor {

    /** Constant that indicates immediate execution. */
    long TIMEOUT_IMMEDIATE = 0;

    /** Constant that indicates no time limit. */
    long TIMEOUT_INDEFINITE = Long.MAX_VALUE;


    /**
     * Execute the given {@code task}.
     * @param task the {@code Runnable} to execute (never {@code null})
     * @param startTimeout the time duration (milliseconds) within which the task is
     * supposed to start. This is intended as a hint to the executor, allowing for
     * preferred handling of immediate tasks. Typical values are {@link #TIMEOUT_IMMEDIATE}
     * or {@link #TIMEOUT_INDEFINITE} (the default as used by {@link #execute(Runnable)}).
     * @throws TaskTimeoutException in case of the task being rejected because
     * of the timeout (i.e. it cannot be started in time)
     * @throws TaskRejectedException if the given task was not accepted
     */
    void execute(Runnable task, long startTimeout);

    /**
     * Submit a Runnable task for execution, receiving a Future representing that task.
     * The Future will return a {@code null} result upon completion.
     * @param task the {@code Runnable} to execute (never {@code null})
     * @return a Future representing pending completion of the task
     * @throws TaskRejectedException if the given task was not accepted
     * @since 3.0
     */
    Future<?> submit(Runnable task);

    /**
     * Submit a Callable task for execution, receiving a Future representing that task.
     * The Future will return the Callable's result upon completion.
     * @param task the {@code Callable} to execute (never {@code null})
     * @return a Future representing pending completion of the task
     * @throws TaskRejectedException if the given task was not accepted
     * @since 3.0
     */
    <T> Future<T> submit(Callable<T> task);

}

 

接着繼續看:TaskExecutor接口:只有重寫了Executor接口 的execute抽象方法

@FunctionalInterface
public interface TaskExecutor extends Executor {

    /**
     * Execute the given {@code task}.
     * <p>The call might return immediately if the implementation uses
     * an asynchronous execution strategy, or might block in the case
     * of synchronous execution.
     * @param task the {@code Runnable} to execute (never {@code null})
     * @throws TaskRejectedException if the given task was not accepted
     */
    @Override
    void execute(Runnable task);

}

 

那么我們繼續看:Executor接口————————你會發現,原來異步線程池,主要是還是使用了頂層容器:Executor接口

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

 

那么,我們一起看看execute方法,具體邏輯:ThreadPoolTaskExecutor--》execute,看源碼如下:

    public void execute(Runnable task) {
        ThreadPoolExecutor executor = this.getThreadPoolExecutor();

        try {
            executor.execute(task); //我們繼續進入該方法,看看里面的邏輯是什么
        } catch (RejectedExecutionException var4) { //這里是拒絕策略
            throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
        }
    }

我們繼續進入該方法,看看里面的邏輯是什么:executor.execute(task);
 /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
//這里不用講解了吧。。。。大家都看得懂
if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); //上面這個變量的定義是:AtomicInteger ctl=new AtomicInteger(ctlOf(RUNNING,0)) //得到工作隊列數 if (workerCountOf(c) < corePoolSize) { // if (addWorker(command, true)) //添加進工作隊列 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //判斷如果當前線程任務正在運行,那么把線程放入工作隊列中,並且成功,進入if方法 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //再次判斷該線程不運行了,並且從工作隊列中移除成功 reject(command); //拒絕該線程 else if (workerCountOf(recheck) == 0) //如果正在工作的線程數量為0,進入下面的addWorker addWorker(null, false); //null參數的意思是,沒有線程可以加入工作隊列 } else if (!addWorker(command, false)) reject(command); }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM