Java並發編程系列-(6) Java線程池


目前已經更新完《Java並發編程》,《Docker教程》和《JVM性能優化》,歡迎關注【后端精進之路】,輕松閱讀全部文章。

Java並發編程:

Docker教程:

JVM性能優化:

6. 線程池

6.1 基本概念

在web開發中,服務器需要接受並處理請求,所以會為一個請求來分配一個線程來進行處理。如果每次請求都新創建一個線程的話實現起來非常簡便,但是存在一個問題:如果並發的請求數量非常多,但每個線程執行的時間很短,這樣就會頻繁的創建和銷毀線程,如此一來會大大降低系統的效率。可能出現服務器在為每個請求創建新線程和銷毀線程上花費的時間和消耗的系統資源要比處理實際的用戶請求的時間和資源更多。

那么有沒有一種辦法使執行完一個任務,並不被銷毀,而是可以繼續執行其他的任務呢?這就是線程池的目的了。線程池為線程生命周期的開銷和資源不足問題提供了解決方案。通過對多個任務重用線程,線程創建的開銷被分攤到了多個任務上。

什么時候使用線程池?

  • 單個任務處理時間比較短
  • 需要處理的任務數量很大

使用線程池好處

  • 降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。
  • 提高響應速度。當任務到達時,任務可以不需要的等到線程創建就能立即執行。
  • 提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。

6.2 實現自己的線程池

實現的線程池需要滿足以下基本條件:

1、線程必須在池子已經創建好了,並且可以保持住,要有容器保存多個線程;
2、線程還要能夠接受外部的任務,運行這個任務。容器保持這個來不及運行的任務.

以下是線程池的具體實現:

線程池中實現了任務隊列,用來保存所有的任務;工作線程,來執行具體的任務。

public class MyThreadPool2 {
    // 線程池中默認線程的個數為5
    private static int WORK_NUM = 5;
    // 隊列默認任務個數為100
    private static int TASK_COUNT = 100;  
    
    // 用戶在構造這個池,希望的啟動的線程數
    private final int worker_num;
    // 工作線程組
    private WorkThread[] workThreads;
    // 任務隊列,作為一個緩沖
    private final BlockingQueue<Runnable> taskQueue;

    // 創建具有默認線程個數的線程池
    public MyThreadPool2() {
        this(WORK_NUM,TASK_COUNT);
    }

    // 創建線程池,worker_num為線程池中工作線程的個數
    public MyThreadPool2(int worker_num,int taskCount) {
    	if (worker_num<=0) worker_num = WORK_NUM;
    	if(taskCount<=0) taskCount = TASK_COUNT;
        this.worker_num = worker_num;
        taskQueue = new ArrayBlockingQueue<>(taskCount);
        workThreads = new WorkThread[worker_num];
        for(int i=0;i<worker_num;i++) {
        	workThreads[i] = new WorkThread();
        	workThreads[i].start();
        }
    }

    // 執行任務,其實只是把任務加入任務隊列,什么時候執行有線程池管理器決定
    public void execute(Runnable task) {
    	try {
			taskQueue.put(task);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
    }

    // 銷毀線程池,該方法保證在所有任務都完成的情況下才銷毀所有線程,否則等待任務完成才銷毀
    public void destroy() {
        // 工作線程停止工作,且置為null
        System.out.println("ready close pool.....");
        for(int i=0;i<worker_num;i++) {
        	workThreads[i].stopWorker();
        	workThreads[i] = null;//help gc
        }
        taskQueue.clear();// 清空任務隊列
    }

    // 覆蓋toString方法,返回線程池信息:工作線程個數和已完成任務個數
    @Override
    public String toString() {
        return "WorkThread number:" + worker_num
                + "  wait task number:" + taskQueue.size();
    }

    /**
     * 內部類,工作線程
     */
    private class WorkThread extends Thread{
    	
    	@Override
    	public void run(){
    		Runnable r = null;
    		try {
				while (!isInterrupted()) {
					r = taskQueue.take();
					if(r!=null) {
						System.out.println(getId()+" ready exec :"+r);
						r.run();
					}
					r = null;//help gc;
				} 
			} catch (Exception e) {
				// TODO: handle exception
			}
    	}
    	
    	public void stopWorker() {
    		interrupt();
    	}
    	
    }
}

以下是測試程序:

分別創建多個任務,並放入線程池進行執行。

public class TestMyThreadPool {
    public static void main(String[] args) throws InterruptedException {
        // 創建3個線程的線程池
        MyThreadPool2 t = new MyThreadPool2(3,0);
        t.execute(new MyTask("testA"));
        t.execute(new MyTask("testB"));
        t.execute(new MyTask("testC"));
        t.execute(new MyTask("testD"));
        t.execute(new MyTask("testE"));
        t.execute(new MyTask("testF"));
        t.execute(new MyTask("testG"));
        t.execute(new MyTask("testH"));
        System.out.println(t);
        Thread.sleep(10000);
        t.destroy();// 所有線程都執行完成才destory
        System.out.println(t);
    }

    // 任務類
    static class MyTask implements Runnable {

        private String name;
        private Random r = new Random();

        public MyTask(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        @Override
        public void run() {// 執行任務
            try {
                Thread.sleep(r.nextInt(1000)+2000);
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getId()+" sleep InterruptedException:"
                        +Thread.currentThread().isInterrupted());
            }
            System.out.println("任務 " + name + " 完成");
        }
    }
}

6.3 Executor框架

Executor框架是一個根據一組執行策略調用,調度,執行和控制的異步任務的框架,目的是提供一種將”任務提交”與”任務如何運行”分離開來的機制。

Executor框架的類繼承關系如下圖:

Screen Shot 2019-12-11 at 10.21.49 PM.png

J.U.C中有三個Executor接口:

  • Executor:一個運行新任務的簡單接口;
  • ExecutorService:擴展了Executor接口。添加了一些用來管理執行器生命周期和任務生命周期的方法;
  • ScheduledExecutorService:擴展了ExecutorService。支持Future和定期執行任務。

下面分別進行介紹:

1. Executor接口

Executor接口只有一個execute方法,用來替代通常創建或啟動線程的方法。

public interface Executor {
    void execute(Runnable command);
}

Executor接口只有一個execute方法,用來替代通常創建或啟動線程的方法。

executor.execute(new Thread())

對於不同的Executor實現,execute()方法可能是創建一個新線程並立即啟動,也有可能是使用已有的工作線程來運行傳入的任務,也可能是根據設置線程池的容量或者阻塞隊列的容量來決定是否要將傳入的線程放入阻塞隊列中或者拒絕接收傳入的線程。

2. ExecutorService接口

ExecutorService接口繼承自Executor接口,提供了管理終止的方法,以及可為跟蹤一個或多個異步任務執行狀況而生成 Future 的方法。增加了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。如果需要支持即時關閉,也就是shutDownNow()方法,則任務需要正確處理中斷。

3. ScheduledExecutorService接口

ScheduledExecutorService擴展ExecutorService接口並增加了schedule方法。調用schedule方法可以在指定的延時后執行一個Runnable或者Callable任務。ScheduledExecutorService接口還定義了按照指定時間間隔定期執行任務的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。

4. Executor框架基本使用流程

基本使用流程如下:

Picture1.png

6.4 ThreadPoolExecutor分析

ThreadPoolExecutor繼承自AbstractExecutorService,也實現了ExecutorService接口。JDK中的提供的內置線程池基本都基於ThreadPoolExecutor實現,后面會仔細介紹。

構造函數及參數意義

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

構造方法中的字段含義如下:

  • corePoolSize:線程池中核心線程數,運行的線程數<corePoolSize,就會創建新線程,>= corePoolSize,這個任務就會保存到BlockingQueue,如果調用prestartAllCoreThreads()方法就會一次性的啟動corePoolSize個數的線程。
  • maximumPoolSize: 允許的最大線程數,BlockingQueue也滿了,< maximumPoolSize時候就會再次創建新的線程.
  • keepAliveTime: 線程空閑下來后,存活的時間,這個參數只在 >corePoolSize 才有用.
  • TimeUnit unit: 存活時間的單位值.
  • workQueue:保存等待執行的任務的阻塞隊列,當提交一個新的任務到線程池以后, 線程池會根據當前線程池中正在運行着的線程的數量來決定對該任務的處理方式,主要有以下幾種處理方式:
  1. 使用直接切換隊列:這種方式常用的隊列是SynchronousQueue.
  2. 使用無界隊列:一般使用基於鏈表的阻塞隊列LinkedBlockingQueue。如果使用這種方式,那么線程池中能夠創建的最大線程數就是corePoolSize,而maximumPoolSize就不會起作用了(后面也會說到)。當線程池中所有的核心線程都是RUNNING狀態時,這時一個新的任務提交就會放入等待隊列中。
  3. 使用有界隊列:一般使用ArrayBlockingQueue。使用該方式可以將線程池的最大線程數量限制為maximumPoolSize,這樣能夠降低資源的消耗,但同時這種方式也使得線程池對線程的調度變得更困難,因為線程池和隊列的容量都是有限的值,所以要想使線程池處理任務的吞吐率達到一個相對合理的范圍,又想使線程調度相對簡單,並且還要盡可能的降低線程池對資源的消耗,就需要合理的設置這兩個數量。
  • threadFactory:它是ThreadFactory類型的變量,用來創建新線程。默認使用Executors.defaultThreadFactory() 來創建線程。使用默認的ThreadFactory來創建線程時,會使新創建的線程具有相同的NORM_PRIORITY優先級並且是非守護線程,同時也設置了線程的名稱。

  • handler:它是RejectedExecutionHandler類型的變量,表示線程池的飽和策略。如果阻塞隊列滿了並且沒有空閑的線程,這時如果繼續提交任務,就需要采取一種策略處理該任務。

線程池提供了4種策略:

  1. AbortPolicy:直接拋出異常,這是默認策略;
  2. CallerRunsPolicy:用調用者所在的線程來執行任務;
  3. DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
  4. DiscardPolicy:直接丟棄任務;

任務執行

提交任務執行,主要有execute和submit兩種方式,主要區別是后者需要有返回值。

  • execute(Runnable command)
  • Future submit(Callable task)

下面主要介紹execute的流程:

簡單來說,在執行execute()方法時且狀態一直是RUNNING時,的執行過程如下:

  1. 如果workerCount < corePoolSize,則創建並啟動一個線程來執行新提交的任務;
  2. 如果workerCount >= corePoolSize,且線程池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中;
  3. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內的阻塞隊列已滿,則創建並啟動一個線程來執行新提交的任務;
  4. 如果workerCount >= maximumPoolSize,並且線程池內的阻塞隊列已滿, 則根據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。

整個流程可以用下圖來總結:

Picture1.png

接下來結合代碼進行分析:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * clt記錄着runState和workerCount
     */
    int c = ctl.get();
    /*
     * workerCountOf方法取出低29位的值,表示當前活動的線程數;
     * 如果當前活動線程數小於corePoolSize,則新建一個線程放入線程池中;
     * 並把任務添加到該線程中。
     */
    if (workerCountOf(c) < corePoolSize) {
        /*
         * addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷還是maximumPoolSize來判斷;
         * 如果為true,根據corePoolSize來判斷;
         * 如果為false,則根據maximumPoolSize來判斷
         */
        if (addWorker(command, true))
            return;
        /*
         * 如果添加失敗,則重新獲取ctl值
         */
        c = ctl.get();
    }
    /*
     * 如果當前線程池是運行狀態並且任務添加到隊列成功
     */
    if (isRunning(c) && workQueue.offer(command)) {
        // 重新獲取ctl值
        int recheck = ctl.get();
        // 再次判斷線程池的運行狀態,如果不是運行狀態,由於之前已經把command添加到workQueue中了,
        // 這時需要移除該command
        // 執行過后通過handler使用拒絕策略對該任務進行處理,整個方法返回
        if (! isRunning(recheck) && remove(command))
            reject(command);
        /*
         * 獲取線程池中的有效線程數,如果數量是0,則執行addWorker方法
         * 這里傳入的參數表示:
         * 1. 第一個參數為null,表示在線程池中創建一個線程,但不去啟動;
         * 2. 第二個參數為false,將線程池的有限線程數量的上限設置為maximumPoolSize,添加線程時根據maximumPoolSize來判斷;
         * 如果判斷workerCount大於0,則直接返回,在workQueue中新增的command會在將來的某個時刻被執行。
         */
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    /*
     * 如果執行到這里,有兩種情況:
     * 1. 線程池已經不是RUNNING狀態;
     * 2. 線程池是RUNNING狀態,但workerCount >= corePoolSize並且workQueue已滿。
     * 這時,再次調用addWorker方法,但第二個參數傳入為false,將線程池的有限線程數量的上限設置為maximumPoolSize;
     * 如果失敗則拒絕該任務
     */
    else if (!addWorker(command, false))
        reject(command);
}

addWorker方法的主要工作是在線程池中創建一個新的線程並執行,firstTask參數 用於指定新增的線程執行的第一個任務,core參數為true表示在新增線程時會判斷當前活動線程數是否少於corePoolSize,false表示新增線程前需要判斷當前活動線程數是否少於maximumPoolSize,代碼如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // 獲取運行狀態
        int rs = runStateOf(c);
        /*
         * 這個if判斷
         * 如果rs >= SHUTDOWN,則表示此時不再接收新任務;
         * 接着判斷以下3個條件,只要有1個不滿足,則返回false:
         * 1. rs == SHUTDOWN,這時表示關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞隊列中已保存的任務
         * 2. firsTask為空
         * 3. 阻塞隊列不為空
         * 
         * 首先考慮rs == SHUTDOWN的情況
         * 這種情況下不會接受新提交的任務,所以在firstTask不為空的時候會返回false;
         * 然后,如果firstTask為空,並且workQueue也為空,則返回false,
         * 因為隊列中已經沒有任務了,不需要再添加線程了
         */
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            // 獲取線程數
            int wc = workerCountOf(c);
            // 如果wc超過CAPACITY,也就是ctl的低29位的最大值(二進制是29個1),返回false;
            // 這里的core是addWorker方法的第二個參數,如果為true表示根據corePoolSize來比較,
            // 如果為false則根據maximumPoolSize來比較。
            // 
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 嘗試增加workerCount,如果成功,則跳出第一個for循環
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 如果增加workerCount失敗,則重新獲取ctl的值
            c = ctl.get();  // Re-read ctl
            // 如果當前的運行狀態不等於rs,說明狀態已被改變,返回第一個for循環繼續執行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 根據firstTask來創建Worker對象
        w = new Worker(firstTask);
        // 每一個Worker對象都會創建一個線程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN表示是RUNNING狀態;
                // 如果rs是RUNNING狀態或者rs是SHUTDOWN狀態並且firstTask為null,向線程池中添加線程。
                // 因為在SHUTDOWN時不會在添加新的任務,但還是會執行workQueue中的任務
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers是一個HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize記錄着線程池中出現過的最大線程數量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 啟動線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

關閉線程池

關閉線程池通常有如下兩種方式:

  • shutdownNow():設置線程池的狀態,還會嘗試停止正在運行或者暫停任務的線程
  • shutdown():設置線程池的狀態,只會中斷所有沒有執行任務的線程

線程池的參數配置

通常來講,根據任務的性質來分,可以划分為:計算密集型(CPU),IO密集型,混合型。

  • 計算密集型:加密,大數分解,正則等,線程數適當小一點,最大推薦:機器的Cpu核心數+1,為什么+1,防止頁缺失,(機器的Cpu核心=Runtime.getRuntime().availableProcessors()😉
  • IO密集型:讀取文件,數據庫連接,網絡通訊, 線程數適當大一點,可以設置為機器的Cpu核心數*2。
  • 混合型:盡量拆分,IO密集型>>計算密集型,拆分意義不大,IO密集型~=計算密集型
    隊列的選擇上,應該使用有界,無界隊列可能會導致內存溢出,發生OOM。

線程池的狀態

線程池的運行狀態. 線程池一共有五種狀態, 分別是:

  1. RUNNING :能接受新提交的任務,並且也能處理阻塞隊列中的任務;
  2. SHUTDOWN:關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞隊列中已保存的任務。在線程池處於 RUNNING 狀態時,調用 shutdown()方法會使線程池進入到該狀態。(finalize() 方法在執行過程中也會調用shutdown()方法進入該狀態);
  3. STOP:不能接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程。在線程池處於 RUNNING 或 SHUTDOWN 狀態時,調用 shutdownNow() 方法會使線程池進入到該狀態;
  4. TIDYING:如果所有的任務都已終止了,workerCount (有效線程數) 為0,線程池進入該狀態后會調用 terminated() 方法進入TERMINATED 狀態。
  5. TERMINATED:在terminated() 方法執行完后進入該狀態,默認terminated()方法中什么也沒有做。
    進入TERMINATED的條件如下:
    • 線程池不是RUNNING狀態;
    • 線程池狀態不是TIDYING狀態或TERMINATED狀態;
    • 如果線程池狀態是SHUTDOWN並且workerQueue為空;
    • workerCount為0;
    • 設置TIDYING狀態成功。

下圖是線程池的狀態轉換過程,

Screen Shot 2019-12-12 at 4.39.35 PM.png

6.5 Executors內置線程池

通常開發者都是利用 Executors 提供的通用線程池創建方法,去創建不同配置的線程池,主要區別在於不同的 ExecutorService 類型或者不同的初始參數。
Executors 目前提供了 5 種不同的線程池創建配置:

  • newCachedThreadPool(),它是一種用來處理大量短時間工作任務的線程池,具有幾個鮮明特點:它會試圖緩存線程並重用,當無緩存線程可用時,就會創建新的工作線程;如果線程閑置的時間超過60秒,則被終止並移出緩存;長時間閑置時,這種線程池,不會消耗什么資源。其內部使用 SynchronousQueue 作為工作隊列。
    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • newFixedThreadPool(int nThreads),創建固定數目(nThreads)的線程,其背后使用的是無界的工作隊列,任何時候最多有 nThreads 個工作線程是活動的。這意味着,如果任務數量超過了活動隊列數目,將在工作隊列中等待空閑線程出現;如果有工作線程退出,將會有新的工作線程被創建,以補足指定的數目nThreads。
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • newSingleThreadExecutor(),它的特點在於工作線程數目被限制為1,操作一個無界的工作隊列,所以它保證了所有任務的都是被順序執行,最多會有一個任務處於活動狀態,並且不允許使用者改動線程池實例,因此可以避免其改變線程數目。
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
  • newWorkStealingPool(int parallelism),這是一個經常被人忽略的線程池,Java 8 才加入這個創建方法,其內部會構建ForkJoinPool,利用Work-Stealing算法,並行地處理任務,不保證處理順序。
    public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
  • newSingleThreadScheduledExecutor() 和 newScheduledThreadPool(int corePoolSize),創建的是個 ScheduledExecutorService,可以進行定時或周期性的工作調度,區別在於單一工作線程還是多個工作線程。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
}
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

以下是ScheduledThreadPoolExecutor的構造函數,該類繼承於ThreadPoolExecutor,可以看到任務存放在DelayedWorkQueue。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

類中提供了多種執行定時任務的方法,

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

總結下來,主要分三種:

  • schedule:只執行一次,任務還可以延時執行
  • scheduleAtFixedRate:提交固定時間間隔的任務
  • scheduleWithFixedDelay:提交固定延時間隔執行的任務

注意scheduleAtFixedRate和scheduleWithFixedDelay的區別,下圖給出了兩者執行任務時間上的示意圖。scheduleAtFixedRate總是間隔固定的時間來執行task,但是如果下圖中Task1執行超時,也就是超過了Fixed Time,當Task1執行完之后,Task2將立刻執行。scheduleWithFixedDelay不同的是,每個任務總是在上一個任務結束之后,等待固定的Fixed Delay Time后開始執行。

Screen Shot 2019-12-12 at 2.33.08 PM.png

public class ScheduleWorkerTime implements Runnable{
    public final static int Long_8 = 8;//任務耗時8秒
    public final static int Short_2 = 2;//任務耗時2秒
    public final static int Normal_5 = 5;//任務耗時5秒

    public static SimpleDateFormat formater = new SimpleDateFormat(
            "HH:mm:ss");
    public static AtomicInteger count = new AtomicInteger(0);
    
    @Override
    public void run() {
    	if(count.get()==0) {
            System.out.println("Long_8....begin:"+formater.format(new Date()));
            SleepTools.second(Long_8);
            System.out.println("Long_8....end:"+formater.format(new Date())); 
            count.incrementAndGet();
    	}else if(count.get()==1) {
    		System.out.println("Short_2 ...begin:"+formater.format(new Date()));
    		SleepTools.second(Short_2);
    		System.out.println("Short_2 ...end:"+formater.format(new Date()));
            count.incrementAndGet();    		
    	}else {
    		System.out.println("Normal_5...begin:"+formater.format(new Date()));
    		SleepTools.second(Normal_5);
    		System.out.println("Normal_5...end:"+formater.format(new Date()));
    		count.incrementAndGet(); 
    	}
    }
    
	public static void main(String[] args) {
	    	ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(1);
	    	//任務間隔6秒
	        schedule.scheduleAtFixedRate(new ScheduleWorkerTime(),
	                0, 6000, TimeUnit.MILLISECONDS);
	}
}

代碼中定義了3個任務,分別執行8s,2s,5s,設置的固定間隔為6s。從輸出結果可以看到,第一個場任務結束后,第二個任務立刻開始執行,第二個任務執行完時,到了10s,此時等待2s后,第三個任務開始執行。由此可以看到,當前序任務沒超時,后續任務會按照指定的時間進行執行;如果有超時,則會馬上執行。

執行結果如下:
Long_8....begin:14:56:27
Long_8....end:14:56:35
Short_2 ...begin:14:56:35
Short_2 ...end:14:56:37
Normal_5...begin:14:56:39
Normal_5...end:14:56:44

注意最好在提交給ScheduledThreadPoolExecutor的任務要catch異常,否則發生異常之后,程序會終止運行。

6.6 CompletionService

使用場景

當向Executor提交多個任務並且希望獲得它們在完成之后的結果,如果用FutureTask,可以循環獲取task,並調用get方法去獲取task執行結果,但是如果task還未完成,獲取結果的線程將阻塞直到task完成,由於不知道哪個task優先執行完畢,使用這種方式效率不會很高。

在jdk5時候提出接口CompletionService,它整合了Executor和BlockingQueue的功能,可以更加方便在多個任務執行時,按任務完成順序獲取結果。

使用流程

CompletionService的使用流程如下:

  1. 聲明task執行載體,線程池executor;

  2. 聲明CompletionService,來包裝執行task的線程池,存放已完成狀態task的阻塞隊列,隊列默認為基於鏈表結構的阻塞隊列LinkedBlockingQueue;

  3. 調用submit方法提交task;

  4. 調用take方法獲取已完成狀態task。

public class CompletionServiceTest {
	
	// 聲明線程池
	private static ExecutorService executorService = Executors.newFixedThreadPool(100);
	
	public void test() {
		
		// 聲明CompletionService包裝Executor
		CompletionService<Long>  completionService = new ExecutorCompletionService<Long>(executorService);
		
		final int groupNum = 10000000 / 100;
		
		for ( int i = 1; i <= 100; i++) {
			int start = (i-1) * groupNum + 1;
			int end = i * groupNum;
			
			completionService.submit(new Callable<Long>() {
				
				@Override
				public Long call() throws Exception {
					Long sum = 0L;
					
					for (int j = start; j <= end; j++) {
						sum += j;
					}
					return sum;
				}
			});
		}
		
		long result = 0L;
		try {
			for (int i = 0; i < 100; i++) {
				long taskResult = completionService.take().get();
				System.out.println(taskResult);
				result += taskResult;
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		
		System.out.println("the result is " + result);
	}
	
	public static void main(String[] args) {
		new CompletionServiceTest().test();
	}
}

源碼分析

CompletionService接口提供五個方法:

  • Future submit(Callable task)
    提交Callable類型的task;

  • Future submit(Runnable task, V result)
    提交Runnable類型的task;

  • Future take() throws InterruptedException
    獲取並移除已完成狀態的task,如果目前不存在這樣的task,則等待;

  • Future poll()
    獲取並移除已完成狀態的task,如果目前不存在這樣的task,返回null;

  • Future poll(long timeout, TimeUnit unit) throws InterruptedException
    獲取並移除已完成狀態的task,如果在指定等待時間內不存在這樣的task,返回null。

CompletionService與普通用FutureTask獲取結果的最大不同是,可以按照任務完成的順序返回結果。具體是如何實現的呢?

內部封裝了一個QueueingFuture對象,並且實現了done方法,在task執行完成之后將當前task添加到completionQueue。

    private static class QueueingFuture<V> extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;
        protected void done() { completionQueue.add(task); }
    }

done方法將在FutureTask的finishCompletion方法中被調用。只是默認done方法是空的,completionQueue實現了該方法。

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (WAITERS.weakCompareAndSet(this, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

參考:


本文由『后端精進之路』原創,首發於博客 http://teckee.github.io/ , 轉載請注明出處

搜索『后端精進之路』關注公眾號,立刻獲取最新文章和價值2000元的BATJ精品面試課程

后端精進之路.png


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM