Executor線程池的簡單使用


  我們都知道創建一個線程可以繼承Thread類或者實現Runnable接口,實際Thread類就是實現了Runnable接口。

  到今天才明白后端線程的作用:我們可以開啟線程去執行一些比較耗時的操作,類似於前台的ajax異步操作,比如說用戶上傳一個大的文件,我們可以獲取到文件之后開啟一個線程去操作該文件,但是可以提前將結果返回去,如果同步處理有可能太耗時,影響系統可用性。

1、new Thread的弊端

原生的開啟線程執行異步任務的方式:

new Thread(new Runnable() {

    @Override
    public void run() {
        // TODO Auto-generated method stub
    }
}).start();

弊端如下:

  • 線程生命周期的開銷非常高。創建線程都會需要時間,延遲處理的請求,並且需要JVM和操作系統提供一些輔助操作。
  • 資源消耗。活躍的線程會消耗系統資源,尤其是內存。如果可運行的線程數量多於可用處理器的數量,那么有些線程將會閑置。大量空閑的線程會占用許多內存,給GC帶來壓力,而且大量線程在競爭CPU資源時會產生其他的性能開銷。
  •  穩定性。在可創建線程的數量上存在一個限制,這個限制受多個因素的制約,包括JVM的啟動參數、Thread構造函數中請求棧的大小以及底層操作系統的限制。如果破壞了這些限制,很可能拋出  outOfMemoryError異常。

  也就是說在一定的范圍內增加線程的數量可以提高系統的吞吐率,但是如果超出了這個范圍,再創建更多的線程只會降低程序的執行效率甚至導致系統的崩潰。

 例如:

 使用線程池:可以了解線程池的用法以及線程池的正確的關閉方法:shutdown之后馬上調用awaitTermination阻塞等待實現同步關閉。

package cn.qlq.thread.twenty;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo1 {
    private static ExecutorService executorService = Executors.newFixedThreadPool(20);
    private static volatile AtomicInteger atomicInteger = new AtomicInteger(0);

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 2000; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    atomicInteger.incrementAndGet();
                }
            });
        }
        executorService.shutdown();
        try {
            executorService.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(System.currentTimeMillis() - startTime);
        System.out.println(atomicInteger);
    }
}

結果:

14
2000

 

package cn.qlq.thread.twenty;

import java.util.concurrent.atomic.AtomicInteger;

public class Demo2 {
    private static volatile AtomicInteger atomicInteger = new AtomicInteger(0);

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 2000; i++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    atomicInteger.incrementAndGet();
                }
            });
            t.start();
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(System.currentTimeMillis() - startTime);
        System.out.println(atomicInteger);
    }
}

結果:

257
2000

  不使用線程池話費的時間比使用線程池長了好幾倍,也看出了效率問題。

 

2.核心類結構如下:

 

1、Executor是一個頂級接口,它提供了一種標准的方法將任務的提交過程與執行過程解耦開來,並用Runnable來表示任務。

2、ExecutorService擴展了Executor。添加了一些用於生命周期管理的方法(同時還提供一些用於任務提交的便利方法

3、下面兩個分支,AbstractExecutorService分支就是普通的線程池分支,ScheduledExecutorService是用來創建定時任務的。

3.Executor介紹

  線程池簡化了線程的管理工作。在Java類庫中,任務執行的主要抽象不是Thread,而是Executor,如下:

public interface Executor {
    void execute(Runnable command);
}

 

  Executor是個簡單的接口,它提供了一種標准的方法將任務的提交過程與執行過程解耦開來,並用Runnable來表示任務。Executor還提供了對生命周期的支持,以及統計信息收集、應用程序管理機制和性能監視機制。

  Executor基於"生產者-消費者"模式,提交任務的操作相當於生產者,執行任務的則相當於消費者。

生命周期:

  Executor的實現通常會創建線程來執行任務。但JVM只有在所有非守護線程全部終止才會退出。因此,如果無法正確的關閉Executor,那么JVM將無法結束。ExecutorService擴展了Executor接口,添加了一些用於生命周期管理的方法(同時還提供一些用於任務提交的便利方法

package java.util.concurrent;
import java.util.List;
import java.util.Collection;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

  ExecutorService的生命周期有三種:運行、關閉和已終止。ExecutorService在創建時處於運行狀態。shutdown方法將執行平緩的關閉過程:不再接受新的任務,同時等待已經提交的任務執行完畢----包括還沒開始的任務,這種屬於正常關閉。shutdownNow方法將執行粗暴的關閉過程:它將取消所有運行中的任務,並且不再啟動隊列中尚未開始的任務,這種屬於強行關閉(關閉當前正在執行的任務,然后返回所有尚未啟動的任務清單)。

  在ExecutorService關閉后提交的任務將由"拒絕執行處理器"來處理,它會拋棄任務,或者使得execute方法拋出一個RejectedExecutionException。等所有任務執行完成后,ExecutorService將轉入終止狀態。可以調用awaitTermination來等待ExecutorService到達終止狀態,或者通過isTerminated來輪詢ExecutorService是否已經終止。通常在調用shutdown之后會立即調用awaitTermination阻塞等待,從而產生同步地關閉ExecutorService的效果。

 4.線程池--ThreadPoolExecutor

  線程池,從字面意義上看,是指管理一組同構工作線程的資源池。線程池是與工作隊列(work queue)密切相關的,其中在工作隊列保存了所有等待執行的任務。工作者線程的任務很簡單:從工作隊列中獲取一個任務並執行任務,然后返回線程池等待下一個任務。(線程池啟動初期線程不會啟動,有任務提交(調用execute或submit)才會啟動,直到到達最大數量就不再創建而是進入阻塞隊列)。

  "在線程池中執行任務"比"為每一個任務分配一個線程"優勢更多。通過重用現有的線程而不是創建新線程,可以處理多個請求時分攤在創建線程和銷毀過程中產生的巨大開銷。另外一個額外的好處是,當請求到達時,工作線程通常已經存在,因此不會由於創建線程而延遲任務的執行,從而提高了性能。

  ThreadPoolExecutor為Executor提供了一些基本實現。ThreadPoolExecutor是一個靈活的、穩定的線程池,允許各種允許機制。ThreadPoolExecutor定義了很多構造函數,最常見的是下面這個:

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

1、corePoolSize

  核心池的大小。在創建了線程池之后,默認情況下,線程池中沒有任何線程,而是等待有任務到來才創建線程去執行任務。默認情況下,在創建了線程池之后,線程池鍾的線程數為0,當有任務到來后就會創建一個線程去執行任務。只有在工作隊列滿了的情況下才會創建超出這個數量的線程。考慮到keepAliveTime和allowCoreThreadTimeOut超時參數的影響,所以沒有任務需要執行的時候,線程池的大小不一定是corePoolSize。

2、maximumPoolSize

  池中允許的最大線程數,這個參數表示了線程池中最多能創建的線程數量,當任務數量比corePoolSize大時,任務添加到workQueue,當workQueue滿了,並且當前線程個數小於maximumPoolSize,將繼續創建線程以處理任務,maximumPoolSize表示的就是wordQueue滿了,線程池中最多可以創建的線程數量。

3、keepAliveTime

  只有當線程池中的線程數大於corePoolSize時,這個參數才會起作用。當線程數大於corePoolSize時,終止前多余的空閑線程等待新任務的最長時間。

4、unit

  keepAliveTime時間單位

5、workQueue

  存儲還沒來得及執行的任務

6、threadFactory

  執行程序創建新線程時使用的線程工廠

7、handler

  由於超出線程范圍和隊列容量而使執行被阻塞時所使用的處理程序(拒絕執行處理器)

拒絕執行處理器實際上是定義了拒絕執行線程的行為:實際上也是一種飽和策略,當有界隊列被填滿后,飽和隊列開始發揮作用。

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

 

在類庫中定義了四種實現:

1.  AbortPolicy-終止策略

  直接拋出一個RejectedExecutionException,也是JDK默認的拒絕策略

    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

2.CallerRunsPolicy-調運者運行策略

  如果線程池沒有被關閉,就嘗試執行任務。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

 3.DiscardOldestPolicy-拋棄最舊的策略

  如果線程池沒有關閉,就移除隊列中最先進入的任務,並且嘗試執行任務。

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

4. DiscardPolicy-拋棄策略

  什么也不做,安靜的丟棄任務

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

 

補充:

線程池還有一個getPoolSize()方法,獲取線程池中當前線程的數量,當該值為0的時候,意味着沒有任何線程,線程池會終止;同一時刻,poolSize不會超過maximumPoolSize。源碼如下:

    /**
     * Returns the current number of threads in the pool.
     *
     * @return the number of threads
     */
    public int getPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Remove rare and surprising possibility of
            // isTerminated() && getPoolSize() > 0
            return runStateAtLeast(ctl.get(), TIDYING) ? 0
                : workers.size();
        } finally {
            mainLock.unlock();
        }
    }

新提交一個任務時的處理流程很明顯:
1、如果當前線程池的線程數還沒有達到基本大小(poolSize < corePoolSize),無論是否有空閑的線程新增一個線程處理新提交的任務;
2、如果當前線程池的線程數大於或等於基本大小(poolSize >= corePoolSize) 且任務隊列未滿時,就將新提交的任務提交到阻塞隊列排隊,等候處理workQueue.offer(command);
3、如果當前線程池的線程數大於或等於基本大小(poolSize >= corePoolSize) 且任務隊列滿時;
3.1、當前poolSize<maximumPoolSize,那么就新增線程來處理任務;
3.2、當前poolSize=maximumPoolSize,那么意味着線程池的處理能力已經達到了極限,此時需要拒絕新增加的任務。至於如何拒絕處理新增的任務,取決於線程池的飽和策略RejectedExecutionHandler。

補充:一個很好的例子解釋 

核心線程數10,最大線程數30,keepAliveTime是3秒

  隨着任務數量不斷上升,線程池會不斷的創建線程,直到到達核心線程數10,就不創建線程了,這時多余的任務通過加入阻塞隊列來運行,當超出阻塞隊列長度+核心線程數時,這時不得不擴大線程個數來滿足當前任務的運行,這時就需要創建新的線程了(最大線程數起作用),上限是最大線程數30
  那么超出核心線程數10並小於最大線程數30的可能新創建的這20個線程相當於是“借”的,如果這20個線程空閑時間超過keepAliveTime,就會被退出。
1、線程為什么會空閑
  沒有任務時線程就會空閑下來,在線程池中任務是任務(Runnale)線程是線程(Worker)
2、線程為什么要退出
  通常超出核心線程的線程是“借”的,也就是說超出核心線程的情況算是一種能夠預見的異常情況,並且這種情況並不常常發生(如果常常發生,那我想你應該調整你的核心線程數了),所以這種不經常發生而創建的線程為了避免資源浪費就應該要退出

另外,需要注意,keepAliveTime設置為0時是空閑線程直接退出

5.Executors

5.1ThreadFactory

  在將這個之前先介紹一下ThreadFactory。每當線程池需要一個線程時,都是通過線程工廠創建的線程。默認的線程工廠方法將創建一個新的、非守護的線程,並且不包含特殊的線程信息。當然可以通過線程工廠定制線程的信息。此工廠也有好多實現:

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

  其實現類:

 

5.2Executors

  可以通過Executors中的靜態工廠方法之一創建一個線程池。Executors的靜態工廠可以創建常用的四種線程池:

newFixedThreadPool(采用LinkedBlockingQueue隊列--基於鏈表的阻塞隊列)

  創建一個定長線程池,每當提交一個任務時就創建一個線程,直到線程池的最大數量,這時線程池的規模將不再變化(如果由於某個線程由於發生了未預期的exception而結束,那么線程池會補充一個新的線程)。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newCachedThreadPool(使用SynchronousQueue同步隊列)

  創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。線程池的規模不受限。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

newScheduledThreadPool(使用DelayedWorkQueue延遲隊列)

   創建一個定長線程池,支持定時及周期性任務執行。類似於Timer。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

 

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }

 

newSingleThreadExecutor(采用LinkedBlockingQueue隊列--基於鏈表的阻塞隊列)

  創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。如果這個線程異常結束會創建一個新的線程來替代。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

   

  newFixedThreadPool和newCachedThreadPool這兩個工廠方法返回通用的ThreadPoolExecutor實例,這些實例可以直接用來構造專門用途的execotor。另外上面創建的時候都有一個可以指定線程工廠的方法:

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }  

  關於workqueue的選擇: DelayQueue 可以實現有序加延遲的效果。 SynchronousQueue 同步隊列,實際上它不是一個真正的隊列,因為它不會維護隊列中元素的存儲空間,與其他隊列不同的是,它維護一組線程,這些線程在等待把元素加入或移除隊列。LinkedBlockingQueue 類似於LinkedList,基於鏈表的阻塞隊列。此隊列如果不指定容量大小,默認采用Integer.MAX_VALUE(可以理解為無限隊列)。

  關於隊列的使用參考:https://www.cnblogs.com/qlqwjy/p/10175201.html

6.Java線程池的使用

下面所有的測試都是基於Myrunnale進行測試

package cn.qlq.thread.twenty;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyRunnable implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(MyRunnable.class);

    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            log.info("threadName -> {},i->{} ", Thread.currentThread().getName(), i);
            try {
                Thread.sleep(1 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

 

1.FixedThreadPool的用法

  創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。在創建的時候並不會馬上創建2個線程,而是在提交任務的時候才創建線程。

創建方法:

    /**
     * 參數是初始化線程池子的大小
     */
    private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2);

 

查看源碼:(使用了阻塞隊列,超過池子容量的線程會在隊列中等待)

 

測試代碼:

package cn.qlq.thread.twenty;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo3 {
    /**
     * 參數是初始化線程池子的大小
     */
    private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new MyRunnable());
        }
    }
}

結果:(執行完線程並沒有銷毀)

 

 解釋:

  池子容量大小是2,所以前兩個先被執行,第三個runable只是暫時的加到等待隊列,前兩個執行完成之后線程 pool-1-thread-1空閑之后從等待隊列獲取runnable進行執行。

  定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()

 

並且上面程序執行完畢之后JVM並沒有結束,因此線程池創建的線程默認是非守護線程:

 

2.CachedThreadPool

  創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。

創建方法:

private static final ExecutorService batchTaskPool = Executors.newCachedThreadPool();

 

查看源碼:(使用了同步隊列)

 

測試代碼:

package cn.qlq.thread.twenty;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo4 {
    /**
     * 參數是初始化線程池子的大小
     */
    private static final ExecutorService batchTaskPool = Executors.newCachedThreadPool();

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new MyRunnable());
        }
    }
}

結果:

 

執行完成執行線程並沒有結束

 

3.SingleThreadExecutor用法

   創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。類似於單線程執行的效果一樣。

創建方法:

    private static final ExecutorService batchTaskPool = Executors.newSingleThreadExecutor();

 

查看源碼;使用的阻塞隊列

 

測試代碼:

package cn.qlq.thread.twenty;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo5 {
    private static final ExecutorService batchTaskPool = Executors.newSingleThreadExecutor();

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new MyRunnable());
        }
    }
}

結果:

 

只有一個線程在執行任務:

 

 4.ScheduledThreadPool用法------可以實現任務調度功能

   創建一個定長線程池(會指定容量初始化大小),支持定時及周期性任務執行。可以實現一次性的執行延遲任務,也可以實現周期性的執行任務。

創建方法:

    private static final ScheduledExecutorService batchTaskPool = Executors.newScheduledThreadPool(2);

 

查看源碼:(使用了延遲隊列)

 

 

 測試代碼:

package cn.qlq.thread.twenty;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Demo6 {
    private static final ScheduledExecutorService batchTaskPool = Executors.newScheduledThreadPool(2);

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            // 第一次執行是在3s后執行(延遲任務)
            batchTaskPool.schedule(new MyRunnable(), 3, TimeUnit.SECONDS);
            // 第一個參數是需要執行的任務,第二個參數是第一次的延遲時間,第三個參數是兩次執行的時間間隔,第四個參數是時間的單位
            batchTaskPool.scheduleAtFixedRate(new MyRunnable(), 3, 7, TimeUnit.SECONDS);
            // 第一個參數是需要執行的任務,第二個參數是第一次的延遲時間,第三個參數是兩次執行的時間間隔,第四個參數是時間的單位
            batchTaskPool.scheduleWithFixedDelay(new MyRunnable(), 3, 5, TimeUnit.SECONDS);
        }
    }
}
schedule是一次性的任務,可以指定延遲的時間。
scheduleAtFixedRate已固定的頻率來執行某項計划(任務)
scheduleWithFixedDelay相對固定的延遲后,執行某項計划 (這個就是第一個任務執行完5s后再次執行,一般用這個方法任務調度)
  如果延遲時間傳入的是負數會立即執行,不會報非法參數錯誤。

 關於二者的區別:

  scheduleAtFixedRate :這個是按照固定的時間來執行,簡單來說:到點執行
  scheduleWithFixedDelay:這個呢,是等上一個任務結束后,在等固定的時間,然后執行。簡單來說:執行完上一個任務后再執行

舉例子

  scheduledThreadPool.scheduleAtFixedRate(new TaskTest("執行調度任務3"),0, 1, TimeUnit.SECONDS);  //這個就是每隔1秒,開啟一個新線程
  scheduledThreadPool.scheduleWithFixedDelay(new TaskTest("第四個"),0, 3, TimeUnit.SECONDS); //這個就是上一個任務執行完,3秒后開啟一個新線程

補充:比如想要實現在某一個時鍾定時晚上11點執行任務,並且每天都執行

        long curDateSecneds = 0;
        try {
            String time = "21:00:00";
            DateFormat dateFormat = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
            DateFormat dayFormat = new SimpleDateFormat("yy-MM-dd");
            Date curDate = dateFormat.parse(dayFormat.format(new Date()) + " " + time);
            curDateSecneds = curDate.getTime();
        } catch (ParseException ignored) {
            // ignored
        }

        // 單位是s
        long initialDelay = (curDateSecneds - System.currentTimeMillis()) / 1000;
        int periodOneDaySeconds = 1 * 24 * 60 * 60;
        batchTaskPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("111222");
            }
        }, initialDelay, periodOneDaySeconds, TimeUnit.SECONDS);

注意: 上面的單位也就是四個參數是延遲時間和間隔的單位,也就是說第四個參數決定第二個和第三個參數。

 

 當然實現任務調度還可以采用quartz框架來實現,更加的靈活。參考:https://www.cnblogs.com/qlqwjy/p/8723358.html

 

5.測試創建線程池的時候傳入一個線程工廠創建的線程是守護線程且自定義線程的name

package cn.qlq.thread.twenty;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo7 {
    private static volatile AtomicInteger atomicInteger = new AtomicInteger();
    /**
     * 參數是初始化線程池子的大小
     */
    private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("t" + atomicInteger.incrementAndGet());
            thread.setDaemon(true);// 設置為守護線程
            return thread;
        }
    });

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new MyRunnable());
        }
        // 必須休眠。否則創建的是守護線程會直接關閉進程
        Thread.sleep(20 * 1000);
    }
}

結果:

 

補充:關於線程池中的線程銷毀問題

  線程池中的線程如果不調用shutdown()方法線程是不會銷毀的,即使是方法內部的局部變量線程池也不會銷毀;調用shutdown方法之后在所有線程執行完后會線程線程池中的線程。所以在使用線程池的時候如果是方法內部使用一定要shutdown銷毀線程,如果是全局使用的靜態線程池可以不shutdown。

例如:不調用shutdown方法不會銷毀線程。調用shutdown會銷毀線程。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestThreadPoolDestroy {
    public static void main(String[] args) {
        TestPoolDestroy();
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        TestPoolDestroy();
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void TestPoolDestroy() {
        ExecutorService batchTaskPool = Executors.newFixedThreadPool(3);
        final CountDownLatch latch = new CountDownLatch(3);// 閉鎖
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + "進入run");
                        Thread.sleep(5 * 10000);
                        System.out.println(Thread.currentThread().getName() + "退出run");
                        latch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        try {
            latch.await();// 閉鎖產生同步效果
            System.out.println("三個都執行完畢");
            // batchTaskPool.shutdown();// 調用此方法等待執行完畢會銷毀線程,如果不調用此方法即使方法退出也不會銷毀線程
            System.out.println(batchTaskPool.isTerminated());
            System.out.println(batchTaskPool.isShutdown());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

結果:

 

 將上面的shutdown方法的注釋去掉再次測試,結果如下:  調用shutdown之后線程會銷毀

 

 

補充:如果想要判斷線程池中的線程是否執行完畢,或者在多個線程在線程池中執行完畢之后處理某些事情可以結合閉鎖來實現,參考:https://www.cnblogs.com/qlqwjy/p/10251610.html

 (1)閉鎖實現 (建議使用這種)

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestThreadPoolDestroy {
    public static void main(String[] args) {
        TestPoolDestroy();
        System.out.println("main end");
    }

    private static void TestPoolDestroy() {
        ExecutorService batchTaskPool = Executors.newFixedThreadPool(3);
        final CountDownLatch latch = new CountDownLatch(3);// 閉鎖
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + "進入run");
                        Thread.sleep(5 * 1000);
                        System.out.println(Thread.currentThread().getName() + "退出run");
                        latch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        try {
            latch.await();// 閉鎖產生同步效果
            System.out.println("三個都執行完畢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

結果:

pool-1-thread-1進入run
pool-1-thread-2進入run
pool-1-thread-3進入run
pool-1-thread-1退出run
pool-1-thread-3退出run
pool-1-thread-2退出run
三個都執行完畢
main end

 

(2)線程池自身攜帶的方法實現:  shuwdown后立即調用awaitTermination 實現

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TestThreadPoolDestroy {
    public static void main(String[] args) {
        TestPoolDestroy();
        System.out.println("main end");
    }

    private static void TestPoolDestroy() {
        ExecutorService batchTaskPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            batchTaskPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + "進入run");
                        Thread.sleep(5 * 1000);
                        System.out.println(Thread.currentThread().getName() + "退出run");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        try {
            batchTaskPool.shutdown();
            batchTaskPool.awaitTermination(1, TimeUnit.DAYS);
            System.out.println("三個都執行完畢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

結果:

pool-1-thread-2進入run
pool-1-thread-3進入run
pool-1-thread-1進入run
pool-1-thread-3退出run
pool-1-thread-2退出run
pool-1-thread-1退出run
三個都執行完畢
main end

 

補充:例如我系統中使用的一個ExcutorService的例子:

/**
 * 同步釘釘組織結構和人員的Action
 * 
 * @author Administrator
 *
 */
@Namespace("/sync")
public class SyncGroupAndUserAndBaseInfoAction extends DMSActionSupport {

    /**
     * serialID
     */
    private static final long serialVersionUID = 3526083465788431949L;
    
    private static final ExecutorService batchTaskPool = Executors.newFixedThreadPool(2);

    private static Logger logger = LoggerFactory.getLogger(SyncGroupAndUserAndBaseInfoAction.class);

    @Autowired
    private GroupAndUserService groupService;

    @Autowired
    private BaseInfoService baseInfoService;

    /**
     * 同步基本信息的數據
     * 
     * @return
     */
    @Action(value = "syncGroupAndUser")
    public String syncGroupAndUser() {
        long startTime = System.currentTimeMillis();
        logger.info("manual sync groups and users!");

        String accessToken = FetchDataUtils.getAccessToken();
        if (StringUtils.isBlank(accessToken)) {
            setPreJs("accessToken is null!");
            return "js";
        }

        String groupStr = FetchDataUtils.getGroupStr(accessToken);
        if (StringUtils.isBlank(groupStr)) {
            setPreJs("groupStr is null");
            return "js";
        }
        
        
        Set<String> dingGroupIds = FetchDataUtils.getGroupIds(groupStr);// 釘釘同步回來的組
        //新開一個線程去獲取釘釘用戶和組織
        batchDisposeDingGroupAndUser(dingGroupIds,groupStr,accessToken);
        

        Map<String,Object> response = new HashMap<String,Object>();
        response.put("success", true);
        response.put("message", "success sync datas!");
        setPreJs(APIUtils.getJsonResultFromMap(response));
        
        long endTime = System.currentTimeMillis();
        logger.info("同步釘釘組織結構和用戶完成-----用時:{}ms",(endTime-startTime));
        return "js";
    }

    private void batchDisposeDingGroupAndUser(final Set<String> dingGroupIds, final String groupStr,final String accessToken) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                groupService.batchDisposeGroups(groupStr, dingGroupIds);
                groupService.fetchAndDisposeUsers(accessToken, dingGroupIds);                
            }
        };
        batchTaskPool.execute(run);
    }
    
}

注意:

  batchDisposeDingGroupAndUser()方法的形參必須聲明為final,否則編譯錯誤。


補充:阿里規約有一條

【強制】線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。
說明: Executors 返回的線程池對象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
  允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

 

阻塞隊列默認是Integer.MAX_VALUE(可以理解為無限隊列)

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

 

2) CachedThreadPool 和 ScheduledThreadPool:
  允許的創建線程數量為 Integer.MAX_VALUE, 可能會創建大量的線程,從而導致 OOM。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }

 

補充:ExecutorService.invokeAny()和ExecutorService.invokeAll()的使用

  invokeAny是啟動多個線程,相互獨立的(無同步)去計算一個結果,當某一個線程得到結果之后,立刻終止所有線程,因為只需要一個結果就夠了。

  invokeAll是啟動多個線程,相互獨立的計算結果,當全部線程都結束之后統一返回結果,相當於阻塞執行。

例如:invokeAny的測試,也可以指定最長等待時間

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.StringUtils;

public class PlainTest {

    private static ExecutorService es = Executors.newFixedThreadPool(5);
    private static List<Callable<String>> list = new ArrayList<>();

    public static void main(String[] a) throws InterruptedException, ExecutionException, TimeoutException {
        for (int i = 0; i < 6; i++) {
            final int nextInt = org.apache.commons.lang3.RandomUtils.nextInt(2, 4);
            System.out.println(nextInt + "\t" + i);

            list.add(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    Thread.sleep(nextInt * 1000);
                    return nextInt + "";
                }
            });
        }

        long start = System.currentTimeMillis();
        String invokeAny = es.invokeAny(list);
        long end = System.currentTimeMillis();
        System.out.println("用時: " + (end - start) / 1000 + " s");
        System.out.println("Result: " + invokeAny);
    }
}

結果:(可以看到休眠時間最短的最先結束也就導致后面的線程結束)

3 0
3 1
2 2
3 3
3 4
2 5
用時: 2 s
Result: 2

 

invokeAll的測試,也可以指定最長等待時間

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.StringUtils;

public class PlainTest {

    private static ExecutorService es = Executors.newFixedThreadPool(5);
    private static List<Callable<String>> list = new ArrayList<>();

    public static void main(String[] a) throws InterruptedException, ExecutionException, TimeoutException {
        for (int i = 0; i < 6; i++) {
            final int nextInt = org.apache.commons.lang3.RandomUtils.nextInt(2, 4);
            System.out.println(nextInt + "\t" + i);

            list.add(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    Thread.sleep(nextInt * 1000);
                    return nextInt + "";
                }
            });
        }

        List<String> result = new ArrayList<>(6);

        long start = System.currentTimeMillis();
        List<Future<String>> invokeAll = es.invokeAll(list);
        long end = System.currentTimeMillis();
        System.out.println("用時: " + (end - start) / 1000 + " s");
        for (Future<String> future : invokeAll) {
            result.add(future.get());
        }
        System.out.println(StringUtils.join(result, ","));
    }
}

結果:(可以看出,存在阻塞,用時4s)

2 0
3 1
2 2
3 3
2 4
2 5
用時: 4 s
2,3,2,3,2,2

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM