Code example:
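import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadPool_Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newCachedThreadPool();

        // execute: runs the Runnable, nothing is returned to the caller
        pool.execute(new MyRunner());

        // submit: returns a Future that will hold the Callable's result
        Future<String> future = pool.submit(new MyCaller());
        String ret = future.get(); // blocks until call() has finished
        System.out.println(ret);

        // Worker threads are non-daemon; shut the pool down so the JVM can exit promptly
        pool.shutdown();
    }
}

class MyCaller implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("calling");
        return "return_from_call";
    }
}

class MyRunner implements Runnable {
    @Override
    public void run() {
        System.out.println("running");
    }
}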
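In this example, execute runs the Runnable task and submit runs the Callable task. A Callable task returns a value from call(), whereas a Runnable's run() is void and returns nothing. (submit also has overloads that accept a Runnable; the difference is that submit always hands back a Future, while execute returns nothing.)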
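The difference in return values can be checked with a small sketch. The class name SubmitVsExecuteDemo and the use of a single-thread executor are choices made for this illustration only, not part of the example above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, for illustration only
class SubmitVsExecuteDemo {

    /** Returns { result of a submitted Runnable, result of a submitted Callable }. */
    static Object[] results() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable r = () -> System.out.println("running");

            // submit(Runnable) still returns a Future, but run() produces no value
            Future<?> fromRunnable = pool.submit(r);

            // submit(Callable) returns a Future carrying the value returned by call()
            Future<String> fromCallable = pool.submit(() -> "return_from_call");

            return new Object[] { fromRunnable.get(), fromCallable.get() };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Object[] res = results();
        System.out.println(res[0]); // null
        System.out.println(res[1]); // return_from_call
    }
}
```

For a Runnable submitted this way, get() only signals completion; the value it returns is null.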
// java.util.concurrent.ThreadPoolExecutor.runWorker(Worker w)
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
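In runWorker, for the Runnable passed to execute, task is the MyRunner instance itself; for the Callable passed to submit, task is a FutureTask that wraps it.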
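One way to observe this without a debugger is to override the beforeExecute hook, which runWorker calls just before task.run(). The following is a minimal sketch: RecordingPool and TaskTypeDemo are hypothetical names introduced here, and a single worker thread is assumed so that the recorded order is deterministic.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only
class TaskTypeDemo {

    /** Pool with one worker that records the runtime class of every task it is about to run. */
    static class RecordingPool extends ThreadPoolExecutor {
        final List<String> seen = new CopyOnWriteArrayList<>();

        RecordingPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            seen.add(r.getClass().getSimpleName());
        }
    }

    static class MyRunner implements Runnable {
        @Override
        public void run() { }
    }

    static class MyCaller implements Callable<String> {
        @Override
        public String call() {
            return "return_from_call";
        }
    }

    static List<String> observe() throws Exception {
        RecordingPool pool = new RecordingPool();
        try {
            pool.execute(new MyRunner());      // task is the MyRunner itself
            pool.submit(new MyCaller()).get(); // task is the FutureTask wrapper; get() waits for it
        } finally {
            pool.shutdown();
        }
        return pool.seen;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(observe()); // [MyRunner, FutureTask]
    }
}
```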
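Call stack of the submit method:
submit creates a FutureTask object that wraps the Callable. The FutureTask's run method calls the Callable's call method and stores the return value so that Future.get() can retrieve it.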
// java.util.concurrent.FutureTask.FutureTask
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
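The behavior of run() shown above can be exercised without a thread pool by calling run() on a FutureTask directly. This is a sketch for illustration: FutureTaskDemo, runNormally, and runFailing are hypothetical names, and the task runs on the calling thread rather than on a pool worker.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, for illustration only
class FutureTaskDemo {

    /** Runs a task twice and returns its result; calls counts how often call() actually ran. */
    static String runNormally(AtomicInteger calls) throws Exception {
        FutureTask<String> task = new FutureTask<>(() -> {
            calls.incrementAndGet();
            return "return_from_call";
        });
        task.run(); // invokes call() and stores the result
        task.run(); // returns immediately: the state is no longer NEW
        return task.get();
    }

    /** Runs a task whose call() throws and returns the cause reported by get(). */
    static Throwable runFailing() throws InterruptedException {
        FutureTask<String> task = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run(); // the exception is caught inside run() and stored, not rethrown here
        try {
            task.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause(); // the original exception thrown by call()
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        System.out.println(runNormally(calls)); // return_from_call
        System.out.println(calls.get());        // 1
        System.out.println(runFailing());       // java.lang.IllegalStateException: boom
    }
}
```

Because run() catches every Throwable from call() and records it, a failure inside a submitted Callable does not propagate out of the worker through task.run(); it surfaces only when the caller invokes get().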