深入淺出吃透多線程、線程池核心原理及代碼詳解


一、多線程詳解

  1、什么是線程

  線程是一個操作系統概念。操作系統負責這個線程的創建、掛起、運行、阻塞和終結操作。而操作系統創建線程、切換線程狀態、終結線程都要進行CPU調度——這是一個耗費時間和系統資源的事情。

  2、線程生命周期

  Java當中,線程通常都有五種狀態,創建、就緒、運行、阻塞和死亡:

  • 創建狀態。在生成線程對象,並沒有調用該對象的start方法,這是線程處於創建狀態。
  • 就緒狀態。當調用了線程對象的start方法之后,該線程就進入了就緒狀態,但是此時線程調度程序還沒有把該線程設置為當前線程,此時處於就緒狀態。在線程運行之后,從等待或者睡眠中回來之后,也會處於就緒狀態。
  • 運行狀態。線程調度程序將處於就緒狀態的線程設置為當前線程,此時線程就進入了運行狀態,開始運行run函數當中的代碼。
  • 阻塞狀態。線程正在運行的時候,被暫停,通常是為了等待某個時間的發生(比如說某項資源就緒)之后再繼續運行。sleep,suspend,wait等方法都可以導致線程阻塞。
  • 死亡狀態。如果一個線程的run方法執行結束或者調用stop方法后,該線程就會死亡。對於已經死亡的線程,無法再使用start方法令其進入就緒。

           

  可以用過jstack 或者idea debug快照顯示狀態,常見名詞大致意思為:

  • "Low Memory Detector":負責對可使用內存進行檢測,如果發現可用內存低,分配新的內存空間。
  • "CompilerThread0":用來調用JITing,實時編譯裝卸class。
  • "Signal Dispatcher":負責分發內部事件。
  • "Finalizer":負責調用Finalizer方法。
  • "Reference Handler":負責處理引用。
  • "main":是主線程。
  • "VM Thread", "VM Periodic Task Thread":從名字上看是虛機內部線程。

  3、線程狀態描述

  • NEW:狀態是指線程剛創建, 尚未啟動。
  • RUNNABLE:狀態是線程正在正常運行中, 當然可能會有某種耗時計算/IO等待的操作/CPU時間片切換等, 這個狀態下發生的等待一般是其他系統資源, 而不是鎖, Sleep等
  • BLOCKED:這個狀態下, 是在多個線程有同步操作的場景, 比如正在等待另一個線程的synchronized 塊的執行釋放, 或者可重入的 synchronized塊里別人調用wait() 方法, 也就是這里是線程在等待進入臨界區
  • WAITING:這個狀態下是指線程擁有了某個鎖之后, 調用了他的wait方法, 等待其他線程/鎖擁有者調用 notify / notifyAll 一般該線程可以繼續下一步操作
  • TIMED_WAITING: 這個狀態就是有限的(時間限制)的WAITING, 一般出現在調用wait(long), join(long)等情況下, 另外一個線程sleep后, 也會進入TIMED_WAITING狀態
  • TERMINATED:這個狀態下表示 該線程的run方法已經執行完畢了, 基本上就等於死亡了(當時如果線程被持久持有, 可能不會被回收)

  要區分 BLOCKED 和 WATING 的區別, 一個是在臨界點外面等待進入, 一個是在理解點里面wait等待別人notify, 線程調用了join方法 join了另外的線程的時候, 也會進入WAITING狀態, 等待被他join的線程執行結束。核心區別就是BLOCKED沒拿到鎖,WAITING拿到了鎖。

  4、線程優先級Priority

  線程的優先級是將該線程的重要性傳給了調度器、cpu處理線程順序有一定的不確定,但是調度器會傾向於優先權高的先執行。

  5、線程實現方式

  線程有三種實現方式:Thread、Runnable、Callable。

  Thread實現方式代碼如下:

public class Thread01 extends Thread {

   @Override
   public void run() {
      System.out.println("Thread 方式創建線程");
   }

   public static void main(String[] args) throws InterruptedException {
      new Thread01().start();//多線程
   }
}

  Runnable實現方式:

public class Runnable01 implements  Runnable {
   @Override
   public void run() {
      System.out.println("Runnable方式創建線程");
   }

   public static void main(String[] args) {
      new Thread(new Runnable01()).start();
   }
}

  Callable實現方式:

public class Callable01 implements Callable<String> {
   @Override
   public String call() throws Exception {
      System.out.println("Callable方式創建線程");
      return "Callable";
   }

   public static void main(String[] args) throws ExecutionException, InterruptedException {
      FutureTask task=new FutureTask(new Callable01());//有參 賦值 成員屬性
      new Thread(task).start();
      System.out.println( task.get());;
   }
}

  6、Thread和Runnable的聯系與區別

  • Runnable的實現方式是實現其接口即可。
  • Thread的實現方式是繼承其類。
  • Runnable接口支持多繼承,但基本上用不到。
  • Thread實現了Runnable接口並進行了擴展,而Thread和Runnable的實質是實現的關系,不是同類東西,所以Runnable或Thread本身沒有可比性。
    public class Thread implements Runnable {
        // 省略
        @Override
        public void run() {
            if (target != null) {
                target.run();
            }
        }
        // 省略
    }

  綜上所述:Thread和Runnable的實質是繼承關系,沒有可比性。無論使用Runnable還是Thread,都會new Thread,然后執行run方法。用法上,如果有復雜的線程操作需求,那就選擇繼承Thread,如果只是簡單的執行一個任務,那就實現runnable。

  7、Callable原理是什么

  Callable 1.5引入,具有返回值,並且支持泛型:

public interface Callable<V> {
    V call() throws Exception;
}

  返回加入泛型既可以返回Object,也可以讓調用限定類型,更靈活。Callble相關源碼如下:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

public class FutureTask<V> implements RunnableFuture<V> {

private Callable<V> callable;
public void run() { if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } }

  我們在使用以下代碼構建task時,實際上在FutureTask類的構造方法就給自己的屬性callable進行了賦值。

FutureTask task=new FutureTask(new Callable01());

  而可以看到FutureTask實際上也是一個Runnable的具體實現,因此可以使用以下方法進行task執行(和Runnable的使用方式一致):

new Thread(task).start();

  調用start方法,實際上就是調用Runnable的run方法,因此調用了FutureTask的run方法,然后這個新起的線程采用方法調用方式調用了具體Callable實現類的call方法,並將返回值進行set,因此我們可以通過task.get()方法獲取執行結果。

  8、和使用線程池有什么不一樣

  看以下代碼:

public class ThreadPkTest {
    public static void main(String[] args) throws InterruptedException {
        Long start= System.currentTimeMillis();
        final List<Integer> l=new ArrayList<Integer>();
        final Random random=new Random();
        for(int i=0;i<10000;i++){
            Thread thread=new Thread(){
                public void run(){
                    l.add(random.nextInt());
                }
            };
            thread.start();
            thread.join();
        }
        System.out.println("直接創建線程執行時間:"+(System.currentTimeMillis()-start));
        System.out.println("size:"+l.size());

        start= System.currentTimeMillis();
        final List<Integer> list=new ArrayList<Integer>();
        ExecutorService executorService= Executors.newSingleThreadExecutor();for(int i=0;i<10000;i++){
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    list.add(random.nextInt());
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("線程池執行時間:"+(System.currentTimeMillis()-start));
        System.out.println("size:"+list.size());
        
    }
}

  執行結果如下:

直接創建線程執行時間:1601
size:10000
線程池執行時間:33
size:10000

  由此可以對比線程池效率要高出很多,是什么原因呢?大致有這么幾點:

  • 避免線程的創建和銷毀帶來的性能開銷;
  • 避免大量的線程因為互相搶占系統資源導致的阻塞現象;
  • 能夠對線程進行簡單的管理並提供定時執行、間隔執行等功能(和性能無關)。

  那我們接下來就核心進行線程池的研究。

二、線程池代碼詳解

  1、線程池使用示例

  首先我們來看下如何使用線程池,線程持有submit以及execute兩種寫法,代碼如下:

public class ThreadPool01 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(()->   System.out.println("Submit方式執行optimized"));
        executorService.submit(new Runnable() {
            @Override
            public void run() {

                System.out.println("Submit方式執行");
            }
        });
        executorService.execute(()-> System.out.println("Execute方式執行optimized"));
        executorService.execute(new Runnable() {
            @Override
            public void run() {

                System.out.println("Execute方式執行");
            }
        });
        executorService.shutdown();
    }
}

  2、線程池類、接口

  然后我們來看看線程池有哪些類與接口,核心如圖所示:

           

   如圖所示,有這么一些重要的接口與類,如下表所示:

   

  3、線程池執行流程

  3.1、初始化ThreadPoolExecutor

  不管我們是通過Executors工具類快速初始化線程池,還是手動配置線程池參數,我們第一步都是初始化線程池:

ExecutorService executorService = Executors.newCachedThreadPool();    //快速構建
ExecutorService es = new ThreadPoolExecutor(5, 5,              //手動構建 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                Executors.defaultThreadFactory())

  參數詳情如下:

public ThreadPoolExecutor(
        int corePoolSize,    //核心線程數大小 - 10
        int maximumPoolSize,   //最大線程數 - 100
        long keepAliveTime,   //非核心線程存活時間
        TimeUnit unit,   //時間單位
        BlockingQueue<Runnable> workQueue,   //存放任務的阻塞隊列
        ThreadFactory threadFactory,   //創建線程的工廠
       RejectedExecutionHandler handler    //拒絕策略
)

  3.2、調用execute、submit執行

  我們知道有兩種方式,分別是submit和execute,但是底層核心都是調用execute,無非是submit有返回,execute無返回。代碼如下:

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

  但是其實execute和submit還有點不同,就是task類型不一樣,submit類型是FutureTask,而execute的task類型是線程池運行的run方法所屬類的類型。

  3.3、核心、非核心線程協作原理

  如execute方法中的代碼所示:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

  如上四處代碼,如下圖所示四種不同規則所示:

           

   所以當一個任務通過execute(Runnable)方法添加到線程池時:

  • 如果此時線程池中的數量小於corePoolSize,創建新的核心線程來處理被添加的任務。
  • 如果此時線程池中的數量等於 corePoolSize,則新任務被添加到workQueue隊列中,直到workQueue隊列滿,但不超過maximumPoolSize。
  • 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的非核心線程來處理被添加的任務。
  • 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那么通過 handler所指定的策略來處理此任務。 

  綜上所述:處理任務的優先級為:核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。

  3.4、創建Worker對象addWorker

  Worker是一個實現了Runnable接口的類,Worker的執行最終會調用我們提交的任務中的run()方法。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {}

  創建Worker對象代碼如下:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();        //此處會調用Worker這個Thread包裝類的start方法,start方法會調用run方法,run方法會調用runWorker方法。
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

  3.5、啟動worker對象

  啟動worker對象如下代碼所示:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

  Worker啟動后,會執行我們提交的任務的run()方法,執行完成后會調用finally中的 processWorkerExit 方法。

  3.6、循環調用Worker

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

  由此可知,調用循環進入到了3.4。

  4、什么是Worker

  • Worker是ThreadPoolExecutor類的一個內部類,這里Worker就是thread和task的一個包裝類,它的職能就是控制中斷和任務的運行。
  • Worker是一個集成了AQS,實現了Runnable方法的內部類。Worker創建好后,通過new好的線程來運行任務。Worker本身不運行run,而是里面thread通過start運行這個方法。
  • 核心Worker通過while不斷從隊列中取出任務(addWorker入參為null時從隊列取,否則就說明是新添加到隊列要執行的任務),任務隊列為空線程就阻塞;非核心Worker也是通過while不斷取任務,只是有個取任務時keepAliveTime的超時時間,在時間之內取不到的任務的話線程就跳出循環,自動銷毀了。

  5、拒絕策略

  有四種拒絕策略,分別如下:
  • AbortPolicy (默認):當任務添加到線程池中被拒絕時,它將拋出 RejectedExecutionException 異常。
  • CallerRunsPolicy:當任務添加到線程池中被拒絕時,會在線程池當前正在運行的Thread線程池中處理被拒絕的任務。
  • DiscardOldestPolicy:當任務添加到線程池中被拒絕時,線程池會放棄等待隊列中最舊的未處理任務,然后將被拒絕的任務添加到等待隊列中
  • DiscardPolicy:當任務添加到線程池中被拒絕時,線程池將丟棄被拒絕的任務。

  6、Java中提供的幾種快捷線程池

  • newFixedThreadPool;通過創建一個corePoolSize和maximumPoolSize相同的線程池。使用LinkedBlockingQuene作為阻塞隊列,不過當線程池沒有可執行任務時,也不會釋放線程。
  • newCachedThreadPool:初始化一個可以緩存線程的線程池,默認緩存60s,線程池的線程數可達到Integer.MAX_VALUE,即2147483647,內部使用SynchronousQueue作為阻塞隊列;和newFixedThreadPool創建的線程池不同,newCachedThreadPool在沒有任務執行時,當線程的空閑時間超過keepAliveTime,會自動釋放線程資源,當提交新任務時,如果沒有空閑線程,則創建新線程執行任務,會導致一定的系統開銷;
  • newSingleThreadExecutor;初始化的線程池中只有一個線程,如果該線程異常結束,會重新創建一個新的線程繼續執行任務,唯一的線程可以保證所提交任務的順序執行,內部使用LinkedBlockingQueue作為阻塞隊列。
  • newScheduledThreadPool;初始化的線程池可以在指定的時間內周期性的執行所提交的任務,在實際的業務場景中可以使用該線程池定期的同步數據。除了newScheduledThreadPool的內部實現特殊一點之外,其它幾個線程池都是基於ThreadPoolExecutor類實現的。

  7、newScheduledThreadPool

  newScheduledThreadPool是一個可以在指定的時間內周期性執行所提交的任務,有以下兩種模式:

scheduleWithFixedDelay:上一個任務執行完的時間后固定時間,與任務執行時間有關
scheduleAtFixedRate:固定速率,與任務執行所需時間無關 

  其核心代碼差別為:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0L)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period),
                                      sequencer.getAndIncrement());
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0L)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      -unit.toNanos(delay),
                                      sequencer.getAndIncrement());
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

  兩者區別就是都會執行這段核心代碼:

private void setNextRunTime() {
    long p = period;
    if (p > 0)
        time += p;
    else
        time = triggerTime(-p);
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM