java並發編程:Executor、Executors、ExecutorService


1.Executor和ExecutorService

Executor:一個接口,其定義了一個接收Runnable對象的方法executor,其方法簽名為executor(Runnable command),該方法接收一個Runable實例,它用來執行一個任務,任務即一個實現了Runnable接口的類,一般來說,Runnable任務開辟在新線程中的使用方法為:new Thread(new RunnableTask())).start(),但在Executor中,可以使用Executor而不用顯示地創建線程:executor.execute(new RunnableTask()); // 異步執行

ExecutorService:是一個比Executor使用更廣泛的子類接口,其提供了生命周期管理的方法,返回 Future 對象,以及可跟蹤一個或多個異步任務執行狀況返回Future的方法;可以調用ExecutorService的shutdown()方法來平滑地關閉 ExecutorService,調用該方法后,將導致ExecutorService停止接受任何新的任務且等待已經提交的任務執行完成(已經提交的任務會分兩類:一類是已經在執行的,另一類是還沒有開始執行的),當所有已經提交的任務執行完畢后將會關閉ExecutorService。因此我們一般用該接口來實現和管理多線程。

通過 ExecutorService.submit() 方法返回的 Future 對象,可以調用isDone()方法查詢Future是否已經完成。當任務完成時,它具有一個結果,你可以調用get()方法來獲取該結果。你也可以不用isDone()進行檢查就直接調用get()獲取結果,在這種情況下,get()將阻塞,直至結果准備就緒,還可以取消任務的執行。Future 提供了 cancel()方法用來取消執行 pending 中的任務。ExecutorService 部分代碼如下:

public interface ExecutorService extends Executor {
    void shutdown();
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
}

ExecutorService常用的幾個方法

  • execute方法:方法接收一個Runnable實例,並且異步的執行。
  • submit(Runnable)方法:返回一個Future對象,通過返回的Future對象,我們可以檢查提交的任務是否執行完畢。
  • submit(Callable):與submit(Callable)類似,也會返回一個Future對象,但是除此之外,submit(Callable)接收的是一個Callable的實現,Callable接口中的call()方法有一個返回值,可以返回任務的執行結果,而Runnable接口中的run()方法是void的,沒有返回值。
  • invokeAny(...):方法接收的是一個Callable的集合,執行這個方法不會返回Future,但是會返回所有Callable任務中其中一個任務的執行結果。這個方法無法保證返回的是哪個任務的執行結果。
  • invokeAll(...):與 invokeAny(...)類似也是接收一個Callable集合,但是前者執行之后會返回一個Future的List,其中對應着每個Callable任務執行后的Future對象。
  • shutdown():我們使用完成ExecutorService之后應該關閉它,否則它里面的線程會一直處於運行狀態。

2.Executors類: 主要用於提供線程池相關的操作

Executors類,提供了一系列工廠方法用於創建線程池,返回的線程池都實現了ExecutorService接口。

  1. public static ExecutorService newFiexedThreadPool(int Threads) 創建固定數目線程的線程池。
  2. public static ExecutorService newCachedThreadPool():創建一個可緩存的線程池,調用execute 將重用以前構造的線程(如果線程可用)。如果沒有可用的線程,則創建一個新線程並添加到池中。終止並從緩存中移除那些已有 60 秒鍾未被使用的線程。
  3. public static ExecutorService newSingleThreadExecutor():創建一個單線程化的Executor。
  4. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize):創建一個支持定時及周期性的任務執行的線程池,多數情況下可用來替代Timer類。

3.Executor VS  ExecutorService VS Executors

正如上面所說,這三者均是 Executor 框架中的一部分。Java 開發者很有必要學習和理解他們,以便更高效的使用 Java 提供的不同類型的線程池。總結一下這三者間的區別,以便大家更好的理解:

  • Executor 和 ExecutorService 這兩個接口主要的區別是:ExecutorService 接口繼承了 Executor 接口,是 Executor 的子接口
  • Executor 和 ExecutorService 第二個區別是:Executor 接口定義了 execute()方法用來接收一個Runnable接口的對象,而 ExecutorService 接口中的 submit()方法可以接受Runnable和Callable接口的對象。
  • Executor 和 ExecutorService 接口第三個區別是 Executor 中的 execute() 方法不返回任何結果,而 ExecutorService 中的 submit()方法可以通過一個 Future 對象返回運算結果。
  • Executor 和 ExecutorService 接口第四個區別是除了允許客戶端提交一個任務,ExecutorService 還提供用來控制線程池的方法。比如:調用 shutDown() 方法終止線程池。可以通過 《Java Concurrency in Practice》 一書了解更多關於關閉線程池和如何處理 pending 的任務的知識。
  • Executors 類提供工廠方法用來創建不同類型的線程池。比如: newSingleThreadExecutor() 創建一個只有一個線程的線程池,newFixedThreadPool(int numOfThreads)來創建固定線程數的線程池,newCachedThreadPool()可以根據需要創建新的線程,但如果已有線程是空閑的會重用已有線程。

下面給出一個Executor執行Callable任務的示例代碼:

import java.util.ArrayList;   
import java.util.List;   
import java.util.concurrent.*;   
  
public class CallableDemo{   
    public static void main(String[] args){   
        ExecutorService executorService = Executors.newCachedThreadPool();   
        List<Future<String>> resultList = new ArrayList<Future<String>>();   
  
        //創建10個任務並執行   
        for (int i = 0; i < 10; i++){   
            //使用ExecutorService執行Callable類型的任務,並將結果保存在future變量中   
            Future<String> future = executorService.submit(new TaskWithResult(i));   
            //將任務執行結果存儲到List中   
            resultList.add(future);   
        }   
  
        //遍歷任務的結果   
        for (Future<String> fs : resultList){   
                try{   
                    while(!fs.isDone);//Future返回如果沒有完成,則一直循環等待,直到Future返回完成  
                    System.out.println(fs.get());     //打印各個線程(任務)執行的結果   
                }catch(InterruptedException e){   
                    e.printStackTrace();   
                }catch(ExecutionException e){   
                    e.printStackTrace();   
                }finally{   
                    //啟動一次順序關閉,執行以前提交的任務,但不接受新任務  
                    executorService.shutdown();   
                }   
        }   
    }   
}   
  
  
class TaskWithResult implements Callable<String>{   
    private int id;   
  
    public TaskWithResult(int id){   
        this.id = id;   
    }   
  
    /**  
     * 任務的具體過程,一旦任務傳給ExecutorService的submit方法, 
     * 則該方法自動在一個線程上執行 
     */   
    public String call() throws Exception {  
        System.out.println("call()方法被自動調用!!!    " + Thread.currentThread().getName());   
        //該返回結果將被Future的get方法得到  
        return "call()方法被自動調用,任務返回的結果是:" + id + "    " + Thread.currentThread().getName();   
    }   
}

4.自定義線程池

自定義線程池,可以用ThreadPoolExecutor類創建,它有多個構造方法來創建線程池,用該類很容易實現自定義的線程池:

import java.util.concurrent.ArrayBlockingQueue;   
import java.util.concurrent.BlockingQueue;   
import java.util.concurrent.ThreadPoolExecutor;   
import java.util.concurrent.TimeUnit;   
  
public class ThreadPoolTest{   
    public static void main(String[] args){   
        //創建等待隊列   
        BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);   
        //創建線程池,池中保存的線程數為3,允許的最大線程數為5  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);   
        //創建七個任務   
        Runnable t1 = new MyThread();   
        Runnable t2 = new MyThread();   
        Runnable t3 = new MyThread();   
        Runnable t4 = new MyThread();   
        Runnable t5 = new MyThread();   
        Runnable t6 = new MyThread();   
        Runnable t7 = new MyThread();   
        //每個任務會在一個線程上執行  
        pool.execute(t1);   
        pool.execute(t2);   
        pool.execute(t3);   
        pool.execute(t4);   
        pool.execute(t5);   
        pool.execute(t6);   
        pool.execute(t7);   
        //關閉線程池   
        pool.shutdown();   
    }   
}   
  
class MyThread implements Runnable{   
    @Override   
    public void run(){   
        System.out.println(Thread.currentThread().getName() + "正在執行。。。");   
        try{   
            Thread.sleep(100);   
        }catch(InterruptedException e){   
            e.printStackTrace();   
        }   
    }   
}

運行結果如下:

從結果中可以看出,七個任務是在線程池的三個線程上執行的。

ThreadPoolExecuror類的構造方法中各個參數的含義

public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)

corePoolSize:線程池中所保存的核心線程數,包括空閑線程。

maximumPoolSize:池中允許的最大線程數。

keepAliveTime:線程池中的空閑線程所能持續的最長時間。

unit:持續時間的單位。

workQueue:任務執行前保存任務的隊列,僅保存由execute方法提交的Runnable任務。

excute方法將一個Runnable任務添加到線程池中的順序

  1. 如果線程池中的線程數量少於corePoolSize,即使線程池中有空閑線程,也會創建一個新的線程來執行新添加的任務;
  2. 如果線程池中的線程數量大於等於corePoolSize,但緩沖隊列workQueue未滿,則將新添加的任務放到workQueue中,按照FIFO的原則依次等待執行(線程池中有線程空閑出來后依次將緩沖隊列中的任務交付給空閑的線程執行);
  3. 如果線程池中的線程數量大於等於corePoolSize,且緩沖隊列workQueue已滿,但線程池中的線程數量小於maximumPoolSize,則會創建新的線程來處理被添加的任務;
  4. 如果線程池中的線程數量等於maximumPoolSize,且緩沖隊列workQueue已滿,這種情況下默認就會拋出java.util.concurrent.RejectedExecutionException異常;除此之外還有其它幾種處理方式(該構造方法調用了含有5個參數的構造方法,並將最后一個構造方法為RejectedExecutionHandler類型,它在處理線程溢出時有4種方式,這里不再細說,要了解的,自己可以閱讀下源碼)。

總結起來,也即是說,當有新的任務要處理時,先看線程池中的線程數量是否大於corePoolSize,再看緩沖隊列workQueue是否滿,最后看線程池中的線程數量是否大於maximumPoolSize。

另外,當線程池中的線程數量大於corePoolSize時,如果里面有線程的空閑時間超過了keepAliveTime,就將其移除線程池,這樣,可以動態地調整線程池中線程的數量。

排隊的策略:

  1. 直接提交。緩沖隊列采用 SynchronousQueue,它將任務直接交給線程處理而不保持它們。如果不存在可用於立即運行任務的線程(即線程池中的線程都在工作),則試圖把任務加入緩沖隊列將會失敗,因此會構造一個新的線程來處理新添加的任務,並將其加入到線程池中。直接提交通常要求無界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒絕新提交的任務。newCachedThreadPool采用的便是這種策略。
  2. 無界隊列。使用無界隊列(典型的便是采用預定義容量的 LinkedBlockingQueue,理論上是該緩沖隊列可以對無限多的任務排隊)將導致在所有 corePoolSize 線程都工作的情況下將新任務加入到緩沖隊列中。這樣,創建的線程就不會超過 corePoolSize,也因此,maximumPoolSize 的值也就無效了。當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列。newFixedThreadPool采用的便是這種策略。
  3. 有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(一般緩沖隊列使用ArrayBlockingQueue,並制定隊列的最大長度)有助於防止資源耗盡,但是可能較難調整和控制,隊列大小和最大池大小需要相互折衷,需要設定合理的參數。

5.比較Executor和new Thread()

new Thread的弊端如下:

  • 每次new Thread新建對象性能差。
  • 線程缺乏統一管理,可能無限制新建線程,相互之間競爭,及可能占用過多系統資源導致死機或oom。
  • 缺乏更多功能,如定時執行、定期執行、線程中斷。

相比new Thread,Java提供的四種線程池的好處在於:

  • 重用存在的線程,減少對象創建、消亡的開銷,性能佳。
  • 可有效控制最大並發線程數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。
  • 提供定時執行、定期執行、單線程、並發數控制等功能。


參考鏈接:

https://blog.csdn.net/weixin_40304387/article/details/80508236

https://www.cnblogs.com/love-Stefanie/p/6728228.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM