稍稍解讀下ThreadPoolExecutor


# 說說ThreadPoolExecutor


## 認識
先來看看它所在的架構體系:
```java
package java.util.concurrent;
public interface Executor { void execute(Runnable command); }
public interface ExecutorService extends Executor {
//新加一些方法
}
public abstract class AbstractExecutorService implements ExecutorService {
//新加一些方法,以及一些方法的基本實現
}
public class ThreadPoolExecutor extends AbstractExecutorService {
//新加一些方法,以及繼承方法的實現
}
```
- `Executor`接口就一個方法,用來執行Runnable。官方的說法是,將 `任務的執行``線程` 解耦和。
- `ExecutorService`接口,繼承了`Executor`,同時添加了一些管理任務的方法,如submit/invokeAll/invokeAny/shutdown/shutdownNow 等。
- `AbstractExecutorService`抽象類,實現了`ExecutorService`,提供了默認的一些實現,並添加了三個工具方法。見下圖:
- ![AbstractExecutorService.jpg](img/AbstractExecutorService.jpg)
-
- `ThreadPoolExecutor` 類,直接可用的類,相對來說很復雜,也是本文的重點。
## 構造
先從構造方法入手。
![ThreadPoolExecutorConstructor.jpg](img/ThreadPoolExecutorConstructor.jpg)

由上圖可見,有4個構造方法,大同小異。具體如下:
```java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
```
可見,本質上都是在調用最后一個。它的這些參數的含義如下:
- `corePoolSize`: 注意,這是核心線程池的尺寸,但並不是初始化時線程數。當提交的任務數量少於這個數值時,不會經過`workQueue`中轉,直接創建新線程來執行。如果線程數量多於這個數值,且線程空閑(任務執行完畢)達到指定時間(keepAliveTime),則會干掉多出來的線程。 -- **需要結合下面的參來理解**
- `maximumPoolSize`: 這個很好理解,就是`線程池`所能提供的最大線程數量。
- `keepAliveTime`: 線程空閑時的存活時間 - 參考`corePoolSize`
- `unit`: 存活時間的時間單位。
- `workQueue`: 工作隊列 - 其實是任務隊列。注意,可以使用有界隊列,如ArrayBlockingQueue,也可以使用無界隊列,如LinkedBlockingQueue。區別在於,能夠接收有限/無限的任務。
- `threadFactory`: 線程工廠,提供線程用的。注意,不一定負責線程的管理,僅負責提供線程!線程的管理可能是ExecutorService負責的。
- `handler`: 任務被拒絕(執行或者提交)時的處理器。
## 怎么使用?
實際上,我們很少直接使用這個類,更多的時候是使用`Executors`這個工廠類的`#newFixedThreadPool(int nThreads)`,源碼如下:
```java
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
```
對比前面的構造參數,很容易理解。
1. `corePoolSize``maximumPoolSize`相同 - 所以線程數量是一定的(當然開始的時候肯定不是,從0開始,見前面`corePoolSize`的說明部分)。
2. `keepAliveTime`**0**,所以沒有等待時間(實際上這個參數用不到,因為最后的線程數量是一定的 - 如果都使用了)。
3. `workQueue``new LinkedBlockingQueue<Runnable>()`,所以這是無界隊列,可以接收任意多任務。
這樣使用的話,`threadFactory``handler`都是默認的,前者是 `Executors.defaultThreadFactory()`,看源碼僅是在創建線程的時候添加了"線程組"、"線程名";后者是 `ThreadPoolExecutor.defaultHandler` 。源碼如下:
```java
//Executors.defaultThreadFactory() {return new DefaultThreadFactory();}
static class DefaultThreadFactory implements ThreadFactory {
// ...

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
// ...
}

//ThreadPoolExecutor.defaultHandler
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); //什么都不做,直接拋出異常!
```
## 驗證
我們來驗證下:
1. `corePoolSize``workQueue`的關系 - 也就是前面提到的“當提交的任務數量少於`corePoolSize`時,直接開啟新線程,不經過`workQueue`”。
2. 當任務無法提交或執行時,直接拋出異常!
```java
@Test
public void testRejectionHandler(){
int corePoolSize = 2; //提交任務時,如果線程數低於2,則創建新線程。完成后則會始終維持最少2個線程。
int maximumPoolSize = 5; //線程池最多允許5個線程存在。-和workQueue什么關系呢?
long keepAliveTime = 5; // 線程數量超過corePoolSize時,如果空閑了,那會空閑多久。
TimeUnit timeUnit = TimeUnit.SECONDS;

// TODO 注意,ArrayBlockingQueue是有界隊列,還可以用LinkedBlockingQueue 無界隊列 - 就是可以submit無限任務!
ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);//TODO 該隊列僅hold由execute方法提交的Runnable tasks。submit會調用execute!
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, workQueue);
Runnable runnable = () -> {
System.out.println(Thread.currentThread().getName());
try{
Thread.sleep(1000L * 1000); //任務一直進行,這樣線程就一直不釋放
} catch(InterruptedException e){
e.printStackTrace();
}
};
try{
for(int i = 0; i < 10; i++){ //提交10個任務
System.out.println(i);
Future<?> submit = threadPoolExecutor.submit(runnable);
//submit后,Queue里可能有內容,也可能沒有 - 可能已經移交給worker線程了!
//FIXME 下面隊列用法不對。因為 只有在線程數少於corePoolSize時,才不走隊列。
BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
System.out.println("queue size: " + queue.size());
System.out.println("queue.remainingCapacity: " + queue.remainingCapacity()); // 這個可能有用
// queue.take();//wait until
System.out.println("queue.peek: " + queue.peek());//奇怪,總是同一個,難道需要同步?
// queue.poll();//retrive and remove head
}
System.out.println("----a");
threadPoolExecutor.execute(runnable); //嗯?什么時候用這個比較好?
System.out.println("----b");
} catch(Exception e){
System.out.println(threadPoolExecutor.isShutdown());
System.out.println(threadPoolExecutor.isTerminated());
System.out.println(threadPoolExecutor.isTerminating());
while(true){
try{
System.out.println(threadPoolExecutor.getQueue().take());
} catch(InterruptedException e1){
e1.printStackTrace();
}
}
}
try{
threadPoolExecutor.awaitTermination(1L, TimeUnit.HOURS);
} catch(InterruptedException e){
e.printStackTrace();
}
}
```
執行結果如下:
```txt
0
queue size: 0
queue.remainingCapacity: 2
queue.peek: null
1
pool-1-thread-1
queue size: 0
queue.remainingCapacity: 2
queue.peek: null
2
queue size: 1
pool-1-thread-2
queue.remainingCapacity: 1
queue.peek: java.util.concurrent.FutureTask@3fb4f649
3
queue size: 2
queue.remainingCapacity: 0
queue.peek: java.util.concurrent.FutureTask@3fb4f649
4
queue size: 2
queue.remainingCapacity: 0
queue.peek: java.util.concurrent.FutureTask@3fb4f649
5
pool-1-thread-3
queue size: 2
queue.remainingCapacity: 0
queue.peek: java.util.concurrent.FutureTask@3fb4f649
6
pool-1-thread-4
queue size: 2
queue.remainingCapacity: 0
queue.peek: java.util.concurrent.FutureTask@3fb4f649
7
pool-1-thread-5
false
false
false
```

能夠看到,前面2個任務提交的時候,隊列中並沒有內容,之后則一直有內容。很好的驗證了第一條。
最后5行,則是異常后輸出的內容,可以看出`threadPoolExecutor`仍在進行,但新提交的任務則被拒絕執行。- 其實這里應該吧try-catch放到循環里面,這樣可以看到后續的提交都失敗了。
感興趣的可以自己試一下,同時輸出下e.printStackTrace()。
## 其他
其實`ThreadPoolExecutor`的狀態設計非常贊,不過那是另外的事了。
套用網友一句話,“李大爺設計的api,很巧妙,處處是坑”,哈哈。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM