線程池工具類幾種實現


線程池參數:核心線程數設置,根據生產環境平時QPS,任務處理能力決定,但也不能絕對參照這一算法。也與服務器整體處理能力,配置有關。
如:QPS是10,處理任務時間2S,核心線程數至少應該設置為20。也就是,10個任務需要總時長20S完成。那至少需要20個線程同時處理,粗略算法,其他因素影響需要留出冗余。
還有一種核心線程數 設置公式參見:https://www.cnblogs.com/warehouse/p/10810338.html
其結論:
IO密集型 = 2Ncpu(可以測試后自己控制大小,2Ncpu一般沒問題)(常出現於線程中:數據庫數據交互、文件上傳下載、網絡數據傳輸等等)
計算密集型 = Ncpu(常出現於線程中:復雜算法)+ 1

一 線程池工具類

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Description 線程池工具類
 */
public class ThreadPoolUtil {

    /**
     * 核心線程數,會一直存活,即使沒有任務,線程池也會維護線程的最少數量
     */
    private static final int SIZE_CORE_POOL = 5;
    /**
     * 線程池維護線程的最大數量
     */
    private static final int SIZE_MAX_POOL = 10;
    /**
     * 線程池維護線程所允許的空閑時間
     */
    private static final long ALIVE_TIME = 2000;
    /**
     * 線程緩沖隊列
     */
    private static BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(100);
    private static ThreadPoolExecutor pool = new ThreadPoolExecutor(SIZE_CORE_POOL, SIZE_MAX_POOL, ALIVE_TIME, TimeUnit.MILLISECONDS, bqueue, new ThreadPoolExecutor.CallerRunsPolicy());

    static {
        pool.prestartAllCoreThreads();
    }

    public static ThreadPoolExecutor getPool() {
        return pool;
    }
}

測試類

import com.dashuai.cloud.consulconsumer.util.ThreadPoolUtil;

public class TestUtil {
    public static void main(String[] args) {
        ThreadPoolUtil.getPool().execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("線程池調用");
            }
        });
    }
}

二 線程池支持多線程返回結果

import org.springframework.stereotype.Service;
import org.springframework.beans.factory.DisposableBean;


/**
 * ClassName:CommenThreadPoolUtil <br/>
 * Function:線程池公共入口處理類. <br/>
 *
 */
@Service
public class CommonThreadPoolUtil implements DisposableBean{

    // 核心線程數(默認初始化為10)
    private int cacheCorePoolSize = 8;

    // 核心線程控制的最大數目
    private int maxCorePoolSize = 160;

    // 隊列等待線程數閾值
    private int blockingQueueWaitSize = 16;

    // 核心線程數自動調整的增量幅度
    private int incrementCorePoolSize = 4;

    // 初始化線程對象ThreadLocal,重寫initialValue(),保證ThreadLocal首次執行get方法時不會null異常
    private ThreadLocal<List<Future<?>>> threadlocal = new ThreadLocal<List<Future<?>>>() {

        protected List<Future<?>> initialValue() {

            return new ArrayList<Future<?>>();
        }
    };

    // 初始化線程池
    private MyselfThreadPoolExecutor ThreadPool = new MyselfThreadPoolExecutor(cacheCorePoolSize, cacheCorePoolSize, 0L,
            TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>());

    /**
     *
     * dealTask:(線程池執行操作-包含每個進程返回結果). <br/>
     * 1、運用場景:例如,需要同時校驗很多不同的邏輯,依賴於獲取校驗結果響應給用戶; 2、具體實現java類:implements
     * 的Callable接口,重寫call方法即可,支持返回值
     *
     * @author
     * @param callable
     * @return
     */
    public Map<String, Object> dealTask(Callable<?> callable) {

        try {
            // 動態更改核心線程數大小
            dynamicTuningPoolSize();
            // 執行線程業務邏輯及獲取返回結果
            Future<?> result = ThreadPool.submit(callable);
            // 獲取當前進程的局部變量
            List<Future<?>> threadLocalResult = threadlocal.get();
            // 疊加主進程對應的多個進程處理結果
            threadLocalResult.add(result);
            // 設置最新的threadLocal變量到當前主進程
            threadlocal.set(threadLocalResult);
        } catch (Exception e) {
            e.printStackTrace();
            return errorResp("線程池發生異常-Future", null);
        }
        return successResp(null);
    }

    /**
     *
     * dealTask:(線程池執行操作-不包含每個進程返回結果). <br/>
     * 1、運用場景:例如,不依賴於響應給用戶執行結果的業務邏輯 ; 2、具體實現java類:implements
     * 的Runnable接口,重寫run方法,沒有返回值
     *
     * @author
     * @param runnable
     * @return
     */
    public Map<String, Object> dealTask(Runnable runnable) {

        try {
            // 動態更改核心線程數大小
            dynamicTuningPoolSize();
            // 執行線程業務邏輯
            ThreadPool.execute(runnable);
        } catch (Exception e) {
            e.printStackTrace();
            return errorResp("線程池發生異常", null);
        }
        return successResp(null);
    }

    /**
     * obtainTaskFuture:(獲取線程池執行結果:此為阻塞線程,即所有線程都執行完成才能獲取結果,故應將執行時間稍長的業務邏輯先執行,
     * 減少等待時間). <br/>
     * 此方法只能調用一次,即調用之后清除ThreadLocal變量,以便於同一進程再次調用線程池獲取最新的執行結果以及釋放內存, 防止內存泄露
     *
     * @author
     * @return
     */
    public Map<String, Object> obtainTaskFuture() {

        List<Future<?>> threadLocalResult = null;
        try {
            // 獲取當前進程變量
            threadLocalResult = threadlocal.get();
            if (threadLocalResult == null || threadLocalResult.size() == 0) {
                return errorResp("獲取線程池執行結果為空", null);
            } else {
                return successResp(threadLocalResult);
            }
        } catch (Exception e) {
            return errorResp("獲取線程池執行結果發生異常:" + e.getMessage(), null);
        } finally {
            // 1、釋放內存;2、防止主進程再次調用線程池方法時對結果互有影響。
            threadlocal.remove();
        }

    }

    /**
     *
     * dynamicTuningPoolSize:(動態改變核心線程數). <br/>
     *
     * @author
     * @return
     */
    private void dynamicTuningPoolSize() {

        // 隊列等待任務數(此為近似值,故采用>=判斷)
        int queueSize = ThreadPool.getQueueSize();
        // 動態更改核心線程數大小
        if (queueSize >= blockingQueueWaitSize) {
            // 核心線程數小於設定的最大線程數才會自動擴展線程數
            if (cacheCorePoolSize <= maxCorePoolSize) {
                // 原有核心線程數
                int corePoolSize = ThreadPool.getCorePoolSize();
                // 將要累積的核心線程數
                int currentcorePoolSize = corePoolSize + incrementCorePoolSize;
                ThreadPool.setCorePoolSize(currentcorePoolSize);
                ThreadPool.setMaximumPoolSize(currentcorePoolSize);
                cacheCorePoolSize = currentcorePoolSize;
                System.out.println("動態改變線程池大小====原核心線程池數目為:" + corePoolSize + ";現累加為:" + currentcorePoolSize);
            } else {
                System.out.println("動態改變線程池大小====核心線程池數目已累加為:" + cacheCorePoolSize + ";不會繼續無限增加");
            }
        } else {
            // 縮容
            if (queueSize == 0 && cacheCorePoolSize >= CORE_POOL_SIZE) {
                // 原有核心線程數
                int corePoolSize = ThreadPool.getCorePoolSize();
                // 將要累積的核心線程數
                int currentcorePoolSize = corePoolSize - incrementCorePoolSize;

                if (currentcorePoolSize <= CORE_POOL_SIZE) {
                    currentcorePoolSize = CORE_POOL_SIZE;
                }
                ThreadPool.setCorePoolSize(currentcorePoolSize);
                ThreadPool.setMaximumPoolSize(currentcorePoolSize);
                cacheCorePoolSize = currentcorePoolSize;
            }
        }
    }

    /**
     * 獲取核心線程數 getCacheCorePoolSize:(). <br/>
     *
     * @author
     * @return
     */
    public int getCacheCorePoolSize() {

        return ThreadPool.getCorePoolSize();
    }

    /**
     * 設置核心線程數 setCacheCorePoolSize:(). <br/>
     *
     * @author
     * @param cacheCorePoolSize
     */
    public void setCacheCorePoolSize(int cacheCorePoolSize) {

        ThreadPool.setCorePoolSize(cacheCorePoolSize);
        ThreadPool.setMaximumPoolSize(cacheCorePoolSize);
        this.cacheCorePoolSize = cacheCorePoolSize;
    }

    /**
     *
     * successResp:(正確響應信息). <br/>
     *
     * @author
     * @param data
     * @return
     */
    private Map<String, Object> successResp(Object data) {

        Map<String, Object> result = new HashMap<String, Object>();
        result.put("status", "0");
        result.put("data", data);
        return result;

    }

    /**
     *
     * errorResp:(錯誤響應信息). <br/>
     *
     * @author
     * @param errorMsg
     * @param data
     * @return
     */
    public Map<String, Object> errorResp(String errorMsg, Object data) {

        Map<String, Object> result = new HashMap<String, Object>();
        result.put("status", "1");
        result.put("msg", errorMsg);
        result.put("data", data);
        return result;

    }

    @Override
    public void destroy() throws Exception {
        ThreadPool.shutdown();
        logger.info("線程池銷毀");
    }
}

創建線程池類

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class MyselfThreadPoolExecutor extends ThreadPoolExecutor {


	// 初始化父類構造函數及startTime
	public MyselfThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {

		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
	}

	// 按過去執行已提交任務的順序發起一個有序的關閉,但是不接受新任務(已執行的任務不會停止)
	@Override
	public void shutdown() {

		super.shutdown();

	}

	// 嘗試停止所有的活動執行任務、暫停等待任務的處理,並返回等待執行的任務列表。在從此方法返回的任務隊列中排空(移除)這些任務。並不保證能夠停止正在處理的活動執行任務,但是會盡力嘗試。
	@Override
	public List<Runnable> shutdownNow() {

		return super.shutdownNow();

	}

	// 在執行給定線程中的給定 Runnable 之前調用的方法.可用於重新初始化ThreadLocals或者執行日志記錄。
	@Override
	protected void beforeExecute(Thread t, Runnable r) {

		super.beforeExecute(t, r);
	}

	// 基於完成執行給定 Runnable 所調用的方法
	@Override
	protected void afterExecute(Runnable r, Throwable t) {

		super.afterExecute(r, t);

		try {
			// Future<?> result = (Future<?>) r;
			// "任務結果:" result.get();
		} catch (Exception e) {
		}
	}

	/**
	 * 
	 * getQueueSize:(已執行的任務數). <br/>
	 *
	 * @author
	 * @return
	 */
	@Override
	public long getCompletedTaskCount() {

		return super.getCompletedTaskCount();
	}

	/**
	 * 
	 * getQueueSize:(正在運行的任務數). <br/>
	 *
	 * @author
	 * @return
	 */
	@Override
	public int getActiveCount() {

		return super.getActiveCount();
	}

	/**
	 * 
	 * getQueueSize:(隊列等待任務數). <br/>
	 *
	 * @author
	 * @return
	 */
	public int getQueueSize() {

		return getQueue().size();
	}
}

測試類

public class TestUtil {
    public static void main(String[] args) {
        
        CommonThreadPoolUtil poolUtil = new CommonThreadPoolUtil();
        poolUtil.dealTask(new Runnable() {
            @Override
            public void run() {
                System.out.println("線程池調用");
            }
        });
        poolUtil.dealTask(new Callable<HashMap<String, Object>>() {
            @Override
            public HashMap<String, Object> call() {
                System.out.println("線程池調用");
                return new HashMap<String, Object>();
            }
        }
        Map<String, Object> result = poolUtil.obtainTaskFuture();
        List<Future<HashMap<String, Object>>> list = (List<Future<HashMap<String, Object>>>) result.get("data");
        for(int j = 0; j < list.size(); j++){
            Future<HashMap<String, Object>> future = list.get(j);
        }
    }
}

三 jdk1.5之后提供工具類 Executors
工具類Executors面提供了一些靜態工廠方法,生成一些常用的線程池,如下所示:

  • newCachedThreadPool:創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制(Interger. MAX_VALUE),線程池大小完全依賴於操作系統(或者說JVM)能夠創建的最大線程大小。

  • newFixedThreadPool:創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。

  • newSingleThreadExecutor:創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。

  • newScheduledThreadPool:創建一個大小無限的線程池。此線程池支持定時以及周期性執行任務的需求。

總結:除了newScheduledThreadPool的內部實現特殊一點之外,其它線程池內部都是基於 ThreadPoolExecutor 類(Executor的子類)實現的。
實現:

public class TestUtil {
    public static void main(String[] args) {

        ScheduledExecutorService scheduExec = Executors.newScheduledThreadPool(10);
        scheduExec.schedule(new Runnable() {

            @SuppressWarnings("static-access")
            @Override
            public void run() {
                System.out.println("20秒后處理");
            }
        }, 20, TimeUnit.SECONDS);
    }
}

周期性定時任務20秒后執行

四、線程池組

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Service("dispatchExecutorsPool")
public class DispatchExecutorsPool implements DisposableBean {
    private int default_corePoolSize = 4;
    private int default_maximumPoolSize = 500;
    private int default_keepAliveTime = 300 * 1000;
    private static final Logger logger = LoggerFactory.getLogger(DispatchExecutorsPool.class);
    //初始化線程池
    private ThreadPoolExecutor orderThreadPool = null;
    private ThreadPoolExecutor productThreadPool = null;

    @Value("${executor_core_size}")
    private String executor_core_size;
    @Value("${executor_max_size}")
    private String executor_max_size;
    @Value("${executor_alive_time}")
    private String executor_alive_time;

    /**
     * 線程池集合
     */
    List<ThreadPoolExecutor> executorList = new ArrayList<>();

    @PostConstruct
    public void init() {
        orderThreadPool = generateOrderExecutor(executor_core_size, executor_max_size, executor_alive_time);
        productThreadPool = generatePorductExecutor(executor_core_size, executor_max_size, executor_alive_time);
    }

    /**
     * @Description: 訂單拉取線程池
     * @Date: 2021/8/9 14:23
     * @Param: [coolSize, maxSize, keepAliveTime]
     * @Return: java.util.concurrent.ThreadPoolExecutor
     */
    private ThreadPoolExecutor generateOrderExecutor(String coolSize, String maxSize, String keepAliveTime) {
        int configPoolSize = Integer.valueOf(coolSize);
        int configMaxSize = Integer.valueOf(maxSize);
        int configKeepAliveTime = Integer.valueOf(keepAliveTime);
        ThreadPoolExecutor executor = new ThreadPoolExecutor((configPoolSize == 0 ? default_corePoolSize : configPoolSize),
                (configMaxSize == 0 ? default_maximumPoolSize : configMaxSize),
                (configKeepAliveTime == 0 ? default_keepAliveTime : configKeepAliveTime),
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            //使用hook在spring close的同時停止線程池shutdown之后的new submit task會走threadPool.setRejectedExecutionHandler
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                if (!executor.isShutdown()) {
                    try {
                        logger.error("DispatchExecutorsPool.rejectedExecution:" + executor.toString());
                        executor.awaitTermination(15, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        executor.getQueue().size();
                        logger.error("executor_drg.awaitTermination.error", e);
                    }
                }
            }
        });
        executorList.add(executor);
        return executor;
    }

    /**
     * @Description: 獲取線程池
     * @Date: 2021/8/9 14:24
     * @Param: [coolSize, maxSize, keepAliveTime]
     * @Return: java.util.concurrent.ThreadPoolExecutor
     */
    private ThreadPoolExecutor generatePorductExecutor(String coolSize, String maxSize, String keepAliveTime) {
        int configPoolSize = Integer.valueOf(coolSize);
        int configMaxSize = Integer.valueOf(maxSize);
        int configKeepAliveTime = Integer.valueOf(keepAliveTime);
        ThreadPoolExecutor executor = new ThreadPoolExecutor((configPoolSize == 0 ? default_corePoolSize : configPoolSize),
                (configMaxSize == 0 ? default_maximumPoolSize : configMaxSize),
                (configKeepAliveTime == 0 ? default_keepAliveTime : configKeepAliveTime),
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            //使用hook在spring close的同時停止線程池shutdown之后的new submit task會走threadPool.setRejectedExecutionHandler
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                if (!executor.isShutdown()) {
                    try {
                        logger.error("DispatchExecutorsPool.rejectedExecution:" + executor.toString());
                        executor.awaitTermination(15, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        executor.getQueue().size();
                        logger.error("executor_drg.awaitTermination.error", e);
                    }
                }
            }
        });
        executorList.add(executor);
        return executor;
    }

    @Override
    public void destroy() throws Exception {
        for (ThreadPoolExecutor threadPoolExecutor : executorList) {
            threadPoolExecutor.shutdown();
        }
    }

    /**
     * @Description: 獲取訂單線程池
     * @Date: 2021/8/9 14:25
     * @Param:
     * @Return:
     */
    public ThreadPoolExecutor getOrderThreadPool() {
        return orderThreadPool;
    }

    /**
     * @Description: 產品線程池
     * @Date: 2021/8/9 14:26
     * @Param: []
     * @Return: java.util.concurrent.ThreadPoolExecutor
     */
    public ThreadPoolExecutor getProductThreadPool() {
        return productThreadPool;
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM