Executor框架(二)Executor 與 ExecutorService兩個基本接口


一、Executor 接口簡介

Executor接口是Executor框架的一個最基本的接口,Executor框架的大部分類都直接或間接地實現了此接口。

只有一個方法

void execute(Runnable command): 在未來某個時間執行給定的命令。該命令可能在新的線程、已入池的線程或者正調用的線程中執行,這由 Executor 實現決定。

Executor的幾種實現原理介紹:

1、 Executor 接口並沒有嚴格地要求執行是異步的。在最簡單的情況下,執行程序可以在調用者的線程中立即運行已提交的任務:

 class DirectExecutor implements Executor {
     public void execute(Runnable r) {
         r.run();
     }
 }

2、 更常見的是,任務是在某個不是調用者線程的線程中執行的。以下執行程序將為每個任務生成一個新線程。

class ThreadPerTaskExecutor implements Executor {
     public void execute(Runnable r) {
         new Thread(r).start();
     }
 }

3、 許多 Executor 實現都對調度任務的方式和時間強加了某種限制。以下執行程序使任務提交與第二個執行程序保持連續,這說明了一個復合執行程序。

 class SerialExecutor implements Executor {
     private final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
     private  final Executor executor;
     Runnable active;

     SerialExecutor(Executor executor) {
         this.executor = executor;
     }

     public synchronized void execute(final Runnable r) {
         tasks.offer(new Runnable() {
             public void run() {
                 try {
                     r.run();
                 } finally {
                     scheduleNext();
                 }
             }
         });
         if (active == null) {
             scheduleNext();
         }
     }

     protected synchronized void scheduleNext() {
         if ((active = tasks.poll()) != null) {
             executor.execute(active);
         }
     }

 }

二、ExecutorService 接口簡介

  ExecutorService 是一個接口,提供了管理終止的方法,以及可為跟蹤一個或多個異步任務執行狀況而生成Future 的方法。
ExecutorService 的實現:

  • 三個實現類:AbstractExecutorService(默認實現類) , ScheduledThreadPoolExecutor, ThreadPoolExecutor
  • Executors 提供了此接口的幾種常用實現的工廠方法。

方法摘要

1. 從Executor 接口中繼承了不跟蹤異步線程,沒有返回的 execute 方法:

void execute(Runnable command):
在未來某個時間執行給定的命令。該命令可能在新的線程、已入池的線程或者正調用的線程中執行,這由 Executor 實現決定。

2. 擴展的跟蹤異步線程、返回Future 接口的實現類的方法:

Future<?> submit(Runnable task):
提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。該 Future 的 get 方法在成功 完成時將會返回 null。

**<T> Future<T> submit(Runnable task,T result): **
提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。該 Future 的 get 方法在成功完成時將會返回給定的結果。

<T> Future<T> submit(Callable<T> task):
提交一個返回值的任務用於執行,返回一個表示任務的未決結果的 Future。該 Future 的 get 方法在成功完成時將會返回該任務的結果。如果想立即阻塞任務的等待,則可以使用 result = exec.submit(aCallable).get(); 形式的構造

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException:
執行給定的任務,當所有任務完成時,返回保持任務狀態和結果的 Future 列表。返回列表的所有元素的 Future.isDone() 為 true。注意,可以正常地或通過拋出異常來終止已完成 任務。如果正在進行此操作時修改了給定的 collection,則此方法的結果是不確定的。

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,TimeUnit unit) throws InterruptedException:
超時等待,同上。

<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException,ExecutionException:
與 invokeAll的區別是,任務列表里只要有一個任務完成了,就立即返回。而且一旦正常或異常返回后,則取消尚未完成的任務。

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout,TimeUnit unit) throws InterruptedException:
超時等待,同上。

boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException:
一直等待,直到所有任務完成。請求關閉、發生超時或者當前線程中斷,無論哪一個首先發生之后,都將導致阻塞,直到所有任務完成執行,或者超時時間的到來如果此執行程序終止,則返回 true;如果終止前超時期滿,則返回 false

3. 管理生命周期

void shutdown():
啟動一次順序關閉,執行以前提交的任務,但不接受新任務。如果已經關閉,則調用沒有其他作用。
List<Runnable> shutdownNow():
試圖停止所有正在執行的活動任務,暫停處理正在等待的任務,並返回等待執行的任務列表。 無法保證能夠停止正在處理的活動執行任務,但是會盡力嘗試。例如,在 ThreadPoolExecutor 中,通過 Thread.interrupt() 來取消典型的實現,所以如果任務無法響應中斷,則永遠無法終止。
boolean isShutdown(): 如果此執行程序已關閉,則返回 true。
boolean isTerminated():
如果關閉后所有任務都已完成,則返回 true。注意,除非首先調用 shutdown 或 shutdownNow,否則 isTerminated 永不為 true。

用法示例

下面給出了一個網絡服務的簡單結構,這里線程池中的線程作為傳入的請求。它使用了預先配置的 Executors.newFixedThreadPool(int) 工廠方法:

class NetworkService implements Runnable {
    private final ServerSocket serverSocket;
    private final ExecutorService pool;

    public NetworkService(int port, int poolSize)
        throws IOException {
      serverSocket = new ServerSocket(port);
      pool = Executors.newFixedThreadPool(poolSize);
    }
 
    public void run() { // run the service
      try {
        for (;;) {
          pool.execute(new Handler(serverSocket.accept()));
        }
      } catch (IOException ex) {
        pool.shutdown();
      }
    }
  }

  class Handler implements Runnable {
    private final Socket socket;
    Handler(Socket socket) { this.socket = socket; }
    public void run() {
      // read and service request on socket
    }
 }
 

下列方法分兩個階段關閉 ExecutorService。第一階段調用 shutdown 拒絕傳入任務,然后調用 shutdownNow(如有必要)取消所有遺留的任務:

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }

參考文獻

  • 《java並發編程的藝術》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM