類圖:
其實從類圖我們能發現concurrent包(除去java.util.concurrent.atomic 和 java.util.concurrent.locks)中的內容並沒有特別多,大概分為四類:BlockingQueue阻塞隊列體系、Executor線程組執行框架、Future線程返回值體系、其他各種單獨的並發工具等。
首先學習的是Executor體系,是我們處理多線程最常接觸的內容。首先我們單獨看下繼承體系:
Executor是頂級接口,里面只有一個方法:
public interface Executor { void execute(Runnable command);---執行一個Runnable對象,Runnable在前面的文章里面已經講到,是線程的頂級接口,里面有個run方法 }
ExecutorService是我們經常用到的多線程執行框架的聲明,源碼也很少,我們逐個方法進行解釋:
public interface ExecutorService extends Executor { void shutdown();----關閉線程執行框架中的線程,但效果是不再接受新線程加入,並且等待線程執行結束后關閉Executor List<Runnable> shutdownNow();---大體同上的,但是該命令會嘗試關閉正在運行中的線程,但是也僅僅是調用terminate方法然后讓jdk去決定是否結束,同時該方法返回那些awaiting狀態的線程組 boolean isShutdown();---是否已經被關閉的狀態 boolean isTerminated();---是否已經被中止的狀態,在shutdown和shutdownnow被調用后,並且線程全部執行結束,該狀態才是true,否則都是false boolean awaitTermination(long timeout, TimeUnit unit) ---如果線程組terminate了,返回true,超時時間到了返回false。 throws InterruptedException; <T> Future<T> submit(Callable<T> task);--提交一個Callable的回調,然后執行完成后將結果放入Future對象。 <T> Future<T> submit(Runnable task, T result);--提交一個Runnable接口實現,然后result是Future返回值 Future<?> submit(Runnable task);--提交一個Runnable接口實現,如果執行完成,future.get()可以返回一個null <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)--執行提交的Callable集合,然后返回各自的執行結果的Future對象列表,每個元素isDone都是true throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)--執行提交的task集合,執行完成或者timeout之后返回結果,isDone為true throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks)--執行提交的task集合,有一個執行完成就返回結果 throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)--同上,有一個執行完成或者有timeout出現 throws InterruptedException, ExecutionException, TimeoutException;
AbstractExecutorService實現了ExecutorService接口,並且聲明為抽象類,然而其中一個抽象方法都沒有。。.
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {---為子類定義了一個創建RunnableFuture對象的快捷方法
return new FutureTask<T>(runnable, value);
}
---其余方法都是接口的實現方法
}
還有一個繼承了ExecutorService的接口的接口:
public interface ScheduledExecutorService extends ExecutorService { ---定義延遲執行的動作 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); ---延遲delay個時間單位后開始執行 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);---延遲delay個時間單位后執行並返回Future對象 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);在initialDelay、initialDelay+N*period分別出發 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }--initialDelay開始進行,后面每次執行成功后的dealy時間單位開始
接下來是重磅的內容ThreadPoolExecutor 也就是Executor框架的核心邏輯所在,ThreadPoolExecutor類算上注釋有2100多行,但里面有很多一部分是方法的詳細說明,還有大量的變量和private/protected的方法,真正開放出來的public方法很少,我們只要逐個分析這些public方法就可以了。
我們首先來看構造函數,構造函數有多個重載方法,目前只看參數最全的情況:
public ThreadPoolExecutor(int corePoolSize, ----池子里面的線程數量,即便idle狀態的線程也會保留這個數量 int maximumPoolSize,----池子里面能盛放最大線程數量 long keepAliveTime,----當線程數量大於core核心數量的時候,並且里有idle狀態的線程,那么最大可以被terminate的的等待時間
(就是在cpu借的線程資源如果閑置多久就必須還回去) TimeUnit unit,--timeUnit BlockingQueue<Runnable> workQueue,---任務的隊列 ThreadFactory threadFactory,---線程工廠 RejectedExecutionHandler handler) {---線程池處理不過來任務隊列的時候的默認處理方法 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
public void execute(Runnable command) {}---執行一個任務
public void shutdown() {---使用可重入鎖實現,關閉線程池執行框架,不再接受新的任務,關閉idle狀態的線程,等待正在執行的執行完成。
final ReentrantLock mainLock = this.mainLock;
}
public List<Runnable> shutdownNow() {}--- 同ExecutorService中的接口說明,與shuwdown的區別在於會嘗試關閉正在執行的線程
public boolean isShutdown() {}---是否已經關閉(沒有running狀態的線程了)
-------一堆get和set方法
public int getActiveCount() {}---獲得活動的線程數量
public long getCompletedTaskCount() {}---或者完成的任務數量
其實我們只要知道ThreadPoolExecutor的構造參數就可以明白它的工作方式,而且我們需要做的也是把這些內容構造好。
我們最常用的Executors里面的更加方便的Pool的類型其實都是為我們加了一些default參數的ThreadPoolExecutor:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
也就是其實最后都是會到ThreadPoolExecutor上面去實現,區別就是一個用着方便,一個你可以控制的粒度更小,進而可能效率更高。
我們在類圖的繼承體系里面還能發現一個叫做ForkJoinPool的類,這個類其實是很多人口中所謂未來java並發方向的類,由於涉及的內容較多,后面單獨隨筆進行學習和理解吧。