java 線程池 使用實例


在前面的文章中,我們使用線程的時候就去創建一個線程,這樣實現起來非常簡便,但是就會有一個問題:

如果並發的線程數量很多,並且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁創建線程就會大大降低系統的效率,因為頻繁創建線程和銷毀線程需要時間。

那么有沒有一種辦法使得線程可以復用,就是執行完一個任務,並不被銷毀,而是可以繼續執行其他的任務?

在Java中可以通過線程池來達到這樣的效果。

 

1 線程池做什么

 

網絡請求通常有兩種形式:

第一種,請求不是很頻繁,而且每次連接后會保持相當一段時間來讀數據或者寫數據,最后斷開,如文件下載,網絡流媒體等。

另一種形式是請求頻繁,但是連接上以后讀/寫很少量的數據就斷開連接。考慮到服務的並發問題,如果每個請求來到以后服務都為它啟動一個線程,那么這對服務的資源可能會造成很大的浪費,特別是第二種情況。

因為通常情況下,創建線程是需要一定的耗時的,設這個時間為T1,而連接后讀/寫服務的時間為T2,當T1>>T2時,我們就應當考慮一種策略或者機制來控制,使得服務對於第二種請求方式也能在較低的功耗下完成。

通常,我們可以用線程池來解決這個問題,首先,在服務啟動的時候,我們可以啟動好幾個線程,並用一個容器(如線程池)來管理這些線程。

當請求到來時,可以從池中取一個線程出來,執行任務(通常是對請求的響應),當任務結束后,再將這個線程放入池中備用;

如果請求到來而池中沒有空閑的線程,該請求需要排隊等候。最后,當服務關閉時銷毀該池即可。

 

多線程技術主要解決處理器單元內多個線程執行的問題,它可以顯著減少處理器單元的閑置時間,增加處理器單元的吞吐能力。
假設一個服務器完成一項任務所需時間為:T1 創建線程時間,T2 在線程中執行任務的時間,T3 銷毀線程時間。
如果:T1 + T3 遠大於 T2,則可以采用線程池,以提高服務器性

線程池技術正是關注如何縮短或調整T1,T3時間的技術,從而提高服務器程序性能的

它把T1,T3分別安排在服務器程序的啟動和結束的時間段或者一些空閑的時間段,這樣在服務器程序處理客戶請求時,不會有T1,T3的開銷了。

線程池不僅調整T1,T3產生的時間段,而且它還顯著減少了創建線程的數目,看一個例子:

假設一個服務器一天要處理50000個請求,並且每個請求需要一個單獨的線程完成。在線程池中,線程數一般是固定的,所以產生線程總數不會超過線程池中線程的數目,

而如果服務器不利用線程池來處理這些請求則線程總數為50000。一般線程池大小是遠小於50000。

所以利用線程池的服務器程序不會為了創建50000而在處理請求時浪費時間,從而提高效率。

 

合理利用線程池能夠帶來三個好處:

第一:降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。

第二:提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。

第三:提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。

        但是要做到合理的利用線程池,必須對其原理了如指掌。

 

2 線程池的繼承架構

程序啟動一個新線程成本是比較高的,因為它涉及到要與操作系統進行交互。而使用線程池可以很好的提高性能,尤其是當程序中要創建大量生存期很短的線程時,更應該考慮使用線程池。

線程池里的每一個線程代碼結束后,並不會死亡,而是再次回到線程池中成為空閑狀態,等待下一個對象來使用。

在JDK5之前,我們必須手動實現自己的線程池,從JDK5開始,Java內置支持線程池

 

Java里面線程池的頂級接口是Executor,但是嚴格意義上講Executor並不是一個線程池,而只是一個執行線程的工具。

真正的線程池接口是ExecutorService。下面這張圖完整描述了線程池的類體系結構。

Executor是一個頂層接口,在它里面只聲明了一個方法execute(Runnable),返回值為void,參數為Runnable類型,從字面意思可以理解,就是用來執行傳進去的任務的;

 

然后ExecutorService接口繼承了Executor接口,並聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的所有方法;

然后ThreadPoolExecutor繼承了類AbstractExecutorService。

 

標記一下比較重要的類:

ExecutorService:

真正的線程池接口。

ScheduledExecutorService

能和Timer/TimerTask類似,解決那些需要任務重復執行的問題。

ThreadPoolExecutor

ExecutorService的默認實現。

ScheduledThreadPoolExecutor

繼承ThreadPoolExecutor的ScheduledExecutorService接口實現,周期性任務調度的類實現。

 

要配置一個線程池是比較復雜的,尤其是對於線程池的原理不是很清楚的情況下,很有可能配置的線程池不是較優的,

因此在Executors類里面提供了一些靜態工廠,生成一些常用的線程池。

 

newSingleThreadExecutor:創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串行執行所有任務。

                                           如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。

newFixedThreadPool:創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。

                                   線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。

newCachedThreadPool:創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執行任務)的線程,

                                       當任務數增加時,此線程池又可以智能的添加新線程來處理任務。

                                       此線程池不會對線程池大小做限制,線程池大小完全依賴於操作系統(或者說JVM)能夠創建的最大線程大小。

newScheduledThreadPool:創建一個大小無限的線程池。此線程池支持定時以及周期性執行任務的需求。

newSingleThreadExecutor:創建一個單線程的線程池。此線程池支持定時以及周期性執行任務的需求。

 

這些方法的返回值是ExecutorService對象,該對象表示一個線程池,可以執行Runnable對象或者Callable對象代表的線程。

它提供了如下方法來提交一個任務:

Future<?> submit(Runnable task)

<T> Future<T> submit(Callable<T> task)

 Callable 與 Runable的相關內容參見:

03 創建線程的第3式 
02 如何創建線程 線程並發與synchornized
 
3 使用線程池步驟及案例

線程池的好處:線程池里的每一個線程代碼結束后,並不會死亡,而是再次回到線程池中成為空閑狀態,等待下一個對象來使用。

如何實現線程的代碼呢?

A:創建一個線程池對象,控制要創建幾個線程對象。

    public static ExecutorService newFixedThreadPool(int nThreads)

B:這種線程池的線程可以執行:

  可以執行Runnable對象或者Callable對象代表的線程

  做一個類實現Runnable接口。

C:調用如下方法即可

    Future<?> submit(Runnable task)

    <T> Future<T> submit(Callable<T> task)

D:我就要結束,可以嗎? 可以。

復制代碼
 1 package com.jt.thread.demo05;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 
 6 class MyRunnable implements Runnable {
 7     @Override
 8     public void run() {
 9         for (int x = 0; x < 100; x++) {
10             System.out.println(Thread.currentThread().getName() + ":" + x);
11        }
12     }
13 }
14 
15 public class ExecutorServiceDemo {
16     public static void main(String[] args) {
17      // 創建一個線程池對象,控制要創建幾個線程對象。
18      // public static ExecutorService newFixedThreadPool(int nThreads)
19      ExecutorService pool = Executors.newFixedThreadPool(2);
20 
21      // 可以執行Runnable對象或者Callable對象代表的線程
22      pool.submit(new MyRunnable());
23      pool.submit(new MyRunnable());
24 
25     //結束線程池
26     pool.shutdown();
27    }
28 } 
復制代碼

運行結果:

pool-1-thread-1:0

pool-1-thread-1:1

pool-1-thread-1:2

pool-1-thread-2:0

pool-1-thread-2:1

pool-1-thread-2:2

pool-1-thread-2:3

。。。

 

說明:

 

(1 newFixedThreadPool

是固定大小的線程池 有結果可見 我們指定2 在運行時就只有2個線程工作

若其中有一個線程異常  會有新的線程替代他

 

(2 shutdown方法有2個重載:

void shutdown() 啟動一次順序關閉,等待執行以前提交的任務完成,但不接受新任務。

List<Runnable> shutdownNow() 試圖立即停止所有正在執行的活動任務,暫停處理正在等待的任務,並返回等待執行的任務列表。

 

(3 submit 與 execute

3.1 submit是ExecutorService中的方法 用以提交一個任務

他的返回值是future對象  可以獲取執行結果

<T> Future<T> submit(Callable<T> task) 提交一個返回值的任務用於執行,返回一個表示任務的未決結果的 Future。

Future<?> submit(Runnable task) 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。

<T> Future<T> submit(Runnable task, T result) 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。

 

3.2 execute是Executor接口的方法

他雖然也可以像submit那樣讓一個任務執行  但並不能有返回值

 

void execute(Runnable command)

在未來某個時間執行給定的命令。該命令可能在新的線程、已入池的線程或者正調用的線程中執行,這由 Executor 實現決定。

 

(4 Future

Future 表示異步計算的結果。

它提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。

計算完成后只能使用 get 方法來獲取結果,如有必要,計算完成前可以阻塞此方法。

取消則由 cancel 方法來執行。還提供了其他方法,以確定任務是正常完成還是被取消了。一旦計算完成,就不能再取消計算。

如果為了可取消性而使用 Future 但又不提供可用的結果,則可以聲明 Future<?> 形式類型、並返回 null 作為底層任務的結果。

 

Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。

必要時可以通過get方法獲取執行結果,該方法會阻塞直到任務返回結果。 

也就是說Future提供了三種功能:

--判斷任務是否完成;

--能夠中斷任務;

--能夠獲取任務執行結果。

 

boolean cancel(boolean mayInterruptIfRunning) 試圖取消對此任務的執行。

V get() 如有必要,等待計算完成,然后獲取其結果。

V get(long timeout, TimeUnit unit) 如有必要,最多等待為使計算完成所給定的時間之后,獲取其結果(如果結果可用)。

boolean isCancelled() 如果在任務正常完成前將其取消,則返回 true。

boolean isDone() 如果任務已完成,則返回 true。

 

4 線程池簡單使用案例2

java.util.concurrent.Executors類的API提供大量創建連接池的靜態方法:

 

復制代碼
 1 import java.util.concurrent.Executors;
 2 import java.util.concurrent.ExecutorService;
 3 public class JavaThreadPool {
 4   public static void main(String[] args) {
 5     // 創建一個可重用固定線程數的線程池
 6     ExecutorService pool = Executors.newFixedThreadPool(2);
 7     // 創建實現了Runnable接口對象,Thread對象當然也實現了Runnable接口
 8     Thread t1 = new MyThread();
 9     Thread t2 = new MyThread();
10     Thread t3 = new MyThread();
11     Thread t4 = new MyThread();
12     Thread t5 = new MyThread();
13     // 將線程放入池中進行執行
14     pool.execute(t1);
15     pool.execute(t2);
16     pool.execute(t3);
17     pool.execute(t4);
18     pool.execute(t5);
19     // 關閉線程池
20     pool.shutdown();
21    }
22 }
23 class MyThread extends Thread {
24    @Override
25    public void run() {
26     System.out.println(Thread.currentThread().getName() + "正在執行… …");
27    }
28 }
復制代碼

運行效果示例:

 

pool-1-thread-2正在執行… …

pool-1-thread-2正在執行… …

pool-1-thread-2正在執行… …

pool-1-thread-2正在執行… …

pool-1-thread-1正在執行… …

 

 

pool-1-thread-1正在執行… …

pool-1-thread-1正在執行… …

pool-1-thread-1正在執行… …

pool-1-thread-1正在執行… …

pool-1-thread-2正在執行… …

 

可見線程池中有2個線程在工作,可見 newFixedThreadPool 是固定大小的線程池

 

5 單任務線程池:

//創建一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。

ExecutorService pool = Executors.newSingleThreadExecutor(); 

案例:

復制代碼
 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 
 4 public class SingleThreadPollDemo {
 5 
 6 public static void main(String[] args) {
 7    // 創建一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。
 8    ExecutorService pool = Executors.newSingleThreadExecutor();
 9 
10    Runnable task1 = new SingelTask();
11    Runnable task2 = new SingelTask();
12    Runnable task3 = new SingelTask();
13 
14    pool.execute(task3);
15    pool.execute(task2);
16    pool.execute(task1);
17 
18    // 等待已提交的任務全部結束 不再接受新的任務
19    pool.shutdown();
20   }
21 }
22 
23 class SingelTask implements Runnable{
24 
25 @Override
26 public void run() {
27   System.out.println(Thread.currentThread().getName() + "正在執行… …");
28   try {
29    Thread.sleep(3000);
30   } catch (InterruptedException e) {
31    e.printStackTrace();
32   }
33   System.out.println(Thread.currentThread().getName() + "執行完畢");
34  }
35 
36 }
復制代碼

運行結果:

 

pool-1-thread-1正在執行… …

pool-1-thread-1執行完畢

pool-1-thread-1正在執行… …

pool-1-thread-1執行完畢

pool-1-thread-1正在執行… …

pool-1-thread-1執行完畢

 

可見線程池中只有一個線程在執行任務

6 小結:

對於以上兩種連接池,大小都是固定的,當要加入的池的線程(或者任務)超過池最大尺寸時候,則入此線程池需要排隊等待。

一旦池中有線程完畢,則排隊等待的某個線程會入池執行。

 

其他線程池示例:

 

固定大小線程池

import java.util.concurrent.Executors; 

import java.util.concurrent.ExecutorService;

ExecutorService pool = Executors.newFixedThreadPool(2);

pool.execute(t1);

pool.shutdown();

 

單任務線程池

ExecutorService pool = Executors.newSingleThreadExecutor();

 

可變尺寸線程池

ExecutorService pool = Executors.newCachedThreadPool();

 

延遲連接池

import java.util.concurrent.Executors; 

import java.util.concurrent.ScheduledExecutorService; 

import java.util.concurrent.TimeUnit;

ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

pool.schedule(t4, 10, TimeUnit.MILLISECONDS);

 

單任務延遲連接池

ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();

 

7 使用示例

復制代碼
 1 public class Test {
 2      public static void main(String[] args) {   
 3          ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
 4                  new ArrayBlockingQueue<Runnable>(5));
 5           
 6          for(int i=0;i<15;i++){
 7              MyTask myTask = new MyTask(i);
 8              executor.execute(myTask);
 9              System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+
10              executor.getQueue().size()+",已執行完別的任務數目:"+executor.getCompletedTaskCount());
11          }
12          executor.shutdown();
13      }
14 }
15  
16  
17 class MyTask implements Runnable {
18     private int taskNum;
19      
20     public MyTask(int num) {
21         this.taskNum = num;
22     }
23      
24     @Override
25     public void run() {
26         System.out.println("正在執行task "+taskNum);
27         try {
28             Thread.currentThread().sleep(4000);
29         } catch (InterruptedException e) {
30             e.printStackTrace();
31         }
32         System.out.println("task "+taskNum+"執行完畢");
33     }
34 }
復制代碼

執行結果:

  View Code

從執行結果可以看出,當線程池中線程的數目大於5時,便將任務放入任務緩存隊列里面,當任務緩存隊列滿了之后,便創建新的線程。

如果上面程序中,將for循環中改成執行20個任務,就會拋出任務拒絕異常了。

不過在java doc中,並不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來創建線程池:

1 Executors.newCachedThreadPool();        //創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE
2 Executors.newSingleThreadExecutor();    //創建容量為1的緩沖池
3 Executors.newFixedThreadPool(int);      //創建固定容量大小的緩沖池

下面是這三個靜態方法的具體實現:

復制代碼
 1 public static ExecutorService newFixedThreadPool(int nThreads) {
 2     return new ThreadPoolExecutor(nThreads, nThreads,
 3                                   0L, TimeUnit.MILLISECONDS,
 4                                   new LinkedBlockingQueue<Runnable>());
 5 }
 6 public static ExecutorService newSingleThreadExecutor() {
 7     return new FinalizableDelegatedExecutorService
 8         (new ThreadPoolExecutor(1, 1,
 9                                 0L, TimeUnit.MILLISECONDS,
10                                 new LinkedBlockingQueue<Runnable>()));
11 }
12 public static ExecutorService newCachedThreadPool() {
13     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
14                                   60L, TimeUnit.SECONDS,
15                                   new SynchronousQueue<Runnable>());
16 }
復制代碼

     從它們的具體實現來看,它們實際上也是調用了ThreadPoolExecutor,只不過參數都已配置好了。

  newFixedThreadPool創建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

  newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置為1,也使用的LinkedBlockingQueue;

  newCachedThreadPool將corePoolSize設置為0,將maximumPoolSize設置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就創建線程運行,當線程空閑超過60秒,就銷毀線程。

  實際中,如果Executors提供的三個靜態方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。

  另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。

 

from: https://www.cnblogs.com/wihainan/p/4765862.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM