線程池創建線程的邏輯圖:
我們分析CachedThreadPool線程池里的線程是如何被回收的。
//Executors public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } //ThreadPoolExecutor public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
牢牢記住CachedThreadPool的corePoolSize=0, maximumPoolSize=Integer.MAX_VALUE
工作線程的死循環:
//ThreadPoolExecutor final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
當工作線程第二次獲取的task等於null時,線程將退出while循環,於是就死掉了。
//ThreadPoolExecutor private Runnable getTask() { // 標記poll()是否超時 boolean timedOut = false; retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //工作線程數-1 decrementWorkerCount(); //返回null,工作線程將退出while循環,即線程會死掉 return null; } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); // allowCoreThreadTimeOut 默認為 false, newCachedThreadPool的corePoolSize為0 // 所以 timed = false || true,timed恆為true timed = allowCoreThreadTimeOut || wc > corePoolSize; // 對於newCachedThreadPool,wc 恆小於 maximumPoolSize // 第一次進for循環 true && !(false && true) = true // poll超時后,第二次進for循環 true && !(true && true) = false if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) //poll超時后,第二次進for循環, return null; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } try { //走poll分支 //poll會阻塞,直到有人調用workQueue.offer;或者超時返回null Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //超時 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
如圖中所示:有2種情況會創建工作線程,1. 工作線程數小於corePoolSize;2. 入隊失敗,且工作線程數小於maximumPoolSize
//ThreadPoolExecutor public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //對於CachedThreadPool,如果有工作線程在poll中阻塞,則入隊成功 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //對於CachedThreadPool,如果沒有工作線程在poll中阻塞,則入隊失敗 //初次調用execute,走這個分支,創建工作線程 else if (!addWorker(command, false)) reject(command); }
CachedThreadPool使用的是SynchronousQueue的
入隊 :offer(E e)
出隊:poll(long timeout, TimeUnit unit)
工作線程調用poll阻塞,等待timeout時間,如果超時,則返回null並回收線程;如果在等待期內,有任務入隊,則成功返回任務,繼續執行線程while循環。