為什么要單獨講解TimedSupervisorTask這個類呢?因為這個類在我們DiscoveryClient類的initScheduledTasks方法進行定時任務初始化時被使用得比較多,所以我們需要了解下這個類,我們先看下TimedSupervisorTask這個類在initScheduledTasks的具體使用:
private final ScheduledExecutorService scheduler;
private void initScheduledTasks() {
…省略其他代碼
// 初始化定時拉取服務注冊信息
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
…省略其他代碼
// 初始化定時服務續約任務
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
…省略其他代碼
}
由此可見,TimedSupervisorTask類被使用在了定時任務的初始化中,我們具體來看看這個類的結構:
public class TimedSupervisorTask extends TimerTask {
private static final Logger logger = LoggerFactory.getLogger(TimedSupervisorTask.class);
private final Counter timeoutCounter;
private final Counter rejectedCounter;
private final Counter throwableCounter;
private final LongGauge threadPoolLevelGauge;
private final ScheduledExecutorService scheduler;
private final ThreadPoolExecutor executor;
private final long timeoutMillis;
private final Runnable task;
private final AtomicLong delay;
private final long maxDelay;
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
this.scheduler = scheduler;
this.executor = executor;
this.timeoutMillis = timeUnit.toMillis(timeout);
this.task = task;
this.delay = new AtomicLong(timeoutMillis);
this.maxDelay = timeoutMillis * expBackOffBound;
// Initialize the counters and register.
timeoutCounter = Monitors.newCounter("timeouts");
rejectedCounter = Monitors.newCounter("rejectedExecutions");
throwableCounter = Monitors.newCounter("throwables");
threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
Monitors.registerObject(name, this);
}
@Override
public void run() {
Future<?> future = null;
try {
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
} catch (TimeoutException e) {
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
long currentDelay = delay.get();
// 如果出現異常,則將時間*2,然后取 定時時間 和 最長定時時間中最小的為下次任務執行的延時時間
long newDelay = Math.min(maxDelay, currentDelay * 2);
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.warn("task supervisor rejected the task", e);
}
rejectedCounter.increment();
} catch (Throwable e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.warn("task supervisor threw an exception", e);
}
throwableCounter.increment();
} finally {
if (future != null) {
future.cancel(true);
}
if (!scheduler.isShutdown()) {
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
}
我們可以仔細看看run方法的具體實現,因為這里有一個值得借鑒的設計思路!!!
我們簡單來看看這個方法具體執行流程:
1.執行submit()方法提交任務
2.執行future.get()方法,如果沒有在規定的時間得到返回值或者任務出現異常,則進入異常處理catch代碼塊。
3.如果發生異常
a. 發生TimeoutException異常,則執行Math.min(maxDelay, currentDelay ✖️ 2);得到任務延時時間 ✖️ 2 和 最大延時時間的最小值,然后改變任務的延時時間timeoutMillis(延時任務時間默認值是30s)
b.發生RejectedExecutionException異常,則將rejectedCounter值+1
c.發生Throwable異常,則將throwableCounter值+1
4.如果沒有發生異常,則再設置一次延時任務時間timeoutMillis
5.進入finally代碼塊
a.如果future不為null,則執行future.cancel(true),中斷線程停止任務
b.如果線程池沒有shutdown,則創建一個新的定時任務
\(\color{red}{注意}\):不知道有沒有小伙伴發現,不管我們的定時任務執行是成功還是結束(如果還沒有執行結束,也會被中斷),然后會再重新初始化一個新的任務。並且這個任務的延時時間還會因為不同的情況受到改變,在try代碼塊中如果不發現異常,則會重新初始化延時時間,如果發生TimeoutException異常,則會更改延時時間,更改為 任務延時時間 ✖️ 2 和 最大延時時間的最小值。所以我們會發現這樣的設計會讓整個延時任務很靈活。如果不發生異常,則延時時間不會變;如果發現異常,則增長延時時間;如果程序又恢復正常了,則延時時間又恢復成了默認值。
總結:我們在設計延時/周期性任務時就可以參考TimedSupervisorTask的實現,程序一旦遇到發生超時異常,就將間隔時間調大,如果連續超時,那么每次間隔時間都會增大一倍,一直到達外部參數設定的上限為止,一旦新任務不再發生超時異常,間隔時間又會自動恢復為初始值。