理解線程池和Executor框架


一、什么是線程池:

線程池是指在初始化一個多線程應用程序過程中創建一個線程集合,然后在需要執行新的任務時重用這些線程而不是新建一個線程。線程池中線程的數量通常完全取決於可用內存數量和應用程序的需求。然而,增加可用線程數量是可能的。線程池中的每個線程都有被分配一個任務,一旦任務已經完成了,線程回到池子中並等待下一次分配任務。

 

二、為什么要使用線程池:

因為創建和銷毀線程都是需要時間的,特別是需要創建大量線程的時候,時間和資源的消耗是不可忽略的,而合理的使用線程池中已經創建的線程,可以減少創建和銷毀線程而花費的時間和資源。

 

三、線程池的優點:

(1)降低資源消耗:通過線程的重用可以降低創建和銷毀線程花費的時間和資源;

(2)提高響應速度:任務到達時,因為利用線程池中已經創建好的線程,可以不用等待線程創建而直接執行任務;

(3)提高線程的可管理性:線程池允許我們開啟多個任務而不用為每個線程設置屬性(便於管理);線程池根據當前在系統中運行的進程來優化線程時間片(調優);線程池可以限制創建線程的數量,如果無限制的創建線程,不僅會消耗資源,還會降低系統的穩定性;

 

四、線程池的工作原理

 

 

 

 

五、在項目中線程池使用

1、線程池的創建和線程池的基本設置:(6個參數)

ThreadPoolExecutor tpe = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, runnalbleTaskQueue, handler);

corePoolSize線程池的大小。如果調用了prestartAllCoreThread()方法,那么線程池會提前創建並啟動所有基本線程。

maximumPoolSize線程池的大小。

keepAliveTime閑置時間空閑后,線程存活時間。如果任務多,任務周期短,可以調大keepAliveTime,提高線程利用率。

timeUnit閑置時間的時間單位。有天(DAYS)、小時(HOURS)、分(MINUTES)、秒(SECONDS)、毫秒(MILLISECONDS)、微秒(MICROSECONDS)、納秒(NANOSECONDS)

runnalbleTaskQueue阻塞隊列方4種)

ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue

靜態工廠方法Executors.newFixedThreadPool(  )使用了LinkedBlockingQueue;

靜態工廠方法Executors.newCachedThreadPool(  )使用了SynchronousQueue;

handler飽和策論。(4種

線程池滿了的時候,任務無法得到處理,這時候需要飽和策略來處理無法完成的任務,飽和策略中有4種處理策略:

AbortPolicy:這是默認的策略,直接拋出異常;

CallerRunsPolicy:只是用調用者所在線程來運行任務;

DiscardOldestPolicy:丟棄隊列中最老的任務,並執行當前任務;

DiscardPolicy:不處理,直接把當前任務丟棄;

當然也可以自定義飽和處理策略,需要實現RejectedExecutionHandler接口,比如記錄日志或者持久化不能存儲的任務等。

2.向線程池提交任務

提交任務的兩種方法:execute()和submit()

(1)execute用於提交沒有返回值的任務,所以無法判斷任務是否被線程執行過。

package com.company;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class Main {

    public static void main(String[] args) {
        //線程池的創建
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,10 ,
                100 ,MILLISECONDS,new ArrayBlockingQueue<Runnable>(5));
        //提交線程池任務
        threadPoolExecutor.execute(new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("thread 1 execute work" );
            }
        }));
        //停止線程池
        threadPoolExecutor.shutdown();
    }
}

 

(2)submit():用於提交需要返回值的對象。

用於提交需要返回值的對象。返回一個future對象,可以通過future對象判斷任務是否被線程執行;可以通過future對象的get()方法獲取返回的值,get()方法會阻塞當前線程直到future對象被返回,也可以調用get(long timeout, TimeUnit unit)來實現由等待時間的獲取返回值,如果超時仍沒有返回值,那么立刻返回,這時候任務可能沒有執行完成。

package com.company;

import java.util.concurrent.*;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class Main2 {
    public static void main(String[] args) throws ExecutionException {
        //線程池的創建
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10,
                100, MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
        //添加任務
        Future future= threadPoolExecutor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                String a ="return call";
                return a;
            }
        });
        try {
            System.out.println(future.get());//future.get()調用call()方法的返回結果
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally{
            threadPoolExecutor.shutdown();
        }


    }
}

3、關閉線程池

可以通過shutdown或者shutDownNow來關閉線程池。

原理:遍歷線程池中的線程,逐個調用線程的interrupt()方法來中斷線程,所以不響應中斷的線程可能永遠無法終止

(1)shutDown:把線程池的狀態設置為SHUTDOWN,然后中斷所有沒有正在執行任務的線程,而已經在執行任務的線程繼續執行直到任務執行完畢;

(2)shutDownNow:把當前線程池狀態設為STOP,嘗試停止所有的正在執行或者暫停的線程,並返回等待執行的任務的列表;

在調用了shutDown或者shutDownNow后,調用isShutDown()返回true;當所有任務都關閉后,調用isTerminaed()方法返回true。(注意關閉線程池和所有線程關閉是不同的

一、什么是Executor框架?

我們知道線程池就是線程的集合,線程池集中管理線程,以實現線程的重用,降低資源消耗,提高響應速度等。線程用於執行異步任務,單個的線程既是工作單元也是執行

機制,從JDK1.5開始,為了把工作單元與執行機制分離開,Executor框架誕生了,他是一個用於統一創建與運行的接口。Executor框架實現的就是線程池的功能。

 

二、Executor框架結構圖解

1、Executor框架包括3大部分:

(1)任務:也就是工作單元,包括被執行任務需要實現的接口:Runnable接口或者Callable接口;

(2)任務的執行:也就是把任務分派給多個線程的執行機制,包括Executor接口及繼承自Executor接口的ExecutorService接口。

(3)異步計算的結果:包括Future接口及實現了Future接口的FutureTask類。

三、Executor框架成員:

ThreadPoolExecutor實現類、ScheduledThreadPoolExecutor實現類、Future接口、Runnable和Callable接口、Executors工廠類

 

 

1、ThreadPoolExecutor:

(1)創建:

ThreadPoolExecutor tpe = new ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue);

(2)ThreadPoolExecutor的子類(3個):

ThreadPoolExecutor通過Executors工具類來創建ThreadPoolExecutor的子類FixedThreadPoolSingleThreadExecutorCachedThreadPool,這些子類繼承ThreadPoolExecutor,並且其中的一些參數已經被配置好

//FixedThreadPoll
ExecutorService ftp = Executors.newFixedThreadPool(int threadNums);
ExecutorService ftp = Executors.newFixedThreadPool(int threadNums, ThreadFactory threadFactory);
//SingleThreadExecutor
ExecutorService ste = Executors.newSingleThreadExecutor();
ExecutorService ste = Executors.newSingleThradPool(ThreadFactory threadFactory);
//CachedThreadPool
ExecutorService ctp = Executors.newCachedThreadPool();
ExecutorService ctp = Executors.newCachedThreadPool(ThreadFactory threadFactory);

(3)子類詳解:

FixedThreadPool:

應用場景:FixedThreadPool是線程數量固定的線程池,適用於為了滿足資源管理的需求,而需要適當限制當前線程數量的情景,適用於負載比較重的服務器。

可以看出它的實現就是把線程池最大線程數量maxmumPoolSize和核心線程池的數量corePoolSize設置為相等,並且使用LinkedBlockingQueue作為阻塞隊列,那么首先可以知道線程池的線程數量最多就是nThread,只會在核心線程池階段創建,此外,因為LinkedBlockingQueue是無限的雙向隊列,因此當任務不能立刻執行時,都會添加到阻塞隊列中,因此可以得到FixedThreadPool的工作流程大致如下:

當前核心線程池總線程數量小於corePoolSize,那么創建線程並執行任務;
如果當前線程數量等於corePoolSize,那么把 任務添加到阻塞隊列中;
如果線程池中的線程執行完任務,那么獲取阻塞隊列中的任務並執行;


*注意:因為阻塞隊列是無限的雙向隊列,因此如果沒有調用shutDownNow()或者shutDown()方法,線程池是不會拒絕任務的,如果線程池中的線程一直被占有,FixedThreadPool是不會拒絕任務的。

因為使用的是LinkedBlockingQueue,因此maximumPoolSize,keepAliveTime都是無效的,因為阻塞隊列是無限的,因此線程數量肯定小於等於corePoolSize,因此keepAliveTime是無效的

 

SingleThreadExecutor

應用場景:SingleThreadExecutor是只有一個線程的線程池,常用於需要讓線程順序執行,並且在任意時間,只能有一個任務被執行,而不能有多個線程同時執行的場景。

因為阻塞隊列使用的是LinkedBlockingQueue,因此和FixedThreadPool一樣,maximumPoolSize,keepAliveTime都是無效的。corePoolSize為1,因此最多只能創建一個線程,SingleThreadPool的工作流程大概如下:

當前核心線程池總線程數量小於corePoolSize(1),那么創建線程並執行任務;
如果當前線程數量等於corePoolSize,那么把 任務添加到阻塞隊列中;
如果線程池中的線程執行完任務,那么獲取阻塞隊列中的任務並執行;


CachedThreadPool

應用場景:CachedThreadPool適用於執行很多短期異步任務的小程序,或者是負載較輕的服務器。

CachedThreadPool使用SynchronizedQueue作為阻塞隊列,SynchronizedQueue是不存儲元素的阻塞隊列,實現“一對一的交付”,也就是說,每次向隊列中put一個任務必須等有線程來take這個任務,否則就會一直阻塞該任務,如果一個線程要take一個任務就要一直阻塞知道有任務被put進阻塞隊列。

因為CachedThreadPool的maximumPoolSize為Integer.MUX_VALUE,因此CachedThreadPool是無界的線程池,也就是說可以一直不斷的創建線程。corePoolSize為0 ,因此在CachedThreadPool中直接通過阻塞隊列來進行任務的提交。

CachedThreadPool的工作流程大概如下:

首先執行SynchronizedQueue.offer(  )把任務提交給阻塞隊列,如果這時候正好在線程池中有空閑的線程執行SynchronizedQueue.poll( ),那么offer操作和poll操作配對,線程執行任務;
如果執行SynchronizedQueue.offer(  )把任務提交給阻塞隊列時maximumPoolSize=0.或者沒有空閑線程來執行SynchronizedQueue.poll( ),那么步驟1失敗,那么創建一個新線程來執行任務;
如果當前線程執行完任務則循環從阻塞隊列中獲取任務,如果當前隊列中沒有提交(offer)任務,那么線程等待keepAliveTime時間,在CacheThreadPool中為60秒,在keepAliveTime時間內如果有任務提交則獲取並執行任務,如果沒有則銷毀線程,因此最后如果一直沒有任務提交了,線程池中的線程數量最終為0。
*注意:因為maximumPoolSize=Integer.MAX_VALUE,因此可以不斷的創建新線程,這樣可能會CPU和內存資源耗盡。

 

2、ScheduledThreadPoolExecutor

可以利用Executors工廠類來創建兩種ScheduledThreadPoolExecutor:ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor.

 

3、Future接口/FutureTask實現類:

FutureTask的Future就源自於它的異步工作機制,如果我們在主線程中直接寫一個函數來執行任務,這是同步的任務,也就是說必須要等這個函數返回以后我們才能繼續做接下的事情,但是如果這個函數返回的結果對接下來的任務並沒有意義,那么我們等在這里是很浪費時間的,而FutureTask就提供了這么一個異步的返回結果的機制,當執行一個FutureTask的時候,我們可以接着做別的任務,在將來的某個時間,FutureTask任務完成后會返回FutureTask對象來包裝返回的結果,只要調用這個對象的get()方法即可獲取返回值。

(1) FutureTask的啟動:

  ① 由調用線程直接執行:在調用線程中執行futureTask.run()方法;

  ② 由調用線程直接執行:在調用線程中執行futureTask.run()方法;

(2) FutureTask執行完后結果的獲取  :futureTask.get( )

(3) futureTask被取消:futureTask.cancel( )

 


————————————————
版權聲明:本文為CSDN博主「tongdanping」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/tongdanping/article/details/79625109


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM