看到alibaba的nacos注冊中心中client端用這個作為心跳任務工具
BeatReactor類中
executorService spring 管理。
this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.beat.sender");
return thread;
}
});
threadCount 取值UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT
DEFAULT_CLIENT_BEAT_THREAD_COUNT = Runtime.getRuntime().availableProcessors() > 1 ? Runtime.getRuntime().availableProcessors() / 2 : 1;
ThreadFactory定義,線程池中的線程為守護線程。thread.setDaemon(true);
守護線程:
定義:守護線程--也稱“服務線程”,在沒有用戶線程可服務時會自動離開。
優先級:守護線程的優先級比較低,用於為系統中的其它對象和線程提供服務。
設置:通過setDaemon(true)來設置線程為“守護線程”;將一個用戶線程設置為
守護線程的方式是在 線程對象創建 之前 用線程對象的setDaemon方法。

調用
// 注冊時新建任務
this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); 5秒后執行一次。
static {
DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15L); // 心跳超時時間
DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L); // 刪除注冊服務提供者時間
DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5L); // 心跳調用服務提供者時間
}
beatInfo.getPeriod()為心跳時間5秒
ScheduledThreadPoolExecutor其底層數據結果是小根堆結構。
堆結果是完全二叉樹,分為大根堆和小根堆,大根堆,父節點大於兩個子節點。小根堆,父節點小於兩個子節點。
因此每次循環都是最先到時間的任務。
new BeatReactor.BeatTask(beatInfo) 每次執行完就重新新建任務
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
public void run() {
if (!this.beatInfo.isStopped()) {
long nextTime = this.beatInfo.getPeriod();
try {
JsonNode result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has("lightBeatEnabled")) {
lightBeatEnabled = result.get("lightBeatEnabled").asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0L) {
nextTime = interval;
}
int code = 10200;
if (result.has("code")) {
code = result.get("code").asInt();
}
if (code == 20404) {
Instance instance = new Instance();
instance.setPort(this.beatInfo.getPort());
instance.setIp(this.beatInfo.getIp());
instance.setWeight(this.beatInfo.getWeight());
instance.setMetadata(this.beatInfo.getMetadata());
instance.setClusterName(this.beatInfo.getCluster());
instance.setServiceName(this.beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
BeatReactor.this.serverProxy.registerService(this.beatInfo.getServiceName(), NamingUtils.getGroupName(this.beatInfo.getServiceName()), instance);
} catch (Exception var10) {
}
}
} catch (NacosException var11) {
LogUtils.NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", new Object[]{JacksonUtils.toJson(this.beatInfo), var11.getErrCode(), var11.getErrMsg()});
}
// 新創建任務
BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
}
take方法(延遲任務實現)
take方法通過lock來實現任務獲取的線程安全性,通過一個死循環來使申請任務的工作線程等待,直到獲取到任務。獲取任務可以分為以下幾步:
獲取鎖。
若隊列為空,則當前工作線程等待,否則進行下一步。
獲取隊列頭結點,判斷該節點任務的延遲時間是否還有剩余,時間到則返回該任務節點並調整堆,否則執行下一步。
判斷leader是否為空,為空說明該任務已被其它工作線程獲取,陷入等待,否則執行下一步。
等任務執行時間到,然后獲得該任務,直接執行。
喚醒下一個等待的工作線程來獲取任務,釋放鎖。
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) { // 死循環,直到獲得任務為止
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await(); // 隊列為空,工作線程等待
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) // 任務執行時間到了
return finishPoll(first); // 任務出隊,然后調整堆
// 任務執行時間未到
first = null; // don't retain ref while waiting
if (leader != null) // 該任務已被其它工作線程鎖定,等待
available.await();
else { // 可以獲取任務
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等任務執行時間到,然后執行
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal(); // 喚醒其它工作線程來獲取任務
lock.unlock();
}
}
