線程池(Java中有哪些方法獲取多線程)


線程池(Java中有哪些方法獲取多線程)

前言

獲取多線程的方法,我們都知道有三種,還有一種是實現Callable接口

  • 實現Runnable接口
  • 實現Callable接口
  • 實例化Thread類
  • 使用線程池獲取

Callable接口

Callable接口,是一種讓線程執行完成后,能夠返回結果的

在說到Callable接口的時候,我們不得不提到Runnable接口

/**
 * 實現Runnable接口
 */
class MyThread implements Runnable {

    @Override
    public void run() {

    }
}

我們知道,實現Runnable接口的時候,需要重寫run方法,也就是線程在啟動的時候,會自動調用的方法

同理,我們實現Callable接口,也需要實現call方法,但是這個時候我們還需要有返回值,這個Callable接口的應用場景一般就在於批處理業務,比如轉賬的時候,需要給一會返回結果的狀態碼回來,代表本次操作成功還是失敗

/**
 * Callable有返回值
 * 批量處理的時候,需要帶返回值的接口(例如支付失敗的時候,需要返回錯誤狀態)
 *
 */
class MyThread2 implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("come in Callable");
        return 1024;
    }
}

最后我們需要做的就是通過Thread線程, 將MyThread2實現Callable接口的類包裝起來

這里需要用到的是FutureTask類,他實現了Runnable接口,並且還需要傳遞一個實現Callable接口的類作為構造函數

// FutureTask:實現了Runnable接口,構造函數又需要傳入 Callable接口
// 這里通過了FutureTask接觸了Callable接口
FutureTask<Integer> futureTask = new FutureTask<>(new MyThread2());

然后在用Thread進行實例化,傳入實現Runnabnle接口的FutureTask的類

Thread t1 = new Thread(futureTask, "aaa");
t1.start();

最后通過 futureTask.get() 獲取到返回值

// 輸出FutureTask的返回值
System.out.println("result FutureTask " + futureTask.get());

這就相當於原來我們的方式是main方法一條龍之心,后面在引入Callable后,對於執行比較久的線程,可以單獨新開一個線程進行執行,最后在進行匯總輸出

最后需要注意的是 要求獲得Callable線程的計算結果,如果沒有計算完成就要去強求,會導致阻塞,直到計算完成

也就是說 futureTask.get() 需要放在最后執行,這樣不會導致主線程阻塞

也可以使用下面算法,使用類似於自旋鎖的方式來進行判斷是否運行完畢

// 判斷futureTask是否計算完成
while(!futureTask.isDone()) {

}

注意

多個線程執行 一個FutureTask的時候,只會計算一次

FutureTask<Integer> futureTask = new FutureTask<>(new MyThread2());

// 開啟兩個線程計算futureTask
new Thread(futureTask, "AAA").start();
new Thread(futureTask, "BBB").start();

如果我們要兩個線程同時計算任務的話,那么需要這樣寫,需要定義兩個futureTask

FutureTask<Integer> futureTask = new FutureTask<>(new MyThread2());
FutureTask<Integer> futureTask2 = new FutureTask<>(new MyThread2());

// 開啟兩個線程計算futureTask
new Thread(futureTask, "AAA").start();

new Thread(futureTask2, "BBB").start();

ThreadPoolExecutor

為什么用線程池

線程池做的主要工作就是控制運行的線程的數量,處理過程中,將任務放入到隊列中,然后線程創建后,啟動這些任務,如果線程數量超過了最大數量的線程排隊等候,等其它線程執行完畢,再從隊列中取出任務來執行。

它的主要特點為:線程復用、控制最大並發數、管理線程

線程池中的任務是放入到阻塞隊列中的

線程池的好處

多核處理的好處是:省略的上下文的切換開銷

原來我們實例化對象的時候,是使用 new關鍵字進行創建,到了Spring后,我們學了IOC依賴注入,發現Spring幫我們將對象已經加載到了Spring容器中,只需要通過@Autowired注解,就能夠自動注入,從而使用

因此使用多線程有下列的好處

  • 降低資源消耗。通過重復利用已創建的線程,降低線程創建和銷毀造成的消耗
  • 提高響應速度。當任務到達時,任務可以不需要等到線程創建就立即執行
  • 提高線程的可管理性。線程是稀缺資源,如果無線創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控

架構說明

Java中線程池是通過Executor框架實現的,該框架中用到了Executor,Executors(代表工具類),ExecutorService,ThreadPoolExecutor這幾個類。

創建線程池

  • Executors.newFixedThreadPool(int i) :創建一個擁有 i 個線程的線程池
    • 執行長期的任務,性能好很多
    • 創建一個定長線程池,可控制線程數最大並發數,超出的線程會在隊列中等待
  • Executors.newSingleThreadExecutor:創建一個只有1個線程的 單線程池
    • 一個任務一個任務執行的場景
    • 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序執行
  • Executors.newCacheThreadPool(); 創建一個可擴容的線程池
    • 執行很多短期異步的小程序或者負載教輕的服務器
    • 創建一個可緩存線程池,如果線程長度超過處理需要,可靈活回收空閑線程,如無可回收,則新建新線程

具體使用,首先我們需要使用Executors工具類,進行創建線程池,這里創建了一個擁有5個線程的線程池

// 一池5個處理線程(用池化技術,一定要記得關閉)
ExecutorService threadPool = Executors.newFixedThreadPool(5);

// 創建一個只有一個線程的線程池
ExecutorService threadPool = Executors.newSingleThreadExecutor();

// 創建一個擁有N個線程的線程池,根據調度創建合適的線程
ExecutorService threadPool = Executors.newCacheThreadPool();

然后我們執行下面的的應用場景

模擬10個用戶來辦理業務,每個用戶就是一個來自外部請求線程

我們需要使用 threadPool.execute執行業務,execute需要傳入一個實現了Runnable接口的線程

threadPool.execute(() -> {
	System.out.println(Thread.currentThread().getName() + "\t 給用戶辦理業務");
});

然后我們使用完畢后關閉線程池

threadPool.shutdown();

完整代碼為:

/**
 * 第四種獲取 / 使用 Java多線程的方式,通過線程池
 */
public class MyThreadPoolDemo {
    public static void main(String[] args) {

        // Array  Arrays(輔助工具類)
        // Collection Collections(輔助工具類)
        // Executor Executors(輔助工具類)


        // 一池5個處理線程(用池化技術,一定要記得關閉)
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        // 模擬10個用戶來辦理業務,每個用戶就是一個來自外部請求線程
        try {

            // 循環十次,模擬業務辦理,讓5個線程處理這10個請求
            for (int i = 0; i < 10; i++) {
                final int tempInt = i;
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t 給用戶:" + tempInt + " 辦理業務");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }

    }
}

最后結果:

pool-1-thread-1	 給用戶:0 辦理業務
pool-1-thread-5	 給用戶:4 辦理業務
pool-1-thread-1	 給用戶:5 辦理業務
pool-1-thread-4	 給用戶:3 辦理業務
pool-1-thread-2	 給用戶:1 辦理業務
pool-1-thread-3	 給用戶:2 辦理業務
pool-1-thread-2	 給用戶:9 辦理業務
pool-1-thread-4	 給用戶:8 辦理業務
pool-1-thread-1	 給用戶:7 辦理業務
pool-1-thread-5	 給用戶:6 辦理業務

我們能夠看到,一共有5個線程,在給10個用戶辦理業務

底層實現

我們通過查看源碼,點擊了Executors.newSingleThreadExecutor 和 Executors.newFixedThreadPool能夠發現底層都是使用了ThreadPoolExecutor

我們可以看到線程池的內部,還使用到了LinkedBlockingQueue 鏈表阻塞隊列

同時在查看Executors.newCacheThreadPool 看到底層用的是 SynchronousBlockingQueue阻塞隊列

最后查看一下,完整的三個創建線程的方法

線程池的重要參數

線程池在創建的時候,一共有7大參數

  • corePoolSize:核心線程數,線程池中的常駐核心線程數
    • 在創建線程池后,當有請求任務來之后,就會安排池中的線程去執行請求任務,近似理解為今日當值線程
    • 當線程池中的線程數目達到corePoolSize后,就會把到達的隊列放到緩存隊列中
  • maximumPoolSize:線程池能夠容納同時執行的最大線程數,此值必須大於等於1、
    • 相當有擴容后的線程數,這個線程池能容納的最多線程數
  • keepAliveTime:多余的空閑線程存活時間
    • 當線程池數量超過corePoolSize時,當空閑時間達到keepAliveTime值時,多余的空閑線程會被銷毀,直到只剩下corePoolSize個線程為止
    • 默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用
  • unit:keepAliveTime的單位
  • workQueue:任務隊列,被提交的但未被執行的任務(類似於銀行里面的候客區)
    • LinkedBlockingQueue:鏈表阻塞隊列
    • SynchronousBlockingQueue:同步阻塞隊列
  • threadFactory:表示生成線程池中工作線程的線程工廠,用於創建線程池 一般用默認即可
  • handler:拒絕策略,表示當隊列滿了並且工作線程大於線程池的最大線程數(maximumPoolSize3)時,如何來拒絕請求執行的Runnable的策略

當營業窗口和阻塞隊列中都滿了時候,就需要設置拒絕策略

拒絕策略

以下所有拒絕策略都實現了RejectedExecutionHandler接口

  • AbortPolicy:默認,直接拋出RejectedExcutionException異常,阻止系統正常運行
  • CallerRunsPolicy:該策略既不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者
  • DiscardPolicy:直接丟棄任務,不予任何處理也不拋出異常,如果運行任務丟失,這是一種好方案
  • DiscardOldestPolicy:拋棄隊列中等待最久的任務,然后把當前任務加入隊列中嘗試再次提交當前任務

線程池底層工作原理

線程池運行架構圖

文字說明

  1. 在創建了線程池后,等待提交過來的任務請求

  2. 當調用execute()方法添加一個請求任務時,線程池會做出如下判斷

    1. 如果正在運行的線程池數量小於corePoolSize,那么馬上創建線程運行這個任務
    2. 如果正在運行的線程數量大於或等於corePoolSize,那么將這個任務放入隊列
    3. 如果這時候隊列滿了,並且正在運行的線程數量還小於maximumPoolSize,那么還是創建非核心線程like運行這個任務;
    4. 如果隊列滿了並且正在運行的線程數量大於或等於maximumPoolSize,那么線程池會啟動飽和拒絕策略來執行
  3. 當一個線程完成任務時,它會從隊列中取下一個任務來執行

  4. 當一個線程無事可做操作一定的時間(keepAliveTime)時,線程池會判斷:

    1. 如果當前運行的線程數大於corePoolSize,那么這個線程就被停掉
    2. 所以線程池的所有任務完成后,它會最終收縮到corePoolSize的大小

以顧客去銀行辦理業務為例,談談線程池的底層工作原理

  1. 最開始假設來了兩個顧客,因為corePoolSize為2,因此這兩個顧客直接能夠去窗口辦理
  2. 后面又來了三個顧客,因為corePool已經被顧客占用了,因此只有去候客區,也就是阻塞隊列中等待
  3. 后面的人又陸陸續續來了,候客區可能不夠用了,因此需要申請增加處理請求的窗口,這里的窗口指的是線程池中的線程數,以此來解決線程不夠用的問題
  4. 假設受理窗口已經達到最大數,並且請求數還是不斷遞增,此時候客區和線程池都已經滿了,為了防止大量請求沖垮線程池,已經需要開啟拒絕策略
  5. 臨時增加的線程會因為超過了最大存活時間,就會銷毀,最后從最大數削減到核心數

為什么不用默認創建的線程池?

線程池創建的方法有:固定數的,單一的,可變的,那么在實際開發中,應該使用哪個?

我們一個都不用,在生產環境中是使用自己自定義的

為什么不用Executors中JDK提供的?

根據阿里巴巴手冊:並發控制這章

  • 線程資源必須通過線程池提供,不允許在應用中自行顯式創建線程
    • 使用線程池的好處是減少在創建和銷毀線程上所消耗的時間以及系統資源的開銷,解決資源不足的問題,如果不使用線程池,有可能造成系統創建大量同類線程而導致消耗完內存或者“過度切換”的問題
  • 線程池不允許使用Executors去創建,而是通過ThreadToolExecutors的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險
    • Executors返回的線程池對象弊端如下:
      • FixedThreadPool和SingleThreadPool:
        • 運行的請求隊列長度為:Integer.MAX_VALUE,可能會堆積大量的請求,從而導致OOM
      • CacheThreadPool和ScheduledThreadPool
        • 運行的請求隊列長度為:Integer.MAX_VALUE,可能會堆積大量的請求,從而導致OOM

手寫線程池

采用默認拒絕策略

從上面我們知道,因為默認的Executors創建的線程池,底層都是使用LinkBlockingQueue作為阻塞隊列的,而LinkBlockingQueue雖然是有界的,但是它的界限是 Integer.MAX_VALUE 大概有20多億,可以相當是無界的了,因此我們要使用ThreadPoolExecutor自己手動創建線程池,然后指定阻塞隊列的大小

下面我們創建了一個 核心線程數為2,最大線程數為5,並且阻塞隊列數為3的線程池

        // 手寫線程池
        final Integer corePoolSize = 2;
        final Integer maximumPoolSize = 5;
        final Long keepAliveTime = 1L;

        // 自定義線程池,只改變了LinkBlockingQueue的隊列大小
        ExecutorService executorService = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

然后使用for循環,模擬10個用戶來進行請求

      // 模擬10個用戶來辦理業務,每個用戶就是一個來自外部請求線程
        try {

            // 循環十次,模擬業務辦理,讓5個線程處理這10個請求
            for (int i = 0; i < 10; i++) {
                final int tempInt = i;
                executorService.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t 給用戶:" + tempInt + " 辦理業務");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }

但是在用戶執行到第九個的時候,觸發了異常,程序中斷

pool-1-thread-1	 給用戶:0 辦理業務
pool-1-thread-4	 給用戶:6 辦理業務
pool-1-thread-3	 給用戶:5 辦理業務
pool-1-thread-2	 給用戶:1 辦理業務
pool-1-thread-2	 給用戶:4 辦理業務
pool-1-thread-5	 給用戶:7 辦理業務
pool-1-thread-4	 給用戶:2 辦理業務
pool-1-thread-3	 給用戶:3 辦理業務
java.util.concurrent.RejectedExecutionException: Task com.moxi.interview.study.thread.MyThreadPoolDemo$$Lambda$1/1747585824@4dd8dc3 rejected from java.util.concurrent.ThreadPoolExecutor@6d03e736[Running, pool size = 5, active threads = 3, queued tasks = 0, completed tasks = 5]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at com.moxi.interview.study.thread.MyThreadPoolDemo.main(MyThreadPoolDemo.java:34)

這是因為觸發了拒絕策略,而我們設置的拒絕策略是默認的AbortPolicy,也就是拋異常的

觸發條件是,請求的線程大於 阻塞隊列大小 + 最大線程數 = 8 的時候,也就是說第9個線程來獲取線程池中的線程時,就會拋出異常從而報錯退出。

采用CallerRunsPolicy拒絕策略

當我們更好其它的拒絕策略時,采用CallerRunsPolicy拒絕策略,也稱為回退策略,就是把任務丟回原來的請求開啟線程着,我們看運行結果

pool-1-thread-1	 給用戶:0 辦理業務
pool-1-thread-5	 給用戶:7 辦理業務
pool-1-thread-4	 給用戶:6 辦理業務
main	 給用戶:8 辦理業務
pool-1-thread-3	 給用戶:5 辦理業務
pool-1-thread-2	 給用戶:1 辦理業務
pool-1-thread-3	 給用戶:9 辦理業務
pool-1-thread-4	 給用戶:4 辦理業務
pool-1-thread-5	 給用戶:3 辦理業務
pool-1-thread-1	 給用戶:2 辦理業務

我們發現,輸出的結果里面出現了main線程,因為線程池出發了拒絕策略,把任務回退到main線程,然后main線程對任務進行處理

采用 DiscardPolicy 拒絕策略

pool-1-thread-1	 給用戶:0 辦理業務
pool-1-thread-3	 給用戶:5 辦理業務
pool-1-thread-1	 給用戶:2 辦理業務
pool-1-thread-2	 給用戶:1 辦理業務
pool-1-thread-1	 給用戶:4 辦理業務
pool-1-thread-5	 給用戶:7 辦理業務
pool-1-thread-4	 給用戶:6 辦理業務
pool-1-thread-3	 給用戶:3 辦理業務

采用DiscardPolicy拒絕策略會,線程池會自動把后面的任務都直接丟棄,也不報異常,當任務無關緊要的時候,可以采用這個方式

采用DiscardOldestPolicy拒絕策略

pool-1-thread-1	 給用戶:0 辦理業務
pool-1-thread-4	 給用戶:6 辦理業務
pool-1-thread-1	 給用戶:4 辦理業務
pool-1-thread-3	 給用戶:5 辦理業務
pool-1-thread-2	 給用戶:1 辦理業務
pool-1-thread-1	 給用戶:9 辦理業務
pool-1-thread-4	 給用戶:8 辦理業務
pool-1-thread-5	 給用戶:7 辦理業務

這個策略和剛剛差不多,會把最久的隊列中的任務替換掉

線程池的合理參數

生產環境中如何配置 corePoolSize 和 maximumPoolSize

這個是根據具體業務來配置的,分為CPU密集型和IO密集型

  • CPU密集型

CPU密集的意思是該任務需要大量的運算,而沒有阻塞,CPU一直全速運行

CPU密集任務只有在真正的多核CPU上才可能得到加速(通過多線程)

而在單核CPU上,無論你開幾個模擬的多線程該任務都不可能得到加速,因為CPU總的運算能力就那些

CPU密集型任務配置盡可能少的線程數量:

一般公式:CPU核數 + 1個線程數

  • IO密集型

由於IO密集型任務線程並不是一直在執行任務,則可能多的線程,如 CPU核數 * 2

IO密集型,即該任務需要大量的IO操作,即大量的阻塞

在單線程上運行IO密集型的任務會導致浪費大量的CPU運算能力花費在等待上

所以IO密集型任務中使用多線程可以大大的加速程序的運行,即使在單核CPU上,這種加速主要就是利用了被浪費掉的阻塞時間。

IO密集時,大部分線程都被阻塞,故需要多配置線程數:

參考公式:CPU核數 / (1 - 阻塞系數) 阻塞系數在0.8 ~ 0.9左右

例如:8核CPU:8/ (1 - 0.9) = 80個線程數


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM