線程池最佳實踐


簡單演示一下如何使用線程池

private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;

    public static void main(String[] args) {

        //使用阿里巴巴推薦的創建線程池的方式
        //通過ThreadPoolExecutor構造函數自定義參數創建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("CurrentThread name:" + Thread.currentThread().getName() + "date:" + Instant.now());
            });
        }
        //終止線程池
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Finished all threads");
    }

1. 使用 ThreadPoolExecutor 的構造函數聲明線程池

1. 線程池必須手動通過 ThreadPoolExecutor 的構造函數來聲明,避免使用Executors 類的 newFixedThreadPool 和 newCachedThreadPool ,因為可能會有 OOM 的風險。

Executors 返回線程池對象的弊端如下:

  • FixedThreadPool 和 SingleThreadExecutor : 允許請求的隊列長度為 Integer.MAX_VALUE,可能堆積大量的請求,從而導致 OOM。
  • CachedThreadPool 和 ScheduledThreadPool : 允許創建的線程數量為 Integer.MAX_VALUE ,可能會創建大量線程,從而導致 OOM。

說白了就是:使用有界隊列,控制線程創建數量。

除了避免 OOM 的原因之外,不推薦使用 Executors 提供的兩種快捷的線程池的原因還有:

  1. 實際使用中需要根據自己機器的性能、業務場景來手動配置線程池的參數比如核心線程數、使用的任務隊列、飽和策略等等。
  2. 我們應該顯示地給我們的線程池命名,這樣有助於我們定位問題。

2.監測線程池運行狀態

你可以通過一些手段來檢測線程池的運行狀態比如 SpringBoot 中的 Actuator 組件。

除此之外,我們還可以利用 ThreadPoolExecutor 的相關 API做一個簡陋的監控。從下圖可以看出, ThreadPoolExecutor提供了獲取線程池當前的線程數和活躍線程數、已經執行完成的任務數、正在排隊中的任務數等等。

下面是一個簡單的 Demo。printThreadPoolStatus()會每隔一秒打印出線程池的線程數、活躍線程數、完成的任務數、以及隊列中的任務數。

    /**
     * 打印線程池的狀態
     *
     * @param threadPool 線程池對象
     */
    public static void printThreadPoolStatus(ThreadPoolExecutor threadPool) {
        ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, createThreadFactory("print-thread-pool-status", false));
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            log.info("=========================");
            log.info("ThreadPool Size: [{}]", threadPool.getPoolSize());
            log.info("Active Threads: {}", threadPool.getActiveCount());
            log.info("Number of Tasks : {}", threadPool.getCompletedTaskCount());
            log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
            log.info("=========================");
        }, 0, 1, TimeUnit.SECONDS);
    }

 

3.建議不同類別的業務用不同的線程池

很多人在實際項目中都會有類似這樣的問題:我的項目中多個業務需要用到線程池,是為每個線程池都定義一個還是說定義一個公共的線程池呢?

一般建議是不同的業務使用不同的線程池,配置線程池的時候根據當前業務的情況對當前線程池進行配置,因為不同的業務的並發以及對資源的使用情況都不同,重心優化系統性能瓶頸相關的業務。

我們再來看一個真實的事故案例! (本案例來源自:《線程池運用不當的一次線上事故》 ,很精彩的一個案例)

上面的代碼可能會存在死鎖的情況,為什么呢?畫個圖給大家捋一捋。

試想這樣一種極端情況:

假如我們線程池的核心線程數為 n,父任務(扣費任務)數量為 n,父任務下面有兩個子任務(扣費任務下的子任務),其中一個已經執行完成,另外一個被放在了任務隊列中。由於父任務把線程池核心線程資源用完,所以子任務因為無法獲取到線程資源無法正常執行,一直被阻塞在隊列中。父任務等待子任務執行完成,而子任務等待父任務釋放線程池資源,這也就造成了 "死鎖"。

解決方法也很簡單,就是新增加一個用於執行子任務的線程池專門為其服務。

4.別忘記給線程池命名

初始化線程池的時候需要顯示命名(設置線程池名稱前綴),有利於定位問題。

默認情況下創建的線程名字類似 pool-1-thread-n 這樣的,沒有業務含義,不利於我們定位問題。

給線程池里的線程命名通常有下面兩種方式:

**1.利用 guava 的 ThreadFactoryBuilder **

ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)

 2.自己實現 ThreadFactor

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author zfang
 * @date 2021/6/11 15:04
 */
public class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final ThreadFactory delegate;
    private final String name;

    /**
     * 創建一個帶名字的線程池生產工廠
     */
    public NamingThreadFactory(ThreadFactory delegate, String name) {
        this.delegate = delegate;
        this.name = name; // TODO consider uniqueness
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }
}

 

 shutdown

    public static void shutdownThreadPool(ExecutorService executor, Logger logger) {
        executor.shutdown();//停止接收新任務,原來的任務繼續執行
        int retry = 3;
        while (retry > 0) {
            retry--;
            try {
                //當前線程阻塞,直到:
                //等所有已提交的任務(包括正在跑的和隊列中等待的)執行完;
                //或者 等超時時間到了(timeout 和 TimeUnit設定的時間);
                //或者 線程被中斷,拋出InterruptedException
                if (executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.interrupted();
            } catch (Throwable ex) {
                if (logger != null) {
                    logger.error("ThreadPoolManager shutdown executor has error : ", ex);
                }
            }
        }
        //立即停止線程池,正在跑的和正在等待的任務都停下了
        executor.shutdownNow();
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM