在上一篇《並發編程(十一)—— Java 線程池 實現原理與源碼深度解析(一)》中提到了線程池ThreadPoolExecutor的原理以及它的execute方法。這篇文章是接着上一篇文章寫的,如果你沒有閱讀上一篇文章,建議你去讀讀。本文解析ThreadPoolExecutor#submit。
對於一個任務的執行有時我們不需要它返回結果,但是有我們需要它的返回執行結果。對於線程來講,如果不需要它返回結果則實現Runnable,而如果需要執行結果的話則可以實現Callable。在線程池同樣execute提供一個不需要返回結果的任務執行,而對於需要結果返回的則可調用其submit方法。
AbstractExecutorService
我們把上一篇文章的代碼貼過來
1 public abstract class AbstractExecutorService implements ExecutorService { 2 3 // RunnableFuture 是用於獲取執行結果的,我們常用它的子類 FutureTask 4 // 下面兩個 newTaskFor 方法用於將我們的任務包裝成 FutureTask 提交到線程池中執行 5 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 6 return new FutureTask<T>(runnable, value); 7 } 8 9 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 10 return new FutureTask<T>(callable); 11 } 12 13 // 提交任務 14 public Future<?> submit(Runnable task) { 15 if (task == null) throw new NullPointerException(); 16 // 1. 將任務包裝成 FutureTask 17 RunnableFuture<Void> ftask = newTaskFor(task, null); 18 // 2. 交給執行器執行,execute 方法由具體的子類來實現 19 // 前面也說了,FutureTask 間接實現了Runnable 接口。 20 execute(ftask); 21 return ftask; 22 } 23 24 public <T> Future<T> submit(Runnable task, T result) { 25 if (task == null) throw new NullPointerException(); 26 // 1. 將任務包裝成 FutureTask 27 RunnableFuture<T> ftask = newTaskFor(task, result); 28 // 2. 交給執行器執行 29 execute(ftask); 30 return ftask; 31 } 32 33 public <T> Future<T> submit(Callable<T> task) { 34 if (task == null) throw new NullPointerException(); 35 // 1. 將任務包裝成 FutureTask 36 RunnableFuture<T> ftask = newTaskFor(task); 37 // 2. 交給執行器執行 38 execute(ftask); 39 return ftask; 40 } 41 }
盡管submit方法能提供線程執行的返回值,但只有實現了Callable才會有返回值,而實現Runnable的線程則是沒有返回值的,也就是說在上面的3個方法中,submit(Callable<T> task)能獲取到它的返回值,submit(Runnable task, T result)能通過傳入的載體result間接獲得線程的返回值或者准確來說交給線程處理一下,而最后一個方法submit(Runnable task)則是沒有返回值的,就算獲取它的返回值也是null。
使用示例
submit(Callable<T> task)
1 /** 2 * @author: ChenHao 3 * @Date: Created in 14:54 2019/1/11 4 */ 5 public class Test { 6 public static void main(String[] args) throws ExecutionException, InterruptedException { 7 Callable<String> callable = new Callable<String>() { 8 public String call() throws Exception { 9 System.out.println("This is ThreadPoolExetor#submit(Callable<T> task) method."); 10 return "result"; 11 } 12 }; 13 14 ExecutorService executor = Executors.newSingleThreadExecutor(); 15 Future<String> future = executor.submit(callable); 16 executor.shutdown(); 17 System.out.println(future.get()); 18 } 19 }
運行結果:
submit(Runnable task, T result)
1 /** 2 * @author: ChenHao 3 * @Date: Created in 14:54 2019/1/11 4 */ 5 public class Test { 6 public static void main(String[] args) throws ExecutionException, InterruptedException { 7 8 ExecutorService executor = Executors.newSingleThreadExecutor(); 9 Data data = new Data(); 10 Future<Data> future = executor.submit(new Task(data), data); 11 executor.shutdown(); 12 System.out.println(future.get().getName()); 13 } 14 } 15 class Data { 16 String name; 17 public String getName() { 18 return name; 19 } 20 public void setName(String name) { 21 this.name = name; 22 } 23 } 24 25 class Task implements Runnable { 26 Data data; 27 public Task(Data data) { 28 this.data = data; 29 } 30 @Override 31 public void run() { 32 System.out.println("This is ThreadPoolExetor#submit(Runnable task, T result) method."); 33 data.setName("陳浩"); 34 } 35 }
運行結果:
submit(Runnable task)
1 /** 2 * @author: ChenHao 3 * @Date: Created in 14:54 2019/1/11 4 */ 5 public class Test { 6 public static void main(String[] args) throws ExecutionException, InterruptedException { 7 Runnable runnable = new Runnable() { 8 @Override 9 public void run() { 10 System.out.println("This is ThreadPoolExetor#submit(Runnable runnable) method."); 11 } 12 }; 13 14 ExecutorService executor = Executors.newSingleThreadExecutor(); 15 Future future = executor.submit(runnable); 16 executor.shutdown(); 17 System.out.println(future.get()); 18 } 19 }
運行結果:
從上面的源碼可以看到,這三者方法幾乎是一樣的,關鍵就在於:
1 RunnableFuture<T> ftask = newTaskFor(task); 2 execute(ftask);
是如何將一個任務作為參數傳遞給了newTaskFor,然后調用execute方法,最后進而返回ftask的呢?
1 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 2 return new FutureTask<T>(runnable, value); 3 } 4 5 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 6 return new FutureTask<T>(callable); 7 }
源碼分析
這里我建議大家去看看我之前的一篇文章《Java 多線程(五)—— 線程池基礎 之 FutureTask源碼解析》
submit(Callable<T> task)
我們看上面的源碼中知道實際上是調用了如下代碼
1 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 2 return new FutureTask<T>(callable); 3 }
我們看看 FutureTask 的結構
1 public class FutureTask<V> implements RunnableFuture<V> { 2 private volatile int state; 3 private static final int NEW = 0; //初始狀態 4 private static final int COMPLETING = 1; //結果計算完成或響應中斷到賦值給返回值之間的狀態。 5 private static final int NORMAL = 2; //任務正常完成,結果被set 6 private static final int EXCEPTIONAL = 3; //任務拋出異常 7 private static final int CANCELLED = 4; //任務已被取消 8 private static final int INTERRUPTING = 5; //線程中斷狀態被設置ture,但線程未響應中斷 9 private static final int INTERRUPTED = 6; //線程已被中斷 10 11 //將要執行的任務 12 private Callable<V> callable; //用於get()返回的結果,也可能是用於get()方法拋出的異常 13 private Object outcome; // non-volatile, protected by state reads/writes //執行callable的線程,調用FutureTask.run()方法通過CAS設置 14 private volatile Thread runner; //棧結構的等待隊列,該節點是棧中的最頂層節點。 15 private volatile WaitNode waiters; 16 17 public FutureTask(Callable<V> callable) { 18 if (callable == null) 19 throw new NullPointerException(); 20 this.callable = callable; 21 this.state = NEW; // ensure visibility of callable 22 } 23 .... 24 }
1 public interface RunnableFuture<V> extends Runnable, Future<V> { 2 /** 3 * Sets this Future to the result of its computation 4 * unless it has been cancelled. 5 */ 6 void run(); 7 }
我們知道 FutureTask 繼承了 Runnable,這里將 Callable<T> callable 的實例封裝成 FutureTask 傳給 execute(ftask);我們再來看看上一篇文章中線程運行的代碼
1 // 此方法由 worker 線程啟動后調用,這里用一個 while 循環來不斷地從等待隊列中獲取任務並執行 2 // 前面說了,worker 在初始化的時候,可以指定 firstTask,那么第一個任務也就可以不需要從隊列中獲取 3 final void runWorker(Worker w) { 4 Thread wt = Thread.currentThread(); 5 // 該線程的第一個任務(如果有的話) 6 Runnable task = w.firstTask; 7 w.firstTask = null; 8 w.unlock(); // allow interrupts 9 boolean completedAbruptly = true; 10 try { 11 // 循環調用 getTask 獲取任務 12 while (task != null || (task = getTask()) != null) { 13 w.lock(); 14 // 如果線程池狀態大於等於 STOP,那么意味着該線程也要中斷 15 if ((runStateAtLeast(ctl.get(), STOP) || 16 (Thread.interrupted() && 17 runStateAtLeast(ctl.get(), STOP))) && 18 !wt.isInterrupted()) 19 wt.interrupt(); 20 try { 21 beforeExecute(wt, task); 22 Throwable thrown = null; 23 try { 24 // 到這里終於可以執行任務了,這里是最重要的,task是什么?是Worker 中的firstTask屬性 25 26 task.run(); 27 } catch (RuntimeException x) { 28 thrown = x; throw x; 29 } catch (Error x) { 30 thrown = x; throw x; 31 } catch (Throwable x) { 32 thrown = x; throw new Error(x); 33 } finally { 34 afterExecute(task, thrown); 35 } 36 } finally { 37 // 一個任務執行完了,這個線程還可以復用,接着去隊列中拉取任務執行 38 // 置空 task,准備 getTask 獲取下一個任務 39 task = null; 40 // 累加完成的任務數 41 w.completedTasks++; 42 // 釋放掉 worker 的獨占鎖 43 w.unlock(); 44 } 45 } 46 completedAbruptly = false; 47 } finally { 48 // 如果到這里,需要執行線程關閉: 49 // 說明 getTask 返回 null,也就是超過corePoolSize的線程過了超時時間還沒有獲取到任務,也就是說,這個 worker 的使命結束了,執行關閉 50 processWorkerExit(w, completedAbruptly); 51 } 52 }
由上面第6行代碼 task 就是execute(ftask)傳入的任務,第26行 task.run(); 實際上就是 new FutureTask<T>(callable).run(),我們看看FutureTask中的run()方法
1 public void run() { 2 //保證callable任務只被運行一次 3 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) 4 return; 5 try { 6 Callable < V > c = callable; 7 if (c != null && state == NEW) { 8 V result; 9 boolean ran; 10 try { 11 //執行任務,上面的例子我們可以看出,call()里面可能是一個耗時的操作,不過這里是同步的 12 result = c.call(); 13 //上面的call()是同步的,只有上面的result有了結果才會繼續執行 14 ran = true; 15 } catch (Throwable ex) { 16 result = null; 17 ran = false; 18 setException(ex); 19 } 20 if (ran) 21 //執行完了,設置result 22 set(result); 23 } 24 } 25 finally { 26 runner = null; 27 int s = state; 28 //判斷該任務是否正在響應中斷,如果中斷沒有完成,則等待中斷操作完成 29 if (s >= INTERRUPTING) 30 handlePossibleCancellationInterrupt(s); 31 } 32 }
在 FutureTask的構造方法中 this.callable = callable; ,因此我們可以知道上面run()方法中第6行 c 就是 代碼示例中的 new Callable<String>(),c.call()就是調用 代碼示例中new Callable 的call方法,並且這里可以取到返回結果,第22行處設置FutureTask 中 outcome 的值,這樣線程就可以取到返回值了。
1 protected void set(V v) { 2 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { 3 outcome = v; 4 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state 5 finishCompletion(); 6 } 7 }
取值我就不分析了,我之前的文章里面有詳細分析。
submit(Runnable task, T result)
1 public <T> Future<T> submit(Runnable task, T result) { 2 if (task == null) throw new NullPointerException(); 3 RunnableFuture<T> ftask = newTaskFor(task, result); 4 execute(ftask); 5 return ftask; 6 } 7 8 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 9 return new FutureTask<T>(runnable, value); 10 }
我們來看看FutureTask的另外一個構造方法
1 public FutureTask(Runnable runnable, V result) { 2 this.callable = Executors.callable(runnable, result); 3 this.state = NEW; // ensure visibility of callable 4 }
1 public static <T> Callable<T> callable(Runnable task, T result) { 2 if (task == null) 3 throw new NullPointerException(); 4 return new RunnableAdapter<T>(task, result); 5 } 6 7 static final class RunnableAdapter<T> implements Callable<T> { 8 final Runnable task; 9 final T result; 10 RunnableAdapter(Runnable task, T result) { 11 this.task = task; 12 this.result = result; 13 } 14 public T call() { 15 task.run(); 16 return result; 17 } 18 }
上面將 runnable, result 封裝成了 RunnableAdapter 作為FutureTask的callable屬性,這和上面的submit(Callable<T> task) 是不同的,submit(Callable<T> task)是直接將 Callable<T> task作為FutureTask的callable屬性。我們看看FutureTask中的run()方法中第6行 c 就是FutureTask 構造方法中的new RunnableAdapter<T>(task, result) ,c.call()就是調用 RunnableAdapter<T>(task, result) 的call方法,call()中的task.run()就是上面代碼示例中new Task(data) 中的 run(),run()方法中業務大代碼改變了data對象的屬性,callable(Runnable task, T result)中也是傳的相同的對象data, 所以,result = c.call(); 就是把更改后的data返回,並且將data設置為設置FutureTask 中 outcome 的值,后面的邏輯就是一樣的了。
這里可以看成將同一個data傳入線程進行處理,同時這個data也傳入FutureTask中,並且在RunnableAdapter通過屬性進行保存data,等線程將data處理完了,由於是同一個對象,RunnableAdapter中的result也就是data指向的是同一個對象,然后把此result返回到FutureTask保存在屬性outcome中,就可以通過FutureTask.get()取到運行結果了。
如果new FutureTask<T>(runnable, null),則result = c.call(); 返回的值也是null,最后從線程池中get的值也是null。