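Spring's ThreadPoolTaskExecutor class ultimately runs tasks by calling the void execute(Runnable task) or Future<?> submit(Runnable task) method of Java's ThreadPoolExecutor.
Below is the inheritance hierarchy of Spring's task-execution interfaces and classes.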
interface Executor
    void execute(Runnable command);
interface TaskExecutor extends Executor
    void execute(Runnable task);
interface AsyncTaskExecutor extends TaskExecutor
    void execute(Runnable task, long startTimeout);
    Future<?> submit(Runnable task);
    <T> Future<T> submit(Callable<T> task);
interface SchedulingTaskExecutor extends AsyncTaskExecutor
    boolean prefersShortLivedTasks();
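The two entry points named at the top, execute and submit, differ mainly in what the caller gets back. The following is a minimal sketch that calls the JDK's ThreadPoolExecutor directly rather than going through Spring; the class name ExecuteVsSubmitSketch and the method runBoth are made up for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; uses only JDK types, no Spring.
final class ExecuteVsSubmitSketch {

    // Runs the same task once via execute() and once via submit(),
    // then returns how many times the task ran.
    static int runBoth() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        AtomicInteger runs = new AtomicInteger();
        Runnable task = runs::incrementAndGet;
        try {
            // execute(): fire and forget, the caller gets nothing back.
            pool.execute(task);
            // submit(): the caller gets a Future it can wait on.
            Future<?> future = pool.submit(task);
            // One worker thread and a FIFO queue: once get() returns, both runs are done.
            future.get();
            return runs.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("runs = " + runBoth());
    }
}
```

With a Runnable, the Future returned by submit carries no result; it is only useful for waiting, cancelling, or observing a failure.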
Task executor class
class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor
Member variable
    private ThreadPoolExecutor threadPoolExecutor;
Task execution method
    public void execute(Runnable task) {
        Executor executor = getThreadPoolExecutor();
        try {
            executor.execute(task);
        }
        catch (RejectedExecutionException ex) {
            throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
        }
    }
    public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
        Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
        return this.threadPoolExecutor;
    }
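The execute method above delegates to the underlying pool and translates the JDK's RejectedExecutionException into Spring's own exception type. The sketch below reproduces that pattern with plain JDK classes so it can run without Spring; WrappedRejection is a hypothetical stand-in for TaskRejectedException, and the pool settings (one thread, no queue slots) are assumptions chosen only to force a rejection.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; uses only JDK types, no Spring.
final class RejectionWrappingSketch {

    // Hypothetical stand-in for Spring's TaskRejectedException.
    static final class WrappedRejection extends RuntimeException {
        WrappedRejection(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Same shape as the execute method above: delegate, then translate the JDK exception.
    static void execute(Executor executor, Runnable task) {
        try {
            executor.execute(task);
        } catch (RejectedExecutionException ex) {
            throw new WrappedRejection(
                    "Executor [" + executor + "] did not accept task: " + task, ex);
        }
    }

    // Returns true if a second concurrent task is rejected and the original cause is kept.
    static boolean secondTaskIsRejected() {
        // One thread, no queue slots, default AbortPolicy.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            execute(pool, () -> awaitQuietly(release)); // occupies the only thread
            try {
                execute(pool, () -> { });               // nowhere to go
                return false;
            } catch (WrappedRejection ex) {
                return ex.getCause() instanceof RejectedExecutionException;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("second task rejected: " + secondTaskIsRejected());
    }
}
```

Keeping the original exception as the cause means callers can still see which JDK-level rejection triggered the wrapper.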
Base class ExecutorConfigurationSupport
abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
        implements BeanNameAware, InitializingBean, DisposableBean
Its base class, CustomizableThreadFactory, is a customizable thread factory class.
Member variable
    private ExecutorService executor;
Lifecycle initialization
    public void afterPropertiesSet() {
        initialize();
    }
Initialization method
    /**
     * Set up the ExecutorService.
     */
    public void initialize() {
        if (logger.isInfoEnabled()) {
            logger.info("Initializing ExecutorService " + (this.beanName != null ? " '" + this.beanName + "'" : ""));
        }
        if (!this.threadNamePrefixSet && this.beanName != null) {
            setThreadNamePrefix(this.beanName + "-");
        }
        this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
    }
Abstract method (implemented by the subclass ThreadPoolTaskExecutor)
    protected abstract ExecutorService initializeExecutor(
            ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);
Lifecycle destruction method
    public void destroy() {
        shutdown();
    }
ThreadPoolTaskExecutor implements the ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) method:
    protected ExecutorService initializeExecutor(
            ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
        BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler);
        if (this.allowCoreThreadTimeOut) {
            executor.allowCoreThreadTimeOut(true);
        }
        this.threadPoolExecutor = executor;
        return executor;
    }
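The allowCoreThreadTimeOut flag passed through above decides whether idle core threads are reclaimed once the keep-alive time has elapsed. The sketch below shows that effect on a bare JDK ThreadPoolExecutor; the class name CoreTimeoutSketch is hypothetical, and the 50 ms keep-alive and 2 s polling window are assumptions made only to keep the demo short (the Spring class takes its keep-alive in seconds).

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; uses only JDK types, no Spring.
final class CoreTimeoutSketch {

    // Runs one task, then waits up to two seconds for idle threads to be reclaimed.
    // Returns the number of threads still in the pool at the end of the wait.
    static int poolSizeAfterIdle(boolean allowCoreThreadTimeOut) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        if (allowCoreThreadTimeOut) {
            pool.allowCoreThreadTimeOut(true);
        }
        try {
            // Starts the single core thread and waits for the task to finish.
            pool.submit(() -> { }).get();
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            return pool.getPoolSize();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("with core timeout:    " + poolSizeAfterIdle(true));
        System.out.println("without core timeout: " + poolSizeAfterIdle(false));
    }
}
```

With the flag left at false, the core thread stays alive while idle, so the pool size remains at corePoolSize until shutdown.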
Note that the initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) method above is called by the parent class ExecutorConfigurationSupport, and its return value initializes the parent's member variable private ExecutorService executor;
ThreadPoolTaskExecutor itself, however, executes tasks through its own member variable private ThreadPoolExecutor threadPoolExecutor;
    public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
        Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
        return this.threadPoolExecutor;
    }
It is not clear to me why it is handled this way.
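One plausible reading of the two fields (an interpretation, not something the source shown here confirms): the parent stores the executor as a general ExecutorService so its lifecycle code works for any subclass, while the subclass keeps the same object under its concrete ThreadPoolExecutor type so it can call pool-specific methods without a cast. The sketch below mimics that arrangement with hypothetical classes (Support, Pool, baseView); it is not Spring's code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; uses only JDK types, no Spring.
final class TwoFieldsSketch {

    // Plays the role of the parent: it only knows the ExecutorService contract.
    abstract static class Support {
        private ExecutorService executor;

        void initialize() {
            this.executor = initializeExecutor(
                    Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        }

        protected abstract ExecutorService initializeExecutor(
                ThreadFactory threadFactory, RejectedExecutionHandler handler);

        void shutdown() {
            if (this.executor != null) {
                this.executor.shutdown();
            }
        }

        // Illustration-only accessor for the parent's reference.
        ExecutorService baseView() {
            return this.executor;
        }
    }

    // Plays the role of the subclass: it keeps a second, concretely typed reference.
    static final class Pool extends Support {
        private ThreadPoolExecutor threadPoolExecutor;

        @Override
        protected ExecutorService initializeExecutor(
                ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    1, 1, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(), threadFactory, handler);
            this.threadPoolExecutor = executor;
            return executor;
        }

        ThreadPoolExecutor getThreadPoolExecutor() {
            if (this.threadPoolExecutor == null) {
                throw new IllegalStateException("not initialized");
            }
            return this.threadPoolExecutor;
        }
    }

    // True if both fields point at the same object and the typed one exposes pool settings.
    static boolean sameInstance() {
        Pool pool = new Pool();
        pool.initialize();
        try {
            return pool.baseView() == pool.getThreadPoolExecutor()
                    && pool.getThreadPoolExecutor().getCorePoolSize() == 1; // no cast needed
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("same instance: " + sameInstance());
    }
}
```

Under this reading the two fields are not two executors: they are two views of one object, one general and one specific.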
Now for the remaining parts.
ExecutorConfigurationSupport configures the default rejection policy:
    private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
ThreadPoolTaskExecutor member variables
    private int corePoolSize = 1;
    private int maxPoolSize = Integer.MAX_VALUE; // default maximum pool size
    private int keepAliveSeconds = 60;
    private boolean allowCoreThreadTimeOut = false;
    private int queueCapacity = Integer.MAX_VALUE; // default queue capacity
Method that creates the blocking queue
    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
        if (queueCapacity > 0) {
            return new LinkedBlockingQueue<Runnable>(queueCapacity);
        }
        else {
            return new SynchronousQueue<Runnable>();
        }
    }
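The defaults above interact in a way that is easy to miss: a JDK ThreadPoolExecutor only starts threads beyond corePoolSize when the queue refuses a task. With an effectively unbounded LinkedBlockingQueue, maxPoolSize therefore never comes into play. The sketch below compares the two queue choices made by createQueue; the class name QueueChoiceSketch, the maximum of 4 threads and the three blocking tasks are assumptions chosen for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; uses only JDK types, no Spring.
final class QueueChoiceSketch {

    // Same rule as the createQueue method above.
    static BlockingQueue<Runnable> createQueue(int queueCapacity) {
        if (queueCapacity > 0) {
            return new LinkedBlockingQueue<Runnable>(queueCapacity);
        }
        return new SynchronousQueue<Runnable>();
    }

    // Submits three tasks that block until released and reports how many threads were started.
    static int poolSizeUnderLoad(int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 60, TimeUnit.SECONDS, createQueue(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Large capacity: extra tasks wait in the queue, so only the core thread exists.
        System.out.println("capacity MAX_VALUE: " + poolSizeUnderLoad(Integer.MAX_VALUE));
        // Capacity 0: SynchronousQueue holds nothing, so each blocked task gets its own thread.
        System.out.println("capacity 0:         " + poolSizeUnderLoad(0));
    }
}
```

In practice this means that raising maxPoolSize alone has no visible effect unless queueCapacity is also lowered to a value the workload can actually fill.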