一、多線程詳解
1、什么是線程
線程是一個操作系統概念。操作系統負責這個線程的創建、掛起、運行、阻塞和終結操作。而操作系統創建線程、切換線程狀態、終結線程都要進行CPU調度——這是一個耗費時間和系統資源的事情。
2、線程生命周期
Java當中,線程通常都有五種狀態,創建、就緒、運行、阻塞和死亡:
- 創建狀態。在生成線程對象,並沒有調用該對象的start方法,這是線程處於創建狀態。
- 就緒狀態。當調用了線程對象的start方法之后,該線程就進入了就緒狀態,但是此時線程調度程序還沒有把該線程設置為當前線程,此時處於就緒狀態。在線程運行之后,從等待或者睡眠中回來之后,也會處於就緒狀態。
- 運行狀態。線程調度程序將處於就緒狀態的線程設置為當前線程,此時線程就進入了運行狀態,開始運行run函數當中的代碼。
- 阻塞狀態。線程正在運行的時候,被暫停,通常是為了等待某個時間的發生(比如說某項資源就緒)之后再繼續運行。sleep,suspend,wait等方法都可以導致線程阻塞。
- 死亡狀態。如果一個線程的run方法執行結束或者調用stop方法后,該線程就會死亡。對於已經死亡的線程,無法再使用start方法令其進入就緒。
可以用過jstack 或者idea debug快照顯示狀態,常見名詞大致意思為:
- "Low Memory Detector":負責對可使用內存進行檢測,如果發現可用內存低,分配新的內存空間。
- "CompilerThread0":用來調用JITing,實時編譯裝卸class。
- "Signal Dispatcher":負責分發內部事件。
- "Finalizer":負責調用Finalizer方法。
- "Reference Handler":負責處理引用。
- "main":是主線程。
- "VM Thread", "VM Periodic Task Thread":從名字上看是虛機內部線程。
3、線程狀態描述
- NEW:狀態是指線程剛創建, 尚未啟動。
- RUNNABLE:狀態是線程正在正常運行中, 當然可能會有某種耗時計算/IO等待的操作/CPU時間片切換等, 這個狀態下發生的等待一般是其他系統資源, 而不是鎖, Sleep等
- BLOCKED:這個狀態下, 是在多個線程有同步操作的場景, 比如正在等待另一個線程的synchronized 塊的執行釋放, 或者可重入的 synchronized塊里別人調用wait() 方法, 也就是這里是線程在等待進入臨界區
- WAITING:這個狀態下是指線程擁有了某個鎖之后, 調用了他的wait方法, 等待其他線程/鎖擁有者調用 notify / notifyAll 一般該線程可以繼續下一步操作
- TIMED_WAITING: 這個狀態就是有限的(時間限制)的WAITING, 一般出現在調用wait(long), join(long)等情況下, 另外一個線程sleep后, 也會進入TIMED_WAITING狀態
- TERMINATED:這個狀態下表示 該線程的run方法已經執行完畢了, 基本上就等於死亡了(當時如果線程被持久持有, 可能不會被回收)
要區分 BLOCKED 和 WATING 的區別, 一個是在臨界點外面等待進入, 一個是在理解點里面wait等待別人notify, 線程調用了join方法 join了另外的線程的時候, 也會進入WAITING狀態, 等待被他join的線程執行結束。核心區別就是BLOCKED沒拿到鎖,WAITING拿到了鎖。
4、線程優先級Priority
線程的優先級是將該線程的重要性傳給了調度器、cpu處理線程順序有一定的不確定,但是調度器會傾向於優先權高的先執行。
5、線程實現方式
線程有三種實現方式:Thread、Runnable、Callable。
Thread實現方式代碼如下:
public class Thread01 extends Thread { @Override public void run() { System.out.println("Thread 方式創建線程"); } public static void main(String[] args) throws InterruptedException { new Thread01().start();//多線程 } }
Runnable實現方式:
public class Runnable01 implements Runnable { @Override public void run() { System.out.println("Runnable方式創建線程"); } public static void main(String[] args) { new Thread(new Runnable01()).start(); } }
Callable實現方式:
public class Callable01 implements Callable<String> { @Override public String call() throws Exception { System.out.println("Callable方式創建線程"); return "Callable"; } public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask task=new FutureTask(new Callable01());//有參 賦值 成員屬性 new Thread(task).start(); System.out.println( task.get());; } }
6、Thread和Runnable的聯系與區別
- Runnable的實現方式是實現其接口即可。
- Thread的實現方式是繼承其類。
- Runnable接口支持多繼承,但基本上用不到。
- Thread實現了Runnable接口並進行了擴展,而Thread和Runnable的實質是實現的關系,不是同類東西,所以Runnable或Thread本身沒有可比性。
public class Thread implements Runnable { // 省略 @Override public void run() { if (target != null) { target.run(); } } // 省略 }
綜上所述:Thread和Runnable的實質是繼承關系,沒有可比性。無論使用Runnable還是Thread,都會new Thread,然后執行run方法。用法上,如果有復雜的線程操作需求,那就選擇繼承Thread,如果只是簡單的執行一個任務,那就實現runnable。
7、Callable原理是什么
Callable 1.5引入,具有返回值,並且支持泛型:
public interface Callable<V> { V call() throws Exception; }
返回加入泛型既可以返回Object,也可以讓調用限定類型,更靈活。Callble相關源碼如下:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); } public class FutureTask<V> implements RunnableFuture<V> {
private Callable<V> callable; public void run() { if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } }
我們在使用以下代碼構建task時,實際上在FutureTask類的構造方法就給自己的屬性callable進行了賦值。
FutureTask task=new FutureTask(new Callable01());
而可以看到FutureTask實際上也是一個Runnable的具體實現,因此可以使用以下方法進行task執行(和Runnable的使用方式一致):
new Thread(task).start();
調用start方法,實際上就是調用Runnable的run方法,因此調用了FutureTask的run方法,然后這個新起的線程采用方法調用方式調用了具體Callable實現類的call方法,並將返回值進行set,因此我們可以通過task.get()方法獲取執行結果。
8、和使用線程池有什么不一樣
看以下代碼:
public class ThreadPkTest { public static void main(String[] args) throws InterruptedException { Long start= System.currentTimeMillis(); final List<Integer> l=new ArrayList<Integer>(); final Random random=new Random(); for(int i=0;i<10000;i++){ Thread thread=new Thread(){ public void run(){ l.add(random.nextInt()); } }; thread.start(); thread.join(); } System.out.println("直接創建線程執行時間:"+(System.currentTimeMillis()-start)); System.out.println("size:"+l.size()); start= System.currentTimeMillis(); final List<Integer> list=new ArrayList<Integer>(); ExecutorService executorService= Executors.newSingleThreadExecutor();for(int i=0;i<10000;i++){ executorService.execute(new Runnable() { @Override public void run() { list.add(random.nextInt()); } }); } executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); System.out.println("線程池執行時間:"+(System.currentTimeMillis()-start)); System.out.println("size:"+list.size()); } }
執行結果如下:
直接創建線程執行時間:1601 size:10000 線程池執行時間:33 size:10000
由此可以對比線程池效率要高出很多,是什么原因呢?大致有這么幾點:
- 避免線程的創建和銷毀帶來的性能開銷;
- 避免大量的線程因為互相搶占系統資源導致的阻塞現象;
- 能夠對線程進行簡單的管理並提供定時執行、間隔執行等功能(和性能無關)。
那我們接下來就核心進行線程池的研究。
二、線程池代碼詳解
1、線程池使用示例
首先我們來看下如何使用線程池,線程持有submit以及execute兩種寫法,代碼如下:
public class ThreadPool01 { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); executorService.submit(()-> System.out.println("Submit方式執行optimized")); executorService.submit(new Runnable() { @Override public void run() { System.out.println("Submit方式執行"); } }); executorService.execute(()-> System.out.println("Execute方式執行optimized")); executorService.execute(new Runnable() { @Override public void run() { System.out.println("Execute方式執行"); } }); executorService.shutdown(); } }
2、線程池類、接口
然后我們來看看線程池有哪些類與接口,核心如圖所示:
如圖所示,有這么一些重要的接口與類,如下表所示:
3、線程池執行流程
3.1、初始化ThreadPoolExecutor
不管我們是通過Executors工具類快速初始化線程池,還是手動配置線程池參數,我們第一步都是初始化線程池:
ExecutorService executorService = Executors.newCachedThreadPool(); //快速構建 ExecutorService es = new ThreadPoolExecutor(5, 5, //手動構建 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory())
參數詳情如下:
public ThreadPoolExecutor( int corePoolSize, //核心線程數大小 - 10 int maximumPoolSize, //最大線程數 - 100 long keepAliveTime, //非核心線程存活時間 TimeUnit unit, //時間單位 BlockingQueue<Runnable> workQueue, //存放任務的阻塞隊列 ThreadFactory threadFactory, //創建線程的工廠 RejectedExecutionHandler handler //拒絕策略 )
3.2、調用execute、submit執行
我們知道有兩種方式,分別是submit和execute,但是底層核心都是調用execute,無非是submit有返回,execute無返回。代碼如下:
public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; }
但是其實execute和submit還有點不同,就是task類型不一樣,submit類型是FutureTask,而execute的task類型是線程池運行的run方法所屬類的類型。
3.3、核心、非核心線程協作原理
如execute方法中的代碼所示:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
如上四處代碼,如下圖所示四種不同規則所示:
所以當一個任務通過execute(Runnable)方法添加到線程池時:
- 如果此時線程池中的數量小於corePoolSize,創建新的核心線程來處理被添加的任務。
- 如果此時線程池中的數量等於 corePoolSize,則新任務被添加到workQueue隊列中,直到workQueue隊列滿,但不超過maximumPoolSize。
- 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的非核心線程來處理被添加的任務。
- 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那么通過 handler所指定的策略來處理此任務。
綜上所述:處理任務的優先級為:核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
3.4、創建Worker對象addWorker
Worker是一個實現了Runnable接口的類,Worker的執行最終會調用我們提交的任務中的run()方法。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {}
創建Worker對象代碼如下:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) { // Check if queue empty only if necessary. if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; for (;;) { if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateAtLeast(c, SHUTDOWN)) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); //此處會調用Worker這個Thread包裝類的start方法,start方法會調用run方法,run方法會調用runWorker方法。 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
3.5、啟動worker對象
啟動worker對象如下代碼所示:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); try { task.run(); afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
Worker啟動后,會執行我們提交的任務的run()方法,執行完成后會調用finally中的 processWorkerExit 方法。
3.6、循環調用Worker
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
由此可知,調用循環進入到了3.4。
4、什么是Worker
- Worker是ThreadPoolExecutor類的一個內部類,這里Worker就是thread和task的一個包裝類,它的職能就是控制中斷和任務的運行。
- Worker是一個集成了AQS,實現了Runnable方法的內部類。Worker創建好后,通過new好的線程來運行任務。Worker本身不運行run,而是里面thread通過start運行這個方法。
- 核心Worker通過while不斷從隊列中取出任務(addWorker入參為null時從隊列取,否則就說明是新添加到隊列要執行的任務),任務隊列為空線程就阻塞;非核心Worker也是通過while不斷取任務,只是有個取任務時keepAliveTime的超時時間,在時間之內取不到的任務的話線程就跳出循環,自動銷毀了。
5、拒絕策略
- AbortPolicy (默認):當任務添加到線程池中被拒絕時,它將拋出 RejectedExecutionException 異常。
- CallerRunsPolicy:當任務添加到線程池中被拒絕時,會在線程池當前正在運行的Thread線程池中處理被拒絕的任務。
- DiscardOldestPolicy:當任務添加到線程池中被拒絕時,線程池會放棄等待隊列中最舊的未處理任務,然后將被拒絕的任務添加到等待隊列中
- DiscardPolicy:當任務添加到線程池中被拒絕時,線程池將丟棄被拒絕的任務。
6、Java中提供的幾種快捷線程池
- newFixedThreadPool;通過創建一個corePoolSize和maximumPoolSize相同的線程池。使用LinkedBlockingQuene作為阻塞隊列,不過當線程池沒有可執行任務時,也不會釋放線程。
- newCachedThreadPool:初始化一個可以緩存線程的線程池,默認緩存60s,線程池的線程數可達到Integer.MAX_VALUE,即2147483647,內部使用SynchronousQueue作為阻塞隊列;和newFixedThreadPool創建的線程池不同,newCachedThreadPool在沒有任務執行時,當線程的空閑時間超過keepAliveTime,會自動釋放線程資源,當提交新任務時,如果沒有空閑線程,則創建新線程執行任務,會導致一定的系統開銷;
- newSingleThreadExecutor;初始化的線程池中只有一個線程,如果該線程異常結束,會重新創建一個新的線程繼續執行任務,唯一的線程可以保證所提交任務的順序執行,內部使用LinkedBlockingQueue作為阻塞隊列。
- newScheduledThreadPool;初始化的線程池可以在指定的時間內周期性的執行所提交的任務,在實際的業務場景中可以使用該線程池定期的同步數據。除了newScheduledThreadPool的內部實現特殊一點之外,其它幾個線程池都是基於ThreadPoolExecutor類實現的。
7、newScheduledThreadPool
newScheduledThreadPool是一個可以在指定的時間內周期性執行所提交的任務,有以下兩種模式:
scheduleWithFixedDelay:上一個任務執行完的時間后固定時間,與任務執行時間有關
scheduleAtFixedRate:固定速率,與任務執行所需時間無關
其核心代碼差別為:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0L) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period), sequencer.getAndIncrement()); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0L) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), -unit.toNanos(delay), sequencer.getAndIncrement()); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
兩者區別就是都會執行這段核心代碼:
private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }