Java高並發,ThreadPoolExecutor線程池技術


Java當中的線程池是通過Executor這個框架接口來實現的,該框架當中用到了Executor,Executors工具類,ExecutorService,ThreadPoolExecutor

 Executors創建線程的三種方法:

ExecutorService threadPool = Executors.newFixedThreadPool(5);    //固定容量
ExecutorService threadPool = Executors.newSingleThreadExecutor();     //單例的、單個線程的線程池
ExecutorService threadPool = Executors.newCachedThreadPool();     //緩存的 即超出就自動創建線程的

接下來講解一下這三個的區別:

固定容量的線程池

首先我們看的是第一個固定容量的線程池Executors.newFixedThreadPool(5);

首先看代碼:

/**
 * 主要特點:線程復用;控制最大並發數;管理線程。
 *
 * @author Cocowwy
 * @create 2020-05-05-20:20
 * Executor/ExecutorServic(Interface)
 * Executors  線程池的工具類
 */
public class MyThreadPoolDemo {
    public static void main(String[] args) {
        //一池五個受理線程
        ExecutorService threadPool = Executors.newFixedThreadPool(5);  //看源碼是LinkedBlockingQueue<Runnable>()
        try {
            //模擬10個用戶辦理業務,但是只有5個受理窗口
            for (int i = 0; i < 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t" + "辦理業務");
                });
                Thread.sleep(400);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown(); //關閉線程池
        }
    }

}

結果如下:

 接着我們加上一句線程睡眠一小會的代碼:

public class MyThreadPoolDemo {
    public static void main(String[] args) {
        //一池五個受理線程
        ExecutorService threadPool = Executors.newFixedThreadPool(5);  //看源碼是LinkedBlockingQueue<Runnable>()
        try {
            //模擬10個用戶辦理業務,但是只有5個受理窗口
            for (int i = 0; i < 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t" + "辦理業務");
                });
                Thread.sleep(400);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown(); //關閉線程池
        }
    }

}

 在這里我們可以看到有序辦理了每個業務。可以看出這個是固定了大小的線程池,每次都是從這個線程池中取的線程。

單例的線程池

這是第二個,單例的線程池:

ExecutorService threadPool = Executors.newSingleThreadExecutor(); //一池1個受理線程
public class MyThreadPoolDemo {
    public static void main(String[] args) {
//        ExecutorService threadPool = Executors.newFixedThreadPool(5);  //一池五個受理線程,看源碼是LinkedBlockingQueue<Runnable>()
        ExecutorService threadPool = Executors.newSingleThreadExecutor();  //一池1個受理線程

        try {
            for (int i = 0; i < 10; i++) {
                threadPool.execute(() -> {
              System.out.println(Thread.currentThread().getName() + "\t" + "辦理業務");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown(); //關閉線程池
        }
    }
}

 我們可以看到一直是一個線程在受理業務。

可擴展的線程池

接下來是第三個線程池:ExecutorService threadPool = Executors.newCachedThreadPool(); //一池N個受理線程 可擴展的

ExecutorService threadPool = Executors.newCachedThreadPool(); //一池N個受理線程 可擴展的

接下來上代碼:

public class MyThreadPoolDemo {
    public static void main(String[] args) {
//        ExecutorService threadPool = Executors.newFixedThreadPool(5);  //一池五個受理線程,看源碼是LinkedBlockingQueue<Runnable>()
//        ExecutorService threadPool = Executors.newSingleThreadExecutor();  //一池1個受理線程
         ExecutorService threadPool = Executors.newCachedThreadPool();  //一池N個受理線程

        try {
            for (int i = 0; i < 10; i++) {
                threadPool.execute(() -> {
                    try {
                        Thread.sleep(400);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "\t" + "辦理業務");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown(); //關閉線程池
        }
    }

}

先看看效果:

在上面的代碼中,我們可以發現的是,對從線程池取的線程睡了0.4s,然而卻可以發現創建出了3,6,9,10,1,8,7,4,5.....這么多的線程

因為我們睡眠的時間太短了,表明需要受理的業務頻率太多,所以才開辟了這么多的線程去處理。

tips:我們可以設置睡眠時間,來控制線程池的開辟數量。當我們將睡眠時間設置的盡可能的大,那么開辟的線程數自然而然的就少了下來,證明需要受理的業務不那么頻繁

所以我們可以發現當請求過多,過於頻繁的時候使用可擴展的線程池newCachedThreadPool將會創建更多的線程。

線程池的源碼

首先點進newFixedThreadPool()的源碼可以看到:

 接下來點進去newSingleThreadExecutor()的源碼可以看到:

 接下來點進去newCachedThreadPool()的源碼可以看到:

綜上所述,返回的實際上只是一個ThreadPoolExecutor(可以看看繼承圖),利用構造器傳入的不同的參數而已,而且我們也能發現底層是阻塞隊列。
同時說明我們也可以通過ThreadPoolExecutor`來創建線程池,Executors只是一個創建線程池的工具類,實際上返回的還是ThreadPoolExecutor。

ThreadPoolExecutor的七大參數

接着我們繼續點進ThreadPoolExecutor

 接着再點進這this,我們可以看到它有七個參數,:corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler

 下圖是這七大參數的解釋:

線程池的底層工作原理圖

接下來結合下圖理解理解上述的7大參數:
首先看看線程池的底層工作原理圖:

 

看上圖以及參數解析對照我們可以知道maximumPool包含corePool,maximumPool表示最多能放的線程數,而corePool表示的就是線程的常駐數,可以理解為銀行的有最多有5個受理窗口,但是常用的卻只有2個。

而候客區就相當於我們的阻塞隊列(BlockingQueue),那當我們的阻塞隊列滿了之后,handle拒絕策略出來了,相當於銀行門口立了塊牌子,上面寫着不辦理后面的業務了!

然后當客戶都辦理的差不多了,此時多出來(在corePool的基礎上擴容的窗口)的窗口在經過keepAliveTime的時間后就關閉了,重新恢復到corePool個受理窗口。

總結一下線程池的工作流程:

首先線程池接收到任務,先判斷核心線程數是否滿了,如果corepool沒有滿接客則核心(常駐)線程處理。

常駐線程滿了就放到阻塞隊列,如果阻塞隊列沒滿,這些任務放在阻塞隊列。

如果阻塞隊列也滿了,就擴容線程數到最大線程數。

如果最大線程數也滿了,就是我們的拒絕策略。

這就是線程池四大步驟。 接客、放入隊列,擴容線程,拒絕策略!

也可以看下圖流程解釋:太妙了!!

 

實際開發當中如何合適的使用線程池

 為什么不建議使用Executors工具類去創建線程池?

舉個例子,回到之前講的 newSingleThreadExecutor(); ;以及Executors.newCachedThreadPool( );創建的線程池,看看源碼,

正如源碼中看到的那樣:

 如果用Executors去創建,默認的Integer.MAX_VALUE的大小是21億............極大的消耗內存,線程池永遠不會慢,內存會被你壓爆

又如下面源碼:

     //這是Single的
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    }
    //點進去LinkedBlockQueue
        public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

   //這是Cahed的
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

對於newSingleThreadExecutor()而言,LinkedBlockQueue的長度是Integer.MAX_VALUE,
對於newCachedThreadPool()而言,maximumPool的值竟然為Integer.MAX_VALUE!!
兩者均會導致OOM異常!

自定義線程池

public class MyThreadPoolDemo {
    public static void main(String[] args) {
        //自定義線程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,
                5,
                2L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3), //不寫的話默認也是Integer.MAX_VALUE
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());//默認的拒絕策略

        try {
            //模擬10個用戶辦理業務,但是只有5個受理窗口
            for (int i = 0; i < 9; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t" + "辦理業務");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown(); //關閉線程池
        }
    }

threadPool 是我們自定義的線程池,連接過上面的參數的應該都知道。

該線程池最大支持的並發量就應該是maximumPool+Queue的大小,即5+3=8,而超過了大小之后就會報錯:java.util.concurrent.RejectedExecutionException 拒絕執行異常

 線程池的四大拒絕策略

接下來我們看看線程池的四大拒絕策略,上述是JDK默認的拒絕策略:

接下來看看另外三種策略的運行結果

將上述代碼的拒絕策略改成第二種new ThreadPoolExecutor.CallerRunsPolicy(),回退到原始調用者,這里之main線程

 

 第三種new ThreadPoolExecutor.DiscardOldestPolicy():不報錯。

第四種new ThreadPoolExecutor.DiscardPolicy(): 同樣不報錯。

以上策略均繼承自RejectedExecutionHandler接口。

怎么設置maximumPoolSize合理

最后提一句怎么設置maximumPoolSize合理,

了解:IO密集型,CPU密集型:(調優)

1、CPU 密集型,一般設置為CPU核數加1,可以保持CPu的效率最高!

System.out.println(Runtime.getRuntime().availableProcessors()); //獲取CPU的核數,8核

2、IO 密集型, 判斷你程序中十分耗IO的線程

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM