Java如何讓線程池滿后再存放隊列


1.線程池源碼分析:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler){};

核心線程數量(corePoolSize)、最大線程數量(maximumPoolSize)、超出核心線程數量的存活時間(keepAliveTime)、

超出核心線程數量的存活時間單位(unit)、存放任務隊列(workQueue)、執行程序創建新線程時使用的工廠(threadFactory)、當線程邊界和隊列容量達到時拒絕策略(handler)

正常線程池工作流程

1:當提交的任務小於核心線程池數量的時候,使用線程池中的核心線程。

2:當提交的任務大於線程池中核心線程數量的時候,會將新任務存放到隊列中。

3:當隊列存滿后,會開啟新線程直到達到設置的最大線程池數量。

4:當隊列存滿后,且線程池中的最大線程數量達到最大的時候,這時候在提交過來任務,直接采用線程池設置的拒絕策略。

2.場景分析

由上面可得,如果隊列在沒有存滿的情況下我們的最大線程數量是沒有開啟的,這時候並沒有達到我們想要的多線程的效果。所以我們需要改寫一下邏輯

1:自定義線程池繼承ThreadPoolExecutor類,改寫核心的邏輯。

2:自定義隊列繼承LinkedBlockingQueue,改寫 offer 方法。

自定義隊列方法:

package com.example.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * @Description: 線程池工作隊列重寫
 * @Author: mingtian
 * @CreateDate: 2021/4/9 13:22
 * @Version: 1.0
 */
public class TaskQueue<Runnable> extends LinkedBlockingQueue<Runnable> {

    /**
     * 打印日志
     */
    private static Logger logger = LoggerFactory.getLogger(TaskQueue.class);

    /**
     * 自定義的線程池類,繼承自ThreadPoolExecutor
     */
    private CustomThreadPoolExecutor threadPoolExecutor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    /**
     * 對象賦值
     *
     * @param customThreadPoolExecutor
     */
    public void setExecutor(CustomThreadPoolExecutor customThreadPoolExecutor) {
        threadPoolExecutor = customThreadPoolExecutor;
    }

    /**
     * offer方法的含義是:將任務提交到隊列中,返回值為true/false,分別代表提交成功/提交失敗。
     * 作用:TaskQueue的offer返回值來決定是否創建更多的線程,達到先判斷maximumPoolSize再判斷隊列的目的
     *
     * @param runnable
     * @return
     */
    @Override
    public boolean offer(Runnable runnable) {
        if (threadPoolExecutor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }
        // 線程池的當前線程數
        int currentPoolThreadSize = threadPoolExecutor.getPoolSize();
        if (threadPoolExecutor.getSubmittedTaskCount() < currentPoolThreadSize) {
            // 已提交的任務數量小於當前線程數,意味着線程池中有空閑線程,直接扔進隊列里,讓線程去處理
            return super.offer(runnable);
        }

        // return false to let executor create new worker.
        if (currentPoolThreadSize < threadPoolExecutor.getMaximumPoolSize()) {
            // 重點: 當前線程數小於 最大線程數 ,返回false,暗含入隊失敗,讓線程池去創建新的線程
            return false;
        }
        // 重點: 代碼運行到此處,說明當前線程數 >= 最大線程數,需要真正的提交到隊列中
        return super.offer(runnable);
    }

    /**
     * 重試 在線程池沒有關閉的狀態時 將任務存放到隊列中
     *
     * @param o
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     */
    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (threadPoolExecutor.isShutdown()) {
            logger.error("threadPoolExecutor is shutdown!!!");
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(o, timeout, unit);
    }
}

自定義線程池類:

package com.example.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Description: 自定義線程池 重寫線程池執行順序
 * @Author: mingtian
 * @CreateDate: 2021/4/9 13:21
 * @Version: 1.0
 */
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * 打印日志
     */
    private static Logger logger = LoggerFactory.getLogger(CustomThreadPoolExecutor.class);

    /**
     * 定義一個成員變量,用於記錄當前線程池中已提交的任務數量
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    /**
     * 自定義線程池
     *
     * @param corePoolSize    核心線程池數量
     * @param maximumPoolSize 最大線程池數量
     * @param keepAliveTime   超過核心線程池數量存活時間
     * @param unit            超過核心線程池數量存活時間單位
     * @param workQueue       存放任務的隊列
     * @param threadFactory   線程工廠 可以定義線程池名稱
     * @param handler         當隊列滿時執行拒絕策略
     */
    public CustomThreadPoolExecutor(int corePoolSize,
                                    int maximumPoolSize,
                                    long keepAliveTime,
                                    TimeUnit unit, TaskQueue<Runnable> workQueue,
                                    ThreadFactory threadFactory,
                                    RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /***
     * 獲取線程池中的任務數量
     * @return
     */
    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }

    /**
     * 獲取線程池對象
     *
     * @return
     */
    public CustomThreadPoolExecutor getThreadPoolExecutor() {
        return CustomThreadPoolExecutorUtil.getCustomThreadPoolExecutor();
    }

    /**
     * 方法執行完畢之后執行
     *
     * @param r
     * @param t
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // ThreadPoolExecutor的勾子方法,在task執行完后需要將池中已提交的任務數 - 1
        submittedTaskCount.decrementAndGet();
    }

    /**
     * 重寫execute 方法
     *
     * @param command
     */
    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        // 將池中已提交的任務數 + 1
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    logger.warn("submittedTaskCount:{},maximumPoolSize:{},queueSize:{},completedTaskCount:{}",
                            getSubmittedTaskCount(), getThreadPoolExecutor().getMaximumPoolSize(),
                            getThreadPoolExecutor().getQueue().size(), getThreadPoolExecutor().getCompletedTaskCount());
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }
}

測試類:

package com.example.util;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Description: 自定義線程池隊列
 * @Author: mingtian
 * @CreateDate: 2021/4/9 13:28
 * @Version: 1.0
 */
public class CustomThreadPoolExecutorUtil {
    /**
     * 打印日志
     */
    private static Logger logger = LoggerFactory.getLogger(CustomThreadPoolExecutorUtil.class);

    /**
     * 默認 CPU 核心數
     */
    private static int threadPoolSize = 0;

    static {
        // 獲取服務器 CPU 核心數
        threadPoolSize = Runtime.getRuntime().availableProcessors();
        logger.info("服務器 CPU 核心數量:{}", threadPoolSize);
    }

    public static int getThreadPoolSize() {
        return threadPoolSize;
    }

    /**
     * 線程工廠,用來創建線程
     */
    private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("port-pool-%d").build();

    private static TaskQueue taskQueue = new TaskQueue<>(10);

    /**
     * 自定義線程池
     */
    private static CustomThreadPoolExecutor CustomThreadPoolExecutor = new CustomThreadPoolExecutor(2, 2 * 2,
            60L, TimeUnit.SECONDS, taskQueue, threadFactory, new ThreadPoolExecutor.AbortPolicy());

    /**
     * 獲取線程池對象方法
     *
     * @return
     */
    public static CustomThreadPoolExecutor getCustomThreadPoolExecutor() {
        return CustomThreadPoolExecutor;
    }

    /**
     * 模擬發送消息方法
     */
    public static class SendMessage implements Runnable {
        private int i;

        public SendMessage(int i) {
            this.i = i;
        }

        @SneakyThrows
        @Override
        public void run() {
            logger.info("我是第{}條消息,poolSize:{},queueSize:{},activeCount:{},completedTaskCount:{}", i,
                    CustomThreadPoolExecutor.getPoolSize(), CustomThreadPoolExecutor.getQueue().size(),
                    CustomThreadPoolExecutor.getActiveCount(), CustomThreadPoolExecutor.getCompletedTaskCount());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        logger.info("-------------------------------開始測試--------------------------------------");
        taskQueue.setExecutor(CustomThreadPoolExecutor);
        for (int i = 1; i <= 16; i++) {
            CustomThreadPoolExecutorUtil.SendMessage sendMessage = new CustomThreadPoolExecutorUtil.SendMessage(i);
            CustomThreadPoolExecutor.execute(sendMessage);
        }
        Thread.sleep(10000);
        CustomThreadPoolExecutor.shutdown();
        logger.info("-------------------------------測試結束--------------------------------------");
    }
}

自定義線程池核心邏輯:

當提交任務到CustomThreadPoolExecutor的時候,執行 submittedTaskCount.incrementAndGet(); 將線程池中數量+1處理,然后調用父類 super.execute(command); 執行。

1         // 代碼運行到此處,說明線程數 >= corePoolSize, 此時workQueue為自定義的TaskQueue       
2         if (isRunning(c) && workQueue.offer(command)) {
3             int recheck = ctl.get();
4             if (! isRunning(recheck) && remove(command))
5                 reject(command);
6             else if (workerCountOf(recheck) == 0)
7                 addWorker(null, false);
8         }

自定義隊列核心邏輯:

當執行到 workQueue.offer(command) 方法的時候走的我們自定義隊列TaskQueue的offer方法,而offer方法的返回值決定着是否創建更多的線程:返回true,代表入隊成功,不創建線程;返回false,代表入隊失敗,需要創建線程。

 1     public boolean offer(Runnable runnable) {
 2         if (threadPoolExecutor == null) {
 3             throw new RejectedExecutionException("The task queue does not have executor!");
 4         }
 5         // 線程池的當前線程數
 6         int currentPoolThreadSize = threadPoolExecutor.getPoolSize();
 7         if (threadPoolExecutor.getSubmittedTaskCount() < currentPoolThreadSize) {
 8             // 已提交的任務數量小於當前線程數,意味着線程池中有空閑線程,直接扔進隊列里,讓線程去處理
 9             return super.offer(runnable);
10         }
11 
12         // return false to let executor create new worker.
13         if (currentPoolThreadSize < threadPoolExecutor.getMaximumPoolSize()) {
14             // 重點: 當前線程數小於 最大線程數 ,返回false,暗含入隊失敗,讓線程池去創建新的線程
15             return false;
16         }
17         // 重點: 代碼運行到此處,說明當前線程數 >= 最大線程數,需要真正的提交到隊列中
18         return super.offer(runnable);
19     }

核心邏輯:當前線程數小於最大線程數就返回false,代表入隊失敗,需要創建線程。

因此,總結起來就是:自定義的CustomThreadPoolExecutor依賴自定義的TaskQueue的offer返回值來決定是否創建更多的線程,達到先判斷maximumPoolSize再判斷隊列的目的。

3.參考文獻

tomcat 源碼中的線程池也是使用的這樣的思想,該例子來源於tomcat源碼思想。

tomcat 線程池 源碼:

 private final AtomicInteger submittedCount = new AtomicInteger(0);

    public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet(); try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }

        }
    }

tomcat 源碼中 TaskQueue 源碼:

    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM